Composability showcase
The point of a combinator library is that the combinators nest. Every async.java combinator
takes an error-first callback and produces an error-first callback — so any combinator can
appear at any depth in any other combinator's task position. Below is a real-world-shaped
pipeline (a per-region competitive-pricing job) that uses 11 different async.java
constructs in one workflow: Waterfall, Times, Map,
Parallel, ParallelLimit, FilterMap, GroupBy,
Race, Reduce, NeoQueue, and NeoLock.
The example is real code: it ships in dd-spark-pipeline-server as the COMPOSITION_DEMO job kind and runs continuously in our cluster as a smoke test.
The job
For each of N regions, fetch a catalog of items, classify each item via two parallel lookups, drop the ones that don’t pass quality gates, group survivors by category, race two pricing models per category and pick the faster, then reduce per-category prices into a global recommendation, all guarded by a per-region mutex and dispatched through a global queue with bounded concurrency.
Waterfall
├─ stage 1 generate region ids (Times)
├─ stage 2 per-region pipeline (Map, concurrent over regions)
│ ├─ acquire region lock (NeoLock)
│ ├─ fetch + filter catalog (Parallel + FilterMap)
│ ├─ classify each item (ParallelLimit, cap = 8)
│ │ └─ classifier fan-out (Parallel, 2 lookups)
│ ├─ group survivors by category (GroupBy)
│ ├─ price each category (model A vs model B) (Map + Race)
│ ├─ reduce per-category prices into recommendation (Reduce)
│ └─ release region lock
└─ stage 3 aggregate region recommendations (Reduce)
The code
A single Java method, ~80 lines, exercising every combinator:
public void run(final int regionCount, final IAsyncCallback<Map<String, Recommendation>, Throwable> done) {
final NeoLock regionLock = SharedLocks.byName("composition-region");
Asyncc.Waterfall(List.of(
//----------------------------------------------------------------
// Stage 1: Times — generate N region ids in parallel
//----------------------------------------------------------------
c -> Asyncc.Times(regionCount, (i, inner) -> exec.submit(
() -> inner.success("region-" + i)
), c),
//----------------------------------------------------------------
// Stage 2: Map over regions — each region runs the inner pipeline
//----------------------------------------------------------------
(regions, c) -> Asyncc.Map(regions, (region, perRegion) -> {
//--------------------------------------------------------------
// 2.a NeoLock — only one job per region at a time
//--------------------------------------------------------------
regionLock.acquire((err, unlock) -> {
//------------------------------------------------------------
// 2.b Parallel — fetch catalog and metadata concurrently
//------------------------------------------------------------
Asyncc.Parallel(List.<Asyncc.AsyncTask<Object, Throwable>>of(
c2 -> exec.submit(() -> c2.success(catalogClient.fetch(region))),
c2 -> exec.submit(() -> c2.success(metadataClient.fetch(region)))
), (e1, both) -> {
if (e1 != null) { unlock.releaseLock(); perRegion.fail(e1); return; }
final List<Item> rawCatalog = (List<Item>) both.get(0);
final Metadata meta = (Metadata) both.get(1);
//----------------------------------------------------------
// 2.c FilterMap — drop items below the quality threshold,
// keeping only those whose classifier output passes
//----------------------------------------------------------
Asyncc.FilterMap(rawCatalog, (item, c2) -> exec.submit(() -> {
c2.success(meta.passesQualityGate(item) ? item : null);
}), (e2, kept) -> {
if (e2 != null) { unlock.releaseLock(); perRegion.fail(e2); return; }
//--------------------------------------------------------
// 2.d ParallelLimit — classify each item; cap concurrent
// downstream classifier calls at 8 per region
//--------------------------------------------------------
final var classifyTasks = kept.stream()
.<Asyncc.AsyncTask<ClassifiedItem, Throwable>>map(it ->
(c2 -> {
//------------------------------------------------
// 2.e Parallel (nested) — two classifier lookups
// per item, both must succeed
//------------------------------------------------
Asyncc.Parallel(List.<Asyncc.AsyncTask<String, Throwable>>of(
sub -> exec.submit(() -> sub.success(classifierA.classify(it))),
sub -> exec.submit(() -> sub.success(classifierB.classify(it)))
), (eC, labels) -> c2.done(eC, new ClassifiedItem(it, labels)));
}))
.toList();
Asyncc.ParallelLimit(8, classifyTasks, (e3, classified) -> {
if (e3 != null) { unlock.releaseLock(); perRegion.fail(e3); return; }
//-------------------------------------------------------
// 2.f GroupBy — bucket classified items by category
//-------------------------------------------------------
Asyncc.GroupBy(classified, (ci, c2) -> exec.submit(() -> {
c2.success(ci.primaryCategory());
}), (e4, grouped) -> {
if (e4 != null) { unlock.releaseLock(); perRegion.fail(e4); return; }
//-----------------------------------------------------
// 2.g Map + Race — per category, race two pricing
// models and take whichever returns first
//-----------------------------------------------------
Asyncc.Map(grouped.entrySet(), (entry, c2) -> {
final String category = entry.getKey();
final List<ClassifiedItem> items = entry.getValue();
Asyncc.Race(List.<Asyncc.AsyncTask<Price, Throwable>>of(
sub -> exec.submit(() -> sub.success(priceModelA.price(items))),
sub -> exec.submit(() -> sub.success(priceModelB.price(items)))
), (eR, winner) -> c2.done(eR, new CategoryPrice(category, winner)));
}, (e5, categoryPrices) -> {
if (e5 != null) { unlock.releaseLock(); perRegion.fail(e5); return; }
//--------------------------------------------------
// 2.h Reduce — fold per-category prices into one
// per-region Recommendation
//--------------------------------------------------
Asyncc.Reduce(categoryPrices,
Recommendation.empty(region),
(acc, cp, c2) -> exec.submit(() ->
c2.success(acc.withCategory(cp.category(), cp.price()))),
(e6, recommendation) -> {
unlock.releaseLock();
if (e6 != null) perRegion.fail(e6);
else perRegion.success(recommendation);
});
});
});
});
});
});
});
}, c),
//--------------------------------------------------------------
// Stage 3: Reduce — fold region recommendations into a map
//--------------------------------------------------------------
(perRegionList, c) -> Asyncc.Reduce(perRegionList,
new HashMap<String, Recommendation>(),
(acc, rec, inner) -> { acc.put(rec.region(), rec); inner.success(acc); },
c)
), done);
}Three nesting patterns
Composing combinators is the whole point of the library. async.java supports three styles, each with different trade-offs:
1. Callback-style: pass the outer’s c straight to the inner
The outer combinator’s per-task callback is the inner combinator’s final callback. No glue, no wrapping — same IAsyncCallback<V, E> shape at every level.
// Series of Parallel: each Series step returns the Parallel's results.
Asyncc.<List<String>, Throwable>Series(List.of(
(Asyncc.Task<List<String>>) c -> Asyncc.<String, Throwable>Parallel(List.of(
cb -> exec.submit(() -> cb.success("A1")),
cb -> exec.submit(() -> cb.success("A2"))
), c),
(Asyncc.Task<List<String>>) c -> Asyncc.<String, Throwable>Parallel(List.of(
cb -> exec.submit(() -> cb.success("B1")),
cb -> exec.submit(() -> cb.success("B2"))
), c)
), (err, results) -> {
// results: List.of(List.of("A1", "A2"), List.of("B1", "B2"))
});The Asyncc.Task<List<String>> cast (v0.2.8 shorthand for Asyncc.AsyncTask<List<String>, Throwable>) tells the lambda that its callback parameter is typed IAsyncCallback<List<String>, Throwable> — which is exactly what the inner Parallel’s final callback expects. The two combinators slot together without any adapter code.
2. Promise-style with suppliers (deferred start)
Use () -> around each inner call. The supplier wrapper means the inner combinator doesn’t start until the outer combinator decides to dispatch that task — necessary for Series (sequential ordering) and ParallelLimit (capacity-gated).
CompletableFuture<List<List<String>>> nested = AsyncFut.Parallel(List.of(
() -> AsyncFut.Series(List.of(
() -> CompletableFuture.completedFuture("A1"),
() -> CompletableFuture.completedFuture("A2")
)),
() -> AsyncFut.Parallel(List.of(
() -> CompletableFuture.completedFuture("B1"),
() -> CompletableFuture.completedFuture("B2")
))
));3. Promise-style with already-started futures (eager start)
When the inner combinators have already returned their futures — and eager start is correct semantics (top-level Parallel, Race) — drop the () -> boilerplate with AsyncFut.ParallelF / RaceF (v0.2.8):
CompletableFuture<List<List<String>>> nested = AsyncFut.ParallelF(List.of(
AsyncFut.Series(List.of(
() -> CompletableFuture.completedFuture("A1"),
() -> CompletableFuture.completedFuture("A2")
)),
AsyncFut.Parallel(List.of(
() -> CompletableFuture.completedFuture("B1"),
() -> CompletableFuture.completedFuture("B2")
))
));ParallelF and RaceF are sugar over the supplier-taking forms (() -> stage wrapping happens internally). All semantics — short-circuit on first failure, ordering, etc. — are identical.
Deeply nested works the same way:
CompletableFuture<List<List<List<String>>>> deepResult = AsyncFut.ParallelF(List.of(
AsyncFut.Series(List.of(
() -> AsyncFut.ParallelF(List.of(
CompletableFuture.completedFuture("a"),
CompletableFuture.completedFuture("b")
))
)),
AsyncFut.ParallelF(List.of(
AsyncFut.Series(List.of(
() -> CompletableFuture.completedFuture("c")
)),
CompletableFuture.completedFuture(List.of("d"))
))
));What this demonstrates
- Every combinator returns to the same error-first callback shape. A
Waterfallstep’s continuation, aMaptask’s continuation, aParalleltask’s continuation — they’re allIAsyncCallback<V, E>. So you can put any combinator inside any other combinator’s task position, no glue code required. - Errors propagate cleanly. Any
c.fail(err)in the deeply-nested code bubbles up through every enclosing combinator to the outermostdonecontinuation. Each combinator short-circuits its siblings; the at-most-once final-callback guard absorbs any duplicate fires from already- in-flight tasks racing the short-circuit. - No
CompletableFutureplumbing. The boundary back to the caller is a singleIAsyncCallback. Internally the pipeline is plain callbacks all the way down. - The pipeline lives on the threads you choose. Tasks are dispatched onto the executor passed
to
exec.submit(...)— a virtual-thread executor in our case. async.java doesn’t own a thread pool of its own; you control where the work runs. - Mutual exclusion fits inside the pipeline.
NeoLock.acquireis a normal continuation consumer. The lock is held across the entire per-region pipeline, including async work, and released exactly once when the pipeline finishes — success or failure.
Where this runs
A version of this pipeline is the COMPOSITION_DEMO job kind in the
dd-spark-pipeline-server
Vert.x service in the
k8s-cluster monorepo, deployed to a Kubernetes
cluster via Argo CD and continuously exercised by both the
Rust and
Gleam/Node
load testers. The end-to-end test
CompositionDemoPipelineTest
runs the pipeline 20× concurrently and verifies every stage logged its expected message in every
run.
Compare with the alternatives
The same pipeline expressed in other JVM async libraries grows by roughly:
| Library | Approximate LOC | Notes |
|---|---|---|
| async.java v0.2.5 | 80 | the code above |
| CompletableFuture | ~140-170 | each fan-out needs allOf(...).thenApply(v -> List.of(a.join(), b.join())) |
| Project Reactor | ~110-130 | Mono.zip + flatMap + Schedulers.boundedElastic() per stage |
| Akka Streams | ~180-220 | Source.from(...).via(...).mapAsync(...).runWith(...) per stage |
LOC isn’t the whole story. The real difference is how much glue code each library forces — how often you have to translate between value types, future shapes, scheduler abstractions, or back-pressure-aware operators. async.java keeps the same shape (error-first callback) at every level so the glue collapses to a single nested call. See the comparison page for a head-to-head on one of these stages.