Scheduler enqueues recurring jobs on a fixed interval or a cron expression. Schedule definitions are persisted in _pgwerk_schedules — multiple scheduler processes share load via SELECT … FOR UPDATE SKIP LOCKED with no primary/standby coordination required.
Basic usage
import asyncio
from pgwerk import Werk, AsyncWorker, Scheduler
app = Werk("postgresql://user:pass@localhost/mydb")
scheduler = Scheduler(app)
@scheduler.register(cron="0 9 * * *", _queue="reports")
async def daily_report():
...
@scheduler.register(interval=300)
async def cleanup():
...
async def main():
async with app:
worker = AsyncWorker(app=app)
await asyncio.gather(worker.run(), scheduler.run())
asyncio.run(main())
register() stages schedules in-process. They are upserted into the database when run() (or sync()) is called.
Cron expressions
Cron expressions require the croniter package:
pip install "pgwerk[cron]"
# or
pip install croniter
Standard five-field cron syntax is supported:
┌───────── minute (0-59)
│ ┌─────── hour (0-23)
│ │ ┌───── day of month (1-31)
│ │ │ ┌─── month (1-12)
│ │ │ │ ┌─ day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
@scheduler.register(cron="*/15 * * * *") # every 15 minutes
async def poll():
...
@scheduler.register(cron="0 0 * * 1") # every Monday at midnight
async def weekly_digest():
...
register() options
All options are _-prefixed to avoid collisions with function kwargs passed via args/kwargs:
| Option | Default | Description |
|---|---|---|
cron |
— | Cron expression (mutually exclusive with interval) |
interval |
— | Seconds between runs (mutually exclusive with cron) |
_queue |
"default" |
Target queue for enqueued jobs |
_name |
module.qualname |
Unique schedule name |
args |
[] |
Positional arguments forwarded to the handler |
kwargs |
{} |
Keyword arguments forwarded to the handler |
_timeout |
None |
Job timeout in seconds |
_result_ttl |
None |
Seconds to retain completed job rows |
_failure_ttl |
None |
Seconds to retain failed job rows |
_meta |
None |
Metadata dict attached to every enqueued job |
Imperative scheduling
For dynamic registration after startup, use the async methods. These write to the database immediately and are safe to call while the scheduler is running.
# By cron expression
await scheduler.cron("0 9 * * *", send_report, _queue="reports")
await scheduler.cron("*/5 * * * *", poll_feed, feed_id=42, _timeout=30)
# By interval
await scheduler.interval(3600, sync_data, _queue="etl")
await scheduler.interval(300, cleanup, older_than_days=7, _timeout=60)
# Full control — upserts and recomputes next_run_at from the policy
await scheduler.schedule(send_report, cron="0 10 * * *")
# Start the first run at a specific time
from datetime import datetime, timezone
await scheduler.schedule_at(send_report, datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc), interval=86400)
# Start the first run after a delay
await scheduler.schedule_in(3600, send_report, interval=86400)
Reconciliation on startup
The on_unregistered parameter controls what happens to schedules that exist in the database but were not registered by the current process at startup:
# "pause" (default) — safe for rolling deploys; pauses orphaned schedules
scheduler = Scheduler(app, on_unregistered="pause")
# "keep" — DB is the source of truth; code has no effect on unregistered schedules
scheduler = Scheduler(app, on_unregistered="keep")
# "delete" — removes schedules not registered by this process (use with care)
scheduler = Scheduler(app, on_unregistered="delete")
Managing schedules
# Update a schedule in place
await scheduler.update("myapp.tasks.cleanup", interval=600, _timeout=120)
# Pause / resume
await scheduler.pause("myapp.tasks.cleanup")
await scheduler.resume("myapp.tasks.cleanup")
# Force a schedule due immediately (fires on next tick)
await scheduler.trigger("myapp.tasks.cleanup")
# Delete a schedule row
await scheduler.delete("myapp.tasks.cleanup")
Introspection
# All schedules
schedules = await scheduler.list_schedules()
for s in schedules:
print(s.name, s.next_run_at, s.paused)
# Single schedule
s = await scheduler.get("myapp.tasks.cleanup")
Manual sync
Push staged registrations without calling run():
inserted, updated, reconciled = await scheduler.sync()
Distributed scheduling
Multiple processes can run Scheduler.run() against the same Postgres instance simultaneously. Each picks up due schedules via SELECT … FOR UPDATE SKIP LOCKED, so work is distributed naturally without any primary election or advisory lock.
Deduplication
Each enqueue uses an idempotency key derived from the schedule name and tick timestamp. If a schedule fires before its previous job has been consumed, the duplicate is silently dropped.