From 2a5c5ef807b9716bea298f1252b18c4dbc5dee29 Mon Sep 17 00:00:00 2001 From: Matvey-Kuk Date: Mon, 12 Jun 2017 13:42:38 -0700 Subject: [PATCH 1/5] Ability to disable validation --- src/openbmp/api/parsed/message/Base.py | 33 +++++++++++++++---- .../api/parsed/message/UnicastPrefix.py | 5 +-- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/openbmp/api/parsed/message/Base.py b/src/openbmp/api/parsed/message/Base.py index 3db49a7..f0d2ed6 100644 --- a/src/openbmp/api/parsed/message/Base.py +++ b/src/openbmp/api/parsed/message/Base.py @@ -8,6 +8,7 @@ from abc import ABCMeta, abstractmethod import json +import re class Base(object): @@ -24,6 +25,24 @@ class Base(object): __metaclass__ = ABCMeta + @staticmethod + def isplit(string, delimiter=None): + """Like string.split but returns an iterator (lazy) + Multiple character delimters are not handled. + """ + if delimiter is None: + # Handle whitespace by default + delim = r"\s" + + elif len(delimiter) != 1: + raise ValueError("Can only handle single character delimiters", delimiter) + + else: + # Escape, incase it's "\", "*" etc. + delim = re.escape(delimiter) + + return (x.group(0) for x in re.finditer(r"[^{}]+".format(delim), string)) + def __init__(self): """Initializes the class variables.""" # Default message bus specification version (max) supported @@ -52,13 +71,14 @@ def get_row_map(self): """ return self.row_map - def parse(self, version, data): + def parse(self, version, data, validate=True): """ Parse TSV rows of data from message :param version: Float representation of maximum message bus specification version supported. See http://openbmp.org/#!docs/MESSAGE_BUS_API.md for more details. 
:param data: TSV data (MUST not include the headers) + :param validate: If required to validate every field with its corresponding processor :return: True if error, False if no errors """ @@ -71,17 +91,16 @@ def parse(self, version, data): if len(self.header_names) == 0: raise Exception("header_names should be overriden.") - records = data.splitlines() # Splits data into records. - # Splits each record into fields. - for r in records: + for r in Base.isplit(data, "\n"): fields = r.split('\t') # Fields of a record as array. fields_map = dict(zip(self.header_names, fields)) - # Process and validate each field with its corresponding processor. - for (f, p, h) in zip(fields, self.get_processors(), self.header_names): - fields_map[h] = p.process_value(f) + if validate: + # Process and validate each field with its corresponding processor. + for (f, p, h) in zip(fields, self.get_processors(), self.header_names): + fields_map[h] = p.process_value(f) self.row_map.append(fields_map) diff --git a/src/openbmp/api/parsed/message/UnicastPrefix.py b/src/openbmp/api/parsed/message/UnicastPrefix.py index 231b02a..b8c066c 100644 --- a/src/openbmp/api/parsed/message/UnicastPrefix.py +++ b/src/openbmp/api/parsed/message/UnicastPrefix.py @@ -48,11 +48,12 @@ class UnicastPrefix(Base): MsgBusFields.ORIGINATOR_ID.get_name() ] - def __init__(self, message): + def __init__(self, message, validate=True): """ Handle the message by parsing it and storing the data in memory. :param message: 'Message' object. + :param message: If required to validate every field with its corresponding processor """ if not isinstance(message, Message): raise TypeError("Expected Message object instead of type " + type(message)) @@ -80,7 +81,7 @@ def __init__(self, message): # Concatenate minimum header names and version specific header names. 
self.header_names = UnicastPrefix.minimum_header_names + version_specific_headers - self.parse(version, data) + self.parse(version, data, validate=validate) def get_processors(self): """ From 2f95ad8e43dfdc638889ed92e8be407cb253e671 Mon Sep 17 00:00:00 2001 From: Matvey-Kuk Date: Mon, 12 Jun 2017 18:41:59 -0700 Subject: [PATCH 2/5] Select fields when retreive UnicastPrefixes --- src/openbmp/api/parsed/message/Base.py | 19 ++++--- src/openbmp/api/parsed/message/Message.py | 55 ++++++++++--------- .../api/parsed/message/UnicastPrefix.py | 4 +- 3 files changed, 43 insertions(+), 35 deletions(-) diff --git a/src/openbmp/api/parsed/message/Base.py b/src/openbmp/api/parsed/message/Base.py index f0d2ed6..e998024 100644 --- a/src/openbmp/api/parsed/message/Base.py +++ b/src/openbmp/api/parsed/message/Base.py @@ -71,7 +71,7 @@ def get_row_map(self): """ return self.row_map - def parse(self, version, data, validate=True): + def parse(self, version, data, validate=True, required_fields=None): """ Parse TSV rows of data from message @@ -95,12 +95,17 @@ def parse(self, version, data, validate=True): for r in Base.isplit(data, "\n"): fields = r.split('\t') # Fields of a record as array. - fields_map = dict(zip(self.header_names, fields)) - - if validate: - # Process and validate each field with its corresponding processor. - for (f, p, h) in zip(fields, self.get_processors(), self.header_names): - fields_map[h] = p.process_value(f) + fields_map = {} + + if required_fields: + for key in required_fields: + fields_map[required_fields[key]] = fields[key] + else: + fields_map = dict(zip(self.header_names, fields)) + if validate: + # Process and validate each field with its corresponding processor. 
+ for (f, p, h) in zip(fields, self.get_processors(), self.header_names): + fields_map[h] = p.process_value(f) self.row_map.append(fields_map) diff --git a/src/openbmp/api/parsed/message/Message.py b/src/openbmp/api/parsed/message/Message.py index a90b7f7..f3eb758 100644 --- a/src/openbmp/api/parsed/message/Message.py +++ b/src/openbmp/api/parsed/message/Message.py @@ -15,7 +15,7 @@ class Message(object): TYPE_PEER = "PEER" TYPE_ROUTER = "ROUTER" - def __init__(self, data): + def __init__(self, data, parse_headers=True): """ Handle the message by parsing header of it. @@ -35,48 +35,51 @@ def __init__(self, data): self.content_pos = int() self.router_ip = str() - self.__parse(data) + self.__parse(data, parse_headers) - def __parse(self, data): + def __parse(self, data, parse_headers=True): """ Parses header of raw Kafka messages and set the version, length, number of records and router hash id. :param data: Raw Kafka message as string. """ - data_end_pos = data.find("\n\n") - header_data = data[:data_end_pos] + if parse_headers: + data_end_pos = data.find("\n\n") + header_data = data[:data_end_pos] - self.content_pos = data_end_pos + 2 - self.content = data[self.content_pos:] + self.content_pos = data_end_pos + 2 + self.content = data[self.content_pos:] - headers = header_data.split("\n") + headers = header_data.split("\n") - for header in headers: - value = header.split(":")[1].strip() - attr = header.split(":")[0].strip() + for header in headers: + value = header.split(":")[1].strip() + attr = header.split(":")[0].strip() - # Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers - if attr == "V": - self.version = float(value) + # Attribute names are from http://openbmp.org/#!docs/MESSAGE_BUS_API.md headers + if attr == "V": + self.version = float(value) - elif attr == "C_HASH_ID": - self.collector_hash_id = value + elif attr == "C_HASH_ID": + self.collector_hash_id = value - elif attr == "T": - self.type = value + elif attr == "T": + 
self.type = value - elif attr == "L": - self.length = long(value) + elif attr == "L": + self.length = long(value) - elif attr == "R": - self.records = long(value) + elif attr == "R": + self.records = long(value) - elif attr == "R_HASH_ID": - self.router_hash_id = value + elif attr == "R_HASH_ID": + self.router_hash_id = value - elif attr == "R_IP": - self.router_ip = value + elif attr == "R_IP": + self.router_ip = value + else: + self.content = data def get_version(self): return self.version diff --git a/src/openbmp/api/parsed/message/UnicastPrefix.py b/src/openbmp/api/parsed/message/UnicastPrefix.py index b8c066c..9d1347e 100644 --- a/src/openbmp/api/parsed/message/UnicastPrefix.py +++ b/src/openbmp/api/parsed/message/UnicastPrefix.py @@ -48,7 +48,7 @@ class UnicastPrefix(Base): MsgBusFields.ORIGINATOR_ID.get_name() ] - def __init__(self, message, validate=True): + def __init__(self, message, validate=True, required_fields=None): """ Handle the message by parsing it and storing the data in memory. @@ -81,7 +81,7 @@ def __init__(self, message, validate=True): # Concatenate minimum header names and version specific header names. 
self.header_names = UnicastPrefix.minimum_header_names + version_specific_headers - self.parse(version, data, validate=validate) + self.parse(version, data, validate=validate, required_fields=required_fields) def get_processors(self): """ From cae5c1a4a6d9fb3b04cf6d50edf5c76144af4f46 Mon Sep 17 00:00:00 2001 From: Matvey-Kuk Date: Thu, 22 Jun 2017 14:19:02 -0700 Subject: [PATCH 3/5] Comments for new features --- src/openbmp/api/parsed/message/Base.py | 8 ++++++-- src/openbmp/api/parsed/message/Message.py | 2 ++ src/openbmp/api/parsed/message/UnicastPrefix.py | 7 ++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/openbmp/api/parsed/message/Base.py b/src/openbmp/api/parsed/message/Base.py index e998024..302fb59 100644 --- a/src/openbmp/api/parsed/message/Base.py +++ b/src/openbmp/api/parsed/message/Base.py @@ -27,8 +27,8 @@ class Base(object): @staticmethod def isplit(string, delimiter=None): - """Like string.split but returns an iterator (lazy) - Multiple character delimters are not handled. + """ + Like string.split but returns an iterator (lazy) """ if delimiter is None: # Handle whitespace by default @@ -79,6 +79,10 @@ def parse(self, version, data, validate=True, required_fields=None): See http://openbmp.org/#!docs/MESSAGE_BUS_API.md for more details. :param data: TSV data (MUST not include the headers) :param validate: If required to validate every field with its corresponding processor + :param required_fields: If needed to parse only few fields and speed up parsing. + Example: {10: 'prefix', 11: "prefix_len"} where: + "10" and "11" - positions of fields in MESSAGE_BUS_API, + "prefix" and "prefix_len" - name of parsed fields in resulting dictionary.
:return: True if error, False if no errors """ diff --git a/src/openbmp/api/parsed/message/Message.py b/src/openbmp/api/parsed/message/Message.py index f3eb758..157c193 100644 --- a/src/openbmp/api/parsed/message/Message.py +++ b/src/openbmp/api/parsed/message/Message.py @@ -20,6 +20,7 @@ def __init__(self, data, parse_headers=True): Handle the message by parsing header of it. :param data: Raw Kafka message as string. + :param parse_headers: If headers parsing is required. May be disabled to speed up. """ if not data.strip(): # If "data" is not string, throws error. @@ -42,6 +43,7 @@ def __parse(self, data, parse_headers=True): Parses header of raw Kafka messages and set the version, length, number of records and router hash id. :param data: Raw Kafka message as string. + :param parse_headers: If headers parsing is required. May be disabled to speed up. """ if parse_headers: diff --git a/src/openbmp/api/parsed/message/UnicastPrefix.py b/src/openbmp/api/parsed/message/UnicastPrefix.py index 9d1347e..b3d3945 100644 --- a/src/openbmp/api/parsed/message/UnicastPrefix.py +++ b/src/openbmp/api/parsed/message/UnicastPrefix.py @@ -53,7 +53,12 @@ def __init__(self, message, validate=True, required_fields=None): Handle the message by parsing it and storing the data in memory. :param message: 'Message' object. - :param message: If required to validate every field with its corresponding processor + :param validate: If required to validate every field with its corresponding processor + :param validate: If required to validate every field with its corresponding processor + :param required_fields: If needed to parse only few fields and speed up parsing. + Example: {10: 'prefix', 11: "prefix_len"} where: + "10" and "11" - positions of fields in MESSAGE_BUS_API, + "prefix" and "prefix_len" - name of parsed fields in resulting dictionary.
""" if not isinstance(message, Message): raise TypeError("Expected Message object instead of type " + type(message)) From 8e564fc1e2bc2496788ba4cd2f6bb3f86faeaf1b Mon Sep 17 00:00:00 2001 From: Matvey-Kuk Date: Thu, 22 Jun 2017 17:08:04 -0700 Subject: [PATCH 4/5] UnitTests for Message (optional parse_headers) and UnicastPrefixes (optional validation and required_fields), validation bug fix --- examples/log_consumer.py | 4 ++ src/openbmp/api/parsed/message/Base.py | 6 ++- tests/FixturesBasedTestCase.py | 8 ++++ tests/__init__.py | 0 tests/fixtures/unicast_prefixes_message | 7 ++++ tests/test_message.py | 28 ++++++++++++++ tests/test_unicast_prefix.py | 51 +++++++++++++++++++++++++ 7 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 tests/FixturesBasedTestCase.py create mode 100644 tests/__init__.py create mode 100644 tests/fixtures/unicast_prefixes_message create mode 100644 tests/test_message.py create mode 100644 tests/test_unicast_prefix.py diff --git a/examples/log_consumer.py b/examples/log_consumer.py index c62ddf6..97fd773 100644 --- a/examples/log_consumer.py +++ b/examples/log_consumer.py @@ -52,6 +52,10 @@ def process_message(msg): elif t == "openbmp.parsed.unicast_prefix": unicast_prefix = UnicastPrefix(m) + + with open('temp', "w") as file: + file.write(msg.value + '\n') + print '\n' + 'Received Message (' + t_stamp + ') : ' + m_tag + '(V: ' + str(m.version) + ')' print unicast_prefix.to_json_pretty() diff --git a/src/openbmp/api/parsed/message/Base.py b/src/openbmp/api/parsed/message/Base.py index a1dd455..cb63107 100644 --- a/src/openbmp/api/parsed/message/Base.py +++ b/src/openbmp/api/parsed/message/Base.py @@ -108,7 +108,11 @@ def parse(self, version, data, validate=True, required_fields=None): if required_fields: for key in required_fields: - fields_map[required_fields[key]] = fields[key] + if validate: + processor_class = self.get_processors()[key] + fields_map[required_fields[key]] = processor_class.process_value(fields[key]) + 
else: + fields_map[required_fields[key]] = fields[key] else: if len(fields) >= len(self.processors): fields_map = dict(zip(self.header_names, fields)) diff --git a/tests/FixturesBasedTestCase.py b/tests/FixturesBasedTestCase.py new file mode 100644 index 0000000..3a8f1a1 --- /dev/null +++ b/tests/FixturesBasedTestCase.py @@ -0,0 +1,8 @@ +import unittest + + +class FixturesBasedTestCase(unittest.TestCase): + + def setUp(self): + with open('fixtures/unicast_prefixes_message', 'r') as unicast_prefix_message_file: + self.unicast_prefix_message = unicast_prefix_message_file.read() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/unicast_prefixes_message b/tests/fixtures/unicast_prefixes_message new file mode 100644 index 0000000..581d90b --- /dev/null +++ b/tests/fixtures/unicast_prefixes_message @@ -0,0 +1,7 @@ +V: 1.5 +C_HASH_ID: 9ae8148974c9ca01ec9271753426d214 +L: 290976 +R: 2 + +add 465863 2783130a95f3ee3c238f38eb1b898b81 eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.12.12 1403 2017-06-22 23:33:04.101567 12.12.12.0 20 1 igp 1403 999 777 3 9829 14.14.14.14 0 0 14032:1299 0 1 0 1 1 +add 465864 b1c8a35dcec465d155e6d31f190153fa eee2f394c09f96c2453bf83989eaa1a2 172.20.0.1 d651b1d601e34826247374cb1430c91c 7aefefa6df92a1fccf2fd8a50d8900de 12.12.123.123 1403 2017-06-22 23:33:04.101567 12.12.123.0 22 1 igp 1403 999 777 3 9829 14.14.14.144 0 0 14032:1299 0 1 0 1 1 \ No newline at end of file diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 0000000..6ea523e --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,28 @@ +from FixturesBasedTestCase import FixturesBasedTestCase + +from openbmp.api.parsed.message import Message + + +class MessageTest(FixturesBasedTestCase): + + def test_headers_parsing(self): + """ + Test default Message headers parsing + """ + message = 
Message(self.unicast_prefix_message) + + self.assertEqual(1.5, message.version) + self.assertEqual("9ae8148974c9ca01ec9271753426d214", message.collector_hash_id) + self.assertEqual(290976, message.length) + self.assertEqual(2, message.records) + + def test_disabled_headers_parsing(self): + """ + Test disabled Message headers parsing + """ + message = Message(self.unicast_prefix_message, parse_headers=False) + + self.assertEqual(0.0, message.version) + self.assertEqual("", message.collector_hash_id) + self.assertEqual(0, message.length) + self.assertEqual(0, message.records) diff --git a/tests/test_unicast_prefix.py b/tests/test_unicast_prefix.py new file mode 100644 index 0000000..f476336 --- /dev/null +++ b/tests/test_unicast_prefix.py @@ -0,0 +1,51 @@ +from FixturesBasedTestCase import FixturesBasedTestCase + +from openbmp.api.parsed.message import Message, UnicastPrefix + + +class UnicastPrefixTest(FixturesBasedTestCase): + + def test_default_parsing(self): + """ + Test default UnicastPrefix parsing + """ + message = Message(self.unicast_prefix_message) + unicast_prefixes = UnicastPrefix(message) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual(2, len(row_map)) + + self.assertEqual("12.12.12.0", row_map[0]['prefix']) + self.assertEqual(20, row_map[0]['prefix_len']) + + self.assertEqual("12.12.123.0", row_map[1]['prefix']) + self.assertEqual(22, row_map[1]['prefix_len']) + + def test_parsing_without_validation(self): + """ + If validation is disabled, it should still produce the same output but + numerical fields should be left as strings + """ + message = Message(self.unicast_prefix_message) + unicast_prefixes = UnicastPrefix(message, validate=False) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual("20", row_map[0]['prefix_len']) + + def test_parsing_with_required_fields(self): + """ + Test how UnicastPrefix works with custom required_fields parameter + """ + message = Message(self.unicast_prefix_message) + + # Without validation +
unicast_prefixes = UnicastPrefix(message, validate=False, required_fields={11: "my_custom_prefix_name"}) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual('20', row_map[0]['my_custom_prefix_name']) + + # With validation + unicast_prefixes = UnicastPrefix(message, validate=True, required_fields={11: "my_custom_prefix_name"}) + row_map = unicast_prefixes.get_row_map() + + self.assertEqual(20, row_map[0]['my_custom_prefix_name']) From df30ccc2dacec21a390a0287c75324300a27de0a Mon Sep 17 00:00:00 2001 From: Matvey-Kuk Date: Thu, 22 Jun 2017 17:10:01 -0700 Subject: [PATCH 5/5] Making fixture writer optional --- examples/log_consumer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/log_consumer.py b/examples/log_consumer.py index 97fd773..3da3326 100644 --- a/examples/log_consumer.py +++ b/examples/log_consumer.py @@ -53,8 +53,9 @@ def process_message(msg): elif t == "openbmp.parsed.unicast_prefix": unicast_prefix = UnicastPrefix(m) - with open('temp', "w") as file: - file.write(msg.value + '\n') + # Optional fixture writer + # with open('unicast_prefixes_message', "w+") as file: + # file.write(msg.value + '\n') print '\n' + 'Received Message (' + t_stamp + ') : ' + m_tag + '(V: ' + str(m.version) + ')' print unicast_prefix.to_json_pretty()