Class NeoQueue<T,V>
- Type Parameters:
T - task input type
V - task result type
Hand the queue a task handler; push tasks; the queue dispatches them via the shared
executor, respecting a concurrency cap. Callers can subscribe to saturated,
unsaturated, and drain lifecycle events for back-pressure modelling.
Why a queue (and not just Asyncc.ParallelLimit(int, Map<Object, Asyncc.AsyncTask<T, E>>, Asyncc.IAsyncCallback<Map<Object, T>, E>))?
ParallelLimit takes a fixed set of tasks, supplied up front. A NeoQueue accepts streaming
arrivals: you push tasks as they show up and the queue keeps honouring the concurrency
cap. This is useful for incoming WebSocket frames, paged downstream calls, and file-watcher events.
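The difference can be illustrated with plain JDK classes. The sketch below is not NeoQueue's implementation; CappedDispatcher, push, and runDemo are hypothetical names chosen for illustration. It accepts tasks at any time and starts a pending task only when fewer than cap tasks are running.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a push-based queue (not the NeoQueue source):
// tasks may arrive at any time, and at most `cap` of them run at once.
final class CappedDispatcher {
    private final int cap;
    private final ExecutorService executor;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int inFlight = 0;

    CappedDispatcher(int cap, ExecutorService executor) {
        this.cap = cap;
        this.executor = executor;
    }

    // Accept a task now; it starts as soon as a slot is free.
    synchronized void push(Runnable task) {
        pending.add(task);
        dispatch();
    }

    // Start pending tasks until the cap is reached or nothing is waiting.
    private synchronized void dispatch() {
        while (inFlight < cap && !pending.isEmpty()) {
            Runnable next = pending.poll();
            inFlight++;
            executor.execute(() -> {
                try {
                    next.run();
                } finally {
                    onTaskDone();
                }
            });
        }
    }

    // Free the slot and pull the next pending task, if any.
    private synchronized void onTaskDone() {
        inFlight--;
        dispatch();
    }

    // Demo helper: pushes `count` tasks one by one and returns how many completed.
    static int runDemo(int cap, int count) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CappedDispatcher dispatcher = new CappedDispatcher(cap, pool);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            dispatcher.push(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }
}
```

Unlike a call that receives its whole task collection as an argument, push can be called again at any point while earlier tasks are still running.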
Usage
The handler's continuation parameter is conventionally named c (for
continuation). Fire it via c.success(value), c.fail(error), or the
canonical c.done(err, value).
NeoQueue<JobSpec, JobResult> queue = new NeoQueue<>(4); // concurrency = 4
queue.setTaskHandler((task, c) -> {
    try {
        c.success(processJob(task.getValue()));
    } catch (Throwable t) {
        c.fail(t);
    }
});
queue.onSaturated((q) -> log.warn("queue saturated; in-flight at cap"));
queue.onDrain((q) -> log.info("queue drained"));
incoming.forEach(spec -> queue.push(spec));
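The three ways of firing the continuation suggest that success and fail are shorthands for the error-first done. A minimal sketch under that assumption follows; Continuation and ContinuationDemo are hypothetical names, and the library's actual interface (NeoQueue.IAsyncErrFirstCb) may be shaped differently.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical error-first continuation; the library's own interface may differ.
interface Continuation<V> {
    // Canonical form: a non-null err signals failure, otherwise value is the result.
    void done(Throwable err, V value);

    // Shorthand for done(null, value).
    default void success(V value) {
        done(null, value);
    }

    // Shorthand for done(err, null).
    default void fail(Throwable err) {
        done(err, null);
    }
}

final class ContinuationDemo {
    // Runs a small handler body and reports which branch of the continuation fired.
    static String outcomeOf(String input) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Continuation<Integer> c = (err, value) ->
                outcome.set(err != null ? "error: " + err.getMessage() : "value: " + value);

        int parsed;
        try {
            parsed = Integer.parseInt(input);
        } catch (NumberFormatException e) {
            c.fail(e);              // failure path: fire once and stop
            return outcome.get();
        }
        c.success(parsed);          // success path: fired outside the try block
        return outcome.get();
    }
}
```

Calling success outside the try block is a deliberate choice in this sketch: if the continuation itself throws, the catch clause cannot fire it a second time.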
With virtual threads
The queue ships with a daemon-threaded default executor. For production deployments on JDK 21+, swap it for a virtual-thread executor once at startup:
NeoQueue.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
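For context on what "daemon-threaded" means for the default, the sketch below builds a daemon-thread pool with standard JDK calls. It is an illustration only: DaemonExecutors and its methods are hypothetical names, and the library's actual default executor may be configured differently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not part of the library.
final class DaemonExecutors {
    // Builds a cached pool whose worker threads are daemons, so idle workers
    // never keep the JVM alive on their own.
    static ExecutorService newDaemonPool(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + seq.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return Executors.newCachedThreadPool(factory);
    }

    // Returns true if a task submitted to the pool runs on a daemon thread.
    static boolean runsOnDaemonThread(ExecutorService pool) {
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        try {
            return pool.submit(probe).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Daemon threads do not block JVM exit, which suits a library default. Virtual threads are likewise always daemon threads, so swapping in a virtual-thread executor keeps that property.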
Lifecycle events
The queue exposes three lifecycle hooks: onSaturated, onUnsaturated, and onDrain. Their
semantics, pinned by NeoQueueLifecycleTest as of v0.2.5:
- onSaturated: fires when an incrementStarted causes in-flight to reach the concurrency cap. Gated by an internal flag so it fires once per saturation event, not on every dispatch while at cap.
- onUnsaturated: fires at task-completion time when the pending queue is empty AND we were saturated. Pairs with onSaturated to bracket "the queue is backlogged" intervals: it does not fire every time in-flight drops below the cap if there is still work pending. Use it as a "no longer backlogged" signal, not as a "below cap right now" signal.
- onDrain: fires at task-completion time when there is no pending work AND all started tasks have finished. Gated so it fires exactly once per drain transition. A subsequent push resets the gate.
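The hook semantics above can be modelled as a small single-threaded state machine. This is a sketch for reasoning about event order, not the library's code: LifecycleModel, its fields, and the choice to evaluate the hooks only when nothing is pending at completion time are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the three lifecycle gates.
final class LifecycleModel {
    private final int cap;
    private int pending = 0;            // pushed but not yet started
    private int inFlight = 0;           // started but not yet completed
    private boolean saturated = false;  // gate: one onSaturated per saturation event
    private boolean drained = false;    // gate: one onDrain per drain transition
    final List<String> events = new ArrayList<>();

    LifecycleModel(int cap) {
        this.cap = cap;
    }

    // A push resets the drain gate and starts the task if a slot is free.
    void push() {
        drained = false;
        pending++;
        dispatch();
    }

    // Marks one in-flight task as finished and evaluates the completion-time hooks.
    void complete() {
        if (inFlight == 0) {
            throw new IllegalStateException("nothing in flight");
        }
        inFlight--;
        if (pending > 0) {
            dispatch();                 // still backlogged: no unsaturated, no drain
            return;
        }
        if (saturated) {
            saturated = false;
            events.add("unsaturated");  // pending empty AND we were saturated
        }
        if (inFlight == 0 && !drained) {
            drained = true;
            events.add("drain");        // no pending work AND all started tasks done
        }
    }

    // Starts pending tasks up to the cap; fires "saturated" once when the cap is reached.
    private void dispatch() {
        while (inFlight < cap && pending > 0) {
            pending--;
            inFlight++;
            if (inFlight == cap && !saturated) {
                saturated = true;
                events.add("saturated");
            }
        }
    }
}
```

With a cap of 2, pushing three tasks and then completing all three records saturated, unsaturated, drain in that order. The first completion fires nothing because one task is still pending.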
Concurrency cap invariant
If the queue is constructed with concurrency = N, no more than N task handlers
will be in flight at any instant. Pinned by NeoQueueConcurrencyTest across burst
push (200 tasks), trickle push (1 ms apart), and repeated runs.
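One way to check an invariant of this kind is a high-water-mark probe: each task increments a counter on entry, records the maximum seen, and decrements on exit. The sketch below shows the technique using only the JDK. InFlightProbe is a hypothetical name, a fixed thread pool stands in for the queue under test, and this is not the code of NeoQueueConcurrencyTest.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical probe that records the highest number of simultaneously running tasks.
final class InFlightProbe {
    private final AtomicInteger current = new AtomicInteger();
    private final AtomicInteger max = new AtomicInteger();

    // Call at the start of a task body.
    void enter() {
        max.accumulateAndGet(current.incrementAndGet(), Math::max);
    }

    // Call at the end of a task body.
    void exit() {
        current.decrementAndGet();
    }

    int max() {
        return max.get();
    }

    // Burst-submits `tasks` jobs to a pool of `cap` threads (a stand-in for the
    // system under test) and returns the observed high-water mark.
    static int burst(int cap, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(cap);
        InFlightProbe probe = new InFlightProbe();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                probe.enter();
                try {
                    Thread.sleep(1);    // hold the slot briefly so tasks overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    probe.exit();
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return probe.max();
    }
}
```

The assertion to make is that the returned maximum never exceeds the cap. A trickle variant would insert a short sleep between submissions instead of submitting in a tight loop.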
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | |
static interface | |
static interface | | Error-first continuation used by NeoQueue.ITaskHandler.
static interface | |
static interface | |
static class | |
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
boolean | isDrained() |
void | nudge() |
void | |
void | |
void | |
void | pause() |
void | push(NeoQueue.Task<T, V> t) |
void | push(NeoQueue.Task<T, V> t, NeoQueue.IAsyncErrFirstCb<V> cb) |
void | resume() |
void | setDrained(boolean drained) |
static void | setExecutor(ExecutorService) | Swap the shared executor used for callback delivery.
static void | shutdown() | Shut down the default executor.
void | unshift(NeoQueue.Task<T, V> t) |
-
Constructor Details
-
NeoQueue
-
NeoQueue
-
-
Method Details
-
setExecutor
Swap the shared executor used for callback delivery. Useful when an application wants to route async.java callbacks onto a Vert.x context, an Akka dispatcher, or a bounded ForkJoinPool. -
shutdown
public static void shutdown()
Shut down the default executor. No-op if a custom executor was installed via setExecutor(ExecutorService); the caller owns that one. -
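The ownership rule described for shutdown() can be sketched as follows. SharedExecutorHolder is a hypothetical class written only to illustrate the pattern; it does not reproduce the library's internals.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder illustrating "the library owns the default, the caller owns a custom one".
final class SharedExecutorHolder {
    private static final ExecutorService DEFAULT = Executors.newCachedThreadPool(runnable -> {
        Thread t = new Thread(runnable, "shared-default");
        t.setDaemon(true);
        return t;
    });

    private static volatile ExecutorService current = DEFAULT;

    private SharedExecutorHolder() {
    }

    // Installs a caller-owned executor; the holder never shuts it down.
    static void setExecutor(ExecutorService custom) {
        current = Objects.requireNonNull(custom, "custom");
    }

    static ExecutorService executor() {
        return current;
    }

    // Shuts down the default executor only while it is still the active one.
    static void shutdown() {
        if (current == DEFAULT) {
            DEFAULT.shutdown();
        }
    }

    static boolean isDefaultShutdown() {
        return DEFAULT.isShutdown();
    }
}
```

Under this pattern, an application that installs its own executor remains responsible for shutting it down at exit.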
getConcurrency
-
isDrained
public boolean isDrained() -
getOnDrainCbs
-
setDrained
public void setDrained(boolean drained) -
getOnSaturatedCbs
-
getOnUnsaturatedCbs
-
setConcurrency
-
nudge
public void nudge() -
push
-
push
-
onDrain
-
onSaturated
-
onUnsaturated
-
unshift
-
pause
public void pause() -
resume
public void resume()
-