Basic enqueue

Pass the callable and its arguments:

await app.enqueue(send_email, to="user@example.com", subject="Hello")

Positional arguments are also supported:

await app.enqueue(process_image, "/tmp/photo.jpg", 800)

enqueue returns the newly inserted Job object, or None when the job is dropped because of an idempotency-key collision (see _key below).

Control options

All control options are keyword arguments prefixed with an underscore (_). Everything else is forwarded to the handler as its payload.

Queue and priority

await app.enqueue(my_func, x=1, _queue="high", _priority=10)

Higher _priority values run first within the same queue. The default queue is "default" and the default priority is 0.
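Conceptually, a worker takes the highest-priority job first and breaks ties by enqueue order. A minimal stdlib sketch of that ordering (the tuple layout here is illustrative, not pgwerk's actual schema):

```python
from itertools import count

_seq = count()

def pending(name, priority=0):
    # (name, priority, enqueue order) — illustrative only
    return (name, priority, next(_seq))

jobs = [pending("a"), pending("b", priority=10), pending("c", priority=10)]
# Highest priority first; ties break by enqueue order
order = [name for name, prio, seq in sorted(jobs, key=lambda j: (-j[1], j[2]))]
print(order)  # ['b', 'c', 'a']
```

Both priority-10 jobs run before the default-priority one, and "b" beats "c" because it was enqueued first.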

Scheduling

from datetime import datetime, timezone

# Run 30 seconds from now
await app.enqueue(my_func, _delay=30)

# Run at an absolute time
await app.enqueue(my_func, _at=datetime(2025, 6, 1, 9, 0, tzinfo=timezone.utc))
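_delay=N is effectively shorthand for an absolute _at of "now plus N seconds". A stdlib-only sketch of that conversion (the helper name is ours, not part of pgwerk):

```python
from datetime import datetime, timedelta, timezone

def delay_to_at(delay_seconds, now=None):
    """Convert a relative delay into an absolute UTC run-at time."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=delay_seconds)

base = datetime(2025, 6, 1, 9, 0, tzinfo=timezone.utc)
print(delay_to_at(30, now=base))  # 2025-06-01 09:00:30+00:00
```

Note that _at should be timezone-aware, as in the example above; naive datetimes are ambiguous across server timezones.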

Retries

Pass an integer for a simple max-attempt count, or a Retry object for fine-grained control:

from pgwerk import Retry

# Up to 3 attempts in total (the first run plus 2 retries)
await app.enqueue(my_func, _retry=3)

# Custom backoff: wait 10s, then 60s, then 300s between successive attempts
await app.enqueue(my_func, _retry=Retry(max=4, intervals=[10, 60, 300]))

Retry.intervals can also be a single integer for a uniform delay between all retries.
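The two interval forms can be pictured as a small lookup: a bare integer applies the same wait before every retry, while a list is consumed in order. This sketch assumes the last list entry repeats once the list is exhausted — that repeat rule is our assumption, so check the Retry reference for the exact behavior:

```python
def wait_before_retry(retry_number, intervals):
    """Return the wait (seconds) before the Nth retry (1-based).
    A bare int means a uniform delay; a list is read in order,
    repeating its last entry when exhausted (assumed behavior)."""
    if isinstance(intervals, int):
        return intervals
    return intervals[min(retry_number - 1, len(intervals) - 1)]

print([wait_before_retry(n, [10, 60, 300]) for n in (1, 2, 3)])  # [10, 60, 300]
print(wait_before_retry(2, 45))  # 45
```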

Timeout

Cancel the job if it does not finish within the given number of seconds:

await app.enqueue(my_func, _timeout=120)

Heartbeat

For long-running jobs, instruct the worker to periodically renew the job's active timestamp so the sweep does not reap it:

await app.enqueue(my_func, _heartbeat=30)  # renew every 30 seconds

Idempotency key

A duplicate enqueue with the same key is dropped, and enqueue returns None instead of a Job:

await app.enqueue(send_invoice, invoice_id=42, _key="invoice:42")

Concurrency group

At most one job from the same group can be active at a time:

await app.enqueue(sync_user, user_id=99, _group="user:99")

Metadata

Attach arbitrary data to the job for inspection or callbacks:

await app.enqueue(my_func, _meta={"source": "api", "request_id": "abc"})

TTL and expiry

# Expire an unstarted job after 60 seconds
await app.enqueue(my_func, _ttl=60)

# Keep the completed-job row for 1 hour
await app.enqueue(my_func, _result_ttl=3600)

# Keep the failed-job row for 24 hours
await app.enqueue(my_func, _failure_ttl=86400)

Failure mode

By default, failed jobs are kept in the database (failure_mode="hold") so you can inspect and retry them. Set failure_mode="delete" to remove the row on terminal failure:

await app.enqueue(my_func, _failure_mode="delete")

Callbacks

Register functions to be called when a job completes, fails, or is stopped:

from pgwerk import Callback

async def on_done(ctx):
    print(f"Job {ctx.job.id} finished")

async def on_error(ctx):
    print(f"Job {ctx.job.id} failed: {ctx.job.error}")

await app.enqueue(
    my_func,
    _on_success=on_done,
    _on_failure=on_error,
    _on_stopped=on_error,
)

# With a per-callback timeout
await app.enqueue(my_func, _on_success=Callback(func=on_done, timeout=10))

Repeating jobs

Re-enqueue a job automatically after each successful run:

from pgwerk import Repeat

# Run 6 times total (first + 5 repeats), waiting 1 hour between each
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))

# Custom per-run delays
await app.enqueue(cleanup, _repeat=Repeat(times=3, intervals=[60, 300, 3600]))
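The per-run delays are easiest to read as cumulative offsets from the first run. A quick stdlib sanity check of the Repeat(times=3, intervals=[60, 300, 3600]) schedule (illustrative arithmetic only):

```python
from itertools import accumulate

intervals = [60, 300, 3600]
# Offsets, in seconds after the first run, of each repeat
offsets = list(accumulate(intervals))
print(offsets)  # [60, 360, 3960]
```

So the three repeats land 1 minute, 6 minutes, and 66 minutes after the initial run.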

Dependencies

Jobs can wait for one or more upstream jobs before they become eligible:

from pgwerk import Dependency

job_a = await app.enqueue(step_one)
job_b = await app.enqueue(step_two, _depends_on=job_a)

# Allow the dependent job to run even if job_a fails
job_b = await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))

# Multiple dependencies
job_c = await app.enqueue(step_three, _depends_on=[job_a, job_b])

The dependent job enters waiting status and is promoted to queued once all its dependencies reach a terminal state.
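The promotion rule can be pictured as a small predicate. In this sketch the status names and the default blocking behavior for failed dependencies are assumptions drawn from the examples above, not pgwerk's actual schema:

```python
TERMINAL = {"completed", "failed", "stopped"}

def is_eligible(dep_statuses, allow_failure=False):
    """A waiting job becomes eligible once every dependency is terminal.
    Without allow_failure, any dependency that did not complete
    successfully blocks the dependent job (assumed rule)."""
    if not all(s in TERMINAL for s in dep_statuses):
        return False
    return allow_failure or all(s == "completed" for s in dep_statuses)

print(is_eligible(["completed", "running"]))                     # False
print(is_eligible(["completed", "failed"]))                      # False
print(is_eligible(["completed", "failed"], allow_failure=True))  # True
```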

Bulk enqueue

Insert multiple jobs in a single round-trip:

from pgwerk import EnqueueParams

await app.enqueue_many([
    EnqueueParams(func=process, kwargs={"item_id": i}, queue="bulk")
    for i in range(1000)
])

enqueue_many returns a list of Job | None in the same order as the input specs.

Transactional enqueue

Enqueue a job inside an existing database transaction by passing _conn:

async with pool.connection() as conn:
    await conn.execute("INSERT INTO orders ...", ...)
    await app.enqueue(fulfill_order, order_id=123, _conn=conn)
    # both the INSERT and the enqueue commit or roll back together

Wait for a result

apply enqueues a job and blocks until it finishes, returning its result:

result = await app.apply(greet, name="world", timeout=30)
# result == "Hello, world!"

map does the same for multiple inputs in parallel:

results = await app.map(
    process_item,
    [{"item_id": 1}, {"item_id": 2}, {"item_id": 3}],
    timeout=60,
)