From fb6a6dabad6687eee2ddab0dbfc65ee03668cb71 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 08:14:08 +0000 Subject: [PATCH 01/16] avoid holding a reference to Outcome in to_thread_run_sync --- src/trio/_core/_tests/test_run.py | 12 +------ src/trio/_core/_tests/tutil.py | 11 ++++++ src/trio/_tests/test_threads.py | 58 ++++++++++++++++++++++++++++++- src/trio/_threads.py | 24 +++++++------ 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/src/trio/_core/_tests/test_run.py b/src/trio/_core/_tests/test_run.py index 7728a6f3d4..fe191aa319 100644 --- a/src/trio/_core/_tests/test_run.py +++ b/src/trio/_core/_tests/test_run.py @@ -33,6 +33,7 @@ create_asyncio_future_in_new_loop, gc_collect_harder, ignore_coroutine_never_awaited_warnings, + no_other_refs, restore_unraisablehook, slow, ) @@ -2802,17 +2803,6 @@ async def spawn_tasks_in_old_nursery(task_status: _core.TaskStatus[None]) -> Non assert RaisesGroup(ValueError, ValueError).matches(excinfo.value.__cause__) -if sys.version_info >= (3, 11): - - def no_other_refs() -> list[object]: - return [] - -else: - - def no_other_refs() -> list[object]: - return [sys._getframe(1)] - - @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC", diff --git a/src/trio/_core/_tests/tutil.py b/src/trio/_core/_tests/tutil.py index 063fa1dd80..ae47d360eb 100644 --- a/src/trio/_core/_tests/tutil.py +++ b/src/trio/_core/_tests/tutil.py @@ -115,3 +115,14 @@ def check_sequence_matches(seq: Sequence[T], template: Iterable[T | set[T]]) -> def create_asyncio_future_in_new_loop() -> asyncio.Future[object]: with closing(asyncio.new_event_loop()) as loop: return loop.create_future() + + +if sys.version_info >= (3, 11): + + def no_other_refs() -> list[object]: + return [] + +else: + + def no_other_refs() -> list[object]: + return [sys._getframe(1)] diff --git a/src/trio/_tests/test_threads.py b/src/trio/_tests/test_threads.py index 380da3833b..a60533690d 100644 --- a/src/trio/_tests/test_threads.py +++ b/src/trio/_tests/test_threads.py @@ -1,6 +1,7 @@ from __future__ import annotations import contextvars +import gc import queue as stdlib_queue import re import sys @@ -29,7 +30,7 @@ sleep_forever, ) from .._core._tests.test_ki import ki_self -from .._core._tests.tutil import slow +from .._core._tests.tutil import gc_collect_harder, no_other_refs, slow from .._threads import ( active_thread_count, current_default_thread_limiter, @@ -1141,3 +1142,58 @@ async def wait_no_threads_left() -> None: async def test_wait_all_threads_completed_no_threads() -> None: await wait_all_threads_completed() assert active_thread_count() == 0 + + +@pytest.mark.skipif( + sys.implementation.name == "pypy", + reason=( + "gc.get_referrers is broken on PyPy (see " + "https://github.com/pypy/pypy/issues/5075)" + ), +) +async def test_run_sync_worker_references() -> None: + class Foo: + pass + + def foo(_: Foo) -> Foo: + return Foo() + + cvar = contextvars.ContextVar[Foo]("cvar") + contextval = Foo() + arg = Foo() + cvar.set(contextval) + v = await to_thread_run_sync(foo, arg) + + cvar.set(Foo()) + gc_collect_harder() + + assert gc.get_referrers(contextval) == no_other_refs() + assert gc.get_referrers(foo) == no_other_refs() + assert gc.get_referrers(arg) == no_other_refs() + assert gc.get_referrers(v) == no_other_refs() + + +@pytest.mark.skipif( + sys.implementation.name == "pypy", + reason=( + "gc.get_referrers is broken on PyPy (see " + "https://github.com/pypy/pypy/issues/5075)" + ), +) +async def test_run_sync_workerreferences_exc() -> None: + + class MyException(Exception): + pass + + def throw() -> None: + raise MyException + + e = None + try: + await to_thread_run_sync(throw) + except MyException as err: + e = err + + gc_collect_harder() + + assert gc.get_referrers(e) == no_other_refs() diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 394e5b06ac..1894799ec7 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -443,17 +443,19 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = ( await trio.lowlevel.wait_task_rescheduled(abort) ) - if isinstance(msg_from_thread, outcome.Outcome): - return msg_from_thread.unwrap() - elif isinstance(msg_from_thread, Run): - await msg_from_thread.run() - elif isinstance(msg_from_thread, RunSync): - msg_from_thread.run_sync() - else: # pragma: no cover, internal debugging guard TODO: use assert_never - raise TypeError( - f"trio.to_thread.run_sync received unrecognized thread message {msg_from_thread!r}.", - ) - del msg_from_thread + try: + if isinstance(msg_from_thread, outcome.Outcome): + return msg_from_thread.unwrap() + elif isinstance(msg_from_thread, Run): + await msg_from_thread.run() + elif isinstance(msg_from_thread, RunSync): + msg_from_thread.run_sync() + else: # pragma: no cover, internal debugging guard TODO: use assert_never + raise TypeError( + f"trio.to_thread.run_sync received unrecognized thread message {msg_from_thread!r}.", + ) + finally: + del msg_from_thread def from_thread_check_cancelled() -> None: From 53fddde19b61fc88fe0b82cb740b5aae6b5286a0 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 08:18:06 +0000 Subject: [PATCH 02/16] add newsfragment --- newsfragments/3229.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/3229.bugfix.rst diff --git a/newsfragments/3229.bugfix.rst b/newsfragments/3229.bugfix.rst new file mode 100644 index 0000000000..28944b1a5f --- /dev/null +++ b/newsfragments/3229.bugfix.rst @@ -0,0 +1 @@ +Avoid holding refs to result/exception from ``trio.to_thread.run_sync``. From fd732806cb3bf1df2af43f56108bc630a15a592d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 08:23:40 +0000 Subject: [PATCH 03/16] grab f_generator on 3.14 no_other_refs --- src/trio/_core/_tests/test_run.py | 4 ---- src/trio/_core/_tests/tutil.py | 7 ++++++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/trio/_core/_tests/test_run.py b/src/trio/_core/_tests/test_run.py index fe191aa319..9c6237d55d 100644 --- a/src/trio/_core/_tests/test_run.py +++ b/src/trio/_core/_tests/test_run.py @@ -2807,10 +2807,6 @@ async def spawn_tasks_in_old_nursery(task_status: _core.TaskStatus[None]) -> Non sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC", ) -@pytest.mark.xfail( - sys.version_info >= (3, 14), - reason="https://github.com/python/cpython/issues/125603", -) async def test_ki_protection_doesnt_leave_cyclic_garbage() -> None: class MyException(Exception): pass diff --git a/src/trio/_core/_tests/tutil.py b/src/trio/_core/_tests/tutil.py index ae47d360eb..65f4a6728d 100644 --- a/src/trio/_core/_tests/tutil.py +++ b/src/trio/_core/_tests/tutil.py @@ -117,7 +117,12 @@ def create_asyncio_future_in_new_loop() -> asyncio.Future[object]: return loop.create_future() -if sys.version_info >= (3, 11): +if sys.version_info >= (3, 14): + + def no_other_refs() -> list[object]: + return [sys._getframe().f_generator] + +elif sys.version_info >= (3, 11): def no_other_refs() -> list[object]: return [] From 99bf248693c62758bcae86a3158409ffde31b43f Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 08:47:00 +0000 Subject: [PATCH 04/16] use an outcome that clears itself --- src/trio/_threads.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 1894799ec7..134e8a25c2 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -6,7 +6,7 @@ import queue as stdlib_queue import threading from itertools import count -from typing import TYPE_CHECKING, Generic, TypeVar +from typing import TYPE_CHECKING, Generic, TypeVar, Protocol, Final, NoReturn import attrs import outcome @@ -36,6 +36,7 @@ Ts = TypeVarTuple("Ts") RetT = TypeVar("RetT") +T_co = TypeVar("T_co", covariant=True) class _ParentTaskData(threading.local): @@ -253,6 +254,32 @@ def run_in_system_nursery(self, token: TrioToken) -> None: token.run_sync_soon(self.run_sync) +class _SupportsUnwrap(Protocol, Generic[T_co]): + def unwrap(self) -> T_co: ... + + +class _Value(_SupportsUnwrap[T_co]): + def __init__(self, v: T_co) -> None: + self._v: Final = v + + def unwrap(self) -> T_co: + try: + return self._v + finally: + del self._v + + +class _Error(_SupportsUnwrap[NoReturn]): + def __init__(self, e: BaseException) -> None: + self._e: Final = e + + def unwrap(self) -> NoReturn: + try: + raise self._e + finally: + del self._e + + @enable_ki_protection async def to_thread_run_sync( sync_fn: Callable[[Unpack[Ts]], RetT], @@ -372,11 +399,15 @@ def do_release_then_return_result() -> RetT: try: return result.unwrap() finally: + del result limiter.release_on_behalf_of(placeholder) result = outcome.capture(do_release_then_return_result) + if isinstance(result, outcome.Error): + result2: _SupportsUnwrap[RetT] = _Error(result.error) + result2 = _Value(result.value) if task_register[0] is not None: - trio.lowlevel.reschedule(task_register[0], outcome.Value(result)) + trio.lowlevel.reschedule(task_register[0], outcome.Value(result2)) current_trio_token = trio.lowlevel.current_trio_token() From 0d94cb5fb4847bbc172092bdc77ec220be0ffa9c Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 17:00:03 +0000 Subject: [PATCH 05/16] fix no_other_refs outside async def --- src/trio/_core/_tests/tutil.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/trio/_core/_tests/tutil.py b/src/trio/_core/_tests/tutil.py index 65f4a6728d..b497e03c7b 100644 --- a/src/trio/_core/_tests/tutil.py +++ b/src/trio/_core/_tests/tutil.py @@ -120,7 +120,8 @@ def create_asyncio_future_in_new_loop() -> asyncio.Future[object]: if sys.version_info >= (3, 14): def no_other_refs() -> list[object]: - return [sys._getframe().f_generator] + gen = sys._getframe().f_generator + return [] if gen is None else [gen] elif sys.version_info >= (3, 11): From fc2edc7a8c584e83ef7c5c81e3f6e3097c80353f Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 17:29:04 +0000 Subject: [PATCH 06/16] pre-commit --- src/trio/_threads.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 134e8a25c2..c9da97040c 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -6,7 +6,7 @@ import queue as stdlib_queue import threading from itertools import count -from typing import TYPE_CHECKING, Generic, TypeVar, Protocol, Final, NoReturn +from typing import TYPE_CHECKING, Final, Generic, NoReturn, Protocol, TypeVar import attrs import outcome @@ -399,7 +399,6 @@ def do_release_then_return_result() -> RetT: try: return result.unwrap() finally: - del result limiter.release_on_behalf_of(placeholder) result = outcome.capture(do_release_then_return_result) From 926a8b1301bc5ab954b86809d63f9995ecf7eae9 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 18:53:54 +0000 Subject: [PATCH 07/16] fix mypy --- src/trio/_threads.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index c9da97040c..6c6b0c16fd 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -404,7 +404,8 @@ def do_release_then_return_result() -> RetT: result = outcome.capture(do_release_then_return_result) if isinstance(result, outcome.Error): result2: _SupportsUnwrap[RetT] = _Error(result.error) - result2 = _Value(result.value) + elif isinstance(result, outcome.Value): + result2 = _Value(result.value) if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], outcome.Value(result2)) From 85d0e31220f91d6bda7eb6763fab779f0e14ae8f Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 19:18:35 +0000 Subject: [PATCH 08/16] fixes --- src/trio/_threads.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 6c6b0c16fd..1eacc5de1e 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -406,6 +406,9 @@ def do_release_then_return_result() -> RetT: result2: _SupportsUnwrap[RetT] = _Error(result.error) elif isinstance(result, outcome.Value): result2 = _Value(result.value) + else: + raise RuntimeError("invalid outcome") + del result if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], outcome.Value(result2)) @@ -471,11 +474,11 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: while True: # wait_task_rescheduled return value cannot be typed - msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = ( + msg_from_thread: _Value[RetT] | _Error | Run[object] | RunSync[object] = ( await trio.lowlevel.wait_task_rescheduled(abort) ) try: - if isinstance(msg_from_thread, outcome.Outcome): + if isinstance(msg_from_thread, (_Value, _Error)): return msg_from_thread.unwrap() elif isinstance(msg_from_thread, Run): await msg_from_thread.run() From fc6bd2e285082029c1114e66f035fdd997bbcf38 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 20:50:39 +0000 Subject: [PATCH 09/16] more fixes --- src/trio/_core/_entry_queue.py | 2 ++ src/trio/_core/_thread_cache.py | 2 ++ src/trio/_tests/test_threads.py | 3 --- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/trio/_core/_entry_queue.py b/src/trio/_core/_entry_queue.py index 988b45ca00..e0b77959b5 100644 --- a/src/trio/_core/_entry_queue.py +++ b/src/trio/_core/_entry_queue.py @@ -83,6 +83,8 @@ async def kill_everything( # noqa: RUF029 # await not used "Internal error: `parent_nursery` should never be `None`", ) from exc # pragma: no cover parent_nursery.start_soon(kill_everything, exc) + finally: + del sync_fn, args, job # This has to be carefully written to be safe in the face of new items # being queued while we iterate, and to do a bounded amount of work on diff --git a/src/trio/_core/_thread_cache.py b/src/trio/_core/_thread_cache.py index 44820e7711..5d26065b73 100644 --- a/src/trio/_core/_thread_cache.py +++ b/src/trio/_core/_thread_cache.py @@ -188,6 +188,8 @@ def _handle_job(self) -> None: except BaseException as e: print("Exception while delivering result of thread", file=sys.stderr) traceback.print_exception(type(e), e, e.__traceback__) + finally: + del result def _work(self) -> None: while True: diff --git a/src/trio/_tests/test_threads.py b/src/trio/_tests/test_threads.py index a60533690d..526b85d0c8 100644 --- a/src/trio/_tests/test_threads.py +++ b/src/trio/_tests/test_threads.py @@ -1165,7 +1165,6 @@ def foo(_: Foo) -> Foo: v = await to_thread_run_sync(foo, arg) cvar.set(Foo()) - gc_collect_harder() assert gc.get_referrers(contextval) == no_other_refs() assert gc.get_referrers(foo) == no_other_refs() @@ -1194,6 +1193,4 @@ def throw() -> None: except MyException as err: e = err - gc_collect_harder() - assert gc.get_referrers(e) == no_other_refs() From 78f3d1259f98687a583f3024c6477f5b75eeb2e6 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 20:55:06 +0000 Subject: [PATCH 10/16] typing fixes --- src/trio/_tests/test_threads.py | 2 +- src/trio/_threads.py | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/trio/_tests/test_threads.py b/src/trio/_tests/test_threads.py index 526b85d0c8..f05f2be9fc 100644 --- a/src/trio/_tests/test_threads.py +++ b/src/trio/_tests/test_threads.py @@ -30,7 +30,7 @@ sleep_forever, ) from .._core._tests.test_ki import ki_self -from .._core._tests.tutil import gc_collect_harder, no_other_refs, slow +from .._core._tests.tutil import no_other_refs, slow from .._threads import ( active_thread_count, current_default_thread_limiter, diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 1eacc5de1e..f8ee31c3e2 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -390,7 +390,7 @@ async def to_thread_run_sync( # This function gets scheduled into the Trio run loop to deliver the # thread's result. - def report_back_in_trio_thread_fn(result: outcome.Outcome[RetT]) -> None: + def report_back_in_trio_thread_fn(result: _SupportsUnwrap[RetT]) -> None: def do_release_then_return_result() -> RetT: # release_on_behalf_of is an arbitrary user-defined method, so it # might raise an error. If it does, we want that error to @@ -401,16 +401,15 @@ def do_release_then_return_result() -> RetT: finally: limiter.release_on_behalf_of(placeholder) - result = outcome.capture(do_release_then_return_result) + result: _SupportsUnwrap[RetT] = outcome.capture(do_release_then_return_result) if isinstance(result, outcome.Error): - result2: _SupportsUnwrap[RetT] = _Error(result.error) + result = _Error(result.error) elif isinstance(result, outcome.Value): - result2 = _Value(result.value) + result = _Value(result.value) else: raise RuntimeError("invalid outcome") - del result if task_register[0] is not None: - trio.lowlevel.reschedule(task_register[0], outcome.Value(result2)) + trio.lowlevel.reschedule(task_register[0], outcome.Value(result)) current_trio_token = trio.lowlevel.current_trio_token() From f158761d97ccaf6224ca447be9ce2a30ea77f072 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 21:01:28 +0000 Subject: [PATCH 11/16] fix typing --- src/trio/_threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index f8ee31c3e2..8ea9eaec3b 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -401,7 +401,7 @@ def do_release_then_return_result() -> RetT: finally: limiter.release_on_behalf_of(placeholder) - result: _SupportsUnwrap[RetT] = outcome.capture(do_release_then_return_result) + result = outcome.capture(do_release_then_return_result) if isinstance(result, outcome.Error): result = _Error(result.error) elif isinstance(result, outcome.Value): From 09de6b9483aff65ca57d26c900a2f8f753efd7ff Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Fri, 21 Mar 2025 21:03:36 +0000 Subject: [PATCH 12/16] get the right frame for no_other_refs --- src/trio/_core/_tests/tutil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trio/_core/_tests/tutil.py b/src/trio/_core/_tests/tutil.py index b497e03c7b..80e8197ab5 100644 --- a/src/trio/_core/_tests/tutil.py +++ b/src/trio/_core/_tests/tutil.py @@ -120,7 +120,7 @@ def create_asyncio_future_in_new_loop() -> asyncio.Future[object]: if sys.version_info >= (3, 14): def no_other_refs() -> list[object]: - gen = sys._getframe().f_generator + gen = sys._getframe(1).f_generator return [] if gen is None else [gen] elif sys.version_info >= (3, 11): From 1f1339c5f9977efbbfa4e5f50f4fabc3ec255fee Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 22 Mar 2025 09:25:04 +0000 Subject: [PATCH 13/16] no-cover invalid outcome --- src/trio/_threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 8ea9eaec3b..7f22afe875 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -406,7 +406,7 @@ def do_release_then_return_result() -> RetT: result = _Error(result.error) elif isinstance(result, outcome.Value): result = _Value(result.value) - else: + else: # pragma: no cover raise RuntimeError("invalid outcome") if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], outcome.Value(result)) From 6797ec41845844c4edcdb4eace8e0bae95532f6e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 22 Mar 2025 16:51:26 +0000 Subject: [PATCH 14/16] use outcome that invalidates --- ci.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci.sh b/ci.sh index f414d94a2c..08f51ecb5b 100755 --- a/ci.sh +++ b/ci.sh @@ -46,6 +46,7 @@ python -m uv pip install build python -m build wheel_package=$(ls dist/*.whl) python -m uv pip install "trio @ $wheel_package" -c test-requirements.txt +python -m uv pip install https://github.com/python-trio/outcome/archive/e0f317813a499f1a3629b37c3b8caed72825d9c0.zip # Actual tests # expands to 0 != 1 if NO_TEST_REQUIREMENTS is not set, if set the `-0` has no effect From 33bb48d686e1833a429d0712a63a6b4ee493b6c5 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 22 Mar 2025 16:57:04 +0000 Subject: [PATCH 15/16] remove code that worked around Outcome holding values/errors after unwrap --- src/trio/_core/_thread_cache.py | 2 -- src/trio/_threads.py | 40 ++++----------------------------- 2 files changed, 4 insertions(+), 38 deletions(-) diff --git a/src/trio/_core/_thread_cache.py b/src/trio/_core/_thread_cache.py index 5d26065b73..44820e7711 100644 --- a/src/trio/_core/_thread_cache.py +++ b/src/trio/_core/_thread_cache.py @@ -188,8 +188,6 @@ def _handle_job(self) -> None: except BaseException as e: print("Exception while delivering result of thread", file=sys.stderr) traceback.print_exception(type(e), e, e.__traceback__) - finally: - del result def _work(self) -> None: while True: diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 7f22afe875..ae40b5d712 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -6,7 +6,7 @@ import queue as stdlib_queue import threading from itertools import count -from typing import TYPE_CHECKING, Final, Generic, NoReturn, Protocol, TypeVar +from typing import TYPE_CHECKING, Generic, TypeVar import attrs import outcome @@ -254,32 +254,6 @@ def run_in_system_nursery(self, token: TrioToken) -> None: token.run_sync_soon(self.run_sync) -class _SupportsUnwrap(Protocol, Generic[T_co]): - def unwrap(self) -> T_co: ... - - -class _Value(_SupportsUnwrap[T_co]): - def __init__(self, v: T_co) -> None: - self._v: Final = v - - def unwrap(self) -> T_co: - try: - return self._v - finally: - del self._v - - -class _Error(_SupportsUnwrap[NoReturn]): - def __init__(self, e: BaseException) -> None: - self._e: Final = e - - def unwrap(self) -> NoReturn: - try: - raise self._e - finally: - del self._e - - @enable_ki_protection async def to_thread_run_sync( sync_fn: Callable[[Unpack[Ts]], RetT], @@ -390,7 +364,7 @@ async def to_thread_run_sync( # This function gets scheduled into the Trio run loop to deliver the # thread's result. - def report_back_in_trio_thread_fn(result: _SupportsUnwrap[RetT]) -> None: + def report_back_in_trio_thread_fn(result: outcome.Outcome[RetT]) -> None: def do_release_then_return_result() -> RetT: # release_on_behalf_of is an arbitrary user-defined method, so it # might raise an error. If it does, we want that error to @@ -402,12 +376,6 @@ def do_release_then_return_result() -> RetT: limiter.release_on_behalf_of(placeholder) result = outcome.capture(do_release_then_return_result) - if isinstance(result, outcome.Error): - result = _Error(result.error) - elif isinstance(result, outcome.Value): - result = _Value(result.value) - else: # pragma: no cover - raise RuntimeError("invalid outcome") if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], outcome.Value(result)) @@ -473,11 +441,11 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: while True: # wait_task_rescheduled return value cannot be typed - msg_from_thread: _Value[RetT] | _Error | Run[object] | RunSync[object] = ( + msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = ( await trio.lowlevel.wait_task_rescheduled(abort) ) try: - if isinstance(msg_from_thread, (_Value, _Error)): + if isinstance(msg_from_thread, outcome.Outcome): return msg_from_thread.unwrap() elif isinstance(msg_from_thread, Run): await msg_from_thread.run() From 0987dc625df5546e9327e15c2714869e6656fd77 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 22 Mar 2025 16:59:07 +0000 Subject: [PATCH 16/16] remove unused typevar T_co --- src/trio/_threads.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/trio/_threads.py b/src/trio/_threads.py index ae40b5d712..1894799ec7 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -36,7 +36,6 @@ Ts = TypeVarTuple("Ts") RetT = TypeVar("RetT") -T_co = TypeVar("T_co", covariant=True) class _ParentTaskData(threading.local):