Tasks

The task() definition — schemas, retries, concurrency, and context.

A task is the unit of work in OpenQueue. One exported task() per job type, discovered from the directories listed in your config.

import { task } from '@openqueue/sdk';
import { z } from 'zod';

export const resizeImage = task({
  id: 'resize-image',
  schema: z.object({
    src: z.string(),
    sizes: z.array(z.number()).default([128, 512]),
  }),
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  concurrency: 10,
  run: async (payload, ctx) => {
    ctx.logger.info('resizing', { src: payload.src });
    await ctx.progress({ step: 'uploading' });
    return { ok: true };
  },
});

Options

OptionDefaultDescription
idUnique task id. Also the job name on the queue.
schemaOptional Zod schema. When present, validates and parses input and types payload in run. Without it, input is passed through unvalidated.
queuetask idQueue name (string) or a queue definition. Tasks sharing a queue share its worker.
concurrency5Max jobs of this task processed in parallel.
attempts3Total tries before the job is marked failed.
backoffexponential, 1s{ type: 'exponential' | 'fixed', delay } — or a number for fixed delay.
cronCron expression for a static schedule. See Schedules.
ttlMax runtime in milliseconds before the run is timed out.
tags[]Free-form labels, filterable in Workbench.
descriptionShown in the Workbench task catalog.

The context

run(payload, ctx) receives a context with:

  • ctx.logger — structured logger (info, warn, error). Lines are attached to the run and visible in Workbench.
  • ctx.progress(data) — report progress; rendered live in the run view.

The return value of run is stored as the run's output and displayed in the dashboard.

Tasks do not need a schema:

export const freeform = task({
  id: 'freeform',
  run: async (payload) => payload,
});

Schema-less tasks accept any trigger payload. Workbench uses {} as their default Test body.

Triggering

Every task carries a typed trigger:

const { id } = await resizeImage.trigger(
  { src: 's3://uploads/a.heic', sizes: [128] },
  { delay: 5_000 },
);

You can also trigger by id — useful across service boundaries where you don't want to import the handler:

import { trigger } from '@openqueue/sdk';

await trigger('resize-image', { src: 's3://uploads/a.heic', sizes: [128] });

Triggering requires a queue runtime (created by the worker, or via createQueueClient in API processes that only enqueue).

On this page