Werk

The central object. Manages schema migrations and exposes all job management operations.

pgwerk does not maintain an internal connection pool. Each operation opens and closes its own short-lived connection, which makes it compatible with external poolers like PgBouncer in transaction pooling mode. Point your DSN at the pooler and pgwerk will use it transparently.

Constructor

Werk(
    dsn: str,
    *,
    config: WerkConfig | dict | None = None,
    schema: str | None = None,
    prefix: str | None = None,
    serializer: Serializer | None = None,
    max_active_secs: int | None = None,
    log_level: int | str | None = None,
    log_format: str | None = None,
    log_color: bool | None = None,
    log_fmt: str | None = None,
    auto_migrate: bool = True,
)

| Parameter | Default | Description |
| --- | --- | --- |
| dsn | required | Postgres connection string (or PgBouncer DSN) |
| config | WerkConfig() | Base configuration; keyword arguments take precedence |
| schema | None | Postgres schema to qualify all table names |
| prefix | "_pgwerk" | Prefix applied to every table name |
| serializer | JSONSerializer | Payload/result serializer |
| max_active_secs | 3600 | Seconds before an active job is considered stuck by sweep |
| log_level | None | Logging level passed to configure_logging |
| log_format | None | "text" or "json" |
| log_color | None | Enable/disable ANSI colour in text logs |
| log_fmt | None | Custom log format string |
| auto_migrate | True | Run schema migrations on connect(). Set to False when using werk migrate in CI/CD. |

Lifecycle

connect() → None

Run schema migrations (skipped when auto_migrate is False) and initialise internal state. Idempotent, so it is safe to call multiple times. Registered on_startup hooks run on the first call.

disconnect() → None

Run on_shutdown hooks and clean up internal state. Idempotent.

async with app

Calls connect() on entry and disconnect() on exit.
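A minimal sketch of the context-manager form. It assumes `app` is a `Werk` instance and that entering and leaving the block are awaitable, which `async with` implies. The helper name `run_with_app` is hypothetical and not part of pgwerk.

```python
from typing import Any, Awaitable, Callable


async def run_with_app(app: Any, work: Callable[[Any], Awaitable[Any]]) -> Any:
    """Run `work(app)` between connect() and disconnect().

    `app` is expected to be a Werk instance; it is typed as Any so the
    sketch does not depend on how pgwerk is imported.
    """
    # Entering the block calls connect(); leaving it calls disconnect(),
    # including when `work` raises.
    async with app:
        return await work(app)
```

Wrapping the whole application entry point this way keeps connect() and disconnect() paired without explicit try/finally blocks.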

on_startup(func) → func

Register a callable to run on connect(). Can be used as a decorator.

@app.on_startup
async def init_cache():
    ...

on_shutdown(func) → func

Register a callable to run on disconnect(). Because it returns func, it can also be used as a decorator.
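The two registration methods can be paired to manage a resource for the lifetime of the app. The sketch below assumes `app` is a `Werk` instance and that on_shutdown accepts an async callable the same way on_startup does; `register_cache_hooks` and the `cache` dict are hypothetical names used only for illustration.

```python
from typing import Any, Dict


def register_cache_hooks(app: Any, cache: Dict[str, Any]) -> None:
    """Attach one startup and one shutdown hook to `app`."""

    @app.on_startup
    async def warm_cache() -> None:
        # Invoked by connect().
        cache["ready"] = True

    @app.on_shutdown
    async def clear_cache() -> None:
        # Invoked by disconnect().
        cache.clear()
```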

Enqueueing

enqueue(func, *args, **kwargs) → Job | None

Enqueue a job. Control options are keyword arguments prefixed with _; all other keyword arguments are forwarded to the handler as payload.

Returns the inserted Job, or None if an idempotency key collision was detected.

| Option | Type | Description |
| --- | --- | --- |
| _queue | str | Queue name (default "default") |
| _priority | int | Higher values run first (default 0) |
| _delay | int | Seconds from now before the job becomes eligible |
| _at | datetime | Absolute UTC datetime for scheduled jobs |
| _retry | Retry \| int | Max attempts or a Retry object with back-off |
| _timeout | int | Seconds before the job is timed out |
| _heartbeat | int | Heartbeat renewal interval for long-running jobs |
| _key | str | Idempotency key; duplicate keys are silently dropped |
| _group | str | Concurrency group name |
| _conn | AsyncConnection | Existing psycopg connection for transactional enqueue |
| _meta | dict | Arbitrary metadata stored with the job |
| _result_ttl | int | Seconds to retain completed job rows |
| _failure_ttl | int | Seconds to retain failed job rows |
| _ttl | int | Seconds until an unstarted job expires |
| _on_success | Callback \| Callable \| str | Callback on successful completion |
| _on_failure | Callback \| Callable \| str | Callback on failure |
| _on_stopped | Callback \| Callable \| str | Callback when cancelled/stopped |
| _repeat | Repeat | Repeat policy for recurring jobs |
| _depends_on | Dependency \| Job \| list | Upstream jobs that must complete first |
| _failure_mode | str | "hold" (default) or "delete" |
| _sync | bool | Execute the job synchronously in the calling process instead of enqueueing it |
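A short sketch of how control options and payload arguments mix in a single call. It assumes `app` is a `Werk` instance and that enqueue() is a coroutine; the handler `send_welcome_email` and the helper `enqueue_welcome_email` are hypothetical.

```python
from typing import Any


async def send_welcome_email(user_id: int) -> None:
    """Hypothetical job handler; its body is irrelevant to the sketch."""


async def enqueue_welcome_email(app: Any, user_id: int) -> bool:
    """Enqueue the handler once per user; return True if a job was inserted."""
    job = await app.enqueue(
        send_welcome_email,
        user_id=user_id,            # no underscore prefix: forwarded as payload
        _queue="email",             # control option: target queue
        _delay=60,                  # control option: eligible in 60 seconds
        _key=f"welcome:{user_id}",  # control option: idempotency key
    )
    # enqueue() returns None when the idempotency key already exists.
    return job is not None
```

Deriving `_key` from the business identifier makes repeated calls for the same user harmless, since the duplicate is dropped rather than raising.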

enqueue_many(specs, *, _conn=None) → list[Job | None]

Enqueue multiple jobs in a single batch insert. specs is a list of EnqueueParams objects. Returns results in the same order; an entry is None for duplicate-key collisions.
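Because results come back in the same order as specs, the two lists can be zipped to find out which entries were dropped as duplicates. The helper below is a hypothetical illustration that relies only on that ordering contract; it does not construct EnqueueParams objects, whose fields are not described here.

```python
from typing import Any, List, Optional, Sequence, Tuple


def split_batch_results(
    specs: Sequence[Any], results: Sequence[Optional[Any]]
) -> Tuple[List[Any], List[Any]]:
    """Separate inserted jobs from specs that collided on an idempotency key.

    Returns (inserted_jobs, skipped_specs).
    """
    if len(specs) != len(results):
        raise ValueError("specs and results must have the same length")
    inserted = [job for job in results if job is not None]
    # A None result marks the spec at the same index as a duplicate.
    skipped = [spec for spec, job in zip(specs, results) if job is None]
    return inserted, skipped
```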

apply(func, *args, timeout=None, poll_interval=0.5, **enqueue_kwargs) → Any

Enqueue a job and block until its result is available. Raises JobError if the job fails or is aborted.

map(func, iter_kwargs, *, timeout=None, poll_interval=0.5, return_exceptions=False, **shared_enqueue_kwargs) → list[Any]

Enqueue func for each dict in iter_kwargs and collect all results. With return_exceptions=True, failed jobs produce a JobError in the results list instead of raising.
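With return_exceptions=True the result list mixes values and errors, so callers usually need to separate them. The sketch below assumes that JobError derives from Exception, which is implied by it being raised but not stated; `partition_map_results` is a hypothetical helper.

```python
from typing import Any, List, Sequence, Tuple


def partition_map_results(
    results: Sequence[Any],
) -> Tuple[List[Any], List[BaseException]]:
    """Split a map(..., return_exceptions=True) result list into values and errors."""
    values: List[Any] = []
    errors: List[BaseException] = []
    for item in results:
        # Assumption: failed jobs appear as exception instances (JobError).
        if isinstance(item, BaseException):
            errors.append(item)
        else:
            values.append(item)
    return values, errors
```

Note that this loses the positional link between inputs and outputs; zip the results with iter_kwargs first if that link matters.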

wait_for(job_id, *, timeout=None, poll_interval=2.0) → Job

Block until a job reaches a terminal state. Uses LISTEN/NOTIFY for instant wake-up with polling as fallback.

Job management

get_job(job_id) → Job

Fetch a single job by ID. Raises JobNotFound if missing.

list_jobs(queue, status, worker_id, search, schedule_name, retried, limit, offset) → list[Job]

List jobs with optional filters. Pass retried=True to return only jobs that have prior execution attempts (i.e. were requeued at least once).

cancel_job(job_id) → bool

Cancel a queued or scheduled job. Returns True if found and cancelled.

requeue_job(job_id) → bool

Reset a failed or aborted job back to queued. Execution history is preserved — prior attempts remain in get_executions() and the retry budget is extended rather than reset.

abort_job(job_id) → bool

Request an active job to stop and mark it aborted.

delete_job(job_id) → None

Permanently delete a job row.

touch_job(job_id) → None

Extend the heartbeat deadline for a long-running job.

get_executions(job_id) → list[JobExecution]

Fetch the per-attempt execution history for a job.

get_job_dependencies(job_id) → list[str]

Return the IDs of jobs that must complete before the given job can run.

sweep() → list[str]

Detect stuck active jobs (no heartbeat within max_active_secs) and requeue them. Returns the list of requeued job IDs.
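The stuck-job rule can be pictured as a cutoff comparison on the last heartbeat. The function below is an illustrative sketch only: it is not pgwerk's implementation, the in-memory dict stands in for the job table, and treating a heartbeat exactly at the cutoff as still alive is an assumption.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_stuck_jobs(
    last_heartbeat: Dict[str, datetime], max_active_secs: int, now: datetime
) -> List[str]:
    """Return IDs of active jobs whose last heartbeat is older than the cutoff."""
    cutoff = now - timedelta(seconds=max_active_secs)
    # A job is considered stuck when it has not been seen since the cutoff.
    return [job_id for job_id, seen in last_heartbeat.items() if seen < cutoff]
```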

bulk_requeue_jobs(queue, function_name) → int

Requeue multiple failed or aborted jobs at once, optionally filtered by queue or function name. Attempt history is preserved across bulk requeues.

bulk_cancel_jobs(queue) → int

Cancel multiple queued jobs at once.

purge_jobs(statuses, older_than_days) → int

Delete job rows by status and age.

Maintenance

vacuum() → None

Run VACUUM ANALYZE on all werk tables.
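A periodic maintenance routine might chain sweep(), purge_jobs() and vacuum(). The sketch assumes `app` is a `Werk` instance, that all three methods are coroutines, and that "failed" and "aborted" are valid status strings (the names are inferred from the prose above, not confirmed). `run_maintenance` is a hypothetical helper.

```python
from typing import Any, Dict


async def run_maintenance(app: Any, keep_days: int = 30) -> Dict[str, int]:
    """Requeue stuck jobs, purge old rows, then vacuum the tables."""
    requeued = await app.sweep()  # list of requeued job IDs
    purged = await app.purge_jobs(
        statuses=["failed", "aborted"],  # assumed status names
        older_than_days=keep_days,
    )
    # Vacuum last so the space freed by the purge is reclaimed.
    await app.vacuum()
    return {"requeued": len(requeued), "purged": purged}
```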

truncate() → None

Truncate all werk tables. Useful in tests. Requires allow_truncate to be enabled in WerkConfig; it is disabled by default.

Lifecycle hooks

register_before_enqueue(callback) → None

Register a callback invoked with each Job just before it is inserted.

unregister_before_enqueue(callback) → None

Remove a previously registered before-enqueue callback.
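As an illustration, a before-enqueue callback can collect simple per-queue counts. The sketch assumes `app` is a `Werk` instance, that a plain synchronous callable is accepted, and that Job exposes a `queue` attribute; none of these are confirmed here, and both helper names are hypothetical.

```python
from collections import Counter
from typing import Any, Callable, Tuple


def make_enqueue_counter() -> Tuple[Counter, Callable[[Any], None]]:
    """Build a before-enqueue callback that counts jobs per queue."""
    counts: Counter = Counter()

    def count_job(job: Any) -> None:
        # Assumption: Job has a `queue` attribute; otherwise count as "unknown".
        counts[getattr(job, "queue", "unknown")] += 1

    return counts, count_job


def install_counter(app: Any) -> Tuple[Counter, Callable[[Any], None]]:
    """Register the counting callback on `app` and return it with its counter."""
    counts, callback = make_enqueue_counter()
    app.register_before_enqueue(callback)
    # Keep `callback` so it can be passed to unregister_before_enqueue() later.
    return counts, callback
```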


WerkConfig

Centralised configuration. Keyword arguments to Werk() take precedence over values in a WerkConfig instance.

from pgwerk import Werk, WerkConfig

config = WerkConfig(
    prefix="_jobs",
    sweep_interval=30.0,
)
app = Werk(dsn, config=config)

| Attribute | Default | Description |
| --- | --- | --- |
| schema | "pgwerk" | Postgres schema for table qualification |
| prefix | "_pgwerk" | Table name prefix |
| max_active_secs | 3600 | Stuck-job threshold for sweep |
| heartbeat_interval | 10 | Worker heartbeat cadence (seconds) |
| poll_interval | 5.0 | Idle poll cadence (seconds) |
| abort_interval | 1.0 | Abort-signal check cadence (seconds) |
| sweep_interval | 60.0 | Maintenance sweep cadence (seconds) |
| shutdown_timeout | 30.0 | Graceful shutdown timeout (seconds) |
| sigterm_grace | 5 | ForkWorker SIGTERM→SIGKILL grace (seconds) |
| ephemeral_tables | False | Use UNLOGGED tables for worker/worker_jobs |
| default_retry_backoff | True | Synthesize exponential backoff when _retry is an integer |
| allow_truncate | False | Enable truncate(); useful in tests, dangerous in production |
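The precedence rule (keyword arguments to Werk() win over WerkConfig values) can be modelled as a merge in which None means "not supplied". The sketch below is not pgwerk's code: `SketchConfig` is a hypothetical stand-in carrying three of the attributes listed above, and `resolve_config` is an illustrative helper.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for WerkConfig with a subset of its attributes."""

    prefix: str = "_pgwerk"
    max_active_secs: int = 3600
    sweep_interval: float = 60.0


def resolve_config(
    config: Optional[SketchConfig] = None, **overrides: Any
) -> SketchConfig:
    """Merge keyword overrides onto a base config; None overrides are ignored."""
    base = config if config is not None else SketchConfig()
    # Mirrors the constructor's None defaults: only supplied values override.
    supplied = {name: value for name, value in overrides.items() if value is not None}
    return replace(base, **supplied)
```

Under this model, passing a config with prefix="_jobs" together with max_active_secs=120 as a keyword yields a configuration with both values applied and every other attribute left at its default.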