From 3194fee95d8d93aa01e9298dc195506191a5b6a5 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Thu, 12 Mar 2026 10:20:11 +0000 Subject: [PATCH] fix telethon bug when running in celery workers that close the event loop --- .../telethon_extractor/telethon_extractor.py | 11 ++++ tests/extractors/test_telethon_extractor.py | 51 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py b/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py index 84d1a5b..279139d 100644 --- a/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py +++ b/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py @@ -1,3 +1,4 @@ +import asyncio import os import shutil import re @@ -53,6 +54,16 @@ class TelethonExtractor(Extractor): logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session") shutil.copy(base_session_filepath, f"{self.session_file}.session") + # ensure a running event loop exists (Needed when used by Celery workers which may close the default one) + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + # initiate the client self.client = TelegramClient(self.session_file, self.api_id, self.api_hash) diff --git a/tests/extractors/test_telethon_extractor.py b/tests/extractors/test_telethon_extractor.py index a1a5aa9..49785d4 100644 --- a/tests/extractors/test_telethon_extractor.py +++ b/tests/extractors/test_telethon_extractor.py @@ -1,3 +1,4 @@ +import asyncio import os from datetime import date @@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module): def test_invite_pattern_regex(invite, expected, get_lazy_module): match = TelethonExtractor.invite_pattern.search(invite) assert bool(match) == expected 
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
    """
    Loading the extractor must succeed when the current event loop is closed.

    This reproduces the Celery worker situation: the loop registered for the
    thread has already been closed by the time setup() runs. Afterwards the
    current loop must be an open one and the Telegram client must exist.
    """
    # an (empty) session file must exist so setup doesn't fail on a missing file
    session_path = tmp_path / "test.session"
    session_path.touch()
    config = {"telethon_extractor": {"session_file": str(session_path), "api_id": 123, "api_hash": "ABC"}}

    # register a brand-new loop as the current one and close it right away,
    # which leaves the thread pointing at a closed loop
    stale_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(stale_loop)
    stale_loop.close()

    module = get_lazy_module("telethon_extractor").load(config)

    # setup should have replaced the closed loop with a usable one
    current_loop = asyncio.get_event_loop()
    assert not current_loop.is_closed()
    assert module.client is not None


def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
    """
    Loading the extractor must create and register a loop when none exists.

    The "no current event loop" condition (e.g. a non-main thread) is faked by
    making asyncio.get_event_loop() raise RuntimeError; the loop factory and
    the setter are replaced by mocks so the calls can be inspected.
    """
    session_path = tmp_path / "test.session"
    session_path.touch()
    config = {"telethon_extractor": {"session_file": str(session_path), "api_id": 123, "api_hash": "ABC"}}

    # In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError;
    # force that outcome regardless of where the test actually runs
    mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
    fake_loop = mocker.MagicMock()
    fake_loop.is_closed.return_value = False
    new_loop_patch = mocker.patch("asyncio.new_event_loop", return_value=fake_loop)
    set_loop_patch = mocker.patch("asyncio.set_event_loop")

    get_lazy_module("telethon_extractor").load(config)

    # exactly one loop was created, and that loop was installed as current
    new_loop_patch.assert_called_once()
    set_loop_patch.assert_called_once_with(fake_loop)