Skip to content

Commit d38acb1

Browse files
committed
Migrate work for finished CurrentThreadExecutor to previous executor
A CurrentThreadExecutor may terminate with work still remaining in its queue, or new work may be submitted later. We previously discarded remaining work, leading to deadlocks, and raised an error on submitting late work. But if there’s another CurrentThreadExecutor for the same thread below us on the stack, we should instead migrate the extra work there to allow it to eventually run. Doing this in a thread-safe way requires replacing the queue.Queue abstraction with collections.deque and threading.ConditionVariable (the same primitives used to implement queue.Queue). Fixes #491; fixes #492. Signed-off-by: Anders Kaseorg <[email protected]>
1 parent f34be84 commit d38acb1

File tree

2 files changed

+53
-24
lines changed

2 files changed

+53
-24
lines changed

Diff for: asgiref/current_thread_executor.py

+52-23
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import queue
21
import sys
32
import threading
3+
from collections import deque
44
from concurrent.futures import Executor, Future
5-
from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union
5+
from typing import TYPE_CHECKING, Any, Callable, TypeVar
66

77
if sys.version_info >= (3, 10):
88
from typing import ParamSpec
@@ -53,10 +53,12 @@ class CurrentThreadExecutor(Executor):
5353
the thread they came from.
5454
"""
5555

56-
def __init__(self) -> None:
56+
def __init__(self, old_executor: "CurrentThreadExecutor | None") -> None:
5757
self._work_thread = threading.current_thread()
58-
self._work_queue: queue.Queue[Union[_WorkItem, "Future[Any]"]] = queue.Queue()
59-
self._broken = False
58+
self._work_ready = threading.Condition(threading.Lock())
59+
self._work_items = deque[_WorkItem]() # synchronized by _work_ready
60+
self._broken = False # synchronized by _work_ready
61+
self._old_executor = old_executor
6062

6163
def run_until_future(self, future: "Future[Any]") -> None:
6264
"""
@@ -68,20 +70,37 @@ def run_until_future(self, future: "Future[Any]") -> None:
6870
raise RuntimeError(
6971
"You cannot run CurrentThreadExecutor from a different thread"
7072
)
71-
future.add_done_callback(self._work_queue.put)
72-
# Keep getting and running work items until we get the future we're waiting for
73-
# back via the future's done callback.
74-
try:
75-
while True:
73+
74+
is_done = False # synchronized by _work_ready
75+
76+
def done(future: "Future[Any]") -> None:
77+
with self._work_ready:
78+
nonlocal is_done
79+
is_done = True
80+
self._work_ready.notify()
81+
82+
future.add_done_callback(done)
83+
# Keep getting and running work items until the future we're waiting for
84+
# is done.
85+
while True:
86+
with self._work_ready:
87+
while not is_done and not self._work_items:
88+
self._work_ready.wait()
89+
if is_done:
90+
self._broken = True
91+
break
7692
# Get a work item and run it
77-
work_item = self._work_queue.get()
78-
if work_item is future:
79-
return
80-
assert isinstance(work_item, _WorkItem)
81-
work_item.run()
82-
del work_item
83-
finally:
84-
self._broken = True
93+
work_item = self._work_items.popleft()
94+
work_item.run()
95+
del work_item
96+
97+
# Resubmit remaining work to the previous CurrentThreadExecutor on the
98+
# stack, if any.
99+
if self._old_executor is not None and self._work_items:
100+
with self._old_executor._work_ready:
101+
assert not self._old_executor._broken # It's running below us
102+
self._old_executor._work_items.extend(self._work_items)
103+
self._old_executor._work_ready.notify()
85104

86105
def _submit(
87106
self,
@@ -94,13 +113,23 @@ def _submit(
94113
raise RuntimeError(
95114
"You cannot submit onto CurrentThreadExecutor from its own thread"
96115
)
97-
# Check they're not too late or the executor errored
98-
if self._broken:
99-
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
100-
# Add to work queue
101116
f: "Future[_R]" = Future()
102117
work_item = _WorkItem(f, fn, *args, **kwargs)
103-
self._work_queue.put(work_item)
118+
119+
# Walk up the CurrentThreadExecutor stack to find the closest one still
120+
# running
121+
executor = self
122+
while True:
123+
with executor._work_ready:
124+
if not executor._broken:
125+
# Add to work queue
126+
executor._work_items.append(work_item)
127+
executor._work_ready.notify()
128+
break
129+
if executor._old_executor is None:
130+
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
131+
executor = executor._old_executor
132+
104133
# Return the future
105134
return f
106135

Diff for: asgiref/sync.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
196196
# need one for every sync frame, even if there's one above us in the
197197
# same thread.
198198
old_executor = getattr(self.executors, "current", None)
199-
current_executor = CurrentThreadExecutor()
199+
current_executor = CurrentThreadExecutor(old_executor)
200200
self.executors.current = current_executor
201201

202202
# Wrapping context in list so it can be reassigned from within

0 commit comments

Comments
 (0)