Persistence
Postgres-backed run history, schedules, and alerts with Drizzle.
Redis holds live queue state, and BullMQ's retention is bounded — completed
jobs age out, and a FLUSHALL takes your history with it. The Postgres
adapter gives OpenQueue a durable side: run history with full payloads and
event timelines, dynamic schedules that survive restarts, and alert
configuration. Workbench reads all of it when present.
Quick setup
bunx openqueue add persistence
This scaffolds src/queue-schema.ts and drizzle.config.ts, adds
drizzle-orm and drizzle-kit to your package.json, and prints the
storage block to paste into your worker config. It never overwrites:
existing files are left untouched (with merge hints instead), and re-running
it just reports the current state. The sections below explain everything it
sets up.
The tables
defineQueueSchema() returns plain Drizzle pg-core tables, so they ride
your existing drizzle-kit setup — no separate migration tool.
| Table | What it holds |
|---|---|
| catalog | Snapshot of the task catalog, republished on worker boot. |
| runs | One row per run: input, output, error, status, tags, timestamps. Upserted on every lifecycle transition. |
| run_events | Append-only event log per run (enqueued, started, progress, completed, failed). Cascades on run delete. |
| schedules | Dynamic and declarative schedules, with a unique deduplication key. |
| schedule_instances | Per-schedule tick state (next/last run). |
| alert_channels | Alert contact points (webhook/Slack presets). |
| alert_rules | Alert rules: trigger, severity, thresholds, cooldown. |
Table names are deliberately short (runs, catalog, schedules), so put
them in a dedicated Postgres schema instead of public:
import { defineQueueSchema } from '@openqueue/sdk';
export const queueSchema = defineQueueSchema({ schema: 'jobs' });
export const {
  queueCatalog,
  queueSchedules,
  queueScheduleInstances,
  queueRuns,
  queueRunEvents,
  alertChannels,
  alertRules,
} = queueSchema;
Re-exporting each table individually is what lets drizzle-kit discover them.
Generating migrations
Point a drizzle-kit config at the file and scope it to the queue schema:
import { defineConfig } from 'drizzle-kit';
export default defineConfig({
  dialect: 'postgresql',
  schema: './src/queue-schema.ts',
  out: './drizzle',
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
  schemaFilter: ['jobs'],
  migrations: {
    table: '__drizzle_migrations',
    schema: 'jobs',
  },
});
bunx drizzle-kit generate # emits CREATE SCHEMA "jobs" + all tables
bunx drizzle-kit migrate
schemaFilter keeps drizzle-kit from touching your application tables; the
CREATE SCHEMA IF NOT EXISTS "jobs" statement is generated automatically.
If your app already has a drizzle-kit config, you can instead add
./src/queue-schema.ts to its schema array and 'jobs' to its
schemaFilter — one migration history for everything.
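A merged config along those lines might look like the sketch below. The application schema path ./src/db/schema.ts and the 'public' entry are placeholders for whatever your existing config already lists. Only the two added entries come from this page.

```typescript
import { defineConfig } from 'drizzle-kit';

export default defineConfig({
  dialect: 'postgresql',
  // Hypothetical path to your app's own tables, followed by the queue schema file.
  schema: ['./src/db/schema.ts', './src/queue-schema.ts'],
  out: './drizzle',
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
  // 'public' stands in for the schemas your app already manages; 'jobs' is the queue schema.
  schemaFilter: ['public', 'jobs'],
});
```

With this layout, a single drizzle-kit generate run covers both your application tables and the queue tables.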
Wiring the adapter
Pass the same schema object (or just the schema name) to postgresAdapter
in your worker config:
import { defineConfig, postgresAdapter } from '@openqueue/sdk';
import { db } from './src/db';
import { queueSchema } from './src/queue-schema';
export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  redis: { url: process.env.REDIS_URL! },
  storage: {
    adapter: postgresAdapter({ db, schema: queueSchema }),
  },
});
db is your existing Drizzle database instance. The adapter issues plain
Drizzle queries through it, so it shares your pool, your transaction
configuration, and your connection lifecycle.
What gets written
Every run lifecycle event flows through the adapter: the runs row is
upserted with the latest status, and a run_events row is appended with
the event payload — so you get both "where is this run now" and "what
happened, in order". Failures store the serialized error; ctx.progress()
patches land in metadata.
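To make the upsert-plus-append pattern concrete, here is a minimal in-memory sketch. It is not the adapter's implementation: InMemoryRunStore, its field names, and the rule that a progress event leaves the status unchanged are all assumptions made for illustration.

```typescript
// Event names are taken from the run_events description; everything else is hypothetical.
type EventType = 'enqueued' | 'started' | 'progress' | 'completed' | 'failed';

interface RunRow {
  id: string;
  status: EventType; // assumption: status mirrors the latest non-progress event
  error?: string;
}

interface RunEventRow {
  runId: string;
  type: EventType;
  payload: unknown;
  seq: number; // insertion order, standing in for a timestamp
}

class InMemoryRunStore {
  private runs = new Map<string, RunRow>();
  private events: RunEventRow[] = [];

  record(runId: string, type: EventType, payload: unknown = null): void {
    // Upsert: one row per run, overwritten with the latest state.
    const previous = this.runs.get(runId);
    const status: EventType =
      type === 'progress' ? (previous?.status ?? 'started') : type;
    this.runs.set(runId, {
      id: runId,
      status,
      error: type === 'failed' ? String(payload) : previous?.error,
    });
    // Append: the event log only ever grows.
    this.events.push({ runId, type, payload, seq: this.events.length });
  }

  // "Where is this run now?"
  current(runId: string): RunRow | undefined {
    return this.runs.get(runId);
  }

  // "What happened, in order?"
  timeline(runId: string): EventType[] {
    return this.events.filter((e) => e.runId === runId).map((e) => e.type);
  }
}

const store = new InMemoryRunStore();
store.record('run_1', 'enqueued');
store.record('run_1', 'started');
store.record('run_1', 'progress', { percent: 50 });
store.record('run_1', 'completed', { rows: 120 });
```

The runs map answers point-in-time questions cheaply, while the events array preserves the full history that a status column alone would overwrite.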
Runs also carry meta from trigger time. OpenQueue treats it as opaque
application data, and indexes the JSON document for containment filters:
await exportCsv.trigger(payload, {
  meta: { tenantId: tenant.id, customerId: customer.id, tags: ['billing'] },
});
Tags are stored separately for tag filters. Workbench's run filters (task, status, schedule, meta, tags, time range) map directly onto generic queue state rather than application-specific columns.
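A containment filter matches a run when the filter document is a subset of the run's meta. The sketch below approximates that rule in plain TypeScript, assuming the semantics resemble Postgres's jsonb @> operator. The contains helper is hypothetical, not part of the SDK, and it ignores some edge cases of the real operator.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

function isObject(value: Json): value is { [key: string]: Json } {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Returns true when every key and value in `filter` also appears in `doc`.
function contains(doc: Json, filter: Json): boolean {
  if (Array.isArray(filter)) {
    if (!Array.isArray(doc)) return false;
    const docItems: Json[] = doc;
    // Every filter element must be contained in some document element.
    return filter.every((f) => docItems.some((d) => contains(d, f)));
  }
  if (isObject(filter)) {
    if (!isObject(doc)) return false;
    const docObject = doc;
    // Every filter key must exist in the document with a contained value.
    return Object.entries(filter).every(
      ([key, value]) => key in docObject && contains(docObject[key], value),
    );
  }
  // Scalars and null must match exactly.
  return doc === filter;
}

const meta: Json = { tenantId: 't_1', customerId: 'c_9', tags: ['billing'] };
const matchesTenant = contains(meta, { tenantId: 't_1' });
const matchesOtherTenant = contains(meta, { tenantId: 't_2' });
```

Because the filter only needs to be a subset, callers can filter on tenantId alone without knowing which other keys a run's meta carries.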
Suggested patterns
- One schema object, two consumers. Define queueSchema once and hand it to both drizzle-kit (migrations) and postgresAdapter (runtime). A schema-name string in one place and an object in the other drifts eventually.
- Storage is required for dynamic schedules and alerts. Without an adapter, task.schedules.create() throws and alert rules have nowhere to live. Declarative cron tasks work either way.
- Prune with a task. Run history grows forever by design; deleting is your call. A cron task keeps it honest, and run_events cascades:

  worker/prune-runs.ts

  import { lt } from 'drizzle-orm';
  import { z } from 'zod';
  // Assumption: task is exported from the SDK alongside defineConfig.
  import { task } from '@openqueue/sdk';
  import { db } from '../src/db';
  import { queueRuns } from '../src/queue-schema';

  export const pruneRuns = task({
    id: 'prune-runs',
    schema: z.object({}),
    cron: '0 4 * * *', // every day at 04:00
    run: async () => {
      // Delete runs older than 90 days; their run_events rows go with them via the cascade.
      const cutoff = new Date(Date.now() - 90 * 24 * 3600 * 1000);
      await db.delete(queueRuns).where(lt(queueRuns.createdAt, cutoff.toISOString()));
    },
  });

- Worker pools share one set of tables. Point every pool's adapter at the same schema: run history is keyed by run id, and schedules deduplicate. The catalog table mirrors the Redis catalog's last-writer-wins behavior across pools (see Scaling).
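The deduplication behavior for schedules can be pictured as an upsert on a unique key. This is a simplified in-memory sketch, not the adapter's code: the ScheduleTable class, the key format, and the choice to update rather than ignore a duplicate are all assumptions.

```typescript
interface ScheduleRow {
  dedupKey: string; // stands in for the unique deduplication key on the schedules table
  taskId: string;
  cron: string;
  registeredBy: string; // hypothetical: which worker pool wrote the row last
}

class ScheduleTable {
  private rows = new Map<string, ScheduleRow>();

  // Insert-or-update keyed on dedupKey, so repeated registrations never add rows.
  upsert(row: ScheduleRow): 'inserted' | 'updated' {
    const existed = this.rows.has(row.dedupKey);
    this.rows.set(row.dedupKey, row);
    return existed ? 'updated' : 'inserted';
  }

  count(): number {
    return this.rows.size;
  }
}

// Hypothetical key format: task id plus cron expression.
const makeKey = (taskId: string, cron: string): string => `${taskId}:${cron}`;

const schedules = new ScheduleTable();
const first = schedules.upsert({
  dedupKey: makeKey('prune-runs', '0 4 * * *'),
  taskId: 'prune-runs',
  cron: '0 4 * * *',
  registeredBy: 'pool-a',
});
const second = schedules.upsert({
  dedupKey: makeKey('prune-runs', '0 4 * * *'),
  taskId: 'prune-runs',
  cron: '0 4 * * *',
  registeredBy: 'pool-b',
});
```

Two pools booting with the same declarative schedule leave a single row behind, which is why pointing every pool at the same tables is safe.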