Install

pip install pgwerk

Requires Python 3.11+ and Postgres 14+. For cron expression support, add the optional extra:

pip install "pgwerk[cron]"

Connect

Create a Werk instance with your Postgres DSN. Call connect() once at startup and disconnect() at shutdown. The async context manager is shorthand for the same pair:

from pgwerk import Werk

app = Werk("postgresql://user:pass@localhost/mydb")

# Explicit lifecycle
await app.connect()
# ... your app ...
await app.disconnect()

# Context manager (recommended)
async with app:
    ...

connect() is idempotent — calling it multiple times is safe. On first connect it runs schema migrations using a Postgres advisory lock, so multiple processes starting simultaneously will not race.

Connection pooling

pgwerk does not maintain an internal connection pool. Each operation opens and closes its own short-lived psycopg connection, so it works naturally with external poolers like PgBouncer in transaction pooling mode. Point your DSN at the pooler and pgwerk uses it transparently — no extra configuration needed.

Define handlers

Handlers are plain async (or sync) functions. werk records their dotted import path and imports them on the worker side when a job runs.

async def send_email(to: str, subject: str) -> None:
    ...

async def resize_image(path: str, width: int) -> str:
    ...

Execution context

Handlers can optionally receive an execution context as their first argument. werk injects it automatically when the first parameter is named ctx or annotated as Context:

from pgwerk import Context

async def send_email(ctx: Context, to: str) -> None:
    print(f"Job {ctx.job.id} on worker {ctx.worker.name}")
    ...

Context carries the connected app, the worker, the job being executed, and any exception raised (available in after_process hooks).

Enqueue a job

Call enqueue from anywhere — web handlers, background tasks, other jobs:

await app.enqueue(send_email, to="user@example.com", subject="Hello")

enqueue returns the inserted Job object, or None when an idempotency key collision is detected.

Run a worker

Workers dequeue and execute jobs. Run one in a separate process or alongside your application:

import asyncio
from pgwerk import AsyncWorker

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())

Or start one from the CLI:

werk worker myapp.tasks:app --queues default --concurrency 10

APP is a module:attribute path to your Werk instance.

Minimal end-to-end example

import asyncio
from pgwerk import Werk, AsyncWorker

app = Werk("postgresql://user:pass@localhost/mydb")

async def greet(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    async with app:
        await app.enqueue(greet, name="world")
        worker = AsyncWorker(app=app, queues=["default"], concurrency=5)
        await worker.run()

asyncio.run(main())