Job

Represents a job row in the database.

| Field | Type | Description |
| --- | --- | --- |
| id | str | UUID |
| function | str | Dotted import path of the handler |
| queue | str | Queue name |
| status | JobStatus | Current status |
| priority | int | Higher values run first |
| attempts | int | Number of execution attempts so far |
| max_attempts | int | Maximum allowed attempts |
| scheduled_at | datetime | When the job becomes eligible to run |
| enqueued_at | datetime | When the job was inserted |
| key | str \| None | Idempotency key |
| group_key | str \| None | Concurrency group |
| payload | dict \| None | Deserialized handler arguments |
| result | Any | Return value of the handler (set on completion) |
| error | str \| None | Traceback string (set on failure) |
| timeout_secs | int \| None | Job timeout in seconds |
| heartbeat_secs | int \| None | Heartbeat renewal interval |
| started_at | datetime \| None | When the current attempt started |
| completed_at | datetime \| None | When the job reached a terminal state |
| touched_at | datetime \| None | Last heartbeat timestamp |
| expires_at | datetime \| None | Row expiry time |
| worker_id | str \| None | ID of the worker processing the job |
| meta | dict \| None | Metadata attached at enqueue time |
| result_ttl | int \| None | Seconds to retain completed rows |
| failure_ttl | int \| None | Seconds to retain failed rows |
| ttl | int \| None | Seconds until an unstarted job expires |
| on_success | str \| None | Dotted path to success callback |
| on_failure | str \| None | Dotted path to failure callback |
| on_stopped | str \| None | Dotted path to stopped callback |
| retry_intervals | list[int] \| None | Per-attempt retry delays |
| repeat_remaining | int \| None | Remaining repeat iterations |
| repeat_interval_secs | int \| None | Uniform repeat interval |
| repeat_intervals | list[int] \| None | Per-run repeat delays |
| cron_name | str \| None | Cron schedule that created this job |
| failure_mode | str | "hold" or "delete" |

JobStatus

```python
import enum


class JobStatus(str, enum.Enum):
    Scheduled = "scheduled"   # waiting for scheduled_at
    Queued    = "queued"      # eligible for dequeue
    Waiting   = "waiting"     # blocked on dependencies
    Active    = "active"      # being executed by a worker
    Aborting  = "aborting"    # abort requested, not yet confirmed
    Complete  = "complete"    # finished successfully
    Failed    = "failed"      # retries exhausted
    Aborted   = "aborted"     # cancelled before or during execution
```

Terminal statuses: Complete, Failed, Aborted.
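
Because JobStatus subclasses str, a raw status string (for example, one read from the database) converts to a member and compares equal to it. The sketch below re-declares the enum so it runs without pgwerk installed. `TERMINAL` and `is_terminal` are hypothetical names used only for illustration, not part of the library.

```python
import enum


class JobStatus(str, enum.Enum):
    # Re-declared here so the sketch is self-contained.
    Scheduled = "scheduled"
    Queued = "queued"
    Waiting = "waiting"
    Active = "active"
    Aborting = "aborting"
    Complete = "complete"
    Failed = "failed"
    Aborted = "aborted"


# Hypothetical helper set: the three terminal statuses listed above.
TERMINAL = frozenset({JobStatus.Complete, JobStatus.Failed, JobStatus.Aborted})


def is_terminal(status):
    """Return True if the status (enum member or raw string) is terminal."""
    return JobStatus(status) in TERMINAL
```

A call such as `is_terminal(job.status)` could then be used to decide whether polling a job is still worthwhile.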


JobExecution

Represents a single execution attempt.

| Field | Type | Description |
| --- | --- | --- |
| id | str | UUID |
| job_id | str | Parent job ID |
| attempt | int | Attempt number (1-based) |
| status | ExecutionStatus | Outcome of this attempt |
| worker_id | str \| None | Worker that ran this attempt |
| error | str \| None | Traceback if failed |
| result | Any | Return value if successful |
| started_at | datetime \| None | When this attempt started |
| completed_at | datetime \| None | When this attempt ended |

ExecutionStatus

```python
import enum


class ExecutionStatus(str, enum.Enum):
    Running  = "running"
    Complete = "complete"
    Failed   = "failed"
    Aborted  = "aborted"
```

Retry

Configure maximum attempts and back-off delays.

```python
from pgwerk import Retry

# 4 total attempts with custom per-attempt delays
retry = Retry(max=4, intervals=[10, 60, 300])

# 3 attempts with a uniform 30-second delay
retry = Retry(max=3, intervals=30)
```

| Field | Type | Description |
| --- | --- | --- |
| max | int | Total attempts (including the first). Must be ≥ 1 |
| intervals | int \| list[int] | Delay in seconds between attempts. A single int for uniform delay; a list for per-attempt delays. The last value is reused when the list is shorter than max - 1 |
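
The "last value is reused" rule can be sketched as a small lookup. This is an illustration of the rule as described, not pgwerk's actual implementation. `delay_before_retry` is a hypothetical helper, and it assumes the first list entry is the delay before the second attempt.

```python
from typing import List, Union


def delay_before_retry(intervals: Union[int, List[int]], retry_number: int) -> int:
    """Delay in seconds before a retry (1 = first retry after the initial attempt).

    Hypothetical helper: assumes intervals[0] applies before the second attempt.
    """
    if retry_number < 1:
        raise ValueError("retry_number must be >= 1")
    if isinstance(intervals, int):
        # A single int means the same delay before every retry.
        return intervals
    if not intervals:
        raise ValueError("intervals list must not be empty")
    # Clamp the index so the last value is reused once the list runs out.
    index = min(retry_number - 1, len(intervals) - 1)
    return intervals[index]
```

Under these assumptions, `max=5` with `intervals=[10, 60]` would wait 10 seconds before the second attempt and 60 seconds before each of the remaining three.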

Repeat

Re-enqueue a job after each successful run.

```python
from pgwerk import Repeat

# Run 6 times total (first + 5 repeats), 1 hour apart
repeat = Repeat(times=5, interval=3600)

# Custom per-run delays
repeat = Repeat(times=3, intervals=[60, 300, 3600])
```

| Field | Type | Description |
| --- | --- | --- |
| times | int | Additional runs after the first. Must be ≥ 1 |
| interval | int | Uniform delay in seconds between runs (default 0) |
| intervals | list[int] \| None | Per-run delays; overrides interval |
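
To make the "first run plus `times` repeats" arithmetic concrete, the sketch below computes the earliest start offset of every run. `run_offsets` is a hypothetical helper, not a pgwerk function. It assumes each run finishes instantly and succeeds, and it requires one delay per repeat, because the behavior for a shorter `intervals` list is not specified here.

```python
from itertools import accumulate
from typing import List, Optional


def run_offsets(times: int, interval: int = 0,
                intervals: Optional[List[int]] = None) -> List[int]:
    """Earliest start offsets in seconds, measured from the first run.

    Hypothetical helper: ignores run duration and assumes every run succeeds.
    """
    if times < 1:
        raise ValueError("times must be >= 1")
    if intervals is not None:
        # Per-run delays override the uniform interval.
        if len(intervals) != times:
            raise ValueError("this sketch expects one delay per repeat")
        delays = list(intervals)
    else:
        delays = [interval] * times
    # The first run starts at offset 0; each repeat adds its delay.
    return [0] + list(accumulate(delays))
```

With `times=5, interval=3600` this yields six offsets, one per run, ending at 18000 seconds.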

Dependency

Declare that a job must wait for an upstream job.

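```python
from pgwerk import Dependency

# Assumes `app` is a connected Werk instance and that
# step_one / step_two are handler functions.
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))
```

| Field | Type | Description |
| --- | --- | --- |
| job | str \| Job | Upstream job ID or Job object |
| allow_failure | bool | If True, the dependent job still runs even if this dependency fails or is aborted |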
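
The effect of `allow_failure` can be summarized as a decision on the upstream job's status. The function below is a hypothetical illustration of that rule using the raw JobStatus string values. It is not pgwerk's scheduling code, and it does not model what happens to a dependent job whose dependency fails without `allow_failure`.

```python
def upstream_allows_run(upstream_status: str, allow_failure: bool) -> bool:
    """Return True if one upstream job no longer blocks its dependent.

    Hypothetical helper; False means the dependent is not released,
    either because the upstream is unfinished or because it failed.
    """
    if upstream_status == "complete":
        return True
    if upstream_status in ("failed", "aborted"):
        # A failed or aborted upstream only releases the dependent on opt-in.
        return allow_failure
    # Any non-terminal status: the upstream has not finished yet.
    return False
```

A job with several dependencies would presumably need every upstream to pass this check before it can run.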

Callback

A callback function with an optional timeout.

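```python
from pgwerk import Callback

# Assumes `app` is a connected Werk instance; my_func and notify
# are functions defined elsewhere.
await app.enqueue(my_func, _on_success=Callback(func=notify, timeout=10))
```

| Field | Type | Description |
| --- | --- | --- |
| func | Callable \| str | The callback function or its dotted import path |
| timeout | int \| None | Seconds before the callback is cancelled |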

Context

Execution context injected into handlers.

| Field | Type | Description |
| --- | --- | --- |
| app | Werk | The connected app instance |
| worker | BaseWorker | The worker processing this job |
| job | Job | The job being executed |
| exception | Exception \| None | Set in after_process hooks when the job raised |

EnqueueParams

Specification for a single job in enqueue_many.

```python
from pgwerk import EnqueueParams

# Assumes `app` is a connected Werk instance and `process` is a handler function.
await app.enqueue_many([
    EnqueueParams(func=process, kwargs={"id": i}, queue="bulk")
    for i in range(100)
])
```

| Field | Default | Description |
| --- | --- | --- |
| func | required | Callable to execute |
| args | () | Positional arguments |
| kwargs | {} | Keyword arguments |
| queue | "default" | Queue name |
| priority | 0 | Job priority |
| delay | None | Seconds from now before the job becomes eligible |
| at | None | Absolute scheduled time |
| retry | 1 | Max attempts or Retry object |
| timeout | None | Job timeout in seconds |
| heartbeat | None | Heartbeat interval in seconds |
| key | None | Idempotency key |
| group | None | Concurrency group |
| meta | None | Metadata dict |
| result_ttl | None | Completed-row retention |
| failure_ttl | None | Failed-row retention |
| ttl | None | Unstarted-job expiry |
| on_success | None | Success callback |
| on_failure | None | Failure callback |
| on_stopped | None | Stopped callback |
| repeat | None | Repeat policy |
| depends_on | None | Upstream dependencies |
| failure_mode | "hold" | "hold" or "delete" |

CronJob

A function registered with CronScheduler.

| Field | Type | Description |
| --- | --- | --- |
| func | Callable | Function to enqueue |
| queue | str | Target queue (default "default") |
| args | tuple | Positional arguments |
| kwargs | dict | Keyword arguments |
| interval | int \| None | Seconds between runs |
| cron | str \| None | Cron expression |
| timeout | int \| None | Job timeout in seconds |
| result_ttl | int \| None | Completed-row retention |
| failure_ttl | int \| None | Failed-row retention |
| meta | dict \| None | Metadata attached to every enqueued job |
| name | str | Unique name (defaults to module.qualname) |
| paused | bool | Whether this job is currently paused |

FailureMode

```python
import enum


class FailureMode(str, enum.Enum):
    Hold   = "hold"    # keep the failed job row (default)
    Delete = "delete"  # remove the row on terminal failure
```