CronScheduler enqueues recurring jobs on a fixed interval or a cron expression. A Postgres advisory lock ensures only one scheduler instance is active at a time — competing instances enter standby and automatically promote if the primary's connection drops.
## Basic usage
```python
import asyncio

from pgwerk import Werk, AsyncWorker, CronScheduler

app = Werk("postgresql://user:pass@localhost/mydb")

async def daily_report():
    ...

async def cleanup():
    ...

scheduler = CronScheduler(app)

# Cron expression: every day at 09:00 UTC
scheduler.register(daily_report, cron="0 9 * * *")

# Fixed interval: every 5 minutes
scheduler.register(cleanup, interval=300)

async def main():
    async with app:
        worker = AsyncWorker(app=app)
        await asyncio.gather(worker.run(), scheduler.run())

asyncio.run(main())
```
## Cron expressions
Cron expressions require the `croniter` package:

```shell
pip install "pgwerk[cron]"
# or
pip install croniter
```
Standard five-field cron syntax is supported:

```text
┌───────── minute (0-59)
│ ┌─────── hour (0-23)
│ │ ┌───── day of month (1-31)
│ │ │ ┌─── month (1-12)
│ │ │ │ ┌─ day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
```
```python
scheduler.register(my_func, cron="*/15 * * * *")  # every 15 minutes
scheduler.register(my_func, cron="0 0 * * 1")     # every Monday at midnight
scheduler.register(my_func, cron="30 8 1 * *")    # 1st of each month at 08:30
```
## CronJob objects
For more control, construct a `CronJob` directly:

```python
from pgwerk import CronJob

job = CronJob(
    func=daily_report,
    queue="reports",
    cron="0 9 * * *",
    timeout=300,
    result_ttl=86400,
    failure_ttl=604800,
    meta={"source": "scheduler"},
)

scheduler.register(job)
```
`CronJob` parameters:

| Parameter | Description |
|---|---|
| `func` | The callable to enqueue |
| `queue` | Target queue (default `"default"`) |
| `args` / `kwargs` | Arguments forwarded to the callable |
| `interval` | Seconds between runs (mutually exclusive with `cron`) |
| `cron` | Cron expression (mutually exclusive with `interval`) |
| `timeout` | Job timeout in seconds |
| `result_ttl` | Seconds to retain completed job rows |
| `failure_ttl` | Seconds to retain failed job rows |
| `meta` | Metadata dict attached to every enqueued job |
| `name` | Unique scheduler name (defaults to `module.qualname`) |
| `paused` | Start in paused state |
## Pausing and resuming
```python
scheduler.pause("myapp.tasks.cleanup")
scheduler.resume("myapp.tasks.cleanup")
```
Paused jobs remain registered but are not enqueued until resumed.
## Dynamic registration
Jobs can be added and removed while the scheduler is running:
```python
new_job = scheduler.register(new_func, interval=60)
scheduler.unregister("myapp.tasks.old_func")
```
## Introspection
```python
# All registered jobs
for name, cron_job in scheduler.jobs.items():
    print(name, cron_job.next_run_at)

# Single job
job = scheduler.get("myapp.tasks.cleanup")

# Count
print(len(scheduler))

# Membership
print("myapp.tasks.cleanup" in scheduler)
```
## Distributed scheduling
When multiple processes run `CronScheduler.run()` against the same Postgres instance, only one acquires the advisory lock and becomes the primary. The others poll every `cron_standby_retry_interval` seconds (default 30) and promote automatically if the primary's session ends.
This means you can run a CronScheduler alongside every worker process without configuring a separate scheduler process — failover is automatic.
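As a rough illustration of the election (a `threading.Lock` standing in for Postgres's advisory lock; this is not pgwerk's implementation), the primary/standby split boils down to a non-blocking acquire:

```python
# Illustrative stand-in only: a threading.Lock plays the role of the
# Postgres advisory lock. The first instance to acquire it becomes the
# primary; every other instance sees a failed acquire and stays on
# standby, retrying until the holder releases (i.e. its session ends).
import threading

lock = threading.Lock()

def try_become_primary(name: str) -> bool:
    acquired = lock.acquire(blocking=False)
    print(f"{name}: {'primary' if acquired else 'standby'}")
    return acquired

r1 = try_become_primary("instance-1")  # prints "instance-1: primary"
r2 = try_become_primary("instance-2")  # prints "instance-2: standby"
```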
## Deduplication
`CronScheduler` uses an idempotency key derived from the job name and the tick timestamp. If a tick fires before the previous enqueue has been consumed, the duplicate is silently dropped.
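The exact key format is internal to pgwerk, but as an illustrative sketch (a SHA-256 hash stands in; the real derivation may differ), hashing the job name together with the tick timestamp yields one stable key per tick:

```python
# Illustrative sketch of tick-based deduplication -- not pgwerk's actual
# key derivation. Because the key depends only on the job name and the
# tick timestamp, a tick that fires twice produces the same key, and the
# second enqueue can be dropped.
import hashlib
from datetime import datetime, timezone

def idempotency_key(job_name: str, tick: datetime) -> str:
    raw = f"{job_name}:{tick.isoformat()}"
    return hashlib.sha256(raw.encode()).hexdigest()

tick = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)

# Same job + same tick -> same key, so the duplicate is detectable.
assert idempotency_key("daily_report", tick) == idempotency_key("daily_report", tick)
```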