Always use the Telethon session CLI argument; improve the Telegram transformer (populate author id/username for chats, accept min_id via a CLI argument, reuse the same session)

This commit is contained in:
Logan Williams
2023-03-04 09:51:15 +01:00
parent 7d55eace3d
commit 2320ea1efd
3 changed files with 61 additions and 28 deletions

42
app.py
View File

@@ -35,12 +35,17 @@ def get_db_session():
return session return session
def get_scraper_controller(telethon_session_name = None): def get_scraper_controller(args):
engine = create_engine(os.environ["DB"]) engine = create_engine(os.environ["DB"])
controller = ScraperController() controller = ScraperController()
controller.connect_to_db(engine) controller.connect_to_db(engine)
if args.telethon_session:
telethon_session_name = args.telethon_session
else:
telethon_session_name = None
scrapers = [ #VkontakteScraper(), scrapers = [ #VkontakteScraper(),
TelegramTelethonScraper(telethon_session_name = telethon_session_name), TelegramTelethonScraper(telethon_session_name = telethon_session_name),
GettrScraper(), GettrScraper(),
@@ -51,14 +56,19 @@ def get_scraper_controller(telethon_session_name = None):
return controller return controller
def get_transformer_controller(): def get_transformer_controller(args):
engine = create_engine(os.environ["DB"]) engine = create_engine(os.environ["DB"])
controller = ETLController() controller = ETLController()
controller.connect_to_db(engine) controller.connect_to_db(engine)
if args.telethon_session:
telethon_session_name = args.telethon_session
else:
telethon_session_name = None
transformers = [ #VkontakteTransformer(), transformers = [ #VkontakteTransformer(),
TelegramTelethonTransformer(), TelegramTelethonTransformer(telethon_session_name = telethon_session_name),
GettrTransformer(), GettrTransformer(),
BitchuteTransformer(), BitchuteTransformer(),
RumbleTransformer()] RumbleTransformer()]
@@ -71,29 +81,26 @@ def get_transformer_controller():
def scrape_channels(args): def scrape_channels(args):
logger.info(f"Scraping channels, media: {args.media}") logger.info(f"Scraping channels, media: {args.media}")
controller = get_scraper_controller() controller = get_scraper_controller(args)
controller.scrape_all_channels(archive_media=args.media) controller.scrape_all_channels(archive_media=args.media)
def scrape_channels_old(args): def scrape_channels_old(args):
logger.info(f"Scraping old posts from channels, media: {args.media}") logger.info(f"Scraping old posts from channels, media: {args.media}")
controller = get_scraper_controller() controller = get_scraper_controller(args)
controller.scrape_all_channels(archive_media=args.media, fetch_old=True) controller.scrape_all_channels(archive_media=args.media, fetch_old=True)
def scrape_channel_info(args): def scrape_channel_info(args):
logger.info(f"Scraping channel info") logger.info(f"Scraping channel info")
controller = get_scraper_controller() controller = get_scraper_controller(args)
controller.scrape_all_channel_info() controller.scrape_all_channel_info()
def archive_media(args): def archive_media(args):
logger.info(f"Archiving unarchived media") logger.info(f"Archiving unarchived media")
if args.telethon_session: controller = get_scraper_controller(args)
controller = get_scraper_controller(telethon_session_name=args.telethon_session)
else:
controller = get_scraper_controller()
if args.chronological: if args.chronological:
controller.archive_unarchived_media(chronological=True) controller.archive_unarchived_media(chronological=True)
@@ -103,13 +110,19 @@ def archive_media(args):
def transform(args): def transform(args):
logger.info(f"Transforming untransformed posts") logger.info(f"Transforming untransformed posts")
controller = get_transformer_controller() controller = get_transformer_controller(args)
controller.transform_all_untransformed()
if args.min_id:
min_id = int(args.min_id)
else:
min_id = 0
controller.transform_all_untransformed(min_id=min_id)
def transform_info(args): def transform_info(args):
logger.info(f"Transforming untransformed channel info") logger.info(f"Transforming untransformed channel info")
controller = get_transformer_controller() controller = get_transformer_controller(args)
controller.transform_all_untransformed_info() controller.transform_all_untransformed_info()
# sync_channels(args, get_db_session()) # sync_channels(args, get_db_session())
@@ -117,7 +130,7 @@ def transform_info(args):
def transform_media(args): def transform_media(args):
logger.info(f"Transforming untransformed channel media") logger.info(f"Transforming untransformed channel media")
controller = get_transformer_controller() controller = get_transformer_controller(args)
controller.transform_all_untransformed_media() controller.transform_all_untransformed_media()
def init_db(): def init_db():
@@ -143,6 +156,7 @@ if __name__ == "__main__":
) )
parser.add_argument("--chronological", action="store_true") parser.add_argument("--chronological", action="store_true")
parser.add_argument("--telethon_session", type=str) parser.add_argument("--telethon_session", type=str)
parser.add_argument("--min_id", type=int)
args = parser.parse_args() args = parser.parse_args()

View File

@@ -218,7 +218,7 @@ class ETLController:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True) @logger.catch(reraise=True)
def transform_all_untransformed(self, hydrate: bool = True): def transform_all_untransformed(self, hydrate: bool = True, min_id=0):
"""Transform all ScraperResult objects in the database that do not have an """Transform all ScraperResult objects in the database that do not have an
equivalent Post object stored. equivalent Post object stored.
@@ -242,7 +242,7 @@ class ETLController:
batch = (session.query(ScraperResult) batch = (session.query(ScraperResult)
.join(Post, isouter=True) .join(Post, isouter=True)
.where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something .where(ScraperResult.id > min_id)
.where(Post.raw_id == None) .where(Post.raw_id == None)
.order_by(ScraperResult.date.asc()) .order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE) .limit(BATCH_SIZE)
@@ -255,11 +255,12 @@ class ETLController:
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}") logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}")
batch = (query(ScraperResult) batch = (session.query(ScraperResult)
.join(Post, isouter=True) .join(Post, isouter=True)
.where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something .where(ScraperResult.id > min_id)
.where(Post.raw_id == None) .where(Post.raw_id == None)
.where(ScraperResult.date >= max(batch, key=lambda v: v.date).date) .where(ScraperResult.id != batch[-1].id)
.where(ScraperResult.date >= batch[-1].date)
.order_by(ScraperResult.date.asc()) .order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE) .limit(BATCH_SIZE)
).all() ).all()

View File

@@ -20,7 +20,7 @@ from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Ima
class TelegramTelethonTransformer(Transformer): class TelegramTelethonTransformer(Transformer):
__version__ = 'TelegramTelethonTransformer 0.0.3' __version__ = 'TelegramTelethonTransformer 0.0.4'
bad_channels = {} bad_channels = {}
@@ -30,18 +30,28 @@ class TelegramTelethonTransformer(Transformer):
return True return True
return False return False
def __init__(self, telethon_session_name = None):
super().__init__()
def get_screenname_from_id(self, channel_id):
api_id = os.environ['TELEGRAM_API_ID'] api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH'] api_hash = os.environ['TELEGRAM_API_HASH']
phone = os.environ['TELEGRAM_PHONE']
if telethon_session_name is None:
telethon_session_name = phone
# set up a persistent client for Telethon
self.client = TelegramClient(telethon_session_name, api_id, api_hash)
self.client.connect()
def get_screenname_from_id(self, channel_id):
try: try:
with TelegramClient("transform.session", api_id, api_hash) as client: data = self.client.get_entity(channel_id)
data = client.get_entity(channel_id) if isinstance(data, types.User):
if isinstance(data, types.User): return (data.username, str(data.first_name or "") + " " + str(data.last_name or ""), "")
return (data.username, str(data.first_name or "") + " " + str(data.last_name or ""), "") else:
else: return (data.username, data.title, "")
return (data.username, data.title, "")
except ChannelPrivateError: except ChannelPrivateError:
logger.info("ChannelPrivateError") logger.info("ChannelPrivateError")
return ("", "", "ChannelPrivateError") return ("", "", "ChannelPrivateError")
@@ -231,6 +241,14 @@ class TelegramTelethonTransformer(Transformer):
url = "" url = ""
author_username = "" author_username = ""
author_id = raw.get('peer_id', {}).get('channel_id')
if raw['from_id'] and 'user_id' in raw['from_id']:
author_id = raw['from_id']['user_id']
author_username = ""
(screenname, name, notes) = self.get_screenname_from_id(author_id)
if screenname:
author_username = screenname
transformed = Post( transformed = Post(
raw_id = data.id, raw_id = data.id,
platform_id = raw['id'], platform_id = raw['id'],
@@ -243,7 +261,7 @@ class TelegramTelethonTransformer(Transformer):
date_transformed=datetime.now(timezone.utc), date_transformed=datetime.now(timezone.utc),
url=url, url=url,
content=add_markdown_links(raw), content=add_markdown_links(raw),
author_id=raw.get('peer_id', {}).get('channel_id'), author_id=author_id,
author_username=author_username, author_username=author_username,
forwarded_from=fwd_from, forwarded_from=fwd_from,
reply_to=reply_to, reply_to=reply_to,