Examples
One small example per combinator. All snippets compile against com.github.async-java:async.java:v0.2.6.
Imports are elided for readability.
Each task receives a continuation, conventionally named c. A task completes by calling c.success(v), c.fail(e), or the canonical c.done(err, v). The final callback is the error-first callback that every combinator takes; wrap it with WrapErrFirst.wrap(...) to skip the if (err != null) ... preamble.
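To make the contract concrete, here is a minimal sketch in plain JDK Java. It is not the library's source: the `ErrFirst` interface and this two-argument `wrap` are hypothetical stand-ins. How the real `WrapErrFirst.wrap` treats errors is not stated here, so the sketch takes an explicit error handler rather than guessing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; not the library's types.
final class ErrFirstSketch {

    /** Error-first callback: err is non-null on failure, value is set on success. */
    interface ErrFirst<T> {
        void done(Throwable err, T value);

        default void success(T value) { done(null, value); }
        default void fail(Throwable err) { done(err, null); }
    }

    /** Builds a callback that routes errors to onError and values to onValue. */
    static <T> ErrFirst<T> wrap(Consumer<Throwable> onError, Consumer<T> onValue) {
        return (err, value) -> {
            if (err != null) { onError.accept(err); return; }
            onValue.accept(value);
        };
    }

    /** Sends one success and one failure through the wrapper and records both. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ErrFirst<String> cb = wrap(e -> log.add("error: " + e.getMessage()),
                                   v -> log.add("value: " + v));
        cb.success("ok");
        cb.fail(new IllegalStateException("boom"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The value handler never sees an error, which is the whole point of wrapping: the null check lives in one place.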
Asyncc.Parallel
Fan out N independent tasks. The final callback fires once, with results in the same order as the tasks were submitted.
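One way such an ordering guarantee can be achieved is to give every task its own result slot, indexed by submission order. The sketch below uses only the JDK; `Task` and `parallel(...)` are hypothetical names for illustration and are not claimed to match the library's internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch; Task and parallel(...) are not the library's API.
final class ParallelSketch {

    /** A task receives an error-first continuation and must call it exactly once. */
    interface Task<T> { void run(BiConsumer<Throwable, T> c); }

    static <T> void parallel(List<Task<T>> tasks, BiConsumer<Throwable, List<T>> done) {
        int n = tasks.size();
        if (n == 0) { done.accept(null, new ArrayList<>()); return; }
        // One pre-allocated slot per task: slot i always holds task i's value.
        List<T> slots = new ArrayList<>(Collections.<T>nCopies(n, null));
        AtomicInteger remaining = new AtomicInteger(n);
        AtomicBoolean finished = new AtomicBoolean(false);
        for (int i = 0; i < n; i++) {
            final int index = i;
            tasks.get(i).run((err, value) -> {
                if (err != null) {
                    // The first error fires the final callback; later callbacks are ignored.
                    if (finished.compareAndSet(false, true)) done.accept(err, null);
                    return;
                }
                slots.set(index, value);
                if (remaining.decrementAndGet() == 0 && finished.compareAndSet(false, true)) {
                    done.accept(null, slots);
                }
            });
        }
    }

    /** The first task finishes last, yet its value still lands in slot 0. */
    static List<String> demo() {
        ExecutorService exec = Executors.newFixedThreadPool(3);
        CountDownLatch latch = new CountDownLatch(1);
        List<String> out = new ArrayList<>();
        List<Task<String>> tasks = List.of(
            c -> exec.submit(() -> { pause(60); c.accept(null, "A"); }),
            c -> exec.submit(() -> { pause(30); c.accept(null, "B"); }),
            c -> exec.submit(() -> c.accept(null, "C")));
        parallel(tasks, (err, results) -> {
            if (err == null) out.addAll(results);
            latch.countDown();
        });
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        exec.shutdown();
        return out;
    }

    static void pause(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the library itself, the same fan-out is written as follows.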
List<Asyncc.AsyncTask<String, Throwable>> tasks = List.of(
c -> exec.submit(() -> c.success(fetchA())),
c -> exec.submit(() -> c.success(fetchB())),
c -> exec.submit(() -> c.success(fetchC()))
);
Asyncc.Parallel(tasks, (err, results) -> {
if (err != null) { log.error("at least one failed", err); return; }
// results.get(0) is fetchA's value, etc.
});
Or with wrap to skip the error-check preamble:
import static org.ores.async.WrapErrFirst.wrap;
Asyncc.Parallel(tasks, wrap(results -> {
reply.send(combine(results.get(0), results.get(1), results.get(2)));
}));
Asyncc.ParallelLimit
Parallel with a concurrency cap. Useful when you have many tasks but want to keep the number in flight bounded.
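The capping idea can be sketched with a shared "next index" counter: start up to `limit` tasks, then let each completion start exactly one more. This is a plain-JDK illustration under assumed names (`Task`, `parallelLimit`, `demoMaxInFlight`), not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch; names are illustrative, not the library's API.
final class ParallelLimitSketch {

    interface Task<T> { void run(BiConsumer<Throwable, T> c); }

    static <T> void parallelLimit(int limit, List<Task<T>> tasks,
                                  BiConsumer<Throwable, List<T>> done) {
        int n = tasks.size();
        if (n == 0) { done.accept(null, new ArrayList<>()); return; }
        List<T> slots = new ArrayList<>(Collections.<T>nCopies(n, null));
        AtomicInteger next = new AtomicInteger(0);      // index of the next task to start
        AtomicInteger remaining = new AtomicInteger(n); // tasks not yet completed
        AtomicBoolean finished = new AtomicBoolean(false);
        Runnable[] startNext = new Runnable[1];         // array lets the lambda refer to itself
        startNext[0] = () -> {
            int index = next.getAndIncrement();
            if (index >= n || finished.get()) return;   // nothing left, or an error short-circuited
            tasks.get(index).run((err, value) -> {
                if (err != null) {
                    if (finished.compareAndSet(false, true)) done.accept(err, null);
                    return;
                }
                slots.set(index, value);
                if (remaining.decrementAndGet() == 0) {
                    if (finished.compareAndSet(false, true)) done.accept(null, slots);
                } else {
                    startNext[0].run();                 // one finished, so one more may start
                }
            });
        };
        for (int i = 0; i < Math.min(limit, n); i++) startNext[0].run();
    }

    /** Runs 6 tasks under the given cap and returns the highest concurrency observed. */
    static int demoMaxInFlight(int limit) {
        ExecutorService exec = Executors.newFixedThreadPool(6);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        List<Task<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            final int id = i;
            tasks.add(c -> exec.submit(() -> {
                maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                pause(20);
                inFlight.decrementAndGet();
                c.accept(null, id);
            }));
        }
        CountDownLatch latch = new CountDownLatch(1);
        List<Integer> out = new ArrayList<>();
        parallelLimit(limit, tasks, (err, results) -> {
            if (err == null) out.addAll(results);
            latch.countDown();
        });
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        exec.shutdown();
        if (!out.equals(List.of(0, 1, 2, 3, 4, 5))) throw new IllegalStateException("order lost: " + out);
        return maxSeen.get();
    }

    static void pause(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println("max in flight with limit 2: " + demoMaxInFlight(2));
    }
}
```

The thread pool has six threads, so any bound on the observed concurrency comes from the combinator rather than from the pool. The library's version of the call is shown next.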
Asyncc.ParallelLimit(8, downloadTasks, wrap(paths -> {
// At most 8 downloads run concurrently. The next task starts as soon
// as one finishes. Errors short-circuit the remaining queue.
publish(paths);
}));
Asyncc.Series
Run tasks one after another, collecting each result. Stops at the first error.
Asyncc.Series(List.of(
c -> validate(req, c),
c -> persist(req, c),
c -> notify(req, c)
), (err, results) -> {
if (err != null) return; // first failure stops the chain
// results is a List of each task's value in order.
});
Asyncc.Waterfall
Each task receives the previous task's value. A sequential pipeline with typed hand-offs.
Asyncc.Waterfall(List.of(
c -> c.success(parseRequest(raw)), // -> ParsedRequest
(req, c) -> c.success(authorize(req)), // -> AuthorizedRequest
(authd, c) -> c.success(run(authd)) // -> Result
), wrap(finalValue -> {
// finalValue is the Result from the last stage.
}));
Asyncc.Race
Fan out tasks; the first to call back wins. The rest are ignored (cancellation is best-effort).
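"First to call back wins" boils down to a single compare-and-set guard in front of the final callback. The following plain-JDK sketch shows that guard; `Task` and `race(...)` are hypothetical names, and the sketch attempts no cancellation of the losing tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical sketch; names are illustrative, not the library's API.
final class RaceSketch {

    interface Task<T> { void run(BiConsumer<Throwable, T> c); }

    static <T> void race(List<Task<T>> tasks, BiConsumer<Throwable, T> done) {
        AtomicBoolean settled = new AtomicBoolean(false);
        for (Task<T> task : tasks) {
            task.run((err, value) -> {
                // Only the first callback (success or error) gets through.
                if (settled.compareAndSet(false, true)) done.accept(err, value);
            });
        }
    }

    /** Returns every value the final callback received; the loser's value is dropped. */
    static List<String> demo() {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        CountDownLatch winnerSeen = new CountDownLatch(1);
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        List<Task<String>> tasks = List.of(
            // The "primary" deliberately waits until a winner has been delivered.
            c -> exec.submit(() -> { awaitQuietly(winnerSeen); c.accept(null, "primary"); }),
            c -> exec.submit(() -> c.accept(null, "replica")));
        race(tasks, (err, value) -> { delivered.add(value); winnerSeen.countDown(); });
        exec.shutdown();
        try { exec.awaitTermination(5, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return new ArrayList<>(delivered);
    }

    static void awaitQuietly(CountDownLatch latch) {
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the slower task still runs to completion and calls back; its result is simply discarded, which is what "ignored" means above. The library call looks like this: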
Asyncc.Race(List.of(
c -> exec.submit(() -> c.success(fromPrimary())),
c -> exec.submit(() -> c.success(fromReplica()))
), wrap(winnerValue -> {
// whichever returned first
}));
Asyncc.Map
Run an async transform over each element. Results preserve input order even though work runs concurrently.
Asyncc.Map(userIds, (id, c) -> {
exec.submit(() -> c.success(fetchProfile(id)));
}, wrap(profiles -> {
// profiles.get(i) corresponds to userIds.get(i)
}));
Asyncc.FilterMap
Async map + async filter in one pass. Tasks that emit null are dropped from the result.
Asyncc.FilterMap(candidateIds, (id, c) -> {
exec.submit(() -> {
var profile = fetchProfile(id);
c.success(profile.isActive() ? profile : null);
});
}, wrap(active -> {
// active contains only the profiles where isActive() was true
}));
Asyncc.Reduce
Sequential fold with an async reducer. Each step sees the running accumulator.
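A sequential async fold can be pictured as a recursive step function: each step hands the accumulator to the reducer, and the reducer's continuation triggers the next step. The sketch below uses only the JDK; `Reducer`, `reduce(...)`, and `step(...)` are hypothetical names, not the library's code.

```java
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical sketch; names are illustrative, not the library's API.
final class ReduceSketch {

    interface Reducer<A, T> { void apply(A acc, T item, BiConsumer<Throwable, A> c); }

    static <A, T> void reduce(List<T> items, A initial, Reducer<A, T> reducer,
                              BiConsumer<Throwable, A> done) {
        step(items.iterator(), initial, reducer, done);
    }

    private static <A, T> void step(Iterator<T> it, A acc, Reducer<A, T> reducer,
                                    BiConsumer<Throwable, A> done) {
        if (!it.hasNext()) { done.accept(null, acc); return; }
        reducer.apply(acc, it.next(), (err, nextAcc) -> {
            if (err != null) { done.accept(err, null); return; } // stop at the first error
            step(it, nextAcc, reducer, done); // the next item starts only after this one finished
        });
    }

    /** Sums three amounts with an async reducer and returns the total. */
    static BigDecimal demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<BigDecimal> total = new AtomicReference<>();
        List<BigDecimal> amounts = List.of(
            new BigDecimal("10.50"), new BigDecimal("2.25"), new BigDecimal("7.25"));
        reduce(amounts, BigDecimal.ZERO,
            (acc, amount, c) -> exec.submit(() -> c.accept(null, acc.add(amount))),
            (err, sum) -> { total.set(sum); latch.countDown(); });
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        exec.shutdown();
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each step waits for the previous continuation, there is never more than one reducer call in flight. The equivalent library call: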
Asyncc.Reduce(transactions, BigDecimal.ZERO, (acc, txn, c) -> {
exec.submit(() -> c.success(acc.add(txn.amount())));
}, wrap(total -> {
// total is the final sum
}));
Asyncc.Times
Run the same task N times in parallel and collect each result. Useful for "spawn N workers" or "generate N samples".
Asyncc.Times(8, (i, c) -> {
exec.submit(() -> c.success(generateSample(i)));
}, wrap(samples -> {
// samples.size() == 8
}));
Asyncc.GroupBy
Apply an async keying function to each element, then group input elements by their key.
Asyncc.GroupBy(users, (user, c) -> {
c.success(user.region());
}, wrap(grouped -> {
// grouped is Map<String, List<User>> — users keyed by region
}));
NeoQueue
A bounded async work queue. Push tasks; the queue serialises (or limits) them. Great when you need backpressure without modelling a stream.
NeoQueue<Job, Void> queue = new NeoQueue<>(4); // concurrency = 4
queue.setTaskHandler((task, c) -> {
exec.submit(() -> {
try { processJob(task.getValue()); c.success(null); }
catch (Throwable t) { c.fail(t); }
});
});
queue.saturated(q -> log.warn("queue saturated; in-flight at cap"));
queue.drain(q -> log.info("queue drained"));
incoming.forEach(queue::push);
NeoLock
An async mutex. Unlike synchronized it doesn't tie the release to the acquiring thread — useful when the critical section ends inside an async completion handler.
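To illustrate what "not tied to the acquiring thread" means, here is a small callback-based mutex built from JDK primitives, where the release handle can be invoked from any thread. `AsyncLockSketch` is a hypothetical illustration and is not how NeoLock is implemented. By contrast, `ReentrantLock.unlock()` throws `IllegalMonitorStateException` when called by a thread that does not hold the lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a callback-based mutex; not NeoLock's implementation.
final class AsyncLockSketch {
    private final Deque<Consumer<Runnable>> waiters = new ArrayDeque<>();
    private boolean held = false;

    /** Calls onAcquired with a release handle once the lock is available. */
    void acquire(Consumer<Runnable> onAcquired) {
        synchronized (this) {
            if (held) { waiters.addLast(onAcquired); return; }
            held = true;
        }
        onAcquired.accept(releaseOnce());
    }

    /** A release handle that is safe to call from any thread, and only takes effect once. */
    private Runnable releaseOnce() {
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> { if (released.compareAndSet(false, true)) release(); };
    }

    private void release() {
        Consumer<Runnable> next;
        synchronized (this) {
            next = waiters.pollFirst();
            if (next == null) { held = false; return; }
        }
        next.accept(releaseOnce()); // hand the lock straight to the oldest waiter
    }

    /** Acquires on the calling thread, releases on a pool thread, then lets a second holder in. */
    static List<String> demo() {
        AsyncLockSketch lock = new AsyncLockSketch();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        lock.acquire(release -> {
            events.add("first acquired");
            // The critical section ends inside an async completion handler.
            exec.submit(() -> { events.add("first released"); release.run(); });
        });
        lock.acquire(release -> {
            events.add("second acquired");
            release.run();
            done.countDown();
        });
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        exec.shutdown();
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The second holder gets in only after the pool thread releases, regardless of which thread ends up running its callback. NeoLock's own API is shown below.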
NeoLock lock = new NeoLock("inventory");
lock.acquire((err, unlock) -> {
try {
// critical section — safe to await async work here
mutate(sharedState);
} finally {
unlock.releaseLock();
}
});
// Or use the leak-safe sync helper (v0.2.5+):
lock.withLock(() -> mutate(sharedState));
NeoRwLock (v0.2.6+)
Async reader/writer lock. Many concurrent readers, one exclusive writer. FIFO with reader-burst fairness — adjacent queued readers wake up concurrently when the lock becomes free.
NeoRwLock cacheLock = new NeoRwLock("config-cache");
// Reader — concurrent with other readers.
cacheLock.acquireRead((err, unlock) -> {
try {
var value = cache.get(key); // consume value here; the callback cannot return it
} finally {
unlock.releaseLock();
}
});
// Writer — exclusive.
cacheLock.acquireWrite((err, unlock) -> {
try {
cache.put(key, value);
} finally {
unlock.releaseLock();
}
});
// Sync helpers — auto-release even if the body throws.
cacheLock.withRead(() -> renderTemplate(cache));
cacheLock.withWrite(() -> cache.refresh());
// Non-blocking attempt — empty if waiters queued (preserves FIFO).
cacheLock.tryAcquireRead().ifPresent(u -> {
try { /* read-only work */ } finally { u.releaseLock(); }
});
// Bounded wait: on timeout the callback receives a TimeoutException as err.
cacheLock.acquireWrite(500L, (err, unlock) -> {
if (err instanceof TimeoutException) { backOff(); return; }
// ... exclusive write under the timeout budget, then unlock.releaseLock()
});
Composition: nesting combinators
Combinators nest cleanly because they all use the same error-first callback contract. Below: Waterfall wrapping a Map wrapping a Parallel.
Asyncc.Waterfall(List.of(
c -> fetchPage(url, c), // -> String html
(html, c) -> c.success(extractLinks(html)), // -> List<URI>
(links, c) -> Asyncc.Map(links, (link, inner) -> { // -> List<List<String>>
Asyncc.Parallel(List.of(
c2 -> exec.submit(() -> c2.success(headOk(link))),
c2 -> exec.submit(() -> c2.success(classify(link)))
), inner);
}, c)
), wrap(perLinkData -> {
// perLinkData is List<List<String>>, ordered the same as the source links list
}));
For a much larger composition (8+ combinators in one pipeline), see the composability showcase.
Looking for full javadoc? See the latest javadoc or the v0.2.4 snapshot, or browse the source.