Merge pull request #6 from antevis/no_deps

Remove dependency on numpy and pandas
This commit is contained in:
Logan Williams
2022-07-29 12:58:57 +02:00
committed by GitHub
2 changed files with 104 additions and 54 deletions

View File

@@ -2,7 +2,7 @@
## Prerequisites
This Python application requires `requests`, `numpy`, and `pandas` to be properly installed. This can be done with `pip3 install requests numpy pandas`.
This Python application requires `requests` to be properly installed. This can be done with `pip3 install requests`.
## Example usage

View File

@@ -1,79 +1,118 @@
import requests
import numpy as np
import pandas as pd
import argparse
import csv
import json
from string import Template
from datetime import datetime, timezone
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from itertools import product
from statistics import pstdev
from string import Template
import requests
# gets instagram "locations" around a particular lat/lng using internal API
# (requires session cookie for authentication)
def get_instagram_locations(lat, lng, cookie):
locs = requests.get("https://www.instagram.com/location_search/?latitude=" + str(lat) + "&longitude=" + str(lng) + "&__a=1", headers={
'Cookie': cookie
}).json()
return locs['venues']
timeout = 5.0
lat_long = f"lat: {lat:.6f} | lng: {lng:.6f}"
url = "https://www.instagram.com/location_search/"
params = {"latitude": lat, "longitude": lng, "__a": 1}
headers = {"Cookie": cookie}
try:
response = requests.get(url, params=params, headers=headers, timeout=timeout)
except requests.exceptions.ConnectionError as e:
print(f"Connection failed for {lat_long}: {e}")
return []
except requests.exceptions.Timeout:
print(f"Connections timed out after {timeout} seconds")
return []
try:
locations = response.json()
except json.JSONDecodeError:
print(f"Failed to get location data for {lat_long}")
return []
if not isinstance(locations, dict):
print(f"Got invalid response for {lat_long}")
return []
locations = locations.get("venues", [])
return locations
def get_instagram_locations_by_query(query):
locs = requests.get("https://www.instagram.com/web/search/topsearch/?context=place&query=" + query).json()
return [v['place']['location'] for v in locs['places']]
return [v["place"]["location"] for v in locs["places"]]
# queries the instagram location API for several points around a central lat/lng
# in order to return additional results
def get_fuzzy_locations(lat, lng, cookie, sigma=2):
locs = get_instagram_locations(lat, lng, cookie)
std_lat = np.std([v['lat'] for v in locs if 'lat' in v])
std_lng = np.std([v['lng'] for v in locs if 'lng' in v])
for delta_lat in range(-sigma, sigma+1):
for delta_lng in range(-sigma, sigma+1):
new_locs = get_instagram_locations(lat + delta_lat * std_lat, lng + delta_lng * std_lng, cookie)
loc_ids = [v['external_id'] for v in locs]
for loc in new_locs:
if loc['external_id'] not in loc_ids:
locs.append(loc)
loc_ids = {v["external_id"] for v in locs if "external_id" in v}
std_lat = pstdev([v["lat"] for v in locs if "lat" in v])
std_lng = pstdev([v["lng"] for v in locs if "lng" in v])
# filter to avoid calling with both lat and lng deltas equal zero (which would duplicate the call
# to obtain the initial loc)
deltas = (
(lat + delta_lat * std_lat, lng + delta_lng * std_lng)
for delta_lat, delta_lng in filter(lambda x: any(x), product(range(-sigma, sigma + 1), repeat=2))
)
# to change args order for convenient unpacking
insta_loc_func = lambda ckie, lt, ln: get_instagram_locations(lt, ln, ckie)
with ThreadPoolExecutor() as ex:
results = ex.map(lambda x: insta_loc_func(cookie, *x), deltas)
for new_locs in results:
for loc in new_locs:
if "external_id" in loc and loc["external_id"] not in loc_ids:
locs.append(loc)
loc_ids.add(loc["external_id"])
return locs
# converts list of instagram locations into valid geojson
def make_geojson(locations):
features = []
for location in [location for location in locations if 'lng' in location]:
for location in [location for location in locations if "lng" in location]:
feature = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [location["lng"], location["lat"]]
},
"properties": location}
"geometry": {"type": "Point", "coordinates": [location["lng"], location["lat"]]},
"properties": location,
}
features.append(feature)
return {"type": "FeatureCollection", "features": features}
def encode_date(date_str: str):
'''Convert date into Instagram "snowflake" ID'''
"""Convert date into Instagram "snowflake" ID"""
try:
date = datetime.strptime(date_str, '%Y-%m-%d')
date = datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
try:
date = datetime.strptime(date_str, '%Y-%m-%d')
date = datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
print('Unable to parse date. Please use format "yyyy-mm-dd".', file=sys.stderr)
sys.exit(1)
date = date.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
date_ts = int(date.timestamp()) * 1000 # milliseconds
date_ts = int(date.timestamp()) * 1000 # milliseconds
insta_epoch = date_ts - 1314220021300
max_id_num = insta_epoch << 23
return str(max_id_num)
html_template = '''<html>
html_template = """<html>
<head>
<title>Instagram location visualizations</title>
@@ -135,7 +174,8 @@ html_template = '''<html>
centerMarker._icon.classList.add('selected-location');
</script>
</body>
</html>'''
</html>"""
def main():
parser = argparse.ArgumentParser(description="Get a list of Instagram locations near a lat/lng")
@@ -153,35 +193,45 @@ def main():
cookie = args.cookie
date_var = ''
date_var = ""
if args.date is not None:
date_var = '?max_id=' + encode_date(args.date)
date_var = "?max_id=" + encode_date(args.date)
locations = get_fuzzy_locations(float(args.lat), float(args.lng), cookie)
if (args.output):
json.dump(locations, open(args.output, 'w'))
if args.output:
json.dump(locations, open(args.output, "w"))
if (args.geojson):
json.dump(make_geojson(locations), open(args.geojson, 'w'))
if args.geojson:
json.dump(make_geojson(locations), open(args.geojson, "w"))
if (args.map):
if args.map:
s = Template(html_template)
viz = s.substitute(lat=args.lat, lng=args.lng, locs=json.dumps(make_geojson(locations)), date_var=date_var)
f = open(args.map, 'w')
f = open(args.map, "w")
f.write(viz)
f.close()
if (args.csv):
df = pd.DataFrame(locations)
df['url'] = df['external_id'].apply(lambda v: 'https://www.instagram.com/explore/locations/' + str(v) + date_var)
df.to_csv(args.csv)
if (args.dump_ids):
ids = map(lambda loc: str(loc['external_id']), locations)
with open(args.dump_ids, 'w') as f:
f.write('\n'.join(ids))
if args.csv:
for i in locations:
i["url"] = f"https://www.instagram.com/explore/locations/{i['external_id']}{date_var}"
# leading empty string for 'id' column is for backward compatibility since that's the pandas behavior.
fieldnames = ["", "name", "external_id", "external_id_source", "lat", "lng", "address", "minimum_age", "url"]
with open(args.csv, "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for idx, row in enumerate(locations):
row[""] = idx
writer.writerow(row)
if args.dump_ids:
ids = map(lambda loc: str(loc["external_id"]), locations)
with open(args.dump_ids, "w") as f:
f.write("\n".join(ids))
if __name__ == "__main__":
main()