Package org.ores.async
async.java — callback-based async control flow for the JVM
async.java is a Java port of the Node.js
async library. It provides a small set of
combinators — Parallel, Series, Waterfall, Race,
Map, Reduce, Each, Times, FilterMap, GroupBy,
Concat, Inject, Whilst, DoWhilst — plus a coordination
pair, NeoQueue (bounded async work queue) and
NeoLock (async mutex).
All combinators share a single shape:
Asyncc.<combinator>(tasks, finalCallback);
where finalCallback is an error-first callback (err, value) -> .... The
library guarantees that the final callback fires at most once, regardless of
how many tasks succeed, fail, double-fire, or throw synchronously.
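One common way to obtain an at-most-once guarantee of this kind is a compare-and-set guard around the user callback. The sketch below is only an illustration of that idea, not the library's implementation; ErrFirst, OnceCallback and OnceDemo are hypothetical names that do not exist in async.java.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical error-first callback shape, standing in for the library's own type.
interface ErrFirst<T> {
    void done(Throwable err, T value);
}

// Forwards only the first call to the wrapped callback; later calls are dropped.
final class OnceCallback<T> implements ErrFirst<T> {
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final ErrFirst<T> delegate;

    OnceCallback(ErrFirst<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void done(Throwable err, T value) {
        // compareAndSet succeeds for exactly one caller, even under contention.
        if (fired.compareAndSet(false, true)) {
            delegate.done(err, value);
        }
    }
}

final class OnceDemo {
    // Runs a task; a synchronous throw is routed into the same guarded callback.
    static <T> void run(Consumer<ErrFirst<T>> task, ErrFirst<T> finalCallback) {
        ErrFirst<T> once = new OnceCallback<>(finalCallback);
        try {
            task.accept(once);
        } catch (Throwable t) {
            once.done(t, null);
        }
    }

    public static void main(String[] args) {
        OnceDemo.<String>run(c -> {
            c.done(null, "first");
            c.done(null, "second"); // double-fire: ignored by the guard
            throw new IllegalStateException("thrown after firing"); // also ignored
        }, (err, value) -> System.out.println("final callback: " + value));
        // prints "final callback: first"
    }
}
```

The same guard covers all three failure modes named above: a double-fire, a late error, and a synchronous throw each lose the compare-and-set race to the first completion.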
The c convention
Code examples in this library consistently name the continuation parameter c, short
for continuation. The continuation receives the result of an async step and is what
happens next — either the final callback of the combinator, or the next inner step.
Existing code using cb or any other name works unchanged; this is purely a
documentation convention.
Each continuation can be fired three ways:
c.done(null, value);  // canonical error-first form
c.success(value);     // shorthand for done(null, value), v0.2.4+
c.fail(err);          // shorthand for done(err, null), v0.2.4+
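The relationship between the three forms can be pictured as one abstract method plus two default methods that delegate to it. This is a minimal sketch under that assumption; Continuation and ContinuationDemo are hypothetical names, and the library's real interface may be declared differently.

```java
import java.util.function.Consumer;

// Hypothetical continuation type: one abstract method, two shorthands.
interface Continuation<T, E> {
    void done(E err, T value);

    // Shorthand for done(null, value).
    default void success(T value) {
        done(null, value);
    }

    // Shorthand for done(err, null).
    default void fail(E err) {
        done(err, null);
    }
}

final class ContinuationDemo {
    // Runs one step and reports what its continuation received.
    static String describe(Consumer<Continuation<String, Throwable>> step) {
        StringBuilder out = new StringBuilder();
        step.accept((err, value) ->
            out.append(err != null ? "error: " + err.getMessage() : "value: " + value));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(c -> c.done(null, "a")));                 // prints "value: a"
        System.out.println(describe(c -> c.success("b")));                    // prints "value: b"
        System.out.println(describe(c -> c.fail(new RuntimeException("boom")))); // prints "error: boom"
    }
}
```

Because the interface keeps a single abstract method, a plain (err, value) -> ... lambda still satisfies it, which is consistent with existing cb-style code working unchanged.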
30-second example
var exec = Executors.newVirtualThreadPerTaskExecutor();
var tasks = List.<Asyncc.AsyncTask<String, Throwable>>of(
    c -> exec.submit(() -> c.success(fetchA())),
    c -> exec.submit(() -> c.success(fetchB())),
    c -> exec.submit(() -> c.success(fetchC()))
);
Asyncc.Parallel(tasks, (err, results) -> {
    if (err != null) { log.error("at least one failed", err); return; }
    reply.send(combine(results.get(0), results.get(1), results.get(2)));
});
results preserves the order of the input list. The callback fires once when the
last task completes — or once with err set the first time any task fails (other
tasks are short-circuited via ShortCircuit).
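To make the ordering and short-circuit behaviour concrete, here is a minimal standalone sketch of a parallel combinator built only on the JDK. It is an assumption about how such a combinator can be written, not the code behind Asyncc.Parallel; ParallelSketch and its Task type are hypothetical. Each task writes into its own slot by index, so completion order does not affect result order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class ParallelSketch {
    // Hypothetical task shape: a task receives an error-first continuation.
    interface Task<T> extends Consumer<BiConsumer<Throwable, T>> {}

    @SuppressWarnings("unchecked")
    static <T> void parallel(List<Task<T>> tasks, BiConsumer<Throwable, List<T>> finalCallback) {
        int n = tasks.size();
        if (n == 0) {
            finalCallback.accept(null, new ArrayList<>());
            return;
        }
        Object[] slots = new Object[n];                  // one result slot per input index
        AtomicInteger completed = new AtomicInteger();   // successful completions so far
        AtomicBoolean finished = new AtomicBoolean();    // guards the final callback

        for (int i = 0; i < n; i++) {
            final int index = i;
            AtomicBoolean taskFired = new AtomicBoolean();
            tasks.get(i).accept((err, value) -> {
                if (!taskFired.compareAndSet(false, true)) {
                    return; // this task already reported once: ignore the double-fire
                }
                if (err != null) {
                    // First error wins; later results are discarded.
                    if (finished.compareAndSet(false, true)) {
                        finalCallback.accept(err, null);
                    }
                    return;
                }
                // The slot write precedes the atomic increment, so the thread that
                // observes the last increment also observes every slot.
                slots[index] = value;
                if (completed.incrementAndGet() == n && finished.compareAndSet(false, true)) {
                    List<T> results = new ArrayList<>(n);
                    for (Object slot : slots) {
                        results.add((T) slot);
                    }
                    finalCallback.accept(null, results);
                }
            });
        }
    }

    public static void main(String[] args) {
        List<Task<String>> tasks = List.of(
            c -> c.accept(null, "a"),
            c -> c.accept(null, "b"));
        parallel(tasks, (err, results) ->
            System.out.println(err != null ? "failed: " + err : "results: " + results));
    }
}
```

This sketch does not cancel tasks that are still running after an error; it only ignores their results.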
Skipping the error-check boilerplate
For call sites that do not want to handle the error inline, wrap a value-only consumer with
wrap(java.util.function.Consumer):
import static org.ores.async.WrapErrFirst.wrap;

Asyncc.Parallel(tasks, wrap(results -> {
    reply.send(combine(results.get(0), results.get(1), results.get(2)));
}));
The single-arg form throws on any unhandled error (carrying the original Throwable
as the cause when available). The two-arg form wrap(onSuccess, onError) keeps both
branches explicit.
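The behaviour described for the two forms can be sketched with plain JDK functional interfaces. This is an illustration only: WrapSketch is a hypothetical class, BiConsumer stands in for the library's callback type, and the use of IllegalStateException is an assumption, since the text above does not say which exception type the real wrap throws.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class WrapSketch {
    // Single-arg form: forwards the value, throws if an error arrives.
    // The exception type is an assumption made for this sketch.
    static <T> BiConsumer<Throwable, T> wrap(Consumer<T> onSuccess) {
        return (err, value) -> {
            if (err != null) {
                // Carry the original Throwable as the cause.
                throw new IllegalStateException("unhandled async error", err);
            }
            onSuccess.accept(value);
        };
    }

    // Two-arg form: both branches are explicit, nothing is thrown by the wrapper.
    static <T> BiConsumer<Throwable, T> wrap(Consumer<T> onSuccess, Consumer<Throwable> onError) {
        return (err, value) -> {
            if (err != null) {
                onError.accept(err);
            } else {
                onSuccess.accept(value);
            }
        };
    }

    public static void main(String[] args) {
        BiConsumer<Throwable, String> quiet = wrap(
            value -> System.out.println("got " + value),
            error -> System.out.println("failed: " + error.getMessage()));
        quiet.accept(null, "ok");                        // prints "got ok"
        quiet.accept(new RuntimeException("boom"), null); // prints "failed: boom"
    }
}
```

A design consequence worth noting: with the single-arg form the exception surfaces on whichever thread fires the callback, so the two-arg form is the safer choice when that thread has no useful uncaught-exception handling.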
Composition
Combinators nest because they all use the same callback contract. A Waterfall wrapping a Map wrapping a Parallel is a perfectly normal pipeline:
Asyncc.Waterfall(List.of(
    c -> fetchPage(url, c),
    (html, c) -> c.success(extractLinks(html)),
    (links, c) -> Asyncc.Map(links, (link, inner) -> {
        Asyncc.Parallel(List.of(
            c2 -> exec.submit(() -> c2.success(headOk(link))),
            c2 -> exec.submit(() -> c2.success(classify(link)))
        ), (err, pair) -> inner.done(err, pair));
    }, c)
), (err, perLinkData) -> {
    // ...consume the per-link classification results...
});
See async-java.github.io/examples for larger composition examples.
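Why a shared callback contract makes nesting work can be seen in a few lines: if a combinator's final callback has the same shape as a step's continuation, an inner combinator can simply be handed the outer continuation. The sketch below is a simplified, single-type waterfall written against the JDK only; WaterfallSketch and Step are hypothetical names, and unlike the library it has no at-most-once guard and does not catch synchronous throws.

```java
import java.util.List;
import java.util.function.BiConsumer;

final class WaterfallSketch {
    // Hypothetical step shape: takes the previous value and an error-first continuation.
    interface Step<T> {
        void run(T input, BiConsumer<Throwable, T> next);
    }

    static <T> void waterfall(T seed, List<Step<T>> steps, BiConsumer<Throwable, T> finalCallback) {
        runFrom(0, seed, steps, finalCallback);
    }

    private static <T> void runFrom(int i, T value, List<Step<T>> steps,
                                    BiConsumer<Throwable, T> finalCallback) {
        if (i == steps.size()) {
            finalCallback.accept(null, value); // all steps done
            return;
        }
        steps.get(i).run(value, (err, next) -> {
            if (err != null) {
                finalCallback.accept(err, null); // stop at the first error
            } else {
                runFrom(i + 1, next, steps, finalCallback);
            }
        });
    }

    public static void main(String[] args) {
        List<Step<Integer>> inner = List.of(
            (x, c) -> c.accept(null, x * 2),
            (x, c) -> c.accept(null, x * 2));
        List<Step<Integer>> outer = List.of(
            (x, c) -> c.accept(null, x + 1),
            // Nesting: the outer continuation c is the inner waterfall's final callback.
            (x, c) -> waterfall(x, inner, c));
        waterfall(1, outer, (err, v) -> System.out.println("result: " + v)); // prints "result: 8"
    }
}
```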
Project Loom
async.java does not own a thread pool. Pass a virtual-thread executor and every fan-out spawns on a virtual thread:
var vt = Executors.newVirtualThreadPerTaskExecutor();
NeoQueue.setExecutor(vt); // optional: NeoQueue callbacks via VTs
// Tasks submit directly to `vt` themselves.
On JDK 21+, virtual-thread spawn is ~250 ns and blocking I/O inside a task is a continuation park (not a kernel thread block). Per-call coordination overhead stays under 50 µs.
Concurrency contract
Under concurrent task completion, the library guarantees:
- at-most-once final callback: dedup-guarded via NeoUtils.fireFinalCallback(ShortCircuit, Object, NeoEachI.IEachCallback<E>);
- result-slot visibility: per-index slot writes happen-before the atomic counter increment that releases the final callback (fixed in v0.2.2);
- at-most-once short-circuit: the first error wins and other tasks' results are discarded;
- volatile / atomic internal counters: lost-update races have reproducer tests pinning the fix (CounterLimitRaceTest).
If you can break any of these in production, it is a bug. File at github.com/async-java/async.java/issues.
Class                           Description
Asyncc                          Entry point for async.java's combinators.
Asyncc.AsyncCallback<T,E>
Asyncc.AsyncTask<T,E>           Error-first callback (Node.js-style).
Asyncc.ICallbacks<T,E>
Asyncc.IMapper<T,V,E>
Asyncc.KeyValue<K,V>
NeoEachI.IEacher<T,E>
NeoFilterI.IMapper<T,V,E>
NeoFilterMapI.IMapper<T,V,E>
NeoGeneric<T,V,E>
NeoGroupByI.IMapper<T,E>
NeoInject.Task<T,E>             new BeanTranslator.Builder().translate(new Translator<String, Integer>(String.class, Integer.class) { @Override public Integer translate(String instance) { return Integer.valueOf(instance); } }).build();
NeoInjectI.ValueTask<T,E>
NeoLock                         An async (non-blocking) mutex.
NeoQueue<T,V>                   A bounded async work queue.
NeoQueue.Task<T,V>
NeoRaceIfc.AsyncTask<T,E>
NeoRaceIfc.IMapper<T,V,E>
NeoReduceI.IReducer<T,V,E>
NeoReduceI.ReduceArg<T,V>
NeoTimesI.ITimesr<T,E>
NeoWhilstI.AsyncTask<T,E>
[class name missing]            Token returned to an acquire callback.
WrapErrFirst                    Helpers that adapt error-first callbacks into more concise call sites.