diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..a4a85a8
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+# syntax=docker/dockerfile:1
+
+FROM python:latest
+
+WORKDIR /app
+
+COPY . .
+
+RUN pip install --upgrade pip && pip install build && python -m build && pip install dist/*.whl
+
+ENTRYPOINT ["instagram_locations"]
\ No newline at end of file
diff --git a/instagram_locations/__init__.py b/instagram_locations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/instagram_locations/instagram_locations.py b/instagram_locations/instagram_locations.py
new file mode 100644
--- /dev/null
+++ b/instagram_locations/instagram_locations.py
@@ -0,0 +1,230 @@
+import argparse
+import csv
+import json
+import sys
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timezone
+from itertools import product
+from statistics import pstdev
+from string import Template
+
+import requests
+
+
+# gets instagram "locations" around a particular lat/lng using internal API
+# (requires session cookie for authentication)
+def get_instagram_locations(lat, lng, cookie):
+    """Return the venue dicts Instagram reports near (lat, lng).
+
+    Any request or decoding failure is printed and yields an empty list, so
+    one failed probe does not abort a multi-point search.
+    """
+    timeout = 5.0
+    lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}"
+    url = "https://www.instagram.com/location_search/"
+    params = {"latitude": lat, "longitude": lng, "__a": 1}
+    headers = {"Cookie": cookie}
+    try:
+        response = requests.get(url, params=params, headers=headers, timeout=timeout)
+    except requests.exceptions.ConnectionError as e:
+        print(f"Connection failed for {lat_long}: {e}")
+        return []
+    except requests.exceptions.Timeout:
+        print(f"Connection timed out after {timeout} seconds for {lat_long}")
+        return []
+    except requests.exceptions.RequestException as e:
+        # any other requests failure (e.g. too many redirects) is also best-effort
+        print(f"Request failed for {lat_long}: {e}")
+        return []
+
+    try:
+        locations = response.json()
+    except ValueError:
+        # every JSON decode error raised by response.json() derives from ValueError
+        print(f"Failed to get location data for {lat_long}")
+        return []
+
+    if not isinstance(locations, dict):
+        print(f"Got invalid response for {lat_long}")
+        return []
+
+    # "venues" may be absent or null; both mean there are no results
+    return locations.get("venues") or []
+
+
+def get_instagram_locations_by_query(query):
+    """Return the Instagram place locations matching a free-text query."""
+    url = "https://www.instagram.com/web/search/topsearch/"
+    # let requests URL-encode the query instead of concatenating it raw
+    params = {"context": "place", "query": query}
+    locs = requests.get(url, params=params, timeout=5.0).json()
+
+    return [v["place"]["location"] for v in locs["places"]]
+
+
+# queries the instagram location API for several points around a central lat/lng
+# in order to return additional results
+def get_fuzzy_locations(lat, lng, cookie, sigma=2):
+    """Return locations around (lat, lng), de-duplicated by "external_id".
+
+    After the initial lookup, a square grid of extra points is probed; the
+    grid spacing is the standard deviation of the coordinates returned by the
+    initial lookup and sigma is the number of grid steps in each direction.
+    """
+    locs = get_instagram_locations(lat, lng, cookie)
+    loc_ids = {v["external_id"] for v in locs if "external_id" in v}
+
+    lats = [v["lat"] for v in locs if "lat" in v]
+    lngs = [v["lng"] for v in locs if "lng" in v]
+    if not lats or not lngs:
+        # pstdev() raises StatisticsError on empty data, and without any
+        # coordinates there is no spread to build a probe grid from
+        return locs
+
+    std_lat = pstdev(lats)
+    std_lng = pstdev(lngs)
+
+    # dict keys keep probe order while collapsing identical points (zero spread
+    # on an axis); the centre is dropped because it was already queried above
+    points = dict.fromkeys(
+        (lat + delta_lat * std_lat, lng + delta_lng * std_lng)
+        for delta_lat, delta_lng in product(range(-sigma, sigma + 1), repeat=2)
+    )
+    points.pop((lat, lng), None)
+
+    with ThreadPoolExecutor() as ex:
+        results = list(ex.map(lambda point: get_instagram_locations(*point, cookie), points))
+
+    for new_locs in results:
+        for loc in new_locs:
+            if "external_id" in loc and loc["external_id"] not in loc_ids:
+                locs.append(loc)
+                loc_ids.add(loc["external_id"])
+
+    return locs
+
+
+# converts list of instagram locations into valid geojson
+def make_geojson(locations):
+    """Build a GeoJSON FeatureCollection with one Point per located entry.
+
+    Entries missing either coordinate are skipped.
+    """
+    features = [
+        {
+            "type": "Feature",
+            "geometry": {"type": "Point", "coordinates": [location["lng"], location["lat"]]},
+            "properties": location,
+        }
+        for location in locations
+        if "lng" in location and "lat" in location
+    ]
+
+    return {"type": "FeatureCollection", "features": features}
+
+
+def encode_date(date_str: str):
+    """Convert date into Instagram "snowflake" ID
+
+    The date is taken at 23:59:59 UTC; if date_str is not in "yyyy-mm-dd"
+    format an error is printed to stderr and the process exits with status 1.
+    """
+    try:
+        date = datetime.strptime(date_str, "%Y-%m-%d")
+    except ValueError:
+        print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr)
+        sys.exit(1)
+    date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
+    date_ts = int(date.timestamp()) * 1000  # milliseconds
+    insta_epoch = date_ts - 1314220021300
+    max_id_num = insta_epoch << 23
+
+    return str(max_id_num)
+
+
+# NOTE(review): this copy of the patch shows no HTML/JS markup inside the
+# template (it looks stripped); confirm against the upstream file before use.
+html_template = """
+
+Instagram location visualizations
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def main():
+    """Parse the command line, run the search and write the requested outputs."""
+    parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng")
+    parser.add_argument("--cookie", action="store", dest="cookie")
+    parser.add_argument("--json", action="store", dest="output")
+    parser.add_argument("--geojson", action="store", dest="geojson")
+    parser.add_argument("--map", action="store", dest="map")
+    parser.add_argument("--csv", action="store", dest="csv")
+    parser.add_argument("--lat", action="store", dest="lat", required=True)
+    parser.add_argument("--lng", action="store", dest="lng", required=True)
+    parser.add_argument("--date", action="store", dest="date")
+    parser.add_argument("--ids", action="store", dest="dump_ids")
+
+    args = parser.parse_args()
+
+    # fail with a usage message instead of a traceback on non-numeric input
+    try:
+        lat, lng = float(args.lat), float(args.lng)
+    except ValueError:
+        parser.error("--lat and --lng must be numbers")
+
+    date_var = ""
+    if args.date is not None:
+        date_var = "?max_id=" + encode_date(args.date)
+
+    locations = get_fuzzy_locations(lat, lng, args.cookie)
+
+    if args.output:
+        with open(args.output, "w", encoding="utf-8") as f:
+            json.dump(locations, f)
+
+    if args.geojson:
+        with open(args.geojson, "w", encoding="utf-8") as f:
+            json.dump(make_geojson(locations), f)
+
+    if args.map:
+        geojson = json.dumps(make_geojson(locations))
+        viz = Template(html_template).substitute(lat=args.lat, lng=args.lng, locs=geojson, date_var=date_var)
+        with open(args.map, "w", encoding="utf-8") as f:
+            f.write(viz)
+
+    if args.csv:
+        # leading empty string for 'id' column is for backward compatibility since that's the pandas behavior.
+        fieldnames = ["", "name", "external_id", "external_id_source", "lat", "lng", "address", "minimum_age", "url"]
+
+        # newline="" is what the csv module asks for; extrasaction="ignore" keeps
+        # unexpected fields in a location from raising ValueError
+        with open(args.csv, "w", newline="", encoding="utf-8") as f:
+            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
+            writer.writeheader()
+            for idx, loc in enumerate(locations):
+                # rows are copies so the location dicts themselves stay untouched
+                row = dict(loc)
+                row[""] = idx
+                row["url"] = ""
+                if "external_id" in loc:
+                    row["url"] = f"https://www.instagram.com/explore/locations/{loc['external_id']}{date_var}"
+                writer.writerow(row)
+
+    if args.dump_ids:
+        # locations without an id are skipped rather than raising KeyError
+        ids = [str(loc["external_id"]) for loc in locations if "external_id" in loc]
+        with open(args.dump_ids, "w", encoding="utf-8") as f:
+            f.write("\n".join(ids))
diff --git a/instagram_locations/main.py b/instagram_locations/main.py
new file mode 100644
--- /dev/null
+++ b/instagram_locations/main.py
@@ -0,0 +1,12 @@
+"""Console-script entry point for the instagram_locations package."""
+
+from instagram_locations.instagram_locations import main
+
+
+def start():
+    """Run the command-line interface.
+
+    Exceptions raised by main() propagate unchanged so that their original
+    type and traceback reach the caller.
+    """
+    main()
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..b78bf31
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,29 @@
+import setuptools
+
+with open("README.md", "r", encoding="utf-8") as file:
+    long_description = file.read()
+
+setuptools.setup(
+    name="instagram-location-search",
+    version="1.0.0",
+    author="Bellingcat",
+    packages=["instagram_locations"],
+    description="Finds Instagram location IDs near a specified latitude and longitude.",
+    long_description=long_description,
+    long_description_content_type="text/markdown",
+    url="https://www.bellingcat.com",
+    license="MIT License",
+    install_requires=["requests"],
+    classifiers=[
+        'Intended Audience :: Information Technology',
+        'License :: OSI Approved :: MIT License',
+        'Operating System :: OS Independent',
+        'Natural Language :: English',
+        'Programming Language :: Python :: 3'
+    ],
+    entry_points={
+        "console_scripts": [
+            "instagram_locations=instagram_locations.main:start",
+        ]
+    },
+)
\ No newline at end of file