mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-13 04:48:28 +03:00
@@ -17,6 +17,7 @@ import urllib.parse
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
_API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
|
_API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
|
||||||
|
_globalGuestTokenManager = None
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
@@ -169,11 +170,39 @@ class ScrollDirection(enum.Enum):
|
|||||||
BOTH = enum.auto()
|
BOTH = enum.auto()
|
||||||
|
|
||||||
|
|
||||||
|
class GuestTokenManager:
|
||||||
|
def __init__(self):
|
||||||
|
self._token = None
|
||||||
|
self._setTime = 0.0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def token(self):
|
||||||
|
return self._token
|
||||||
|
|
||||||
|
@token.setter
|
||||||
|
def token(self, token):
|
||||||
|
self._token = token
|
||||||
|
self._setTime = time.time()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def setTime(self):
|
||||||
|
return self._setTime
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self._token = None
|
||||||
|
self._setTime = 0.0
|
||||||
|
|
||||||
|
|
||||||
class TwitterAPIScraper(snscrape.base.Scraper):
|
class TwitterAPIScraper(snscrape.base.Scraper):
|
||||||
def __init__(self, baseUrl, **kwargs):
|
def __init__(self, baseUrl, guestTokenManager = None, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self._baseUrl = baseUrl
|
self._baseUrl = baseUrl
|
||||||
self._guestToken = None
|
if guestTokenManager is None:
|
||||||
|
global _globalGuestTokenManager
|
||||||
|
if _globalGuestTokenManager is None:
|
||||||
|
_globalGuestTokenManager = GuestTokenManager()
|
||||||
|
guestTokenManager = _globalGuestTokenManager
|
||||||
|
self._guestTokenManager = guestTokenManager
|
||||||
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
|
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
|
||||||
self._apiHeaders = {
|
self._apiHeaders = {
|
||||||
'User-Agent': self._userAgent,
|
'User-Agent': self._userAgent,
|
||||||
@@ -183,24 +212,23 @@ class TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def _ensure_guest_token(self, url = None):
|
def _ensure_guest_token(self, url = None):
|
||||||
if self._guestToken is not None:
|
if self._guestTokenManager.token is None:
|
||||||
return
|
logger.info('Retrieving guest token')
|
||||||
logger.info('Retrieving guest token')
|
r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent})
|
||||||
r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent})
|
if (match := re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)):
|
||||||
if (match := re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)):
|
logger.debug('Found guest token in HTML')
|
||||||
logger.debug('Found guest token in HTML')
|
self._guestTokenManager.token = match.group(1)
|
||||||
self._guestToken = match.group(1)
|
if 'gt' in r.cookies:
|
||||||
if 'gt' in r.cookies:
|
logger.debug('Found guest token in cookies')
|
||||||
logger.debug('Found guest token in cookies')
|
self._guestTokenManager.token = r.cookies['gt']
|
||||||
self._guestToken = r.cookies['gt']
|
if not self._guestTokenManager.token:
|
||||||
if self._guestToken:
|
raise snscrape.base.ScraperException('Unable to find guest token')
|
||||||
self._session.cookies.set('gt', self._guestToken, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
|
logger.debug(f'Using guest token {self._guestTokenManager.token}')
|
||||||
self._apiHeaders['x-guest-token'] = self._guestToken
|
self._session.cookies.set('gt', self._guestTokenManager.token, domain = '.twitter.com', path = '/', secure = True, expires = self._guestTokenManager.setTime + 10800)
|
||||||
return
|
self._apiHeaders['x-guest-token'] = self._guestTokenManager.token
|
||||||
raise snscrape.base.ScraperException('Unable to find guest token')
|
|
||||||
|
|
||||||
def _unset_guest_token(self):
|
def _unset_guest_token(self):
|
||||||
self._guestToken = None
|
self._guestTokenManager.reset()
|
||||||
del self._session.cookies['gt']
|
del self._session.cookies['gt']
|
||||||
del self._apiHeaders['x-guest-token']
|
del self._apiHeaders['x-guest-token']
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user