From 39358c7f23776278cf9e1a4bcf001961541b6b8f Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Mon, 6 Jun 2022 16:36:39 +0200 Subject: [PATCH] Update platform ID and screenname when synchronizing with gsheet; highlight dupes --- app.py | 115 +--------------------- cisticola/base.py | 7 +- cisticola/scraper/telegram_telethon.py | 24 +++-- sync_with_gsheet.py | 130 +++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 120 deletions(-) create mode 100644 sync_with_gsheet.py diff --git a/app.py b/app.py index db75263..0ede270 100644 --- a/app.py +++ b/app.py @@ -1,13 +1,11 @@ import argparse from loguru import logger -import gspread -from sqlalchemy import create_engine, func +from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import os -import time import sys -from cisticola.base import Channel, mapper_registry +from cisticola.base import mapper_registry from cisticola.scraper import ( ScraperController, VkontakteScraper, @@ -17,112 +15,7 @@ from cisticola.scraper import ( RumbleScraper, ) from cisticola.transformer import (ETLController, TelegramTelethonTransformer) - - -def sync_channels(args): - logger.info("Synchronizing channels") - - session = get_db_session() - - gc = gspread.service_account(filename="service_account.json") - - # Open a sheet from a spreadsheet in one go - wks = gc.open_by_url(args.gsheet).worksheet("channels") - channels = wks.get_all_records() - row = 2 - - for c in channels: - if c["public"] == "": - c["public"] = False - if c["chat"] == "": - c["chat"] = False - - for k in c.keys(): - if c[k] == "TRUE" or c[k] == "yes": - c[k] = True - if c[k] == "FALSE" or c[k] == "no": - c[k] = False - - if c[k] == "": - c[k] = None - - del c["followers"] - - # add new channel - if c["id"] == "" or c["id"] is None: - del c["id"] - - # check to see if this already exists, - platform_id = None - if c["platform_id"] != "": - platform_id = c["platform_id"] - - channel = ( - session.query(Channel) - .filter_by( - platform_id=str(platform_id), platform=str(c["platform"]) - ) - .first() - ) - - if not channel: - channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first() - - if not channel and c["screenname"] != '' and c["screenname"] is not None: - channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first() - - if not channel: - channel = Channel(**c, source="researcher") - logger.debug(f"{channel} does not exist, adding") - session.add(channel) - session.flush() - session.commit() - - wks.update_cell(row, 1, channel.id) - time.sleep(1) - else: - logger.info(f"Channel found, updating channel {channel}") - channel.name = c["name"] - channel.category = c["category"] - channel.platform = c["platform"] - channel.url = c["url"] - channel.screenname = c["screenname"] - channel.country = c["country"] - channel.influencer = c["influencer"] - channel.public = c["public"] - channel.chat = c["chat"] - channel.notes = c["notes"] - channel.source = "researcher" - - session.flush() - session.commit() - - wks.update_cell(row, 1, channel.id) - time.sleep(1) - - else: - channel = session.query(Channel).filter_by(id=int(c["id"])).first() - - logger.info(f"Updating channel {channel}") - channel.name = c["name"] - channel.category = c["category"] - channel.platform = c["platform"] - channel.url = c["url"] - channel.screenname = c["screenname"] - channel.country = c["country"] - channel.influencer = c["influencer"] - channel.public = c["public"] - channel.chat = c["chat"] - channel.notes = c["notes"] - channel.source = "researcher" - - session.flush() - session.commit() - - row += 1 - - session.commit() - +from sync_with_gsheet import sync_channels def get_db_session(): engine = create_engine(os.environ["DB"]) @@ -223,7 +116,7 @@ if __name__ == "__main__": init_db() elif args.command == "sync-channels": logger.add("logs/sync-channels.log", level="TRACE", rotation="100 MB") - sync_channels(args) + sync_channels(args, get_db_session()) elif args.command == "scrape-channels": logger.add("logs/scrape-channels.log", level="TRACE", rotation="100 MB") scrape_channels(args) diff --git a/cisticola/base.py b/cisticola/base.py index 50bf0c7..f4a23f8 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -396,6 +396,11 @@ channel_info_table = Table('channel_info', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True), Column('channel', Integer, ForeignKey('channels.id'), index=True), + Column('platform_id', String), + Column('scraper', String), + Column('platform', String), + Column('transformer', String), + Column('platform', String), Column('screenname', String), Column('name', String), Column('description', String), @@ -406,7 +411,7 @@ channel_info_table = Table('channel_info', mapper_registry.metadata, Column('verified', Boolean), Column('date_created', DateTime), Column('date_archived', DateTime, index=True), - Column('date_transformed', DateTime, index=True) + Column('date_transformed', DateTime, index=True), ) channel_table = Table('channels', mapper_registry.metadata, diff --git a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index 3091fda..1de9d98 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -20,12 +20,24 @@ class TelegramTelethonScraper(Scraper): """An implementation of a Scraper for Telegram, using Telethon library""" __version__ = "TelegramTelethonScraper 0.0.2" - def get_username_from_url(self, url): + def get_username_from_url(url): username = url.split('https://t.me/')[1] if username.startswith('s/'): username = username.split('s/')[1] return username + def get_channel_identifier(channel: Channel): + identifier = channel.platform_id + + if identifier is None: + identifier = channel.screenname + if identifier is None: + identifier = TelegramTelethonScraper.get_username_from_url(channel.url) + else: + identifier = int(identifier) + + return identifier + @logger.catch def archive_files(self, result: ScraperResult, client : TelegramClient = None) -> ScraperResult: if len(result.archived_urls.keys()) == 0: @@ -108,14 +120,12 @@ class TelegramTelethonScraper(Scraper): return (blob, output_file_with_ext) def can_handle(self, channel): - if channel.platform == "Telegram" and channel.public: + if channel.platform == "Telegram": return True @logger.catch def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]: - username = channel.screenname - if username is None: - username = self.get_username_from_url(channel.url) + username = TelegramTelethonScraper.get_channel_identifier(channel) api_id = os.environ['TELEGRAM_API_ID'] api_hash = os.environ['TELEGRAM_API_HASH'] @@ -156,9 +166,7 @@ class TelegramTelethonScraper(Scraper): @logger.catch def get_profile(self, channel: Channel) -> RawChannelInfo: - username = channel.screenname - if username is None: - username = self.get_username_from_url(channel.url) + username = TelegramTelethonScraper.get_channel_identifier(channel) api_id = os.environ['TELEGRAM_API_ID'] api_hash = os.environ['TELEGRAM_API_HASH'] diff --git a/sync_with_gsheet.py b/sync_with_gsheet.py new file mode 100644 index 0000000..d206e48 --- /dev/null +++ b/sync_with_gsheet.py @@ -0,0 +1,130 @@ +import gspread +import time +from loguru import logger + +from cisticola.base import Channel, ChannelInfo + +def sync_channels(args, session): + logger.info("Synchronizing channels") + + gc = gspread.service_account(filename="service_account.json") + + # Open a sheet from a spreadsheet in one go + wks = gc.open_by_url(args.gsheet).worksheet("channels") + channels = wks.get_all_records() + row = 2 + + for c in channels: + if c["public"] == "": + c["public"] = False + if c["chat"] == "": + c["chat"] = False + + for k in c.keys(): + if c[k] == "TRUE" or c[k] == "yes": + c[k] = True + if c[k] == "FALSE" or c[k] == "no": + c[k] = False + + if c[k] == "": + c[k] = None + + # add new channel + if c["id"] == "" or c["id"] is None: + del c["id"] + + # check to see if this already exists, + platform_id = None + if c["platform_id"] != "": + platform_id = c["platform_id"] + + channel = ( + session.query(Channel) + .filter_by( + platform_id=str(platform_id), platform=str(c["platform"]) + ) + .first() + ) + + if not channel: + channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first() + + if not channel and c["screenname"] != '' and c["screenname"] is not None: + channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first() + + if not channel: + channel = Channel(**c, source="researcher") + logger.debug(f"{channel} does not exist, adding") + session.add(channel) + session.flush() + session.commit() + + wks.update_cell(row, 1, channel.id) + time.sleep(1) + else: + logger.info(f"Channel found, updating channel {channel}") + was_researcher = channel.source == "researcher" + + channel.name = c["name"] + channel.category = c["category"] + channel.platform = c["platform"] + channel.url = c["url"] + channel.screenname = c["screenname"] + channel.country = c["country"] + channel.influencer = c["influencer"] + channel.public = c["public"] + channel.chat = c["chat"] + channel.notes = c["notes"] + channel.source = "researcher" + + session.flush() + session.commit() + + wks.update_cell(row, 1, channel.id) + + # this likely means that the channel was duplicated in the Google Sheet, so add a red highlight + if was_researcher: + wks.format(f"A{str(row)}:A{str(row)}", { + "backgroundColor": { + "red": 1.0, + "green": 0.0, + "blue": 0.0 + }}) + + time.sleep(1) + + # channel has ID + else: + cid = int(c["id"]) + + channel = session.query(Channel).filter_by(id=cid).first() + channel_info = session.query(ChannelInfo).filter_by(channel=cid).order_by(ChannelInfo.date_archived.desc()).first() + + logger.info(f"Updating channel {channel}") + logger.info(channel_info) + + channel.name = c["name"] + channel.category = c["category"] + channel.platform = c["platform"] + channel.url = c["url"] + channel.screenname = c["screenname"] + channel.country = c["country"] + channel.influencer = c["influencer"] + channel.public = c["public"] + channel.chat = c["chat"] + channel.notes = c["notes"] + channel.source = "researcher" + + if channel_info: + channel.screenname = channel_info.screenname + wks.update_cell(row, 7, channel_info.screenname) + + channel.platform_id = channel_info.platform_id + wks.update_cell(row, 3, channel_info.platform_id) + + session.flush() + session.commit() + + row += 1 + + session.commit()