Rx.NET to async.java, line-for-line

A real Rx.NET pipeline rewritten in async.java, two ways. Includes a side-by-side operator-mapping table and an honest take on when async.java earns its keep over plain CompletableFuture.

The runnable Java reference lives in RxFiveStagesComparison.java and is pinned by RxFiveStagesComparisonTest (4 tests, including happy-path byte-identity and three failure-mode error-frame equivalences).

The Rx.NET original

The F# pipeline (from RxAdvanced.fs):

let private rxFiveStages (inbound: IObservable<string>) : IObservable<string> =
    inbound.SelectMany(fun input ->
        let body : IObservable<string> =
            Observable
                .Return(input)
                .Select(fun s -> parse s)
                .Select(fun n -> validate n)
                .SelectMany(fun validated ->
                    let a =
                        Observable.Start(
                            (fun () -> enrichLookupA validated),
                            TaskPoolScheduler.Default)
                    let b =
                        Observable.Start(
                            (fun () -> enrichLookupB validated),
                            TaskPoolScheduler.Default)
                    Observable.Zip(
                        a, b,
                        fun lookupA lookupB ->
                            struct (validated, lookupA, lookupB)))
                .Select(fun (struct (validated, lookupA, lookupB)) ->
                    score validated lookupA lookupB)
                .Select(fun scored -> serialize scored)
                .Select(fun out -> sprintf "{\"ok\":true,\"result\":%s}" out)
        body.Catch(fun (ex: exn) ->
            Observable.Return(perMessageErrorFrame ex)))

Six .Select calls, one .SelectMany for the fan-out, one .Catch for the error funnel. The inner subgraph is re-materialised per emission via the outer .SelectMany.

Version 1 — callback-style with Asyncc.Waterfall + Asyncc.Parallel

Closest line-for-line equivalent. Each .Select becomes a Waterfall stage. The Observable.Zip(Observable.Start(...), Observable.Start(...)) pair becomes an Asyncc.Parallel(List.of(a, b), ...). The .Catch becomes the Waterfall’s terminal err branch.

public static CompletableFuture<String> runWaterfall(final String inputFrame, final Executor exec) {
    final CompletableFuture<String> outcome = new CompletableFuture<>();
    final List<NeoWaterfallI.AsyncTask<Object, Throwable>> stages = new ArrayList<>();

    // .Select(fun s -> parse s)
    stages.add(c -> {
        try { c.success("parsed", PipelineStages.parse(inputFrame)); }
        catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun n -> validate n)
    stages.add(c -> {
        try { c.success("validated", PipelineStages.validate((JsonNode) c.get("parsed"))); }
        catch (Throwable t) { c.fail(t); }
    });

    // .SelectMany(fun validated -> Zip(Start(enrichA), Start(enrichB)))
    stages.add(c -> {
        final JsonNode validated = (JsonNode) c.get("validated");
        // List<Asyncc.Task<String>> flows directly into Parallel thanks to v0.2.8-rc2's
        // `List<? extends AsyncTask<T, E>>` widening — no cast, no defensive copy.
        final List<Asyncc.Task<String>> lookups = List.of(
            inner -> exec.execute(() -> {
                try { inner.success(PipelineStages.enrichLookupA(validated)); }
                catch (Throwable t) { inner.fail(t); }
            }),
            inner -> exec.execute(() -> {
                try { inner.success(PipelineStages.enrichLookupB(validated)); }
                catch (Throwable t) { inner.fail(t); }
            }));
        Asyncc.<String, Throwable>Parallel(lookups, (err, results) -> {
            if (err != null) { c.fail(err); return; }
            c.success("lookups", new ArrayList<>(results));
        });
    });

    // .Select(fun (v, a, b) -> score v a b)
    stages.add(c -> {
        try {
            final JsonNode validated = (JsonNode) c.get("validated");
            @SuppressWarnings("unchecked")
            final List<String> lookups = (List<String>) c.get("lookups");
            c.success("scored", PipelineStages.score(validated, lookups.get(0), lookups.get(1)));
        } catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun scored -> serialize scored)
    stages.add(c -> {
        try { c.success("serialized", PipelineStages.serialize((JsonNode) c.get("scored"))); }
        catch (Throwable t) { c.fail(t); }
    });

    // .Select(fun out -> sprintf "{\"ok\":true,\"result\":%s}" out)
    stages.add(c ->
        c.success("envelope", "{\"ok\":true,\"result\":" + c.get("serialized") + "}"));

    // body.Catch(fun ex -> Observable.Return(perMessageErrorFrame ex))
    Asyncc.Waterfall(stages, (err, all) -> {
        if (err != null) { outcome.complete(perMessageErrorFrame(err)); return; }
        outcome.complete((String) all.get("envelope"));
    });
    return outcome;
}

Version 2 — promise-style with AsyncFut.ParallelF

AsyncFut.ParallelF (v0.2.8-rc3+) accepts already-started CompletionStages, so Observable.Start(fn, TaskPool) maps directly to CompletableFuture.supplyAsync(() -> fn(), exec) and Observable.Zip(a, b) maps to AsyncFut.ParallelF(List.of(a, b)).

public static CompletableFuture<String> runWithAsyncFut(final String inputFrame, final Executor exec) {
    return CompletableFuture
        .completedFuture(inputFrame)
        .thenApply(s -> {                                                 // .Select(parse)
            try { return PipelineStages.parse(s); }
            catch (Exception e) { throw new RuntimeException(e); }
        })
        .thenApply(PipelineStages::validate)                              // .Select(validate)
        .thenCompose(validated ->                                         // .SelectMany + .Zip
            AsyncFut.<String>ParallelF(List.<CompletionStage<String>>of(
                CompletableFuture.supplyAsync(() -> PipelineStages.enrichLookupA(validated), exec),
                CompletableFuture.supplyAsync(() -> PipelineStages.enrichLookupB(validated), exec)
            )).thenApply(lookups -> new Object[] { validated, lookups.get(0), lookups.get(1) }))
        .thenApply(t ->                                                   // .Select(score)
            PipelineStages.score((JsonNode) t[0], (String) t[1], (String) t[2]))
        .thenApply(scored -> {                                            // .Select(serialize)
            try { return PipelineStages.serialize(scored); }
            catch (Exception e) { throw new RuntimeException(e); }
        })
        .thenApply(s -> "{\"ok\":true,\"result\":" + s + "}")             // .Select(envelope)
        .exceptionally(RxFiveStagesComparison::perMessageErrorFrame);     // .Catch
}

Operator mapping

The same five-stage pipeline expressed in four orchestrators: F# Rx, async.java callbacks, async.java promises, and Akka Streams. The Akka Streams column tracks the production AkkaStreamsPipeline.java shape that lives next to the async.java reference in akka-ws-server.

Rx (F#) async.java callback (Waterfall) async.java promise (AsyncFut) Akka Streams
inbound.SelectMany(fun input -> ...) one runWaterfall call per inbound frame one runWithAsyncFut call per inbound frame Source.single(input).via(flow).runWith(Sink.head(), system) (per-message materialisation)
Observable.Return(input).Select(f) Waterfall stage publishing c.success(k, f(x)) .thenApply(f) Flow.<T>create().map(f)
Observable.Start(fn, TaskPool) exec.execute(() -> try inner.success(fn()) catch inner.fail(t)) CompletableFuture.supplyAsync(() -> fn(), exec) CompletableFuture.supplyAsync(() -> fn(), system.executionContext()) inside a mapAsync stage
Observable.Zip(a, b, combiner) Asyncc.Parallel(List.of(a, b), (err, results) -> combiner(results.get(0), results.get(1))) AsyncFut.ParallelF(List.of(a, b)).thenApply(combiner) .mapAsync(2, x -> aFut.thenCombine(bFut, combiner))or a Broadcast + Zip subgraph
body.Catch(fun ex -> errorFrame ex) Waterfall's terminal if (err != null) emitErrorFrame(err) branch .exceptionally(errorFrame) .recover(ex -> errorFrame(ex)) Flow stage (or a Supervision.Strategy.resumingDecider on the materialiser)
Backpressure not native — pair with NeoQueue for submission-side cap not native — same: NeoQueue or Semaphore-style permits structural — demand signal propagates upstream from the sink; buffer(n, overflowStrategy) tunes it per stage
Per-pipeline overhead ~50 µs (heap allocs + atomic counter increments + lambda dispatch) ~50 µs + one CompletableFuture per stage ~200–400 µs per materialisation — see the load-curve breakdown for the source of the gap

Honest take: when async.java earns its keep over plain CompletableFuture

For this exact pipeline (linear flow, 2-way fan-out, single error sink) the most idiomatic Java answer is probably plain CompletableFuture with .thenCombine(...) instead of AsyncFut.ParallelF. 2-way fan-out doesn’t need a List-based combinator. The F# Rx version is doing the same thing with extra ceremony, and writing the AsyncFut equivalent of ceremony for ceremony’s sake doesn’t pay off.

async.java’s value-add appears when one or more of these enters the picture:

  1. N parallel tasks, not 2. Asyncc.Parallel(List.of(t1, ..., tN)) or ParallelLimit(k, List.of(...)) is a one-liner for arbitrary fan-out width. With thenCombine you’d be writing a combine-cascade.

  2. Bounded concurrency. ParallelLimit(4, tasks, ...) caps in-flight work. Rx has Merge(maxConcurrent: 4); plain CompletableFuture has no equivalent without writing your own semaphore. async.java’s ParallelLimit is hardened by 100-iteration regression tests (v0.2.5 fixed an off-by-one in RunTasksLimit).

  3. Backpressure. NeoQueue is a bounded async work queue with saturated / unsaturated / drain lifecycle hooks — the equivalent of Rx’s Buffer / Throttle / Window for “accept submissions but cap concurrent execution”.

  4. Shared-state coordination across pipelines. NeoLock and NeoRwLock (v0.2.6) for cross-frame mutual exclusion. Rx has no native equivalent — you’d reach for ReentrantLock and live with the pinning.

  5. Composability of more exotic flow. Whilst, FilterMap, GroupBy, Reduce, Race, Inject — Rx has these too, but async.java’s callback contract is uniform across all of them, so they nest without conversion adapters. See the composability showcase for an 11-combinator pipeline.

Tests pinning the equivalence

The runnable Java reference is pinned by RxFiveStagesComparisonTest.java:

Run them with:

mvn -pl remote/akka-ws-server test -Dtest=RxFiveStagesComparisonTest