4 Commits

Author SHA1 Message Date
Tristan Lee
40b8d9f267 Merge pull request #7 from bellingcat/more-tg-info
More tg info
2022-07-05 08:29:09 -07:00
Tristan Lee
fdc40f7411 Merge pull request #6 from bellingcat/add-vk-user
added User dataclass as argument to VKontaktePost dataclass
2022-07-05 08:28:01 -07:00
Tristan Lee
82351800d6 Merge pull request #5 from JustAnotherArchivist/master
merge upstream
2022-07-05 08:25:20 -07:00
Tristan Lee
cb429909d0 added User dataclass as argument to VKontaktePost dataclass 2022-07-05 10:21:59 -05:00
17 changed files with 389 additions and 942 deletions

View File

@@ -1,97 +0,0 @@
name: Bug report
description: Are you experiencing a problem? Create a report to help us improve!
labels: 'bug'
body:
- type: markdown
attributes:
value: |
## Self Check
- Try searching existing GitHub Issues (open or closed) for similar issues.
- type: textarea
validations:
required: true
attributes:
label: Describe the bug
description: A clear description of what the bug is.
placeholder: e.g. I see an AssertionError when trying to scrape a Twitter user!
- type: textarea
validations:
required: true
attributes:
label: How to reproduce
description: |
How to reproduce the problem.
This should be a minimal reproducible example, i.e. the shortest possible code or the smallest number of steps that still causes the error.
placeholder: e.g. I can reproduce this issue by scraping the textfiles user with the twitter-user scraper.
- type: textarea
validations:
required: true
attributes:
label: Expected behaviour
description: A brief description of what should happen.
- type: textarea
attributes:
label: Screenshots and recordings
description: |
If applicable, add screenshots or videos to help explain your problem. (Videos should be as short as possible! Avoid watermarks too.)
- type: input
validations:
required: true
attributes:
label: Operating system
description: Include the version too, please!
placeholder: e.g. Windows 10, Ubuntu 20.04, macOS 10.15...
- type: input
validations:
required: true
attributes:
label: |
Python version: output of `python3 --version`
- type: input
validations:
required: true
attributes:
label: |
snscrape version: output of `snscrape --version`
- type: input
validations:
required: true
attributes:
label: Scraper
placeholder: e.g. twitter-user, reddit-search, TwitterSearchScraper, ...
- type: dropdown
validations:
required: true
attributes:
label: How are you using snscrape?
options: ['CLI (`snscrape ...` as a command, e.g. in a terminal)', 'Module (`import snscrape.modules.something` in Python code)']
- type: textarea
validations:
required: false
attributes:
label: Backtrace
description: What is the error snscrape gives you, if any?
- type: textarea
validations:
required: false
attributes:
label: Log output
description: |
Insert here the debug log of snscrape.
If you use the CLI, add the global options `-vv` to the command, e.g. `snscrape -vv twitter-search ...`.
If you use the module, set the debug level in your Python code before any use of snscrape: `import logging; logging.basicConfig(level = logging.DEBUG)`.
If you already use `logging` in your own code, you may need to adjust the level there instead.
- type: textarea
validations:
required: false
attributes:
label: Dump of locals
description: |
Here attach the dump of your snscrape locals, if it's a crash. (snscrape should tell you the path).
Please note that it may contain identifying info such as IP address, if the website returns that.
You can also optionally request to exchange the file in private.
Finally, if snscrape didn't crash, leave this field blank.
- type: textarea
attributes:
label: Additional context
description: Add any other context about the problem here.

View File

@@ -1,27 +0,0 @@
name: Feature Request
description: Want a feature? Ask; we don't bite!
labels: 'enhancement'
body:
- type: markdown
attributes:
value: |
## Self Check
- Try searching existing GitHub Issues (open or closed) for similar issues.
- type: textarea
validations:
required: true
attributes:
label: Describe the feature
description: A clear description of what the feature is.
- type: textarea
validations:
required: false
attributes:
label: Would this fix a problem you're experiencing? If so, specify.
- type: textarea
attributes:
label: Did you consider other alternatives?
description: If so, specify
- type: input
attributes:
label: Additional context

View File

@@ -1,6 +0,0 @@
---
name: Question
about: Ask away! (Do not use this for bugs or features.)
labels: 'question'
---

View File

@@ -8,7 +8,7 @@ The following services are currently supported:
* Mastodon: user profiles and toots (single or thread)
* Reddit: users, subreddits, and searches (via Pushshift)
* Telegram: channels
* Twitter: users, user profiles, hashtags, searches (live tweets, top tweets, and users), tweets (single or surrounding thread), list posts, communities, and trends
* Twitter: users, user profiles, hashtags, searches, tweets (single or surrounding thread), list posts, and trends
* VKontakte: user profiles
* Weibo (Sina Weibo): user profiles
@@ -59,10 +59,7 @@ To get the latest 100 tweets with the hashtag #archiveteam:
It is also possible to use snscrape as a library in Python, but this is currently undocumented.
## Issue reporting
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If you use the CLI, please run snscrape with `-vv` and include the log output in the issue. If you use snscrape as a module, please enable debug-level logging using `import logging; logging.basicConfig(level = logging.DEBUG)` (before using snscrape at all) and include the log output in the issue.
### Dump files
In some cases, debugging may require more information than is available in the log. The CLI has a `--dump-locals` option that enables dumping all local variables within snscrape based on important log messages (rather than, by default, only on crashes). Note that the dump files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If possible please run snscrape with `-vv` and `--dump-locals` and include the log output as well as the dump files referenced in the log in the issue. Note that the files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
## License
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

View File

@@ -1,37 +0,0 @@
[build-system]
requires = ['setuptools>=61', 'setuptools_scm>=6.2']
build-backend = 'setuptools.build_meta'
[tool.setuptools]
packages = ['snscrape', 'snscrape.modules']
[tool.setuptools_scm]
[project]
name = 'snscrape'
description = 'A social networking service scraper'
readme = 'README.md'
authors = [{name = 'JustAnotherArchivist'}]
classifiers = [
'Development Status :: 4 - Beta',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
]
dependencies = [
'requests[socks]',
'lxml',
'beautifulsoup4',
'pytz; python_version < "3.9.0"',
'filelock',
]
requires-python = '~=3.8'
dynamic = ['version']
[project.urls]
repository = "https://github.com/JustAnotherArchivist/snscrape"
[project.scripts]
snscrape = 'snscrape._cli:main'

42
setup.py Normal file
View File

@@ -0,0 +1,42 @@
import os.path
import setuptools
with open(os.path.join(os.path.dirname(__file__), 'README.md')) as fp:
readme = fp.read()
setuptools.setup(
name = 'snscrape',
description = 'A social networking service scraper',
long_description = readme,
long_description_content_type = 'text/markdown',
author = 'JustAnotherArchivist',
url = 'https://github.com/JustAnotherArchivist/snscrape',
classifiers = [
'Development Status :: 4 - Beta',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
],
packages = ['snscrape', 'snscrape.modules'],
setup_requires = ['setuptools_scm'],
use_scm_version = True,
install_requires = [
'requests[socks]',
'lxml',
'beautifulsoup4',
'pytz; python_version < "3.9.0"',
'filelock',
],
python_requires = '~=3.8',
extras_require = {
'test': ['coverage'],
},
entry_points = {
'console_scripts': [
'snscrape = snscrape._cli:main',
],
},
)

View File

@@ -6,7 +6,6 @@ import datetime
import importlib.metadata
import inspect
import logging
import os
import requests
# Imported in parse_args() after setting up the logger:
#import snscrape.base
@@ -24,7 +23,7 @@ logger = logging # Replaced below after setting the logger class
class Logger(logging.Logger):
def _log_with_stack(self, level, *args, **kwargs):
super().log(level, *args, **kwargs)
if dumpLocals and not kwargs.get('extra', {}).get('_snscrapeSuppressDumpLocals', False):
if dumpLocals:
stack = inspect.stack()
if len(stack) >= 3:
name = _dump_stack_and_locals(stack[2:][::-1])
@@ -119,7 +118,7 @@ def _dump_locals_on_exception():
trace = inspect.trace()
if len(trace) >= 2:
name = _dump_stack_and_locals(trace[1:], exc = e)
logger.fatal(f'Dumped stack and locals to {name}', extra = {'_snscrapeSuppressDumpLocals': True})
logger.fatal(f'Dumped stack and locals to {name}')
raise
@@ -308,36 +307,32 @@ def main():
i = 0
with _dump_locals_on_exception():
try:
if args.withEntity and (entity := scraper.entity):
if args.jsonl:
print(entity.json())
else:
print(entity)
if args.maxResults == 0:
logger.info('Exiting after 0 results')
return
for i, item in enumerate(scraper.get_items(), start = 1):
if args.since is not None and item.date < args.since:
logger.info(f'Exiting due to reaching older results than {args.since}')
break
if args.jsonl:
print(item.json())
elif args.format is not None:
print(args.format.format(item))
else:
print(item)
if args.progress and i % 100 == 0:
print(f'Scraping, {i} results so far', file = sys.stderr)
if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results')
if args.progress:
print(f'Stopped scraping after {i} results due to --max-results', file = sys.stderr)
break
if args.withEntity and (entity := scraper.entity):
if args.jsonl:
print(entity.json())
else:
logger.info(f'Done, found {i} results')
print(entity)
if args.maxResults == 0:
logger.info('Exiting after 0 results')
return
for i, item in enumerate(scraper.get_items(), start = 1):
if args.since is not None and item.date < args.since:
logger.info(f'Exiting due to reaching older results than {args.since}')
break
if args.jsonl:
print(item.json())
elif args.format is not None:
print(args.format.format(item))
else:
print(item)
if args.progress and i % 100 == 0:
print(f'Scraping, {i} results so far', file = sys.stderr)
if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results')
if args.progress:
print(f'Finished, {i} results', file = sys.stderr)
except BrokenPipeError:
os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
sys.exit(1)
print(f'Stopped scraping after {i} results due to --max-results', file = sys.stderr)
break
else:
logger.info(f'Done, found {i} results')
if args.progress:
print(f'Finished, {i} results', file = sys.stderr)

View File

@@ -1,6 +1,3 @@
__all__ = ['DeprecatedFeatureWarning', 'IntWithGranularity', 'Item', 'Scraper', 'ScraperException']
import abc
import copy
import dataclasses
@@ -9,28 +6,11 @@ import functools
import json
import logging
import requests
import requests.adapters
import urllib3.connection
import time
import warnings
_logger = logging.getLogger(__name__)
def _module_deprecation_helper(all, **names):
def __getattr__(name):
if name in names:
warnings.warn(f'{name} is deprecated, use {names[name].__name__} instead', DeprecatedFeatureWarning, stacklevel = 2)
return names[name]
raise AttributeError(f'module {__name__!r} has no attribute {name!r}')
def __dir__():
return sorted(all + list(names.keys()))
return __getattr__, __dir__
class DeprecatedFeatureWarning(FutureWarning):
pass
logger = logging.getLogger(__name__)
class _DeprecatedProperty:
@@ -42,7 +22,7 @@ class _DeprecatedProperty:
def __get__(self, obj, objType):
if obj is None: # if the access is through the class using _DeprecatedProperty rather than an instance of the class:
return self
warnings.warn(f'{self.name} is deprecated, use {self.replStr} instead', DeprecatedFeatureWarning, stacklevel = 2)
warnings.warn(f'{self.name} is deprecated, use {self.replStr} instead', FutureWarning, stacklevel = 2)
return self.repl(obj)
@@ -63,9 +43,9 @@ def _json_dataclass_to_dict(obj):
if field.name.startswith('_'):
continue
out[field.name] = _json_dataclass_to_dict(getattr(obj, field.name))
# Add properties
# Add in (non-deprecated) properties
for k in dir(obj):
if isinstance(getattr(type(obj), k, None), (property, _DeprecatedProperty)):
if isinstance(getattr(type(obj), k, None), property):
assert k != '_type'
if k.startswith('_'):
continue
@@ -88,9 +68,7 @@ class _JSONDataclass:
def json(self):
'''Convert the object to a JSON string'''
with warnings.catch_warnings():
warnings.filterwarnings(action = 'ignore', category = DeprecatedFeatureWarning)
out = _json_dataclass_to_dict(self)
out = _json_dataclass_to_dict(self)
for key, value in list(out.items()): # Modifying the dict below, so make a copy first
if isinstance(value, IntWithGranularity):
out[key] = int(value)
@@ -101,7 +79,7 @@ class _JSONDataclass:
@dataclasses.dataclass
class Item(_JSONDataclass):
'''An abstract base class for an item returned by the scraper.
'''An abstract base class for an item returned by the scraper's get_items generator.
An item can really be anything. The string representation should be useful for the CLI output (e.g. a direct URL for the item).
'''
@@ -111,6 +89,18 @@ class Item(_JSONDataclass):
pass
@dataclasses.dataclass
class Entity(_JSONDataclass):
'''An abstract base class for an entity returned by the scraper's entity property.
An entity is typically the account of a person or organisation. The string representation should be the preferred direct URL to the entity's page on the network.
'''
@abc.abstractmethod
def __str__(self):
pass
class IntWithGranularity(int):
'''A number with an associated granularity
@@ -126,31 +116,18 @@ class IntWithGranularity(int):
return (IntWithGranularity, (int(self), self.granularity))
class _HTTPSAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
super().init_poolmanager(*args, **kwargs)
#FIXME: Uses private urllib3.PoolManager attribute pool_classes_by_scheme.
try:
self.poolmanager.pool_classes_by_scheme['https'].ConnectionCls = _HTTPSConnection
except (AttributeError, KeyError) as e:
_logger.debug(f'Could not install TLS cipher logger: {type(e).__module__}.{type(e).__name__} {e!s}')
class URLItem(Item):
'''A generic item which only holds a URL string.'''
def __init__(self, url):
self._url = url
class _HTTPSConnection(urllib3.connection.HTTPSConnection):
def connect(self, *args, **kwargs):
conn = super().connect(*args, **kwargs)
#FIXME: Uses undocumented attribute self.sock and beyond.
try:
_logger.debug(f'Connected to: {self.sock.getpeername()}')
except AttributeError:
# self.sock might be a urllib3.util.ssltransport.SSLTransport, which lacks getpeername.
pass
try:
_logger.debug(f'Connection cipher: {self.sock.cipher()}')
except AttributeError:
# Shouldn't be possible, but better safe than sorry.
pass
return conn
@property
def url(self):
return self._url
def __str__(self):
return self._url
class ScraperException(Exception):
@@ -166,7 +143,6 @@ class Scraper:
self._retries = retries
self._proxies = proxies
self._session = requests.Session()
self._session.mount('https://', _HTTPSAdapter())
@abc.abstractmethod
def get_items(self):
@@ -188,17 +164,16 @@ class Scraper:
def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None, allowRedirects = True, proxies = None):
proxies = proxies or self._proxies or {}
errors = []
for attempt in range(self._retries + 1):
# The request is newly prepared on each retry because of potential cookie updates.
req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
environmentSettings = self._session.merge_environment_settings(req.url, proxies, None, None, None)
_logger.info(f'Retrieving {req.url}')
_logger.debug(f'... with headers: {headers!r}')
logger.info(f'Retrieving {req.url}')
logger.debug(f'... with headers: {headers!r}')
if data:
_logger.debug(f'... with data: {data!r}')
logger.debug(f'... with data: {data!r}')
if environmentSettings:
_logger.debug(f'... with environmentSettings: {environmentSettings!r}')
logger.debug(f'... with environmentSettings: {environmentSettings!r}')
try:
r = self._session.send(req, allow_redirects = allowRedirects, timeout = timeout, **environmentSettings)
except requests.exceptions.RequestException as exc:
@@ -208,25 +183,21 @@ class Scraper:
else:
retrying = ''
level = logging.ERROR
_logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
errors.append(repr(exc))
logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
else:
redirected = f' (redirected to {r.url})' if r.history else ''
_logger.info(f'Retrieved {req.url}{redirected}: {r.status_code}')
_logger.debug(f'... with response headers: {r.headers!r}')
logger.info(f'Retrieved {req.url}{redirected}: {r.status_code}')
if r.history:
for i, redirect in enumerate(r.history):
_logger.debug(f'... request {i}: {redirect.request.url}: {redirect.status_code} (Location: {redirect.headers.get("Location")})')
_logger.debug(f'... ... with response headers: {redirect.headers!r}')
logger.debug(f'... request {i}: {redirect.request.url}: {r.status_code} (Location: {r.headers.get("Location")})')
if responseOkCallback is not None:
success, msg = responseOkCallback(r)
errors.append(msg)
else:
success, msg = (True, None)
msg = f': {msg}' if msg else ''
if success:
_logger.debug(f'{req.url} retrieved successfully{msg}')
logger.debug(f'{req.url} retrieved successfully{msg}')
return r
else:
if attempt < self._retries:
@@ -235,15 +206,14 @@ class Scraper:
else:
retrying = ''
level = logging.ERROR
_logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
if attempt < self._retries:
sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
_logger.info(f'Waiting {sleepTime:.0f} seconds')
logger.info(f'Waiting {sleepTime:.0f} seconds')
time.sleep(sleepTime)
else:
msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
_logger.fatal(msg)
_logger.fatal(f'Errors: {", ".join(errors)}')
logger.fatal(msg)
raise ScraperException(msg)
raise RuntimeError('Reached unreachable code')
@@ -274,6 +244,3 @@ def nonempty_string(name):
raise ValueError('must not be an empty string')
f.__name__ = name
return f
__getattr__, __dir__ = _module_deprecation_helper(__all__, Entity = Item)

View File

@@ -30,7 +30,7 @@ class FacebookPost(snscrape.base.Item):
@dataclasses.dataclass
class User(snscrape.base.Item):
class User(snscrape.base.Entity):
username: str
pageId: int
name: str

View File

@@ -32,7 +32,7 @@ class InstagramPost(snscrape.base.Item):
@dataclasses.dataclass
class User(snscrape.base.Item):
class User(snscrape.base.Entity):
username: str
name: typing.Optional[str]
followers: snscrape.base.IntWithGranularity

View File

@@ -67,7 +67,7 @@ class PollOption:
@dataclasses.dataclass
class User(snscrape.base.Item):
class User(snscrape.base.Entity):
account: str # @username@domain.invalid
displayName: typing.Optional[str] = None
displayNameWithCustomEmojis: typing.Optional[typing.List[typing.Union[str, 'CustomEmoji']]] = None

View File

@@ -133,21 +133,6 @@ class _RedditPushshiftScraper(snscrape.base.Scraper):
return cls(**kwargs)
def _iter_api(self, url, params = None):
'''Iterate through the Pushshift API using the 'until' parameter and yield the items.'''
lowestIdSeen = None
if params is None:
params = {}
while True:
obj = self._get_api(url, params = params)
if not obj['data'] or (lowestIdSeen is not None and all(_cmp_id(d['id'], lowestIdSeen) >= 0 for d in obj['data'])): # end of pagination
break
for d in obj['data']:
if lowestIdSeen is None or _cmp_id(d['id'], lowestIdSeen) == -1:
yield self._api_obj_to_item(d)
lowestIdSeen = d['id']
params['until'] = obj["data"][-1]["created_utc"] + 1
class _RedditPushshiftSearchScraper(_RedditPushshiftScraper):
def __init__(self, name, *, submissions = True, comments = True, before = None, after = None, **kwargs):
@@ -163,20 +148,35 @@ class _RedditPushshiftSearchScraper(_RedditPushshiftScraper):
if not self._submissions and not self._comments:
raise ValueError('At least one of submissions and comments must be True')
def _iter_api(self, url, params = None):
'''Iterate through the Pushshift API using the 'before' parameter and yield the items.'''
lowestIdSeen = None
if params is None:
params = {}
if self._before is not None:
params['before'] = self._before
if self._after is not None:
params['after'] = self._after
params['sort'] = 'desc'
while True:
obj = self._get_api(url, params = params)
if not obj['data'] or (lowestIdSeen is not None and all(_cmp_id(d['id'], lowestIdSeen) >= 0 for d in obj['data'])): # end of pagination
break
for d in obj['data']:
if lowestIdSeen is None or _cmp_id(d['id'], lowestIdSeen) == -1:
yield self._api_obj_to_item(d)
lowestIdSeen = d['id']
params['before'] = obj["data"][-1]["created_utc"] + 1
def _iter_api_submissions_and_comments(self, params: dict):
# Retrieve both submissions and comments, interleave the results to get a reverse-chronological order
params['limit'] = '1000'
if self._before is not None:
params['until'] = self._before
if self._after is not None:
params['since'] = self._after
params['size'] = '1000'
if self._submissions:
submissionsIter = self._iter_api('https://api.pushshift.io/reddit/search/submission', params.copy()) # Pass copies to prevent the two iterators from messing each other up by using the same dict
submissionsIter = self._iter_api('https://api.pushshift.io/reddit/search/submission/', params.copy()) # Pass copies to prevent the two iterators from messing each other up by using the same dict
else:
submissionsIter = iter(())
if self._comments:
commentsIter = self._iter_api('https://api.pushshift.io/reddit/search/comment', params.copy())
commentsIter = self._iter_api('https://api.pushshift.io/reddit/search/comment/', params.copy())
else:
commentsIter = iter(())
@@ -260,15 +260,21 @@ class RedditSubmissionScraper(_RedditPushshiftScraper):
self._submissionId = submissionId
def get_items(self):
obj = self._get_api(f'https://api.pushshift.io/reddit/search/submission?ids={self._submissionId}')
obj = self._get_api(f'https://api.pushshift.io/reddit/search/submission/?ids={self._submissionId}')
if not obj['data']:
return
if len(obj['data']) != 1:
raise snscrape.base.ScraperException(f'Got {len(obj["data"])} results instead of 1')
yield self._api_obj_to_item(obj['data'][0])
# Upstream bug: link_id must be provided in decimal https://old.reddit.com/r/pushshift/comments/zkggt0/update_on_colo_switchover_bug_fixes_reindexing/
yield from self._iter_api('https://api.pushshift.io/reddit/search/comment', {'link_id': int(self._submissionId, 36), 'limit': 1000})
obj = self._get_api(f'https://api.pushshift.io/reddit/submission/comment_ids/{self._submissionId}')
if not obj['data']:
return
commentIds = obj['data']
for i in range(0, len(commentIds), 500):
ids = commentIds[i : i + 500]
obj = self._get_api(f'https://api.pushshift.io/reddit/comment/search?ids={",".join(ids)}')
yield from map(self._api_obj_to_item, obj['data'])
@classmethod
def _cli_setup_parser(cls, subparser):

View File

@@ -24,7 +24,7 @@ class LinkPreview:
@dataclasses.dataclass
class Channel(snscrape.base.Item):
class Channel(snscrape.base.Entity):
username: str
title: typing.Optional[str] = None
verified: typing.Optional[bool] = None
@@ -269,10 +269,13 @@ class TelegramChannelScraper(snscrape.base.Scraper):
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
if (membersDiv := soup.find('div', class_ = 'tgme_page_extra')):
if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')):
membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1])
kwargs['members'] = 0 if membersStr == 'no' else int(membersStr)
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')):
membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1])
if membersStr == 'no':
kwargs['members'] = 0
else:
kwargs['members'] = int(membersStr)
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
if photoImg is not None:
kwargs['photo'] = photoImg.attrs['src']

File diff suppressed because it is too large Load Diff

View File

@@ -38,11 +38,35 @@ _datePattern = re.compile(r'^(?P<date>today'
r'\s+at\s+(?P<hour>\d+):(?P<minute>\d+)\s+(?P<ampm>[ap]m)$')
@dataclasses.dataclass
class User(snscrape.base.Entity):
username: str
name: str
verified: bool
description: typing.Optional[str] = None
websites: typing.Optional[typing.List[str]] = None
followers: typing.Optional[snscrape.base.IntWithGranularity] = None
posts: typing.Optional[snscrape.base.IntWithGranularity] = None
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
tags: typing.Optional[snscrape.base.IntWithGranularity] = None
following: typing.Optional[snscrape.base.IntWithGranularity] = None
followersGranularity = snscrape.base._DeprecatedProperty('followersGranularity', lambda self: self.followers.granularity, 'followers.granularity')
postsGranularity = snscrape.base._DeprecatedProperty('postsGranularity', lambda self: self.posts.granularity, 'posts.granularity')
photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity')
tagsGranularity = snscrape.base._DeprecatedProperty('tagsGranularity', lambda self: self.tags.granularity, 'tags.granularity')
followingGranularity = snscrape.base._DeprecatedProperty('followingGranularity', lambda self: self.following.granularity, 'following.granularity')
def __str__(self):
return f'https://vk.com/{self.username}'
@dataclasses.dataclass
class VKontaktePost(snscrape.base.Item):
url: str
date: typing.Optional[typing.Union[datetime.datetime, datetime.date]]
content: str
user: User
outlinks: typing.Optional[typing.List[str]] = None
photos: typing.Optional[typing.List['Photo']] = None
video: typing.Optional['Video'] = None
@@ -74,29 +98,6 @@ class Video:
thumbUrl: str
@dataclasses.dataclass
class User(snscrape.base.Item):
username: str
name: str
verified: bool
description: typing.Optional[str] = None
websites: typing.Optional[typing.List[str]] = None
followers: typing.Optional[snscrape.base.IntWithGranularity] = None
posts: typing.Optional[snscrape.base.IntWithGranularity] = None
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
tags: typing.Optional[snscrape.base.IntWithGranularity] = None
following: typing.Optional[snscrape.base.IntWithGranularity] = None
followersGranularity = snscrape.base._DeprecatedProperty('followersGranularity', lambda self: self.followers.granularity, 'followers.granularity')
postsGranularity = snscrape.base._DeprecatedProperty('postsGranularity', lambda self: self.posts.granularity, 'posts.granularity')
photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity')
tagsGranularity = snscrape.base._DeprecatedProperty('tagsGranularity', lambda self: self.tags.granularity, 'tags.granularity')
followingGranularity = snscrape.base._DeprecatedProperty('followingGranularity', lambda self: self.following.granularity, 'following.granularity')
def __str__(self):
return f'https://vk.com/{self.username}'
class VKontakteUserScraper(snscrape.base.Scraper):
name = 'vkontakte-user'
@@ -117,9 +118,6 @@ class VKontakteUserScraper(snscrape.base.Scraper):
return urllib.parse.unquote(a['href'][13 : end])
return None
def is_photo(self, a):
return 'aria-label' in a.attrs and a.attrs['aria-label'].startswith('photo')
def _date_span_to_date(self, dateSpan):
if not dateSpan:
return None
@@ -175,7 +173,7 @@ class VKontakteUserScraper(snscrape.base.Scraper):
not (not isCopy and thumbsDiv.parent.name == 'div' and 'class' in thumbsDiv.parent.attrs and 'copy_quote' in thumbsDiv.parent.attrs['class']): # Skip post quotes
photos = []
for a in thumbsDiv.find_all('a', class_ = 'page_post_thumb_wrap'):
if not self.is_photo(a) and 'data-video' not in a.attrs:
if 'data-photo-id' not in a.attrs and 'data-video' not in a.attrs:
_logger.warning(f'Skipping non-photo and non-video thumb wrap on {url}')
continue
if 'data-video' in a.attrs:
@@ -215,14 +213,24 @@ class VKontakteUserScraper(snscrape.base.Scraper):
photoUrl = f'https://vk.com{a["href"]}' if 'href' in a.attrs and a['href'].startswith('/photo') and a['href'][6:].strip('0123456789-_') == '' else None
photos.append(Photo(variants = photoVariants, url = photoUrl))
quotedPost = self._post_div_to_item(quoteDiv, isCopy = True) if (quoteDiv := post.find('div', class_ = 'copy_quote')) else None
authorHeading = post.find('h5', class_ = ['post_author', 'copy_post_author'])
authorLink = authorHeading.find('a', class_ = ['author', 'copy_author'])
username = authorLink['href'].split('/')[-1]
name = authorLink.text
if authorHeading.find('div', class_ = 'page_verified') is not None:
verified = True
else:
verified = False
user = User(username = username, name = name, verified = verified)
return VKontaktePost(
url = url,
date = self._date_span_to_date(dateSpan),
content = textDiv.text if textDiv else None,
outlinks = outlinks or None,
photos = photos or None,
video = video or None,
quotedPost = quotedPost,
url = url,
date = self._date_span_to_date(dateSpan),
content = textDiv.text if textDiv else None,
user = user,
outlinks = outlinks or None,
photos = photos or None,
video = video or None,
quotedPost = quotedPost,
)
def _soup_to_items(self, soup):
@@ -379,6 +387,13 @@ class VKontakteUserScraper(snscrape.base.Scraper):
if (followersDiv := soup.find('div', id = 'public_followers')):
if (topDiv := followersDiv.find('div', class_ = 'header_top')) and topDiv.find('span', class_ = 'header_label').text == 'Followers':
kwargs['followers'] = snscrape.base.IntWithGranularity(*parse_num(topDiv.find('span', class_ = 'header_count').text))
# On community groups, this is where followers are listed
elif (followersDiv := soup.find('div', class_ = 'group_friends_text')):
kwargs['followers'] = snscrape.base.IntWithGranularity(*parse_num(followersDiv.find('span', class_ = 'group_friends_count').text))
# On public groups, this is where followers are listed
elif (followersDiv := soup.find('div', id = 'group_followers')):
if (topDiv := followersDiv.find('div', class_ = 'header_top')) and topDiv.find('span', class_ = 'header_label').text == 'Members':
kwargs['followers'] = snscrape.base.IntWithGranularity(*parse_num(topDiv.find('span', class_ = 'header_count').text))
return User(**kwargs)

View File

@@ -34,7 +34,7 @@ class Post(snscrape.base.Item):
@dataclasses.dataclass
class User(snscrape.base.Item):
class User(snscrape.base.Entity):
screenname: str
uid: int
verified: bool
@@ -81,8 +81,6 @@ class WeiboUserScraper(snscrape.base.Scraper):
return True, None
def _mblog_to_item(self, mblog):
if mblog.get('page_info', {}).get('type') not in (None, 'video', 'webpage'):
_logger.warning(f'Skipping unknown page info {mblog["page_info"]["type"]!r} on status {mblog["id"]}')
return Post(
url = f'https://m.weibo.cn/status/{mblog["bid"]}',
id = mblog['id'],
@@ -94,7 +92,7 @@ class WeiboUserScraper(snscrape.base.Scraper):
likesCount = mblog.get('attitudes_count'),
picturesCount = mblog.get('pic_num'),
pictures = [x['large']['url'] for x in mblog['pics']] if 'pics' in mblog else None,
video = urls.get('mp4_720p_mp4') or urls.get('mp4_hd_mp4') or urls['mp4_ld_mp4'] if 'page_info' in mblog and mblog['page_info']['type'] == 'video' and (urls := mblog['page_info']['urls']) else None,
video = mblog['page_info']['media_info']['mp4_720p_mp4'] if 'page_info' in mblog and mblog['page_info']['type'] == 'video' else None,
link = mblog['page_info']['page_url'] if 'page_info' in mblog and mblog['page_info']['type'] == 'webpage' else None,
repostedPost = self._mblog_to_item(mblog['retweeted_status']) if 'retweeted_status' in mblog else None,
)

View File

@@ -1,16 +0,0 @@
def dict_map(input, keyMap):
'''Return a new dict from an input dict and a {'input_key': 'output_key'} mapping'''
return {outputKey: input[inputKey] for inputKey, outputKey in keyMap.items() if inputKey in input}
def snake_to_camel(**kwargs):
'''Return a new dict from kwargs with snake_case keys replaced by camelCase'''
out = {}
for key, value in kwargs.items():
keyParts = key.split('_')
for i in range(1, len(keyParts)):
keyParts[i] = keyParts[i][:1].upper() + keyParts[i][1:]
out[''.join(keyParts)] = value
return out