mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Merge pull request #55 from bellingcat/channel-info-transformers
Transformers for raw channel info
This commit is contained in:
8
app.py
8
app.py
@@ -189,6 +189,11 @@ def transform(args):
|
||||
controller = get_transformer_controller()
|
||||
controller.transform_all_untransformed()
|
||||
|
||||
def transform_info(args):
    """Run the channel-info transformers over every untransformed raw record.

    ``args`` is accepted for parity with the other command handlers; it is
    not read by this function.
    """
    logger.info(f"Transforming untransformed channel info")

    # Build the controller and immediately kick off the info transformation.
    get_transformer_controller().transform_all_untransformed_info()
|
||||
|
||||
def init_db():
|
||||
engine = create_engine(os.environ["DB"])
|
||||
@@ -231,5 +236,8 @@ if __name__ == "__main__":
|
||||
elif args.command == "transform":
|
||||
logger.add("logs/transform.log", level="TRACE", rotation="100 MB")
|
||||
transform(args)
|
||||
elif args.command == "transform-info":
|
||||
logger.add("logs/transform-info.log", level="TRACE", rotation="100 MB")
|
||||
transform_info(args)
|
||||
else:
|
||||
logger.error(f"Unrecognized command {args.command}")
|
||||
|
||||
@@ -114,6 +114,49 @@ class RawChannelInfo:
|
||||
#: Datetime (relative to UTC) that the scraped post was archived at.
|
||||
date_archived: datetime
|
||||
|
||||
@dataclass
class ChannelInfo:
    """Processed (transformed) information about a single channel.

    Field order is significant: it defines the positional order of the
    generated ``__init__``.
    """

    #: Foreign key from the ``raw_channel_info`` table.
    raw_channel_info_id: int

    #: Foreign key from the ``channels`` table.
    channel: int

    #: Platform-specific ID of the channel.
    platform_id: str

    #: Name of platform from which result was scraped, e.g. ``"Twitter"``.
    platform: str

    #: Name and version of the scraper used to generate the result, e.g. ``"TwitterScraper 0.0.1"``.
    scraper: str

    #: Name and version of the transformer used to transform the result, e.g. ``"TwitterTransformer 0.0.1"``.
    transformer: str

    # --- Attributes extracted from the raw channel info object ---

    #: Screen name of the channel.
    screenname: str

    #: Name of the channel.
    name: str

    #: Description text of the channel.
    description: str

    #: URL given in the channel description.
    description_url: str

    #: Location given in the channel description.
    description_location: str

    #: Follower count.
    followers: int

    #: Following count.
    following: int

    #: Whether the channel is verified.
    verified: bool

    #: Datetime that the channel was created at.
    date_created: datetime

    #: Datetime (relative to UTC) that the scraped channel info was archived at.
    date_archived: datetime

    #: Datetime (UTC) that the scraped channel info was transformed at.
    date_transformed: datetime

    def hydrate(self):
        """Placeholder hook; currently does nothing and returns ``None``."""
        pass
|
||||
|
||||
nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
@@ -349,6 +392,23 @@ raw_channel_info_table = Table('raw_channel_info', mapper_registry.metadata,
|
||||
Column('raw_data', String),
|
||||
Column('date_archived', DateTime, index=True))
|
||||
|
||||
# Table backing the ``ChannelInfo`` dataclass.
#
# ``platform_id``, ``platform``, ``scraper`` and ``transformer`` are declared
# on ``ChannelInfo`` and filled in by the transformers; without matching
# columns they are plain unmapped attributes and never reach the database.
# NOTE(review): a database that already contains ``channel_info`` needs a
# migration to add these four columns — confirm before deploying.
channel_info_table = Table('channel_info', mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True),
    Column('channel', Integer, ForeignKey('channels.id'), index=True),
    Column('platform_id', String),
    Column('platform', String),
    Column('scraper', String),
    Column('transformer', String),
    Column('screenname', String),
    Column('name', String),
    Column('description', String),
    Column('description_url', String),
    Column('description_location', String),
    Column('followers', Integer),
    Column('following', Integer),
    Column('verified', Boolean),
    Column('date_created', DateTime),
    Column('date_archived', DateTime, index=True),
    Column('date_transformed', DateTime, index=True),
)
|
||||
|
||||
channel_table = Table('channels', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True, autoincrement=True),
|
||||
Column('name', String),
|
||||
@@ -406,6 +466,7 @@ mapper_registry.map_imperatively(Post, post_table)
|
||||
mapper_registry.map_imperatively(Channel, channel_table)
|
||||
mapper_registry.map_imperatively(ScraperResult, raw_posts_table)
|
||||
mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table)
|
||||
mapper_registry.map_imperatively(ChannelInfo, channel_info_table)
|
||||
mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media')
|
||||
mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image')
|
||||
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')
|
||||
@@ -6,7 +6,7 @@ from sqlalchemy.sql.expression import func
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
from cisticola.base import ScraperResult, Post, Media, Channel, mapper_registry
|
||||
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry
|
||||
|
||||
|
||||
class Transformer:
|
||||
@@ -194,27 +194,71 @@ class ETLController:
|
||||
batch = []
|
||||
|
||||
query = (session.query(ScraperResult)
|
||||
# .filter_by(platform="Telegram")
|
||||
# .filter(ScraperResult.raw_data.notlike("%MessageService%"))
|
||||
.join(Post, isouter=True)
|
||||
.where(Post.raw_id == None)
|
||||
# .order_by(func.random())
|
||||
.order_by(ScraperResult.date.asc())
|
||||
)
|
||||
|
||||
while len(batch) > 0 or offset == 0:
|
||||
logger.info(f"Fetching untransformed batch of {BATCH_SIZE}, offset {offset}")
|
||||
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {offset}")
|
||||
|
||||
batch = query.slice(offset, offset + BATCH_SIZE).all()
|
||||
# untransformed = (
|
||||
|
||||
# .limit(BATCH_SIZE)
|
||||
# .offset(offset)
|
||||
# .all()
|
||||
# )
|
||||
|
||||
offset += BATCH_SIZE
|
||||
|
||||
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
|
||||
|
||||
self.transform_results(batch, hydrate=hydrate)
|
||||
|
||||
@logger.catch(reraise=True)
def transform_info(self, results: List[RawChannelInfo]):
    """Transform raw channel info records using the registered transformers.

    Each record is handed to the first transformer whose ``can_handle``
    accepts it, and the session is committed after that transformer ran.
    Records whose ``scraper`` or ``platform`` is ``None`` are skipped.
    A warning is logged when no transformer accepts a record.

    Args:
        results: Raw channel info records to transform.

    Returns:
        None. Logs an error and returns early when no DB session is set.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    for result in results:
        # Nothing can be matched without scraper and platform metadata.
        if result.scraper is None or result.platform is None:
            continue

        for transformer in self.transformers:
            if not transformer.can_handle(result):
                continue

            logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")

            transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session)

            session.commit()
            break
        else:
            # for/else: reached only when no transformer broke out of the
            # loop, which also covers an empty ``self.transformers`` list
            # (previously the ``handled`` flag was unbound in that case).
            logger.warning(f"No Transformer could handle raw channel info ID {result.id} with platform {result.platform} ({result.date_archived})")
|
||||
|
||||
|
||||
@logger.catch(reraise=True)
def transform_all_untransformed_info(self):
    """Transform every raw channel info record that has no ``ChannelInfo`` row yet.

    Records are fetched in batches of ``BATCH_SIZE`` and passed to
    ``self.transform_info``.

    Batches are paged by primary key (keyset pagination) rather than by
    OFFSET. The query only returns untransformed rows, so rows transformed
    and committed by an earlier batch drop out of the result; a growing
    OFFSET over that shrinking result would skip rows that are still
    untransformed. Paging on ``id`` also moves past rows that no transformer
    can handle, so the loop always terminates.

    Returns:
        None. Logs an error and returns early when no DB session is set.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    BATCH_SIZE = 50000
    processed = 0
    last_id = None

    query = (session.query(RawChannelInfo)
        .join(ChannelInfo, isouter=True)
        .where(ChannelInfo.raw_channel_info_id.is_(None))
        .order_by(RawChannelInfo.id.asc())
    )

    while True:
        logger.info(f"Fetching untransformed info batch of {BATCH_SIZE}, offset {processed}")

        page = query if last_id is None else query.filter(RawChannelInfo.id > last_id)
        batch = page.limit(BATCH_SIZE).all()

        if not batch:
            break

        logger.info(f"Found {len(batch)} info items to ETL ({processed} already processed)")

        # Remember the cursor before transforming, so it does not depend on
        # the state of the loaded objects after the commits in transform_info.
        last_id = batch[-1].id

        self.transform_info(batch)
        processed += len(batch)
|
||||
|
||||
@@ -12,11 +12,11 @@ import os
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cisticola.transformer.base import Transformer
|
||||
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
|
||||
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
|
||||
|
||||
|
||||
class TelegramTelethonTransformer(Transformer):
|
||||
__version__ = 'TelegramTelethonTransformer 0.0.1'
|
||||
__version__ = 'TelegramTelethonTransformer 0.0.2'
|
||||
|
||||
bad_channels = {}
|
||||
|
||||
@@ -88,6 +88,33 @@ class TelegramTelethonTransformer(Transformer):
|
||||
|
||||
return name
|
||||
|
||||
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> None:
    """Build a ``ChannelInfo`` from a raw Telegram channel info record and insert it.

    Args:
        data: Raw record whose ``raw_data`` is a JSON document containing
            ``full_chat`` and ``chats`` entries.
        insert: Callback that persists the transformed object.
        session: DB session; not used by this method.

    Returns:
        None. The transformed object is handed to ``insert``.
    """
    raw = json.loads(data.raw_data)
    full_chat = raw['full_chat']
    chats = raw['chats']

    # Prefer the chat entry whose id matches ``full_chat``; ``chats`` may hold
    # more than one entry (presumably related chats such as a linked
    # discussion group — TODO confirm against scraper output). Falls back to
    # the first entry, which was the previous behaviour.
    chat_raw = next((chat for chat in chats if chat.get('id') == full_chat['id']), chats[0])

    transformed = ChannelInfo(
        raw_channel_info_id=data.id,
        channel=data.channel,
        platform_id=str(full_chat['id']),  # ChannelInfo.platform_id is declared as str
        platform=data.platform,
        scraper=data.scraper,
        transformer=self.__version__,
        screenname=chat_raw['username'],
        name=chat_raw['title'],
        description=full_chat['about'],
        description_url='',  # does not exist for Telegram
        description_location='',  # does not exist for Telegram
        followers=full_chat['participants_count'],
        following=-1,  # does not exist for Telegram
        verified=False,  # does not exist for Telegram
        date_created=dateutil.parser.parse(chat_raw['date']),
        date_archived=data.date_archived,
        date_transformed=datetime.now(timezone.utc),
    )

    insert(transformed)
|
||||
|
||||
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
|
||||
raw = json.loads(data.raw_data)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user