diff --git a/app.py b/app.py index 7bd9082..c8221da 100644 --- a/app.py +++ b/app.py @@ -35,12 +35,17 @@ def get_db_session(): return session -def get_scraper_controller(telethon_session_name = None): +def get_scraper_controller(args): engine = create_engine(os.environ["DB"]) controller = ScraperController() controller.connect_to_db(engine) + if args.telethon_session: + telethon_session_name = args.telethon_session + else: + telethon_session_name = None + scrapers = [ #VkontakteScraper(), TelegramTelethonScraper(telethon_session_name = telethon_session_name), GettrScraper(), @@ -51,14 +56,19 @@ def get_scraper_controller(telethon_session_name = None): return controller -def get_transformer_controller(): +def get_transformer_controller(args): engine = create_engine(os.environ["DB"]) controller = ETLController() controller.connect_to_db(engine) + if args.telethon_session: + telethon_session_name = args.telethon_session + else: + telethon_session_name = None + transformers = [ #VkontakteTransformer(), - TelegramTelethonTransformer(), + TelegramTelethonTransformer(telethon_session_name = telethon_session_name), GettrTransformer(), BitchuteTransformer(), RumbleTransformer()] @@ -71,29 +81,26 @@ def get_transformer_controller(): def scrape_channels(args): logger.info(f"Scraping channels, media: {args.media}") - controller = get_scraper_controller() + controller = get_scraper_controller(args) controller.scrape_all_channels(archive_media=args.media) def scrape_channels_old(args): logger.info(f"Scraping old posts from channels, media: {args.media}") - controller = get_scraper_controller() + controller = get_scraper_controller(args) controller.scrape_all_channels(archive_media=args.media, fetch_old=True) def scrape_channel_info(args): logger.info(f"Scraping channel info") - controller = get_scraper_controller() + controller = get_scraper_controller(args) controller.scrape_all_channel_info() def archive_media(args): logger.info(f"Archiving unarchived media") - if args.telethon_session: - controller = get_scraper_controller(telethon_session_name=args.telethon_session) - else: - controller = get_scraper_controller() + controller = get_scraper_controller(args) if args.chronological: controller.archive_unarchived_media(chronological=True) @@ -103,13 +110,19 @@ def archive_media(args): def transform(args): logger.info(f"Transforming untransformed posts") - controller = get_transformer_controller() - controller.transform_all_untransformed() + controller = get_transformer_controller(args) + + if args.min_id: + min_id = int(args.min_id) + else: + min_id = 0 + + controller.transform_all_untransformed(min_id=min_id) def transform_info(args): logger.info(f"Transforming untransformed channel info") - controller = get_transformer_controller() + controller = get_transformer_controller(args) controller.transform_all_untransformed_info() # sync_channels(args, get_db_session()) @@ -117,7 +130,7 @@ def transform_info(args): def transform_media(args): logger.info(f"Transforming untransformed channel media") - controller = get_transformer_controller() + controller = get_transformer_controller(args) controller.transform_all_untransformed_media() def init_db(): @@ -143,6 +156,7 @@ if __name__ == "__main__": ) parser.add_argument("--chronological", action="store_true") parser.add_argument("--telethon_session", type=str) + parser.add_argument("--min_id", type=int) args = parser.parse_args() diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index 5ff5263..5ce5ed0 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -218,7 +218,7 @@ class ETLController: logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") @logger.catch(reraise=True) - def transform_all_untransformed(self, hydrate: bool = True): + def transform_all_untransformed(self, hydrate: bool = True, min_id=0): """Transform all ScraperResult objects in the database that do not have an equivalent Post object stored. @@ -242,7 +242,7 @@ class ETLController: batch = (session.query(ScraperResult) .join(Post, isouter=True) - .where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something + .where(ScraperResult.id > min_id) .where(Post.raw_id == None) .order_by(ScraperResult.date.asc()) .limit(BATCH_SIZE) @@ -255,11 +255,12 @@ class ETLController: logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}") - batch = (query(ScraperResult) + batch = (session.query(ScraperResult) .join(Post, isouter=True) - .where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something + .where(ScraperResult.id > min_id) .where(Post.raw_id == None) - .where(ScraperResult.date >= max(batch, key=lambda v: v.date).date) + .where(ScraperResult.id != batch[-1].id) + .where(ScraperResult.date >= batch[-1].date) .order_by(ScraperResult.date.asc()) .limit(BATCH_SIZE) ).all() diff --git a/cisticola/transformer/telegram_telethon.py b/cisticola/transformer/telegram_telethon.py index d45da33..2524b57 100644 --- a/cisticola/transformer/telegram_telethon.py +++ b/cisticola/transformer/telegram_telethon.py @@ -20,7 +20,7 @@ from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Ima class TelegramTelethonTransformer(Transformer): - __version__ = 'TelegramTelethonTransformer 0.0.3' + __version__ = 'TelegramTelethonTransformer 0.0.4' bad_channels = {} @@ -30,18 +30,28 @@ class TelegramTelethonTransformer(Transformer): return True return False + + def __init__(self, telethon_session_name = None): + super().__init__() - def get_screenname_from_id(self, channel_id): api_id = os.environ['TELEGRAM_API_ID'] api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + if telethon_session_name is None: + telethon_session_name = phone + + # set up a persistent client for Telethon + self.client = TelegramClient(telethon_session_name, api_id, api_hash) + self.client.connect() + + def get_screenname_from_id(self, channel_id): try: - with TelegramClient("transform.session", api_id, api_hash) as client: - data = client.get_entity(channel_id) - if isinstance(data, types.User): - return (data.username, str(data.first_name or "") + " " + str(data.last_name or ""), "") - else: - return (data.username, data.title, "") + data = self.client.get_entity(channel_id) + if isinstance(data, types.User): + return (data.username, str(data.first_name or "") + " " + str(data.last_name or ""), "") + else: + return (data.username, data.title, "") except ChannelPrivateError: logger.info("ChannelPrivateError") return ("", "", "ChannelPrivateError") @@ -231,6 +241,14 @@ class TelegramTelethonTransformer(Transformer): url = "" author_username = "" + author_id = raw.get('peer_id', {}).get('channel_id') + if raw['from_id'] and 'user_id' in raw['from_id']: + author_id = raw['from_id']['user_id'] + author_username = "" + (screenname, name, notes) = self.get_screenname_from_id(author_id) + if screenname: + author_username = screenname + transformed = Post( raw_id = data.id, platform_id = raw['id'], @@ -243,7 +261,7 @@ class TelegramTelethonTransformer(Transformer): date_transformed=datetime.now(timezone.utc), url=url, content=add_markdown_links(raw), - author_id=raw.get('peer_id', {}).get('channel_id'), + author_id=author_id, author_username=author_username, forwarded_from=fwd_from, reply_to=reply_to,