Function decoration for backoff and retry — modern, fast, zero dependencies.
backon is a modern evolution of backoff — a zero-dependency Python library for retry with exponential backoff. It provides decorator, functional, and context manager APIs for both sync and async code.
- Zero dependencies — pure Python, stdlib only
- Four APIs — decorator (
@on_exception,@on_predicate), functional (retry()), context manager (Retrying), callable (RetryingCaller/AsyncRetryingCaller) - Async native — same API works for
async deffunctions - Full type hints — validated with mypy, strict mode compatible
- Global toggle —
backon.disable()/backon.enable()for testing - Custom sleep — inject your own sleep function (useful for testing with
asyncio.Event) - Multiple wait strategies — exponential, constant, Fibonacci, decay, runtime, randomized, incremental, and composable chains
- Jitter — full jitter, random jitter, or none
- Rich callbacks —
on_attempt,on_backoff,on_success,on_giveup,before_sleep,before,after - Circuit breaker — CLOSED/OPEN/HALF_OPEN states with automatic recovery
- Hedging — concurrent retry requests, first-success-wins
- Prometheus / OpenTelemetry metrics — optional, zero hard dependencies
- Testing module —
disable_retries(),limit_retries(),remove_backoff(),assert_retried() - Trio support — retry with the trio async framework
- Operator overloading — compose stops with
|/&, wait generators with+ - Iterator API —
for attempt in Retrying(...): - Modern packaging — PEP 621, PDM, py.typed
Requires Python 3.10+.
import backon
@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch_data():
return api.call()@backon.on_predicate(backon.constant, max_tries=5, interval=0.5)
def poll_status():
return check_ready()result = backon.retry(
fetch_data,
backon.expo,
exception=ValueError,
max_tries=3,
)with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = r.call(fetch_data)Async variant:
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
result = await r.async_call(fetch_data)Retry when the decorated function raises one of the specified exceptions.
@backon.on_exception(backon.expo, (ValueError, TimeoutError), max_tries=5)
def fetch():
...| Argument | Type | Default | Description |
|---|---|---|---|
wait_gen |
WaitGenerator |
— | Wait strategy (expo, constant, fibo, etc.) |
exception |
type or tuple[type] |
— | Exception class(es) to retry on |
max_tries |
int or Callable[[], int] |
None |
Maximum number of attempts |
max_time |
float, timedelta, or Callable |
None |
Maximum total elapsed time |
jitter |
Jitterer or None |
full_jitter |
Jitter function |
giveup |
Callable[[Exception], bool or float] |
lambda e: False |
Stop retrying for matching exceptions; return float to override wait |
on_success |
Handler or list |
None |
Called after successful attempt |
on_backoff |
Handler or list |
None |
Called before each retry |
on_giveup |
Handler or list |
None |
Called when retries exhausted |
on_attempt |
Handler or list |
None |
Called before each attempt |
before_sleep |
Handler or list |
None |
Called before sleeping |
before |
Handler or list |
None |
Called before each attempt (lower-level than on_attempt) |
after |
Handler or list |
None |
Called after each attempt (lower-level than on_success/on_giveup) |
retry_error_callback |
Callable[[dict], Any] |
None |
Called when retry gives up instead of raising |
raise_on_giveup |
bool |
True |
Raise final exception when giving up |
logger |
str or Logger |
"backon" |
Logger name or instance |
backoff_log_level |
int |
logging.INFO |
Log level for backoff messages |
giveup_log_level |
int |
logging.ERROR |
Log level for giveup messages |
sleep |
Callable[[float], Any] |
None |
Custom sleep function |
**wait_gen_kwargs |
varies | — | Extra kwargs passed to the wait generator (e.g. base=3, interval=0.5) |
Retry while the predicate matches the return value.
@backon.on_predicate(backon.constant, predicate=lambda x: x is None, max_tries=5)
def poll():
...Accepts all parameters from on_exception except exception, giveup, and raise_on_giveup. Adds:
| Argument | Type | Default | Description |
|---|---|---|---|
predicate |
Callable[[Any], bool] |
operator.not_ |
Retry when this returns True for the return value |
result = backon.retry(
target=my_function,
wait_gen=backon.expo,
exception=ValueError,
max_tries=3,
)Accepts all parameters from on_exception plus on_predicate extras, plus:
| Argument | Type | Default | Description |
|---|---|---|---|
condition |
RetryCondition |
None |
Advanced retry condition object |
stop |
Stop |
None |
Advanced stop condition object |
name |
str |
"" |
Identifier for the retry call |
**wait_gen_kwargs |
varies | — | Extra kwargs passed to the wait generator |
If target is a coroutine function, retry() returns a coroutine. Otherwise it returns the result synchronously.
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
r.call(my_function)
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
await r.async_call(my_async_function)| Method | Description |
|---|---|
call(target, *args, **kwargs) |
Execute synchronously |
async_call(target, *args, **kwargs) |
Execute asynchronously |
copy() |
Return a modified copy of the Retrying instance |
statistics |
Property returning dict with attempt_number, elapsed, idle_for, start_time |
call_state |
Property returning the current RetryCallState |
enabled |
Property to enable/disable retry per-instance |
Arguments: Same as retry(), plus enabled (default True).
A callable object with pre-bound exception type via .on().
caller = backon.RetryingCaller(backon.expo, max_tries=3)
caller = caller.on(ValueError)
result = caller(my_function, arg1, arg2)Async variant of RetryingCaller.
caller = backon.AsyncRetryingCaller(backon.expo, max_tries=3).on(ValueError)
result = await caller(my_async_function, arg1, arg2)| Method | Description |
|---|---|
.on(exception) |
Return a copy bound to the given exception type |
.copy() |
Return a modified copy |
.__call__(target, *args, **kwargs) |
Execute with retry |
All wait generators are callables that produce a sequence of wait times. Pass extra kwargs (e.g. interval=0.5, base=3) as **wait_gen_kwargs to decorators and functions.
| Generator | Signature | Description |
|---|---|---|
expo |
(base=2, factor=1, max_value=None) |
Exponential backoff: factor * base^n |
constant |
(interval=1) |
Fixed interval; accepts float or Sequence[float] for varied intervals |
fibo |
(max_value=None) |
Fibonacci sequence: 1, 1, 2, 3, 5, 8, ... |
runtime |
(value=Callable) |
Dynamic wait from return value or exception — useful for Retry-After headers |
decay |
(initial_value=1, decay_factor=1, min_value=None) |
Exponential decay: initial * e^(-t * decay_factor) |
wait_random_exponential |
(multiplier=1, max_value=None, exp_base=2, min_value=0) |
Randomized exponential (uniform random between 0 and the exponential value) |
wait_incrementing |
(start=1, increment=1, max_value=None) |
Linear increment: start + n * increment |
wait_chain |
(*generators) |
Sequentially play through multiple generators |
wait_exception |
(value=Callable) |
Dynamic wait based on the caught exception |
wait_random |
(min=0, max=1) |
Uniform random wait between min and max |
wait_exponential_jitter |
(initial=1, max=60, exp_base=2, jitter=1) |
Exponential backoff with added random jitter |
wait_none |
() |
Always returns 0 (no wait) |
Composition: Combine wait generators with +:
wait_strategy = backon.expo(base=3) + backon.constant(interval=0.5)Stop conditions determine when retry should cease. They can be composed with | (any) and & (all).
| Condition | Description |
|---|---|
stop_after_attempt(max_attempts) |
Stop after N attempts |
stop_after_delay(max_delay) |
Stop after total elapsed time exceeds max_delay seconds |
stop_before_delay(max_delay) |
Stop if the next wait would exceed max_delay |
stop_all(*stops) |
Stop when all sub-conditions are met |
stop_any(*stops) |
Stop when any sub-condition is met |
stop_never() |
Never stop (retry indefinitely) |
stop_when_event_set(event) |
Stop when a threading.Event is set |
from backon import stop_after_attempt, stop_after_delay, stop_any
stop = stop_after_attempt(5) | stop_after_delay(30.0)Retry conditions determine whether a retry should happen. They can be composed with | and &.
| Condition | Description |
|---|---|
retry_if_exception_type(*types) |
Retry if exception is an instance of given type(s) |
retry_if_exception(predicate) |
Retry if the exception matches a custom predicate |
retry_if_exception_message(message, match=None) |
Retry if exception message contains a string (or matches regex with match="re") |
retry_if_result(predicate) |
Retry if the return value matches a predicate |
retry_if_not_result(predicate) |
Retry if the return value does NOT match a predicate |
retry_all(*conditions) |
Retry only when all conditions pass |
retry_any(*conditions) |
Retry when any condition passes |
retry_always() |
Always retry |
retry_never() |
Never retry |
from backon import retry_if_exception_type, retry_if_exception_message, retry_all
condition = retry_all(
retry_if_exception_type(HTTPError),
retry_if_exception_message("429"),
)@backon.on_exception(backon.expo, ValueError, jitter=backon.full_jitter)
def f():
...| Jitter | Effect |
|---|---|
backon.full_jitter |
Random value between 0 and the calculated wait time |
backon.random_jitter |
Adds random() to the calculated wait time (~+0.5s on average) |
None |
No jitter (deterministic waits) |
Handlers receive a details dict with contextual information:
def handler(details):
print(f"Attempt {details['tries']}, elapsed {details['elapsed']:.2f}s")
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
on_attempt=handler,
on_backoff=handler,
on_success=handler,
on_giveup=handler,
)
def f():
...Available keys in details:
| Key | Available in |
|---|---|
target |
All |
args, kwargs |
All |
tries |
All |
elapsed |
All |
value |
on_success, on_backoff, on_giveup |
exception |
on_backoff, on_giveup |
wait |
on_backoff, before_sleep |
Useful in tests to disable retry logic globally:
backon.disable() # skip retry, call function directly
backon.enable() # re-enable retryPer-instance toggle via Retrying.enabled:
r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
r.enabled = False
result = r.call(fn) # no retryAll three APIs work with async functions transparently:
@backon.on_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
return await api.call()
result = await backon.retry(fetch, backon.expo, exception=ValueError, max_tries=3)
async with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = await r.async_call(fetch)Replace the default sleep for testing or special environments:
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
sleep=lambda s: print(f"waiting {s}s"),
)
def f():
...
# With asyncio.Event for testing
import asyncio
event = asyncio.Event()
@backon.on_exception(
backon.expo, ValueError, max_tries=3,
sleep=backon.sleep_using_event(event),
)
async def f():
...Circuit breaker with three states: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery).
from backon._circuit_breaker import CircuitBreaker, BreakerRetrying, CircuitOpenError
breaker = BreakerRetrying(
backon.expo, max_tries=3,
breaker=CircuitBreaker(
failure_threshold=5,
recovery_timeout=60.0,
half_open_max_calls=1,
),
)
try:
result = breaker.call(fetch)
except CircuitOpenError:
print("Circuit is open, skipping request")CircuitBreaker parameter |
Default | Description |
|---|---|---|
failure_threshold |
5 |
Consecutive failures before opening the circuit |
recovery_timeout |
60.0 |
Seconds before transitioning from OPEN to HALF_OPEN |
half_open_max_calls |
1 |
Allowed calls in HALF_OPEN state before fully closing |
name |
"" |
Identifier for the breaker |
Run multiple retry attempts concurrently and return the first success.
from backon._hedging import hedge, HedgingRetrying
# Functional
result = hedge(fetch, backon.expo, max_hedge=3)
# Decorator
@backon.on_hedge(backon.expo, max_hedge=3)
def fetch():
...
# Context manager
with HedgingRetrying(backon.expo, max_hedge=3) as h:
result = h.call(fetch)| Parameter | Default | Description |
|---|---|---|
max_hedge |
3 |
Number of concurrent hedged requests |
timeout |
None |
Maximum time to wait for any hedge |
on_hedge |
None |
Callback when a hedge request is sent |
Optional Prometheus and OpenTelemetry metrics. Requires prometheus_client or opentelemetry-api to be installed.
from backon._instrumentation import PrometheusMetrics, OTelMetrics, set_metrics_collector
# Prometheus
set_metrics_collector(PrometheusMetrics())
# OpenTelemetry
set_metrics_collector(OTelMetrics(meter_name="myapp.backon"))Metrics collected:
backon_retry_attempts_total(attempts, labeled by target and exception type)backon_retry_success_total(successes)backon_retry_failure_total(failures)backon_circuit_breaker_open_total/backon_circuit_breaker_close_totalbackon_hedge_requests_totalbackon.retry.attempt_duration(histogram, OTel only)
from backon._testing import (
disable_retries, enable_retries,
test_config, limit_retries, remove_backoff,
assert_retried, assert_not_retried,
)
# Context manager that skips retry for a block
with disable_retries():
result = fetch()
# Limit max retries in tests
with limit_retries(2):
fetch()
# Remove backoff delay entirely
with remove_backoff():
fetch()
# Assert the function was retried N times
assert_retried(fetch, expected_tries=3)Retry with the trio async framework:
from backon._trio import retry_exception, retry_predicate
@retry_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
...Requires trio to be installed.
Check if code is running inside a retry and get the current attempt number anywhere in the call stack:
from backon import is_retrying, get_attempt_number
def log_attempt():
if is_retrying():
print(f"This is attempt #{get_attempt_number()}")
@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch():
log_attempt()
return api.call()Uses contextvars — thread-safe and async-safe.
Override the wait time per attempt by returning a float from the giveup callback. Useful for respecting Retry-After headers.
def respect_retry_after(exc: HTTPError) -> float:
return exc.response.headers.get("Retry-After", 1.0)
@backon.on_exception(backon.expo, HTTPError, giveup=respect_retry_after)
def fetch():
...When 5 or more retries occur with less than 100ms between them, backon logs a warning. This helps detect misconfigured retry policies before they cause issues.
r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
result = r.call(fetch)
print(r.statistics)
# {'start_time': ..., 'attempt_number': 2, 'idle_for': 1.5, 'elapsed': 2.3}
print(r.call_state)
# RetryCallState(fn=..., attempt_number=2, ...)Compose stops, conditions, and wait generators using Python operators:
# Stop when either condition is met
stop = stop_after_attempt(5) | stop_after_delay(30.0)
# Retry when both conditions pass
cond = retry_if_exception_type(TimeoutError) & retry_if_result(lambda x: x is None)
# Wait with combined strategy
wait = backon.expo(base=3) + backon.constant(interval=0.5)for attempt in backon.Retrying(backon.expo, exception=ValueError, max_tries=3):
with attempt:
result = fetch()
if not attempt.failed:
breakbackon is a near-drop-in replacement. Change your imports:
- import backoff
+ import backon
- @backoff.on_exception(backoff.expo, ValueError, max_tries=3)
+ @backon.on_exception(backon.expo, ValueError, max_tries=3)Key differences:
| Area | backoff | backon |
|---|---|---|
| Python support | 3.7+ | 3.10+ |
| Type hints | Partial | Full |
on_attempt callback |
Not supported | Supported |
| Context manager | Not supported | Retrying class |
| Functional API | Not supported | retry() function, RetryingCaller |
| Global toggle | Not supported | disable() / enable() |
| Custom sleep | Not supported | sleep= parameter |
| Circuit breaker | Not supported | CircuitBreaker + BreakerRetrying |
| Hedging | Not supported | hedge() / on_hedge() |
| Metrics | Not supported | Prometheus / OTel |
| Wait generator composition | Not supported | + operator |
| Stop / RetryCondition composition | Not supported | | / & operators |
| Trio | Not supported | import from backon._trio |
| Iterator API | Not supported | for attempt in Retrying(): |
| Build system | Poetry | PDM (PEP 621) |
git clone https://github.com/Llucs/backon.git
cd backon
pip install pdm
pdm install
pdm run ruff check backon/ tests/
pdm run mypy backon/
pdm run pytest tests/ -qMade by Llucs with ❤️
