From c898e945124cafbb06ea902c412a9941790e0e0a Mon Sep 17 00:00:00 2001 From: seangreaves Date: Wed, 18 Jan 2023 17:45:07 +0000 Subject: [PATCH] added additional comments and decorators in base class --- .gitignore | 3 + assets/.DS_Store | Bin 0 -> 10244 bytes dashboard/.DS_Store | Bin 0 -> 6148 bytes notebooks/001_getting_started.ipynb | 2 +- notebooks/002_candy_connections.ipynb | 4 +- notebooks/003_virtual_offices.ipynb | 6 +- notebooks/quickstart.ipynb | 4 +- sugartrail/api.py | 62 +++-- sugartrail/base.py | 371 +++++++++++++++----------- sugartrail/mapview.py | 205 +++++++------- sugartrail/processing.py | 13 + 11 files changed, 396 insertions(+), 274 deletions(-) create mode 100644 assets/.DS_Store create mode 100644 dashboard/.DS_Store diff --git a/.gitignore b/.gitignore index 1d8dd28..4cf359e 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,6 @@ coverage.xml # Sphinx documentation docs/_build/ + +# Testing notebook +notebooks/testing.ipynb diff --git a/assets/.DS_Store b/assets/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..bf1e5beae7b9b92560c8247cdbe3d9b07f6ef7bd GIT binary patch literal 10244 zcmeHMTWl0n82j@OV@6=1QzHbl$*l#CMYGXw51m=>uhIi$Ii}dcV>5i zrb%PGMM*SKRD3f=jW0YPCi0+|=nDxCN(d2+x9Ec*CPsbmg^T~0GmG0Bi7#NxoMg^_ z{&W6+&VRo7{+T^z0RWqFS{xt(00Lc1%9T`2Q`k7Wrj(H1Ba=k(2T;L+V~_-mT0sdA z2oMMm2oMMm2oU%$B0%44R)jfR26ccyfIxu2Z3Ni+Awn0E315zL>AgCr^0ojZ%c$Ec z)MncO<`eN{!k6P*(g+pgt|YlD`dbX-?$jUg?GnBm=aRbv`kN2*XGVXAg5KP$KqTnV;YWIUw;Kdv&zb6R|plt9N}>1kX`6>RX67} zso8@}>#}V#Rcx!N1BPBm%JZ9T%T*0a&w37qrjyB`9>db?LX(}dG{^Jp5{tyQG>pX}qw)G}4YA0k2O7r4L}5X6!;byQVKZ&nFS5w#J?Xzsp^EFb>l4(P zvanK+vG3(m)7m)sX1@5)&En#}DlT4><|tj=QctfjU|XHWaXl%|b7=LCklK;e^R()o z0o%-_Ehi~g52)rq&Qx8!(KMN{)9dO-UDoP%?5x}D875^p=ji)&vNQ5}3t5wE4ZBl& z#x-oqIiP19lF>vxUz~Wla>25?m22xaZ{OMU@ZR>zRdeUb)isjTPcT}p@r0pgy9Cw~MAd!lrFg9k)HBTi(px z!fKUPLFi*+jhd!wm#Y>nUbuUYqVzjMYF2MA5GCbHEKKTeqkiu>(g?d#t7f=%)*tNf z24%y_8^m{d4xILaAVd?Z`2+Dl6=n7lK!G~g1P?+7^wA+=0Zzb4cn#ix({K*X!v**f zuE16J8oq&_;aB(ruEPzug@7W~U@hK{br{7MZpCf59h-3vK8ky>4ZE=)58+`vf`gbw z8#6e9$MGqA8lS;u@g;l%PvM*RHlD!`@GO3epWu1CfS2%F{2qV6AMqMq#~XMvR1pe? zmV}mumW5V^)`ns{`cN^0dbwV#L+hzEHG`bI|7r+zPnR9xmR4!=J~mPEOV6b5^32?nbS@D){hD-&EyJh9g*q; z;~*Mv8Vf{a|*Ds2}jcjPjzE|GSM zq70rAUK4H-1bGTEXQbu(a0WhtPvIib@;mqmeu2M;iUQ8X#kid4$jG$Ee{I5^mboI)uxF#1NLnU(ZONGkwP&2d}@r~rUQB`lP&SRvF-x+Dem zln|Bs#sTcZ5j4QSo26)R{EH0G+HFDzr?_%;SXjTeslAR;8(}+bj+dl=Sk~o4lVP88 z?xKF03`e!vZ&lbRK6_r$CB3CThPQSSjyj|9u-O@WpxU{$X*9Px(M8gov@5SpZ8qv8 zS$E(PlDLZ@SC>f^+ey=ovv}xoL*Jmwy4{3X20UtQ55c1CodT1@C}WXe55z^@Y?qJKJfhAy11TgwU`5~ensEYA3sml z$HtG2?cXl;pVKf_j`K@6$Woi#VEcTI6e}VO2m`{vA_IIr=&ePk*usD?APgV_bUqly zfHC*zjt(r`69BP@ZYOBd?;_PYmnKJd utwH^SN int(self.officer_appointments_maxsize): network.maxsize_entities.append(dict({'node':officer_id,'type': 'Officer', 'maxsize_type': 'Appointments', 'size': len(appointments['items'])})) if self.get_officer_correspondance_address: - correspondance_address = api.get_correspondance_address(officer_id) + correspondance_address = sugartrail.api.get_correspondance_address(officer_id) if correspondance_address: - if processing.normalise_address(correspondance_address['items'][0]['address']) not in lower_n_addresses: + if sugartrail.processing.normalise_address(correspondance_address['items'][0]['address']) not in lower_n_addresses: network.link_type = "Officer Corresponance Address" - new_address = {'address': processing.normalise_address(correspondance_address['items'][0]['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node} + new_address = {'address': sugartrail.processing.normalise_address(correspondance_address['items'][0]['address']), 'n':network.n+1, 'link_type':network.link_type, 'node_type': network.node_type, 'node': network.node} if new_address not in new_addresses: new_addresses.append(new_address) if self.get_officer_duplicates: - duplicate_officers = api.get_duplicate_officers(officer_id) + duplicate_officers = sugartrail.api.get_duplicate_officers(officer_id) if duplicate_officers: if self.officer_duplicates_maxsize == None or len(duplicate_officers) < int(self.officer_duplicates_maxsize or 0): for duplicate in duplicate_officers: @@ -433,6 +502,8 @@ class Network: network.company_ids.extend(new_companies) def search_address(self, network, address, company_data): + """Gets officers, companies and addresses connected to input officer + (officer_id).""" new_companies = [] new_officers = [] network.node_type = "Address" @@ -443,9 +514,9 @@ class Network: if self.get_companies_at_address: companies = {} if company_data is not None: - companies['items'] = processing.get_companies_from_address_database(address, company_data) + companies['items'] = sugartrail.processing.get_companies_from_address_database(address, company_data) else: - companies = api.get_companies_at_address(address) + companies = sugartrail.api.get_companies_at_address(address) if companies: if 'items' in companies: if self.companies_at_address_maxsize == None or len(companies['items']) < int(self.companies_at_address_maxsize or 0): @@ -458,7 +529,7 @@ class Network: elif len(companies['items']) > int(self.companies_at_address_maxsize): network.maxsize_entities.append(dict({'node':address,'type': 'Address', 'maxsize_type': 'Companies', 'size': len(companies['items'])})) if self.get_officers_at_address: - officers = api.get_officers_at_address(address) + officers = sugartrail.api.get_officers_at_address(address) if officers: if self.officers_at_address_maxsize == None or len(officers) < int(self.officers_at_address_maxsize or 0): for officer in officers: diff --git a/sugartrail/mapview.py b/sugartrail/mapview.py index c5e8823..0143c03 100644 --- a/sugartrail/mapview.py +++ b/sugartrail/mapview.py @@ -5,12 +5,120 @@ import functools import math def build_map(network, clear_widget=True): + """Generates map and table for displaying paths for input network data.""" if clear_widget: Widget.close_all() m, path_table = load_map_data(network) return m, path_table +def load_map_data(network): + """Adds data from input network to map in 3 layers; marker_cluster, + address_trail and origin_trail. marker_cluster contains all the companies + in the network geolocated, address_trail contains all the historic address + antpaths and origin_trail contains all the antpaths connecting companies + through other companies towards the origin company.""" + # initialise historic address trail antpath + address_trail = AntPath( + locations=[], + dash_array=[1,10], + delay=1000, + color='#ed2f2f', + pulse_color='#FFFFFF' + ) + # initialise trail from company to origin antpath + origin_trail = AntPath( + locations=[], + dash_array=[1,10], + delay=1000, + color='#000000', + pulse_color='#FFFFFF' + ) + # initialise table for printing company to origin trail + path_table = HTML( + value="" + ) + # initialise map + m = Map(center=(50, 0), + zoom=5, + layout=Layout(width='90%', height='650px')) + # add antpath layers + m.add_layer(address_trail) + m.add_layer(origin_trail) + # add marker for each company in network + marker_cluster = MarkerCluster( + center=(50, 0), + markers=get_marker_data(network, address_trail, origin_trail, path_table), + disable_clustering_at_zoom = 25, + max_cluster_radius = 25 + ) + # add markers as layer + m.add_layer(marker_cluster) + return m, path_table + +def get_marker_data(network,address_trail, origin_trail, path_table): + """Generates a marker for each company historic address.""" + markers = [] + for index, row in enumerate(network.address_history): + if row['lat'] and row['lon']: + marker_color = "green" + # locate company at historic address + company = list(filter(lambda d: d.get('company_number') == row['company_number'], network.companies))[0] + company_name = company['company_name'] + company_status = company['company_status'] + if company_status == "active": + if row['end_date']: + marker_color = "red" + else: + marker_color = "black" + address = row['address'] + # find path from company to origin + path = network.find_path(str(row['company_number'])) + locations_from_origin = locations_from_origin_path(path, network) + message = HTML() + message.value = str(company_name) + "
" + str(address) + icon = AwesomeIcon( + marker_color=marker_color + ) + # find historic addresses path for company + address_path = get_address_path(network,str(row['company_number'])) + marker = Marker(icon=icon, opacity=1, location=(row['lat'], row['lon']), draggable=False, popup=message, title="Address") + # attach on click behavoir for marker + marker.on_click(functools.partial(on_button_clicked, address_path=address_path, address_trail=address_trail, path_table=path_table, origin_trail=origin_trail, path=path, location=(row['lat'], row['lon']), locations_from_origin = locations_from_origin)) + markers.append(marker) + return markers + +def locations_from_origin_path(path, network): + """Returns list of addresses found within origin path.""" + locations = [] + for node in path: + if node['type'] == 'Company': + # finds location for company node + company_address_history = list(filter(lambda d: d.get('company_number') == node['id'], network.address_history)) + company_address_history_sorted = sorted(company_address_history, key=lambda d: d['start_date'], reverse=True) + last_company_address_row = {} + for address_row in company_address_history_sorted: + if address_row['lat'] and address_row['lon']: + last_company_address_row = address_row + break + if last_company_address_row: + lat = last_company_address_row['lat'] + lon = last_company_address_row['lon'] + if not lat or not lon: + pass + else: + locations.append([lat,lon]) + elif node['type'] == 'Address': + address_row = list(filter(lambda d: d.get('address') == node['node'], network.addresses))[0] + lat = address_row['lat'] + lon = address_row['lon'] + if not lat or not lon: + pass + else: + locations.append([lat,lon]) + return locations + def get_address_path(network, company_id): + """Returns list of historic addresses for input company (company_id).""" company_address_history = list(filter(lambda d: d.get('company_number') == company_id, network.address_history)) company_address_history_sorted = sorted(company_address_history, key=lambda d: d['start_date'], reverse=True) address_path = [] @@ -21,38 +129,8 @@ def get_address_path(network, company_id): address_path.insert(0,[row['lat'], row['lon']]) return address_path -def locations_from_origin_path(path, network): - locations = [] - for node in path: - if node['type'] == 'Company': - ### - company_address_history = list(filter(lambda d: d.get('company_number') == node['id'], network.address_history)) - company_address_history_sorted = sorted(company_address_history, key=lambda d: d['start_date'], reverse=True) - last_company_address_row = {} - for address_row in company_address_history_sorted: - if address_row['lat'] and address_row['lon']: - last_company_address_row = address_row - break - # last_company_address_row = list(filter(lambda d: d.get('company_number') == node['id'], network.address_history))[0] - if last_company_address_row: - lat = last_company_address_row['lat'] - lon = last_company_address_row['lon'] - if not lat or not lon: - pass - else: - locations.append([lat,lon]) - elif node['type'] == 'Address': - address_row = list(filter(lambda d: d.get('address') == node['node'], network.addresses))[0] - # address_row = network.addresses.loc[network.addresses['address'] == node['node']].iloc[:1] - lat = address_row['lat'] - lon = address_row['lon'] - if not lat or not lon: - pass - else: - locations.append([lat,lon]) - return locations - def on_button_clicked(address_path, path, location, address_trail, path_table, origin_trail, locations_from_origin, **kwargs): + """Adds data to map layers that will render when marker is clicked.""" address_trail.locations = address_path locations_from_origin[-1] = location origin_trail.locations = locations_from_origin @@ -60,6 +138,7 @@ def on_button_clicked(address_path, path, location, address_trail, path_table, o return def html_table_generator(path): + """Generates table for displaying origin path data.""" table_style = '' headers = ['Node Index', 'Node', 'Hop', 'Node Type', 'Link'] headers_row = "" @@ -70,67 +149,3 @@ def html_table_generator(path): nodes += '' + node['node_index'] + '' + str(node['node']) + '' + str(node['hop']) + '' + str(node['node_type']) + '' + str(node['link']) + '' table_html = table_style + '' + headers_row + '' + nodes + '
' return table_html - -def load_map_data(network): - address_trail = AntPath( - locations=[], - dash_array=[1,10], - delay=1000, - color='#ed2f2f', - pulse_color='#FFFFFF' - ) - origin_trail = AntPath( - locations=[], - dash_array=[1,10], - delay=1000, - color='#000000', - pulse_color='#FFFFFF' - ) - path_table = HTML( - value="" - ) - m = Map(center=(50, 0), - zoom=5, - layout=Layout(width='90%', height='650px')) - m.add_layer(address_trail) - m.add_layer(origin_trail) - marker_cluster = MarkerCluster( - center=(50, 0), - markers=get_marker_data(network, address_trail, origin_trail, path_table), - disable_clustering_at_zoom = 25, - max_cluster_radius = 25 - ) - m.add_layer(marker_cluster) - return m, path_table - -def get_marker_data(network,address_trail, origin_trail, path_table): - address_trail=address_trail - origin_trail=origin_trail - ms = [] - for index, row in enumerate(network.address_history): - if row['lat'] and row['lon']: - path = "" - locations_from_origin = "" - message = HTML() - marker_color = "green" - company = list(filter(lambda d: d.get('company_number') == row['company_number'], network.companies))[0] - # company = network.companies.loc[network.companies['company_number'] == row['company_number']] - company_name = company['company_name'] - company_status = company['company_status'] - if company_status == "active": - if row['end_date']: - marker_color = "red" - else: - marker_color = "black" - address = row['address'] - path = network.find_path(str(row['company_number'])) - locations_from_origin = locations_from_origin_path(path, network) - message.value = str(company_name) + "
" + str(address) - icon = AwesomeIcon( - marker_color=marker_color - ) - address_path = get_address_path(network,str(row['company_number'])) - marker = Marker(icon=icon, opacity=1, location=(row['lat'], row['lon']), draggable=False, popup=message, title="Address") - marker.on_click(functools.partial(on_button_clicked, address_path=address_path, address_trail=address_trail, path_table=path_table, origin_trail=origin_trail, path=path, location=(row['lat'], row['lon']), locations_from_origin = locations_from_origin)) - ms.append(marker) - return ms diff --git a/sugartrail/processing.py b/sugartrail/processing.py index b1486da..92ffc8c 100644 --- a/sugartrail/processing.py +++ b/sugartrail/processing.py @@ -7,6 +7,7 @@ import regex as re import collections def flatten(d, parent_key='', sep='.'): + """Flatten nested dictionary.""" items = [] for k, v in d.items(): new_key = parent_key + sep + k if parent_key else k @@ -17,6 +18,7 @@ def flatten(d, parent_key='', sep='.'): return dict(items) def infer_postcode(address_string): + """Extracts UK postcode from input address string with regex.""" postcode = re.findall(r'\b[A-Z]{1,2}[0-9][A-Z0-9]? [0-9][ABD-HJLNP-UW-Z]{2}\b', address_string) if postcode: return postcode[0] @@ -24,12 +26,15 @@ def infer_postcode(address_string): return def get_companies_from_address_database(address, company_data): + """Searches input dataframe (company_data) for companies at input address + (address) and returns list of dicts.""" companies = company_data[company_data[' RegAddress.AddressLine2'].apply(lambda x: str(x).upper() in address.upper()) | company_data['RegAddress.AddressLine1'].apply(lambda x: str(x).upper() in address.upper()) & company_data['RegAddress.PostCode'].apply(lambda x: str(x).upper() in address.upper())] companies = companies.rename(columns={'CompanyName': 'company_name', ' CompanyNumber': 'company_number', 'CompanyStatus': 'company_status', 'CompanyCategory': 'company_type', 'RegAddress.AddressLine1': 'address_line_1', ' RegAddress.AddressLine2': 'address_line_2', 'RegAddress.PostCode': 'postal_code', 'RegAddress.PostTown': 'locality', 'RegAddress.Country': 'country', 'IncorporationDate':'date_of_creation', 'DissolutionDate': 'date_of_cessation'}) companies['registered_office_address'] = [{'address_line_1': row['address_line_1'], 'address_line_2': row['address_line_2'], 'locality': row['locality'], 'postal_code': row['postal_code'], 'country': row['country']} for i,row in companies.iterrows()] return companies.to_dict('records') def get_nearby_postcode(postcode_string): + """Find closest nearby postcode to input postcode (postcode_string).""" url = "http://api.postcodes.io/postcodes/" + postcode_string[:-1] + "/autocomplete" response = requests.get(url).json() if response['result'] != None: @@ -44,6 +49,7 @@ def get_nearby_postcode(postcode_string): return closest_address["postcode"] def get_coords_from_address(address_string): + """Attempt retrieval of coords for input address string.""" address = urllib.parse.quote(address_string) url = 'https://nominatim.openstreetmap.org/search/' + urllib.parse.quote(address) +'?format=json' response = requests.get(url).json() @@ -70,11 +76,14 @@ def get_coords_from_address(address_string): print("No postcode found for: " + address_string) def normalise_name(name): + """Move first word (often surname) from the beginning to the end of string.""" name_list = name.replace(',','').split(" ") name_list.append(name_list.pop(0)) return ' '.join(name_list) def process_address_changes(address_changes): + """Attempt retrieval of 'new_address' value if Companies House record is + incomplete.""" for i in reversed(range(1,len(address_changes['items']))): if 'new_address' not in address_changes['items'][i]['description_values'].keys(): if 'old_address' in address_changes['items'][i-1]['description_values'].keys(): @@ -82,6 +91,8 @@ def process_address_changes(address_changes): return address_changes def build_address_history(company_id): + """Returns a list of dicts containing historic addresses for input company + (company_id).""" company_info = api.get_company(company_id) if company_info: company_info_subset = {k:company_info[k] for k in ("date_of_creation","date_of_cessation","registered_office_address") if k in company_info} @@ -89,6 +100,7 @@ def build_address_history(company_id): address_keys = ('start_date','end_date','address') if address_changes: if address_changes['items']: + # attempt to retrieve any missing items within address changes address_changes = process_address_changes(address_changes) addresses = [] entry = {} @@ -148,6 +160,7 @@ def build_address_history(company_id): return [] def normalise_address(address_dict): + """Joins address key values into a single str.""" address_list = [] for key in ['premises','address_line_1', 'locality','postal_code', 'country']: if key in address_dict: