Workers dequeue jobs from Postgres and execute them. werk ships with four worker types, all sharing the same base behaviour.
## Worker types
### AsyncWorker

The default. Runs handlers as coroutines on a single asyncio event loop. Best for I/O-bound work (HTTP calls, database queries, file I/O).

```python
from pgwerk import AsyncWorker

worker = AsyncWorker(app=app, queues=["default"], concurrency=20)
await worker.run()
```
### ThreadWorker

Runs each handler in a thread-pool executor. Suitable for blocking libraries that are not async-aware.

```python
from pgwerk import ThreadWorker

worker = ThreadWorker(app=app, concurrency=8)
await worker.run()
```
### ProcessWorker

Runs handlers in a process-pool executor. Use this for CPU-bound work that benefits from true parallelism (bypasses the GIL).

```python
from pgwerk import ProcessWorker

worker = ProcessWorker(app=app, concurrency=4)
await worker.run()
```
### ForkWorker

Forks a new process for each individual job. Provides maximum isolation — a crashing job cannot affect the worker process. A SIGTERM grace period allows the child process to finish before SIGKILL is sent.

```python
from pgwerk import ForkWorker

worker = ForkWorker(app=app, concurrency=4)
await worker.run()
```
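The isolation property can be illustrated with a plain `os.fork` (this is just the general fork-per-job idea, not pgwerk's internals — no grace period or signal handling shown):

```python
import os

pid = os.fork()
if pid == 0:
    # Child "job": crash hard without touching the parent.
    os._exit(17)

# Parent "worker": reaps the child and carries on unaffected.
_, status = os.waitpid(pid, 0)
exit_code = os.waitstatus_to_exitcode(status)
print("child exit code:", exit_code)  # 17
```

However the child dies, the parent only observes an exit code and can mark the job as failed and move on.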
## Configuration
All workers share these constructor parameters:
| Parameter | Default | Description |
|---|---|---|
| `app` | required | Connected Werk instance |
| `queues` | `["default"]` | Queue names to consume |
| `concurrency` | `10` | Max jobs processed simultaneously |
| `heartbeat_interval` | `10` | Seconds between worker heartbeats |
| `poll_interval` | `5.0` | Seconds between database polls when idle |
| `dequeue_strategy` | `Priority` | How queues are ordered when dequeuing |
| `burst` | `False` | Exit once the queue is empty |
| `before_process` | `[]` | Hooks called before each job starts |
| `after_process` | `[]` | Hooks called after each job finishes |
| `sweep_interval` | `60.0` | Seconds between maintenance sweeps |
| `abort_interval` | `1.0` | Seconds between abort-signal checks |
| `shutdown_timeout` | `30.0` | Seconds to wait for in-flight jobs during shutdown |
## Dequeue strategies
Control how the worker picks jobs from multiple queues:
```python
from pgwerk import AsyncWorker, DequeueStrategy

# Priority (default) — higher-priority jobs run first across all queues
worker = AsyncWorker(app=app, queues=["high", "low"], dequeue_strategy=DequeueStrategy.Priority)

# Round-robin — cycle through queues evenly
worker = AsyncWorker(app=app, queues=["a", "b", "c"], dequeue_strategy=DequeueStrategy.RoundRobin)

# Random — pick a queue at random each poll
worker = AsyncWorker(app=app, queues=["a", "b", "c"], dequeue_strategy=DequeueStrategy.Random)
```
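The round-robin ordering can be pictured with `itertools.cycle` — this is only an illustration of the visiting order, not pgwerk's actual dequeue query:

```python
from itertools import cycle, islice

queues = ["a", "b", "c"]

# RoundRobin visits each configured queue in turn before repeating:
# six dequeues touch each queue twice, in order.
visit_order = list(islice(cycle(queues), 6))
print(visit_order)  # ['a', 'b', 'c', 'a', 'b', 'c']
```

Priority ignores queue boundaries entirely, while round-robin guarantees no queue is starved even when another is much busier.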
## Burst mode
The worker exits as soon as the queue drains instead of waiting for new work:
```python
worker = AsyncWorker(app=app, burst=True)
await worker.run()  # returns when the queue is empty
```
## Lifecycle hooks
Run code before or after every job:
```python
from pgwerk import Context

async def log_start(ctx: Context) -> None:
    print(f"Starting {ctx.job.function} [{ctx.job.id}]")

async def log_end(ctx: Context) -> None:
    if ctx.exception:
        print(f"Failed: {ctx.exception}")
    else:
        print(f"Completed [{ctx.job.id}]")

worker = AsyncWorker(
    app=app,
    before_process=[log_start],
    after_process=[log_end],
)

# Or add them later
worker.add_before_process(log_start)
worker.add_after_process(log_end)
```
Hooks receive a `Context` object. In `after_process` hooks, `ctx.exception` is set if the job raised.
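Conceptually, the worker wraps every job in its hooks roughly like this. A simplified sketch — `FakeContext` and `process` are illustrative stand-ins, not pgwerk internals:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

@dataclass
class FakeContext:
    # Stand-in for pgwerk's Context: carries the job and any raised exception.
    job_id: str
    exception: Optional[BaseException] = None

Hook = Callable[[FakeContext], Awaitable[None]]

async def process(job: Callable[[], Awaitable[Any]],
                  before: list[Hook], after: list[Hook]) -> FakeContext:
    ctx = FakeContext(job_id="42")
    for hook in before:       # before_process hooks run first, in order
        await hook(ctx)
    try:
        await job()
    except Exception as exc:  # a failure is captured, not re-raised,
        ctx.exception = exc   # so after_process hooks still run
    for hook in after:
        await hook(ctx)
    return ctx

events: list[str] = []

async def boom() -> None:
    raise ValueError("job failed")

async def note_start(ctx: FakeContext) -> None:
    events.append("start")

async def note_end(ctx: FakeContext) -> None:
    events.append("failed" if ctx.exception else "ok")

ctx = asyncio.run(process(boom, [note_start], [note_end]))
print(events)  # ['start', 'failed']
```

The key behaviour to rely on: `after_process` hooks always run, whether the job succeeded or raised, and the exception is exposed on the context rather than propagated into the hook.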
## Exception handlers
Push handlers onto a stack to intercept job failures:
```python
async def report_error(job, exc):
    sentry_sdk.capture_exception(exc)

worker.push_exception_handler(report_error)
```
Handlers are called in reverse push order (most recently pushed first). Use `pop_exception_handler()` to remove the top handler.
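The stack semantics are plain LIFO dispatch. A minimal sketch of the ordering (illustrative, not the library's code):

```python
calls: list[str] = []
handlers = []  # the handler stack

def push_handler_sketch(handler):
    handlers.append(handler)

# Pushed first, so it runs last.
push_handler_sketch(lambda exc: calls.append("log"))
# Pushed second, so it runs first.
push_handler_sketch(lambda exc: calls.append("report"))

exc = RuntimeError("job failed")
for handler in reversed(handlers):  # most recently pushed first
    handler(exc)

print(calls)  # ['report', 'log']
```

This ordering lets a later-pushed handler (e.g. a temporary error reporter) observe failures before any baseline handler installed at startup.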
## Signals and shutdown
Workers install handlers for `SIGTERM` and `SIGINT`. On receipt, the worker stops accepting new jobs and waits up to `shutdown_timeout` seconds for in-flight jobs to finish before cancelling them.
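The same pattern expressed in generic asyncio terms — wait up to a deadline for in-flight work, then cancel whatever remains. A sketch of the behaviour, not pgwerk's signal handling:

```python
import asyncio

async def slow_job(seconds: float) -> None:
    await asyncio.sleep(seconds)

async def drain(in_flight: set, timeout: float) -> tuple[int, int]:
    # Wait up to `timeout` seconds for in-flight jobs, then cancel stragglers.
    done, pending = await asyncio.wait(in_flight, timeout=timeout)
    for task in pending:
        task.cancel()
    return len(done), len(pending)

async def main() -> tuple[int, int]:
    quick = asyncio.create_task(slow_job(0.01))  # finishes within the grace period
    stuck = asyncio.create_task(slow_job(60))    # does not
    return await drain({quick, stuck}, timeout=0.2)

finished, cancelled = asyncio.run(main())
print(finished, cancelled)  # 1 1
```

Jobs cancelled this way can be retried later, since pgwerk tracks them in Postgres rather than in worker memory.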
## Running alongside a web server
Workers can run in the same process as an ASGI/WSGI application using background tasks or a separate asyncio task:
```python
import asyncio

from pgwerk import AsyncWorker

async def main():
    async with app:
        worker = AsyncWorker(app=app)
        await asyncio.gather(
            run_web_server(),  # your ASGI server's serve coroutine
            worker.run(),
        )
```