mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-11 04:38:29 +03:00
telethon join channels working
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
# we need to explicitly expose the available imports here
|
||||
from .base_archiver import Archiver, ArchiveResult
|
||||
from .archiver import Archiverv2
|
||||
from .telegram_archiver import TelegramArchiver
|
||||
from .telethon_archiver import TelethonArchiver
|
||||
from .tiktok_archiver import TiktokArchiver
|
||||
@@ -8,4 +9,6 @@ from .youtubedl_archiver import YoutubeDLArchiver
|
||||
from .twitter_archiver import TwitterArchiver
|
||||
from .vk_archiver import VkArchiver
|
||||
from .twitter_api_archiver import TwitterApiArchiver
|
||||
from .instagram_archiver import InstagramArchiver
|
||||
from .instagram_archiver import InstagramArchiver
|
||||
|
||||
from .telethon_archiverv2 import TelethonArchiver
|
||||
26
src/archivers/archiver.py
Normal file
26
src/archivers/archiver.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from __future__ import annotations
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from metadata import Metadata
|
||||
from steps.step import Step
|
||||
|
||||
|
||||
@dataclass
class Archiverv2(Step):
    """Base class for archiver steps.

    Concrete archivers subclass this, set their own ``name`` and implement
    ``download``. ``setup`` is an optional hook for one-time preparation.
    """

    name = "archiver"

    def __init__(self, config: dict) -> None:
        # without this Step.__init__ is not called
        # NOTE(review): presumably because @dataclass would otherwise generate
        # its own __init__ for this class - confirm.
        super().__init__(config)
        # self.setup()

    # only for typing...
    @staticmethod
    def init(name: str, config: dict) -> Archiverv2:
        """Build the archiver registered under ``name`` by delegating to ``Step.init``.

        Declared as a static method because it takes no ``self``/``cls``; this
        keeps ``Archiverv2.init(name, config)`` working and also makes the call
        safe when made through an instance or a subclass.
        """
        return Step.init(name, config, Archiverv2)

    def setup(self) -> None:
        """One-time preparation hook; does nothing by default."""
        # used when archivers need to login or do other one-time setup
        pass

    @abstractmethod
    def download(self, item: Metadata) -> Metadata:
        """Archive ``item``; must be implemented by subclasses."""
        pass
|
||||
114
src/archivers/telethon_archiverv2.py
Normal file
114
src/archivers/telethon_archiverv2.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from archivers import Archiverv2
|
||||
from metadata import Metadata
|
||||
from telethon.sync import TelegramClient
|
||||
from telethon.errors import ChannelInvalidError
|
||||
from telethon.tl.types import PeerUser, PeerChat, PeerChannel
|
||||
from telethon.tl.functions.messages import ImportChatInviteRequest
|
||||
from telethon.errors.rpcerrorlist import UserAlreadyParticipantError, FloodWaitError, InviteRequestSentError, InviteHashExpiredError
|
||||
from loguru import logger
|
||||
from tqdm import tqdm
|
||||
import re, time, json
|
||||
|
||||
|
||||
|
||||
class TelethonArchiver(Archiverv2):
    """Archiver step backed by a Telethon ``TelegramClient``.

    ``setup`` checks the login and joins the configured invite links;
    ``download`` parses a ``https://t.me/...`` post URL and fetches the message.
    """

    name = "telethon"
    # group 1: "/c" when the link points to a private channel, group 2: chat, group 3: post id
    link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
    # group 2 is the invite hash; the dot is escaped so only the literal "t.me" host matches
    invite_pattern = re.compile(r"t\.me(\/joinchat){0,1}\/\+?(.+)")

    def __init__(self, config: dict) -> None:
        """Validate the API credentials and create (but do not start) the client."""
        super().__init__(config)
        # NOTE(review): api_id, api_hash, session_file and channel_invites are presumably
        # set on the instance by the parent __init__ from configs() - confirm.
        assert isinstance(self.api_id, str) and len(self.api_id) > 0, f"invalid telethon api_id value ({self.api_id}) should be a valid string"
        assert isinstance(self.api_hash, str) and len(self.api_hash) > 0, f"invalid telethon api_hash value ({self.api_hash}) should be a valid string"

        self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)

    @staticmethod
    def configs() -> dict:
        """Describe the configurable properties of this archiver (defaults and help texts)."""
        return {
            "api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
            "api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
            # "bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
            "session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage"},
            # NOTE(review): the default is a dict and cli_set merges dicts, but setup()
            # indexes channel_invites by position like a list - confirm the intended shape.
            "channel_invites": {
                "default": {},
                "help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
                "cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
            }
        }

    def setup(self) -> None:
        """Check the login and join every channel listed in ``channel_invites``.

        An invite is retried after sleeping when Telegram answers the membership
        lookup with a flood-wait error; every other failure is logged and the
        invite is skipped.
        """
        logger.info(f"SETUP {self.name} checking login...")
        with self.client.start():
            pass

        if not len(self.channel_invites):
            return

        logger.info(f"SETUP {self.name} joining channels...")
        with self.client.start():
            # get currently joined channels
            # https://docs.telethon.dev/en/stable/modules/custom.html#module-telethon.tl.custom.dialog
            joined_channel_ids = [c.id for c in self.client.get_dialogs() if c.is_channel]
            logger.info(f"already part of {len(joined_channel_ids)} channels")

            i = 0
            # the context manager closes the progress bar even if joining fails midway
            with tqdm(desc=f"joining {len(self.channel_invites)} invite links", total=len(self.channel_invites)) as pbar:
                while i < len(self.channel_invites):
                    channel_invite = self.channel_invites[i]
                    channel_id = channel_invite.get("id", False)
                    invite = channel_invite["invite"]
                    if (match := self.invite_pattern.search(invite)):
                        try:
                            if channel_id:
                                ent = self.client.get_entity(int(channel_id))  # fails if not a member
                            else:
                                ent = self.client.get_entity(invite)  # fails if not a member
                                logger.warning(f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting.")
                        except ValueError:
                            # not a member yet: try to join through the invite hash
                            logger.info(f"joining new channel {invite=}")
                            try:
                                self.client(ImportChatInviteRequest(match.group(2)))
                            except UserAlreadyParticipantError:
                                logger.info(f"already joined {invite=}")
                            except InviteRequestSentError:
                                logger.warning(f"already sent a join request with {invite} still no answer")
                            except InviteHashExpiredError:
                                logger.warning(f"{invite=} has expired please find a more recent one")
                            except Exception as e:
                                logger.error(f"could not join channel with {invite=} due to {e}")
                        except FloodWaitError as e:
                            logger.warning(f"got a flood error, need to wait {e.seconds} seconds")
                            time.sleep(e.seconds)
                            # retry the same invite: i is deliberately not advanced
                            continue
                    else:
                        logger.warning(f"Invalid invite link {invite}")
                    i += 1
                    pbar.update()

    def download(self, item: Metadata) -> Metadata:
        """Fetch the Telegram post referenced by the URL of ``item``.

        Returns ``False`` when the URL is not a Telegram post link or the post
        cannot be fetched. Work in progress: when the post is found it is only
        printed and nothing is returned yet.
        """
        url = self.get_url(item)
        print(f"downloading {url=}")
        # detect URLs that we definitely cannot handle
        match = self.link_pattern.search(url)
        if not match:
            return False

        # app will ask (stall for user input!) for phone number and auth code if anon.session not found
        # TODO: not using bot_token since then private channels cannot be archived
        # with self.client.start(bot_token=self.bot_token):
        with self.client.start():
            # self.client(ImportChatInviteRequest('4kAkN49IKJBhZDk6'))
            is_private = match.group(1) == "/c"
            print(f"{is_private=}")
            # private links carry a numeric chat id, the others carry the chat as text
            chat = int(match.group(2)) if is_private else match.group(2)
            post_id = int(match.group(3))

            try:
                post = self.client.get_messages(chat, ids=post_id)
            except ValueError as e:
                logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
                return False
            except ChannelInvalidError as e:
                logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}")
                return False

            if post is None:
                return False
            print(post)
|
||||
@@ -3,9 +3,9 @@
|
||||
import argparse, yaml
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
from feeders.feeder import Feeder
|
||||
from archivers import Archiverv2
|
||||
from feeders import Feeder
|
||||
from steps.step import Step
|
||||
from utils import Util
|
||||
from enrichers import Enricher
|
||||
from collections import defaultdict
|
||||
|
||||
@@ -16,10 +16,11 @@ class ConfigV2:
|
||||
configurable_parents = [
|
||||
Feeder,
|
||||
Enricher,
|
||||
Archiverv2,
|
||||
# Util
|
||||
]
|
||||
feeder: Step # TODO:= BaseFeeder
|
||||
archivers: List[Step] = field(default_factory=[]) # TODO: fix type
|
||||
archivers: List[Archiverv2] = field(default_factory=[]) # TODO: fix type
|
||||
enrichers: List[Enricher] = field(default_factory=[])
|
||||
formatters: List[Step] = field(default_factory=[]) # TODO: fix type
|
||||
storages: List[Step] = field(default_factory=[]) # TODO: fix type
|
||||
@@ -48,7 +49,7 @@ class ConfigV2:
|
||||
assert "." not in child.name, f"class prop name cannot contain dots('.'): {child.name}"
|
||||
assert "." not in config, f"config property cannot contain dots('.'): {config}"
|
||||
config_path = f"{child.name}.{config}"
|
||||
parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help'])
|
||||
parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=f"{details['help']} (defaults to {details['default']})")
|
||||
self.defaults[config_path] = details["default"]
|
||||
if "cli_set" in details:
|
||||
self.cli_ops[config_path] = details["cli_set"]
|
||||
@@ -82,9 +83,11 @@ class ConfigV2:
|
||||
|
||||
self.feeder = Feeder.init(steps.get("feeder", "cli_feeder"), self.config)
|
||||
self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])]
|
||||
self.archivers = [Archiverv2.init(e, self.config) for e in steps.get("archivers", [])]
|
||||
|
||||
print("feeder", self.feeder)
|
||||
print("enrichers", [e for e in self.enrichers])
|
||||
print("archivers", [e for e in self.archivers])
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
from utils import Webdriver
|
||||
from . import Enricher
|
||||
from metadata import Metadata
|
||||
from loguru import logger
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
import time
|
||||
|
||||
|
||||
class ScreenshotEnricher(Enricher):
|
||||
@@ -11,43 +14,19 @@ class ScreenshotEnricher(Enricher):
|
||||
return {
|
||||
"width": {"default": 1280, "help": "width of the screenshots"},
|
||||
"height": {"default": 720, "help": "height of the screenshots"},
|
||||
"timeout": {"default": 60, "help": "timeout for taking the screenshot"}
|
||||
}
|
||||
|
||||
def enrich(self, item: Metadata) -> Metadata:
|
||||
url = self.get_url(item)
|
||||
print("enrich")
|
||||
# driver = config.webdriver
|
||||
# with driver as Webdriver(): # TODO: make a util
|
||||
# #TODO: take screenshot
|
||||
# pass
|
||||
print(f"enriching {url=}")
|
||||
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url) as driver: # TODO: make a util
|
||||
try:
|
||||
driver.get(url)
|
||||
time.sleep(2)
|
||||
except TimeoutException:
|
||||
logger.info("TimeoutException loading page for screenshot")
|
||||
|
||||
# logger.debug(f"getting screenshot for {url=}")
|
||||
# key = self._get_key_from_url(url, ".png", append_datetime=True)
|
||||
# filename = os.path.join(Storage.TMP_FOLDER, key)
|
||||
|
||||
# # Accept cookies popup dismiss for ytdlp video
|
||||
# if 'facebook.com' in url:
|
||||
# try:
|
||||
# logger.debug(f'Trying fb click accept cookie popup for {url}')
|
||||
# self.driver.get("http://www.facebook.com")
|
||||
# foo = self.driver.find_element(By.XPATH, "//button[@data-cookiebanner='accept_only_essential_button']")
|
||||
# foo.click()
|
||||
# logger.debug(f'fb click worked')
|
||||
# # linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
|
||||
# time.sleep(2)
|
||||
# except:
|
||||
# logger.warning(f'Failed on fb accept cookies for url {url}')
|
||||
|
||||
# try:
|
||||
# self.driver.get(url)
|
||||
# time.sleep(6)
|
||||
# except TimeoutException:
|
||||
# logger.info("TimeoutException loading page for screenshot")
|
||||
|
||||
# self.driver.save_screenshot(filename)
|
||||
# self.storage.upload(filename, key, extra_args={'ACL': 'public-read', 'ContentType': 'image/png'})
|
||||
|
||||
# cdn_url = self.storage.get_cdn_url(key)
|
||||
# self.add_to_media(cdn_url, key)
|
||||
|
||||
# return cdn_url
|
||||
#TODO: return saved object
|
||||
driver.save_screenshot("TODO-HASH_OR_UUID.png")
|
||||
return None
|
||||
|
||||
@@ -4,7 +4,7 @@ import json, gspread
|
||||
from loguru import logger
|
||||
|
||||
# from . import Enricher
|
||||
from feeders.feeder import Feeder
|
||||
from feeders import Feeder
|
||||
from steps.gsheet import Gsheets
|
||||
from utils import GWorksheet
|
||||
|
||||
@@ -30,7 +30,7 @@ class GsheetsFeeder(Gsheets, Feeder):
|
||||
},
|
||||
"block_worksheets": {
|
||||
"default": set(),
|
||||
"help": "(CSV) explicitly block some worksheets from being processed, defaults to empty",
|
||||
"help": "(CSV) explicitly block some worksheets from being processed",
|
||||
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
|
||||
}
|
||||
})
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Union, Dict
|
||||
from typing import Any, Union, Dict
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@@ -12,18 +12,28 @@ class Metadata:
|
||||
# title: str
|
||||
# url: str
|
||||
# hash: str
|
||||
metadata: Dict[str, Metadata]
|
||||
metadata: Dict[str, Any]
|
||||
|
||||
@staticmethod
|
||||
def merge(left: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
|
||||
# TODO: remove and use default?
|
||||
def __init__(self) -> None:
|
||||
self.status = ""
|
||||
self.metadata = {}
|
||||
|
||||
# @staticmethod
|
||||
def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
|
||||
# should return a merged version of the Metadata
|
||||
# will work for archived() and enriched()
|
||||
# what if 2 metadatas contain the same keys? only one can remain! : overwrite_left
|
||||
pass
|
||||
|
||||
def get(self, key: str) -> Union[Metadata, str]:
|
||||
# TODO: setters?
|
||||
def set(self, key: str, val: Any) -> Union[Metadata, str]:
|
||||
# goes through metadata and returns the Metadata available
|
||||
pass
|
||||
self.metadata[key] = val
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Union[Metadata, str]:
|
||||
# goes through metadata and returns the Metadata available
|
||||
return self.metadata.get(key, default)
|
||||
|
||||
def as_json(self) -> str:
|
||||
# converts all metadata and data into JSON
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
from __future__ import annotations
|
||||
from ast import List
|
||||
from typing import Union, Dict
|
||||
from dataclasses import dataclass
|
||||
from archivers.archiver import Archiverv2
|
||||
|
||||
from enrichers.enricher import Enricher
|
||||
from metadata import Metadata
|
||||
|
||||
"""
|
||||
how not to couple the different pieces of logic
|
||||
@@ -108,12 +111,13 @@ Once an archiver returns a link to a local file (for eg to a storage), how do we
|
||||
The context metadata should include a temporary folder (maybe a LocalStorage instance?)
|
||||
"""
|
||||
|
||||
|
||||
class ArchivingOrchestrator:
|
||||
def __init__(self, config) -> None:
|
||||
# in config.py we should test that the archivers exist and log mismatches (blocking execution)
|
||||
# identify each formatter, storage, database, etc
|
||||
# self.feeder = Feeder.init(config.feeder, config.get(config.feeder))
|
||||
|
||||
|
||||
# Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheets_feeder.sheet via CLI
|
||||
# where does that update/processing happen? in config.py
|
||||
# reflection for Archiver to know which child classes it has? use Archiver.__subclasses__
|
||||
@@ -123,7 +127,13 @@ class ArchivingOrchestrator:
|
||||
# ]
|
||||
self.feeder = config.feeder
|
||||
self.enrichers = config.enrichers
|
||||
self.archivers: List[Archiverv2] = config.archivers
|
||||
|
||||
for a in self.archivers: a.setup()
|
||||
|
||||
self.formatters = []
|
||||
self.storages = []
|
||||
self.databases = []
|
||||
# self.formatters = [
|
||||
# Formatter.init(f, config)
|
||||
# for f in config.formatters
|
||||
@@ -145,30 +155,33 @@ class ArchivingOrchestrator:
|
||||
def feed(self) -> list(ArchiveResult):
|
||||
for url in self.feeder:
|
||||
print("ARCHIVING", url)
|
||||
# self.archive(url)
|
||||
self.archive(url)
|
||||
# how does this handle the parameters like folder which can be different for each archiver?
|
||||
# the storage needs to know where to archive!!
|
||||
# solution: feeders have context: extra metadata that they can read or ignore,
|
||||
# solution: feeders have context: extra metadata that they can read or ignore,
|
||||
# all of it should have sensible defaults (eg: folder)
|
||||
# default feeder is a list with 1 element
|
||||
|
||||
def archive(self, url) -> Union[ArchiveResult, None]:
|
||||
url = clear_url(url)
|
||||
result = Metadata(url=url)
|
||||
|
||||
# TODO:
|
||||
# url = clear_url(url)
|
||||
# result = Metadata(url=url)
|
||||
result = Metadata()
|
||||
result.set("url", url)
|
||||
|
||||
should_archive = True
|
||||
for d in databases: should_archive &= d.should_process(url)
|
||||
for d in self.databases: should_archive &= d.should_process(url)
|
||||
# should storages also be able to check?
|
||||
for s in storages: should_archive &= s.should_process(url)
|
||||
for s in self.storages: should_archive &= s.should_process(url)
|
||||
|
||||
if not should_archive:
|
||||
print("skipping")
|
||||
return "skipping"
|
||||
|
||||
# signal to DB that archiving has started
|
||||
for d in databases:
|
||||
for d in self.databases:
|
||||
# are the databases to decide whether to archive?
|
||||
# they can simply return True by default, otherwise they can avoid duplicates. should this logic be more granular, for example on the archiver level: a tweet will not need be scraped twice, whereas an instagram profile might. the archiver could not decide from the link which parts to archive,
|
||||
# they can simply return True by default, otherwise they can avoid duplicates. should this logic be more granular, for example on the archiver level: a tweet will not need be scraped twice, whereas an instagram profile might. the archiver could not decide from the link which parts to archive,
|
||||
# instagram profile example: it would always re-archive everything
|
||||
# maybe the database/storage could use a hash/key to decide if there's a need to re-archive
|
||||
if d.should_process(url):
|
||||
@@ -180,15 +193,15 @@ class ArchivingOrchestrator:
|
||||
return
|
||||
|
||||
# vk, telethon, ...
|
||||
for a in archivers:
|
||||
for a in self.archivers:
|
||||
# with automatic try/catch in download + archived (+ the other ops below)
|
||||
# should the archivers come with the config already? are there configs which change at runtime?
|
||||
# should the archivers come with the config already? are there configs which change at runtime?
|
||||
# think not, so no need to pass config as parameter
|
||||
# do they need to be refreshed with every execution?
|
||||
# do they need to be refreshed with every execution?
|
||||
# this is where the Hashes come from, the place with access to all content
|
||||
# the archiver does not have access to storage
|
||||
result.update(a.download(url))
|
||||
if result.is_success(): break
|
||||
result.merge(a.download(result))
|
||||
if True or result.is_success(): break
|
||||
|
||||
# what if an archiver returns multiple entries and one is to be part of HTMLgenerator?
|
||||
# should it call the HTMLgenerator as if it's not an enrichment?
|
||||
@@ -196,20 +209,20 @@ class ArchivingOrchestrator:
|
||||
# then how to execute it last? should there also be post-processors? are there other examples?
|
||||
# maybe as a PDF? or a Markdown file
|
||||
# side captures: screenshot, wacz, webarchive, thumbnails, HTMLgenerator
|
||||
for e in enrichers:
|
||||
result.update(e.enrich(result))
|
||||
for e in self.enrichers:
|
||||
result.merge(e.enrich(result))
|
||||
|
||||
# formatters, enrichers, and storages will sometimes look for specific properties: eg <li>Screenshot: <img src="{res.get("screenshot")}"> </li>
|
||||
for p in formatter:
|
||||
result.update(p.process(result))
|
||||
for f in self.formatters:
|
||||
result.merge(f.format(result))
|
||||
|
||||
# storages
|
||||
for s in storages:
|
||||
for s in self.storages:
|
||||
for m in result.media:
|
||||
m.update(s.store(m))
|
||||
m.merge(s.store(m))
|
||||
|
||||
# signal completion to databases (DBs, Google Sheets, CSV, ...)
|
||||
# a hash registration service could be one database: forensic archiving
|
||||
for d in databases: d.done( result)
|
||||
for d in self.databases: d.done(result)
|
||||
|
||||
return result
|
||||
return result
|
||||
|
||||
@@ -36,7 +36,7 @@ class Gsheets(Step):
|
||||
'wacz': 'wacz',
|
||||
'replaywebpage': 'replaywebpage',
|
||||
},
|
||||
"help": "names of columns in the google sheet",
|
||||
"help": "names of columns in the google sheet (stringified JSON object)",
|
||||
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
|
||||
},
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
# we need to explicitly expose the available imports here
|
||||
from .gworksheet import GWorksheet
|
||||
from .misc import *
|
||||
from .util import Util
|
||||
from .util import Util
|
||||
from .webdriver import Webdriver
|
||||
@@ -1,11 +1,12 @@
|
||||
from __future__ import annotations
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from abc import abstractmethod, ABC
|
||||
from metadata import Metadata
|
||||
from steps.step import Step
|
||||
|
||||
#TODO: likely unused
|
||||
@dataclass
|
||||
class Util(Step, ABC):
|
||||
class Util(Step):
|
||||
name = "util"
|
||||
|
||||
def __init__(self, config: dict) -> None:
|
||||
|
||||
45
src/utils/webdriver.py
Normal file
45
src/utils/webdriver.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from __future__ import annotations
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
from loguru import logger
|
||||
from selenium.webdriver.common.by import By
|
||||
import time
|
||||
|
||||
|
||||
class Webdriver:
    """Context manager that starts a headless Firefox webdriver and shuts it down on exit.

    Usage: ``with Webdriver(width, height, timeout) as driver: driver.get(url)``.
    """

    def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False) -> None:
        """Store the browser settings; the browser itself is only started in ``__enter__``.

        Args:
            width: window width passed to ``set_window_size``.
            height: window height passed to ``set_window_size``.
            timeout_seconds: value passed to ``set_page_load_timeout``.
            facebook_accept_cookies: when true, the facebook cookie banner is
                dismissed right after the browser starts.
        """
        self.width = width
        self.height = height
        self.timeout_seconds = timeout_seconds
        self.facebook_accept_cookies = facebook_accept_cookies

    def __enter__(self) -> "webdriver.Firefox":
        """Start the browser, optionally dismiss the facebook cookie banner, and return the driver.

        Raises:
            TimeoutException: if the browser could not be started at all, since
                there is then no driver to hand to the caller.
        """
        options = webdriver.FirefoxOptions()
        options.headless = True
        options.set_preference('network.protocol-handler.external.tg', False)
        driver = None
        try:
            driver = webdriver.Firefox(options=options)
            self.driver = driver
            driver.set_window_size(self.width, self.height)
            driver.set_page_load_timeout(self.timeout_seconds)
        except TimeoutException as e:
            logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
            if driver is None:
                # no browser was created: fail with the real cause instead of
                # an AttributeError on the missing self.driver further down
                raise

        if self.facebook_accept_cookies:
            try:
                logger.debug('Trying fb click accept cookie popup.')
                self.driver.get("http://www.facebook.com")
                accept_button = self.driver.find_element(By.XPATH, "//button[@data-cookiebanner='accept_only_essential_button']")
                accept_button.click()
                logger.debug('fb click worked')
                # linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
                time.sleep(2)
            except Exception:
                # best effort only; unlike a bare except this lets
                # KeyboardInterrupt/SystemExit through
                logger.warning('Failed on fb accept cookies.')

        return self.driver

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close and quit the browser.

        Ordinary exceptions raised inside the ``with`` body are still suppressed
        (as before) but are now logged; ``KeyboardInterrupt``, ``SystemExit`` and
        other non-``Exception`` errors propagate.
        """
        try:
            self.driver.close()
        finally:
            # always quit so the browser process is not leaked if close() fails
            try:
                self.driver.quit()
            finally:
                del self.driver

        if exc_type is None:
            return True
        if issubclass(exc_type, Exception):
            logger.error(f"suppressed error raised while using the webdriver: {exc_val!r}")
            return True
        return False
|
||||
Reference in New Issue
Block a user