This commit is contained in:
Logan Williams
2022-07-28 09:18:34 +00:00
4 changed files with 55 additions and 17 deletions

View File

@@ -18,6 +18,9 @@ import spacy
from .utils import make_request
# Disable decompression bomb check
PIL.Image.MAX_IMAGE_PIXELS = 1024 * 1024 * 256
@dataclass
class ScraperResult:
"""A minimally processed result from a scraper
@@ -275,6 +278,10 @@ class Post:
except LangDetectException:
self.detected_language = ""
# Dutch (NL) is often misdetected as Afrikaans (af)
if self.detected_language == "af":
self.detected_language = "nl"
self.hydrate_spacy()
def hydrate_spacy(self):
@@ -493,7 +500,7 @@ post_table = Table('posts', mapper_registry.metadata,
Column('author_username', String),
Column('content', String),
Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True),
Column('reply_to', Integer, ForeignKey('posts.id', ondelete="CASCADE"), index=True),
Column('reply_to', Integer, ForeignKey('posts.id'), index=True),
Column('named_entities', JSON),
Column('cryptocurrency_addresses', JSON),
Column('hashtags', JSON),
@@ -513,7 +520,7 @@ media_table = Table('media', mapper_registry.metadata,
autoincrement=True),
Column('type', String),
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
Column('post', Integer, ForeignKey('posts.id')),
Column('post', Integer, ForeignKey('posts.id'), index=True),
Column('url', String),
Column('original_url', String),
Column('exif', String),

View File

@@ -32,7 +32,7 @@ class TelegramTelethonScraper(Scraper):
telethon_session_name = phone
# set up a persistent client for Telethon
self.client = TelegramClient(telethon_session_name, api_id, api_hash)
self.client = TelegramClient(telethon_session_name, api_id, api_hash)
self.client.connect()
def __del__(self):
@@ -145,9 +145,11 @@ class TelegramTelethonScraper(Scraper):
break
archived_urls = {}
media_archived = datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
if post.media is not None:
archived_urls[post_url] = None
media_archived = None
# if archive_media:
# blob, output_file_with_ext = self.archive_post_media(post, client)
@@ -165,7 +167,7 @@ class TelegramTelethonScraper(Scraper):
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post.to_dict(), default=str),
archived_urls=archived_urls,
media_archived=datetime.now(timezone.utc) if archive_media else None)
media_archived=media_archived)
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:

View File

@@ -349,23 +349,30 @@ class ETLController:
session = self.session()
BATCH_SIZE = 5000
offset = 0
batch = []
BATCH_SIZE = 50000
query = (session.query(ScraperResult, Post)
logger.info(f"Fetching first untransformed post media batch of {BATCH_SIZE}")
batch = (session.query(ScraperResult, Post)
.join(Post)
.join(Media, isouter=True)
.filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None))
.order_by(ScraperResult.date.asc())
)
.order_by(ScraperResult.date.desc())
.limit(BATCH_SIZE)
).all()
while len(batch) > 0 or offset == 0:
logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {offset}")
batch = query.slice(offset, offset + BATCH_SIZE).all()
offset += BATCH_SIZE
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
while len(batch) > 0:
logger.info(f"Found {len(batch)} items to ETL")
self.transform_media(batch, hydrate=hydrate)
logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date}")
batch = (session.query(ScraperResult, Post)
.join(Post)
.join(Media, isouter=True)
.where(ScraperResult.date <= min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date)
.filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None))
.order_by(ScraperResult.date.desc())
.limit(BATCH_SIZE)
).all()

22
telethon_session_init.py Normal file
View File

@@ -0,0 +1,22 @@
import argparse
from telethon.sync import TelegramClient
import os
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Cisticola command line tools")
parser.add_argument("--telethon_session", type=str)
args = parser.parse_args()
api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH']
phone = os.environ['TELEGRAM_PHONE']
telethon_session_name = args.telethon_session
if telethon_session_name is None:
telethon_session_name = phone
client = TelegramClient(telethon_session_name, api_id, api_hash)
client.start()
client.disconnect()