diff --git a/instagram-locations.py b/instagram-locations.py index 2b47f45..0ff4e71 100644 --- a/instagram-locations.py +++ b/instagram-locations.py @@ -1,45 +1,85 @@ import requests -import numpy as np -import pandas as pd import argparse import json from string import Template from datetime import datetime, timezone import sys +from statistics import pstdev +from itertools import product +import csv +from concurrent.futures import ThreadPoolExecutor + # gets instagram "locations" around a particular lat/lng using internal API # (requires session cookie for authentication) def get_instagram_locations(lat, lng, cookie): - locs = requests.get("https://www.instagram.com/location_search/?latitude=" + str(lat) + "&longitude=" + str(lng) + "&__a=1", headers={ - 'Cookie': cookie - }).json() - return locs['venues'] + timeout = 5.0 + lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}" + url = "https://www.instagram.com/location_search/" + params = {"latitude": lat, "longitude": lng, "__a": 1} + headers = {'Cookie': cookie} + try: + response = requests.get(url, params=params, headers=headers, timeout=timeout) + except requests.exceptions.ConnectionError as e: + print(f"Connection failed for {lat_long}: {e}") + return [] + except requests.exceptions.Timeout: + print(f"Connections timed out after {timeout} seconds") + return [] + + try: + locations = response.json() + except json.JSONDecodeError: + print(f"Failed to get location data for {lat_long}") + return [] + + if not isinstance(locations, dict): + print(f"Got invalid response for {lat_long}") + return [] + + locations = locations.get("venues", []) + return locations def get_instagram_locations_by_query(query): locs = requests.get("https://www.instagram.com/web/search/topsearch/?context=place&query=" + query).json() - + return [v['place']['location'] for v in locs['places']] + # queries the instagram location API for several points around a central lat/lng # in order to return additional results def get_fuzzy_locations(lat, lng, cookie, sigma=2): locs = get_instagram_locations(lat, lng, cookie) - - std_lat = np.std([v['lat'] for v in locs if 'lat' in v]) - std_lng = np.std([v['lng'] for v in locs if 'lng' in v]) - - for delta_lat in range(-sigma, sigma+1): - for delta_lng in range(-sigma, sigma+1): - new_locs = get_instagram_locations(lat + delta_lat * std_lat, lng + delta_lng * std_lng, cookie) - loc_ids = [v['external_id'] for v in locs] - - for loc in new_locs: - if loc['external_id'] not in loc_ids: - locs.append(loc) - + loc_ids = {v['external_id'] for v in locs if "external_id" in v} + + std_lat = pstdev([v['lat'] for v in locs if 'lat' in v]) + std_lng = pstdev([v['lng'] for v in locs if 'lng' in v]) + + # filter to avoid calling with both lat and lng deltas equal zero (which would duplicate the call + # to obtain the initial loc) + deltas = ( + ( + lat + delta_lat * std_lat, + lng + delta_lng * std_lng + ) for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2)) + ) + + # to change args order for convenient unpacking + insta_loc_func = lambda ckie, lt, ln: get_instagram_locations(lt, ln, ckie) + + with ThreadPoolExecutor() as ex: + results = ex.map(lambda x: insta_loc_func(cookie, *x), deltas) + + for new_locs in results: + for loc in new_locs: + if 'external_id' in loc and loc['external_id'] not in loc_ids: + locs.append(loc) + loc_ids.add(loc["external_id"]) + return locs + # converts list of instagram locations into valid geojson def make_geojson(locations): features = [] @@ -50,12 +90,13 @@ def make_geojson(locations): "geometry": { "type": "Point", "coordinates": [location["lng"], location["lat"]] - }, + }, "properties": location} features.append(feature) return {"type": "FeatureCollection", "features": features} + def encode_date(date_str: str): '''Convert date into Instagram "snowflake" ID''' try: @@ -67,12 +108,13 @@ def encode_date(date_str: str): print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr) sys.exit(1) date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - date_ts = int(date.timestamp()) * 1000 # milliseconds + date_ts = int(date.timestamp()) * 1000 # milliseconds insta_epoch = date_ts - 1314220021300 max_id_num = insta_epoch << 23 return str(max_id_num) + html_template = ''' Instagram location visualizations @@ -137,6 +179,7 @@ html_template = ''' ''' + def main(): parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng") parser.add_argument("--session", action="store", dest="session") @@ -159,13 +202,13 @@ def main(): locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie) - if (args.output): + if args.output: json.dump(locations, open(args.output, 'w')) - if (args.geojson): + if args.geojson: json.dump(make_geojson(locations), open(args.geojson, 'w')) - if (args.map): + if args.map: s = Template(html_template) viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var) @@ -173,15 +216,25 @@ def main(): f.write(viz) f.close() - if (args.csv): - df = pd.DataFrame(locations) - df['url'] = df['external_id'].apply(lambda v: 'https://www.instagram.com/explore/locations/' + str(v) + date_var) - df.to_csv(args.csv) - - if (args.dump_ids): - ids = map(lambda loc: str(loc['external_id']), locations) - with open(args.dump_ids, 'w') as f: - f.write('\n'.join(ids)) + if args.csv: + for i in locations: + i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}" + + # leading empty string for 'id' column is for backward compatibility since that's the pandas behavior. + fieldnames = ['', 'name', 'external_id', 'external_id_source', 'lat', 'lng', 'address', 'minimum_age', 'url'] + + with open(args.csv, "w") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for idx, row in enumerate(locations): + row[''] = idx + writer.writerow(row) + + if args.dump_ids: + ids = map(lambda loc: str(loc['external_id']), locations) + with open(args.dump_ids, 'w') as f: + f.write('\n'.join(ids)) + if __name__ == "__main__": main()