Roadmap
Planned runtime and broker work for OpenQueue.
OpenQueue currently uses BullMQ on Redis as its queue runtime. That is still the default path: it is mature, simple to run, and supports the full feature set today. The next major runtime direction is making that dependency an adapter instead of the center of the architecture, so OpenQueue can support other brokers such as Kafka and RabbitMQ without changing the task API.
Runtime adapters
The target is a broker-neutral runtime boundary:
```typescript
export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  runtime: kafkaRuntime({
    brokers: process.env.KAFKA_BROKERS!.split(','),
    groupId: 'my-app-workers',
  }),
  storage: {
    adapter: postgresAdapter({ db, schema: queueSchema }),
  },
});
```

The existing BullMQ setup would become the first adapter:
```typescript
export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  runtime: bullmqRuntime({
    redis: { url: process.env.REDIS_URL! },
  }),
});
```

For a compatibility window, `redis` can keep selecting BullMQ implicitly:
```typescript
export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  redis: { url: process.env.REDIS_URL! },
});
```

The public task model should stay stable:

- `task()` keeps defining an id, schema, handler, queue, concurrency, retries, timeout, TTL, cron schedules, and hooks.
- `task.trigger()` keeps being the enqueue API.
- `enqueueFlow()` remains the API for dependent work, but it becomes a runtime capability instead of a BullMQ-only assumption.
- `ctx.logger`, `ctx.progress()`, `ctx.runId`, `ctx.attempt`, and OpenTelemetry context stay broker-neutral.
- Workbench keeps showing queues, runs, flows, schedules, errors, metrics, and logs, but reads from runtime capabilities plus durable storage instead of calling BullMQ APIs directly.
Why this matters
Redis/BullMQ is a good default, but different teams already have different operational centers of gravity:
| Runtime | Why teams want it |
|---|---|
| BullMQ / Redis | Simple local development, delayed jobs, retries, flows, and good all-in-one semantics. |
| Kafka | Existing event backbone, high throughput, durable ordered logs, replay, large worker fleets. |
| RabbitMQ | Mature work queues, routing keys, exchanges, acknowledgements, prefetch, and enterprise operations. |
| SQS / Pub/Sub / NATS | Managed cloud queues, simpler operations, or lower-latency messaging in specific environments. |
The goal is not to pretend every broker has the same semantics. The goal is to expose the same OpenQueue developer API where possible, make runtime differences explicit through capabilities, and keep operational state portable through storage.
Capability model
Not every broker can support every feature natively. Adapters should declare what they support:
```typescript
type RuntimeCapabilities = {
  delayed: 'native' | 'storage' | false;
  retries: 'native' | 'storage' | false;
  priorities: 'native' | 'emulated' | false;
  flows: 'native' | 'storage' | false;
  schedules: 'native' | 'storage' | false;
  pauseResume: 'queue' | 'worker' | false;
  repeatableJobs: 'native' | 'storage' | false;
  jobLogs: 'native' | 'storage' | false;
  jobProgress: 'native' | 'storage' | false;
  inspectJobs: 'native' | 'storage' | false;
};
```

Workbench and the CLI can use those capabilities to decide which controls to show. For example, a Kafka runtime may support retry and delay through Postgres storage but not true queue-level pause. RabbitMQ may support dead-letter routing and manual acknowledgements natively, while using storage for long delays and flow dependency tracking.
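A minimal sketch of that feature gating, assuming Workbench maps each capability to a set of controls. The `visibleControls` helper and the control names are hypothetical; the `RuntimeCapabilities` type is repeated from above so the example compiles on its own.

```typescript
// Repeated from the capability model so this sketch is self-contained.
type RuntimeCapabilities = {
  delayed: 'native' | 'storage' | false;
  retries: 'native' | 'storage' | false;
  priorities: 'native' | 'emulated' | false;
  flows: 'native' | 'storage' | false;
  schedules: 'native' | 'storage' | false;
  pauseResume: 'queue' | 'worker' | false;
  repeatableJobs: 'native' | 'storage' | false;
  jobLogs: 'native' | 'storage' | false;
  jobProgress: 'native' | 'storage' | false;
  inspectJobs: 'native' | 'storage' | false;
};

// Hypothetical control names; a real Workbench may use different ones.
type Control =
  | 'retryRun'
  | 'promoteDelayed'
  | 'pauseQueue'
  | 'pauseWorkers'
  | 'viewFlowGraph'
  | 'editSchedule'
  | 'viewLogs';

function visibleControls(caps: RuntimeCapabilities): Control[] {
  const controls: Control[] = [];
  // Any truthy value ('native' or 'storage') means the feature is available somehow.
  if (caps.retries) controls.push('retryRun');
  if (caps.delayed) controls.push('promoteDelayed');
  // Queue-level pause and worker-level pause are different controls.
  if (caps.pauseResume === 'queue') controls.push('pauseQueue');
  if (caps.pauseResume === 'worker') controls.push('pauseWorkers');
  if (caps.flows) controls.push('viewFlowGraph');
  if (caps.schedules) controls.push('editSchedule');
  if (caps.jobLogs) controls.push('viewLogs');
  return controls;
}

// Example: a Kafka-like runtime that relies on storage and can only pause workers.
const kafkaLike: RuntimeCapabilities = {
  delayed: 'storage',
  retries: 'storage',
  priorities: false,
  flows: 'storage',
  schedules: 'storage',
  pauseResume: 'worker',
  repeatableJobs: 'storage',
  jobLogs: 'storage',
  jobProgress: 'storage',
  inspectJobs: 'storage',
};

console.log(visibleControls(kafkaLike));
```

Because the check is on declared capabilities rather than on the adapter's name, a new runtime gets the right controls without Workbench changes.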
Runtime contract
The adapter boundary should be small and boring. Core owns task discovery, payload validation, lifecycle events, OpenTelemetry, storage writes, and the developer API. Runtime adapters own broker IO.
```typescript
type QueueRuntime = {
  name: string;
  capabilities: RuntimeCapabilities;
  connect(options: RuntimeConnectOptions): Promise<void>;
  close(): Promise<void>;
  enqueue(message: RuntimeMessage, options: EnqueueOptions): Promise<EnqueueResult>;
  enqueueBatch(messages: RuntimeMessage[]): Promise<EnqueueResult[]>;
  createWorker(options: WorkerOptions): RuntimeWorker;
  inspect?: RuntimeInspector;
  scheduler?: RuntimeScheduler;
  flows?: RuntimeFlowEngine;
};
```

Messages should carry OpenQueue fields independently of broker metadata:
```typescript
type RuntimeMessage = {
  id: string;
  namespace: string;
  queue: string;
  taskId: string;
  payload: unknown;
  meta?: Record<string, unknown>;
  traceparent?: string;
  createdAt: string;
  scheduledFor?: string;
  attempt: number;
  maxAttempts: number;
  timeoutMs?: number;
  ttlMs?: number;
  priority?: number;
  dedupeKey?: string;
  flow?: {
    id: string;
    nodeId: string;
    parentId?: string;
    dependencyIds?: string[];
  };
};
```

The worker surface should normalize acknowledgement behavior:
```typescript
type RuntimeJob = {
  message: RuntimeMessage;
  ack(): Promise<void>;
  retry(error: SerializedError, options: RetryDecision): Promise<void>;
  fail(error: SerializedError): Promise<void>;
  extendLock?(durationMs: number): Promise<void>;
};
```

Core can then run the same lifecycle for every backend:
- Receive a runtime job.
- Load the matching task definition.
- Validate payload with the task schema.
- Create the execution context.
- Run the task with timeout handling.
- Persist progress, logs, result, and errors.
- Ack, retry, or fail through the runtime adapter.
- Emit drains and metrics.
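The lifecycle above can be sketched as one function that core runs for every backend. This is a simplified illustration under stated assumptions, not OpenQueue's implementation: `TaskDefinition`, `processJob`, and the fixed 1s backoff base are hypothetical, schema validation is reduced to a `parse()` call that throws, and persistence is reduced to an `events` array.

```typescript
// Trimmed-down copies of the runtime types above, so the sketch compiles on its own.
type SerializedError = { name: string; message: string };
type RetryDecision = { attempt: number; delayMs: number };
type RuntimeMessage = {
  id: string;
  taskId: string;
  payload: unknown;
  attempt: number;
  maxAttempts: number;
  timeoutMs?: number;
};
type RuntimeJob = {
  message: RuntimeMessage;
  ack(): Promise<void>;
  retry(error: SerializedError, options: RetryDecision): Promise<void>;
  fail(error: SerializedError): Promise<void>;
};

// Hypothetical task shape: parse() stands in for schema validation and throws on bad input.
type TaskDefinition = {
  id: string;
  parse(payload: unknown): unknown;
  run(payload: unknown): Promise<unknown>;
};

class NonRetryableError extends Error {
  name = 'NonRetryableError';
}

const serialize = (e: unknown): SerializedError =>
  e instanceof Error ? { name: e.name, message: e.message } : { name: 'Error', message: String(e) };

// Stops waiting after `ms`; it does not cancel the underlying task.
async function withTimeout<T>(work: Promise<T>, ms?: number): Promise<T> {
  if (ms === undefined) return work;
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

async function processJob(job: RuntimeJob, tasks: Map<string, TaskDefinition>, events: string[]): Promise<void> {
  const { message } = job;
  const task = tasks.get(message.taskId);
  if (!task) {
    events.push('fail');
    return job.fail({ name: 'UnknownTask', message: `No task registered for ${message.taskId}` });
  }

  let payload: unknown;
  try {
    payload = task.parse(message.payload);
  } catch (err) {
    // An invalid payload will not become valid on retry, so fail terminally.
    events.push('fail');
    return job.fail(serialize(err));
  }

  events.push('start');
  try {
    await withTimeout(task.run(payload), message.timeoutMs);
  } catch (err) {
    const error = serialize(err);
    if (err instanceof NonRetryableError || message.attempt >= message.maxAttempts) {
      events.push('fail');
      return job.fail(error);
    }
    events.push('retry');
    // Exponential backoff with an arbitrary 1s base; a real policy would come from the task.
    return job.retry(error, { attempt: message.attempt + 1, delayMs: 1000 * 2 ** (message.attempt - 1) });
  }
  events.push('complete');
  await job.ack();
}
```

Keeping `ack()` outside the `try` means a failed acknowledgement is not mistaken for a task failure and retried.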
Storage as the portable source of truth
BullMQ currently owns a lot of state in Redis: delayed jobs, job status, attempts, logs, repeatable jobs, flows, and queue counters. A multi-broker OpenQueue needs more of that state in the storage adapter, especially for brokers that do not expose BullMQ-like inspection APIs.
The existing Postgres adapter is the right foundation, but the schema likely needs several runtime-neutral tables:
| Table | Purpose |
|---|---|
| `runs` | Current durable run state, including status, task id, queue, timestamps, attempt, payload, result, error, and metadata. |
| `run_events` | Append-only event log for enqueue, start, progress, log, complete, fail, retry, cancel, and schedule ticks. |
| `outbox` | Transactional enqueue intents that can be delivered to the broker after the database commit. |
| `queue_state` | Queue pause state, drain state, runtime metadata, and visibility for Workbench. |
| `flow_nodes` | Dependency graph state for broker-neutral flows. |
| `dead_letters` | Terminal failures with enough broker metadata to diagnose poison messages. |
| `runtime_offsets` | Optional adapter checkpoints when a backend needs explicit offset or cursor bookkeeping. |
For BullMQ-only installs, storage can remain optional for basic queues. For Kafka and RabbitMQ, storage should be strongly recommended and may be required for schedules, delayed retries, flow dependencies, Workbench inspection, and exact operational history.
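Because `run_events` is append-only, the `runs` row can be treated as a projection of it. A minimal sketch of that fold, assuming a hypothetical `RunEvent` union that covers a subset of the event types listed above; real storage would more likely update `runs` in the same transaction that appends the event rather than replaying the log.

```typescript
type RunStatus = 'queued' | 'running' | 'retrying' | 'completed' | 'failed' | 'cancelled';

// Hypothetical event shapes; timestamps are kept as opaque strings.
type RunEvent =
  | { type: 'enqueue'; at: string }
  | { type: 'start'; at: string; attempt: number }
  | { type: 'progress'; at: string; progress: number }
  | { type: 'retry'; at: string; error: string }
  | { type: 'complete'; at: string; result: unknown }
  | { type: 'fail'; at: string; error: string }
  | { type: 'cancel'; at: string };

type RunState = {
  status: RunStatus;
  attempt: number;
  progress?: number;
  result?: unknown;
  error?: string;
  updatedAt?: string;
};

// Pure reducer: the same events always produce the same run state.
function applyEvent(state: RunState, event: RunEvent): RunState {
  const next: RunState = { ...state, updatedAt: event.at };
  switch (event.type) {
    case 'enqueue':
      return { ...next, status: 'queued' };
    case 'start':
      return { ...next, status: 'running', attempt: event.attempt };
    case 'progress':
      return { ...next, progress: event.progress };
    case 'retry':
      return { ...next, status: 'retrying', error: event.error };
    case 'complete':
      // A successful attempt clears the error left by earlier retries.
      return { ...next, status: 'completed', result: event.result, error: undefined };
    case 'fail':
      return { ...next, status: 'failed', error: event.error };
    case 'cancel':
      return { ...next, status: 'cancelled' };
  }
}

const replay = (events: RunEvent[]): RunState =>
  events.reduce<RunState>(applyEvent, { status: 'queued', attempt: 0 });
```

Keeping the reducer pure also makes it reusable for Workbench timelines, which can show the state after each event.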
Transactional enqueue
Adapters should support an outbox path so application state and enqueueing do not drift:
```typescript
await db.transaction(async (tx) => {
  await tx.insert(invoices).values(invoice);
  await queue.outbox(tx).enqueue(sendInvoice, {
    invoiceId: invoice.id,
  });
});
```

The outbox dispatcher then publishes to BullMQ, Kafka, RabbitMQ, or another adapter and marks the outbox row delivered. This is especially important for Kafka and RabbitMQ because a database commit and a broker publish are separate systems.
Important details:
- Outbox rows need a deterministic id and an optional `dedupeKey`.
- Dispatch should be idempotent.
- Broker publish confirmation should be recorded.
- Failed publish attempts should retry with backoff.
- Workbench should show stuck outbox rows as operational errors.
- Enqueue without an outbox can still exist for simple cases, but should be documented as at-least-once without transactional coupling.
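A minimal, in-memory sketch of an idempotent outbox dispatcher with exponential backoff, assuming the rows have already been loaded from storage. `OutboxRow`, `dispatchOnce`, the `Publish` callback, and the backoff constants are hypothetical; a real dispatcher would claim rows with row-level locking and persist every state change.

```typescript
type OutboxRow = {
  id: string; // deterministic id, reused as the broker message id
  dedupeKey?: string;
  queue: string;
  payload: unknown;
  status: 'pending' | 'delivered';
  attempts: number; // failed publish attempts so far
  nextAttemptAt: number; // epoch ms
  lastError?: string;
  confirmedAt?: number;
};

// Assumed to resolve only after the broker confirms the publish.
type Publish = (row: OutboxRow) => Promise<void>;

const BASE_DELAY_MS = 500;
const MAX_DELAY_MS = 60_000;

async function dispatchOnce(rows: OutboxRow[], publish: Publish, now: number): Promise<void> {
  for (const row of rows) {
    // Skip rows that are already delivered or still backing off.
    if (row.status === 'delivered' || row.nextAttemptAt > now) continue;
    try {
      await publish(row);
      row.status = 'delivered';
      row.confirmedAt = now; // record the broker confirmation
    } catch (err) {
      row.attempts += 1;
      row.lastError = err instanceof Error ? err.message : String(err);
      row.nextAttemptAt = now + Math.min(MAX_DELAY_MS, BASE_DELAY_MS * 2 ** (row.attempts - 1));
    }
  }
}
```

A crash between the broker confirmation and marking the row delivered means the same row can be published twice; the deterministic id is what lets the broker or the worker discard that duplicate. Rows whose `attempts` keep growing are the "stuck outbox rows" Workbench should surface.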
Retry and delay model
BullMQ has native attempts, backoff, and delayed jobs. Kafka and RabbitMQ do not map cleanly to that model across all delay lengths and operational modes. OpenQueue should own the retry policy and let each adapter choose the best delivery mechanism.
The runtime-neutral retry decision should include:
- attempt number
- max attempts
- next run time
- backoff reason
- serialized error
- retryable versus non-retryable classification
- original run id
- optional new broker message id
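One way to express that decision in TypeScript, assuming exponential backoff with full jitter (a delay drawn uniformly below a capped exponential bound). The field names, `decideRetry`, and the default policy values are illustrative assumptions rather than a settled API; `random` is injectable so tests can be deterministic.

```typescript
type SerializedError = { name: string; message: string; retryable: boolean };

// `attempt` is always the attempt that just failed, starting at 1.
type RetryDecision =
  | {
      kind: 'retry';
      runId: string;
      attempt: number;
      maxAttempts: number;
      nextRunAt: Date;
      reason: string;
      error: SerializedError;
      brokerMessageId?: string; // filled in later if the adapter republishes
    }
  | { kind: 'fail'; runId: string; attempt: number; maxAttempts: number; reason: string; error: SerializedError };

type BackoffPolicy = { baseMs: number; maxMs: number };

function decideRetry(
  runId: string,
  attempt: number,
  maxAttempts: number,
  error: SerializedError,
  now: Date,
  policy: BackoffPolicy = { baseMs: 1_000, maxMs: 15 * 60_000 },
  random: () => number = Math.random,
): RetryDecision {
  if (!error.retryable) {
    return { kind: 'fail', runId, attempt, maxAttempts, reason: 'non-retryable', error };
  }
  if (attempt >= maxAttempts) {
    return { kind: 'fail', runId, attempt, maxAttempts, reason: 'attempts-exhausted', error };
  }
  // Cap the exponential bound, then pick a delay uniformly in [0, cap).
  const cap = Math.min(policy.maxMs, policy.baseMs * 2 ** (attempt - 1));
  const delayMs = Math.floor(random() * cap);
  return {
    kind: 'retry',
    runId,
    attempt,
    maxAttempts,
    nextRunAt: new Date(now.getTime() + delayMs),
    reason: 'exponential-backoff',
    error,
  };
}
```

Jitter spreads retries from a burst of failures so they do not all hit the broker or a downstream service at the same instant.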
For short delays, a broker-native mechanism can be acceptable. For long or important delays, storage-backed scheduling is safer:
- Worker records a retry event and next attempt time.
- Worker acks or rejects the broker message according to adapter semantics.
- Scheduler scans due retries from storage.
- Scheduler republishes a new broker message for the same run.
- Worker receives the retry and continues with incremented attempt state.
This makes retry behavior consistent across backends even when broker features differ.
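The scheduler half of that flow can be sketched as a scan over due retries. This assumes a hypothetical `PendingRetry` record written by the worker and a `publish` callback supplied by the runtime adapter. In Postgres, the claim step would typically be a `SELECT ... FOR UPDATE SKIP LOCKED` inside a transaction, so that several scheduler processes do not republish the same retry; here it is reduced to an in-memory filter.

```typescript
type PendingRetry = {
  runId: string;
  nextAttempt: number;
  nextRunAt: number; // epoch ms
  state: 'scheduled' | 'published';
};

type RepublishMessage = { runId: string; attempt: number; messageId: string };

async function publishDueRetries(
  retries: PendingRetry[],
  publish: (msg: RepublishMessage) => Promise<void>,
  now: number,
  limit = 100,
): Promise<number> {
  // Claim the oldest due retries first, bounded by a batch limit.
  const due = retries
    .filter((r) => r.state === 'scheduled' && r.nextRunAt <= now)
    .sort((a, b) => a.nextRunAt - b.nextRunAt)
    .slice(0, limit);
  for (const retry of due) {
    // New broker message for the same run; the id is deterministic per attempt,
    // so a duplicate publish after a crash can be detected downstream.
    await publish({
      runId: retry.runId,
      attempt: retry.nextAttempt,
      messageId: `${retry.runId}:${retry.nextAttempt}`,
    });
    retry.state = 'published';
  }
  return due.length;
}
```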
Scheduling
OpenQueue has two scheduling shapes:
- declarative schedules from task definitions, such as `cron`
- dynamic schedules created through runtime APIs and shown in Workbench
The adapter roadmap should move schedule ownership into storage plus a scheduler process. Broker-native delayed messages can be an optimization, but the durable schedule source should be Postgres.
The scheduler should:
- claim due schedule instances with row-level locking
- create run records before publishing
- publish through the runtime adapter
- record schedule tick events
- handle missed ticks explicitly
- expose next run, last run, status, and error state to Workbench
This avoids tying schedules to BullMQ repeatable jobs and gives Kafka and RabbitMQ the same schedule semantics.
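Handling missed ticks explicitly means choosing a policy for when the scheduler was down across one or more due times. A minimal sketch for fixed-interval schedules only (cron parsing is deliberately left out); `MissedTickPolicy`, `IntervalSchedule`, `dueTicks`, and the two policy names are hypothetical.

```typescript
// 'latest' coalesces missed ticks into one run; 'catchUp' publishes each missed tick up to a cap.
type MissedTickPolicy = 'latest' | 'catchUp';

type IntervalSchedule = {
  id: string;
  intervalMs: number;
  nextRunAt: number; // epoch ms of the earliest tick not yet published
};

// Returns the tick times to publish and advances nextRunAt past `now`.
function dueTicks(schedule: IntervalSchedule, now: number, policy: MissedTickPolicy, maxCatchUp = 10): number[] {
  if (schedule.intervalMs <= 0) throw new Error('intervalMs must be positive');
  if (schedule.nextRunAt > now) return [];
  // Number of ticks in [nextRunAt, now], computed without looping over long outages.
  const count = Math.floor((now - schedule.nextRunAt) / schedule.intervalMs) + 1;
  const last = schedule.nextRunAt + (count - 1) * schedule.intervalMs;
  schedule.nextRunAt = last + schedule.intervalMs;
  if (policy === 'latest') return [last];
  const n = Math.min(count, maxCatchUp);
  // Keep the most recent n ticks, oldest first.
  return Array.from({ length: n }, (_, i) => last - (n - 1 - i) * schedule.intervalMs);
}
```

For each returned time, the scheduler would create the run record and a schedule tick event before publishing, as listed above.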
Flows
BullMQ has native flow support. Kafka and RabbitMQ do not have an equivalent
parent-child job graph built in. To keep enqueueFlow() portable, OpenQueue
needs a storage-backed flow engine.
Flow state should track:
- flow id
- node id
- task id
- queue
- payload
- parent node id
- dependency node ids
- node status
- output or error
- timestamps
Execution model:
- Insert all flow nodes in storage.
- Publish only nodes whose dependencies are satisfied.
- On node completion, mark it completed and check dependents.
- Publish newly unblocked children through the runtime adapter.
- If a node fails terminally, mark dependent nodes blocked or failed according to the flow policy.
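The execution model above can be sketched as a small state machine over flow nodes. `FlowNode`, `publishReady`, `completeNode`, and `failNode` are hypothetical names, storage writes are reduced to mutating node objects, and the failure policy shown (mark every transitive dependent `blocked`) is only one of the options a flow policy could choose.

```typescript
type NodeStatus = 'waiting' | 'queued' | 'completed' | 'failed' | 'blocked';

type FlowNode = {
  id: string;
  taskId: string;
  dependencyIds: string[];
  status: NodeStatus;
  output?: unknown;
  error?: string;
};

type Flow = Map<string, FlowNode>;

// Publishes every waiting node whose dependencies have all completed.
function publishReady(flow: Flow, publish: (node: FlowNode) => void): string[] {
  const ready = Array.from(flow.values()).filter(
    (n) => n.status === 'waiting' && n.dependencyIds.every((d) => flow.get(d)?.status === 'completed'),
  );
  for (const node of ready) {
    node.status = 'queued';
    publish(node); // hand off to the runtime adapter
  }
  return ready.map((n) => n.id);
}

function completeNode(flow: Flow, id: string, output: unknown, publish: (node: FlowNode) => void): string[] {
  const node = flow.get(id);
  if (!node) throw new Error(`Unknown flow node ${id}`);
  node.status = 'completed';
  node.output = output;
  // Re-scanning the whole flow is simpler than tracking reverse edges, at some cost.
  return publishReady(flow, publish);
}

function failNode(flow: Flow, id: string, error: string): string[] {
  const node = flow.get(id);
  if (!node) throw new Error(`Unknown flow node ${id}`);
  node.status = 'failed';
  node.error = error;
  // Block every transitive dependent so it is never published.
  const blocked: string[] = [];
  const stack = [id];
  while (stack.length > 0) {
    const current = stack.pop()!;
    for (const n of flow.values()) {
      if (n.status === 'waiting' && n.dependencyIds.includes(current)) {
        n.status = 'blocked';
        blocked.push(n.id);
        stack.push(n.id);
      }
    }
  }
  return blocked;
}
```

Because readiness is derived from stored node status, the same logic works whether the nodes were published to BullMQ, Kafka, or RabbitMQ.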
BullMQ can keep using native flows internally at first, but the long-term shape should be the same Workbench graph and same flow state regardless of broker.
Workbench changes
Workbench currently benefits from BullMQ inspection APIs. A broker-neutral Workbench needs an OpenQueue control plane API instead of direct BullMQ objects.
The control plane should expose:
- queue list and queue metadata
- run list with filters and pagination
- run detail, payload, output, error, logs, and event timeline
- queue counters derived from storage plus runtime metrics
- retry, cancel, remove, and promote operations where supported
- schedule list, detail, pause, resume, create, update, delete
- flow list and graph detail
- runtime capabilities for feature gating
- runtime health and broker connection state
Controls should degrade by capability. For example:
- BullMQ can show native delayed counts and promote delayed jobs.
- Kafka can show consumer lag and partition assignment, but may not support promoting an individual broker message.
- RabbitMQ can show queue depth, consumers, unacked count, and dead-letter routing, but long-delay promotion may be storage-driven.
Kafka runtime
Kafka is a strong fit for high-throughput background work when a team already runs Kafka as its event backbone. It is not a simple drop-in replacement for BullMQ because Kafka is a partitioned log, not a job queue.
Recommended mapping:
| OpenQueue concept | Kafka mapping |
|---|---|
| namespace | topic prefix or message header |
| queue | Kafka topic |
| run id | message key or header |
| task id | message header and payload field |
| payload | message value |
| metadata | message headers plus value fields |
| worker pool | consumer group |
| concurrency | partitions, consumers, and per-consumer execution limits |
| retry | storage-backed retry topic or delayed scheduler |
| dead letter | dead-letter topic |
| progress/logs | storage events |
| schedule | storage scheduler publishing to Kafka |
Kafka design details:
- Topics should usually be per queue, such as `my-app.default` and `my-app.email`.
- Message keys should be chosen deliberately. A run id spreads work evenly; an entity id preserves per-entity ordering.
- OpenQueue should commit offsets only after the task succeeds or after the retry/failure state has been durably recorded.
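- Long-running jobs require careful handling of the consumer's maximum poll interval (for example, `max.poll.interval.ms` in the Java client) and heartbeats; a handler that blocks polling for too long makes the consumer leave the group and triggers a rebalance.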
- Retrying by not committing offsets can block a partition behind one bad message, so OpenQueue should prefer recording retry state, committing the original offset, and republishing later.
- Kafka has no native per-message delay in the same sense as BullMQ delayed jobs. Use retry topics for coarse delays or storage-backed scheduling for exact delays.
- Workbench should show consumer group lag and partition assignment when the client exposes it.
- Idempotent producers and deterministic run ids should be used for safer publish retries.
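A minimal sketch of mapping a runtime message onto a Kafka record, following the mapping table above. It does not call any Kafka client: `KafkaRecord`, `toKafkaRecord`, the `openqueue-*` header names, the `entityId` field, and the `keyBy` option are hypothetical, and a real adapter would hand the result to an idempotent producer.

```typescript
type RuntimeMessage = {
  id: string;
  namespace: string;
  queue: string;
  taskId: string;
  payload: unknown;
  traceparent?: string;
  attempt: number;
  entityId?: string; // hypothetical: set by the caller when per-entity ordering matters
};

type KafkaRecord = {
  topic: string;
  key: string;
  value: string;
  headers: Record<string, string>;
};

type KeyStrategy = 'runId' | 'entityId';

function toKafkaRecord(message: RuntimeMessage, keyBy: KeyStrategy = 'runId'): KafkaRecord {
  // Keying by run id spreads work across partitions; keying by entity id keeps
  // every message for one entity on one partition, preserving their order.
  const key = keyBy === 'entityId' && message.entityId ? message.entityId : message.id;
  const headers: Record<string, string> = {
    'openqueue-run-id': message.id,
    'openqueue-task-id': message.taskId,
    'openqueue-attempt': String(message.attempt),
  };
  if (message.traceparent) headers.traceparent = message.traceparent;
  return {
    topic: `${message.namespace}.${message.queue}`, // topic per queue, e.g. my-app.email
    key,
    // Task id goes in both a header and the value, matching the mapping table.
    value: JSON.stringify({ taskId: message.taskId, payload: message.payload }),
    headers,
  };
}
```

Putting the routing fields in headers lets consumers and tooling filter or route records without parsing the value.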
Kafka operational caveats:
- Partition count is the hard ceiling for parallelism inside one consumer group.
- Ordering and concurrency trade off against each other.
- Poison messages need dead-letter handling or they can stall progress.
- Replays are powerful but must not accidentally re-run side-effecting tasks without an explicit replay mode.
- Large payloads should be stored externally, with the message carrying a pointer.
Kafka Share Groups
Kafka Share Groups are an emerging Kafka model that behaves more like a queue: multiple consumers can share records without strict partition ownership. This is closer to OpenQueue's worker-pool semantics than classic consumer groups.
Potential benefits:
- Better work sharing for queue-like tasks.
- Less partition-bound parallelism.
- More natural acknowledgement semantics for background jobs.
- Easier scaling for heterogeneous worker fleets.
Roadmap stance:
- Support classic Kafka consumer groups first.
- Keep the adapter boundary flexible enough for Share Groups later.
- Treat Share Groups as an optional Kafka mode, not the baseline.
- Document client and provider maturity before recommending it in production.
RabbitMQ runtime
RabbitMQ is a natural fit for background jobs because it already models queues, acknowledgements, routing, and prefetch. It is closer to BullMQ than Kafka in worker semantics, but still lacks BullMQ's full job-state model.
Recommended mapping:
| OpenQueue concept | RabbitMQ mapping |
|---|---|
| namespace | exchange prefix or vhost |
| queue | RabbitMQ queue |
| task id | routing key or message header |
| run id | message id or header |
| payload | message body |
| metadata | message headers |
| worker concurrency | consumer prefetch and local execution limit |
| retry | delayed exchange, TTL/DLX, or storage scheduler |
| dead letter | DLX and dead-letter queue |
| progress/logs | storage events |
| schedule | storage scheduler publishing to exchange |
RabbitMQ design details:
- Use durable exchanges and durable queues for production.
- Use persistent messages for jobs that must survive broker restarts.
- Use publisher confirms for enqueue success.
- Use manual acknowledgements, not auto-ack.
- Set prefetch to match worker concurrency.
- Ack only after task success or after retry/failure state is durable.
- Reject or dead-letter poison messages intentionally; do not requeue forever.
- Use quorum queues for stronger durability where appropriate.
- Keep long delays in storage unless the deployment intentionally uses a delayed-message strategy.
RabbitMQ routing options:
- Direct exchange: simple `queue.task` routing keys.
- Topic exchange: route by queue, task id, tenant, or priority class.
- Separate queues per OpenQueue queue: easiest Workbench and scaling model.
- Priority queues: possible, but should be an explicit feature because they affect broker behavior and resource usage.
RabbitMQ retry options:
| Strategy | Use case | Tradeoff |
|---|---|---|
| Storage-backed retry scheduler | Default portable behavior. | Requires Postgres-backed runtime state. |
| TTL plus dead-letter exchange | Simple broker-native retry lanes. | Coarse delays and more queue topology. |
| Delayed-message exchange | Convenient per-message delay. | Plugin availability and operational support vary. |
| Immediate requeue | Very short transient failure. | Can hot-loop and starve other work. |
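The TTL plus dead-letter exchange strategy and the storage-backed scheduler can be combined: short delays go to a fixed set of retry lanes and everything longer goes to storage. Each lane would be a queue declared with the `x-message-ttl` and `x-dead-letter-exchange` arguments, so a message parked there is dead-lettered back to the work exchange when its TTL expires. A minimal sketch; the lane durations, the queue naming, and `chooseRetryRoute` are hypothetical.

```typescript
type RetryRoute =
  | { kind: 'requeue' }
  | { kind: 'ttl-lane'; queue: string; laneMs: number }
  | { kind: 'storage'; runAt: number };

// Hypothetical lane layout: one TTL queue per coarse delay, sorted ascending.
const RETRY_LANES_MS = [5_000, 30_000, 300_000];

function chooseRetryRoute(queue: string, delayMs: number, now: number, lanes: number[] = RETRY_LANES_MS): RetryRoute {
  // Immediate requeue only when no delay is wanted; it can hot-loop otherwise.
  if (delayMs <= 0) return { kind: 'requeue' };
  // Round up to the smallest lane at least as long as the requested delay (coarse delays).
  const lane = lanes.find((ms) => ms >= delayMs);
  if (lane !== undefined) {
    return { kind: 'ttl-lane', queue: `${queue}.retry.${lane}ms`, laneMs: lane };
  }
  // Longer delays are stored and republished later by the scheduler.
  return { kind: 'storage', runAt: now + delayMs };
}
```

Rounding up means a retry never runs earlier than the policy asked for, at the cost of some extra wait inside each lane.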
BullMQ adapter
BullMQ should remain the default adapter and the compatibility reference implementation.
Initial adapter responsibilities:
- preserve current `redis` config behavior
- keep current delayed jobs, retries, logs, progress, schedules, and flows
- implement the new runtime contract
- expose BullMQ inspection capabilities to Workbench
- keep the same job ids and queue naming where possible
This lets the adapter architecture land without forcing existing users to migrate broker behavior immediately.
Configuration shape
The long-term config should separate runtime and storage:
```typescript
export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  runtime: rabbitmqRuntime({
    url: process.env.RABBITMQ_URL!,
    exchange: 'openqueue',
  }),
  storage: {
    adapter: postgresAdapter({ db, schema: queueSchema }),
  },
});
```

Runtime-specific packages could be split like this:
| Package | Purpose |
|---|---|
| `@openqueue/sdk` | Main task API, config, and re-exports. |
| `@openqueue/core` | Runtime-neutral core. |
| `@openqueue/bullmq` | BullMQ runtime adapter. |
| `@openqueue/kafka` | Kafka runtime adapter. |
| `@openqueue/rabbitmq` | RabbitMQ runtime adapter. |
| `@openqueue/worker` | Worker process and Workbench host. |
| `@openqueue/workbench` | Dashboard and control plane UI. |
The main package can re-export BullMQ by default to keep quickstart simple, while advanced runtimes can be installed explicitly.
Compatibility plan
The migration should be staged:
Phase 1: Runtime boundary
- Introduce the `runtime` config option.
- Implement BullMQ through the new adapter interface.
- Keep `redis` as a compatibility shortcut.
- Move BullMQ-specific imports out of core runtime paths where possible.
- Add capability metadata.
- Keep Workbench behavior unchanged for BullMQ.
Phase 2: Storage-backed control plane
- Move run inspection, logs, progress, schedules, and alerts toward storage.
- Add outbox support.
- Add queue state and runtime health tables.
- Add storage-backed delayed retry support.
- Add storage-backed schedule ticks.
- Make Workbench read through the control plane instead of BullMQ directly.
Phase 3: Kafka preview
- Add Kafka runtime package.
- Support enqueue, consume, ack-through-offset, failure, retry, and dead letters.
- Support topic-per-queue topology.
- Require storage for schedules, flow state, logs, progress, and Workbench history.
- Add lag and partition metadata to Workbench.
- Document ordering, partitioning, replay, and poison-message behavior.
Phase 4: RabbitMQ preview
- Add RabbitMQ runtime package.
- Support durable queues, publisher confirms, manual ack, prefetch, retry, and dead-lettering.
- Support direct or topic exchange topology.
- Use storage-backed long delays and schedules.
- Add queue depth, consumer count, unacked count, and DLX state to Workbench.
Phase 5: Portable flows and advanced controls
- Implement storage-backed flow execution.
- Make `enqueueFlow()` portable across adapters that declare flow support.
- Add broker-neutral cancellation semantics where possible.
- Add runtime-specific Workbench panels.
- Add replay tooling with explicit side-effect warnings.
Testing requirements
Every runtime should pass the same core behavior suite:
- enqueue and execute a task
- validate payload before publish where possible
- preserve trace context
- record lifecycle events
- complete successfully
- fail with a serialized error
- retry with exponential backoff
- stop retrying on `NonRetryableError`
- honor timeout and TTL
- run with configured concurrency
- support graceful shutdown
- expose queue and run state to Workbench
- support schedules according to declared capabilities
- support flows according to declared capabilities
Runtime-specific test suites should cover broker semantics:
- BullMQ: delayed jobs, job logs, flows, repeatable jobs, pause/resume, job promotion, and cleanup.
- Kafka: offset commits, partition assignment, consumer group rebalance, idempotent publish, retry topics or storage retries, dead-letter topics, lag reporting, and replay behavior.
- RabbitMQ: publisher confirms, manual ack, nack/reject behavior, prefetch, durable queue recovery, dead-letter exchange routing, quorum queues, and delay strategy.
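One way to keep the shared suite honest is to write each behavior as a function that accepts any runtime factory, so every adapter runs the identical checks. The sketch assumes a pared-down `MiniRuntime` interface and an in-memory stand-in; neither is part of OpenQueue, and a real suite would target the full `QueueRuntime` contract through a proper test runner.

```typescript
type Message = { id: string; taskId: string; payload: unknown };

// Pared-down, hypothetical runtime surface used only for this sketch.
interface MiniRuntime {
  enqueue(message: Message): Promise<void>;
  consume(handler: (message: Message) => Promise<void>): Promise<void>;
  close(): Promise<void>;
}

type RuntimeFactory = () => Promise<MiniRuntime>;

// Shared behavior: an enqueued message is delivered once with its payload intact.
async function enqueueAndExecute(makeRuntime: RuntimeFactory): Promise<void> {
  const runtime = await makeRuntime();
  const seen: Message[] = [];
  try {
    await runtime.enqueue({ id: 'run_1', taskId: 'hello', payload: { name: 'Ada' } });
    await runtime.consume(async (m) => {
      seen.push(m);
    });
  } finally {
    await runtime.close();
  }
  if (seen.length !== 1) throw new Error(`expected 1 delivery, got ${seen.length}`);
  if (JSON.stringify(seen[0].payload) !== JSON.stringify({ name: 'Ada' })) throw new Error('payload changed');
}

// In-memory stand-in: consume() drains whatever is currently queued, then returns.
class InMemoryRuntime implements MiniRuntime {
  private queue: Message[] = [];
  async enqueue(message: Message) {
    // Round-trip through JSON to mimic serialization across a broker boundary.
    this.queue.push(JSON.parse(JSON.stringify(message)));
  }
  async consume(handler: (message: Message) => Promise<void>) {
    while (this.queue.length > 0) await handler(this.queue.shift()!);
  }
  async close() {
    this.queue = [];
  }
}

const suite = [enqueueAndExecute];

// Runs every shared behavior against one runtime and collects failures.
async function runSuite(name: string, factory: RuntimeFactory): Promise<string[]> {
  const failures: string[] = [];
  for (const test of suite) {
    try {
      await test(factory);
    } catch (err) {
      failures.push(`${name}/${test.name}: ${err instanceof Error ? err.message : String(err)}`);
    }
  }
  return failures;
}
```

Adding a behavior to `suite` then applies it to every adapter at once, which is the point of a shared core suite.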
Open questions
- Should non-BullMQ runtimes require Postgres storage, or only require it when enabling schedules, flows, and Workbench history?
- Should `enqueueFlow()` throw when the runtime lacks flow support, or should storage-backed flows be part of core and always available with storage?
- Should queue-level pause be a required runtime feature, or should worker pause be enough for Kafka and some managed brokers?
- Should task priority be portable, or declared runtime-specific?
- Should the outbox API live on the queue client, storage adapter, or both?
- Should large payload externalization be a core feature before Kafka support?
- Should replay be a Workbench feature, a CLI feature, or both?
Current recommendation
Ship the adapter architecture around BullMQ first, then move Workbench and schedules onto storage-backed control-plane APIs. After that, Kafka and RabbitMQ can be added as preview runtimes without compromising the existing BullMQ experience.
Kafka should be the first non-BullMQ backend if the target is high-throughput event infrastructure. RabbitMQ should be the first non-BullMQ backend if the target is queue semantics that are operationally familiar to teams already running AMQP.
In both cases, Postgres-backed storage is the key that makes OpenQueue feel like OpenQueue instead of exposing every broker's raw edge cases to task authors.