diff --git a/README.md b/README.md index dcfc61c..9b44553 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Prerequisites -This Python application requires `requests`, `numpy`, and `pandas` to be properly installed. This can be done with `pip3 install requests numpy pandas`. +This Python application requires `requests` to be properly installed. This can be done with `pip3 install requests`. ## Example usage diff --git a/instagram-locations.py b/instagram-locations.py index af40884..4323bac 100644 --- a/instagram-locations.py +++ b/instagram-locations.py @@ -1,79 +1,118 @@ -import requests -import numpy as np -import pandas as pd import argparse +import csv import json -from string import Template -from datetime import datetime, timezone import sys +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from itertools import product +from statistics import pstdev +from string import Template + +import requests + # gets instagram "locations" around a particular lat/lng using internal API # (requires session cookie for authentication) def get_instagram_locations(lat, lng, cookie): - locs = requests.get("https://www.instagram.com/location_search/?latitude=" + str(lat) + "&longitude=" + str(lng) + "&__a=1", headers={ - 'Cookie': cookie - }).json() - return locs['venues'] + timeout = 5.0 + lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}" + url = "https://www.instagram.com/location_search/" + params = {"latitude": lat, "longitude": lng, "__a": 1} + headers = {"Cookie": cookie} + try: + response = requests.get(url, params=params, headers=headers, timeout=timeout) + except requests.exceptions.ConnectionError as e: + print(f"Connection failed for {lat_long}: {e}") + return [] + except requests.exceptions.Timeout: + print(f"Connections timed out after {timeout} seconds") + return [] + + try: + locations = response.json() + except json.JSONDecodeError: + print(f"Failed to get location data for {lat_long}") + return [] + + if not isinstance(locations, dict): + print(f"Got invalid response for {lat_long}") + return [] + + locations = locations.get("venues", []) + return locations def get_instagram_locations_by_query(query): locs = requests.get("https://www.instagram.com/web/search/topsearch/?context=place&query=" + query).json() - - return [v['place']['location'] for v in locs['places']] + + return [v["place"]["location"] for v in locs["places"]] + # queries the instagram location API for several points around a central lat/lng # in order to return additional results def get_fuzzy_locations(lat, lng, cookie, sigma=2): locs = get_instagram_locations(lat, lng, cookie) - - std_lat = np.std([v['lat'] for v in locs if 'lat' in v]) - std_lng = np.std([v['lng'] for v in locs if 'lng' in v]) - - for delta_lat in range(-sigma, sigma+1): - for delta_lng in range(-sigma, sigma+1): - new_locs = get_instagram_locations(lat + delta_lat * std_lat, lng + delta_lng * std_lng, cookie) - loc_ids = [v['external_id'] for v in locs] - - for loc in new_locs: - if loc['external_id'] not in loc_ids: - locs.append(loc) - + loc_ids = {v["external_id"] for v in locs if "external_id" in v} + + std_lat = pstdev([v["lat"] for v in locs if "lat" in v]) + std_lng = pstdev([v["lng"] for v in locs if "lng" in v]) + + # filter to avoid calling with both lat and lng deltas equal zero (which would duplicate the call + # to obtain the initial loc) + deltas = ( + (lat + delta_lat * std_lat, lng + delta_lng * std_lng) + for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2)) + ) + + # to change args order for convenient unpacking + insta_loc_func = lambda ckie, lt, ln: get_instagram_locations(lt, ln, ckie) + + with ThreadPoolExecutor() as ex: + results = ex.map(lambda x: insta_loc_func(cookie, *x), deltas) + + for new_locs in results: + for loc in new_locs: + if "external_id" in loc and loc["external_id"] not in loc_ids: + locs.append(loc) + loc_ids.add(loc["external_id"]) + return locs + # converts list of instagram locations into valid geojson def make_geojson(locations): features = [] - for location in [location for location in locations if 'lng' in location]: + for location in [location for location in locations if "lng" in location]: feature = { "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [location["lng"], location["lat"]] - }, - "properties": location} + "geometry": {"type": "Point", "coordinates": [location["lng"], location["lat"]]}, + "properties": location, + } features.append(feature) return {"type": "FeatureCollection", "features": features} + def encode_date(date_str: str): - '''Convert date into Instagram "snowflake" ID''' + """Convert date into Instagram "snowflake" ID""" try: - date = datetime.strptime(date_str, '%Y-%m-%d') + date = datetime.strptime(date_str, "%Y-%m-%d") except ValueError: try: - date = datetime.strptime(date_str, '%Y-%m-%d') + date = datetime.strptime(date_str, "%Y-%m-%d") except ValueError: print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr) sys.exit(1) date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - date_ts = int(date.timestamp()) * 1000 # milliseconds + date_ts = int(date.timestamp()) * 1000 # milliseconds insta_epoch = date_ts - 1314220021300 max_id_num = insta_epoch << 23 return str(max_id_num) -html_template = ''' + +html_template = """ Instagram location visualizations @@ -135,7 +174,8 @@ html_template = ''' centerMarker._icon.classList.add('selected-location'); -''' +""" + def main(): parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng") @@ -153,35 +193,45 @@ def main(): cookie = args.cookie - date_var = '' + date_var = "" if args.date is not None: - date_var = '?max_id=' + encode_date(args.date) + date_var = "?max_id=" + encode_date(args.date) locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie) - if (args.output): - json.dump(locations, open(args.output, 'w')) + if args.output: + json.dump(locations, open(args.output, "w")) - if (args.geojson): - json.dump(make_geojson(locations), open(args.geojson, 'w')) + if args.geojson: + json.dump(make_geojson(locations), open(args.geojson, "w")) - if (args.map): + if args.map: s = Template(html_template) viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var) - f = open(args.map, 'w') + f = open(args.map, "w") f.write(viz) f.close() - if (args.csv): - df = pd.DataFrame(locations) - df['url'] = df['external_id'].apply(lambda v: 'https://www.instagram.com/explore/locations/' + str(v) + date_var) - df.to_csv(args.csv) - - if (args.dump_ids): - ids = map(lambda loc: str(loc['external_id']), locations) - with open(args.dump_ids, 'w') as f: - f.write('\n'.join(ids)) + if args.csv: + for i in locations: + i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}" + + # leading empty string for 'id' column is for backward compatibility since that's the pandas behavior. + fieldnames = ["", "name", "external_id", "external_id_source", "lat", "lng", "address", "minimum_age", "url"] + + with open(args.csv, "w") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for idx, row in enumerate(locations): + row[""] = idx + writer.writerow(row) + + if args.dump_ids: + ids = map(lambda loc: str(loc["external_id"]), locations) + with open(args.dump_ids, "w") as f: + f.write("\n".join(ids)) + if __name__ == "__main__": main()