Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 94 additions & 59 deletions safer/html.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@


def debug_print_element(e):
print(html.tostring(e).decode("utf-8"))
print(html.tostring(e))
print("\n\n\n")


def get_first_item_or_none(items):
if len(items) == 0:
return None
return items[0]


def process_extracted_text(extracted_data):
if isinstance(extracted_data, list):
if len(extracted_data) > 1:
Expand Down Expand Up @@ -145,18 +151,38 @@ def process_final_dictionary(data):

# Formatting Mileage Year to a dictionary
data["mcs_150_mileage_year"] = {
"mileage": int(data["mcs_150_mileage_year"].split(" ")[0].replace(",", ""))
if data["mcs_150_mileage_year"]
else None,
"year": int(
data["mcs_150_mileage_year"].split(" ")[1].replace("(", "").replace(")", "")
)
if data["mcs_150_mileage_year"]
else None,
"mileage": (
int(data["mcs_150_mileage_year"].split(" ")[0].replace(",", ""))
if data["mcs_150_mileage_year"]
else None
),
"year": (
int(
data["mcs_150_mileage_year"]
.split(" ")[1]
.replace("(", "")
.replace(")", "")
)
if data["mcs_150_mileage_year"]
else None
),
}

data["safety_rating_date"] = None
data["safety_review_date"] = None
data["safety_rating"] = None
data["safety_type"] = None

# HTML Returns -- for the Duns number when it should just be None or blank
data["duns_number"] = data["duns_number"] if data["duns_number"] != "--" else None
data["state_carrier_id"] = (
data["state_carrier_id"] if data["state_carrier_id"] != "" else None
)
data["dba_name"] = data["dba_name"] if data["dba_name"] != "" else None

if data["out_of_service_date"] == "None":
data["out_of_service_date"] = None

return data


Expand Down Expand Up @@ -186,7 +212,7 @@ def process_search_result_html(tree):
# Parsing the USDOT number out of the xpath
c_id_parsed = parse_qsl(c_id[10:])
# Getting the Raw HTML.
c_raw_html = html.tostring(item, pretty_print=True).decode("utf-8")
c_raw_html = html.tostring(item, pretty_print=True)

result_set.append(
{
Expand Down Expand Up @@ -226,97 +252,106 @@ def process_company_snapshot(tree):
crashes_tables = tree.xpath('//table[@summary="Crashes"]')

fields = {
"entity_type": "tr[2]/td/text()",
"legal_name": "tr[4]/td/text()",
"dba_name": "tr[5]/td/text()",
"physical_address": "tr[6]/td/text()",
"phone": "tr[7]/td/text()",
"mailing_address": "tr[8]/td/text()",
"usdot": "tr[9]/td[1]/text()",
"state_carrier_id": "tr[9]/td[2]/text()",
"mc_mx_ff_numbers": "tr[10]/td[1]/a/text()",
"duns_number": "tr[10]/td[2]/text()",
"power_units": "tr[11]/td[1]/text()",
"drivers": "tr[11]/td[2]/font/b/text()",
"mcs_150_form_date": "tr[12]/td[1]/text()",
"mcs_150_mileage_year": "tr[12]/td[2]/font/b/text()",
"entity_type": "tr[3]/td/text()",
"usdot_status": "tr[4]/td[1]/text()",
"legal_name": "tr[11]/td/text()",
"dba_name": "tr[12]/td/text()",
"physical_address": "tr[13]/td/text()",
"phone": "tr[14]/td/text()",
"mailing_address": "tr[15]/td/text()",
"usdot": "tr[5]/td[1]/text()",
"state_carrier_id": "tr[5]/td[2]/text()",
"mc_mx_ff_numbers": "tr[9]/td[1]/a/text()",
"duns_number": "tr[16]/td/text()",
"power_units": "tr[17]/td[1]/text()",
"drivers": "tr[17]/td[2]/font/b/text()",
"mcs_150_form_date": "tr[6]/td[1]/text()",
"mcs_150_mileage_year": "tr[6]/td[2]/font/b/text()",
}

for item in fields:
fields[item] = process_extracted_text(general_info_table.xpath(fields[item]))
parsed_fields = {}

for item in fields.items():
parsed_fields[item[0]] = process_extracted_text(
general_info_table.xpath(item[1])
)

# Out of Service Date comes in as a string 'None' if None
fields["out_of_service_date"] = process_extracted_text(
general_info_table.xpath("tr[3]/td[2]/text()")
parsed_fields["out_of_service_date"] = process_extracted_text(
general_info_table.xpath("tr[4]/td[2]/text()")
)
if fields["out_of_service_date"] == "None":
fields["out_of_service_date"] = None

# Getting Operating Status out of HTML, must be done outside of loop because it requires more decisiveness
if not isinstance(
process_extracted_text(general_info_table.xpath("tr[3]/td[1]/text()")), str
):
fields["operating_status"] = process_extracted_text(
general_info_table.xpath("tr[3]/td[1]/font/b/text()")
operating_status_data_items_length = len(general_info_table.xpath("tr[8]/td"))
if operating_status_data_items_length > 0:
operating_status_font_wrapped_based = process_extracted_text(
general_info_table.xpath("tr[8]/td[1]/font/b/text()")
)
else:
fields["operating_status"] = process_extracted_text(
general_info_table.xpath("tr[3]/td[1]/text()")
operating_status_non_wrapped = process_extracted_text(
general_info_table.xpath("tr[8]/td[1]/text()")
)

parsed_fields["operating_authority_status"] = (
operating_status_font_wrapped_based or operating_status_non_wrapped
)
else:
parsed_fields["operating_authority_status"] = None

# Getting Operation Classifications from a list of classifications if the table exists in the HTML
# Checks the HTML for all table rows that contain and X next to them
if len(operation_classification_table) == 1:
fields["operation_classification"] = []
parsed_fields["operation_classification"] = []
operation_classification_table = operation_classification_table.pop(0)
for classification in operation_classification_table.xpath(
"tr[2]/td/table/tr[.//td[@class='queryfield']/text() = 'X']/td/font/text()"
):
fields["operation_classification"].append(classification)
parsed_fields["operation_classification"].append(classification)
last_val = operation_classification_table.xpath(
"tr[2]/td[3]/table/tr[5]/td[2]/text()"
)
if len(last_val) > 0:
fields["operation_classification"].append(process_extracted_text(last_val))
parsed_fields["operation_classification"].append(
process_extracted_text(last_val)
)

# Parsing out Carrier Operation from the list of types
# Checks the HTML for all table rows that contain and X next to them
if len(carrier_operation_table) == 1:
fields["carrier_operation"] = []
parsed_fields["carrier_operation"] = []
carrier_operation_table = carrier_operation_table.pop(0)
for operation in carrier_operation_table.xpath(
"tr[2]/td/table/tr[.//td[@class='queryfield']/text() = 'X']/td/font/text()"
):
fields["carrier_operation"].append(operation)
parsed_fields["carrier_operation"].append(operation)

# Parsing out Shipper Opertation from the list of types if the table exists in the HTML
# Checks the HTML for all table rows that contain and X next to thema
if len(hm_shipper_operation_table) == 1:
fields["hm_shipper_operation"] = []
parsed_fields["hm_shipper_operation"] = []
hm_shipper_operation_table = hm_shipper_operation_table.pop(0)
for operation in hm_shipper_operation_table.xpath(
"tr[2]/td/table/tr[.//td[@class='queryfield']/text() = 'X']/td/font/text()"
):
fields["hm_shipper_operation"].append(operation)
parsed_fields["hm_shipper_operation"].append(operation)
else:
fields["hm_shipper_operation"] = None
parsed_fields["hm_shipper_operation"] = None

# Parsing out the type of cargo this carrier is authorized or carry if the table exists in the HTML
# Checks the HTML for all table rows that contain and X next to them
if len(cargo_carried_table) == 1:
fields["cargo_carried"] = []
parsed_fields["cargo_carried"] = []
cargo_carried_table = cargo_carried_table.pop(0)
for cargo in cargo_carried_table.xpath(
"tr[2]/td/table/tr[.//td[@class='queryfield']/text() = 'X']/td/font/text()"
):
fields["cargo_carried"].append(cargo)
parsed_fields["cargo_carried"].append(cargo)

# Parsing the data from tables into nested dictionaries.

# Parsing the inspections in the United Status if the tables exist in the HTML
if len(inspections_tables) == 2:
us_inspections_table = inspections_tables[0]
fields["united_states_inspections"] = {
parsed_fields["united_states_inspections"] = {
"vehicle": {
"inspections": process_extracted_text(
us_inspections_table.xpath("tr[2]/td[1]/text()")
Expand Down Expand Up @@ -378,7 +413,7 @@ def process_company_snapshot(tree):
# Parsing the crashes in the United States if the tables exist in the HTML
if len(crashes_tables) == 2:
us_crashes_table = crashes_tables[0]
fields["united_states_crashes"] = {
parsed_fields["united_states_crashes"] = {
"fatal": process_extracted_text(
us_crashes_table.xpath("tr[2]/td[1]/text()")
),
Expand All @@ -394,7 +429,7 @@ def process_company_snapshot(tree):
# Parsing the inspections in Canada if the tables exist in the HTML
if len(inspections_tables) == 2:
canada_inspections_table = inspections_tables[1]
fields["canada_inspections"] = {
parsed_fields["canada_inspections"] = {
"vehicle": {
"inspections": process_extracted_text(
canada_inspections_table.xpath("tr[2]/td[1]/text()")
Expand Down Expand Up @@ -422,7 +457,7 @@ def process_company_snapshot(tree):
# Parsing the crashes in Canada if the tables exist in the HTML
if len(crashes_tables) == 2:
canada_crashes_table = crashes_tables[1]
fields["canada_crashes"] = {
parsed_fields["canada_crashes"] = {
"fatal": process_extracted_text(
canada_crashes_table.xpath("tr[2]/td[1]/text()")
),
Expand All @@ -440,23 +475,23 @@ def process_company_snapshot(tree):
# Parsing the Safety Rating if it exists in the HTML
if len(safety_rating_table) == 1:
safety_rating_table = safety_rating_table.pop(0)
fields["safety_rating_date"] = process_extracted_text(
parsed_fields["safety_rating_date"] = process_extracted_text(
safety_rating_table.xpath("tr[2]/td[1]/text()")
)
fields["safety_review_date"] = process_extracted_text(
parsed_fields["safety_review_date"] = process_extracted_text(
safety_rating_table.xpath("tr[2]/td[2]/text()")
)
fields["safety_rating"] = process_extracted_text(
parsed_fields["safety_rating"] = process_extracted_text(
safety_rating_table.xpath("tr[3]/td[1]/text()")
)
fields["safety_type"] = process_extracted_text(
parsed_fields["safety_type"] = process_extracted_text(
safety_rating_table.xpath("tr[3]/td[2]/text()")
)

# Parsing the latest update date.
fields["latest_update"] = process_extracted_text(
parsed_fields["latest_update"] = process_extracted_text(
tree.xpath("//b/font[@color='#0000C0']/text()")[-1]
)

fields = process_final_dictionary(fields)
return fields
parsed_fields = process_final_dictionary(parsed_fields)
return parsed_fields
6 changes: 3 additions & 3 deletions safer/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, data):

# Mapping values.
self.__entity_type = data["entity_type"]
self.__operating_status = data["operating_status"]
self.__operating_authority_status = data["operating_authority_status"]
self.__legal_name = data["legal_name"]
self.__dba_name = data["dba_name"]
self.__duns_number = data["duns_number"]
Expand Down Expand Up @@ -84,8 +84,8 @@ def __init__(self, data):
self.__raw["url"] = self.__url

@property
def operating_status(self):
return self.__operating_status
def operating_authority_status_status(self):
return self.__operating_authority_status

@property
def safety_review_data(self):
Expand Down
Loading