Persistence

Postgres-backed run history, schedules, and alerts with Drizzle.

Redis holds live queue state, and BullMQ's retention is bounded — completed jobs age out, and a FLUSHALL takes your history with it. The Postgres adapter gives OpenQueue a durable side: run history with full payloads and event timelines, dynamic schedules that survive restarts, and alert configuration. Workbench reads all of it when present.

Quick setup

bunx openqueue add persistence

This scaffolds src/queue-schema.ts and drizzle.config.ts, adds drizzle-orm and drizzle-kit to your package.json, and prints the storage block to paste into your worker config. It never overwrites: existing files are left untouched (with merge hints instead), and re-running it just reports the current state. The sections below explain everything it sets up.

The tables

defineQueueSchema() returns plain Drizzle pg-core tables, so they ride your existing drizzle-kit setup — no separate migration tool.

TableWhat it holds
catalogSnapshot of the task catalog, republished on worker boot.
runsOne row per run — input, output, error, status, tags, timestamps. Upserted on every lifecycle transition.
run_eventsAppend-only event log per run (enqueued, started, progress, completed, failed). Cascades on run delete.
schedulesDynamic and declarative schedules, with a unique deduplication key.
schedule_instancesPer-schedule tick state (next/last run).
alert_channelsAlert contact points (webhook/Slack presets).
alert_rulesAlert rules — trigger, severity, thresholds, cooldown.

Table names are deliberately short (runs, catalog, schedules), so put them in a dedicated Postgres schema instead of public:

src/queue-schema.ts
import { defineQueueSchema } from '@openqueue/sdk';

export const queueSchema = defineQueueSchema({ schema: 'jobs' });

export const {
  queueCatalog,
  queueSchedules,
  queueScheduleInstances,
  queueRuns,
  queueRunEvents,
  alertChannels,
  alertRules,
} = queueSchema;

Re-exporting each table individually is what lets drizzle-kit discover them.

Generating migrations

Point a drizzle-kit config at the file and scope it to the queue schema:

drizzle.config.ts
import { defineConfig } from 'drizzle-kit';

export default defineConfig({
  dialect: 'postgresql',
  schema: './src/queue-schema.ts',
  out: './drizzle',
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
  schemaFilter: ['jobs'],
  migrations: {
    table: '__drizzle_migrations',
    schema: 'jobs',
  },
});
bunx drizzle-kit generate   # emits CREATE SCHEMA "jobs" + all tables
bunx drizzle-kit migrate

schemaFilter keeps drizzle-kit from touching your application tables; the CREATE SCHEMA IF NOT EXISTS "jobs" statement is generated automatically. If your app already has a drizzle-kit config, you can instead add ./src/queue-schema.ts to its schema array and 'jobs' to its schemaFilter — one migration history for everything.

Wiring the adapter

Pass the same schema object (or just the schema name) to postgresAdapter in your worker config:

worker.config.ts
import { defineConfig, postgresAdapter } from '@openqueue/sdk';
import { db } from './src/db';
import { queueSchema } from './src/queue-schema';

export default defineConfig({
  namespace: 'my-app',
  dirs: ['./worker'],
  redis: { url: process.env.REDIS_URL! },
  storage: {
    adapter: postgresAdapter({ db, schema: queueSchema }),
  },
});

db is your existing Drizzle database instance — the adapter issues plain Drizzle queries through it, so it shares your pool, your transactions config, and your connection lifecycle.

What gets written

Every run lifecycle event flows through the adapter: the runs row is upserted with the latest status, and a run_events row is appended with the event payload — so you get both "where is this run now" and "what happened, in order". Failures store the serialized error; ctx.progress() patches land in metadata.

Runs also carry meta from trigger time. OpenQueue treats it as opaque application data, and indexes the JSON document for containment filters:

await exportCsv.trigger(payload, {
  meta: { tenantId: tenant.id, customerId: customer.id, tags: ['billing'] },
});

Tags are stored separately for tag filters. Workbench's run filters (task, status, schedule, meta, tags, time range) map directly onto generic queue state rather than application-specific columns.

Suggested patterns

  • One schema object, two consumers. Define queueSchema once and hand it to both drizzle-kit (migrations) and postgresAdapter (runtime). A schema-name string in one place and an object in the other drifts eventually.

  • Storage is required for dynamic schedules and alerts. Without an adapter, task.schedules.create() throws and alert rules have nowhere to live. Declarative cron tasks work either way.

  • Prune with a task. Run history grows forever by design — deleting is your call. A cron task keeps it honest, and run_events cascades:

    worker/prune-runs.ts
    import { lt } from 'drizzle-orm';
    import { queueRuns } from '../src/queue-schema';
    
    export const pruneRuns = task({
      id: 'prune-runs',
      schema: z.object({}),
      cron: '0 4 * * *',
      run: async () => {
        const cutoff = new Date(Date.now() - 90 * 24 * 3600 * 1000);
        await db.delete(queueRuns).where(lt(queueRuns.createdAt, cutoff.toISOString()));
      },
    });
  • Worker pools share one set of tables. Point every pool's adapter at the same schema — run history is keyed by run id, and schedules deduplicate. The catalog table mirrors the Redis catalog's last-writer-wins behavior across pools (see Scaling).

On this page