Package org.ores.async



async.java — callback-based async control flow for the JVM

async.java is a Java port of the Node.js async library. It provides a small set of combinators (Parallel, Series, Waterfall, Race, Map, Reduce, Each, Times, FilterMap, GroupBy, Concat, Inject, Whilst, DoWhilst) plus a coordination pair: NeoQueue (a bounded async work queue) and NeoLock (an async mutex).

All combinators share a single shape:

   Asyncc.<combinator>(tasks, finalCallback);
 
where finalCallback is an error-first callback (err, value) -> .... The library guarantees that the final callback fires at most once, regardless of how many tasks succeed, fail, double-fire, or throw synchronously.
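
The at-most-once guarantee can be pictured with a small guard around an error-first callback. The sketch below is an illustration only, not the library's source: OnceGuard, once and run are hypothetical names, and java.util.function.BiConsumer stands in for the library's callback type.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical helper (not part of async.java): lets an error-first callback fire at most once. */
final class OnceGuard {
    private OnceGuard() {}

    /** Returns a callback that forwards only the first invocation to the delegate. */
    static <T> BiConsumer<Throwable, T> once(BiConsumer<Throwable, T> delegate) {
        AtomicBoolean fired = new AtomicBoolean(false);
        return (err, value) -> {
            // compareAndSet lets exactly one caller through, even under concurrent firing.
            if (fired.compareAndSet(false, true)) {
                delegate.accept(err, value);
            }
        };
    }

    /** Runs a task and routes a synchronous throw into the same guarded callback. */
    static <T> void run(Consumer<BiConsumer<Throwable, T>> task,
                        BiConsumer<Throwable, T> finalCallback) {
        BiConsumer<Throwable, T> guarded = once(finalCallback);
        try {
            task.accept(guarded);
        } catch (RuntimeException e) {
            // Dropped silently if the task had already fired before throwing.
            guarded.accept(e, null);
        }
    }
}
```

With this guard, a task that fires twice and then throws still produces a single call to the final callback, carrying the first result.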

The c convention

Code examples in this library consistently name the continuation parameter c, short for continuation. The continuation receives the result of an async step and is what happens next — either the final callback of the combinator, or the next inner step. Existing code using cb or any other name works unchanged; this is purely a documentation convention.

Each continuation can be fired three ways:

   c.done(null, value);   // canonical error-first form
   c.success(value);      // shorthand for done(null, value)     — v0.2.4+
   c.fail(err);           // shorthand for done(err, null)       — v0.2.4+
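
One way to picture how the two shorthands relate to done is an interface with default methods. This is a sketch under assumptions: Continuation and ContinuationDemo are hypothetical names chosen for illustration, and the library's actual callback type may be declared differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the library's continuation type. */
@FunctionalInterface
interface Continuation<T> {
    /** Canonical error-first form. */
    void done(Throwable err, T value);

    /** Shorthand for done(null, value). */
    default void success(T value) { done(null, value); }

    /** Shorthand for done(err, null). */
    default void fail(Throwable err) { done(err, null); }
}

final class ContinuationDemo {
    private ContinuationDemo() {}

    /** Fires one continuation all three ways and records what the handler saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Continuation<Integer> c = (err, value) ->
            log.add(err != null ? "error: " + err.getMessage() : "value: " + value);
        c.done(null, 1);
        c.success(2);
        c.fail(new IllegalStateException("boom"));
        return log;
    }
}
```

Because the interface has a single abstract method, a two-argument lambda still satisfies it, which is consistent with existing cb-style code working unchanged.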
 

30-second example

   var exec = Executors.newVirtualThreadPerTaskExecutor();

   var tasks = List.<Asyncc.AsyncTask<String, Throwable>>of(
       c -> exec.submit(() -> c.success(fetchA())),
       c -> exec.submit(() -> c.success(fetchB())),
       c -> exec.submit(() -> c.success(fetchC()))
   );

   Asyncc.Parallel(tasks, (err, results) -> {
       if (err != null) { log.error("at least one failed", err); return; }
       reply.send(combine(results.get(0), results.get(1), results.get(2)));
   });
 

results preserves the order of the input list, regardless of the order in which tasks complete. The callback fires once: either when the last task completes, or with err set as soon as any task fails (the remaining tasks are short-circuited via ShortCircuit).
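
To make the ordering and short-circuit behaviour concrete, here is a toy parallel combinator. It is a sketch under stated assumptions, not the library's implementation: ToyParallel is a hypothetical name, tasks are modelled as Consumer<BiConsumer<Throwable, T>>, and per-index slots plus an atomic countdown stand in for whatever the library does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Illustrative only: a toy order-preserving parallel combinator. */
final class ToyParallel {
    private ToyParallel() {}

    static <T> void parallel(List<Consumer<BiConsumer<Throwable, T>>> tasks,
                             BiConsumer<Throwable, List<T>> finalCallback) {
        int n = tasks.size();
        if (n == 0) { finalCallback.accept(null, new ArrayList<>()); return; }

        AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(n); // one slot per input index
        AtomicInteger remaining = new AtomicInteger(n);
        AtomicBoolean finished = new AtomicBoolean(false);             // guards the final callback

        for (int i = 0; i < n; i++) {
            final int index = i;
            AtomicBoolean taskFired = new AtomicBoolean(false);
            tasks.get(i).accept((err, value) -> {
                if (!taskFired.compareAndSet(false, true)) return;     // ignore a double-fire
                if (err != null) {
                    // First error wins; later errors and results are discarded.
                    if (finished.compareAndSet(false, true)) finalCallback.accept(err, null);
                    return;
                }
                slots.set(index, value);                               // write the slot first
                if (remaining.decrementAndGet() == 0                   // then count down
                        && finished.compareAndSet(false, true)) {
                    List<T> results = new ArrayList<>(n);
                    for (int j = 0; j < n; j++) results.add(slots.get(j));
                    finalCallback.accept(null, results);
                }
            });
        }
    }
}
```

Each result lands in the slot matching its task's position, so the output order does not depend on completion order. This toy version does not cancel tasks that are already running; it only ignores their results after the first error.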

Skipping the error-check boilerplate

For call sites that do not want to handle the error inline, wrap a value-only consumer with wrap(Consumer):

   import static org.ores.async.WrapErrFirst.wrap;

   Asyncc.Parallel(tasks, wrap(results -> {
       reply.send(combine(results.get(0), results.get(1), results.get(2)));
   }));
 

The single-arg form throws on any unhandled error (carrying the original Throwable as the cause when available). The two-arg form wrap(onSuccess, onError) keeps both branches explicit.
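
The behaviour described above can be sketched as a pair of small adapters. The code is an illustration rather than the library's source: WrapSketch is a hypothetical class, BiConsumer stands in for the library's callback type, and IllegalStateException is an assumed choice since the exception type thrown by the real wrap is not stated here.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical adapters in the spirit of wrap(...); not the library's implementation. */
final class WrapSketch {
    private WrapSketch() {}

    /** Single-arg form: pass the value through, throw on any error. */
    static <T> BiConsumer<Throwable, T> wrap(Consumer<T> onSuccess) {
        return (err, value) -> {
            if (err != null) {
                // Assumed exception type; the original Throwable is kept as the cause.
                throw new IllegalStateException("unhandled async error", err);
            }
            onSuccess.accept(value);
        };
    }

    /** Two-arg form: both branches stay explicit and nothing is thrown by the adapter. */
    static <T> BiConsumer<Throwable, T> wrap(Consumer<T> onSuccess, Consumer<Throwable> onError) {
        return (err, value) -> {
            if (err != null) {
                onError.accept(err);
            } else {
                onSuccess.accept(value);
            }
        };
    }
}
```

In a sketch like this, the single-arg form throws on whichever thread fires the callback, so the two-arg form is the safer choice wherever an uncaught exception on an executor thread would go unnoticed.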

Composition

Combinators nest because they all use the same callback contract. A Waterfall wrapping a Map wrapping a Parallel is a perfectly normal pipeline:

   Asyncc.Waterfall(List.of(
       c -> fetchPage(url, c),
       (html, c) -> c.success(extractLinks(html)),
       (links, c) -> Asyncc.Map(links, (link, inner) -> {
           Asyncc.Parallel(List.of(
               c2 -> exec.submit(() -> c2.success(headOk(link))),
               c2 -> exec.submit(() -> c2.success(classify(link)))
           ), (err, pair) -> inner.done(err, pair));
       }, c)
   ), (err, perLinkData) -> {
       // ...consume the per-link classification results...
   });
 

See async-java.github.io/examples for larger composition examples.

Project Loom

async.java does not own a thread pool. Pass a virtual-thread executor and every fan-out spawns on a virtual thread:

   var vt = Executors.newVirtualThreadPerTaskExecutor();
   NeoQueue.setExecutor(vt);                 // optional: NeoQueue callbacks via VTs
   // Tasks submit directly to `vt` themselves.
 

On JDK 21+, virtual-thread spawn is ~250 ns and blocking I/O inside a task is a continuation park (not a kernel thread block). Per-call coordination overhead stays under 50 µs.

Concurrency contract

Under concurrent task completion, the library guarantees:

  • at-most-once final callback: dedup-guarded via NeoUtils.fireFinalCallback(ShortCircuit, Object, NeoEachI.IEachCallback<E>);
  • result-slot visibility: per-index slot writes happen-before the atomic counter increment that releases the final callback (fixed in v0.2.2);
  • at-most-once short-circuit: the first error wins and other tasks' results are discarded;
  • volatile / atomic internal counters: lost-update races have reproducer tests pinning the fix (CounterLimitRaceTest).

If you can break any of these in production, it is a bug. File at github.com/async-java/async.java/issues.

See Also: