fix telethon bug when running in celery workers that close the event loop

This commit is contained in:
msramalho
2026-03-12 10:20:11 +00:00
parent 0040810e2e
commit 3194fee95d
2 changed files with 62 additions and 0 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
import os
import shutil
import re
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
shutil.copy(base_session_filepath, f"{self.session_file}.session")
# ensure a running event loop exists (Needed when used by Celery workers which may close the default one)
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)

View File

@@ -1,3 +1,4 @@
import asyncio
import os
from datetime import date
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
def test_invite_pattern_regex(invite, expected, get_lazy_module):
match = TelethonExtractor.invite_pattern.search(invite)
assert bool(match) == expected
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the Celery worker scenario where the asyncio event loop is closed
before setup() runs. The fix should create a new event loop so that
TelegramClient.start() does not raise 'Event loop is closed'.
"""
# create a session file so setup doesn't fail on missing file
session_file = tmp_path / "test.session"
session_file.touch()
# close the current event loop to simulate a Celery worker environment
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.close()
lazy_module = get_lazy_module("telethon_extractor")
module = lazy_module.load(
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
)
# setup should have succeeded and a new open event loop should exist
new_loop = asyncio.get_event_loop()
assert not new_loop.is_closed()
assert module.client is not None
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the scenario where there is no current event loop at all
(e.g. running in a non-main thread). The fix should create one.
"""
session_file = tmp_path / "test.session"
session_file.touch()
# Remove the current event loop entirely
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
new_loop_mock = mocker.MagicMock()
new_loop_mock.is_closed.return_value = False
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
set_loop = mocker.patch("asyncio.set_event_loop")
lazy_module = get_lazy_module("telethon_extractor")
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
# a new event loop should have been created and set
asyncio.new_event_loop.assert_called_once()
set_loop.assert_called_once_with(new_loop_mock)