From b3e8de3b5a280d4695c57ae8c63c1f45deb126ce Mon Sep 17 00:00:00 2001 From: Ivan Kazakov Date: Sat, 26 Mar 2022 19:07:16 +0300 Subject: [PATCH 1/3] Remove pd/np dependencies, add exception handling, call requests concurrently --- instagram-locations.py | 121 +++++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/instagram-locations.py b/instagram-locations.py index 2b47f45..0ff4e71 100644 --- a/instagram-locations.py +++ b/instagram-locations.py @@ -1,45 +1,85 @@ import requests -import numpy as np -import pandas as pd import argparse import json from string import Template from datetime import datetime, timezone import sys +from statistics import pstdev +from itertools import product +import csv +from concurrent.futures import ThreadPoolExecutor + # gets instagram "locations" around a particular lat/lng using internal API # (requires session cookie for authentication) def get_instagram_locations(lat, lng, cookie): - locs = requests.get("https://www.instagram.com/location_search/?latitude=" + str(lat) + "&longitude=" + str(lng) + "&__a=1", headers={ - 'Cookie': cookie - }).json() - return locs['venues'] + timeout = 5.0 + lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}" + url = "https://www.instagram.com/location_search/" + params = {"latitude": lat, "longitude": lng, "__a": 1} + headers = {'Cookie': cookie} + try: + response = requests.get(url, params=params, headers=headers, timeout=timeout) + except requests.exceptions.ConnectionError as e: + print(f"Connection failed for {lat_long}: {e}") + return [] + except requests.exceptions.Timeout: + print(f"Connections timed out after {timeout} seconds") + return [] + + try: + locations = response.json() + except json.JSONDecodeError: + print(f"Failed to get location data for {lat_long}") + return [] + + if not isinstance(locations, dict): + print(f"Got invalid response for {lat_long}") + return [] + + locations = locations.get("venues", []) + return locations def get_instagram_locations_by_query(query): locs = requests.get("https://www.instagram.com/web/search/topsearch/?context=place&query=" + query).json() - + return [v['place']['location'] for v in locs['places']] + # queries the instagram location API for several points around a central lat/lng # in order to return additional results def get_fuzzy_locations(lat, lng, cookie, sigma=2): locs = get_instagram_locations(lat, lng, cookie) - - std_lat = np.std([v['lat'] for v in locs if 'lat' in v]) - std_lng = np.std([v['lng'] for v in locs if 'lng' in v]) - - for delta_lat in range(-sigma, sigma+1): - for delta_lng in range(-sigma, sigma+1): - new_locs = get_instagram_locations(lat + delta_lat * std_lat, lng + delta_lng * std_lng, cookie) - loc_ids = [v['external_id'] for v in locs] - - for loc in new_locs: - if loc['external_id'] not in loc_ids: - locs.append(loc) - + loc_ids = {v['external_id'] for v in locs if "external_id" in v} + + std_lat = pstdev([v['lat'] for v in locs if 'lat' in v]) + std_lng = pstdev([v['lng'] for v in locs if 'lng' in v]) + + # filter to avoid calling with both lat and lng deltas equal zero (which would duplicate the call + # to obtain the initial loc) + deltas = ( + ( + lat + delta_lat * std_lat, + lng + delta_lng * std_lng + ) for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2)) + ) + + # to change args order for convenient unpacking + insta_loc_func = lambda ckie, lt, ln: get_instagram_locations(lt, ln, ckie) + + with ThreadPoolExecutor() as ex: + results = ex.map(lambda x: insta_loc_func(cookie, *x), deltas) + + for new_locs in results: + for loc in new_locs: + if 'external_id' in loc and loc['external_id'] not in loc_ids: + locs.append(loc) + loc_ids.add(loc["external_id"]) + return locs + # converts list of instagram locations into valid geojson def make_geojson(locations): features = [] @@ -50,12 +90,13 @@ def make_geojson(locations): "geometry": { "type": "Point", "coordinates": [location["lng"], location["lat"]] - }, + }, "properties": location} features.append(feature) return {"type": "FeatureCollection", "features": features} + def encode_date(date_str: str): '''Convert date into Instagram "snowflake" ID''' try: @@ -67,12 +108,13 @@ def encode_date(date_str: str): print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr) sys.exit(1) date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - date_ts = int(date.timestamp()) * 1000 # milliseconds + date_ts = int(date.timestamp()) * 1000 # milliseconds insta_epoch = date_ts - 1314220021300 max_id_num = insta_epoch << 23 return str(max_id_num) + html_template = ''' Instagram location visualizations @@ -137,6 +179,7 @@ html_template = ''' ''' + def main(): parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng") parser.add_argument("--session", action="store", dest="session") @@ -159,13 +202,13 @@ def main(): locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie) - if (args.output): + if args.output: json.dump(locations, open(args.output, 'w')) - if (args.geojson): + if args.geojson: json.dump(make_geojson(locations), open(args.geojson, 'w')) - if (args.map): + if args.map: s = Template(html_template) viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var) @@ -173,15 +216,25 @@ def main(): f.write(viz) f.close() - if (args.csv): - df = pd.DataFrame(locations) - df['url'] = df['external_id'].apply(lambda v: 'https://www.instagram.com/explore/locations/' + str(v) + date_var) - df.to_csv(args.csv) - - if (args.dump_ids): - ids = map(lambda loc: str(loc['external_id']), locations) - with open(args.dump_ids, 'w') as f: - f.write('\n'.join(ids)) + if args.csv: + for i in locations: + i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}" + + # leading empty string for 'id' column is for backward compatibility since that's the pandas behavior. + fieldnames = ['', 'name', 'external_id', 'external_id_source', 'lat', 'lng', 'address', 'minimum_age', 'url'] + + with open(args.csv, "w") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for idx, row in enumerate(locations): + row[''] = idx + writer.writerow(row) + + if args.dump_ids: + ids = map(lambda loc: str(loc['external_id']), locations) + with open(args.dump_ids, 'w') as f: + f.write('\n'.join(ids)) + if __name__ == "__main__": main() From 04d92faf97035a90e15dfe02a4da4a5d99e63645 Mon Sep 17 00:00:00 2001 From: Ivan Kazakov Date: Sat, 26 Mar 2022 19:45:52 +0300 Subject: [PATCH 2/3] Blackify --- instagram-locations.py | 75 ++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/instagram-locations.py b/instagram-locations.py index 0ff4e71..9a65407 100644 --- a/instagram-locations.py +++ b/instagram-locations.py @@ -1,13 +1,14 @@ -import requests import argparse -import json -from string import Template -from datetime import datetime, timezone -import sys -from statistics import pstdev -from itertools import product import csv +import json +import sys from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from itertools import product +from statistics import pstdev +from string import Template + +import requests # gets instagram "locations" around a particular lat/lng using internal API @@ -17,7 +18,7 @@ def get_instagram_locations(lat, lng, cookie): lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}" url = "https://www.instagram.com/location_search/" params = {"latitude": lat, "longitude": lng, "__a": 1} - headers = {'Cookie': cookie} + headers = {"Cookie": cookie} try: response = requests.get(url, params=params, headers=headers, timeout=timeout) except requests.exceptions.ConnectionError as e: @@ -44,25 +45,23 @@ def get_instagram_locations(lat, lng, cookie): def get_instagram_locations_by_query(query): locs = requests.get("https://www.instagram.com/web/search/topsearch/?context=place&query=" + query).json() - return [v['place']['location'] for v in locs['places']] + return [v["place"]["location"] for v in locs["places"]] # queries the instagram location API for several points around a central lat/lng # in order to return additional results def get_fuzzy_locations(lat, lng, cookie, sigma=2): locs = get_instagram_locations(lat, lng, cookie) - loc_ids = {v['external_id'] for v in locs if "external_id" in v} + loc_ids = {v["external_id"] for v in locs if "external_id" in v} - std_lat = pstdev([v['lat'] for v in locs if 'lat' in v]) - std_lng = pstdev([v['lng'] for v in locs if 'lng' in v]) + std_lat = pstdev([v["lat"] for v in locs if "lat" in v]) + std_lng = pstdev([v["lng"] for v in locs if "lng" in v]) # filter to avoid calling with both lat and lng deltas equal zero (which would duplicate the call # to obtain the initial loc) deltas = ( - ( - lat + delta_lat * std_lat, - lng + delta_lng * std_lng - ) for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2)) + (lat + delta_lat * std_lat, lng + delta_lng * std_lng) + for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2)) ) # to change args order for convenient unpacking @@ -73,7 +72,7 @@ def get_fuzzy_locations(lat, lng, cookie, sigma=2): for new_locs in results: for loc in new_locs: - if 'external_id' in loc and loc['external_id'] not in loc_ids: + if "external_id" in loc and loc["external_id"] not in loc_ids: locs.append(loc) loc_ids.add(loc["external_id"]) @@ -84,26 +83,24 @@ def get_fuzzy_locations(lat, lng, cookie, sigma=2): def make_geojson(locations): features = [] - for location in [location for location in locations if 'lng' in location]: + for location in [location for location in locations if "lng" in location]: feature = { "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [location["lng"], location["lat"]] - }, - "properties": location} + "geometry": {"type": "Point", "coordinates": [location["lng"], location["lat"]]}, + "properties": location, + } features.append(feature) return {"type": "FeatureCollection", "features": features} def encode_date(date_str: str): - '''Convert date into Instagram "snowflake" ID''' + """Convert date into Instagram "snowflake" ID""" try: - date = datetime.strptime(date_str, '%Y-%m-%d') + date = datetime.strptime(date_str, "%Y-%m-%d") except ValueError: try: - date = datetime.strptime(date_str, '%Y-%m-%d') + date = datetime.strptime(date_str, "%Y-%m-%d") except ValueError: print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr) sys.exit(1) @@ -115,7 +112,7 @@ def encode_date(date_str: str): return str(max_id_num) -html_template = ''' +html_template = """ Instagram location visualizations @@ -177,7 +174,7 @@ html_template = ''' centerMarker._icon.classList.add('selected-location'); -''' +""" def main(): @@ -194,25 +191,25 @@ def main(): args = parser.parse_args() - cookie = 'sessionid=' + args.session + cookie = "sessionid=" + args.session - date_var = '' + date_var = "" if args.date is not None: - date_var = '?max_id=' + encode_date(args.date) + date_var = "?max_id=" + encode_date(args.date) locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie) if args.output: - json.dump(locations, open(args.output, 'w')) + json.dump(locations, open(args.output, "w")) if args.geojson: - json.dump(make_geojson(locations), open(args.geojson, 'w')) + json.dump(make_geojson(locations), open(args.geojson, "w")) if args.map: s = Template(html_template) viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var) - f = open(args.map, 'w') + f = open(args.map, "w") f.write(viz) f.close() @@ -221,19 +218,19 @@ def main(): i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}" # leading empty string for 'id' column is for backward compatibility since that's the pandas behavior. - fieldnames = ['', 'name', 'external_id', 'external_id_source', 'lat', 'lng', 'address', 'minimum_age', 'url'] + fieldnames = ["", "name", "external_id", "external_id_source", "lat", "lng", "address", "minimum_age", "url"] with open(args.csv, "w") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for idx, row in enumerate(locations): - row[''] = idx + row[""] = idx writer.writerow(row) if args.dump_ids: - ids = map(lambda loc: str(loc['external_id']), locations) - with open(args.dump_ids, 'w') as f: - f.write('\n'.join(ids)) + ids = map(lambda loc: str(loc["external_id"]), locations) + with open(args.dump_ids, "w") as f: + f.write("\n".join(ids)) if __name__ == "__main__": From 1616ebcb35e790413dc02a0fe42a32b5935a9a4a Mon Sep 17 00:00:00 2001 From: Ivan Kazakov Date: Sat, 26 Mar 2022 19:50:22 +0300 Subject: [PATCH 3/3] Update readme to reflect that no numpy/pandas required --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4be6301..2894a55 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Prerequisites -This Python application requires `requests`, `numpy`, and `pandas` to be properly installed. This can be done with `pip3 install requests numpy pandas`. +This Python application requires `requests` to be properly installed. This can be done with `pip3 install requests`. ## Example usage