Archive media in Twitter scraper

This commit is contained in:
Logan Williams
2022-02-24 18:48:48 +01:00
parent 214287b7a8
commit e64d845002
3 changed files with 43 additions and 10 deletions

2
Pipfile.lock generated
View File

@@ -509,7 +509,7 @@
}, },
"snscrape": { "snscrape": {
"git": "https://github.com/bellingcat/snscrape.git", "git": "https://github.com/bellingcat/snscrape.git",
"ref": "72b26f2373f3fecf53bdf9c62d7408df3d15a329" "ref": "de4ebed81f3f6a4bb4c65630daab6ec63784959b"
}, },
"soupsieve": { "soupsieve": {
"hashes": [ "hashes": [

View File

@@ -1,8 +1,9 @@
import cisticola.base import cisticola.base
import cisticola.scraper.base import cisticola.scraper.base
from datetime import datetime from datetime import datetime, timezone
from typing import List from typing import List
import snscrape.modules import snscrape.modules
from loguru import logger
class TwitterScraper(cisticola.scraper.base.Scraper): class TwitterScraper(cisticola.scraper.base.Scraper):
@@ -20,13 +21,43 @@ class TwitterScraper(cisticola.scraper.base.Scraper):
def get_posts(self, channel: cisticola.base.Channel, since: cisticola.base.ScraperResult = None) -> List[cisticola.base.ScraperResult]: def get_posts(self, channel: cisticola.base.Channel, since: cisticola.base.ScraperResult = None) -> List[cisticola.base.ScraperResult]:
posts = [] posts = []
scraper = snscrape.modules.twitter.TwitterUserScraper( scraper = snscrape.modules.twitter.TwitterProfileScraper(
TwitterScraper.get_username_from_url(channel.url)) TwitterScraper.get_username_from_url(channel.url))
first = True
for tweet in scraper.get_items(): for tweet in scraper.get_items():
if since is not None and tweet.date.timestamp() <= since.date_archived.timestamp(): if len(posts) >= 10:
break break
if since is not None and tweet.date.replace(tzinfo=timezone.utc) <= since.date_archived.replace(tzinfo=timezone.utc):
# with TwitterProfileScraper, the first tweet could be an old pinned tweet
if first:
first = False
continue
else:
break
archived_urls = {}
if tweet.media:
for media in tweet.media:
if type(media) == snscrape.modules.twitter.Video:
variant = max(
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
url = variant.url
elif type(media) == snscrape.modules.twitter.Gif:
url = media.variants[0].url
elif type(media) == snscrape.modules.twitter.Photo:
url = media.fullUrl
else:
logger.warning(f"Could not get media URL of {media}")
url = None
if url is not None:
archived_url = self.archive_media(url)
archived_urls[url] = archived_url
posts.append(cisticola.base.ScraperResult( posts.append(cisticola.base.ScraperResult(
scraper=self.__version__, scraper=self.__version__,
platform="Twitter", platform="Twitter",
@@ -34,7 +65,8 @@ class TwitterScraper(cisticola.scraper.base.Scraper):
platform_id=tweet.id, platform_id=tweet.id,
date=tweet.date, date=tweet.date,
date_archived=datetime.now(), date_archived=datetime.now(),
raw_data=tweet.json())) raw_data=tweet.json(),
archived_urls=archived_urls))
return posts return posts

11
test.py
View File

@@ -1,5 +1,6 @@
import cisticola import cisticola
import cisticola.scraper.telegram_snscrape import cisticola.scraper.telegram_snscrape
import cisticola.scraper.twitter
from sqlalchemy import create_engine from sqlalchemy import create_engine
@@ -25,13 +26,13 @@ test_channels = [cisticola.base.Channel(id=0, name="Logan Williams (test)", plat
controller = cisticola.ScraperController() controller = cisticola.ScraperController()
# scraper = cisticola.scraper.twitter.TwitterScraper() twitter = cisticola.scraper.twitter.TwitterScraper()
# controller.register_scraper(scraper) controller.register_scraper(twitter)
scraper = cisticola.scraper.telegram_snscrape.TelegramSnscrapeScraper() telegram = cisticola.scraper.telegram_snscrape.TelegramSnscrapeScraper()
controller.register_scraper(scraper) controller.register_scraper(telegram)
engine = create_engine('sqlite:///test4.db') engine = create_engine('sqlite:///test.db')
controller.connect_to_db(engine) controller.connect_to_db(engine)
controller.scrape_channels(test_channels) controller.scrape_channels(test_channels)