diff --git a/telebot/async_telebot.py b/telebot/async_telebot.py index 418cd39ee..99349a15c 100644 --- a/telebot/async_telebot.py +++ b/telebot/async_telebot.py @@ -193,6 +193,10 @@ def __init__(self, token: str, parse_mode: Optional[str]=None, offset: Optional[ self._user = None # set during polling self._polling = None + # Strong references to background tasks created via asyncio.create_task(). + # asyncio only keeps weak references, so unreferenced tasks can be GC'd + # mid-execution; see https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task + self._pending_tasks: set[asyncio.Task[Any]] = set() self.webhook_listener = None if validate_token: @@ -456,8 +460,10 @@ async def _process_polling(self, non_stop: bool=False, interval: int=0, timeout: updates = await self.get_updates(offset=self.offset, allowed_updates=allowed_updates, timeout=timeout, request_timeout=request_timeout) if updates: self.offset = updates[-1].update_id + 1 - # noinspection PyAsyncCall - asyncio.create_task(self.process_new_updates(updates)) # Seperate task for processing updates + # Retain a strong reference so the task isn't GC'd mid-execution. + task = asyncio.create_task(self.process_new_updates(updates)) + self._pending_tasks.add(task) + task.add_done_callback(self._pending_tasks.discard) if interval: await asyncio.sleep(interval) error_interval = 0.25 # drop error_interval if no errors diff --git a/tests/test_async_telebot.py b/tests/test_async_telebot.py new file mode 100644 index 000000000..fae838a52 --- /dev/null +++ b/tests/test_async_telebot.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +"""Unit tests for `telebot.async_telebot.AsyncTeleBot`. + +These tests are self-contained (no TOKEN/CHAT_ID required) and stub out all +network I/O. +""" +import asyncio + +from telebot import types +from telebot.async_telebot import AsyncTeleBot + + +def _make_fake_me() -> types.User: + return types.User.de_json({ + "id": 1, + "is_bot": True, + "first_name": "Test", + "username": "test_bot", + }) + + +def test_process_polling_retains_update_processing_tasks(): + """Regression test for issue #2572. + + Tasks fired by `_process_polling` for `process_new_updates` must be held + in `self._pending_tasks` while running and discarded on completion, so + they cannot be garbage-collected mid-execution. + """ + bot = AsyncTeleBot("1:fake", validate_token=False) + + task_was_tracked_during_run: list[bool] = [] + process_completed = asyncio.Event() + + async def fake_process_new_updates(updates): + current = asyncio.current_task() + task_was_tracked_during_run.append(current in bot._pending_tasks) + process_completed.set() + + async def fake_get_me(): + return _make_fake_me() + + # Deliver a single update batch, then stop polling on the next tick. + fake_update = types.Update.de_json({"update_id": 1}) + call_count = {"n": 0} + + async def fake_get_updates(*args, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return [fake_update] + bot._polling = False + return [] + + async def noop(): + return None + + bot.get_me = fake_get_me + bot.get_updates = fake_get_updates + bot.process_new_updates = fake_process_new_updates + bot.close_session = noop # stub: no real aiohttp session in tests + + async def driver(): + await bot._process_polling(non_stop=True, interval=0, timeout=0) + # Allow the fire-and-forget task to finish plus one yield for the + # add_done_callback discard to run. A timeout guards against the + # stub ever being rewired such that the processing task never runs. + await asyncio.wait_for(process_completed.wait(), timeout=1) + await asyncio.sleep(0) + + asyncio.run(driver()) + + assert task_was_tracked_during_run == [True], ( + "In-flight processing task must be held by _pending_tasks" + ) + assert bot._pending_tasks == set(), ( + "Completed processing tasks must be discarded from _pending_tasks" + )