Install

pip install pgwerk

Requires Python 3.11+ and Postgres 14+. For cron expression support, add the optional extra:

pip install "pgwerk[cron]"

Connect

Create a Werk instance with your Postgres DSN. Call connect() once at startup and disconnect() at shutdown. The async context manager is shorthand for the same pair:

from pgwerk import Werk

app = Werk("postgresql://user:pass@localhost/mydb")

# Explicit lifecycle
await app.connect()
# ... your app ...
await app.disconnect()

# Context manager (recommended)
async with app:
    ...

connect() is idempotent, so calling it more than once is safe. On the first connect it runs schema migrations under a Postgres advisory lock, so multiple processes that start simultaneously do not race each other.
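The lifecycle described above can be sketched with a small stand-in class. LifecycleDemo and its setup_runs counter are hypothetical and exist only for illustration; this is not pgwerk's implementation, and it covers only the in-process part (the cross-process guarantee comes from the Postgres advisory lock, which is not modelled here).

```python
import asyncio


class LifecycleDemo:
    """Hypothetical stand-in used for illustration; not pgwerk's Werk class."""

    def __init__(self) -> None:
        self.connected = False
        self.setup_runs = 0
        self._lock = asyncio.Lock()

    async def connect(self) -> None:
        async with self._lock:
            if self.connected:
                return  # already connected: a repeat call is a no-op
            self.setup_runs += 1  # placeholder for one-time setup such as migrations
            self.connected = True

    async def disconnect(self) -> None:
        self.connected = False

    async def __aenter__(self) -> "LifecycleDemo":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body raises, like a try/finally around the body.
        await self.disconnect()


async def demo() -> LifecycleDemo:
    app = LifecycleDemo()
    async with app:
        await app.connect()  # second call inside the block does nothing
    return app
```

Running asyncio.run(demo()) leaves the stand-in disconnected with its one-time setup executed exactly once, which is the behaviour the paragraph above describes for repeated connect() calls.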

Define handlers

Handlers are plain async (or sync) functions. werk records each handler's dotted import path and imports it on the worker side when a job runs, so the handler must be importable from the worker process.

async def send_email(to: str, subject: str) -> None:
    ...

async def resize_image(path: str, width: int) -> str:
    ...
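To make the dotted-path idea concrete, here is a minimal sketch of how a function can be turned into an import path and resolved again. The helper names dotted_path and resolve are assumptions made for this example, and the code shows the general technique rather than werk's actual internals.

```python
import importlib
import json
from typing import Any, Callable


def dotted_path(func: Callable[..., Any]) -> str:
    """Return 'package.module.name' for a module-level function."""
    return f"{func.__module__}.{func.__qualname__}"


def resolve(path: str) -> Callable[..., Any]:
    """Import the module part of the path, then look the function up by name."""
    module_name, _, attr = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Round trip, with a standard-library function standing in for a handler.
path = dotted_path(json.dumps)
assert resolve(path) is json.dumps
```

Under a scheme like this, lambdas and functions nested inside other functions cannot be resolved by path, which is one reason to keep handlers at module level.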

Execution context

Handlers can optionally receive an execution context as their first argument. werk injects it automatically when the first parameter is named ctx or annotated as Context:

from pgwerk import Context

async def send_email(ctx: Context, to: str) -> None:
    print(f"Job {ctx.job.id} on worker {ctx.worker.name}")
    ...

Context carries the connected app, the worker, the job being executed, and, in after_process hooks, any exception the handler raised.
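The injection rule (first parameter named ctx or annotated as Context) can be checked with the standard inspect module. The sketch below is an illustration of that rule only: the Context class is a local stand-in so the example runs without pgwerk, and wants_context is a hypothetical helper, not part of the library.

```python
import inspect
from typing import Any, Callable


class Context:
    """Stand-in for pgwerk.Context, used only to keep the sketch self-contained."""


def wants_context(func: Callable[..., Any]) -> bool:
    """True when the first parameter is named ctx or annotated as Context."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        return False
    first = params[0]
    # The string form covers modules that use postponed annotations.
    return first.name == "ctx" or first.annotation in (Context, "Context")


async def by_name(ctx, to: str) -> None: ...


async def by_annotation(context: Context, to: str) -> None: ...


async def no_context(to: str, subject: str) -> None: ...
```

Here by_name and by_annotation would receive a context, while no_context would be called with its job arguments only.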

Enqueue a job

Call enqueue from anywhere, such as web handlers, background tasks, or other jobs:

await app.enqueue(send_email, to="user@example.com", subject="Hello")

enqueue returns the inserted Job object, or None when an idempotency key collision is detected.
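Because the return value can be None, callers may want to branch on it. The helper below is a hedged sketch: enqueue_and_report is a hypothetical name, it accepts any object with an async enqueue method, and it relies only on the behaviour stated above plus the job.id attribute shown in the execution context example. How an idempotency key is supplied is not covered here.

```python
from typing import Any, Callable, Optional


async def enqueue_and_report(
    app: Any, handler: Callable[..., Any], **kwargs: Any
) -> Optional[Any]:
    """Enqueue a job and report whether a new job was inserted."""
    job = await app.enqueue(handler, **kwargs)
    if job is None:
        # None signals an idempotency key collision: nothing new was inserted.
        print(f"{handler.__name__}: duplicate, nothing enqueued")
    else:
        print(f"{handler.__name__}: enqueued as job {job.id}")
    return job
```

A call such as await enqueue_and_report(app, send_email, to="user@example.com", subject="Hello") would then log one of the two outcomes and hand the result back to the caller.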

Run a worker

Workers dequeue and execute jobs. Run one in a separate process or alongside your application:

import asyncio
from pgwerk import AsyncWorker

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())

Or start one from the CLI:

werk worker myapp.tasks:app --queues default --concurrency 10

APP, the first argument, is a module:attribute path to your Werk instance (myapp.tasks:app in the command above).

Minimal end-to-end example

import asyncio
from pgwerk import Werk, AsyncWorker

app = Werk("postgresql://user:pass@localhost/mydb")

async def greet(name: str) -> str:
    return f"Hello, {name}!"

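async def main():
    # The context manager calls connect() on entry and disconnect() on exit.
    async with app:
        await app.enqueue(greet, name="world")
        # Run a worker in the same process to pick the job up.
        worker = AsyncWorker(app=app, queues=["default"], concurrency=5)
        await worker.run()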

asyncio.run(main())