Files
sugartrail/sugartrail.py
seangreaves 98fb28408b first commit
2022-09-04 14:43:57 +01:00

148 lines
7.3 KiB
Python

from requests.auth import HTTPBasicAuth
import requests
import pandas as pd
import sys
from IPython.display import clear_output
import time
import collections
from datetime import datetime
import math
# Credentials for the company-information.service.gov.uk API. The API key is
# sent as the HTTP Basic username; the password is left empty.
access_token = ""
username, password = access_token, ""

# Kept as a string because it is concatenated into request URLs as the
# "size" query parameter.
size = "5000"

# Shared auth object passed to every requests.get call in this module.
basic = HTTPBasicAuth(username, password)
def get_appointments(officer_id):
    """Fetch an officer's appointments and print a short summary.

    Requests ``/officers/<officer_id>/appointments`` from the API, prints the
    total number of appointments and how many of them have no ``resigned_on``
    value, then returns the decoded JSON response.

    Args:
        officer_id: Officer identifier; concatenated into the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KeyError: If the decoded body has no ``items`` entry.
    """
    url = (
        "https://api.company-information.service.gov.uk/officers/"
        + officer_id
        + "/appointments?size="
        + size
    )
    response = requests.get(url, auth=basic)
    # Decode the body once and reuse it for both the summary and the return.
    payload = response.json()

    df = pd.DataFrame(payload['items'])
    appointments = len(df)
    # The "resigned_on" column only exists when at least one appointment
    # carries that key; without it every appointment counts as active.
    resigned = df["resigned_on"].count() if "resigned_on" in df.columns else 0

    print(str(appointments) + " appointments")
    print(str(appointments - resigned) + " active appointments")
    return payload
def get_locations(companies, address_type: str):
    """Build one address string per item and print address/postcode counts.

    Args:
        companies: Mapping with an ``items`` list. For correspondence
            addresses each item's ``address`` mapping is read; for registered
            addresses each item's ``links['company']`` path is used to request
            ``<path>/registered-office-address`` from the API.
        address_type: ``"correspondance"`` (the original spelling, still
            accepted), ``"correspondence"`` or ``"registered"``.

    Returns:
        A list of comma-separated address strings, one per item and in item
        order, or ``None`` when ``address_type`` is not recognised.

    Raises:
        KeyError: If ``companies`` has no ``items`` entry, or an item lacks
            the ``address`` / ``links`` entry needed for the chosen type.
    """
    items = companies['items']

    if address_type in ("correspondance", "correspondence"):
        fields = ("premises", "address_line_1", "locality", "country", "postal_code")
        records = [item['address'] for item in items]
    elif address_type == "registered":
        fields = ("address_line_1", "address_line_2", "country", "locality", "postal_code")
        records = []
        for item in items:
            url = (
                "https://api.company-information.service.gov.uk"
                + item['links']['company']
                + "/registered-office-address"
            )
            # Decode each response once; the fields are picked out below.
            records.append(requests.get(url, auth=basic).json())
    else:
        print("unrecognised address type: should be either correspondance or registered")
        return None

    # Join only the fields that are present so a partial address does not
    # abort the whole run.
    addresses = [
        ", ".join(record[field] for field in fields if field in record)
        for record in records
    ]
    # Collect postcodes across *all* records, not just the last one.
    postcodes = [record["postal_code"] for record in records if "postal_code" in record]

    postcode_frequency = collections.Counter(postcodes)
    print(str(len(postcode_frequency)) + " unique postcodes")

    # most_common() orders by descending count, ties in first-seen order.
    frequency = dict(collections.Counter(addresses).most_common())
    print(str(len(frequency)) + " unique " + address_type + " addresses")
    print(frequency)
    return addresses
def year_of_creation(companies):
    """Count companies per creation year, most frequent year first.

    Args:
        companies: Iterable of mappings, each with a ``date_of_creation``
            string whose first four characters are taken as the year.

    Returns:
        A dict mapping year string to count, ordered by descending count;
        years with equal counts keep their first-seen order.
    """
    tally = collections.Counter(
        record['date_of_creation'][:4] for record in companies
    )
    return dict(tally.most_common())
def age(creation: str, cessation: str):
    """Return the number of whole 365-day years between two dates.

    Args:
        creation: Start date formatted as ``YYYY-MM-DD``.
        cessation: End date formatted as ``YYYY-MM-DD``.

    Returns:
        The day difference divided by 365, rounded down (so a cessation
        earlier than the creation yields a negative number).
    """
    date_format = "%Y-%m-%d"
    started = datetime.strptime(creation, date_format)
    ended = datetime.strptime(cessation, date_format)
    return (ended - started).days // 365
def get_companies(addresses):
    """Search for companies at each address and print a ranked summary.

    Every distinct address is sent to the ``advanced-search/companies``
    endpoint as the ``location`` parameter. Addresses whose response status is
    not 200 are skipped. The remaining addresses are ranked by their ``hits``
    value (highest first) and a short report is printed for each one.

    Args:
        addresses: Iterable of address strings. Repeated addresses are
            queried only once.

    Returns:
        A dict mapping each successfully searched address to the ``items``
        list of its response, in the same order as the printed ``Index``
        numbers.
    """
    from urllib.parse import quote  # local import: only needed here

    companies = {}
    companies_summary = {}
    # dict.fromkeys de-duplicates while keeping first-seen order, so a
    # repeated address costs one API request instead of one per occurrence.
    for address in dict.fromkeys(addresses):
        url = (
            "https://api.company-information.service.gov.uk/advanced-search/companies?location="
            # Escape the address so characters such as "&" or "#" cannot
            # terminate or split the query parameter.
            + quote(address, safe="")
            + "&size="
            + size
        )
        response = requests.get(url, auth=basic)
        if response.status_code != 200:
            continue
        payload = response.json()  # decode once, reuse below
        companies[address] = payload['items']
        companies_summary[address] = _summarise_companies(payload)

    ranked = dict(
        sorted(
            companies_summary.items(),
            key=lambda entry: entry[1]["frequency"],
            reverse=True,
        )
    )
    for index, (address, summary) in enumerate(ranked.items()):
        _print_company_summary(index, address, summary)
    return {address: companies[address] for address in ranked}


def _summarise_companies(payload):
    """Reduce one advanced-search response to the statistics that are printed."""
    items = payload['items']
    active = [c for c in items if c.get('company_status') == 'active']
    dissolved = [c for c in items if c.get('company_status') == 'dissolved']

    # Records missing the dates cannot be aged or bucketed by year; leave
    # them out of the statistics instead of aborting on a KeyError.
    lifespans = collections.Counter(
        age(c['date_of_creation'], c['date_of_cessation'])
        for c in dissolved
        if 'date_of_creation' in c and 'date_of_cessation' in c
    )
    active_years = year_of_creation([c for c in active if 'date_of_creation' in c])
    all_years = year_of_creation([c for c in items if 'date_of_creation' in c])

    return {
        "frequency": payload['hits'],
        "active_companies": len(active),
        # year_of_creation already orders by descending count, so the first
        # three entries are the three most common years.
        "3_years_active": dict(list(active_years.items())[:3]),
        "3_years_all": dict(list(all_years.items())[:3]),
        "3_survival": dict(lifespans.most_common()[:3]),
    }


def _print_company_summary(index, address, summary):
    """Print the report for one address."""
    print("Index: " + str(index))
    print(address)
    print(
        str(summary['frequency'])
        + " companies registered or corresponding here, "
        + str(summary['active_companies'])
        + " are active."
    )
    for year, count in summary['3_years_active'].items():
        print(str(count) + " currently active companies registered in " + str(year))
    for years, count in summary['3_survival'].items():
        # A lifespan of N whole years means the company was dissolved
        # somewhere between year N and year N+1.
        print(
            str(count)
            + " companies dissolved between years "
            + str(years)
            + "-"
            + str(years + 1)
        )
    print("")
def get_officers(company_locations, indices):
    """Fetch officer names for every company at the selected locations.

    After all requests have finished, prints for each location how many of
    its companies every officer name appears in, most frequent first.

    Args:
        company_locations: Mapping of location to a list of company records;
            each record must have a ``company_number`` string.
        indices: Positions, in the mapping's key order, of the locations to
            process.

    Returns:
        A dict mapping ``str(location)`` to a list with one entry per company
        at that location; each entry is the list of officer names returned
        for that company (empty when none were returned).
    """
    locations = list(company_locations)  # built once, not once per index
    officers = {}
    for index in indices:
        location = locations[index]
        businesses = company_locations[location]
        total = len(businesses)
        per_business = []
        officers[str(location)] = per_business
        for position, business in enumerate(businesses):
            url = (
                "https://api.company-information.service.gov.uk/company/"
                + business['company_number']
                + "/officers?size="
                + size
            )
            per_business.append(_fetch_officer_names(url, position, total))

    for location, per_business in officers.items():
        tally = collections.Counter(
            name for names in per_business for name in names
        )
        print(location)
        print("-")
        print("Most prolific officers:")
        # Iterate (name, count) pairs directly; looking the count up again
        # via str(name) would fail for any name that is not a string.
        for officer, count in tally.most_common():
            print(str(officer) + " runs " + str(count) + " businesses")
        print("")
    return officers


def _fetch_officer_names(url, position, total):
    """Request one company's officers, backing off only on transient failures."""
    while True:
        clear_output(wait=True)
        print("completion: " + str(100*position/total) + ", index:" + str(position))
        try:
            leadership = requests.get(url, auth=basic)
        except requests.exceptions.RequestException as err:
            # Only request-level failures are retried; KeyboardInterrupt and
            # programming errors propagate instead of looping forever.
            print(type(err))
            _back_off()
            continue
        print(leadership)
        if leadership.status_code == 429 or leadership.status_code >= 500:
            # Rate limiting and server errors can clear up, so try again.
            _back_off()
            continue
        try:
            payload = leadership.json()
        except ValueError:
            # Empty or non-JSON body: retrying cannot help, so treat it as a
            # company without officers.
            payload = None
        items = payload.get('items', []) if isinstance(payload, dict) else []
        names = [officer['name'] for officer in items if 'name' in officer]
        clear_output(wait=True)
        # Short pause between successful requests to pace the API calls.
        time.sleep(0.41)
        return names


def _back_off():
    """Announce and perform the fixed pause before a retry."""
    print("taking a 10 second timeout")
    time.sleep(10)
    clear_output(wait=True)