Werk
The central object. Holds the connection pool and exposes all job management operations.
Constructor
```python
Werk(
    dsn: str,
    *,
    config: WerkConfig | dict | None = None,
    schema: str | None = None,
    prefix: str | None = None,
    min_pool_size: int | None = None,
    max_pool_size: int | None = None,
    serializer: Serializer | None = None,
    max_active_secs: int | None = None,
    log_level: int | str | None = None,
    log_format: str | None = None,
    log_color: bool | None = None,
    log_fmt: str | None = None,
)
```
| Parameter | Default | Description |
|---|---|---|
| dsn | required | Postgres connection string |
| config | WerkConfig() | Base configuration; keyword arguments take precedence |
| schema | None | Postgres schema to qualify all table names |
| prefix | "_pgwerk" | Prefix applied to every table name |
| min_pool_size | 2 | Minimum pooled connections |
| max_pool_size | 10 | Maximum pooled connections |
| serializer | JSONSerializer | Payload/result serializer |
| max_active_secs | 3600 | Seconds before an active job is considered stuck by sweep |
| log_level | None | Logging level passed to configure_logging |
| log_format | None | "text" or "json" |
| log_color | None | Enable/disable ANSI colour in text logs |
| log_fmt | None | Custom log format string |
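A minimal construction using the parameters above; the DSN is illustrative and assumes a reachable local Postgres:

```python
from pgwerk import Werk

# Keyword arguments override any values supplied via config
app = Werk(
    "postgresql://app:secret@localhost:5432/appdb",  # illustrative DSN
    schema="jobs",
    max_pool_size=20,
    log_format="json",
)
```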
Lifecycle
connect() → None
Open the connection pool and run schema migrations. Idempotent — safe to call multiple times. Runs registered on_startup hooks after the pool is open.
disconnect() → None
Run on_shutdown hooks and close the connection pool. Idempotent.
async with app
Calls connect() on entry and disconnect() on exit.
on_startup(func) → func
Register a callable to run after the pool opens. Can be used as a decorator.
```python
@app.on_startup
async def init_cache():
    ...
```
on_shutdown(func) → func
Register a callable to run before the pool closes.
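Putting the lifecycle pieces together, a sketch of a typical entry point (the DSN and hook bodies are placeholders):

```python
import asyncio

from pgwerk import Werk

app = Werk("postgresql://localhost/appdb")  # illustrative DSN

@app.on_startup
async def warm_caches():
    ...  # runs after the pool opens

@app.on_shutdown
async def flush_metrics():
    ...  # runs before the pool closes

async def main():
    # Equivalent to calling connect() on entry and disconnect() on exit
    async with app:
        ...

asyncio.run(main())
```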
Enqueueing
enqueue(func, *args, **kwargs) → Job | None
Enqueue a job. Control options are keyword arguments prefixed with _; all other keyword arguments are forwarded to the handler as payload.
Returns the inserted Job, or None if an idempotency key collision was detected.
| Option | Type | Description |
|---|---|---|
| _queue | str | Queue name (default "default") |
| _priority | int | Higher values run first (default 0) |
| _delay | int | Seconds from now before the job becomes eligible |
| _at | datetime | Absolute UTC datetime for scheduled jobs |
| _retry | Retry \| int | Max attempts or a Retry object with back-off |
| _timeout | int | Seconds before the job is timed out |
| _heartbeat | int | Heartbeat renewal interval for long-running jobs |
| _key | str | Idempotency key — duplicate keys are silently dropped |
| _group | str | Concurrency group name |
| _conn | AsyncConnection | Existing psycopg connection for transactional enqueue |
| _meta | dict | Arbitrary metadata stored with the job |
| _result_ttl | int | Seconds to retain completed job rows |
| _failure_ttl | int | Seconds to retain failed job rows |
| _ttl | int | Seconds until an unstarted job expires |
| _on_success | Callback \| Callable \| str | Callback on successful completion |
| _on_failure | Callback \| Callable \| str | Callback on failure |
| _on_stopped | Callback \| Callable \| str | Callback when cancelled/stopped |
| _repeat | Repeat | Repeat policy for recurring jobs |
| _depends_on | Dependency \| Job \| list | Upstream jobs that must complete first |
| _failure_mode | str | "hold" (default) or "delete" |
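For example, enqueueing a delayed, retryable, idempotent job on a named queue (send_email is a hypothetical handler; only documented options are used):

```python
# send_email is a hypothetical handler registered with a worker
job = await app.enqueue(
    send_email,
    to="user@example.com",            # forwarded to the handler as payload
    _queue="email",
    _priority=5,
    _delay=60,                        # eligible one minute from now
    _retry=3,                         # up to three attempts
    _key="welcome:user@example.com",  # duplicates are silently dropped
)
if job is None:
    print("duplicate suppressed by idempotency key")
```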
enqueue_many(specs, *, _conn=None) → list[Job | None]
Enqueue multiple jobs in a single batch insert. specs is a list of EnqueueParams objects. Returns results in the same order; an entry is None for duplicate-key collisions.
apply(func, *args, timeout=None, poll_interval=0.5, **enqueue_kwargs) → Any
Enqueue a job and block until its result is available. Raises JobError if the job fails or is aborted.
map(func, iter_kwargs, *, timeout=None, poll_interval=0.5, return_exceptions=False, **shared_enqueue_kwargs) → list[Any]
Enqueue func for each dict in iter_kwargs and collect all results. With return_exceptions=True, failed jobs produce a JobError in the results list instead of raising.
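A usage sketch for the blocking helpers, assuming a running worker and a hypothetical add_numbers handler:

```python
# Enqueue one job and wait for its result
total = await app.apply(add_numbers, a=1, b=2, timeout=30)

# Fan out one job per kwargs dict and gather results in order
results = await app.map(
    add_numbers,
    [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
    return_exceptions=True,  # failed jobs yield JobError entries
    _queue="math",           # shared enqueue option applied to every job
)
```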
wait_for(job_id, *, timeout=None, poll_interval=2.0) → Job
Block until a job reaches a terminal state. Uses LISTEN/NOTIFY for instant wake-up with polling as fallback.
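A sketch of waiting on a previously enqueued job; the Job attribute names used here (id, status, result) are assumptions:

```python
job = await app.enqueue(add_numbers, a=1, b=2)  # hypothetical handler
done = await app.wait_for(job.id, timeout=60)   # wakes via LISTEN/NOTIFY
print(done.status, done.result)                 # attribute names assumed
```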
Job management
get_job(job_id) → Job
Fetch a single job by ID. Raises JobNotFound if missing.
list_jobs(queue, status, worker_id, search, limit, offset) → list[Job]
List jobs with optional filters.
cancel_job(job_id) → bool
Cancel a queued or scheduled job. Returns True if found and cancelled.
requeue_job(job_id) → bool
Reset a completed or failed job back to queued.
abort_job(job_id) → bool
Request an active job to stop and mark it aborted.
delete_job(job_id) → None
Permanently delete a job row.
touch_job(job_id) → None
Extend the heartbeat deadline for a long-running job.
get_executions(job_id) → list[JobExecution]
Fetch the per-attempt execution history for a job.
get_job_dependencies(job_id) → list[str]
Return the IDs of jobs that must complete before the given job can run.
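A sketch of common management calls; the status string, keyword-argument style for list_jobs, and the Job.id attribute are assumptions:

```python
job = await app.get_job(job_id)            # raises JobNotFound if missing
history = await app.get_executions(job_id)

# Inspect a queue and stop anything still running
active = await app.list_jobs(queue="email", status="active", limit=50)
for j in active:
    await app.abort_job(j.id)

# Cancel if still queued; otherwise push it back onto the queue
if not await app.cancel_job(job_id):
    await app.requeue_job(job_id)
```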
sweep() → list[str]
Detect stuck active jobs (no heartbeat within max_active_secs) and requeue them. Returns the list of requeued job IDs.
bulk_requeue_jobs(queue, function_name) → int
Requeue multiple jobs at once, optionally filtered by queue or function name.
bulk_cancel_jobs(queue) → int
Cancel multiple queued jobs at once.
purge_jobs(statuses, older_than_days) → int
Delete job rows by status and age.
Maintenance
vacuum() → None
Run VACUUM ANALYZE on all werk tables.
truncate() → None
Truncate all werk tables. Useful in tests.
Lifecycle hooks
register_before_enqueue(callback) → None
Register a callback invoked with each Job just before it is inserted.
unregister_before_enqueue(callback) → None
Remove a previously registered before-enqueue callback.
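For instance, a callback that stamps metadata onto every job before insertion; the Job.meta attribute is an assumption based on the _meta enqueue option:

```python
def stamp_tenant(job):
    # Mutates the Job just before it is inserted; .meta is assumed here
    job.meta = {**(job.meta or {}), "tenant": "acme"}

app.register_before_enqueue(stamp_tenant)
# ... later, e.g. in test teardown:
app.unregister_before_enqueue(stamp_tenant)
```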
WerkConfig
Centralised configuration. Keyword arguments to Werk() take precedence over values in a WerkConfig instance.
```python
from pgwerk import Werk, WerkConfig

config = WerkConfig(
    prefix="_jobs",
    max_pool_size=20,
    sweep_interval=30.0,
)
app = Werk(dsn, config=config)
```
| Attribute | Default | Description |
|---|---|---|
| schema | None | Postgres schema for table qualification |
| prefix | "_pgwerk" | Table name prefix |
| min_pool_size | 2 | Min connections in pool |
| max_pool_size | 10 | Max connections in pool |
| max_active_secs | 3600 | Stuck-job threshold for sweep |
| heartbeat_interval | 10 | Worker heartbeat cadence (seconds) |
| poll_interval | 5.0 | Idle poll cadence (seconds) |
| abort_interval | 1.0 | Abort-signal check cadence (seconds) |
| sweep_interval | 60.0 | Maintenance sweep cadence (seconds) |
| shutdown_timeout | 30.0 | Graceful shutdown timeout (seconds) |
| sigterm_grace | 5 | ForkWorker SIGTERM→SIGKILL grace (seconds) |
| cron_standby_retry_interval | 30.0 | Standby scheduler retry cadence (seconds) |
| ephemeral_tables | False | Use UNLOGGED tables for worker/worker_jobs |