added additional comments and decorators in base class

This commit is contained in:
seangreaves
2023-01-18 17:45:07 +00:00
parent 7d3cfd87d8
commit c898e94512
11 changed files with 396 additions and 274 deletions

3
.gitignore vendored
View File

@@ -48,3 +48,6 @@ coverage.xml
# Sphinx documentation
docs/_build/
# Testing notebook
notebooks/testing.ipynb

BIN
assets/.DS_Store vendored Normal file

Binary file not shown.

BIN
dashboard/.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -542,7 +542,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.15"
"version": "3.10.4"
}
},
"nbformat": 4,

View File

@@ -173,7 +173,7 @@
"metadata": {},
"outputs": [],
"source": [
"# western_crown_network = base.Network(file='western_crown_network.json')"
"western_crown_network = base.Network(file='western_crown_network.json')"
]
},
{
@@ -605,7 +605,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.15"
"version": "3.10.4"
}
},
"nbformat": 4,

View File

@@ -258,7 +258,7 @@
"metadata": {},
"outputs": [],
"source": [
"# regent_street_network = base.Network(file='regent_street_network.json')"
"regent_street_network = base.Network(file='regent_street_network.json')"
]
},
{
@@ -402,7 +402,7 @@
"metadata": {},
"outputs": [],
"source": [
"# shelton_street_network = base.Network(file='shelton_street_network.json')"
"shelton_street_network = base.Network(file='shelton_street_network.json')"
]
},
{
@@ -440,7 +440,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.15"
"version": "3.10.4"
}
},
"nbformat": 4,

View File

@@ -90,7 +90,7 @@
},
"outputs": [],
"source": [
"n = 3\n",
"n = 4\n",
"network = base.Network(company_id=company_id)\n",
"network.perform_hop(n)"
]
@@ -161,7 +161,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.15"
"version": "3.10.4"
}
},
"nbformat": 4,

View File

@@ -1,6 +1,7 @@
import requests
import time
import os
import functools
access_token = ""
username = ""
@@ -8,7 +9,18 @@ password = ""
size = "5000"
basic_auth = requests.auth.HTTPBasicAuth(username, password)
def auth(func):
    """Decorator that only runs ``func`` when API credentials are configured.

    The module-level ``basic_auth`` object is inspected on every call (not at
    decoration time). If ``basic_auth.username`` is empty the wrapper prints
    "Authentication required", does not call ``func`` and returns ``None``.
    Otherwise ``func`` is called with the given arguments and its result is
    returned unchanged.
    """
    @functools.wraps(func)
    def wrapper_auth(*args, **kwargs):
        if not basic_auth.username:
            print("Authentication required")
            return None
        # Propagate the result so the decorator can also wrap functions whose
        # return value matters (previously the result was silently dropped).
        return func(*args, **kwargs)
    return wrapper_auth
def test():
"""Test auth."""
url = "https://api.company-information.service.gov.uk/advanced-search/companies"
response = requests.get(url, auth=basic_auth)
if response.status_code == 200:
@@ -16,54 +28,60 @@ def test():
else:
return False
# NOTE(review): @auth is not applied here and the inline username check was
# removed, so callers are responsible for ensuring credentials are set.
def make_request(url, input, input_type, response_type):
    """Query the Companies House API and return the decoded JSON body.

    Args:
        url: Full request URL.
        input: Identifier the request is about. Currently unused; kept so the
            positional signature used by callers stays the same.
        input_type: Kind of ``input`` (e.g. 'company'). Currently unused.
        response_type: Kind of data requested (e.g. 'officers'). Currently
            unused.

    Returns:
        The parsed JSON payload when the response status is exactly 200,
        otherwise ``None`` (including on any ``requests`` error).
    """
    # Pause before every call; presumably to stay under the API rate limit
    # (TODO confirm the limit this was tuned for).
    time.sleep(0.5)
    try:
        response = requests.get(url, auth=basic_auth)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
    except requests.exceptions.RequestException:
        # HTTPError, ConnectionError and Timeout all derive from
        # RequestException, so one handler covers every case the separate
        # (unreachable) handlers used to list. Failures are deliberately
        # silent: callers treat None as "no data".
        return None
    return None
def get_company_officers(company_id):
    """Request the officers listing of the company ``company_id``.

    Returns whatever ``make_request`` yields for the ``/officers`` endpoint.
    """
    company_url = "https://api.company-information.service.gov.uk/company/" + company_id
    return make_request(company_url + "/officers", company_id, 'company', 'officers')
def get_psc(company_id):
    """Request the persons-with-significant-control listing of a company.

    Returns whatever ``make_request`` yields for the
    ``/persons-with-significant-control`` endpoint of ``company_id``.
    """
    company_url = "https://api.company-information.service.gov.uk/company/" + company_id
    return make_request(
        company_url + "/persons-with-significant-control",
        company_id,
        'company',
        'psc',
    )
def get_company(company_id):
    """Request the company record of ``company_id``.

    Returns whatever ``make_request`` yields for the company endpoint.
    """
    endpoint_root = "https://api.company-information.service.gov.uk/company/"
    return make_request(endpoint_root + company_id, company_id, 'company', 'company')
def get_address_changes(company_id):
    """Request the address-category filing history of ``company_id``.

    ``company_id`` is converted with ``str`` before being placed in the URL.
    Returns whatever ``make_request`` yields for the filing-history endpoint.
    """
    company_url = "https://api.company-information.service.gov.uk/company/" + str(company_id)
    history_url = company_url + "/filing-history/?category=address"
    return make_request(history_url, company_id, 'company', 'address history')
def get_correspondance_address(officer_id):
    """Request the appointments of ``officer_id`` with an explicit page size.

    The page size comes from the module-level ``size`` string. Returns
    whatever ``make_request`` yields for the appointments endpoint.
    """
    officer_url = "https://api.company-information.service.gov.uk/officers/" + officer_id
    sized_query = "/appointments?size=" + size
    return make_request(officer_url + sized_query, officer_id, 'officer', 'correspondance address')
def get_appointments(officer_id):
    """Request the appointments of ``officer_id`` (default page size).

    Returns whatever ``make_request`` yields for the appointments endpoint.
    """
    officer_url = "https://api.company-information.service.gov.uk/officers/" + officer_id
    return make_request(officer_url + "/appointments", officer_id, 'officer', 'appointments')
def get_duplicate_officers(officer_id):
"""Get duplicate officers for input officer."""
url = "https://api.company-information.service.gov.uk/officers/" + officer_id + "/appointments"
response = make_request(url, officer_id, 'officer', 'appointments')
if response:
@@ -85,10 +103,12 @@ def get_duplicate_officers(officer_id):
return
def get_companies_at_address(address):
    """Request companies registered at ``address`` via the advanced search.

    The address is percent-encoded before it is placed in the ``location``
    query parameter, so characters such as ``&``, ``#`` or ``+`` inside an
    address can no longer terminate or alter the query string. At most 5000
    results are requested.

    Returns whatever ``make_request`` yields for the search endpoint.
    """
    # Function-scope import: the module's import block does not pull in urllib.
    from urllib.parse import quote

    url = (
        "https://api.company-information.service.gov.uk/advanced-search/companies?location="
        + quote(address, safe="")
        + "&size="
        + "5000"
    )
    return make_request(url, address, 'address', 'companies')
def get_officers_at_address(address):
"""Get officers at input address location."""
url = "https://api.company-information.service.gov.uk/search/officers?q=location:" + address
response = make_request(url, address, 'address', 'officers')
if response:

View File

@@ -1,13 +1,13 @@
from sugartrail import api
from sugartrail import processing
import sugartrail
import IPython
import numpy as np
import math
import warnings
import json
import functools
from string import ascii_letters as alc
class Network:
"""Class represents a network of connected companies, officers and
addresses. Class contains methods to build network of user defined size from
a single seed company, officer or address."""
def __init__(self, officer_id=None, company_id=None, address=None, file=None):
self.addresses = []
self.officer_ids = []
@@ -26,80 +26,128 @@ class Network:
self.processed_companies = []
self.processed_addresses = []
self._file = self.load(file)
self.initialise()
self.initialise_node(officer_id, company_id, address, file)
def clear_state(func):
    """Decorator that resets a network's attributes before running ``func``.

    The first positional argument is treated as the network instance. Its
    node lists, seed identifiers, hop counter and processing bookkeeping are
    reset to empty/``None``/0, then ``func`` is called with the original
    arguments and its result is returned.
    """
    @functools.wraps(func)
    def wrapper_clear(*args, **kwargs):
        network = args[0]
        network.addresses = []
        network.officer_ids = []
        network.company_ids = []
        network.companies = []
        network.address_history = []
        network._officer_id = None
        network._company_id = None
        network._address = None
        network.n = 0
        network.link_type = None
        network.hop_history = []
        network.maxsize_entities = []
        network.processed_officers = []
        network.processed_companies = []
        network.processed_addresses = []
        # Return the wrapped result instead of discarding it.
        return func(*args, **kwargs)
    return wrapper_clear
@property
def officer_id(self):
    """ID of the seed officer (``None`` when no officer seed is set)."""
    return self._officer_id

@officer_id.setter
@sugartrail.api.auth
def officer_id(self, new_value):
    """Set the seed officer after looking it up in Companies House.

    The officer's appointments are requested; when the response contains at
    least one item, ``new_value`` becomes the seed and ``officer_ids`` is
    replaced by a single row for it, named after the first appointment.
    Otherwise a "not found" message is printed and the seed is cleared.
    """
    officer_info = sugartrail.api.get_appointments(new_value)
    # A response with a missing or empty 'items' list is treated as
    # "not found" instead of raising KeyError/IndexError on ['items'][0].
    appointments = officer_info.get('items') if officer_info else None
    if appointments:
        self._officer_id = new_value
        self.officer_ids = [{
            'officer_id': new_value,
            'name': appointments[0]['name'],
            'n': self.n,
            'link_type': None,
            'node_type': None,
            'node': None}]
    else:
        print(f"Officer with ID:{str(new_value)} not found")
        self._officer_id = None
@property
def company_id(self):
    """ID of the seed company (``None`` when no company seed is set)."""
    return self._company_id

@company_id.setter
@sugartrail.api.auth
def company_id(self, new_value):
    """Set the seed company after looking it up in Companies House.

    When a record is returned, ``company_ids`` is replaced by a single row
    for the seed and ``companies`` by the flattened record. Otherwise a
    "not found" message is printed and the seed is cleared.
    """
    record = sugartrail.api.get_company(new_value)
    if not record:
        print(f"Company with ID:{str(new_value)} not found")
        self._company_id = None
        return
    self._company_id = new_value
    seed_row = {
        'company_id': self._company_id,
        'n': self.n,
        'link_type': '',
        'node_type': '',
        'node': '',
    }
    self.company_ids = [seed_row]
    self.companies = [dict(sugartrail.processing.flatten(record))]
@property
def address(self):
    """Seed address of the network (``None`` when no address seed is set).

    A property getter is called with ``self`` only; the former extra
    ``value`` parameter made every read of ``network.address`` raise
    ``TypeError``.
    """
    return self._address
@address.setter
@sugartrail.api.auth
def address(self, new_value):
    """Set the seed address and make it the only entry in ``addresses``.

    No lookup is performed; the value is stored as given.
    """
    self._address = new_value
    seed_row = {
        'address': self._address,
        'n': self.n,
        'link_type': '',
        'node_type': '',
        'node': '',
    }
    self.addresses = [seed_row]
@property
def file(self):
    """Name of the file the network was last loaded from via this property."""
    return self._file

@file.setter
def file(self, new_value):
    """Load pre-built network data from ``new_value``.

    Stores the file name, clears all three seed identifiers and then calls
    ``self.load`` with the file name.
    """
    self._file = new_value
    self._officer_id = None
    self._company_id = None
    # The seed address lives in ``_address``; the previous ``_address_id``
    # was a name no other visible code reads, so the seed address was never
    # actually cleared here.
    self._address = None
    self.load(self._file)
@property
def officer_id(self):
return self._officer_id
@officer_id.setter
def officer_id(self, new_value):
self._officer_id = new_value
self._company_id = None
self._address_id = None
self.initialise()
@property
def company_id(self):
return self._company_id
@company_id.setter
def company_id(self, new_value):
self._company_id = new_value
self._officer_id = None
self._address_id = None
self.initialise()
@property
def address(self, value):
return self._address
@address.setter
def address(self, new_value):
self._address = new_value
self._company_id = None
self._officer_id = None
self.initialise()
# change to initialise
def initialise(self):
@clear_state
def initialise_node(self, officer_id, company_id, address ,file):
"""Builds initial network from arguments."""
if self.n < 1:
if self._officer_id:
if api.get_appointments(self._officer_id):
self.officer_ids.append(dict({'officer_id': self._officer_id, 'name': api.get_appointments(self._officer_id)['items'][0]['name'], 'n':self.n, 'link_type': None, 'node_type': None, 'node': None}))
else:
print(f"Officer with ID:{str(self._officer_id)} not found")
elif self._company_id:
self.company_ids.append(dict({'company_id': self._company_id, 'n':self.n, 'link_type': '', 'node_type': '', 'node': ''}))
company = api.get_company(self._company_id)
self.companies.append(dict(processing.flatten(company)))
elif self._address:
self.addresses.append(dict({'address': self._address, 'n':self.n, 'link_type': '', 'node_type': '', 'node': ''}))
elif self.file:
pass
if officer_id:
self.officer_id = officer_id
elif company_id:
self.company_id = company_id
elif address:
self.address = address
elif file:
self.file = file
else:
print("No input provided. Please provide either officer_id, company_id or address value as input.")
print("No input provided. Please provide either officer_id, company_id, address or file as input.")
def save(self, filename, location='../assets/networks/'):
    """Write the network's attributes as JSON to ``location + filename``.

    Every instance attribute except those named 'hop' and 'file' is
    serialised. ``location`` is prepended to ``filename`` verbatim, so it
    must end with a path separator.
    """
    network_data = {k: v for k, v in self.__dict__.items() if k != 'hop' and k != 'file'}
    saved_network = json.dumps(network_data)
    # ``with`` guarantees the handle is closed; the former ``f.close``
    # (no parentheses) never actually closed the file.
    with open(location + f'{filename}', 'w') as f:
        f.write(saved_network)
def load(self, filename):
"""Loads network stored in JSON format from '../assets/networks/'."""
if filename:
f = open(f'../assets/networks/{filename}')
network_data = json.load(f)
@@ -119,77 +167,72 @@ class Network:
self.processed_companies = network_data['processed_companies']
self.processed_addresses = network_data['processed_addresses']
def add_company_names(self):
for i, row in enumerate(self.company_ids):
company_details = list(filter(lambda d: d.get('company_number') == row['company_id'], self.companies))
if company_details:
self.company_ids[i]['company_name'] = company_details[0]['company_name']
else:
company_details = api.get_company(row['company_id'])
if company_details:
if 'company_name' in company_details:
self.company_ids[i]['company_name'] = company_details['company_name']
def run_map_preprocessing(self):
    """Run the enrichment steps needed before the network can be mapped.

    Calls, in order: ``get_address_histories``, ``get_company_records_from_id``
    and ``get_coords``. Returns ``None``.
    """
    step_names = (
        'get_address_histories',
        'get_company_records_from_id',
        'get_coords',
    )
    for step_name in step_names:
        getattr(self, step_name)()
def get_company_from_id(self, company_df=None, company_id=None, print_progress=True):
company_list = []
if company_id:
if company_id in [company['company_id'] for company in self.company_ids]:
company_list = [company_id]
else:
print("add valid company id")
else:
company_list = [company['company_id'] for company in self.company_ids]
def get_address_histories(self):
    """Fetch address histories for companies that do not have one yet.

    A company is skipped when its ID already appears as a 'company_number'
    in ``address_history`` (or was handled earlier in this call). For the
    others the history is built and only entries not already present in
    ``address_history`` are appended. Progress is printed per company.
    """
    covered_ids = list(dict.fromkeys(entry['company_number'] for entry in self.address_history))
    for position, company in enumerate(self.company_ids, start=1):
        IPython.display.clear_output(wait=True)
        print(f"Updated {position}/{len(self.company_ids)} company addresses.")
        current_id = company['company_id']
        if current_id in covered_ids:
            continue
        covered_ids.append(current_id)
        fetched = sugartrail.processing.build_address_history(current_id)
        # Filter against the history as it stands before this extension.
        unseen = [entry for entry in fetched if entry not in self.address_history]
        self.address_history.extend(unseen)
def get_company_records_from_id(self, company_df=None, print_progress=True):
"""Gets company records for all company IDs in the network. Additionally
enriches company_ids with company names for improved readability."""
company_list = [company['company_id'] for company in self.company_ids]
companies = []
for i, company_id in enumerate(company_list):
IPython.display.clear_output(wait=True)
if print_progress:
print("Processed " + str(i+1) + "/" + str(len(company_list)) + " companies.")
if company_id not in [company['company_number'] for company in self.companies]:
# if using local Companies House data
if company_df is not None:
try:
company = company_df[company_df[" CompanyNumber"] == str(company_id)]["CompanyName"].item()
if company:
# self.companies = self.companies.append(pd.json_normalize(company), ignore_index=True)
companies.append(company)
except:
try:
company = api.get_company(company_id)
company = sugartrail.api.get_company(company_id)
if company:
# self.companies = self.companies.append(pd.json_normalize(company), ignore_index=True)
companies.append(company)
except:
print(f"Failed to get data for {company_id}")
# otherwise uses API
else:
company = api.get_company(company_id)
company = sugartrail.api.get_company(company_id)
if company:
# self.companies = self.companies.append(pd.json_normalize(company), ignore_index=True)
companies.append(company)
# add companies to dataframe
# update company_ids with company name
self.company_ids[i]['company_name'] = company['company_name']
else:
self.company_ids[i]['company_name'] = list(filter(lambda d: d.get('company_number') == company_id, self.companies))[0]['company_name']
self.companies.extend(companies)
def run_map_preprocessing(self):
# merge some of these methods:
self.update_address_history()
self.get_company_from_id()
self.add_company_names()
self.get_coords()
self.address_history = [dict(t) for t in {tuple(d.items()) for d in self.address_history}]
return
def update_address_history(self):
for i, company in enumerate(self.company_ids):
IPython.display.clear_output(wait=True)
print("Updated " + str(i+1) + "/" + str(len(self.company_ids)) + " company addresses.")
if company['n'] == self.n:
address_history = processing.build_address_history(company['company_id'])
self.address_history.extend(address_history)
def get_coords(self):
"""Gets coordinates for each address in addresses and address_history."""
for i, row in enumerate(self.addresses):
IPython.display.clear_output(wait=True)
print("Processed " + str(i+1) + "/" + str(len(self.addresses)) + " addresses.")
if 'lat' not in row or 'lon' not in row:
coords = processing.get_coords_from_address(row['address'])
coords = sugartrail.processing.get_coords_from_address(row['address'])
if coords:
self.addresses[i]['lat'] = coords['lat']
self.addresses[i]['lon'] = coords['lon']
@@ -199,23 +242,26 @@ class Network:
self.address_history[j]['lon'] = coords['lon']
self.address_history[j]['lat'] = coords['lat']
else:
# no coords found
self.addresses[i]['lat'] = ""
self.addresses[i]['lon'] = ""
print("No coords found: " + row['address'])
def find_path(self, select_company):
# network_link_type_rows = self.company_ids.loc[self.company_ids['company_id'] == select_company]
"""Finds path from 'select_company' to the origin company."""
# retrieve rows containing selected company:
network_link_type_rows = list(filter(lambda d: d.get('company_id') == select_company, self.company_ids))
path = []
company_info = self.get_company_from_id(company_id=select_company, print_progress=False)
# iterate through each path from selected company to seed company:
for i, row in enumerate(network_link_type_rows):
# insert end of path node:
path.insert(0, {'hop': row['n'], "type": "Company", "id": select_company, "node": row['company_name'], "node_type": row['link_type'], "link_id": row['node']})
# define search terms for locating connected nodes:
search_terms = [{'n': row['n']-1, 'node_type':row['node_type'], 'node':row['node']}]
# iterate through degrees of seperation till origin is reached:
for j in range(row['n']-1,-1,-1):
for term in search_terms:
if term['n'] == j:
if term['node_type'] == "Address":
###
select_rows = list(filter(lambda d: d.get('address') == term['node'] and d.get('n') == j, self.addresses))
for k, select_row in enumerate(select_rows):
if select_row['n'] == 0:
@@ -231,7 +277,6 @@ class Network:
elif term['node_type'] == "Company":
select_rows = list(filter(lambda d: d.get('company_id') == term['node'] and d.get('n') == j, self.company_ids))
for l, select_row in enumerate(select_rows):
self.get_company_from_id(company_id=select_row['company_id'], print_progress=False)
if select_row['n'] == 0:
origin = {'hop': j, "type": "Company", "id": select_row['company_id'], "node": select_row['company_name'], "node_type": "", "link_id": ""}
if origin not in path:
@@ -259,6 +304,7 @@ class Network:
print(f"{row['node_type']} is invalid node_type")
break
sorted_path = sorted(path, key=lambda d: d['hop'])
# add letter correspondance for readability
for i in range(len(sorted_path)-1,-1,-1):
search_term = sorted_path[i]['link_id']
link_indices = []
@@ -270,9 +316,12 @@ class Network:
return sorted_path
def perform_hop(self, hops, company_data=None):
"""Gets companies, officers and addresses within n degrees of separation
from current nodes, where n is the number of hops."""
hop_history = []
for hop in range(hops):
# check if previous hop completed, if any processed items then its still mid-processing:
# select the nodes for which the method will retrieve other nodes
# 1-degree of seperation from:
selected_addresses = [address['address'] for address in list(filter(lambda d: d.get('n') == self.n, self.addresses))]
selected_companies = [company['company_id'] for company in list(filter(lambda d: d.get('n') == self.n, self.company_ids))]
selected_officers = [officer['officer_id'] for officer in list(filter(lambda d: d.get('n') == self.n, self.officer_ids))]
@@ -281,26 +330,32 @@ class Network:
break
else:
for i,address in enumerate(selected_addresses):
# in-case method was run previously and failed to complete,
# check if address was previously processed:
if address not in self.processed_addresses:
self.hop.search_address(self, address, company_data)
self.processed_addresses.append(address)
IPython.display.clear_output(wait=True)
print("Hop number: " + str(hop))
print("Hop number: " + str(hop+1))
print("Processed " + str(i+1) + "/" + str(len(selected_addresses)) + " addresses.")
for j,company in enumerate(selected_companies):
# in-case method was run previously and failed to complete,
# check if company was previously processed:
if company not in self.processed_companies:
self.hop.search_company_id(self,company)
self.processed_companies.append(company)
IPython.display.clear_output(wait=True)
print("Hop number: " + str(hop))
print("Hop number: " + str(hop+1))
print("Processed " + str(len(selected_addresses)) + "/" + str(len(selected_addresses)) + " addresses.")
print("Processed " + str(j+1) + "/" + str(len(selected_companies)) + " companies.")
for k,officer in enumerate(selected_officers):
# in-case method was run previously and failed to complete,
# check if officer was previously processed:
if officer not in self.processed_officers:
self.hop.search_officer_id(self,officer)
self.processed_officers.append(officer)
IPython.display.clear_output(wait=True)
print("Hop number: " + str(hop))
print("Hop number: " + str(hop+1))
print("Processed " + str(len(selected_addresses)) + "/" + str(len(selected_addresses)) + " addresses.")
print("Processed " + str(len(selected_companies)) + "/" + str(len(selected_companies)) + " companies.")
print("Processed " + str(k+1) + "/" + str(len(selected_officers)) + " officers.")
@@ -318,6 +373,9 @@ class Network:
self.hop_history.extend(hop_history)
class Hop:
"""Class attributes store the criteria for each hop. Class contains
methods for getting officers, addresses and companies using the
criteria."""
def __init__(self):
self.get_company_officers = True
self.get_company_address_history = True
@@ -333,57 +391,68 @@ class Network:
self.companies_at_address_maxsize = 50
def search_company_id(self, network, company_id):
"""Gets officers and addresses connected to input company
(company_id)."""
officers = []
new_addresses = []
new_officers = []
if self.get_company_officers:
officers = api.get_company_officers(company_id)
# get officers at company
officers = sugartrail.api.get_company_officers(company_id)
if officers:
officers = officers['items']
if 'items' in officers:
officers = officers['items']
# process officer results
network.node_type = "Company"
network.node = company_id
# find addresses and officers already added to the network
lower_n_addresses = [address['address'] for address in list(filter(lambda d: d.get('n') < network.n+1, network.addresses))]
lower_n_officers = [officer['officer_id'] for officer in list(filter(lambda d: d.get('n') < network.n+1, network.officer_ids))]
if officers:
for officer in officers:
if 'address' in officer:
if processing.normalise_address(officer['address']) not in lower_n_addresses:
network.link_type = "Officer Corresponance Address"
new_address = {'address': processing.normalise_address(officer['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(new_address)
# network.addresses = network.addresses.append({'address': processing.normalise_address(officer['address']), 'n':network.n, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}, ignore_index=True)
if officer['links']['officer']['appointments'].split('/')[2] not in lower_n_officers:
network.link_type = "Officer"
new_officer = {'officer_id': str(officer['links']['officer']['appointments'].split('/')[2]), 'name': processing.normalise_name(officer['name']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_officer not in new_officers:
new_officers.append(new_officer)
# network.officer_ids = network.officer_ids.append({'officer_id': officer['links']['officer']['appointments'].split('/')[2], 'name': processing.normalise_name(officer['name']), 'n':network.n, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}, ignore_index=True)
# if 'address' in officer:
# # check address not already in the network
# if sugartrail.processing.normalise_address(officer['address']) not in lower_n_addresses:
# network.link_type = "Officer Corresponance Address"
# new_address = {'address': sugartrail.processing.normalise_address(officer['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
# if new_address not in new_addresses:
# new_addresses.append(new_address)
# # check not already in the network
if officer['links']['officer']['appointments'].split('/')[2] not in lower_n_officers:
network.link_type = "Officer"
new_officer = {'officer_id': str(officer['links']['officer']['appointments'].split('/')[2]), 'name': sugartrail.processing.normalise_name(officer['name']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_officer not in new_officers:
new_officers.append(new_officer)
if self.get_psc_correspondance_address:
psc = api.get_psc(company_id)
# get address for company pscs
psc = sugartrail.api.get_psc(company_id)
if psc:
for person in psc['items']:
if "address" in person:
network.link_type = "Person of Significant Control Address"
if processing.normalise_address(person['address']) not in lower_n_addresses:
new_address = {'address': processing.normalise_address(person['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(new_address)
if 'items' in psc:
for person in psc['items']:
if "address" in person:
network.link_type = "Person of Significant Control Address"
if sugartrail.processing.normalise_address(person['address']) not in lower_n_addresses:
new_address = {'address': sugartrail.processing.normalise_address(person['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(new_address)
if self.get_company_address_history:
address_history = processing.build_address_history(company_id)
# get company address history
address_history = sugartrail.processing.build_address_history(company_id)
network.address_history.extend(address_history)
for address in address_history:
network.link_type = "Historic Address"
if address['address'] not in lower_n_addresses:
new_address = {'address': address['address'], 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(dict({'address': address['address'], 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}))
# network.addresses = network.addresses.append({'address': address['address'], 'n':network.n, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}, ignore_index=True)
if 'address' in address:
if address['address'] not in lower_n_addresses:
new_address = {'address': address['address'], 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(dict({'address': address['address'], 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}))
network.addresses.extend(new_addresses)
network.officer_ids.extend(new_officers)
def search_officer_id(self, network, officer_id):
"""Gets officers, companies and addresses connected to input officer
(officer_id)."""
new_addresses = []
new_companies = []
new_officers = []
@@ -392,13 +461,13 @@ class Network:
lower_n_addresses = [address['address'] for address in list(filter(lambda d: d.get('n') < network.n+1, network.addresses))]
lower_n_officers = [officer['officer_id'] for officer in list(filter(lambda d: d.get('n') < network.n+1, network.officer_ids))]
lower_n_companies = [company['company_id'] for company in list(filter(lambda d: d.get('n') < network.n+1, network.company_ids))]
appointments = api.get_appointments(officer_id)
appointments = sugartrail.api.get_appointments(officer_id)
if appointments:
if self.officer_appointments_maxsize == None or len(appointments['items']) < int(self.officer_appointments_maxsize or 0):
for appointment in appointments['items']:
if processing.normalise_address(appointment['address']) not in lower_n_addresses:
if sugartrail.processing.normalise_address(appointment['address']) not in lower_n_addresses:
network.link_type = "Appointment Address"
new_address = {'address': processing.normalise_address(appointment['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
new_address = {'address': sugartrail.processing.normalise_address(appointment['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(new_address)
if appointment['appointed_to']['company_number'] not in lower_n_companies:
@@ -409,15 +478,15 @@ class Network:
elif len(appointments['items']) > int(self.officer_appointments_maxsize):
network.maxsize_entities.append(dict({'node':officer_id,'type': 'Officer', 'maxsize_type': 'Appointments', 'size': len(appointments['items'])}))
if self.get_officer_correspondance_address:
correspondance_address = api.get_correspondance_address(officer_id)
correspondance_address = sugartrail.api.get_correspondance_address(officer_id)
if correspondance_address:
if processing.normalise_address(correspondance_address['items'][0]['address']) not in lower_n_addresses:
if sugartrail.processing.normalise_address(correspondance_address['items'][0]['address']) not in lower_n_addresses:
network.link_type = "Officer Corresponance Address"
new_address = {'address': processing.normalise_address(correspondance_address['items'][0]['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
new_address = {'address': sugartrail.processing.normalise_address(correspondance_address['items'][0]['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node}
if new_address not in new_addresses:
new_addresses.append(new_address)
if self.get_officer_duplicates:
duplicate_officers = api.get_duplicate_officers(officer_id)
duplicate_officers = sugartrail.api.get_duplicate_officers(officer_id)
if duplicate_officers:
if self.officer_duplicates_maxsize == None or len(duplicate_officers) < int(self.officer_duplicates_maxsize or 0):
for duplicate in duplicate_officers:
@@ -433,6 +502,8 @@ class Network:
network.company_ids.extend(new_companies)
def search_address(self, network, address, company_data):
"""Gets companies and officers connected to input address
(address)."""
new_companies = []
new_officers = []
network.node_type = "Address"
@@ -443,9 +514,9 @@ class Network:
if self.get_companies_at_address:
companies = {}
if company_data is not None:
companies['items'] = processing.get_companies_from_address_database(address, company_data)
companies['items'] = sugartrail.processing.get_companies_from_address_database(address, company_data)
else:
companies = api.get_companies_at_address(address)
companies = sugartrail.api.get_companies_at_address(address)
if companies:
if 'items' in companies:
if self.companies_at_address_maxsize == None or len(companies['items']) < int(self.companies_at_address_maxsize or 0):
@@ -458,7 +529,7 @@ class Network:
elif len(companies['items']) > int(self.companies_at_address_maxsize):
network.maxsize_entities.append(dict({'node':address,'type': 'Address', 'maxsize_type': 'Companies', 'size': len(companies['items'])}))
if self.get_officers_at_address:
officers = api.get_officers_at_address(address)
officers = sugartrail.api.get_officers_at_address(address)
if officers:
if self.officers_at_address_maxsize == None or len(officers) < int(self.officers_at_address_maxsize or 0):
for officer in officers:

View File

@@ -5,12 +5,120 @@ import functools
import math
def build_map(network, clear_widget=True):
    """Build the map and the origin-path table for a network.

    Args:
        network: Network data, passed unchanged to ``load_map_data``.
        clear_widget: When true (the default), ``Widget.close_all()`` is
            called before the new map is built.

    Returns:
        The ``(map, path_table)`` pair produced by ``load_map_data``.
    """
    if clear_widget:
        Widget.close_all()
    leaflet_map, origin_table = load_map_data(network)
    return leaflet_map, origin_table
def load_map_data(network):
    """Create the map for a network together with its origin-path table.

    Three layers are added to the map, in this order: the historic-address
    ant path (colour ``#ed2f2f``), the company-to-origin ant path (colour
    ``#000000``) and a marker cluster holding the markers returned by
    ``get_marker_data``. Both ant paths start with no locations.

    Returns:
        A ``(map, path_table)`` tuple, where ``path_table`` is an ``HTML``
        widget whose value starts out empty.
    """
    def _empty_trail(colour):
        # the two trails differ only in their line colour
        return AntPath(
            locations=[],
            dash_array=[1, 10],
            delay=1000,
            color=colour,
            pulse_color='#FFFFFF',
        )

    address_trail = _empty_trail('#ed2f2f')  # historic addresses of a company
    origin_trail = _empty_trail('#000000')   # company back to the origin company
    # table used for printing the company-to-origin trail
    path_table = HTML(value="")

    m = Map(
        center=(50, 0),
        zoom=5,
        layout=Layout(width='90%', height='650px'),
    )
    for trail in (address_trail, origin_trail):
        m.add_layer(trail)

    # markers are built only after both trail layers are on the map
    markers = get_marker_data(network, address_trail, origin_trail, path_table)
    m.add_layer(
        MarkerCluster(
            center=(50, 0),
            markers=markers,
            disable_clustering_at_zoom=25,
            max_cluster_radius=25,
        )
    )
    return m, path_table
def get_marker_data(network, address_trail, origin_trail, path_table):
    """Build one map marker per geolocated row of the address history.

    Rows of ``network.address_history`` without a latitude or longitude are
    skipped. A marker is green when its company's status is not "active";
    for active companies it is red when the row has an end date and black
    otherwise. The popup shows the company name and the address. Clicking
    a marker calls ``on_button_clicked`` with the company's address path,
    its path to the origin and the two trail layers bound as keyword
    arguments.

    Returns:
        The list of markers, in address-history order.

    Raises:
        IndexError: if a geolocated row refers to a company number that is
            not present in ``network.companies``.
    """
    markers = []
    for row in network.address_history:
        if not (row['lat'] and row['lon']):
            continue

        # company registered at this historic address (first match wins)
        company = [
            candidate for candidate in network.companies
            if candidate.get('company_number') == row['company_number']
        ][0]
        company_name = company['company_name']
        if company['company_status'] != "active":
            marker_color = "green"
        elif row['end_date']:
            marker_color = "red"
        else:
            marker_color = "black"
        address = row['address']

        # path from this company back to the origin company
        company_id = str(row['company_number'])
        path = network.find_path(company_id)
        locations_from_origin = locations_from_origin_path(path, network)

        message = HTML(value=f"{company_name!s}<hr>{address!s}")
        icon = AwesomeIcon(marker_color=marker_color)
        # historic addresses of the company, drawn when the marker is clicked
        address_path = get_address_path(network, company_id)

        position = (row['lat'], row['lon'])
        marker = Marker(
            icon=icon,
            opacity=1,
            location=position,
            draggable=False,
            popup=message,
            title="Address",
        )
        click_context = dict(
            address_path=address_path,
            address_trail=address_trail,
            path_table=path_table,
            origin_trail=origin_trail,
            path=path,
            location=position,
            locations_from_origin=locations_from_origin,
        )
        marker.on_click(functools.partial(on_button_clicked, **click_context))
        markers.append(marker)
    return markers
def locations_from_origin_path(path, network):
    """Collect ``[lat, lon]`` pairs for the nodes of an origin path.

    For a 'Company' node the coordinates come from the most recent row (by
    ``start_date``) in ``network.address_history`` that has both a latitude
    and a longitude; companies with no such row contribute nothing. For an
    'Address' node the coordinates come from the first matching entry in
    ``network.addresses`` and are kept only when both are set. Nodes of any
    other type are ignored.

    Raises:
        IndexError: if an 'Address' node has no entry in
            ``network.addresses``.
    """
    locations = []
    for node in path:
        node_type = node['type']
        if node_type == 'Company':
            # newest address first, then take the first geolocated one
            history = [
                entry for entry in network.address_history
                if entry.get('company_number') == node['id']
            ]
            history.sort(key=lambda entry: entry['start_date'], reverse=True)
            located = next(
                (entry for entry in history if entry['lat'] and entry['lon']),
                None,
            )
            if located is not None:
                locations.append([located['lat'], located['lon']])
        elif node_type == 'Address':
            matches = [
                entry for entry in network.addresses
                if entry.get('address') == node['node']
            ]
            address_entry = matches[0]
            lat, lon = address_entry['lat'], address_entry['lon']
            if lat and lon:
                locations.append([lat, lon])
    return locations
def get_address_path(network, company_id):
"""Returns list of historic addresses for input company (company_id)."""
company_address_history = list(filter(lambda d: d.get('company_number') == company_id, network.address_history))
company_address_history_sorted = sorted(company_address_history, key=lambda d: d['start_date'], reverse=True)
address_path = []
@@ -21,38 +129,8 @@ def get_address_path(network, company_id):
address_path.insert(0,[row['lat'], row['lon']])
return address_path
def locations_from_origin_path(path, network):
    """Return the ``[lat, lon]`` pairs found along an origin path.

    'Company' nodes use the newest (by ``start_date``) address-history row
    that carries both coordinates; 'Address' nodes use the first matching
    entry of ``network.addresses``. Nodes without usable coordinates, and
    nodes of other types, add nothing to the result.

    Raises:
        IndexError: if an 'Address' node is missing from
            ``network.addresses``.
    """
    locations = []
    for node in path:
        if node['type'] == 'Company':
            company_rows = sorted(
                (row for row in network.address_history
                 if row.get('company_number') == node['id']),
                key=lambda row: row['start_date'],
                reverse=True,
            )
            # stop at the first (most recent) row that is geolocated
            for row in company_rows:
                if row['lat'] and row['lon']:
                    locations.append([row['lat'], row['lon']])
                    break
        elif node['type'] == 'Address':
            address_row = [
                row for row in network.addresses
                if row.get('address') == node['node']
            ][0]
            lat = address_row['lat']
            lon = address_row['lon']
            if lat and lon:
                locations.append([lat, lon])
    return locations
def on_button_clicked(address_path, path, location, address_trail, path_table, origin_trail, locations_from_origin, **kwargs):
"""Adds data to map layers that will render when marker is clicked."""
address_trail.locations = address_path
locations_from_origin[-1] = location
origin_trail.locations = locations_from_origin
@@ -60,6 +138,7 @@ def on_button_clicked(address_path, path, location, address_trail, path_table, o
return
def html_table_generator(path):
"""Generates table for displaying origin path data."""
table_style = '<style>table {font-family: arial, sans-serif;border-collapse: collapse;}td, th {border: 1px solid #dddddd;text-align: left;padding: 8px;}tr:nth-child(even) {background-color: #dddddd;}</style>'
headers = ['Node Index', 'Node', 'Hop', 'Node Type', 'Link']
headers_row = ""
@@ -70,67 +149,3 @@ def html_table_generator(path):
nodes += '<tr><td>' + node['node_index'] + '</td><td>' + str(node['node']) + '</td><td>' + str(node['hop']) + '</td><td>' + str(node['node_type']) + '</td><td>' + str(node['link']) + '</td></tr>'
table_html = table_style + '<table><tr>' + headers_row + '</tr>' + nodes + '</table>'
return table_html
def load_map_data(network):
    """Build the map and the (initially empty) origin-path table.

    Layers are added in this order: the address trail, the origin trail,
    then a marker cluster whose markers come from ``get_marker_data``.

    Returns:
        A ``(map, path_table)`` tuple.
    """
    # trail of a company's historic addresses
    address_trail = AntPath(
        locations=[], dash_array=[1, 10], delay=1000,
        color='#ed2f2f', pulse_color='#FFFFFF',
    )
    # trail from a company back to the origin company
    origin_trail = AntPath(
        locations=[], dash_array=[1, 10], delay=1000,
        color='#000000', pulse_color='#FFFFFF',
    )
    path_table = HTML(value="")

    m = Map(center=(50, 0), zoom=5, layout=Layout(width='90%', height='650px'))
    m.add_layer(address_trail)
    m.add_layer(origin_trail)

    company_markers = get_marker_data(network, address_trail, origin_trail, path_table)
    cluster = MarkerCluster(
        center=(50, 0),
        markers=company_markers,
        disable_clustering_at_zoom=25,
        max_cluster_radius=25,
    )
    m.add_layer(cluster)
    return m, path_table
def get_marker_data(network, address_trail, origin_trail, path_table):
    """Create a marker for every address-history row that has coordinates.

    Marker colour is green unless the company status is "active", in which
    case it is red for rows with an end date and black otherwise. Each
    marker's click handler is ``on_button_clicked`` with the address path,
    the origin path and both trail layers bound as keyword arguments.

    Returns:
        The list of markers.

    Raises:
        IndexError: if a geolocated row's company number is not found in
            ``network.companies``.
    """
    markers = []
    for row in network.address_history:
        if not (row['lat'] and row['lon']):
            continue
        message = HTML()
        # first company whose number matches this address row
        company = [
            entry for entry in network.companies
            if entry.get('company_number') == row['company_number']
        ][0]
        company_name = company['company_name']
        marker_color = "green"
        if company['company_status'] == "active":
            marker_color = "red" if row['end_date'] else "black"
        address = row['address']

        company_id = str(row['company_number'])
        path = network.find_path(company_id)
        locations_from_origin = locations_from_origin_path(path, network)
        message.value = str(company_name) + "<hr>" + str(address)
        icon = AwesomeIcon(marker_color=marker_color)
        address_path = get_address_path(network, company_id)

        coords = (row['lat'], row['lon'])
        marker = Marker(
            icon=icon,
            opacity=1,
            location=coords,
            draggable=False,
            popup=message,
            title="Address",
        )
        handler = functools.partial(
            on_button_clicked,
            address_path=address_path,
            address_trail=address_trail,
            path_table=path_table,
            origin_trail=origin_trail,
            path=path,
            location=coords,
            locations_from_origin=locations_from_origin,
        )
        marker.on_click(handler)
        markers.append(marker)
    return markers

View File

@@ -7,6 +7,7 @@ import regex as re
import collections
def flatten(d, parent_key='', sep='.'):
"""Flatten nested dictionary."""
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
@@ -17,6 +18,7 @@ def flatten(d, parent_key='', sep='.'):
return dict(items)
def infer_postcode(address_string):
"""Extracts UK postcode from input address string with regex."""
postcode = re.findall(r'\b[A-Z]{1,2}[0-9][A-Z0-9]? [0-9][ABD-HJLNP-UW-Z]{2}\b', address_string)
if postcode:
return postcode[0]
@@ -24,12 +26,15 @@ def infer_postcode(address_string):
return
def get_companies_from_address_database(address, company_data):
    """Find the companies in a company table that sit at an address.

    A company matches when its postcode occurs in ``address`` AND at least
    one of its address lines 1 / 2 occurs in ``address`` (all comparisons
    are case-insensitive substring tests). Missing or blank cells never
    match.

    Args:
        address: Address string to search for.
        company_data: DataFrame with the columns 'RegAddress.AddressLine1',
            ' RegAddress.AddressLine2', 'RegAddress.PostCode',
            'RegAddress.PostTown' and 'RegAddress.Country' (the leading
            spaces in some column names are intentional and looked up
            verbatim).

    Returns:
        A list of dicts, one per matching company, with the columns renamed
        to snake_case keys plus a nested 'registered_office_address' dict.
    """
    target = address.upper()

    def _field_in_address(column):
        # True where the cell holds real text that occurs inside the address.
        values = company_data[column]

        def _contained(value):
            text = str(value).upper()
            # a blank string is a substring of everything, so reject it
            return bool(text.strip()) and text in target

        # notna() stops NaN/None from matching via their "NAN"/"NONE" text
        return values.notna() & values.map(_contained).astype(bool)

    # Parenthesised on purpose: `a | b & c` parses as `a | (b & c)`, which
    # let an address-line-2 hit bypass the postcode requirement.
    street_match = (
        _field_in_address(' RegAddress.AddressLine2')
        | _field_in_address('RegAddress.AddressLine1')
    )
    companies = company_data[street_match & _field_in_address('RegAddress.PostCode')]
    companies = companies.rename(columns={
        'CompanyName': 'company_name',
        ' CompanyNumber': 'company_number',
        'CompanyStatus': 'company_status',
        'CompanyCategory': 'company_type',
        'RegAddress.AddressLine1': 'address_line_1',
        ' RegAddress.AddressLine2': 'address_line_2',
        'RegAddress.PostCode': 'postal_code',
        'RegAddress.PostTown': 'locality',
        'RegAddress.Country': 'country',
        'IncorporationDate': 'date_of_creation',
        'DissolutionDate': 'date_of_cessation',
    })
    office_keys = ['address_line_1', 'address_line_2', 'locality', 'postal_code', 'country']
    companies['registered_office_address'] = companies[office_keys].to_dict('records')
    return companies.to_dict('records')
def get_nearby_postcode(postcode_string):
"""Find closest nearby postcode to input postcode (postcode_string)."""
url = "http://api.postcodes.io/postcodes/" + postcode_string[:-1] + "/autocomplete"
response = requests.get(url).json()
if response['result'] != None:
@@ -44,6 +49,7 @@ def get_nearby_postcode(postcode_string):
return closest_address["postcode"]
def get_coords_from_address(address_string):
"""Attempt retrieval of coords for input address string."""
address = urllib.parse.quote(address_string)
url = 'https://nominatim.openstreetmap.org/search/' + urllib.parse.quote(address) +'?format=json'
response = requests.get(url).json()
@@ -70,11 +76,14 @@ def get_coords_from_address(address_string):
print("No postcode found for: " + address_string)
def normalise_name(name):
    """Strip commas, then rotate the first space-separated word (often the
    surname) to the end of the name."""
    leading_word, *remaining_words = name.replace(',', '').split(" ")
    return ' '.join([*remaining_words, leading_word])
def process_address_changes(address_changes):
"""Attempt retrieval of 'new_address' value if Companies House record is
incomplete."""
for i in reversed(range(1,len(address_changes['items']))):
if 'new_address' not in address_changes['items'][i]['description_values'].keys():
if 'old_address' in address_changes['items'][i-1]['description_values'].keys():
@@ -82,6 +91,8 @@ def process_address_changes(address_changes):
return address_changes
def build_address_history(company_id):
"""Returns a list of dicts containing historic addresses for input company
(company_id)."""
company_info = api.get_company(company_id)
if company_info:
company_info_subset = {k:company_info[k] for k in ("date_of_creation","date_of_cessation","registered_office_address") if k in company_info}
@@ -89,6 +100,7 @@ def build_address_history(company_id):
address_keys = ('start_date','end_date','address')
if address_changes:
if address_changes['items']:
# attempt to retrieve any missing items within address changes
address_changes = process_address_changes(address_changes)
addresses = []
entry = {}
@@ -148,6 +160,7 @@ def build_address_history(company_id):
return []
def normalise_address(address_dict):
"""Joins address key values into a single str."""
address_list = []
for key in ['premises','address_line_1', 'locality','postal_code', 'country']:
if key in address_dict: