Rx.NET to async.java, line-for-line
A real Rx.NET pipeline rewritten in async.java, two ways. Includes a side-by-side
operator-mapping table and an honest take on when async.java earns its keep over plain
CompletableFuture.
The runnable Java reference lives in RxFiveStagesComparison.java and is pinned by
RxFiveStagesComparisonTest (4 tests, including happy-path byte-identity and three
failure-mode error-frame equivalences).
The Rx.NET original
The F# pipeline (from RxAdvanced.fs):

let private rxFiveStages (inbound: IObservable<string>) : IObservable<string> =
    inbound.SelectMany(fun input ->
        let body : IObservable<string> =
            Observable
                .Return(input)
                .Select(fun s -> parse s)
                .Select(fun n -> validate n)
                .SelectMany(fun validated ->
                    let a =
                        Observable.Start(
                            (fun () -> enrichLookupA validated),
                            TaskPoolScheduler.Default)
                    let b =
                        Observable.Start(
                            (fun () -> enrichLookupB validated),
                            TaskPoolScheduler.Default)
                    Observable.Zip(
                        a, b,
                        fun lookupA lookupB ->
                            struct (validated, lookupA, lookupB)))
                .Select(fun (struct (validated, lookupA, lookupB)) ->
                    score validated lookupA lookupB)
                .Select(fun scored -> serialize scored)
                .Select(fun out -> sprintf "{\"ok\":true,\"result\":%s}" out)
        body.Catch(fun (ex: exn) ->
            Observable.Return(perMessageErrorFrame ex)))

Five .Select calls, one inner .SelectMany for the fan-out, one .Catch for the error funnel.
The inner subgraph is re-materialised per emission via the outer .SelectMany.
Version 1: callback-style with Asyncc.Waterfall + Asyncc.Parallel
Closest line-for-line equivalent. Each .Select becomes a Waterfall stage. The
Observable.Zip(Observable.Start(...), Observable.Start(...)) pair becomes an
Asyncc.Parallel(List.of(a, b), ...). The .Catch becomes the Waterfall’s terminal err
branch.

public static CompletableFuture<String> runWaterfall(final String inputFrame, final Executor exec) {
    final CompletableFuture<String> outcome = new CompletableFuture<>();
    final List<NeoWaterfallI.AsyncTask<Object, Throwable>> stages = new ArrayList<>();

    // .Select(fun s -> parse s)
    stages.add(c -> {
        try { c.success("parsed", PipelineStages.parse(inputFrame)); }
        catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun n -> validate n)
    stages.add(c -> {
        try { c.success("validated", PipelineStages.validate((JsonNode) c.get("parsed"))); }
        catch (Throwable t) { c.fail(t); }
    });

    // .SelectMany(fun validated -> Zip(Start(enrichA), Start(enrichB)))
    stages.add(c -> {
        final JsonNode validated = (JsonNode) c.get("validated");
        // List<Asyncc.Task<String>> flows directly into Parallel thanks to v0.2.8-rc2's
        // `List<? extends AsyncTask<T, E>>` widening: no cast, no defensive copy.
        final List<Asyncc.Task<String>> lookups = List.of(
            inner -> exec.execute(() -> {
                try { inner.success(PipelineStages.enrichLookupA(validated)); }
                catch (Throwable t) { inner.fail(t); }
            }),
            inner -> exec.execute(() -> {
                try { inner.success(PipelineStages.enrichLookupB(validated)); }
                catch (Throwable t) { inner.fail(t); }
            }));
        Asyncc.<String, Throwable>Parallel(lookups, (err, results) -> {
            if (err != null) { c.fail(err); return; }
            c.success("lookups", new ArrayList<>(results));
        });
    });

    // .Select(fun (v, a, b) -> score v a b)
    stages.add(c -> {
        try {
            final JsonNode validated = (JsonNode) c.get("validated");
            @SuppressWarnings("unchecked")
            final List<String> lookups = (List<String>) c.get("lookups");
            c.success("scored", PipelineStages.score(validated, lookups.get(0), lookups.get(1)));
        } catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun scored -> serialize scored)
    stages.add(c -> {
        try { c.success("serialized", PipelineStages.serialize((JsonNode) c.get("scored"))); }
        catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun out -> sprintf "{\"ok\":true,\"result\":%s}" out)
    stages.add(c ->
        c.success("envelope", "{\"ok\":true,\"result\":" + c.get("serialized") + "}"));

    // body.Catch(fun ex -> Observable.Return(perMessageErrorFrame ex))
    Asyncc.Waterfall(stages, (err, all) -> {
        if (err != null) { outcome.complete(perMessageErrorFrame(err)); return; }
        outcome.complete((String) all.get("envelope"));
    });
    return outcome;
}

Version 2: promise-style with AsyncFut.ParallelF
AsyncFut.ParallelF (v0.2.8-rc3+) accepts already-started CompletionStages, so
Observable.Start(fn, TaskPool) maps directly to CompletableFuture.supplyAsync(() -> fn(), exec)
and Observable.Zip(a, b) maps to AsyncFut.ParallelF(List.of(a, b)).

public static CompletableFuture<String> runWithAsyncFut(final String inputFrame, final Executor exec) {
    return CompletableFuture
        .completedFuture(inputFrame)
        .thenApply(s -> {                                  // .Select(parse)
            try { return PipelineStages.parse(s); }
            catch (Exception e) { throw new RuntimeException(e); }
        })
        .thenApply(PipelineStages::validate)               // .Select(validate)
        .thenCompose(validated ->                          // .SelectMany + .Zip
            AsyncFut.<String>ParallelF(List.<CompletionStage<String>>of(
                CompletableFuture.supplyAsync(() -> PipelineStages.enrichLookupA(validated), exec),
                CompletableFuture.supplyAsync(() -> PipelineStages.enrichLookupB(validated), exec)
            )).thenApply(lookups -> new Object[] { validated, lookups.get(0), lookups.get(1) }))
        .thenApply(t ->                                    // .Select(score)
            PipelineStages.score((JsonNode) t[0], (String) t[1], (String) t[2]))
        .thenApply(scored -> {                             // .Select(serialize)
            try { return PipelineStages.serialize(scored); }
            catch (Exception e) { throw new RuntimeException(e); }
        })
        .thenApply(s -> "{\"ok\":true,\"result\":" + s + "}")   // .Select(envelope)
        .exceptionally(RxFiveStagesComparison::perMessageErrorFrame); // .Catch
}

Operator mapping
The same five-stage pipeline expressed in four orchestrators: F# Rx, async.java callbacks,
async.java promises, and Akka Streams. The Akka Streams column tracks the production
AkkaStreamsPipeline.java shape that lives next to the async.java reference in akka-ws-server.

| Rx (F#) | async.java callback (Waterfall) | async.java promise (AsyncFut) | Akka Streams |
|---|---|---|---|
| inbound.SelectMany(fun input -> ...) | one runWaterfall call per inbound frame | one runWithAsyncFut call per inbound frame | Source.single(input).via(flow).runWith(Sink.head(), system) (per-message materialisation) |
| Observable.Return(input).Select(f) | Waterfall stage publishing c.success(k, f(x)) | .thenApply(f) | Flow.<T>create().map(f) |
| Observable.Start(fn, TaskPool) | exec.execute(() -> try inner.success(fn()) catch inner.fail(t)) | CompletableFuture.supplyAsync(() -> fn(), exec) | CompletableFuture.supplyAsync(() -> fn(), system.executionContext()) inside a mapAsync stage |
| Observable.Zip(a, b, combiner) | Asyncc.Parallel(List.of(a, b), (err, results) -> combiner(results.get(0), results.get(1))) | AsyncFut.ParallelF(List.of(a, b)).thenApply(combiner) | .mapAsync(2, x -> aFut.thenCombine(bFut, combiner)), or a Broadcast + Zip subgraph |
| body.Catch(fun ex -> errorFrame ex) | Waterfall's terminal if (err != null) emitErrorFrame(err) branch | .exceptionally(errorFrame) | .recover(ex -> errorFrame(ex)) Flow stage (or a Supervision.Strategy.resumingDecider on the materialiser) |
| Backpressure | not native; pair with NeoQueue for submission-side cap | not native; same: NeoQueue or Semaphore-style permits | structural: demand signal propagates upstream from the sink; buffer(n, overflowStrategy) tunes it per stage |
| Per-pipeline overhead | ~50 µs (heap allocs + atomic counter increments + lambda dispatch) | ~50 µs + one CompletableFuture per stage | ~200–400 µs per materialisation; see the load-curve breakdown for the source of the gap |
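
The "Semaphore-style permits" entry in the Backpressure row can be sketched with nothing but the JDK. This is a minimal illustration, not NeoQueue and not code from the repository: `PermitGate`, `submit`, and `available` are hypothetical names, and the gate only caps how many pipelines are in flight at once (it rejects when saturated rather than propagating demand upstream the way Akka Streams does).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: a submission-side cap built on java.util.concurrent.Semaphore.
final class PermitGate {
    private final Semaphore permits;

    PermitGate(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Starts the pipeline if a permit is free; otherwise returns an already-failed future. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> pipeline) {
        if (!permits.tryAcquire()) {
            return CompletableFuture.failedFuture(new IllegalStateException("saturated"));
        }
        final CompletableFuture<T> started;
        try {
            started = pipeline.get();
        } catch (Throwable t) {
            // The pipeline never started, so hand the permit back immediately.
            permits.release();
            return CompletableFuture.failedFuture(t);
        }
        // Release the permit when the pipeline finishes, whether it succeeded or failed.
        return started.whenComplete((value, err) -> permits.release());
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PermitGate gate = new PermitGate(1);
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> first = gate.submit(() -> pending); // takes the only permit
        CompletableFuture<String> second = gate.submit(() -> CompletableFuture.completedFuture("b"));
        System.out.println(second.isCompletedExceptionally()); // true: rejected while saturated
        pending.complete("a");
        System.out.println(first.join() + " " + gate.available()); // a 1
    }
}
```

Rejecting on saturation is only one policy; a caller could equally queue the submission or block, which is the part a dedicated work queue takes off your hands.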
Honest take: when async.java earns its keep over plain CompletableFuture
For this exact pipeline (linear flow, 2-way fan-out, single error sink) the most
idiomatic Java answer is probably plain CompletableFuture with .thenCombine(...) instead
of AsyncFut.ParallelF. A 2-way fan-out doesn’t need a List-based combinator. The F# Rx
version does the same thing with extra ceremony, and reproducing that ceremony in AsyncFut
for its own sake doesn’t pay off.
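
For reference, the plain-CompletableFuture shape described above might look like the sketch below. The stage functions are hypothetical string-based stand-ins for the real PipelineStages methods (which operate on JsonNode), and the error frame is simplified; only the `supplyAsync` + `thenCombine` structure is the point.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThenCombineSketch {

    // Hypothetical stand-ins for PipelineStages.enrichLookupA/B and score.
    static String enrichLookupA(String validated) { return "A(" + validated + ")"; }
    static String enrichLookupB(String validated) { return "B(" + validated + ")"; }
    static String score(String validated, String a, String b) { return validated + ":" + a + "+" + b; }

    static CompletableFuture<String> run(String input, Executor exec) {
        return CompletableFuture
            .completedFuture(input)
            .thenCompose(validated -> {
                // Both lookups are started before either is awaited, so they run concurrently.
                CompletableFuture<String> a =
                    CompletableFuture.supplyAsync(() -> enrichLookupA(validated), exec);
                CompletableFuture<String> b =
                    CompletableFuture.supplyAsync(() -> enrichLookupB(validated), exec);
                // thenCombine is the 2-way zip: typed arguments, no List, no casts.
                return a.thenCombine(b, (lookupA, lookupB) -> score(validated, lookupA, lookupB));
            })
            .thenApply(scored -> "{\"ok\":true,\"result\":\"" + scored + "\"}")
            // Simplified error funnel; the real pipeline uses perMessageErrorFrame.
            .exceptionally(t -> "{\"ok\":false,\"error\":\"" + t + "\"}");
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            System.out.println(run("x", exec).join()); // {"ok":true,"result":"x:A(x)+B(x)"}
        } finally {
            exec.shutdown();
        }
    }
}
```

Compared with Version 2, the combiner receives both lookups as typed parameters, so the intermediate Object[] and its casts disappear.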
async.java’s value-add appears when one or more of these enters the picture:
- N parallel tasks, not 2. Asyncc.Parallel(List.of(t1, ..., tN)) or ParallelLimit(k, List.of(...)) is a one-liner for arbitrary fan-out width. With thenCombine you’d be writing a combine-cascade.
- Bounded concurrency. ParallelLimit(4, tasks, ...) caps in-flight work. Rx has Merge(maxConcurrent: 4); plain CompletableFuture has no equivalent without writing your own semaphore. async.java’s ParallelLimit is hardened by 100-iteration regression tests (v0.2.5 fixed an off-by-one in RunTasksLimit).
- Backpressure. NeoQueue is a bounded async work queue with saturated / unsaturated / drain lifecycle hooks: the equivalent of Rx’s Buffer / Throttle / Window for “accept submissions but cap concurrent execution”.
- Shared-state coordination across pipelines. NeoLock and NeoRwLock (v0.2.6) for cross-frame mutual exclusion. Rx has no native equivalent; you’d reach for ReentrantLock and live with the pinning.
- Composability of more exotic flow. Whilst, FilterMap, GroupBy, Reduce, Race, Inject: Rx has these too, but async.java’s callback contract is uniform across all of them, so they nest without conversion adapters. See the composability showcase for an 11-combinator pipeline.
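
To make the first bullet concrete, here is roughly what the hand-written alternatives look like with the JDK alone. This is a sketch under assumptions: `FanOutSketch`, `gather`, and `cascade` are hypothetical names, and nothing here claims to mirror how Asyncc.Parallel is implemented. `cascade` is the thenCombine chain that grows by one link per input; `gather` is the `allOf`-based helper most codebases end up writing instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FanOutSketch {

    /** The thenCombine cascade for three inputs; every extra input adds another link. */
    static CompletableFuture<String> cascade(CompletableFuture<String> a,
                                             CompletableFuture<String> b,
                                             CompletableFuture<String> c) {
        return a.thenCombine(b, (x, y) -> x + y)
                .thenCombine(c, (xy, z) -> xy + z);
    }

    /** Completes with the results in input order once all futures are done; fails if any fails. */
    static <T> CompletableFuture<List<T>> gather(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // Every future is already complete here, so join() does not block.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> squares = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int n = i;
            squares.add(CompletableFuture.supplyAsync(() -> n * n));
        }
        System.out.println(gather(squares).join()); // [0, 1, 4, 9, 16]
    }
}
```

Note that `gather` does nothing to limit how many tasks run at once; capping concurrency needs extra machinery on top, which is the gap the second bullet describes.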
Tests pinning the equivalence
The runnable Java reference is pinned by RxFiveStagesComparisonTest.java:
- Happy path: Waterfall and AsyncFut versions produce byte-identical output for the same input.
- Parse failure ("not json"): both emit {"ok":false,"error":"JsonParseException: ..."} on the success channel of the returned future.
- Validation failure (missing id): both emit an IllegalArgumentException-typed error frame.
- Score failure (id="poison"): both emit an IllegalStateException-typed error frame, proving the error funnel works downstream of the fan-out; this is the same observable behavior as Rx’s body.Catch at the outer subgraph boundary.
Run them with:
mvn -pl remote/akka-ws-server test -Dtest=RxFiveStagesComparisonTest
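
The "error frame on the success channel" behaviour that these tests pin can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not the repository's code: `errorFrame` is a hypothetical formatter (the real perMessageErrorFrame may format or unwrap differently), and `validate` is a toy stage. What it does show reliably is JDK behaviour: an exception thrown inside `thenApply` reaches `exceptionally` wrapped in a CompletionException, and `exceptionally` converts it into an ordinary value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ErrorFrameSketch {

    /** Hypothetical formatter; does no JSON escaping, so it is for illustration only. */
    static String errorFrame(Throwable t) {
        Throwable root = t;
        // Dependent stages report failures wrapped in CompletionException; peel that off.
        while (root instanceof CompletionException && root.getCause() != null) {
            root = root.getCause();
        }
        return "{\"ok\":false,\"error\":\""
            + root.getClass().getSimpleName() + ": " + root.getMessage() + "\"}";
    }

    /** Toy validation stage standing in for PipelineStages.validate. */
    static String validate(String id) {
        if (id.isEmpty()) {
            throw new IllegalArgumentException("missing id");
        }
        return id;
    }

    static CompletableFuture<String> run(String id) {
        return CompletableFuture
            .completedFuture(id)
            .thenApply(ErrorFrameSketch::validate)
            .thenApply(v -> "{\"ok\":true,\"result\":\"" + v + "\"}")
            // Turns any upstream failure into a normal value, like body.Catch in the Rx version.
            .exceptionally(ErrorFrameSketch::errorFrame);
    }

    public static void main(String[] args) {
        System.out.println(run("42").join()); // {"ok":true,"result":"42"}
        System.out.println(run("").join());   // {"ok":false,"error":"IllegalArgumentException: missing id"}
    }
}
```

Because the failure is converted to a value, callers can `join()` without a try/catch; the returned future itself never completes exceptionally.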