Add Telegram transformer for channel info

This commit is contained in:
Logan Williams
2022-05-18 09:17:49 +01:00
parent 7f55b721dd
commit 6145fd0b6b
3 changed files with 145 additions and 13 deletions

View File

@@ -114,6 +114,49 @@ class RawChannelInfo:
#: Datetime (relative to UTC) that the scraped post was archived at.
date_archived: datetime
@dataclass
class ChannelInfo:
    """A processed set of information about a channel.

    Instances are built by a transformer from a raw channel info record.
    """

    #: Foreign key from the ``raw_channel_info`` table.
    raw_channel_info_id: int

    #: Foreign key from the ``channels`` table.
    channel: int

    #: Platform-specific ID of the channel.
    platform_id: str

    #: Name of platform from which result was scraped, e.g. ``"Twitter"``.
    platform: str

    #: String specifying name and version of scraper used to generate result, e.g. ``"TwitterScraper 0.0.1"``.
    scraper: str

    #: String specifying name and version of transformer used to transform result, e.g. ``"TwitterTransformer 0.0.1"``.
    transformer: str

    #: Attributes extracted from the raw channel info object.
    screenname: str
    name: str
    description: str
    description_url: str
    description_location: str
    followers: int
    following: int
    verified: bool
    date_created: datetime

    #: Datetime (relative to UTC) that the scraped channel info was archived at.
    date_archived: datetime

    #: Datetime (UTC) that the scraped channel info was transformed at.
    date_transformed: datetime

    def hydrate(self) -> None:
        """Placeholder hook; currently does nothing."""
        pass
# Load the small English, German and Italian spaCy pipelines with the
# 'parser', 'tok2vec' and 'attribute_ruler' components disabled.
nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
@@ -349,6 +392,23 @@ raw_channel_info_table = Table('raw_channel_info', mapper_registry.metadata,
Column('raw_data', String),
Column('date_archived', DateTime, index=True))
# Table backing the processed ``ChannelInfo`` records.
#
# Every attribute declared on ``ChannelInfo`` needs a column here: with an
# imperative mapping, attributes without a matching column are plain Python
# attributes and are never written to the database. ``platform_id``,
# ``platform``, ``scraper`` and ``transformer`` were previously missing.
#
# NOTE(review): ``MetaData.create_all`` does not add columns to a table that
# already exists -- a database created before this change needs a migration.
channel_info_table = Table('channel_info', mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True),
    Column('channel', Integer, ForeignKey('channels.id'), index=True),
    Column('platform_id', String),
    Column('platform', String),
    Column('scraper', String),
    Column('transformer', String),
    Column('screenname', String),
    Column('name', String),
    Column('description', String),
    Column('description_url', String),
    Column('description_location', String),
    Column('followers', Integer),
    Column('following', Integer),
    Column('verified', Boolean),
    Column('date_created', DateTime),
    Column('date_archived', DateTime, index=True),
    Column('date_transformed', DateTime, index=True)
)
channel_table = Table('channels', mapper_registry.metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('name', String),
@@ -406,6 +466,7 @@ mapper_registry.map_imperatively(Post, post_table)
# Imperatively map each class onto the table that stores it.
mapper_registry.map_imperatively(Channel, channel_table)
mapper_registry.map_imperatively(ScraperResult, raw_posts_table)
mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table)
mapper_registry.map_imperatively(ChannelInfo, channel_info_table)
# Media, Image and Video are all mapped onto media_table; the 'type' column
# holds the identity ('media', 'image' or 'video') that selects the class.
mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media')
mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image')
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')

View File

@@ -6,7 +6,7 @@ from sqlalchemy.sql.expression import func
from collections import defaultdict
from datetime import datetime
from cisticola.base import ScraperResult, Post, Media, Channel, mapper_registry
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry
class Transformer:
@@ -194,27 +194,71 @@ class ETLController:
batch = []
query = (session.query(ScraperResult)
# .filter_by(platform="Telegram")
# .filter(ScraperResult.raw_data.notlike("%MessageService%"))
.join(Post, isouter=True)
.where(Post.raw_id == None)
# .order_by(func.random())
.order_by(ScraperResult.date.asc())
)
while len(batch) > 0 or offset == 0:
logger.info(f"Fetching untransformed batch of {BATCH_SIZE}, offset {offset}")
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {offset}")
batch = query.slice(offset, offset + BATCH_SIZE).all()
# untransformed = (
# .limit(BATCH_SIZE)
# .offset(offset)
# .all()
# )
offset += BATCH_SIZE
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
self.transform_results(batch, hydrate=hydrate)
@logger.catch(reraise=True)
def transform_info(self, results: List[RawChannelInfo]):
    """Transform raw channel info records using the registered transformers.

    Each record that has both a scraper and a platform is offered to
    ``self.transformers`` in order. The first transformer whose
    ``can_handle`` accepts the record and which implements
    ``transform_info`` processes it, after which the session is committed.
    A warning is logged for records that no transformer processed.

    Args:
        results: Raw channel info records to transform.

    Returns:
        None. Logs an error and returns early when no DB session is set.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    for result in results:
        # Records without scraper/platform metadata are skipped silently.
        if result.scraper is None or result.platform is None:
            continue

        # Initialised once per record (not per transformer) so the check
        # below is well defined even when no transformers are registered.
        handled = False

        for transformer in self.transformers:
            if not transformer.can_handle(result):
                continue

            # Channel info goes through ``transform_info``; ``transform`` is
            # the post transformer and must not receive these records.
            # NOTE(review): transformers that only implement post
            # transformation are skipped here -- confirm whether the base
            # Transformer class should declare ``transform_info``.
            transform_info = getattr(transformer, "transform_info", None)
            if transform_info is None:
                continue

            # ``date_archived`` is the timestamp raw channel info records carry.
            logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")
            transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session)
            session.commit()
            handled = True
            break

        if not handled:
            logger.warning(f"No Transformer could handle raw channel info ID {result.id} with platform {result.platform} ({result.date_archived})")
@logger.catch(reraise=True)
def transform_all_untransformed_info(self):
    """Transform every raw channel info record that has no ``ChannelInfo`` yet.

    Records are selected with an outer join against ``ChannelInfo``, ordered
    by ``date_archived`` ascending, and handed to ``transform_info`` in
    slices of ``BATCH_SIZE``. The loop stops after the first empty slice
    (which is still passed to ``transform_info``).

    Returns:
        None. Logs an error and returns early when no DB session is set.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    BATCH_SIZE = 50000
    session = self.session()

    # Raw records for which the outer join found no ChannelInfo row.
    pending = (
        session.query(RawChannelInfo)
        .join(ChannelInfo, isouter=True)
        .where(ChannelInfo.raw_channel_info_id.is_(None))
        .order_by(RawChannelInfo.date_archived.asc())
    )

    # NOTE(review): rows transformed by an earlier batch no longer match this
    # query, so advancing the offset may skip rows that are still
    # untransformed -- confirm whether that is intended.
    offset = 0
    while True:
        logger.info(f"Fetching untransformed info batch of {BATCH_SIZE}, offset {offset}")
        rows = pending.slice(offset, offset + BATCH_SIZE).all()
        offset += BATCH_SIZE

        logger.info(f"Found {len(rows)} info items to ETL ({offset} already processed)")
        self.transform_info(rows)

        if not rows:
            break

View File

@@ -12,7 +12,7 @@ import os
from datetime import datetime, timezone
from cisticola.transformer.base import Transformer
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
class TelegramTelethonTransformer(Transformer):
@@ -88,6 +88,33 @@ class TelegramTelethonTransformer(Transformer):
return name
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> None:
    """Build a ``ChannelInfo`` from a raw Telegram channel info record and insert it.

    Args:
        data: Raw record whose ``raw_data`` is a JSON document with a
            ``chats`` list (the first entry is used) and a ``full_chat``
            object.
        insert: Callable invoked with the new ``ChannelInfo`` to persist it.
        session: Database session; not used by this method.

    Returns:
        None. The result is handed to ``insert`` rather than returned or
        yielded.
    """
    raw = json.loads(data.raw_data)
    chat_raw = raw['chats'][0]
    full_chat = raw['full_chat']

    transformed = ChannelInfo(
        raw_channel_info_id=data.id,
        channel=data.channel,
        # ``platform_id`` is declared as ``str`` on ChannelInfo.
        platform_id=str(full_chat['id']),
        platform=data.platform,
        scraper=data.scraper,
        # Identify the transformer doing this work, not a value copied from
        # the raw record.
        # NOTE(review): assumes the class exposes ``__version__``; falls back
        # to the class name otherwise -- confirm.
        transformer=getattr(self, '__version__', type(self).__name__),
        screenname=chat_raw['username'],
        name=chat_raw['title'],
        description=full_chat['about'],
        description_url='',  # does not exist for Telegram
        description_location='',  # does not exist for Telegram
        followers=full_chat['participants_count'],
        following=-1,  # does not exist for Telegram
        # Telegram chat objects carry a ``verified`` flag; default to False
        # when the scraped payload lacks it.
        verified=bool(chat_raw.get('verified', False)),
        date_created=dateutil.parser.parse(chat_raw['date']),
        date_archived=data.date_archived,
        date_transformed=datetime.now(timezone.utc)
    )

    insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)