What Python's asyncio primitives get wrong about shared state

Original link: https://www.inngest.com/blog/no-lost-updates-python-asyncio

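## Coordinating concurrent tasks in asyncio: beyond Event and Condition

Managing shared state across concurrent tasks is a common challenge in Python's `asyncio`. `asyncio.Event` and `asyncio.Condition` are the standard tools, but both fall short under real concurrency. `asyncio.Event` needs a separate event for each state of interest, and `asyncio.Condition` suffers from a "lost update" problem: if the state changes between the notification and the moment the consumer runs, the consumer can miss the transition.

The proposed solution is a `ValueWatcher` class. Rather than relying on consumers to check the current state, it buffers *every* state transition into a per-consumer queue. Each consumer then reads from its own queue, so it misses no change even when the state fluctuates rapidly.

The implementation covers thread safety, atomic registration, generics, timeouts, conditional set, change watching, and a callback API. It addresses the limitations of the standard-library primitives and offers a robust way to coordinate concurrent tasks around shared mutable state in `asyncio` applications. The full implementation is about 300 lines and is available for use.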

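A Hacker News discussion centers on an article criticizing how Python's `asyncio` primitives handle shared state. One commenter, TZubiri, questions the shift from traditional polling to `asyncio`, arguing that many developers lack a solid grounding in threads and concurrency fundamentals.

TZubiri argues that some newcomers to asynchronous programming see it as the latest fix for Python's single-threaded limitation (due to the GIL) while overlooking the concurrency options in between. They argue that the recent GIL removal and `asyncio` improvements are being misread as the first time true parallelism has been possible, which leads junior developers to learn concurrency concepts through a potentially misleading framework. The core argument is a disconnect between practical Python development and the perceived need for "cutting-edge" async techniques.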

Original article

Coordinating concurrent tasks around shared state is one of the most common problems in Python's asyncio. The standard library gives you asyncio.Event and asyncio.Condition, but each has a gap that only shows up under real concurrency pressure. We hit this while building Inngest's Python SDK, where multiple async handlers coordinate around WebSocket connection state.

This post works through each primitive, shows exactly where it breaks, and iterates toward a solution that handles every case we threw at it.

The scenario

Imagine an async Python app managing a connection that moves through states:

disconnected → connecting → connected → closing → closed

One of your concurrent handlers needs to drain pending requests when the connection starts shutting down. It has to wait for the closing state:

state = "disconnected"

async def drain_requests():
    # TODO: wait here until state == "closing"
    ...
    print("draining pending requests")

Simple enough. Let's see how each stdlib tool handles it.

Attempt 1: Polling

The most obvious approach: check the value in a loop.

async def drain_requests():
    while state != "closing":
        await asyncio.sleep(0.1)
    print("draining pending requests")

This works. But the tradeoffs are bad:

  • Latency vs. efficiency: A short sleep interval wastes CPU cycles. A long one adds latency. There's no good value.
  • Duplication: Every consumer reimplements the same polling loop with the same tradeoff.
  • No event-driven wake: The consumer runs whether or not anything changed.

We can do better. What we actually want is to sleep until the state changes, not sleep for an arbitrary duration and check.

Attempt 2: asyncio.Event

asyncio.Event is the stdlib's answer to "wake me up when something happens":

closing_event = asyncio.Event()

async def drain_requests():
    await closing_event.wait()
    print("draining pending requests")

No polling, no wasted cycles. The handler blocks until the event fires. But Event is boolean: it's either set or unset. Our connection has five states, and drain_requests only cares about one of them. What happens when another handler needs to wait for connected? You need a second event. A third handler waiting for "not disconnected"? A third event with inverted logic. The setter has to know about all of them:

closing_event = asyncio.Event()
connected_event = asyncio.Event()

async def set_state(new_state):
    global state
    state = new_state
    if new_state == "closing":
        closing_event.set()
    if new_state == "connected":
        connected_event.set()

Every new condition requires another Event object. The coordination between events is where bugs live. Forget a set() or clear() call and a consumer blocks forever.
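To make that failure mode concrete, here is a minimal sketch. The `late_consumer` name and the exact state sequence are illustrative assumptions, not code from the SDK. The setter never calls `clear()` when the state leaves `closing`, so a consumer that starts waiting afterwards returns immediately even though the connection is already `closed`:

```python
import asyncio

async def main():
    state = "disconnected"
    closing_event = asyncio.Event()

    def set_state(new_state):
        nonlocal state
        state = new_state
        if new_state == "closing":
            closing_event.set()
        # Bug (deliberate): nothing clears closing_event when the state
        # moves past "closing".

    async def late_consumer():
        # Hypothetical consumer that starts waiting after shutdown finished.
        await closing_event.wait()  # returns at once: the event is still set
        return state                # the state actually in effect on wake-up

    set_state("closing")
    set_state("closed")
    return await late_consumer()

observed = asyncio.run(main())
print(observed)  # closed
```

The opposite mistake is just as easy to make: clear the event too eagerly and a consumer that had not yet started waiting blocks forever.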

Attempt 3: asyncio.Condition

asyncio.Condition lets consumers wait on arbitrary predicates:

state = "disconnected"
condition = asyncio.Condition()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

One coordination point, arbitrary predicates, no proliferation of Event objects. This is much better.

But it breaks under a common pattern.

The lost update

Condition is designed to check the current value when a consumer wakes up. That's fine when state only moves forward, but it falls apart when transitions are fast. When the setter changes state, it calls notify_all(), which schedules wakeups for every waiting consumer. But in a single-threaded event loop, no consumer actually runs until the current coroutine yields. If the value changes again before that happens, consumers wake up and re-evaluate their predicate against the current value, not the value that triggered the notification. The predicate fails and the consumer goes back to sleep, potentially forever.


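# drain_requests is waiting for "closing"
await set_state("closing")  # notify_all() schedules a wakeup; the consumer has not run yet
await set_state("closed")   # the state changes again before the consumer runs
# drain_requests wakes, sees "closed", and goes back to waiting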

Here's a runnable reproduction:

import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def set_state(new_state):
    global state
    async with condition:
        state = new_state
        condition.notify_all()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

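async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests start waiting

    await set_state("closing")  # schedules a wakeup for drain_requests
    await set_state("closed")   # runs before drain_requests gets a turn

    # Raises TimeoutError: drain_requests never observes "closing".
    await asyncio.wait_for(task, timeout=1.0)

asyncio.run(main())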

The value was "closing", but by the time drain_requests wakes and checks, it's already "closed". The intermediate state is gone.

This isn't a contrived edge case. In our SDK's connection manager, a close signal can arrive and the connection can shut down in the same event loop tick. drain_requests never runs, and any in-flight work just disappears.

The fix: per-consumer queues

Instead of waking consumers and asking "is the current state what you want?", buffer every transition into a per-consumer queue. Each consumer drains its own queue and checks each transition individually. The consumer never misses a state.

Each consumer registers its own asyncio.Queue. When the value changes, the setter pushes (old, new) into every registered queue. Here's a simplified version that illustrates the core idea:

import asyncio

class ValueWatcher:
    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return

        old_value = self._value
        self._value = new_value

        # Push the transition into every registered consumer queue.
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)

        try:
            if self._value == target:
                return

            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)

wait_for registers a queue, checks the current value, then drains transitions until it finds a match. The try/finally ensures the queue gets deregistered even if the caller cancels.

The queue buffers and delivers every intermediate transition in order, even if the value changes multiple times before a consumer runs.
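As a quick check, the sketch below reruns the earlier lost-update scenario against the simplified class. The class is repeated in condensed form only so the snippet runs on its own, and the `drained` list is an illustrative assumption used to record what the consumer saw:

```python
import asyncio

class ValueWatcher:
    """Condensed copy of the simplified class, repeated so this runs alone."""

    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value, self._value = self._value, new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                _, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)

async def main():
    state = ValueWatcher("disconnected")
    drained = []

    async def drain_requests():
        await state.wait_for("closing")
        # Record the *current* value at the moment the consumer resumes.
        drained.append(state.value)

    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests register its queue

    # Two transitions with no await in between, as in the reproduction.
    state.value = "closing"
    state.value = "closed"

    await asyncio.wait_for(task, timeout=1.0)  # completes; no timeout
    return drained

drained = asyncio.run(main())
print(drained)  # ['closed']
```

The consumer wakes because the `closing` transition was buffered, yet the current value it reads is already `closed`. Consumers should therefore act on the transition they were waiting for and not assume `.value` still equals the target.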

Making it production-ready

The simplified version leaves out several things a production implementation needs. Our final implementation adds the following:

  • Thread safety: A threading.Lock protects the value and queue list. Each queue is paired with its event loop, and the setter uses loop.call_soon_threadsafe instead of put_nowait directly.
  • Atomic registration: wait_for checks the current value and registers the queue inside the same lock acquisition, closing the race where a transition could slip between registration and the initial check.
  • Full generic typing: Generic[T] end-to-end, so predicates, queues, and return values are all type-checked.
  • Predicate-based matching: wait_for, wait_for_not, and wait_for_not_none all route through a shared _wait_for_condition(predicate) core.
  • Timeouts: Every wait method accepts an optional timeout, backed by asyncio.wait_for.
  • Conditional set: set_if atomically sets the value only when the current value satisfies a predicate, useful for state machine transitions that should only happen from a specific state.
  • Change watching: wait_for_change waits for any transition regardless of value, handy for logging or reacting to state churn.
  • Callback API: on_change and on_value for synchronous consumers alongside the async wait API.
  • Resilient notifications: The setter catches RuntimeError (closed loop) and suppresses callback exceptions so one failure doesn't block other consumers.
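The thread-safety item is the least obvious of these, so here is a minimal standalone sketch of the handoff it describes. The `worker` function and the sample transition tuple are assumptions for illustration. `asyncio.Queue` is not thread-safe, so a thread outside the event loop schedules the `put_nowait` call on the loop that owns the queue and does not call it directly:

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def worker():
        # Runs on a separate thread. Calling queue.put_nowait() here directly
        # would touch loop-owned state from the wrong thread, so the call is
        # scheduled on the loop instead.
        loop.call_soon_threadsafe(queue.put_nowait, ("connected", "closing"))

    thread = threading.Thread(target=worker)
    thread.start()

    # The consumer stays on the event loop and receives the transition.
    old, new = await asyncio.wait_for(queue.get(), timeout=1.0)
    thread.join()
    return old, new

transition = asyncio.run(main())
print(transition)  # ('connected', 'closing')
```

`call_soon_threadsafe` raises `RuntimeError` if the loop has already been closed, which is why the full setter wraps the call in a `try`/`except RuntimeError`.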

The full implementation is about 300 lines, most of which is docstrings and convenience methods built on the same core. Feel free to copy it into your codebase!

Full ValueWatcher source
from __future__ import annotations

import asyncio
import threading
import typing

T = typing.TypeVar("T")

# Second type variable, used by wait_for_not_none to narrow `S | None` to `S`.
S = typing.TypeVar("S")


class ValueWatcher(typing.Generic[T]):
    """
    Thread-safe observable value with async watchers.

    Watchers can await value changes via methods like `wait_for` and
    `wait_for_change`. Alternatively, they can add callbacks via `on_change` and
    `on_value`.

    Any thread can set `.value`, and the watcher will react accordingly.
    """

    def __init__(
        self,
        initial_value: T,
        *,
        on_change: typing.Callable[[T, T], None] | None = None,
    ) -> None:
        """
        Args:
            initial_value: The initial value.
            on_change: Called when the value changes. Good for debug logging.
        """

        self._lock = threading.Lock()
        self._on_changes: list[typing.Callable[[T, T], None]] = []
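        if on_change:
            self._on_changes.append(on_change)

        # Each watcher's queue is paired with the event loop it was created
        # on, so a setter running on another thread can schedule the put on
        # the correct loop.
        self._watch_queues: list[
            tuple[asyncio.AbstractEventLoop, asyncio.Queue[tuple[T, T]]]
        ] = []

        # Hold strong references to background tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks: set[asyncio.Task[T]] = set()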

        self._value = initial_value

    @property
    def value(self) -> T:
        with self._lock:
            return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        with self._lock:
            if new_value == self._value:
                return

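            old_value = self._value
            self._value = new_value

            # Snapshot under the lock so notification can happen outside it.
            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        # Notify outside the lock: the lock is not reentrant, so a callback
        # that reads `.value` would otherwise deadlock.
        for loop, queue in queues:
            try:
                # asyncio.Queue is not thread-safe, and the setter may run on
                # any thread. call_soon_threadsafe schedules the put on the
                # loop that owns the queue.
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                # The watcher's event loop is closed; skip it.
                pass

        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                # Suppress callback errors so one failing callback does not
                # prevent the remaining callbacks from running.
                pass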

    def set_if(
        self,
        new_value: T,
        condition: typing.Callable[[T], bool],
    ) -> bool:
        """
        Atomically set the value only if the current value satisfies the
        condition. Returns True if the value was set.
        """

        with self._lock:
            if not condition(self._value):
                return False

            if new_value == self._value:
                return True

            old_value = self._value
            self._value = new_value

            queues = list(self._watch_queues)
            callbacks = list(self._on_changes)

        for loop, queue in queues:
            try:
                loop.call_soon_threadsafe(
                    queue.put_nowait, (old_value, new_value)
                )
            except RuntimeError:
                pass

        for on_change in callbacks:
            try:
                on_change(old_value, new_value)
            except Exception:
                pass

        return True

    def on_change(self, callback: typing.Callable[[T, T], None]) -> None:
        """
        Add a callback that's called when the value changes.

        Args:
            callback: Called with (old_value, new_value) on each change.
        """

        with self._lock:
            self._on_changes.append(callback)

    def on_value(self, value: T, callback: typing.Callable[[], None]) -> None:
        """
        One-shot callback for when the value equals `value`. Requires a
        running event loop (internally spawns a background task).

        Args:
            value: The value to wait for.
            callback: Called when the internal value equals `value`.
        """

        task = asyncio.create_task(self.wait_for(value))
        self._background_tasks.add(task)

        def _done(t: asyncio.Task[T]) -> None:
            self._background_tasks.discard(t)
            if not t.cancelled() and t.exception() is None:
                callback()

        task.add_done_callback(_done)

    async def wait_for(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to equal the given value.

        Args:
            value: Return when the internal value is equal to this.
            immediate: If True and the internal value is already equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        return await self._wait_for_condition(
            lambda v: v == value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not(
        self,
        value: T,
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to not equal the given value.

        Args:
            value: Return when the internal value is not equal to this.
            immediate: If True and the internal value is already not equal to the given value, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        return await self._wait_for_condition(
            lambda v: v != value,
            immediate=immediate,
            timeout=timeout,
        )

    async def wait_for_not_none(
        self: ValueWatcher[S | None],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> S:
        """
        Wait for the internal value to be not None.

        Args:
            immediate: If True and the internal value is already not None, return immediately. Defaults to True.
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        result = await self._wait_for_condition(
            lambda v: v is not None,
            immediate=immediate,
            timeout=timeout,
        )
        if result is None:
            raise AssertionError("unreachable")
        return result

    async def _wait_for_condition(
        self,
        condition: typing.Callable[[T], bool],
        *,
        immediate: bool = True,
        timeout: float | None = None,
    ) -> T:
        """
        Wait until `condition(current_value)` is true, then return the
        matching value. Handles the TOCTOU gap between checking the current
        value and subscribing to the change queue.
        """

        
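        if immediate:
            # Fast path: return without registering a queue if the value
            # already matches.
            current = self.value
            if condition(current):
                return current

        async def _wait() -> T:
            with self._watch() as queue:
                # Re-check after subscribing: the value may have changed
                # between the fast-path check and queue registration.
                if immediate:
                    current = self.value
                    if condition(current):
                        return current

                while True:
                    _, new = await queue.get()
                    if condition(new):
                        return new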

        return await asyncio.wait_for(_wait(), timeout=timeout)

    async def wait_for_change(
        self,
        *,
        timeout: float | None = None,
    ) -> T:
        """
        Wait for the internal value to change.

        Args:
            timeout: Seconds to wait before raising `asyncio.TimeoutError`. None means wait forever.
        """

        async def _wait() -> T:
            with self._watch() as queue:
                _, new = await queue.get()
                return new

        return await asyncio.wait_for(_wait(), timeout=timeout)

    def _watch(self) -> _WatchContextManager[T]:
        """
        Watch for all changes to the value. This method returns a context
        manager so it must be used in a `with` statement.

        Its return value is a queue that yields tuples of the old and new
        values.
        """

        loop = asyncio.get_running_loop()
        queue = asyncio.Queue[tuple[T, T]]()
        with self._lock:
            self._watch_queues.append((loop, queue))

        return _WatchContextManager(
            on_exit=lambda: self._remove_queue(queue),
            queue=queue,
        )

    def _remove_queue(self, queue: asyncio.Queue[tuple[T, T]]) -> None:
        """
        Remove a queue from the watch list in a thread-safe manner.
        """

        with self._lock:
            self._watch_queues = [
                entry for entry in self._watch_queues if entry[1] is not queue
            ]


class _WatchContextManager(typing.Generic[T]):
    """
    Context manager that's used to automatically delete a queue when it's no
    longer being watched.

    Returns a queue that yields tuples of the old and new values.
    """

    def __init__(
        self,
        on_exit: typing.Callable[[], None],
        queue: asyncio.Queue[tuple[T, T]],
    ) -> None:
        self._on_exit = on_exit
        self._queue = queue

    def __enter__(self) -> asyncio.Queue[tuple[T, T]]:
        # The queue was already registered in _watch(); hand it to the caller.
        return self._queue

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: object,
    ) -> None:
        self._on_exit()

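wait_for_not_none is particularly useful since we love type safety:

# Wait for any state other than "disconnected"
await state.wait_for_not("disconnected")

# Wait for a non-None value; the return type narrows from Connection | None to Connection
ws_watcher = ValueWatcher[Connection | None](None)
ws: Connection = await ws_watcher.wait_for_not_none()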

One caveat

The setter deduplicates by equality: if the new value == the current value, no notification fires. This works well for enums, strings, and ints, but mutating a mutable object in place and reassigning the same reference won't trigger consumers (because obj == obj is trivially True). Stick to immutable values and this isn't a concern.
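The sketch below isolates that behavior. `Deduped` is a hypothetical stand-in that keeps only the setter's equality check and records notifications in a list; it is not part of ValueWatcher:

```python
class Deduped:
    """Hypothetical stand-in that keeps only the setter's equality check."""

    def __init__(self, initial):
        self._value = initial
        self.notifications = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return  # same dedup rule as the ValueWatcher setter
        self.notifications.append((self._value, new_value))
        self._value = new_value

# Mutable value: mutate in place, then reassign the same object.
pending = Deduped(["req-1"])
items = pending.value
items.append("req-2")
pending.value = items  # items == items, so no notification is recorded
mutable_count = len(pending.notifications)

# Immutable value: every change produces a new, unequal object.
frozen = Deduped(("req-1",))
frozen.value = frozen.value + ("req-2",)
immutable_count = len(frozen.notifications)

print(mutable_count, immutable_count)  # 0 1
```

If a mutable container is unavoidable, assigning a fresh copy with the change applied (for example `pending.value = [*pending.value, "req-2"]`) produces an unequal object and so passes the equality check.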

Wrapping up

The core insight is simple: asyncio.Condition asks consumers "is the current state what you want?" when it should ask "did the state ever become what you want?" Per-consumer queues make that possible by buffering every transition instead of just notifying about the latest one.

We use ValueWatcher throughout Inngest's Python SDK to coordinate WebSocket connection state, worker lifecycle, and graceful shutdown. If you're managing shared mutable state in asyncio, give it a try.
