Examples
One small example per combinator. All snippets compile against com.github.async-java:async.java:v0.2.9.
Imports are elided for readability.
c (for continuation). Continuations fire via c.success(v), c.fail(e), or the canonical c.done(err, v). The final callback is the error-first callback the combinator takes; wrap it with WrapErrFirst.wrap(...) to skip the if (err != null)... preamble.
Asyncc.Parallel
Fan out N independent tasks. The final callback fires once, with results in the same order as the tasks were submitted.
List<Asyncc.AsyncTask<String, Throwable>> tasks = List.of(
c -> exec.submit(() -> c.success(fetchA())),
c -> exec.submit(() -> c.success(fetchB())),
c -> exec.submit(() -> c.success(fetchC()))
);
Asyncc.Parallel(tasks, (err, results) -> {
if (err != null) { log.error("at least one failed", err); return; }
// results.get(0) is fetchA's value, etc.
});Or with wrap to skip the error-check preamble:
import static org.ores.async.WrapErrFirst.wrap;
Asyncc.Parallel(tasks, wrap(results -> {
reply.send(combine(results.get(0), results.get(1), results.get(2)));
}));Asyncc.ParallelLimit
Parallel with a concurrency cap. Useful when you have many tasks but want to keep in-flight count bounded.
Asyncc.ParallelLimit(8, downloadTasks, wrap(paths -> {
// At most 8 downloads run concurrently. The next task starts as soon
// as one finishes. Errors short-circuit the remaining queue.
publish(paths);
}));Asyncc.Series
Run tasks one after another, collecting each result. Stops at the first error.
Asyncc.Series(List.of(
c -> validate(req, c),
c -> persist(req, c),
c -> notify(req, c)
), (err, results) -> {
if (err != null) return; // first failure stops the chain
// results is a List of each task's value in order.
});Asyncc.Waterfall
Each task receives the previous task's value. A sequential pipeline with typed hand-offs.
Asyncc.Waterfall(List.of(
c -> c.success(parseRequest(raw)), // -> ParsedRequest
(req, c) -> c.success(authorize(req)), // -> AuthorizedRequest
(authd, c) -> c.success(run(authd)) // -> Result
), wrap(finalValue -> {
// finalValue is the Result from the last stage.
}));Asyncc.Race
Fan out tasks; the first to call back wins. The rest are ignored (cancellation is best-effort).
Asyncc.Race(List.of(
c -> exec.submit(() -> c.success(fromPrimary())),
c -> exec.submit(() -> c.success(fromReplica()))
), wrap(winnerValue -> {
// whichever returned first
}));Asyncc.Map
Run an async transform over each element. Results preserve input order even though work runs concurrently.
Asyncc.Map(userIds, (id, c) -> {
exec.submit(() -> c.success(fetchProfile(id)));
}, wrap(profiles -> {
// profiles.get(i) corresponds to userIds.get(i)
}));Asyncc.FilterMap
Async map + async filter in one pass. Tasks that emit null are dropped from the result.
Asyncc.FilterMap(candidateIds, (id, c) -> {
exec.submit(() -> {
var profile = fetchProfile(id);
c.success(profile.isActive() ? profile : null);
});
}, wrap(active -> {
// active contains only the profiles where isActive() was true
}));Asyncc.Reduce
Sequential fold with an async reducer. Each step sees the running accumulator.
Asyncc.Reduce(transactions, BigDecimal.ZERO, (acc, txn, c) -> {
exec.submit(() -> c.success(acc.add(txn.amount())));
}, wrap(total -> {
// total is the final sum
}));Asyncc.Times
Run the same task N times in parallel, collect each result. Useful for "spawn N workers" or "generate N samples".
Asyncc.Times(8, (i, c) -> {
exec.submit(() -> c.success(generateSample(i)));
}, wrap(samples -> {
// samples.size() == 8
}));Asyncc.GroupBy
Apply an async keying function to each element, then group input elements by their key.
Asyncc.GroupBy(users, (user, c) -> {
c.success(user.region());
}, wrap(grouped -> {
// grouped is Map<String, List<User>> — users keyed by region
}));NeoQueue
A bounded async work queue. Push tasks; the queue serialises (or limits) them. Great when you need backpressure without modelling a stream.
NeoQueue<Job, Void> queue = new NeoQueue<>(4); // concurrency = 4
queue.setTaskHandler((task, c) -> {
exec.submit(() -> {
try { processJob(task.getValue()); c.success(null); }
catch (Throwable t) { c.fail(t); }
});
});
queue.saturated(q -> log.warn("queue saturated; in-flight at cap"));
queue.drain(q -> log.info("queue drained"));
incoming.forEach(queue::push);NeoLock
An async mutex. Unlike synchronized it doesn't tie the release to the acquiring thread — useful when the critical section ends inside an async completion handler.
NeoLock lock = new NeoLock("inventory");
lock.acquire((err, unlock) -> {
try {
// critical section — safe to await async work here
mutate(sharedState);
} finally {
unlock.releaseLock();
}
});
// Or use the leak-safe sync helper (v0.2.5+):
lock.withLock(() -> mutate(sharedState));NeoRwLock (v0.2.6+)
Async reader/writer lock. Many concurrent readers, one exclusive writer. FIFO with reader-burst fairness — adjacent queued readers wake up concurrently when the lock becomes free.
NeoRwLock cacheLock = new NeoRwLock("config-cache");
// Reader — concurrent with other readers.
cacheLock.acquireRead((err, unlock) -> {
try {
return cache.get(key);
} finally {
unlock.releaseLock();
}
});
// Writer — exclusive.
cacheLock.acquireWrite((err, unlock) -> {
try {
cache.put(key, value);
} finally {
unlock.releaseLock();
}
});
// Sync helpers — auto-release even if the body throws.
cacheLock.withRead(() -> renderTemplate(cache));
cacheLock.withWrite(() -> cache.refresh());
// Non-blocking attempt — empty if waiters queued (preserves FIFO).
cacheLock.tryAcquireRead().ifPresent(u -> {
try { /* read-only work */ } finally { u.releaseLock(); }
});
// Bounded wait — fires onError with TimeoutException on miss.
cacheLock.acquireWrite(500L, (err, unlock) -> {
if (err instanceof TimeoutException) { backOff(); return; }
// ... exclusive write under the timeout budget
});Composition: nesting combinators
Combinators nest cleanly because they all use the same error-first callback contract. Below: Waterfall wrapping a Map wrapping a Parallel.
Asyncc.Waterfall(List.of(
c -> fetchPage(url, c), // -> String html
(html, c) -> c.success(extractLinks(html)), // -> List<URI>
(links, c) -> Asyncc.Map(links, (link, inner) -> { // -> List<List<String>>
Asyncc.Parallel(List.of(
c2 -> exec.submit(() -> c2.success(headOk(link))),
c2 -> exec.submit(() -> c2.success(classify(link)))
), inner);
}, c)
), wrap(perLinkData -> {
// perLinkData is List<List<String>>, ordered the same as the source links list
}));For a much larger composition — 8+ combinators in one pipeline — see the composability showcase.
WrapFuture (v0.2.7+)
Bidirectional bridge to CompletableFuture. Wrap any combinator call as a promise at the boundary, or wrap a third-party promise as an async.java task.
import static org.ores.async.WrapFuture.toFuture;
import static org.ores.async.WrapFuture.fromStage;
// Return a CompletableFuture to a Spring WebFlux / Akka HTTP / gRPC boundary,
// while using async.java's combinators internally.
public CompletableFuture<String> handle(Request req) {
return toFuture(c ->
Asyncc.<String, Throwable>Parallel(List.of(
cb -> exec.submit(() -> cb.success(fetchA(req))),
cb -> exec.submit(() -> cb.success(fetchB(req)))
), c)
).thenApply(parts -> combine(parts.get(0), parts.get(1)));
}
// Consume third-party CompletionStage-returning APIs inside an async.java combinator.
Asyncc.Parallel(List.of(
fromStage(db.queryAsync("SELECT ...")),
fromStage(redis.getAsync(key)),
fromStage(http.sendAsync(req))
), (err, results) -> { /* ... */ });AsyncFut (v0.2.7+)
Promise-returning sibling to Asyncc. Same combinator vocabulary, but each call returns a CompletableFuture instead of taking a final callback.
// Parallel: fan out N tasks, return a future of their ordered results.
CompletableFuture<List<String>> both = AsyncFut.Parallel(List.of(
() -> CompletableFuture.supplyAsync(this::fetchA, exec),
() -> CompletableFuture.supplyAsync(this::fetchB, exec)
));
// ParallelLimit: bounded concurrency.
CompletableFuture<List<Path>> downloaded = AsyncFut.ParallelLimit(8, downloads);
// Series: sequential.
CompletableFuture<List<Step>> chain = AsyncFut.Series(List.of(
() -> validate(req), () -> persist(req), () -> notify(req)
));
// Race: first completer wins.
CompletableFuture<String> winner = AsyncFut.Race(List.of(
() -> fromPrimary(),
() -> fromReplica()
));
// Map: async transform preserving input order.
CompletableFuture<List<Profile>> profiles =
AsyncFut.Map(userIds, id -> fetchProfileAsync(id));
// Reduce: sequential async fold.
CompletableFuture<BigDecimal> total =
AsyncFut.Reduce(txns, BigDecimal.ZERO, (acc, t) -> computeAsync(acc, t));
// Times: N parallel iterations.
CompletableFuture<List<Sample>> samples =
AsyncFut.Times(8, i -> generateAsync(i));
// Each: per-element fire-and-forget; future completes when all done.
CompletableFuture<Void> sent = AsyncFut.Each(users, u -> sendEmailAsync(u));
// Compose with regular CompletableFuture operators.
AsyncFut.Parallel(taskSuppliers)
.thenApply(parts -> combine(parts))
.thenCompose(combined -> store(combined))
.exceptionally(err -> { log.error("pipeline failed", err); return null; });Looking for full javadoc? See the latest javadoc or the v0.2.7 snapshot, or browse the source.