mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-11 03:48:29 +03:00
Add support for Instagram hashtags (fixes #29)
This commit is contained in:
@@ -21,22 +21,42 @@ class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
||||
return self.cleanUrl
|
||||
|
||||
|
||||
class InstagramUserScraper(snscrape.base.Scraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
def __init__(self, mode, name, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
if mode not in ('User', 'Hashtag'):
|
||||
raise ValueError('Invalid mode')
|
||||
self._mode = mode
|
||||
self._name = name
|
||||
|
||||
def _response_to_items(self, response, username):
|
||||
for node in response['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if self._mode == 'User':
|
||||
self._initialUrl = f'https://www.instagram.com/{self._name}/'
|
||||
self._pageName = 'ProfilePage'
|
||||
self._responseContainer = 'user'
|
||||
self._edgeXToMedia = 'edge_owner_to_timeline_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = '42323d64886122307be10013ad2dcc44'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Hashtag':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
|
||||
self._pageName = 'TagPage'
|
||||
self._responseContainer = 'hashtag'
|
||||
self._edgeXToMedia = 'edge_hashtag_to_media'
|
||||
self._pageIDKey = 'name'
|
||||
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","show_ranked":false,"first":10,"after":"{endCursor}"}}'
|
||||
|
||||
def _response_to_items(self, response):
|
||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
print(f'Processing: {node!r}')
|
||||
code = node['node']['shortcode']
|
||||
usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
||||
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
||||
yield InstagramPost(
|
||||
cleanUrl = cleanUrl,
|
||||
dirtyUrl = f'{cleanUrl}?taken-by={username}',
|
||||
dirtyUrl = f'{cleanUrl}{usernameQuery}',
|
||||
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
|
||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'],
|
||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
||||
thumbnailUrl = node['node']['thumbnail_src'],
|
||||
displayUrl = node['node']['display_url'],
|
||||
)
|
||||
@@ -45,9 +65,9 @@ class InstagramUserScraper(snscrape.base.Scraper):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
|
||||
r = self._get(self._initialUrl, headers = headers)
|
||||
if r.status_code == 404:
|
||||
logger.warning('User does not exist')
|
||||
logger.warning(f'{self._mode} does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
@@ -55,42 +75,57 @@ class InstagramUserScraper(snscrape.base.Scraper):
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
response = json.loads(jsonData)
|
||||
rhxGis = response['rhx_gis']
|
||||
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
|
||||
logger.info('User has no posts')
|
||||
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
|
||||
logger.info(f'{self._mode} has no posts')
|
||||
return
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
logger.warning('Private account')
|
||||
return
|
||||
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
|
||||
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
|
||||
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
|
||||
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving endCursor = {endCursor!r}')
|
||||
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
|
||||
variables = self._variablesFormat.format(**locals())
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
response = json.loads(r.text)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
return
|
||||
yield from self._response_to_items(response['data'], username)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
yield from self._response_to_items(response['data'])
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
|
||||
class InstagramUserScraper(InstagramCommonScraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'An Instagram username')
|
||||
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
return cls('User', args.username, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramHashtagScraper(InstagramCommonScraper):
|
||||
name = 'instagram-hashtag'
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('hashtag', help = 'An Instagram hashtag (no leading #)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls('Hashtag', args.hashtag, retries = args.retries)
|
||||
|
||||
Reference in New Issue
Block a user