mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Merge pull request #57 from bellingcat/synchronization-improvements
Google sheet channel synchronization improvements
This commit is contained in:
16
Pipfile
16
Pipfile
@@ -24,6 +24,10 @@ ratelimit = "*"
|
||||
pytz = "*"
|
||||
langdetect = "*"
|
||||
spacy = "==3.2.4"
|
||||
ocrd-pyexiftool = "*"
|
||||
gabber = {git = "https://github.com/stanfordio/gabber.git"}
|
||||
snscrape = {git = "https://github.com/bellingcat/snscrape"}
|
||||
polyphemus = {git = "https://github.com/bellingcat/polyphemus"}
|
||||
|
||||
[dev-packages]
|
||||
pytest = "*"
|
||||
@@ -39,15 +43,3 @@ python_version = "3.9"
|
||||
|
||||
[pipenv]
|
||||
allow_prereleases = true
|
||||
|
||||
[packages.polyphemus]
|
||||
git = "https://github.com/bellingcat/polyphemus.git"
|
||||
|
||||
[packages.PyExifTool]
|
||||
git = "https://github.com/smarnach/pyexiftool.git"
|
||||
|
||||
[packages.gabber]
|
||||
git = "https://github.com/stanfordio/gabber.git"
|
||||
|
||||
[packages.snscrape]
|
||||
git = "https://github.com/bellingcat/snscrape"
|
||||
|
||||
1051
Pipfile.lock
generated
1051
Pipfile.lock
generated
File diff suppressed because it is too large
Load Diff
115
app.py
115
app.py
@@ -1,13 +1,11 @@
|
||||
import argparse
|
||||
from loguru import logger
|
||||
import gspread
|
||||
from sqlalchemy import create_engine, func
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
import os
|
||||
import time
|
||||
import sys
|
||||
|
||||
from cisticola.base import Channel, mapper_registry
|
||||
from cisticola.base import mapper_registry
|
||||
from cisticola.scraper import (
|
||||
ScraperController,
|
||||
VkontakteScraper,
|
||||
@@ -17,112 +15,7 @@ from cisticola.scraper import (
|
||||
RumbleScraper,
|
||||
)
|
||||
from cisticola.transformer import (ETLController, TelegramTelethonTransformer)
|
||||
|
||||
|
||||
def sync_channels(args):
|
||||
logger.info("Synchronizing channels")
|
||||
|
||||
session = get_db_session()
|
||||
|
||||
gc = gspread.service_account(filename="service_account.json")
|
||||
|
||||
# Open a sheet from a spreadsheet in one go
|
||||
wks = gc.open_by_url(args.gsheet).worksheet("channels")
|
||||
channels = wks.get_all_records()
|
||||
row = 2
|
||||
|
||||
for c in channels:
|
||||
if c["public"] == "":
|
||||
c["public"] = False
|
||||
if c["chat"] == "":
|
||||
c["chat"] = False
|
||||
|
||||
for k in c.keys():
|
||||
if c[k] == "TRUE" or c[k] == "yes":
|
||||
c[k] = True
|
||||
if c[k] == "FALSE" or c[k] == "no":
|
||||
c[k] = False
|
||||
|
||||
if c[k] == "":
|
||||
c[k] = None
|
||||
|
||||
del c["followers"]
|
||||
|
||||
# add new channel
|
||||
if c["id"] == "" or c["id"] is None:
|
||||
del c["id"]
|
||||
|
||||
# check to see if this already exists,
|
||||
platform_id = None
|
||||
if c["platform_id"] != "":
|
||||
platform_id = c["platform_id"]
|
||||
|
||||
channel = (
|
||||
session.query(Channel)
|
||||
.filter_by(
|
||||
platform_id=str(platform_id), platform=str(c["platform"])
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not channel:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first()
|
||||
|
||||
if not channel and c["screenname"] != '' and c["screenname"] is not None:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first()
|
||||
|
||||
if not channel:
|
||||
channel = Channel(**c, source="researcher")
|
||||
logger.debug(f"{channel} does not exist, adding")
|
||||
session.add(channel)
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.info(f"Channel found, updating channel {channel}")
|
||||
channel.name = c["name"]
|
||||
channel.category = c["category"]
|
||||
channel.platform = c["platform"]
|
||||
channel.url = c["url"]
|
||||
channel.screenname = c["screenname"]
|
||||
channel.country = c["country"]
|
||||
channel.influencer = c["influencer"]
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
time.sleep(1)
|
||||
|
||||
else:
|
||||
channel = session.query(Channel).filter_by(id=int(c["id"])).first()
|
||||
|
||||
logger.info(f"Updating channel {channel}")
|
||||
channel.name = c["name"]
|
||||
channel.category = c["category"]
|
||||
channel.platform = c["platform"]
|
||||
channel.url = c["url"]
|
||||
channel.screenname = c["screenname"]
|
||||
channel.country = c["country"]
|
||||
channel.influencer = c["influencer"]
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
row += 1
|
||||
|
||||
session.commit()
|
||||
|
||||
from sync_with_gsheet import sync_channels
|
||||
|
||||
def get_db_session():
|
||||
engine = create_engine(os.environ["DB"])
|
||||
@@ -223,7 +116,7 @@ if __name__ == "__main__":
|
||||
init_db()
|
||||
elif args.command == "sync-channels":
|
||||
logger.add("logs/sync-channels.log", level="TRACE", rotation="100 MB")
|
||||
sync_channels(args)
|
||||
sync_channels(args, get_db_session())
|
||||
elif args.command == "scrape-channels":
|
||||
logger.add("logs/scrape-channels.log", level="TRACE", rotation="100 MB")
|
||||
scrape_channels(args)
|
||||
|
||||
@@ -396,6 +396,11 @@ channel_info_table = Table('channel_info', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True, autoincrement=True),
|
||||
Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True),
|
||||
Column('channel', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('platform_id', String),
|
||||
Column('scraper', String),
|
||||
Column('platform', String),
|
||||
Column('transformer', String),
|
||||
Column('platform', String),
|
||||
Column('screenname', String),
|
||||
Column('name', String),
|
||||
Column('description', String),
|
||||
@@ -406,7 +411,7 @@ channel_info_table = Table('channel_info', mapper_registry.metadata,
|
||||
Column('verified', Boolean),
|
||||
Column('date_created', DateTime),
|
||||
Column('date_archived', DateTime, index=True),
|
||||
Column('date_transformed', DateTime, index=True)
|
||||
Column('date_transformed', DateTime, index=True),
|
||||
)
|
||||
|
||||
channel_table = Table('channels', mapper_registry.metadata,
|
||||
|
||||
@@ -20,12 +20,24 @@ class TelegramTelethonScraper(Scraper):
|
||||
"""An implementation of a Scraper for Telegram, using Telethon library"""
|
||||
__version__ = "TelegramTelethonScraper 0.0.2"
|
||||
|
||||
def get_username_from_url(self, url):
|
||||
def get_username_from_url(url):
|
||||
username = url.split('https://t.me/')[1]
|
||||
if username.startswith('s/'):
|
||||
username = username.split('s/')[1]
|
||||
return username
|
||||
|
||||
def get_channel_identifier(channel: Channel):
|
||||
identifier = channel.platform_id
|
||||
|
||||
if identifier is None:
|
||||
identifier = channel.screenname
|
||||
if identifier is None:
|
||||
identifier = TelegramTelethonScraper.get_username_from_url(channel.url)
|
||||
else:
|
||||
identifier = int(identifier)
|
||||
|
||||
return identifier
|
||||
|
||||
@logger.catch
|
||||
def archive_files(self, result: ScraperResult, client : TelegramClient = None) -> ScraperResult:
|
||||
if len(result.archived_urls.keys()) == 0:
|
||||
@@ -108,14 +120,12 @@ class TelegramTelethonScraper(Scraper):
|
||||
return (blob, output_file_with_ext)
|
||||
|
||||
def can_handle(self, channel):
|
||||
if channel.platform == "Telegram" and channel.public:
|
||||
if channel.platform == "Telegram":
|
||||
return True
|
||||
|
||||
@logger.catch
|
||||
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
|
||||
username = channel.screenname
|
||||
if username is None:
|
||||
username = self.get_username_from_url(channel.url)
|
||||
username = TelegramTelethonScraper.get_channel_identifier(channel)
|
||||
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
@@ -156,9 +166,7 @@ class TelegramTelethonScraper(Scraper):
|
||||
|
||||
@logger.catch
|
||||
def get_profile(self, channel: Channel) -> RawChannelInfo:
|
||||
username = channel.screenname
|
||||
if username is None:
|
||||
username = self.get_username_from_url(channel.url)
|
||||
username = TelegramTelethonScraper.get_channel_identifier(channel)
|
||||
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
|
||||
9
spacy_setup.sh
Normal file
9
spacy_setup.sh
Normal file
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash
|
||||
|
||||
pipenv run python -m spacy download xx_ent_wiki_sm
|
||||
pipenv run python -m spacy download fr_core_news_sm
|
||||
pipenv run python -m spacy download de_core_news_sm
|
||||
pipenv run python -m spacy download nl_core_news_sm
|
||||
pipenv run python -m spacy download it_core_news_sm
|
||||
pipenv run python -m spacy download ru_core_news_sm
|
||||
pipenv run python -m spacy download en_core_web_sm
|
||||
134
sync_with_gsheet.py
Normal file
134
sync_with_gsheet.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import gspread
|
||||
import time
|
||||
from loguru import logger
|
||||
|
||||
from cisticola.base import Channel, ChannelInfo
|
||||
|
||||
def sync_channels(args, session):
|
||||
logger.info("Synchronizing channels")
|
||||
|
||||
gc = gspread.service_account(filename="service_account.json")
|
||||
|
||||
# Open a sheet from a spreadsheet in one go
|
||||
wks = gc.open_by_url(args.gsheet).worksheet("channels")
|
||||
channels = wks.get_all_records()
|
||||
row = 2
|
||||
|
||||
for c in channels:
|
||||
# defaults for unset values
|
||||
if c["public"] == "":
|
||||
c["public"] = True
|
||||
if c["chat"] == "":
|
||||
c["chat"] = False
|
||||
|
||||
# normalize the values slightly from the Google Sheet
|
||||
for k in c.keys():
|
||||
if c[k] == "TRUE" or c[k] == "yes":
|
||||
c[k] = True
|
||||
if c[k] == "FALSE" or c[k] == "no":
|
||||
c[k] = False
|
||||
|
||||
if c[k] == "":
|
||||
c[k] = None
|
||||
|
||||
# add new channel
|
||||
if c["id"] == "" or c["id"] is None:
|
||||
del c["id"]
|
||||
|
||||
# check to see if this already exists,
|
||||
platform_id = None
|
||||
if c["platform_id"] != "":
|
||||
platform_id = c["platform_id"]
|
||||
|
||||
channel = (
|
||||
session.query(Channel)
|
||||
.filter_by(
|
||||
platform_id=str(platform_id), platform=str(c["platform"])
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not channel:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first()
|
||||
|
||||
if not channel and c["screenname"] != '' and c["screenname"] is not None:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first()
|
||||
|
||||
if not channel:
|
||||
channel = Channel(**c, source="researcher")
|
||||
logger.debug(f"{channel} does not exist, adding")
|
||||
session.add(channel)
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.info(f"Channel found, updating channel {channel}")
|
||||
was_researcher = channel.source == "researcher"
|
||||
|
||||
channel.name = c["name"]
|
||||
channel.category = c["category"]
|
||||
channel.platform = c["platform"]
|
||||
channel.url = c["url"]
|
||||
channel.screenname = c["screenname"]
|
||||
channel.country = c["country"]
|
||||
channel.influencer = c["influencer"]
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
|
||||
# this likely means that the channel was duplicated in the Google Sheet, so add a red highlight
|
||||
if was_researcher:
|
||||
logger.warning(f"This channel (ID {channel.id}) is possibly a duplicate.")
|
||||
|
||||
wks.format(f"A{str(row)}:A{str(row)}", {
|
||||
"backgroundColor": {
|
||||
"red": 1.0,
|
||||
"green": 0.0,
|
||||
"blue": 0.0
|
||||
}})
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# channel has ID
|
||||
else:
|
||||
cid = int(c["id"])
|
||||
|
||||
channel = session.query(Channel).filter_by(id=cid).first()
|
||||
channel_info = session.query(ChannelInfo).filter_by(channel=cid).order_by(ChannelInfo.date_archived.desc()).first()
|
||||
|
||||
logger.info(f"Updating channel {channel}")
|
||||
logger.info(channel_info)
|
||||
|
||||
channel.name = c["name"]
|
||||
channel.category = c["category"]
|
||||
channel.platform = c["platform"]
|
||||
channel.url = c["url"]
|
||||
channel.screenname = c["screenname"]
|
||||
channel.country = c["country"]
|
||||
channel.influencer = c["influencer"]
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
if channel_info:
|
||||
channel.screenname = channel_info.screenname
|
||||
wks.update_cell(row, 7, channel_info.screenname)
|
||||
|
||||
channel.platform_id = channel_info.platform_id
|
||||
wks.update_cell(row, 3, channel_info.platform_id)
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
row += 1
|
||||
|
||||
session.commit()
|
||||
Reference in New Issue
Block a user