Show HN: FastScheduler – Decorator-first Python task scheduler, async support

Original link: https://github.com/MichielMe/fastscheduler

## FastScheduler: Lightweight Python Task Scheduler

FastScheduler is a simple but powerful Python library for scheduling tasks, offering async support, timezone handling, and a real-time dashboard. Installation is straightforward with `pip`, with options for the basic package, FastAPI integration, or full functionality including cron support.

Tasks are defined with a concise decorator API that supports intervals (seconds, minutes, and so on), daily/weekly schedules, and complex cron expressions. Features include persistent state (surviving restarts), automatic retries with exponential backoff, job timeouts, pause/resume, and a dead letter queue for failed jobs.

A key feature is the integrated FastAPI dashboard, which provides real-time monitoring over Server-Sent Events (SSE), job control, execution history, and statistics. Configuration options cover state persistence, history retention, and worker concurrency.

FastScheduler automatically saves its state to disk so jobs resume after a restart, and it exposes methods to manage jobs, view history, and retrieve statistics programmatically. It is under active development and contributions are welcome!

Posted by michielme (4 points, 2 hours ago):

Hi everyone! I built this because I kept reaching for Celery for simple scheduled tasks, where it felt like overkill. I just needed "run this function every hour" or "run this every day at 9 AM", not distributed workers. So it uses decorators for scheduling (@scheduler.every(5).minutes, @scheduler.daily.at("09:00")), state is saved to a JSON file so jobs survive restarts, and there's an optional FastAPI dashboard if you want to see what's running. No Redis, no message broker; it runs inside your app's process. The trade-off is that it's single-process only: if you need distributed workers, stick with Celery.

## Original README

Simple, lightweight task scheduler for Python with async support, timezone handling, cron expressions, and a beautiful real-time dashboard.

If this saves you time, ⭐️ the repo and open an issue for ideas — I'm actively improving it.

License: MIT · Python 3.10+

FastScheduler Demo

  • 🎯 Simple decorator-based API - Schedule tasks in one line
  • Async/await support - Native support for async functions
  • 🕐 Timezone support - Schedule jobs in any timezone
  • 📅 Cron expressions - Complex schedules with cron syntax
  • 💾 Persistent state - Survives restarts, handles missed jobs
  • 🎨 FastAPI dashboard - Beautiful real-time monitoring UI
  • 🔄 Automatic retries - Configurable retry with exponential backoff
  • ⏱️ Job timeouts - Kill long-running jobs automatically
  • ⏸️ Pause/Resume - Control jobs without removing them
  • 📋 Dead Letter Queue - Track and debug failed jobs

Install with pip:

# Basic installation
pip install fastscheduler

# With FastAPI dashboard
pip install fastscheduler[fastapi]

# With cron expression support
pip install fastscheduler[cron]

# All features
pip install fastscheduler[all]

Quick start:

from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)

@scheduler.every(10).seconds
def task():
    print("Task executed")

@scheduler.daily.at("14:30")
async def daily_task():
    print("Daily task at 2:30 PM")

scheduler.start()

The available schedule patterns:

@scheduler.every(10).seconds
@scheduler.every(5).minutes
@scheduler.every(2).hours
@scheduler.every(1).days
@scheduler.daily.at("09:00")              # Daily at 9 AM
@scheduler.hourly.at(":30")               # Every hour at :30
@scheduler.weekly.monday.at("10:00")      # Every Monday at 10 AM
@scheduler.weekly.weekdays.at("09:00")    # Weekdays at 9 AM
@scheduler.weekly.weekends.at("12:00")    # Weekends at noon

Cron expressions (requires: pip install fastscheduler[cron]):

@scheduler.cron("0 9 * * MON-FRI")        # 9 AM on weekdays
def market_open():
    ...

@scheduler.cron("*/15 * * * *")           # Every 15 minutes
def frequent_check():
    ...

@scheduler.cron("0 0 1 * *")              # First day of each month
def monthly_report():
    ...
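
For reference, the five cron fields are minute, hour, day of month, month, and day of week.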

One-time jobs:

@scheduler.once(60)                        # Run once after 60 seconds
def delayed_task():
    ...

@scheduler.at("2024-12-25 00:00:00")      # Run at specific datetime
def christmas_task():
    ...

Schedule jobs in any timezone:

# Using the tz parameter
@scheduler.daily.at("09:00", tz="America/New_York")
def nyc_morning():
    print("Good morning, New York!")

# Using the .tz() method (chainable)
@scheduler.weekly.monday.tz("Europe/London").at("09:00")
def london_standup():
    print("Monday standup")

# With cron expressions
@scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo")
def tokyo_market():
    print("Tokyo market open")

Common timezones: UTC, America/New_York, America/Los_Angeles, Europe/London, Europe/Paris, Asia/Tokyo, Asia/Shanghai, Australia/Sydney
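
If you're unsure whether a timezone name is valid, you can check it against Python's standard zoneinfo database first (a quick sanity check outside FastScheduler itself):

from zoneinfo import ZoneInfo  # stdlib since Python 3.9

ZoneInfo("America/New_York")  # valid name: returns a tzinfo object
ZoneInfo("America/NewYork")   # invalid name: raises ZoneInfoNotFoundError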

Kill jobs that run too long:

@scheduler.every(1).minutes.timeout(30)   # Kill if runs > 30 seconds
def quick_task():
    ...

@scheduler.daily.at("02:00").timeout(3600)  # 1 hour max
def nightly_backup():
    ...

Configure automatic retries on failure:

@scheduler.every(5).minutes.retries(5)    # Retry up to 5 times
def flaky_api_call():
    ...

Retries use exponential backoff (2s, 4s, 8s, 16s, ...).
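
A one-line sketch of that documented schedule (the formula FastScheduler uses internally is an assumption here):

# Delay before attempt n doubles each time: 2s, 4s, 8s, 16s, ...
delays = [2 ** attempt for attempt in range(1, 6)]  # [2, 4, 8, 16, 32]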

Don't run missed jobs after restart:

@scheduler.every(1).hours.no_catch_up()
def hourly_stats():
    ...

Pause, Resume, and Cancel

# Pause a job (stays in queue but won't execute)
scheduler.pause_job("job_0")

# Resume a paused job
scheduler.resume_job("job_0")

# Cancel and remove a job
scheduler.cancel_job("job_0")

# Cancel all jobs with a specific function name
scheduler.cancel_job_by_name("my_task")
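
Job IDs like "job_0" are assigned automatically by the scheduler (the job counter persists across restarts); if you only know the function name, use cancel_job_by_name() or look the ID up via get_jobs().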

Add a beautiful real-time dashboard to your FastAPI app:

from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

app = FastAPI()
scheduler = FastScheduler(quiet=True)

# Add dashboard at /scheduler/
app.include_router(create_scheduler_routes(scheduler))

@scheduler.every(30).seconds
def background_task():
    print("Background work")

scheduler.start()
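
The snippet above only wires up the routes and starts the scheduler; it doesn't serve the app. Assuming the example lives in main.py and uvicorn is installed, a minimal way to run it:

# Equivalent to running `uvicorn main:app` from the shell
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000)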

Access at http://localhost:8000/scheduler/

FastScheduler Dashboard

  • Real-time updates via Server-Sent Events (SSE)
  • Job table with status indicators, last 5 run results, and countdown timers
  • Quick actions - Pause/Resume/Cancel directly from the UI
  • Execution history tab with filtering and search
  • Dead letter queue tab - view failed jobs with error details
  • Statistics - Success rate, uptime, active jobs count
  • Toast notifications - Alerts for job completions and failures

| Endpoint | Method | Description |
|---|---|---|
| /scheduler/ | GET | Dashboard UI |
| /scheduler/api/status | GET | Scheduler status |
| /scheduler/api/jobs | GET | List all jobs |
| /scheduler/api/jobs/{job_id} | GET | Get specific job |
| /scheduler/api/jobs/{job_id}/pause | POST | Pause a job |
| /scheduler/api/jobs/{job_id}/resume | POST | Resume a job |
| /scheduler/api/jobs/{job_id}/cancel | POST | Cancel a job |
| /scheduler/api/history | GET | Execution history |
| /scheduler/api/dead-letters | GET | Dead letter queue (failed jobs) |
| /scheduler/api/dead-letters | DELETE | Clear dead letter queue |
| /scheduler/events | GET | SSE event stream |
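
A minimal sketch of querying that API from another process, assuming the dashboard example above is running on localhost:8000 (standard library only):

import json
import urllib.request

with urllib.request.urlopen("http://localhost:8000/scheduler/api/status") as resp:
    print(json.load(resp))  # current scheduler status as JSON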

Configuration options:

scheduler = FastScheduler(
    state_file="scheduler.json",    # Persistence file (default: fastscheduler_state.json)
    quiet=True,                     # Suppress log messages (default: False)
    auto_start=False,               # Start scheduler immediately on creation (default: False)
    max_history=5000,               # Max history entries to keep (default: 10000)
    max_workers=20,                 # Concurrent job threads (default: 10)
    history_retention_days=8,       # Delete history older than X days (default: 7)
    max_dead_letters=500,           # Max failed jobs in dead letter queue (default: 500)
)

History is automatically cleaned up based on two limits (both are enforced):

  • Count limit: max_history - maximum number of entries
  • Time limit: history_retention_days - maximum age in days

Set history_retention_days=0 to disable time-based cleanup (only count limit applies).

Failed job executions are automatically stored in a separate dead letter queue for debugging:

# Get failed jobs
dead_letters = scheduler.get_dead_letters(limit=100)

# Clear the queue
scheduler.clear_dead_letters()

The dead letter queue:

  • Stores the last max_dead_letters failed jobs (default: 500)
  • Persists to a separate JSON file (*_dead_letters.json)
  • Includes error messages, timestamps, run counts, and execution times
  • Viewable in the dashboard "Failed" tab
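
A small sketch of dumping recent failures for inspection (the exact entry field names aren't documented above, so this prints whole entries rather than assuming keys):

# Each entry includes the error message, timestamp, run count, and
# execution time, per the list above.
for entry in scheduler.get_dead_letters(limit=10):
    print(entry)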

Managing jobs programmatically:

# Get all jobs
jobs = scheduler.get_jobs()

# Get specific job
job = scheduler.get_job("job_0")

# Get execution history
history = scheduler.get_history(limit=100)
history = scheduler.get_history(func_name="my_task", limit=50)

# Get statistics
stats = scheduler.get_statistics()
# Returns: total_runs, total_failures, uptime, per_job stats

# Print simple status to console
scheduler.print_status()
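
For a quick health check, the documented keys can be read straight off the statistics (assuming get_statistics() returns a dict keyed as the comment above lists; the per_job structure isn't documented here):

stats = scheduler.get_statistics()
print(f"runs={stats['total_runs']} failures={stats['total_failures']}")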

FastScheduler also works as a context manager:

import time
from fastscheduler import FastScheduler

with FastScheduler(quiet=True) as scheduler:
    @scheduler.every(5).seconds
    def task():
        print("Running")

    # Scheduler starts automatically
    time.sleep(30)
# Scheduler stops automatically on exit

FastScheduler automatically saves state to disk:

  • Job definitions and schedules
  • Execution history
  • Statistics
  • Job counter (ensures unique IDs across restarts)

On restart, it:

  1. Restores all jobs
  2. Calculates missed executions
  3. Runs catch-up jobs (unless no_catch_up() is set; see the sketch below)
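
For example, with persistence in place, these two jobs behave differently across a restart (a sketch of the documented behavior, not library internals):

@scheduler.every(1).hours
def replayed_after_restart():
    ...  # missed runs are executed as catch-up jobs on restart

@scheduler.every(1).hours.no_catch_up()
def skipped_after_restart():
    ...  # missed runs are skipped; the job just resumes its schedule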

A complete example putting everything together:

import asyncio
import time
from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)

# Simple interval job
@scheduler.every(10).seconds
def heartbeat():
    print(f"[{time.strftime('%H:%M:%S')}] ❤️ Heartbeat")

# Async job with timezone
@scheduler.daily.at("09:00", tz="America/New_York").timeout(60)
async def morning_report():
    print("Generating report...")
    await asyncio.sleep(5)
    print("Report sent!")

# Cron job with retries
@scheduler.cron("*/5 * * * *").retries(3)
def check_api():
    print("Checking API health")

# Weekly job
@scheduler.weekly.monday.at("10:00")
def weekly_standup():
    print("Time for standup!")

# Start scheduler
scheduler.start()

try:
    while True:
        time.sleep(60)
        scheduler.print_status()
except KeyboardInterrupt:
    scheduler.stop()

With FastAPI, tie the scheduler to the app's lifespan:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

scheduler = FastScheduler(quiet=True)

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.stop(wait=True)

app = FastAPI(lifespan=lifespan)
app.include_router(create_scheduler_routes(scheduler))

@scheduler.every(30).seconds
def background_job():
    print("Working...")

Scheduler methods:

| Method | Description |
|---|---|
| start() | Start the scheduler |
| stop(wait=True, timeout=30) | Stop gracefully |
| get_jobs() | List all scheduled jobs |
| get_job(job_id) | Get specific job by ID |
| get_history(func_name=None, limit=50) | Get execution history |
| get_statistics() | Get runtime statistics |
| get_dead_letters(limit=100) | Get dead letter queue (failed jobs) |
| clear_dead_letters() | Clear all dead letter entries |
| pause_job(job_id) | Pause a job |
| resume_job(job_id) | Resume a paused job |
| cancel_job(job_id) | Cancel and remove a job |
| cancel_job_by_name(func_name) | Cancel all jobs by function name |
| print_status() | Print status to console |

Scheduling methods:

| Method | Description |
|---|---|
| every(n).seconds/minutes/hours/days | Interval scheduling |
| daily.at("HH:MM") | Daily at specific time |
| hourly.at(":MM") | Hourly at specific minute |
| weekly.monday/tuesday/.../sunday.at("HH:MM") | Weekly scheduling |
| weekly.weekdays/weekends.at("HH:MM") | Weekday/weekend scheduling |
| cron("expression") | Cron expression scheduling |
| once(seconds) | One-time delayed execution |
| at("YYYY-MM-DD HH:MM:SS") | One-time at specific datetime |

Modifiers:

| Modifier | Description |
|---|---|
| .timeout(seconds) | Maximum execution time |
| .retries(n) | Maximum retry attempts |
| .no_catch_up() | Skip missed executions |
| .tz("timezone") | Set timezone for schedule |

License: MIT

Contributions welcome! Please open an issue or PR on GitHub.
