From ae34cd42806107f7d371aaa852a7aa59030193cf Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Wed, 11 Feb 2026 17:18:19 +0100 Subject: [PATCH 1/5] Add `nest` to nest tinyio loops This allows to isolate them from each other so that an exception in a nested group does not affect coroutines running outside. --- tests/test_nest.py | 103 ++++++++++++++++++++++++++++++++++++++++ tinyio/__init__.py | 1 + tinyio/_integrations.py | 19 ++++++++ 3 files changed, 123 insertions(+) create mode 100644 tests/test_nest.py diff --git a/tests/test_nest.py b/tests/test_nest.py new file mode 100644 index 0000000..287a0de --- /dev/null +++ b/tests/test_nest.py @@ -0,0 +1,103 @@ +import pytest +import tinyio + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = None + + def put(self, x): + if self._elem is not None: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + while self._elem is None: + yield self._event.wait() + x = self._elem + self._elem = None + return x + + +@pytest.mark.parametrize("nest_g", (False, True)) +@pytest.mark.parametrize("nest_h", (False, True)) +def test_nest(nest_g: bool, nest_h: bool): + """Test that all coroutines make progress when some are nested""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. + def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_nest(c: tinyio.Coro[int], nest: bool) -> tinyio.Coro[int]: + if nest: + return tinyio.nest(c) + else: + return c + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_nest(g(), nest_g), maybe_nest(h(), nest_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_nest_with_error_in_inner_loop(): + """Test that if an inner coroutine raises an exception, nested + coroutines are cancelled but outer ones keep running""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield tinyio.sleep(1) + g_was_cancelled = False + return 0 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 0 + + def nested() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def f() -> tinyio.Coro[list[int | list[int]]]: + return (yield [g(), tinyio.nest(nested())]) + + try: + tinyio.Loop().run(f()) + except RuntimeError: + pass + + assert not g_was_cancelled + assert i_was_cancelled diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 0e6cddc..6462160 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -8,6 +8,7 @@ from ._integrations import ( from_asyncio as from_asyncio, from_trio as from_trio, + nest as nest, to_asyncio as to_asyncio, to_trio as to_trio, ) diff --git a/tinyio/_integrations.py b/tinyio/_integrations.py index cf5afb8..50cc909 100644 --- a/tinyio/_integrations.py +++ b/tinyio/_integrations.py @@ -100,3 +100,22 @@ async def to_trio(coro: Coro[_Return], exception_group: None | bool = None) -> _ await trio.sleep(0) else: await trio.to_thread.run_sync(wait) + + +def nest(coro: Coro[_Return], exception_group: None | bool = None) -> Coro[_Return]: + """Runs a coroutine in a separate "inner" loop. + + In particular, this isolates coroutines running in the "outer" loop from exceptions + occurring from coroutines in the inner one, while still allowing corountines in both + loops to make progress simultaneously. + """ + with Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value + if wait is None: + yield + else: + yield run_in_thread(wait) From 5c59ccc985f3a944db7413aec432769374b1c31e Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Thu, 12 Feb 2026 05:28:35 +0100 Subject: [PATCH 2/5] Make test with exceptions work --- tests/test_nest.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/test_nest.py b/tests/test_nest.py index 287a0de..88466e7 100644 --- a/tests/test_nest.py +++ b/tests/test_nest.py @@ -72,7 +72,7 @@ def g() -> tinyio.Coro[int]: q2.put(5) yield tinyio.sleep(1) g_was_cancelled = False - return 0 + return 1 def h() -> tinyio.Coro[int]: x = yield q1.get() @@ -86,18 +86,24 @@ def i() -> tinyio.Coro[int]: q1.put(5) yield tinyio.sleep(1) i_was_cancelled = False - return 0 + return 2 def nested() -> tinyio.Coro[list[int]]: return (yield [h(), i()]) - def f() -> tinyio.Coro[list[int | list[int]]]: - return (yield [g(), tinyio.nest(nested())]) + def try_nested() -> tinyio.Coro[list[int]]: + try: + return (yield from tinyio.nest(nested())) + except RuntimeError as e: + assert str(e) == "Kaboom" + return [-1, -1] + else: + assert False + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_nested()]) - try: - tinyio.Loop().run(f()) - except RuntimeError: - pass + assert tinyio.Loop().run(f()) == [1, [-1, -1]] assert not g_was_cancelled assert i_was_cancelled From 37a892039b2bdbab04a4d0554fae46d5319ab523 Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Mon, 16 Feb 2026 17:26:38 +1300 Subject: [PATCH 3/5] Use isolate() instead of nest() as public API --- tests/test_isolate.py | 152 ++++++++++++++++++++++++++++++++++++++++ tests/test_nest.py | 109 ---------------------------- tinyio/__init__.py | 2 +- tinyio/_integrations.py | 19 ----- tinyio/_isolate.py | 114 ++++++++++++++++++++++++++++++ 5 files changed, 267 insertions(+), 129 deletions(-) create mode 100644 tests/test_isolate.py delete mode 100644 tests/test_nest.py create mode 100644 tinyio/_isolate.py diff --git a/tests/test_isolate.py b/tests/test_isolate.py new file mode 100644 index 0000000..069455b --- /dev/null +++ b/tests/test_isolate.py @@ -0,0 +1,152 @@ +from collections.abc import Callable + +import pytest +import tinyio + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = None + + def put(self, x): + if self._elem is not None: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + while self._elem is None: + yield self._event.wait() + x = self._elem + self._elem = None + return x + + +@pytest.mark.parametrize("isolate_g", (False, True)) +@pytest.mark.parametrize("isolate_h", (False, True)) +def test_isolate(isolate_g: bool, isolate_h: bool): + """Test that all coroutines make progress when some are isolated""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. + def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: + def cleanup(e: BaseException) -> tinyio.Coro[int]: + del e + yield + return 999 + + if isolate: + x, _ = yield tinyio.isolate(c, cleanup) + return x + else: + return (yield c()) + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_isolate(g, isolate_g), maybe_isolate(h, isolate_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_isolate_with_error_in_inner_loop(): + """Test exceptions happening in the isolated loop. + + If an isolated coroutine raises an exception, all other coroutines within + the isolation are cancelled, but outer coroutines keep running.""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + q3 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield q3.get() + g_was_cancelled = False + return 1 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 2 + + def isolated() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def try_isolated() -> tinyio.Coro[list[int]]: + def cleanup(e: BaseException) -> tinyio.Coro[list[int]]: + assert str(e) == "Kaboom" + yield + return [-1, -1] + + x, _ = yield tinyio.isolate(isolated, cleanup) + q3.put(0) # wake up the "outer" loop g() + return x + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_isolated()]) + + assert tinyio.Loop().run(f()) == [1, [-1, -1]] + + assert not g_was_cancelled + assert i_was_cancelled + + +def test_isolate_with_args(): + """Test that isolate can be called with additional coroutines as arguments""" + + def slow_add_one(x: int) -> tinyio.Coro[int]: + yield + return x + 1 + + def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: + x = yield get_x + if x == 3: + raise RuntimeError("That is too hard.") + else: + y = yield slow_add_one(x) + z = yield slow_add_one(y) + return z + + def cleanup(e: BaseException) -> tinyio.Coro[int]: + del e + yield + return 999 + + def try_add_three(x: int) -> tinyio.Coro[int]: + return (yield tinyio.isolate(unreliable_add_two, cleanup, slow_add_one(x))) + + assert tinyio.Loop().run(try_add_three(0)) == (3, True) + assert tinyio.Loop().run(try_add_three(1)) == (4, True) + assert tinyio.Loop().run(try_add_three(2)) == (999, False) + assert tinyio.Loop().run(try_add_three(3)) == (6, True) + assert tinyio.Loop().run(try_add_three(4)) == (7, True) diff --git a/tests/test_nest.py b/tests/test_nest.py deleted file mode 100644 index 88466e7..0000000 --- a/tests/test_nest.py +++ /dev/null @@ -1,109 +0,0 @@ -import pytest -import tinyio - - -class SingleElementQueue: - def __init__(self): - self._event = tinyio.Event() - self._elem = None - - def put(self, x): - if self._elem is not None: - raise ValueError("Queue is full") - - self._elem = x - self._event.set() - - def get(self): - while self._elem is None: - yield self._event.wait() - x = self._elem - self._elem = None - return x - - -@pytest.mark.parametrize("nest_g", (False, True)) -@pytest.mark.parametrize("nest_h", (False, True)) -def test_nest(nest_g: bool, nest_h: bool): - """Test that all coroutines make progress when some are nested""" - q1 = SingleElementQueue() - q2 = SingleElementQueue() - - # Intertwine two coroutines in such a way that they can only - # finish if both of them make progress at the same time, but - # not if one blocks until the other has completed. - def g() -> tinyio.Coro[int]: - q1.put(1) - x = yield q2.get() - q1.put(x + 1) - return (yield q2.get()) - - def h() -> tinyio.Coro[int]: - x = yield q1.get() - q2.put(x + 1) - x = yield q1.get() - q2.put(x + 1) - return x - - def maybe_nest(c: tinyio.Coro[int], nest: bool) -> tinyio.Coro[int]: - if nest: - return tinyio.nest(c) - else: - return c - - def f() -> tinyio.Coro[list[int]]: - return (yield [maybe_nest(g(), nest_g), maybe_nest(h(), nest_h)]) - - out = tinyio.Loop().run(f()) - assert out == [4, 3] - - -def test_nest_with_error_in_inner_loop(): - """Test that if an inner coroutine raises an exception, nested - coroutines are cancelled but outer ones keep running""" - q1 = SingleElementQueue() - q2 = SingleElementQueue() - - g_was_cancelled = True - i_was_cancelled = True - - def g() -> tinyio.Coro[int]: - nonlocal g_was_cancelled - q2.put(5) - yield tinyio.sleep(1) - g_was_cancelled = False - return 1 - - def h() -> tinyio.Coro[int]: - x = yield q1.get() - y = yield q2.get() - if x == 5 and y == 5: - raise RuntimeError("Kaboom") - return x + y - - def i() -> tinyio.Coro[int]: - nonlocal i_was_cancelled - q1.put(5) - yield tinyio.sleep(1) - i_was_cancelled = False - return 2 - - def nested() -> tinyio.Coro[list[int]]: - return (yield [h(), i()]) - - def try_nested() -> tinyio.Coro[list[int]]: - try: - return (yield from tinyio.nest(nested())) - except RuntimeError as e: - assert str(e) == "Kaboom" - return [-1, -1] - else: - assert False - - def f() -> tinyio.Coro[list[int]]: - return (yield [g(), try_nested()]) - - assert tinyio.Loop().run(f()) == [1, [-1, -1]] - - assert not g_was_cancelled - assert i_was_cancelled diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 6462160..cc2d7c4 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -8,10 +8,10 @@ from ._integrations import ( from_asyncio as from_asyncio, from_trio as from_trio, - nest as nest, to_asyncio as to_asyncio, to_trio as to_trio, ) +from ._isolate import isolate as isolate from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_integrations.py b/tinyio/_integrations.py index 50cc909..cf5afb8 100644 --- a/tinyio/_integrations.py +++ b/tinyio/_integrations.py @@ -100,22 +100,3 @@ async def to_trio(coro: Coro[_Return], exception_group: None | bool = None) -> _ await trio.sleep(0) else: await trio.to_thread.run_sync(wait) - - -def nest(coro: Coro[_Return], exception_group: None | bool = None) -> Coro[_Return]: - """Runs a coroutine in a separate "inner" loop. - - In particular, this isolates coroutines running in the "outer" loop from exceptions - occurring from coroutines in the inner one, while still allowing corountines in both - loops to make progress simultaneously. - """ - with Loop().runtime(coro, exception_group) as gen: - while True: - try: - wait = next(gen) - except StopIteration as e: - return e.value - if wait is None: - yield - else: - yield run_in_thread(wait) diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py new file mode 100644 index 0000000..73839ed --- /dev/null +++ b/tinyio/_isolate.py @@ -0,0 +1,114 @@ +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +import tinyio + + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_R = TypeVar("_R") + + +def _dupe(coro: tinyio.Coro[_T]) -> tuple[tinyio.Coro[None], tinyio.Coro[_T]]: + """Takes a coro assumed to be scheduled on an event loop, and returns: + + - a new coroutine that should be scheduled in the background of the same loop; + - a new coroutine that can be scheduled anywhere at all (typically a new loop), and + will return the same value as the original coroutine. + + Thus, this is a pipe through which two event loops can talk to one another. + """ + pipe = [] + done = tinyio.Event() + failed = tinyio.Event() + + def put_on_old_loop(): + try: + out = yield coro + except BaseException: + failed.set() + done.set() + raise + else: + pipe.append(out) + done.set() + + def put_on_new_loop(): + yield done.wait() + if failed.is_set(): + raise RuntimeError("Could not get input as underlying coroutine failed.") + else: + return pipe[0] + + return put_on_old_loop(), put_on_new_loop() + + +def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio.Coro[_R]: + """Runs one tinyio event loop within another. + + The outer loop will be in control of the stepping. The inner loop will have a + separate collection of coroutines, which will be grouped and mutually shut down if + one of them produces an error. Thus, this provides a way to isolate a group of + coroutines within a broader collection. + """ + with tinyio.Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value + if wait is None: + yield + else: + yield tinyio.run_in_thread(wait) + + +def isolate( + fn: Callable[..., tinyio.Coro[_R]], cleanup: Callable[[BaseException], tinyio.Coro[_R]], /, *args: tinyio.Coro +) -> tinyio.Coro[tuple[_R, bool]]: + """Runs a coroutine in an isolated event loop, and if it fails then cleanup is ran. + + **Arguments:** + + - `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to + run. All coroutines that it depends on must be passed as `*args` (so that communication can be established + between the two loops). + - `cleanup`: if `fn(*args)` raises an error, then `cleanup(exception)` should provide a coroutine that can be called + to clean things up. + - `*args`: all coroutines that `fn` depends upon. + + **Returns:** + + A 2-tuple: + + - the first element is either the result of `fn(*args)` or `cleanup(exception)`. + - whether `fn(*args)` succeeded or failed. + """ + if args: + olds, news = zip(*map(_dupe, args), strict=True) + else: + olds, news = [], [] + yield set(olds) + try: + # This `yield from` is load bearing! We must not allow the tinyio event loop to + # interpose itself between the exception arising out of `fn(*news)`, and the + # current stack frame. Otherwise we would get a `CancelledError` here instead. + return (yield from _nest(fn(*news))), True + except BaseException as e: + return (yield cleanup(e)), False + + +# Stand back, some typing hackery required. +if TYPE_CHECKING: + + def _fn_signature(*args: tinyio.Coro[_T]): ... + + def _make_isolate( + fn: Callable[_P, Any], + ) -> Callable[ + Concatenate[Callable[_P, tinyio.Coro[_R]], Callable[[BaseException], tinyio.Coro[_R]], _P], + tinyio.Coro[tuple[_R, bool]], + ]: ... + + isolate = _make_isolate(_fn_signature) + del _fn_signature, _make_isolate From 362902ae13384ec8e83aff216207b3eebe32b97f Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Fri, 27 Feb 2026 23:00:14 +1300 Subject: [PATCH 4/5] Explicit boolean check and add exception_group parameter --- tinyio/_isolate.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py index 73839ed..61a6abf 100644 --- a/tinyio/_isolate.py +++ b/tinyio/_isolate.py @@ -64,7 +64,11 @@ def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio. def isolate( - fn: Callable[..., tinyio.Coro[_R]], cleanup: Callable[[BaseException], tinyio.Coro[_R]], /, *args: tinyio.Coro + fn: Callable[..., tinyio.Coro[_R]], + cleanup: Callable[[BaseException], tinyio.Coro[_R]], + /, + *args: tinyio.Coro, + exception_group: None | bool = None, ) -> tinyio.Coro[tuple[_R, bool]]: """Runs a coroutine in an isolated event loop, and if it fails then cleanup is ran. @@ -84,7 +88,7 @@ def isolate( - the first element is either the result of `fn(*args)` or `cleanup(exception)`. - whether `fn(*args)` succeeded or failed. """ - if args: + if len(args) > 0: olds, news = zip(*map(_dupe, args), strict=True) else: olds, news = [], [] @@ -93,7 +97,7 @@ def isolate( # This `yield from` is load bearing! We must not allow the tinyio event loop to # interpose itself between the exception arising out of `fn(*news)`, and the # current stack frame. Otherwise we would get a `CancelledError` here instead. - return (yield from _nest(fn(*news))), True + return (yield from _nest(fn(*news), exception_group=exception_group)), True except BaseException as e: return (yield cleanup(e)), False @@ -101,7 +105,7 @@ def isolate( # Stand back, some typing hackery required. if TYPE_CHECKING: - def _fn_signature(*args: tinyio.Coro[_T]): ... + def _fn_signature(*args: tinyio.Coro[_T], exception_group: None | bool = None): ... def _make_isolate( fn: Callable[_P, Any], From 7be49a44cbed236362cdfb37579c800fa4c53561 Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Mon, 2 Mar 2026 10:32:18 +1300 Subject: [PATCH 5/5] Return exception instead of calling `cleanup` with it --- tests/test_isolate.py | 30 +++++++++++------------------- tinyio/_isolate.py | 17 +++++++---------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/tests/test_isolate.py b/tests/test_isolate.py index 069455b..a7c4219 100644 --- a/tests/test_isolate.py +++ b/tests/test_isolate.py @@ -48,13 +48,8 @@ def h() -> tinyio.Coro[int]: return x def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: - def cleanup(e: BaseException) -> tinyio.Coro[int]: - del e - yield - return 999 - if isolate: - x, _ = yield tinyio.isolate(c, cleanup) + x, _ = yield tinyio.isolate(c) return x else: return (yield c()) @@ -103,12 +98,10 @@ def isolated() -> tinyio.Coro[list[int]]: return (yield [h(), i()]) def try_isolated() -> tinyio.Coro[list[int]]: - def cleanup(e: BaseException) -> tinyio.Coro[list[int]]: - assert str(e) == "Kaboom" - yield - return [-1, -1] + x, success = yield tinyio.isolate(isolated) + if not success: + x = [-1, -1] - x, _ = yield tinyio.isolate(isolated, cleanup) q3.put(0) # wake up the "outer" loop g() return x @@ -137,16 +130,15 @@ def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: z = yield slow_add_one(y) return z - def cleanup(e: BaseException) -> tinyio.Coro[int]: - del e - yield - return 999 - - def try_add_three(x: int) -> tinyio.Coro[int]: - return (yield tinyio.isolate(unreliable_add_two, cleanup, slow_add_one(x))) + def try_add_three(x: int) -> tinyio.Coro[tuple[int, bool]]: + return (yield tinyio.isolate(unreliable_add_two, slow_add_one(x))) assert tinyio.Loop().run(try_add_three(0)) == (3, True) assert tinyio.Loop().run(try_add_three(1)) == (4, True) - assert tinyio.Loop().run(try_add_three(2)) == (999, False) assert tinyio.Loop().run(try_add_three(3)) == (6, True) assert tinyio.Loop().run(try_add_three(4)) == (7, True) + + result, success = tinyio.Loop().run(try_add_three(2)) + assert not success + assert type(result) is RuntimeError + assert str(result) == "That is too hard." diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py index 61a6abf..002d91f 100644 --- a/tinyio/_isolate.py +++ b/tinyio/_isolate.py @@ -65,28 +65,25 @@ def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio. def isolate( fn: Callable[..., tinyio.Coro[_R]], - cleanup: Callable[[BaseException], tinyio.Coro[_R]], /, *args: tinyio.Coro, exception_group: None | bool = None, -) -> tinyio.Coro[tuple[_R, bool]]: - """Runs a coroutine in an isolated event loop, and if it fails then cleanup is ran. +) -> tinyio.Coro[tuple[_R | BaseException, bool]]: + """Runs a coroutine in an isolated event loop, and if it fails, returns the exception that occurred. **Arguments:** - `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to run. All coroutines that it depends on must be passed as `*args` (so that communication can be established between the two loops). - - `cleanup`: if `fn(*args)` raises an error, then `cleanup(exception)` should provide a coroutine that can be called - to clean things up. - `*args`: all coroutines that `fn` depends upon. **Returns:** A 2-tuple: - - the first element is either the result of `fn(*args)` or `cleanup(exception)`. - - whether `fn(*args)` succeeded or failed. + - the first element is either the result of `fn(*args)` or an exception. + - whether `fn(*args)` succeeded or raised an exception. """ if len(args) > 0: olds, news = zip(*map(_dupe, args), strict=True) @@ -99,7 +96,7 @@ def isolate( # current stack frame. Otherwise we would get a `CancelledError` here instead. return (yield from _nest(fn(*news), exception_group=exception_group)), True except BaseException as e: - return (yield cleanup(e)), False + return e, False # Stand back, some typing hackery required. @@ -110,8 +107,8 @@ def _fn_signature(*args: tinyio.Coro[_T], exception_group: None | bool = None): def _make_isolate( fn: Callable[_P, Any], ) -> Callable[ - Concatenate[Callable[_P, tinyio.Coro[_R]], Callable[[BaseException], tinyio.Coro[_R]], _P], - tinyio.Coro[tuple[_R, bool]], + Concatenate[Callable[_P, tinyio.Coro[_R]], _P], + tinyio.Coro[tuple[_R | BaseException, bool]], ]: ... isolate = _make_isolate(_fn_signature)