Class NeoQueue<T,V>

java.lang.Object
org.ores.async.NeoQueue<T,V>
Type Parameters:
T - task input type
V - task result type

public class NeoQueue<T,V> extends Object
A bounded async work queue.

Hand the queue a task handler; push tasks; the queue dispatches them via the shared executor, respecting a concurrency cap. Callers can subscribe to saturated, unsaturated, and drain lifecycle events for back-pressure modelling.

Why a queue (and not just Asyncc.ParallelLimit(int, java.util.Map<java.lang.Object, org.ores.async.Asyncc.AsyncTask<T, E>>, org.ores.async.Asyncc.IAsyncCallback<java.util.Map<java.lang.Object, T>, E>))?

ParallelLimit takes a fixed list of tasks. A NeoQueue accepts streaming arrivals — you push tasks as they show up and the queue keeps the concurrency cap honoured. Useful for incoming WS frames, paged downstream calls, file watcher events.

Usage

The handler's continuation parameter is conventionally named c (for continuation). Fire it via c.success(value), c.fail(error), or the canonical c.done(err, value).

   NeoQueue<JobSpec, JobResult> queue = new NeoQueue<>(4); // concurrency = 4

   queue.setTaskHandler((task, c) -> {
       try {
           c.success(processJob(task.getValue()));
       } catch (Throwable t) {
           c.fail(t);
       }
   });

   queue.saturated((q) -> log.warn("queue saturated; in-flight at cap"));
   queue.drain((q) -> log.info("queue drained"));

   incoming.forEach(spec -> queue.push(spec));
 

With virtual threads

The queue ships with a daemon-threaded default executor. For production deployments on JDK 21+, swap it for a VT executor once at startup:

   NeoQueue.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
 

Lifecycle events

Three lifecycle hooks (onSaturated, onUnsaturated, onDrain). The semantics, pinned by NeoQueueLifecycleTest as of v0.2.5:

  • onSaturated — fires when an incrementStarted causes in-flight to reach the concurrency cap. Gated by an internal flag so it fires once per saturation event, not on every dispatch while at cap.
  • onUnsaturated — fires at task-completion time when the pending queue is empty AND we were saturated. Pairs with onSaturated to bracket "the queue is backlogged" intervals: it does not fire every time in-flight drops below the cap if there is still work pending. Use it as a "no longer backlogged" signal, not as a "below cap right now" signal.
  • onDrain — fires at task-completion time when there is no pending work AND all started tasks have finished. Gated so it fires exactly once per drain transition. A subsequent push resets the gate.

Concurrency cap invariant

If the queue is constructed with concurrency = N, no more than N task handlers will be in flight at any instant. Pinned by NeoQueueConcurrencyTest across burst push (200 tasks), trickle push (1 ms apart), and repeated runs.