mirror of
https://github.com/bellingcat/sugartrail.git
synced 2026-06-12 13:38:29 +03:00
148 lines
7.3 KiB
Python
148 lines
7.3 KiB
Python
from requests.auth import HTTPBasicAuth
|
|
import requests
|
|
import pandas as pd
|
|
import sys
|
|
from IPython.display import clear_output
|
|
import time
|
|
import collections
|
|
from datetime import datetime
|
|
import math
|
|
access_token = ""
|
|
username = access_token
|
|
password = ""
|
|
size = "5000"
|
|
basic = HTTPBasicAuth(username, password)
|
|
|
|
def get_appointments(officer_id):
|
|
url = "https://api.company-information.service.gov.uk/officers/" + officer_id + "/appointments?size=" + size
|
|
response = requests.get(url, auth=basic)
|
|
# print metadata
|
|
df = pd.DataFrame(response.json()['items'])
|
|
appointments = len(df)
|
|
print(str(appointments) + " appointments")
|
|
print(str(appointments - df["resigned_on"].count()) + " active appointments")
|
|
return response.json()
|
|
|
|
def get_locations(companies, address_type: str):
|
|
df = pd.DataFrame(companies['items'])
|
|
if address_type == "correspondance":
|
|
postcode = [address['postal_code'] for address in df['address']]
|
|
addresses = [address['premises'] + ", " + address['address_line_1'] + ", " + address['locality'] + ", " + address['country'] + ", " + address['postal_code'] for address in df['address']]
|
|
elif address_type == "registered":
|
|
addresses = []
|
|
keys = ["address_line_1","address_line_2","country","locality","postal_code"]
|
|
for link in df['links']:
|
|
url = "https://api.company-information.service.gov.uk" + link['company'] + "/registered-office-address"
|
|
response = requests.get(url, auth=basic)
|
|
address = []
|
|
postcode = []
|
|
for key in keys:
|
|
if key in response.json():
|
|
address += [response.json()[key]]
|
|
if key == "postal_code":
|
|
postcode += [response.json()[key]]
|
|
address = ", ".join(address)
|
|
addresses += [address]
|
|
else:
|
|
print("unrecognised address type: should be either corresponance or registered")
|
|
return None
|
|
postcode_frequency = dict(collections.Counter(postcode).items(), key=lambda item: item[1], reverse=True)
|
|
print(str(len(postcode_frequency)) + " unique postcodes")
|
|
frequency = dict(sorted(collections.Counter(addresses).items(), key=lambda item: item[1], reverse=True))
|
|
print(str(len(frequency)) + " unique " + address_type + " addresses")
|
|
print(frequency)
|
|
return addresses
|
|
|
|
def year_of_creation(companies):
|
|
years = [address['date_of_creation'][0:4] for address in companies]
|
|
frequency = collections.Counter(years)
|
|
return dict(sorted(frequency.items(), key=lambda item: item[1], reverse=True))
|
|
|
|
def age(creation: str, cessation: str):
|
|
delta = datetime.strptime(cessation, "%Y-%m-%d")-datetime.strptime(creation, "%Y-%m-%d")
|
|
return math.floor(delta.days/365)
|
|
|
|
|
|
def get_companies(addresses):
|
|
companies = {}
|
|
companies_summary = {}
|
|
for address in addresses:
|
|
url = "https://api.company-information.service.gov.uk/advanced-search/companies?location=" + address + "&size=" + size
|
|
response = requests.get(url, auth=basic)
|
|
if response.status_code == 200:
|
|
companies[address] = response.json()['items']
|
|
companies_summary[address] = {}
|
|
companies_summary[address]["frequency"] = response.json()['hits']
|
|
all_companies = [address for address in response.json()['items']]
|
|
active_companies = [address for address in response.json()['items'] if address['company_status'] == 'active']
|
|
dead_companies = [address for address in response.json()['items'] if address['company_status'] == 'dissolved']
|
|
companies_summary[address]["active_companies"] = len(active_companies)
|
|
years = year_of_creation(all_companies)
|
|
survival_months = [age(address['date_of_creation'],address['date_of_cessation']) for address in dead_companies]
|
|
survival_frequency = collections.Counter(survival_months)
|
|
survival_frequency = dict(sorted(survival_frequency.items(), key=lambda item: item[1], reverse=True))
|
|
active_years = year_of_creation(active_companies)
|
|
companies_summary[address]["3_years_active"] = {k: active_years[k] for k in list(active_years)[:3]}
|
|
companies_summary[address]["3_years_all"] = {k: years[k] for k in list(years)[:3]}
|
|
companies_summary[address]["3_survival"] = {k: survival_frequency[k] for k in list(survival_frequency)[:3]}
|
|
companies_summary = dict(sorted(companies_summary.items(), key=lambda item: item[1]["frequency"],reverse=True))
|
|
for i,company in enumerate(companies_summary):
|
|
print("Index: " + str(i))
|
|
print(company)
|
|
print(str(companies_summary[company]['frequency']) + " companies registered or corresponding here, " + str(companies_summary[company]['active_companies']) + " are active.")
|
|
keys = list(companies_summary[company]['3_years_active'].keys())
|
|
life_keys = list(companies_summary[company]['3_survival'].keys())
|
|
for key in keys:
|
|
print(str(companies_summary[company]['3_years_active'][key]) + " currently active companies registered in " + str(key))
|
|
for key in life_keys:
|
|
print(str(companies_summary[company]['3_survival'][key]) + " companies dissolved between years " + str(key+1) + "-" + str(key))
|
|
print("")
|
|
|
|
return {key: companies[key] for key in companies_summary if key in companies}
|
|
|
|
def get_officers(company_locations, indices):
|
|
officers = {}
|
|
for index in indices:
|
|
# get businesses at location
|
|
company_name = list(company_locations.keys())[index]
|
|
officers[str(company_name)] = []
|
|
companies = company_locations[company_name]
|
|
length = len(companies)
|
|
for i, business in enumerate(companies):
|
|
company_number = business['company_number']
|
|
url = "https://api.company-information.service.gov.uk/company/" + company_number + "/officers?size=" + size
|
|
while True:
|
|
try:
|
|
clear_output(wait=True)
|
|
print("completion: " + str(100*i/length) + ", index:" + str(i))
|
|
leadership = requests.get(url, auth=basic)
|
|
print(leadership)
|
|
if leadership.json():
|
|
officers[str(company_name)] += [[officer['name'] for officer in leadership.json()['items']]]
|
|
clear_output(wait=True)
|
|
time.sleep(0.41)
|
|
break
|
|
else:
|
|
officers[str(company_name)] += [[]]
|
|
clear_output(wait=True)
|
|
time.sleep(0.41)
|
|
break
|
|
except:
|
|
print(sys.exc_info()[0])
|
|
print("taking a 10 second timeout")
|
|
time.sleep(10)
|
|
clear_output(wait=True)
|
|
for location in list(officers.keys()):
|
|
directors = []
|
|
for business in officers[location]:
|
|
directors += business
|
|
frequency = collections.Counter(directors)
|
|
frequency = dict(sorted(frequency.items(), key=lambda item: item[1], reverse=True))
|
|
print(location)
|
|
print("-")
|
|
print("Most prolific officers:")
|
|
for officer in list(frequency):
|
|
print(str(officer) + " runs " + str(frequency[str(officer)]) + " businesses")
|
|
print("")
|
|
return officers
|