mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-11 04:48:33 +03:00
Add features for running archive-media simultaneously
This commit is contained in:
17
app.py
17
app.py
@@ -35,14 +35,14 @@ def get_db_session():
|
||||
return session
|
||||
|
||||
|
||||
def get_scraper_controller():
|
||||
def get_scraper_controller(telethon_session_name = None):
|
||||
engine = create_engine(os.environ["DB"])
|
||||
|
||||
controller = ScraperController()
|
||||
controller.connect_to_db(engine)
|
||||
|
||||
scrapers = [VkontakteScraper(),
|
||||
TelegramTelethonScraper(),
|
||||
TelegramTelethonScraper(telethon_session_name = telethon_session_name),
|
||||
GettrScraper(),
|
||||
BitchuteScraper(),
|
||||
RumbleScraper()]
|
||||
@@ -85,8 +85,15 @@ def scrape_channel_info(args):
|
||||
def archive_media(args):
|
||||
logger.info(f"Archiving unarchived media")
|
||||
|
||||
controller = get_scraper_controller()
|
||||
controller.archive_unarchived_media()
|
||||
if args.telethon_session:
|
||||
controller = get_scraper_controller(telethon_session_name=args.telethon_session)
|
||||
else:
|
||||
controller = get_scraper_controller()
|
||||
|
||||
if args.chronological:
|
||||
controller.archive_unarchived_media(chronological=True)
|
||||
else:
|
||||
controller.archive_unarchived_media()
|
||||
|
||||
def transform(args):
|
||||
logger.info(f"Transforming untransformed posts")
|
||||
@@ -127,6 +134,8 @@ if __name__ == "__main__":
|
||||
parser.add_argument(
|
||||
"--media", action="store_true", help="[scrape-channels] Add this flag to media"
|
||||
)
|
||||
parser.add_argument("--chronological", action="store_true")
|
||||
parser.add_argument("--telethon_session", type=str)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
@@ -354,6 +354,7 @@ class Media:
|
||||
blob = make_request(self.url)
|
||||
return blob.content
|
||||
|
||||
@logger.catch
|
||||
def hydrate(self, blob = None):
|
||||
"""Download media file as bytes blob and extract data from content.
|
||||
"""
|
||||
@@ -382,6 +383,7 @@ class Image(Media):
|
||||
#: Extracted OCR content from image
|
||||
ocr: str = None
|
||||
|
||||
@logger.catch
|
||||
def hydrate(self, blob=None):
|
||||
"""Download image file as bytes blob and extract Exif and OCR content
|
||||
from the image.
|
||||
@@ -510,7 +512,7 @@ media_table = Table('media', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True,
|
||||
autoincrement=True),
|
||||
Column('type', String),
|
||||
Column('raw_id', Integer, ForeignKey('raw_posts.id')),
|
||||
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
|
||||
Column('post', Integer, ForeignKey('posts.id')),
|
||||
Column('url', String),
|
||||
Column('original_url', String),
|
||||
|
||||
@@ -408,11 +408,14 @@ class ScraperController:
|
||||
added = 0
|
||||
|
||||
# get most recent post
|
||||
# Note: a "bug" in Postgres can cause this query to hang for a really long time
|
||||
# when searching for a single row, hence the limit(10).all() when we really just need
|
||||
# the first row.
|
||||
rows = session.query(ScraperResult).where(
|
||||
ScraperResult.channel == channel.id).order_by(
|
||||
ScraperResult.date.desc()).limit(1).all()
|
||||
ScraperResult.date.desc()).limit(10).all()
|
||||
|
||||
if len(rows) == 1:
|
||||
if len(rows) > 0:
|
||||
since = rows[0]
|
||||
else:
|
||||
since = None
|
||||
@@ -424,9 +427,6 @@ class ScraperController:
|
||||
session.commit()
|
||||
added += 1
|
||||
|
||||
if added > 100:
|
||||
break
|
||||
|
||||
session.commit()
|
||||
logger.info(
|
||||
f"{scraper} found {added} new posts from {channel}")
|
||||
@@ -438,39 +438,44 @@ class ScraperController:
|
||||
session.close()
|
||||
|
||||
@logger.catch(reraise = True)
|
||||
def archive_unarchived_media(self):
|
||||
def archive_unarchived_media(self, chronological=False):
|
||||
if self.session is None:
|
||||
logger.error("No DB session")
|
||||
return
|
||||
|
||||
session = self.session()
|
||||
|
||||
# this query is really slow (~2.5 minutes) because of the shuffle. shuffling is so that multiple media archivers could work
|
||||
# simultaneously with low risk of collision (at least while the number of unarchived items is very large)
|
||||
posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(func.random()).limit(4000).all()
|
||||
while True:
|
||||
if chronological:
|
||||
posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(ScraperResult.date.desc()).limit(5000).all()
|
||||
else:
|
||||
# this query is really slow (~2.5 minutes) because of the shuffle. shuffling is so that multiple media archivers could work
|
||||
# simultaneously with low risk of collision (at least while the number of unarchived items is very large)
|
||||
posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(func.random()).limit(5000).all()
|
||||
|
||||
logger.info(f"Found {len(posts)} posts without media. Archiving now")
|
||||
logger.info(f"Found {len(posts)} posts without media. Archiving now")
|
||||
|
||||
for post in posts:
|
||||
handled = False
|
||||
for post in posts:
|
||||
handled = False
|
||||
|
||||
for scraper in self.scrapers:
|
||||
# compare major versions
|
||||
if scraper.__version__.split('.')[0] == post.scraper.split('.')[0]:
|
||||
handled = True
|
||||
logger.debug(f"{scraper} is archiving media for ID {post.id}")
|
||||
post = scraper.archive_files(post)
|
||||
for scraper in self.scrapers:
|
||||
# compare major versions
|
||||
if scraper.__version__.split('.')[0] == post.scraper.split('.')[0]:
|
||||
handled = True
|
||||
logger.debug(f"{scraper} is archiving media for ID {post.id}")
|
||||
post = scraper.archive_files(post)
|
||||
|
||||
if post:
|
||||
session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': post.media_archived})
|
||||
session.commit()
|
||||
if post:
|
||||
session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': post.media_archived})
|
||||
session.commit()
|
||||
|
||||
break
|
||||
break
|
||||
|
||||
if not handled:
|
||||
logger.warning(f"No handler found for post scraped with {post.scraper}")
|
||||
|
||||
session.commit()
|
||||
|
||||
if not handled:
|
||||
logger.warning(f"No handler found for post scraped with {post.scraper}")
|
||||
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
@logger.catch(reraise = True)
|
||||
|
||||
@@ -21,15 +21,18 @@ class TelegramTelethonScraper(Scraper):
|
||||
__version__ = "TelegramTelethonScraper 0.0.3"
|
||||
client = None
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, telethon_session_name = None):
|
||||
super().__init__()
|
||||
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
phone = os.environ['TELEGRAM_PHONE']
|
||||
|
||||
if telethon_session_name is None:
|
||||
telethon_session_name = phone
|
||||
|
||||
# set up a persistent client for Telethon
|
||||
self.client = TelegramClient(phone, api_id, api_hash)
|
||||
self.client = TelegramClient(telethon_session_name, api_id, api_hash)
|
||||
self.client.connect()
|
||||
|
||||
def __del__(self):
|
||||
@@ -70,6 +73,7 @@ class TelegramTelethonScraper(Scraper):
|
||||
message = self.client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']])
|
||||
|
||||
blob = None
|
||||
output_file_with_ext = None
|
||||
if len(message) > 0 and message[0] is not None:
|
||||
blob, output_file_with_ext = self.archive_post_media(message[0])
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user