Remove pandas/numpy dependencies, add exception handling, and issue requests concurrently

This commit is contained in:
Ivan Kazakov
2022-03-26 19:07:16 +03:00
parent d3bef4005a
commit b3e8de3b5a

View File

@@ -1,45 +1,85 @@
import requests
import numpy as np
import pandas as pd
import argparse
import json
from string import Template
from datetime import datetime, timezone
import sys
from statistics import pstdev
from itertools import product
import csv
from concurrent.futures import ThreadPoolExecutor
# gets instagram "locations" around a particular lat/lng using internal API
# (requires session cookie for authentication)
def get_instagram_locations(lat, lng, cookie):
    """Return the Instagram "venues" found around a latitude/longitude.

    Queries Instagram's internal ``location_search`` endpoint, sending
    ``cookie`` as the ``Cookie`` header for authentication. Failures are
    reported on stdout and never raised, so one bad request cannot abort a
    batch of concurrent lookups.

    Args:
        lat: Latitude of the search centre (numeric).
        lng: Longitude of the search centre (numeric).
        cookie: Value sent verbatim as the ``Cookie`` request header.

    Returns:
        The list stored under the response's ``"venues"`` key, or an empty
        list when the request fails, the body is not JSON, or the payload
        does not have the expected shape.
    """
    timeout = 5.0
    lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}"
    url = "https://www.instagram.com/location_search/"
    params = {"latitude": lat, "longitude": lng, "__a": 1}
    headers = {'Cookie': cookie}

    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
    except requests.exceptions.ConnectionError as e:
        print(f"Connection failed for {lat_long}: {e}")
        return []
    except requests.exceptions.Timeout:
        print(f"Connections timed out after {timeout} seconds")
        return []
    except requests.exceptions.RequestException as e:
        # Anything else requests can raise (e.g. TooManyRedirects).
        print(f"Request failed for {lat_long}: {e}")
        return []

    try:
        locations = response.json()
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError,
        # simplejson's decode error and requests' own JSONDecodeError, so
        # this works whichever JSON backend requests ends up using.
        print(f"Failed to get location data for {lat_long}")
        return []

    if not isinstance(locations, dict):
        print(f"Got invalid response for {lat_long}")
        return []

    # "venues" may be missing or null; callers always expect a list.
    venues = locations.get("venues")
    return venues if isinstance(venues, list) else []
def get_instagram_locations_by_query(query):
    """Return Instagram locations matching a free-text place search.

    Calls Instagram's ``topsearch`` endpoint with ``context=place``. The
    query is passed through ``params`` so that requests URL-encodes it
    (characters such as ``&`` or ``#`` would otherwise corrupt the URL).
    Failures are reported on stdout and yield an empty list, mirroring
    ``get_instagram_locations``.

    Args:
        query: Search text.

    Returns:
        A list with the ``['place']['location']`` value of every entry in
        the response's ``"places"`` list; empty on any failure.
    """
    timeout = 5.0
    url = "https://www.instagram.com/web/search/topsearch/"
    params = {"context": "place", "query": query}

    try:
        # Without a timeout requests may block forever on a stalled socket.
        response = requests.get(url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Request failed for query {query!r}: {e}")
        return []

    try:
        payload = response.json()
    except ValueError:
        # Common base class of every JSON decode error requests can raise.
        print(f"Failed to get location data for query {query!r}")
        return []

    places = payload.get("places") if isinstance(payload, dict) else None
    if not isinstance(places, list):
        print(f"Got invalid response for query {query!r}")
        return []

    found = []
    for entry in places:
        try:
            found.append(entry['place']['location'])
        except (KeyError, TypeError):
            # Skip malformed entries instead of discarding the whole result.
            continue
    return found
# queries the instagram location API for several points around a central lat/lng
# in order to return additional results
def get_fuzzy_locations(lat, lng, cookie, sigma=2):
    """Collect locations around a point by probing a grid of nearby points.

    The central point is queried first. The population standard deviation
    of the returned latitudes and longitudes then sets the spacing of a
    ``(2 * sigma + 1)`` by ``(2 * sigma + 1)`` grid centred on ``lat``/``lng``.
    Every grid point other than the centre is queried concurrently, and
    locations with an ``external_id`` not seen before are appended.

    Args:
        lat: Latitude of the central point.
        lng: Longitude of the central point.
        cookie: Session cookie forwarded to ``get_instagram_locations``.
        sigma: Number of grid steps taken on each side of the centre.

    Returns:
        The list of locations from the central query, extended with the
        new locations discovered at the surrounding grid points.
    """
    locs = get_instagram_locations(lat, lng, cookie)
    loc_ids = {v['external_id'] for v in locs if "external_id" in v}

    lats = [v['lat'] for v in locs if 'lat' in v]
    lngs = [v['lng'] for v in locs if 'lng' in v]
    # pstdev raises StatisticsError on empty data, which happens whenever
    # the initial lookup fails or returns nothing; treat that as no spread.
    std_lat = pstdev(lats) if lats else 0.0
    std_lng = pstdev(lngs) if lngs else 0.0

    # dict.fromkeys de-duplicates while keeping order: when a deviation is
    # zero several grid cells collapse onto the same coordinates, and there
    # is no point in querying the same coordinates more than once.
    points = dict.fromkeys(
        (lat + delta_lat * std_lat, lng + delta_lng * std_lng)
        for delta_lat, delta_lng in product(range(-sigma, sigma + 1), repeat=2)
    )
    # The centre was already queried above.
    points.pop((lat, lng), None)
    if not points:
        return locs

    with ThreadPoolExecutor() as ex:
        results = list(
            ex.map(lambda point: get_instagram_locations(*point, cookie), points)
        )

    for new_locs in results:
        for loc in new_locs:
            if 'external_id' in loc and loc['external_id'] not in loc_ids:
                locs.append(loc)
                loc_ids.add(loc["external_id"])
    return locs
# converts list of instagram locations into valid geojson
def make_geojson(locations):
features = []
@@ -50,12 +90,13 @@ def make_geojson(locations):
"geometry": {
"type": "Point",
"coordinates": [location["lng"], location["lat"]]
},
},
"properties": location}
features.append(feature)
return {"type": "FeatureCollection", "features": features}
def encode_date(date_str: str):
'''Convert date into Instagram "snowflake" ID'''
try:
@@ -67,12 +108,13 @@ def encode_date(date_str: str):
print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr)
sys.exit(1)
date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
date_ts = int(date.timestamp()) * 1000 # milliseconds
date_ts = int(date.timestamp()) * 1000 # milliseconds
insta_epoch = date_ts - 1314220021300
max_id_num = insta_epoch << 23
return str(max_id_num)
html_template = '''<html>
<head>
<title>Instagram location visualizations</title>
@@ -137,6 +179,7 @@ html_template = '''<html>
</body>
</html>'''
def main():
parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng")
parser.add_argument("--session", action="store", dest="session")
@@ -159,13 +202,13 @@ def main():
locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie)
if (args.output):
if args.output:
json.dump(locations, open(args.output, 'w'))
if (args.geojson):
if args.geojson:
json.dump(make_geojson(locations), open(args.geojson, 'w'))
if (args.map):
if args.map:
s = Template(html_template)
viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var)
@@ -173,15 +216,25 @@ def main():
f.write(viz)
f.close()
if (args.csv):
df = pd.DataFrame(locations)
df['url'] = df['external_id'].apply(lambda v: 'https://www.instagram.com/explore/locations/' + str(v) + date_var)
df.to_csv(args.csv)
if (args.dump_ids):
ids = map(lambda loc: str(loc['external_id']), locations)
with open(args.dump_ids, 'w') as f:
f.write('\n'.join(ids))
if args.csv:
for i in locations:
i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}"
# leading empty string for 'id' column is for backward compatibility since that's the pandas behavior.
fieldnames = ['', 'name', 'external_id', 'external_id_source', 'lat', 'lng', 'address', 'minimum_age', 'url']
with open(args.csv, "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for idx, row in enumerate(locations):
row[''] = idx
writer.writerow(row)
if args.dump_ids:
ids = map(lambda loc: str(loc['external_id']), locations)
with open(args.dump_ids, 'w') as f:
f.write('\n'.join(ids))
if __name__ == "__main__":
main()