BaseWorker
Abstract base class shared by all worker implementations. Provides the polling loop, dequeue logic, ack/nack handling, heartbeating, LISTEN/NOTIFY integration, sweep, and graceful shutdown.
Constructor
BaseWorker(
app: Werk,
queues: list[str] | None = None,
concurrency: int = 10,
heartbeat_interval: int | None = None,
poll_interval: float | None = None,
dequeue_strategy: DequeueStrategy = DequeueStrategy.Priority,
burst: bool = False,
before_process: list[Callable] | None = None,
after_process: list[Callable] | None = None,
sweep_interval: float | None = None,
abort_interval: float | None = None,
shutdown_timeout: float | None = None,
)
Methods
run() → None
Start the worker. Registers the worker in the database, starts side tasks (heartbeat, listen, abort, sweep loops), processes jobs until shutdown, then deregisters.
add_before_process(hook) → None
Append a hook called before each job starts. Hooks receive a Context argument.
add_after_process(hook) → None
Append a hook called after each job finishes (whether it succeeded or failed).
push_exception_handler(handler) → None
Push a handler onto the exception stack. Handlers are called in reverse push order when a job raises. Signature: handler(job: Job, exc: Exception) -> None.
pop_exception_handler() → Callable
Remove and return the top exception handler. Raises IndexError if the stack is empty.
AsyncWorker
Runs handlers as coroutines on the asyncio event loop. Best for I/O-bound work.
from pgwerk import AsyncWorker
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)
await worker.run()
ThreadWorker
Runs each handler in a thread-pool executor. The event loop remains unblocked. Use for blocking code that cannot be made async.
from pgwerk import ThreadWorker
worker = ThreadWorker(app=app, concurrency=8)
await worker.run()
ProcessWorker
Runs handlers in a process-pool executor. Provides true CPU parallelism by bypassing the GIL.
from pgwerk import ProcessWorker
worker = ProcessWorker(app=app, concurrency=4)
await worker.run()
ForkWorker
Forks a new process for each job. Provides maximum isolation — a crashing job cannot corrupt the worker process. A SIGTERM grace period (sigterm_grace) is applied before SIGKILL.
from pgwerk import ForkWorker
worker = ForkWorker(app=app, concurrency=4)
await worker.run()
DequeueStrategy
Controls how queues are ordered when selecting jobs.
| Value | Behaviour |
|---|---|
DequeueStrategy.Priority |
Higher-priority jobs run first across all queues |
DequeueStrategy.RoundRobin |
Cycle through queues in turn |
DequeueStrategy.Random |
Pick a queue at random each poll |
from pgwerk import AsyncWorker, DequeueStrategy
worker = AsyncWorker(
app=app,
queues=["a", "b", "c"],
dequeue_strategy=DequeueStrategy.RoundRobin,
)