Backon – Python 重试库(零依赖、熔断器、原生支持异步)
Backon – Python retry (zero deps, circuit breaker, async native)

原始链接: https://github.com/Llucs/backon

**backon** 是一个现代化的 Python (3.10+) 库,旨在为重试逻辑提供零依赖、指数退避(exponential backoff)的稳健解决方案。它作为传统库(如 `backoff`)的高性能、功能丰富的替代方案。 **主要特性:** * **多功能 API:** 支持装饰器、函数调用、上下文管理器,并为同步和异步代码提供专用的迭代器 API。 * **高级策略:** 包含全面的等待生成器(指数、常量、斐波那契等)、抖动(jitter)支持,以及使用标准 Python 运算符(`|`, `&`)的可组合停止/重试条件。 * **企业级工具:** 内置对熔断器、请求对冲(request hedging)的支持,以及通过 Prometheus 和 OpenTelemetry 实现的可选可观测性。 * **开发者友好:** 功能包括实例级重试开关、用于测试的自定义睡眠注入、完整的类型提示以及对 Trio 的支持。 * **可观测性:** 提供丰富的回调(`on_attempt`、`on_backoff` 等)和执行统计信息。 **backon** 专为简洁和现代工作流而设计,是现有解决方案的近乎直接的替代品,在无需外部依赖的情况下,提供了更高的可扩展性和对复杂重试场景的内置支持。

``` Hacker News 最新 | 过往 | 评论 | 提问 | 展示 | 招聘 | 提交 登录 Backon – Python 重试库(零依赖,含熔断机制,原生支持异步)(github.com/llucs) Llucs 发布于 2 小时前,4 分 | 隐藏 | 过往 | 收藏 | 讨论 帮助 准则 | 常见问题 | 列表 | API | 安全 | 法律 | 申请加入 YC | 联系 搜索: ```
相关文章

原文

Function decoration for backoff and retry — modern, fast, zero dependencies.

backon is a modern evolution of backoff — a zero-dependency Python library for retry with exponential backoff. It provides decorator, functional, and context manager APIs for both sync and async code.

demo



  • Zero dependencies — pure Python, stdlib only
  • Four APIs — decorator (@on_exception, @on_predicate), functional (retry()), context manager (Retrying), callable (RetryingCaller / AsyncRetryingCaller)
  • Async native — same API works for async def functions
  • Full type hints — validated with mypy, strict mode compatible
  • Global togglebackon.disable() / backon.enable() for testing
  • Custom sleep — inject your own sleep function (useful for testing with asyncio.Event)
  • Multiple wait strategies — exponential, constant, Fibonacci, decay, runtime, randomized, incremental, and composable chains
  • Jitter — full jitter, random jitter, or none
  • Rich callbackson_attempt, on_backoff, on_success, on_giveup, before_sleep, before, after
  • Circuit breaker — CLOSED/OPEN/HALF_OPEN states with automatic recovery
  • Hedging — concurrent retry requests, first-success-wins
  • Prometheus / OpenTelemetry metrics — optional, zero hard dependencies
  • Testing moduledisable_retries(), limit_retries(), remove_backoff(), assert_retried()
  • Trio support — retry with the trio async framework
  • Operator overloading — compose stops with | / &, wait generators with +
  • Iterator APIfor attempt in Retrying(...):
  • Modern packaging — PEP 621, PDM, py.typed

Requires Python 3.10+.


import backon

@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch_data():
    return api.call()
@backon.on_predicate(backon.constant, max_tries=5, interval=0.5)
def poll_status():
    return check_ready()
result = backon.retry(
    fetch_data,
    backon.expo,
    exception=ValueError,
    max_tries=3,
)
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    result = r.call(fetch_data)

Async variant:

async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
    result = await r.async_call(fetch_data)

@backon.on_exception(wait_gen, exception, ...)

Retry when the decorated function raises one of the specified exceptions.

@backon.on_exception(backon.expo, (ValueError, TimeoutError), max_tries=5)
def fetch():
    ...
Argument Type Default Description
wait_gen WaitGenerator Wait strategy (expo, constant, fibo, etc.)
exception type or tuple[type] Exception class(es) to retry on
max_tries int or Callable[[], int] None Maximum number of attempts
max_time float, timedelta, or Callable None Maximum total elapsed time
jitter Jitterer or None full_jitter Jitter function
giveup Callable[[Exception], bool or float] lambda e: False Stop retrying for matching exceptions; return float to override wait
on_success Handler or list None Called after successful attempt
on_backoff Handler or list None Called before each retry
on_giveup Handler or list None Called when retries exhausted
on_attempt Handler or list None Called before each attempt
before_sleep Handler or list None Called before sleeping
before Handler or list None Called before each attempt (lower-level than on_attempt)
after Handler or list None Called after each attempt (lower-level than on_success/on_giveup)
retry_error_callback Callable[[dict], Any] None Called when retry gives up instead of raising
raise_on_giveup bool True Raise final exception when giving up
logger str or Logger "backon" Logger name or instance
backoff_log_level int logging.INFO Log level for backoff messages
giveup_log_level int logging.ERROR Log level for giveup messages
sleep Callable[[float], Any] None Custom sleep function
**wait_gen_kwargs varies Extra kwargs passed to the wait generator (e.g. base=3, interval=0.5)

@backon.on_predicate(wait_gen, predicate, ...)

Retry while the predicate matches the return value.

@backon.on_predicate(backon.constant, predicate=lambda x: x is None, max_tries=5)
def poll():
    ...

Accepts all parameters from on_exception except exception, giveup, and raise_on_giveup. Adds:

Argument Type Default Description
predicate Callable[[Any], bool] operator.not_ Retry when this returns True for the return value

backon.retry(target, wait_gen, ...)

result = backon.retry(
    target=my_function,
    wait_gen=backon.expo,
    exception=ValueError,
    max_tries=3,
)

Accepts all parameters from on_exception plus on_predicate extras, plus:

Argument Type Default Description
condition RetryCondition None Advanced retry condition object
stop Stop None Advanced stop condition object
name str "" Identifier for the retry call
**wait_gen_kwargs varies Extra kwargs passed to the wait generator

If target is a coroutine function, retry() returns a coroutine. Otherwise it returns the result synchronously.

backon.Retrying(wait_gen, ...)

with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    r.call(my_function)

async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
    await r.async_call(my_async_function)
Method Description
call(target, *args, **kwargs) Execute synchronously
async_call(target, *args, **kwargs) Execute asynchronously
copy() Return a modified copy of the Retrying instance
statistics Property returning dict with attempt_number, elapsed, idle_for, start_time
call_state Property returning the current RetryCallState
enabled Property to enable/disable retry per-instance

Arguments: Same as retry(), plus enabled (default True).

backon.RetryingCaller(wait_gen, ...)

A callable object with pre-bound exception type via .on().

caller = backon.RetryingCaller(backon.expo, max_tries=3)
caller = caller.on(ValueError)

result = caller(my_function, arg1, arg2)

backon.AsyncRetryingCaller(wait_gen, ...)

Async variant of RetryingCaller.

caller = backon.AsyncRetryingCaller(backon.expo, max_tries=3).on(ValueError)
result = await caller(my_async_function, arg1, arg2)
Method Description
.on(exception) Return a copy bound to the given exception type
.copy() Return a modified copy
.__call__(target, *args, **kwargs) Execute with retry

All wait generators are callables that produce a sequence of wait times. Pass extra kwargs (e.g. interval=0.5, base=3) as **wait_gen_kwargs to decorators and functions.

Generator Signature Description
expo (base=2, factor=1, max_value=None) Exponential backoff: factor * base^n
constant (interval=1) Fixed interval; accepts float or Sequence[float] for varied intervals
fibo (max_value=None) Fibonacci sequence: 1, 1, 2, 3, 5, 8, ...
runtime (value=Callable) Dynamic wait from return value or exception — useful for Retry-After headers
decay (initial_value=1, decay_factor=1, min_value=None) Exponential decay: initial * e^(-t * decay_factor)
wait_random_exponential (multiplier=1, max_value=None, exp_base=2, min_value=0) Randomized exponential (uniform random between 0 and the exponential value)
wait_incrementing (start=1, increment=1, max_value=None) Linear increment: start + n * increment
wait_chain (*generators) Sequentially play through multiple generators
wait_exception (value=Callable) Dynamic wait based on the caught exception
wait_random (min=0, max=1) Uniform random wait between min and max
wait_exponential_jitter (initial=1, max=60, exp_base=2, jitter=1) Exponential backoff with added random jitter
wait_none () Always returns 0 (no wait)

Composition: Combine wait generators with +:

wait_strategy = backon.expo(base=3) + backon.constant(interval=0.5)

Stop conditions determine when retry should cease. They can be composed with | (any) and & (all).

Condition Description
stop_after_attempt(max_attempts) Stop after N attempts
stop_after_delay(max_delay) Stop after total elapsed time exceeds max_delay seconds
stop_before_delay(max_delay) Stop if the next wait would exceed max_delay
stop_all(*stops) Stop when all sub-conditions are met
stop_any(*stops) Stop when any sub-condition is met
stop_never() Never stop (retry indefinitely)
stop_when_event_set(event) Stop when a threading.Event is set
from backon import stop_after_attempt, stop_after_delay, stop_any

stop = stop_after_attempt(5) | stop_after_delay(30.0)

Retry conditions determine whether a retry should happen. They can be composed with | and &.

Condition Description
retry_if_exception_type(*types) Retry if exception is an instance of given type(s)
retry_if_exception(predicate) Retry if the exception matches a custom predicate
retry_if_exception_message(message, match=None) Retry if exception message contains a string (or matches regex with match="re")
retry_if_result(predicate) Retry if the return value matches a predicate
retry_if_not_result(predicate) Retry if the return value does NOT match a predicate
retry_all(*conditions) Retry only when all conditions pass
retry_any(*conditions) Retry when any condition passes
retry_always() Always retry
retry_never() Never retry
from backon import retry_if_exception_type, retry_if_exception_message, retry_all

condition = retry_all(
    retry_if_exception_type(HTTPError),
    retry_if_exception_message("429"),
)

@backon.on_exception(backon.expo, ValueError, jitter=backon.full_jitter)
def f():
    ...
Jitter Effect
backon.full_jitter Random value between 0 and the calculated wait time
backon.random_jitter Adds random() to the calculated wait time (~+0.5s on average)
None No jitter (deterministic waits)

Handlers receive a details dict with contextual information:

def handler(details):
    print(f"Attempt {details['tries']}, elapsed {details['elapsed']:.2f}s")

@backon.on_exception(
    backon.expo, ValueError, max_tries=3,
    on_attempt=handler,
    on_backoff=handler,
    on_success=handler,
    on_giveup=handler,
)
def f():
    ...

Available keys in details:

Key Available in
target All
args, kwargs All
tries All
elapsed All
value on_success, on_backoff, on_giveup
exception on_backoff, on_giveup
wait on_backoff, before_sleep

Useful in tests to disable retry logic globally:

backon.disable()   # skip retry, call function directly
backon.enable()    # re-enable retry

Per-instance toggle via Retrying.enabled:

r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
r.enabled = False
result = r.call(fn)  # no retry

All three APIs work with async functions transparently:

@backon.on_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
    return await api.call()

result = await backon.retry(fetch, backon.expo, exception=ValueError, max_tries=3)

async with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    result = await r.async_call(fetch)

Replace the default sleep for testing or special environments:

@backon.on_exception(
    backon.expo, ValueError, max_tries=3,
    sleep=lambda s: print(f"waiting {s}s"),
)
def f():
    ...

# With asyncio.Event for testing
import asyncio

event = asyncio.Event()
@backon.on_exception(
    backon.expo, ValueError, max_tries=3,
    sleep=backon.sleep_using_event(event),
)
async def f():
    ...

Circuit breaker with three states: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery).

from backon._circuit_breaker import CircuitBreaker, BreakerRetrying, CircuitOpenError

breaker = BreakerRetrying(
    backon.expo, max_tries=3,
    breaker=CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60.0,
        half_open_max_calls=1,
    ),
)

try:
    result = breaker.call(fetch)
except CircuitOpenError:
    print("Circuit is open, skipping request")
CircuitBreaker parameter Default Description
failure_threshold 5 Consecutive failures before opening the circuit
recovery_timeout 60.0 Seconds before transitioning from OPEN to HALF_OPEN
half_open_max_calls 1 Allowed calls in HALF_OPEN state before fully closing
name "" Identifier for the breaker

Run multiple retry attempts concurrently and return the first success.

from backon._hedging import hedge, HedgingRetrying

# Functional
result = hedge(fetch, backon.expo, max_hedge=3)

# Decorator
@backon.on_hedge(backon.expo, max_hedge=3)
def fetch():
    ...

# Context manager
with HedgingRetrying(backon.expo, max_hedge=3) as h:
    result = h.call(fetch)
Parameter Default Description
max_hedge 3 Number of concurrent hedged requests
timeout None Maximum time to wait for any hedge
on_hedge None Callback when a hedge request is sent

Optional Prometheus and OpenTelemetry metrics. Requires prometheus_client or opentelemetry-api to be installed.

from backon._instrumentation import PrometheusMetrics, OTelMetrics, set_metrics_collector

# Prometheus
set_metrics_collector(PrometheusMetrics())

# OpenTelemetry
set_metrics_collector(OTelMetrics(meter_name="myapp.backon"))

Metrics collected:

  • backon_retry_attempts_total (attempts, labeled by target and exception type)
  • backon_retry_success_total (successes)
  • backon_retry_failure_total (failures)
  • backon_circuit_breaker_open_total / backon_circuit_breaker_close_total
  • backon_hedge_requests_total
  • backon.retry.attempt_duration (histogram, OTel only)
from backon._testing import (
    disable_retries, enable_retries,
    test_config, limit_retries, remove_backoff,
    assert_retried, assert_not_retried,
)

# Context manager that skips retry for a block
with disable_retries():
    result = fetch()

# Limit max retries in tests
with limit_retries(2):
    fetch()

# Remove backoff delay entirely
with remove_backoff():
    fetch()

# Assert the function was retried N times
assert_retried(fetch, expected_tries=3)

Retry with the trio async framework:

from backon._trio import retry_exception, retry_predicate

@retry_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
    ...

Requires trio to be installed.

Check if code is running inside a retry and get the current attempt number anywhere in the call stack:

from backon import is_retrying, get_attempt_number

def log_attempt():
    if is_retrying():
        print(f"This is attempt #{get_attempt_number()}")

@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch():
    log_attempt()
    return api.call()

Uses contextvars — thread-safe and async-safe.

Override the wait time per attempt by returning a float from the giveup callback. Useful for respecting Retry-After headers.

def respect_retry_after(exc: HTTPError) -> float:
    return exc.response.headers.get("Retry-After", 1.0)

@backon.on_exception(backon.expo, HTTPError, giveup=respect_retry_after)
def fetch():
    ...

When 5 or more retries occur with less than 100ms between them, backon logs a warning. This helps detect misconfigured retry policies before they cause issues.

r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
result = r.call(fetch)

print(r.statistics)
# {'start_time': ..., 'attempt_number': 2, 'idle_for': 1.5, 'elapsed': 2.3}

print(r.call_state)
# RetryCallState(fn=..., attempt_number=2, ...)

Compose stops, conditions, and wait generators using Python operators:

# Stop when either condition is met
stop = stop_after_attempt(5) | stop_after_delay(30.0)

# Retry when both conditions pass
cond = retry_if_exception_type(TimeoutError) & retry_if_result(lambda x: x is None)

# Wait with combined strategy
wait = backon.expo(base=3) + backon.constant(interval=0.5)
for attempt in backon.Retrying(backon.expo, exception=ValueError, max_tries=3):
    with attempt:
        result = fetch()
    if not attempt.failed:
        break

backon is a near-drop-in replacement. Change your imports:

- import backoff
+ import backon

- @backoff.on_exception(backoff.expo, ValueError, max_tries=3)
+ @backon.on_exception(backon.expo, ValueError, max_tries=3)

Key differences:

Area backoff backon
Python support 3.7+ 3.10+
Type hints Partial Full
on_attempt callback Not supported Supported
Context manager Not supported Retrying class
Functional API Not supported retry() function, RetryingCaller
Global toggle Not supported disable() / enable()
Custom sleep Not supported sleep= parameter
Circuit breaker Not supported CircuitBreaker + BreakerRetrying
Hedging Not supported hedge() / on_hedge()
Metrics Not supported Prometheus / OTel
Wait generator composition Not supported + operator
Stop / RetryCondition composition Not supported | / & operators
Trio Not supported import from backon._trio
Iterator API Not supported for attempt in Retrying():
Build system Poetry PDM (PEP 621)

git clone https://github.com/Llucs/backon.git
cd backon
pip install pdm
pdm install
pdm run ruff check backon/ tests/
pdm run mypy backon/
pdm run pytest tests/ -q

MIT

Made by Llucs with ❤️

联系我们 contact @ memedata.com