diff --git a/spot-ingest/ingest_conf.json b/spot-ingest/ingest_conf.json deleted file mode 100644 index 67da304..0000000 --- a/spot-ingest/ingest_conf.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "dbname" : "database name", - "hdfs_app_path" : "hdfs application path", - "collector_processes":5, - "ingestion_interval":1, - "spark-streaming":{ - "driver_memory":"", - "spark_exec":"", - "spark_executor_memory":"", - "spark_executor_cores":"", - "spark_batch_size":"" - }, - "kafka":{ - "kafka_server":"kafka ip", - "kafka_port":"kafka port", - "zookeper_server":"zk ip", - "zookeper_port":"zk port", - "message_size":900000 - }, - "pipelines":{ - "flow":{ - "type":"flow", - "collector_path":"/collector_path/flow", - "local_staging":"/tmp/", - "supported_files":["nfcapd."], - "process_opt":"" - }, - "dns":{ - "type":"dns", - "collector_path":"/collector_path/dns", - "local_staging":"/collector_path/dns/tmp", - "supported_files":[".pcap"], - "pkt_num":"650000", - "pcap_split_staging":"/collector_path/dns/dns_staging", - "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" - }, - "proxy":{ - "type":"proxy", - "collector_path":"/collector_path/proxy", - "supported_files":[".log"], - "parser":"bluecoat.py" - } - } -} diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py index 07c5925..48349af 100755 --- a/spot-ingest/master_collector.py +++ b/spot-ingest/master_collector.py @@ -1,18 +1,17 @@ #!/bin/env python import argparse -import os -import json import sys from common.utils import Util from common.kerberos import Kerberos from common.kafka_client import KafkaTopic -import datetime +import datetime +import ConfigParser -# get master configuration. -script_path = os.path.dirname(os.path.abspath(__file__)) -conf_file = "{0}/ingest_conf.json".format(script_path) -master_conf = json.loads(open (conf_file).read()) +# initialize ConfigParser +conf_file = '/etc/spot.conf' +master_conf = ConfigParser.SafeConfigParser() +master_conf.read(conf_file) def main(): @@ -30,43 +29,46 @@ def start_collector(type,workers_num,id=None): # generate ingest id ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_") - + # create logger. logger = Util.get_logger("SPOT.INGEST") # validate the given configuration exists in ingest_conf.json. - if not type in master_conf["pipelines"]: - logger.error("'{0}' type is not a valid configuration.".format(type)); + try: + master_conf.get(type, 'type') + except ConfigParser.NoSectionError: + logger.error("'{0}' type is not a valid configuration.".format(type)) sys.exit(1) # validate the type is a valid module. - if not Util.validate_data_source(master_conf["pipelines"][type]["type"]): - logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"])); + if not Util.validate_data_source(master_conf.get(type, "type")): + logger.error("{0} type is not configured. Please check /etc/spot.conf".format(type)) sys.exit(1) - + # validate if kerberos authentication is required. - if os.getenv('KRB_AUTH'): + if master_conf.get('kerberos', 'KRB_AUTH'): kb = Kerberos() kb.authenticate() - + # kafka server info. 
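Note: the exact layout of /etc/spot.conf is not shown in this change; the section and option names below are assumptions inferred from the `get()` calls in this patch (an `[ingest]` section for Kafka/Zookeeper/HDFS settings, a `[kerberos]` section, and one section per pipeline type). Also worth noting that `get()` always returns a string, so a value such as `KRB_AUTH = false` is still truthy; the `if master_conf.get('kerberos', 'KRB_AUTH'):` check only tests that the option is non-empty. A minimal sketch of how such a file would be parsed:

```python
# Sketch only (Python 2): section/option names are assumptions inferred from
# the ConfigParser lookups in this patch, values are examples.
import ConfigParser
from StringIO import StringIO

SAMPLE_CONF = """
[ingest]
kafka_server     = kafka01.example.com
kafka_port       = 9092
zookeeper_server = zk01.example.com
zookeeper_port   = 2181
hdfs_app_path    = /user/spot

[kerberos]
KRB_AUTH = false

[flow]
type = flow
collector_path = /collector_path/flow
"""

conf = ConfigParser.SafeConfigParser()
conf.readfp(StringIO(SAMPLE_CONF))

print(conf.get('ingest', 'kafka_server'))   # kafka01.example.com
print(conf.get('flow', 'type'))             # flow
```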
logger.info("Initializing kafka instance") - k_server = master_conf["kafka"]['kafka_server'] - k_port = master_conf["kafka"]['kafka_port'] + k_server = master_conf.get('ingest', 'kafka_server') + k_port = master_conf.get('ingest', 'kafka_port') # required zookeeper info. - zk_server = master_conf["kafka"]['zookeper_server'] - zk_port = master_conf["kafka"]['zookeper_port'] - + zk_server = master_conf.get('ingest', 'zookeeper_server') + zk_port = master_conf.get('ingest', 'zookeeper_port') + topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num) # create a collector instance based on data source type. logger.info("Starting {0} ingest instance".format(topic)) - module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector']) + module = __import__("pipelines.{0}.collector".format(master_conf.get(type, 'type')),fromlist=['Collector']) # start collector. - ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type) + hdfs_app_path = master_conf.get('ingest', 'hdfs_app_path') + ingest_collector = module.Collector(hdfs_app_path,kafka,type) ingest_collector.start() if __name__=='__main__': diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py index 1eae7d2..24282b5 100755 --- a/spot-ingest/pipelines/dns/collector.py +++ b/spot-ingest/pipelines/dns/collector.py @@ -1,24 +1,25 @@ -#/bin/env python +#!/bin/env python import time import os import subprocess -import json import logging +import ConfigParser from multiprocessing import Process from common.utils import Util from common.file_collector import FileWatcher from multiprocessing import Pool from common.kafka_client import KafkaTopic + class Collector(object): - def __init__(self,hdfs_app_path,kafka_topic,conf_type): - - self._initialize_members(hdfs_app_path,kafka_topic,conf_type) + def __init__(self, hdfs_app_path, kafka_topic, conf_type): + + self._initialize_members(hdfs_app_path, kafka_topic, conf_type) + + def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type): - def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - # getting parameters. self._logger = logging.getLogger('SPOT.INGEST.DNS') self._hdfs_app_path = hdfs_app_path @@ -28,63 +29,64 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read dns configuration. - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) # set configuration. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') self._dsource = 'dns' self._hdfs_root_path = "{0}/{1}".format(hdfs_app_path, self._dsource) # set configuration. 
-        self._pkt_num = self._conf['pkt_num']
-        self._pcap_split_staging = self._conf['pcap_split_staging']
-        self._supported_files = self._conf['supported_files']
+        self._pkt_num = self._conf.get(conf_type, 'pkt_num')
+        self._pcap_split_staging = self._conf.get(conf_type, 'pcap_split_staging')
+        self._supported_files = self._conf.get(conf_type, 'supported_files')

-        # create collector watcher
-        self._watcher = FileWatcher(self._collector_path,self._supported_files)
+        # create collector watcher
+        self._watcher = FileWatcher(self._collector_path, self._supported_files)

-        # Multiprocessing.
-        self._processes = conf["collector_processes"]
-        self._ingestion_interval = conf["ingestion_interval"]
+        # Multiprocessing.
+        self._processes = self._conf.get('ingest', 'collector_processes')
+        self._ingestion_interval = self._conf.get('ingest', 'ingestion_interval')
         self._pool = Pool(processes=self._processes)

     def start(self):

-        self._logger.info("Starting DNS ingest")
+        self._logger.info("Starting DNS ingest")
         self._watcher.start()
         try:
             while True:
-                self._ingest_files_pool()
+                self._ingest_files_pool()
                 time.sleep(self._ingestion_interval)
         except KeyboardInterrupt:
-            self._logger.info("Stopping DNS collector...")
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)
+            self._logger.info("Stopping DNS collector...")
+            Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)
             self._watcher.stop()
             self._pool.terminate()
-            self._pool.close()
+            self._pool.close()
             self._pool.join()
             SystemExit("Ingest finished...")

-    def _ingest_files_pool(self):
-
+    def _ingest_files_pool(self):
+
         if self._watcher.HasFiles:
-
+
             for x in range(0,self._processes):
                 file = self._watcher.GetNextFile()
                 resutl = self._pool.apply_async(ingest_file,args=(file,self._pkt_num,self._pcap_split_staging,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
                 #resutl.get() # to debug add try and catch.
-            if not self._watcher.HasFiles: break
+            if not self._watcher.HasFiles: break
         return True

+
 def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):

     logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
-
+
     try:
         # get file name and date.
         org_file = file
@@ -107,27 +109,26 @@ def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,
         pcap_date_path = file_date[-14:-6]

         # hdfs path with timestamp.
-        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
+        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)

         # create hdfs path.
-        Util.creat_hdfs_folder(hdfs_path,logger)
+        Util.creat_hdfs_folder(hdfs_path, logger)

         # load file to hdfs.
-        hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
-        Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
+        hadoop_pcap_file = "{0}/{1}".format(hdfs_path, file)
+        Util.load_to_hdfs(os.path.join(currdir, file), hadoop_pcap_file, logger)

         # create event for workers to process the file.
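Note that `SafeConfigParser.get()` always returns strings, so options that feed `Pool(processes=...)`, `range()` or `time.sleep()` (collector_processes, ingestion_interval, pkt_num) need an explicit conversion; `getint`/`getfloat` do this in one step. A hedged sketch reusing the option names from this patch, with example values:

```python
# Sketch only: option names are taken from this patch; values are examples.
import ConfigParser
from StringIO import StringIO

conf = ConfigParser.SafeConfigParser()
conf.readfp(StringIO("[ingest]\ncollector_processes = 5\ningestion_interval = 1\n"))

processes = conf.getint('ingest', 'collector_processes')    # int 5, safe for Pool()/range()
interval = conf.getfloat('ingest', 'ingestion_interval')    # float 1.0, safe for time.sleep()
assert isinstance(processes, int) and isinstance(interval, float)
```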
logger.info( "Sending split file to worker number: {0}".format(partition)) - KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition) - logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic)) + KafkaTopic.SendMessage(hadoop_pcap_file, kafka_servers, topic, partition) + logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file, topic)) logger.info("Removing file: {0}".format(org_file)) rm_big_file = "rm {0}".format(org_file) - Util.execute_cmd(rm_big_file,logger) - + Util.execute_cmd(rm_big_file, logger) + except Exception as err: - + logger.error("There was a problem, please check the following error message:{0}".format(err.message)) logger.error("Exception: {0}".format(err)) - diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py index 06a381a..10b753f 100755 --- a/spot-ingest/pipelines/dns/worker.py +++ b/spot-ingest/pipelines/dns/worker.py @@ -3,7 +3,7 @@ import logging import datetime import subprocess -import json +import ConfigParser import os from multiprocessing import Process from common.utils import Util @@ -12,7 +12,7 @@ class Worker(object): def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None): - + self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type) def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): @@ -24,13 +24,12 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): self._hdfs_app_path = hdfs_app_path # read proxy configuration. - self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) - self._process_opt = self._conf['process_opt'] - self._local_staging = self._conf['local_staging'] + self._process_opt = self._conf.get(conf_type, 'process_opt') + self._local_staging = self._conf.get(conf_type, 'local_staging') self.kafka_consumer = kafka_consumer def start(self): @@ -42,9 +41,9 @@ def start(self): def _new_file(self,file): self._logger.info("-------------------------------------- New File received --------------------------------------") - self._logger.info("File: {0} ".format(file)) + self._logger.info("File: {0} ".format(file)) p = Process(target=self._process_new_file, args=(file,)) - p.start() + p.start() def _process_new_file(self,file): @@ -81,7 +80,7 @@ def _process_new_file(self,file): self._logger.info("Moving data to staging: {0}".format(mv_to_staging)) Util.execute_cmd(mv_to_staging,self._logger) - #load to avro + # load to avro load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path) self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd)) diff --git a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py index 1f45f16..f76699b 100755 --- a/spot-ingest/pipelines/flow/collector.py +++ b/spot-ingest/pipelines/flow/collector.py @@ -3,21 +3,22 @@ import time import logging import os -import json +import ConfigParser from multiprocessing import Process from common.utils import Util from common.file_collector 
import FileWatcher from multiprocessing import Pool from common.kafka_client import KafkaTopic + class Collector(object): def __init__(self,hdfs_app_path,kafka_topic,conf_type): - + self._initialize_members(hdfs_app_path,kafka_topic,conf_type) def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - + # getting parameters. self._logger = logging.getLogger('SPOT.INGEST.FLOW') self._hdfs_app_path = hdfs_app_path @@ -27,55 +28,55 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read flow configuration. - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = '/etc/spot.conf' + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) # set configuration. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') self._dsource = 'flow' self._hdfs_root_path = "{0}/{1}".format(hdfs_app_path, self._dsource) - self._supported_files = self._conf['supported_files'] + self._supported_files = self._conf.get(conf_type, 'supported_files') # create collector watcher self._watcher = FileWatcher(self._collector_path,self._supported_files) - - # Multiprocessing. - self._processes = conf["collector_processes"] - self._ingestion_interval = conf["ingestion_interval"] + + # Multiprocessing. + self._processes = self._conf.get('ingest', 'collector_processes') + self._ingestion_interval = self._conf.get('ingest', 'ingestion_interval') self._pool = Pool(processes=self._processes) def start(self): - self._logger.info("Starting FLOW ingest") + self._logger.info("Starting FLOW ingest") self._watcher.start() - + try: - while True: - self._ingest_files_pool() + while True: + self._ingest_files_pool() time.sleep(self._ingestion_interval) except KeyboardInterrupt: - self._logger.info("Stopping FLOW collector...") - Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) + self._logger.info("Stopping FLOW collector...") + Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) self._watcher.stop() self._pool.terminate() - self._pool.close() + self._pool.close() self._pool.join() SystemExit("Ingest finished...") - - def _ingest_files_pool(self): - + + def _ingest_files_pool(self): + if self._watcher.HasFiles: - + for x in range(0,self._processes): file = self._watcher.GetNextFile() resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,)) #resutl.get() # to debug add try and catch. - if not self._watcher.HasFiles: break + if not self._watcher.HasFiles: break return True - + def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers): @@ -102,7 +103,7 @@ def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers): # create event for workers to process the file. 
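In the old ingest_conf.json, supported_files was a JSON list (for example ["nfcapd."]); `conf.get()` now hands FileWatcher a single string. If spot.conf stores the patterns comma-separated (an assumption about the new file format), a small helper keeps the watcher interface unchanged:

```python
# Assumption: supported_files is stored as a comma-separated value in spot.conf,
# e.g.  supported_files = nfcapd.,.csv
def get_file_patterns(conf, section, option='supported_files'):
    raw = conf.get(section, option)
    return [pattern.strip() for pattern in raw.split(',') if pattern.strip()]

# e.g. get_file_patterns(self._conf, conf_type) -> ['nfcapd.', '.csv'],
# matching the list FileWatcher previously received from the JSON config.
```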
logger.info("Sending file to worker number: {0}".format(partition)) - KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition) + KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition) logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic)) except Exception as err: diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py index 8d2efd9..04ad9b2 100755 --- a/spot-ingest/pipelines/flow/worker.py +++ b/spot-ingest/pipelines/flow/worker.py @@ -5,7 +5,7 @@ import datetime import logging import os -import json +import ConfigParser from multiprocessing import Process from common.utils import Util @@ -24,13 +24,12 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): self._hdfs_app_path = hdfs_app_path # read proxy configuration. - self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) - self._process_opt = self._conf['process_opt'] - self._local_staging = self._conf['local_staging'] + self._process_opt = self._conf.get(conf_type, 'process_opt') + self._local_staging = self._conf.get(conf_type, 'local_staging') self.kafka_consumer = kafka_consumer def start(self): @@ -42,9 +41,9 @@ def start(self): def _new_file(self,file): self._logger.info("-------------------------------------- New File received --------------------------------------") - self._logger.info("File: {0} ".format(file)) + self._logger.info("File: {0} ".format(file)) p = Process(target=self._process_new_file, args=(file,)) - p.start() + p.start() def _process_new_file(self,file): @@ -66,7 +65,7 @@ def _process_new_file(self,file): # build process cmd. process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt) self._logger.info("Processing file: {0}".format(process_cmd)) - Util.execute_cmd(process_cmd,self._logger) + Util.execute_cmd(process_cmd,self._logger) # create hdfs staging. hdfs_path = "{0}/flow".format(self._hdfs_app_path) diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py index 4cffa12..4e6424a 100644 --- a/spot-ingest/pipelines/proxy/collector.py +++ b/spot-ingest/pipelines/proxy/collector.py @@ -1,7 +1,7 @@ #!/bin/env python import logging -import json +import ConfigParser import os import sys import copy @@ -11,14 +11,15 @@ from common.file_collector import FileWatcher import time + class Collector(object): - + def __init__(self,hdfs_app_path,kafka_topic,conf_type): - + self._initialize_members(hdfs_app_path,kafka_topic,conf_type) - + def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - + # getting parameters. self._logger = logging.getLogger('SPOT.INGEST.PROXY') self._hdfs_app_path = hdfs_app_path @@ -28,75 +29,75 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read proxy configuration. 
- conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._message_size = conf["kafka"]["message_size"] - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) + self._message_size = self._conf.get('ingest', "message_size") # get collector path. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') #get supported files - self._supported_files = self._conf['supported_files'] + self._supported_files = self._conf.get(conf_type, 'supported_files') # create collector watcher self._watcher = FileWatcher(self._collector_path,self._supported_files) - # Multiprocessing. - self._processes = conf["collector_processes"] - self._ingestion_interval = conf["ingestion_interval"] + # Multiprocessing. + self._processes = self._conf.get('ingest', "collector_processes") + self._ingestion_interval = self._conf.get('ingest', "ingestion_interval") self._pool = Pool(processes=self._processes) def start(self): self._logger.info("Starting PROXY collector") - self._watcher.start() - + self._watcher.start() + try: while True: #self._ingest_files() - self._ingest_files_pool() + self._ingest_files_pool() time.sleep(self._ingestion_interval) except KeyboardInterrupt: - self._logger.info("Stopping Proxy collector...") - Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) + self._logger.info("Stopping Proxy collector...") + Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) self._watcher.stop() self._pool.terminate() - self._pool.close() + self._pool.close() self._pool.join() - + def _ingest_files_pool(self): - - + + if self._watcher.HasFiles: - + for x in range(0,self._processes): file = self._watcher.GetNextFile() resutl = self._pool.apply_async(ingest_file,args=(file,self._message_size,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers)) - #resutl.get() # to debug add try and catch. - if not self._watcher.HasFiles: break + # resutl.get() # to debug add try and catch. + if not self._watcher.HasFiles: break return True def ingest_file(file,message_size,topic,kafka_servers): - + logger = logging.getLogger('SPOT.INGEST.PROXY.{0}'.format(os.getpid())) - try: + try: message = "" - logger.info("Ingesting file: {0} process:{1}".format(file,os.getpid())) + logger.info("Ingesting file: {0} process:{1}".format(file,os.getpid())) with open(file,"rb") as f: for line in f: message += line if len(message) > message_size: KafkaTopic.SendMessage(message,kafka_servers,topic,0) message = "" - #send the last package. - KafkaTopic.SendMessage(message,kafka_servers,topic,0) + #send the last package. 
+ KafkaTopic.SendMessage(message,kafka_servers,topic,0) rm_file = "rm {0}".format(file) Util.execute_cmd(rm_file,logger) logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic)) - except Exception as err: + except Exception as err: logger.error("There was a problem, please check the following error message:{0}".format(err.message)) logger.error("Exception: {0}".format(err)) diff --git a/spot-ingest/pipelines/proxy/worker.py b/spot-ingest/pipelines/proxy/worker.py index bc8d131..7e54a85 100644 --- a/spot-ingest/pipelines/proxy/worker.py +++ b/spot-ingest/pipelines/proxy/worker.py @@ -1,7 +1,7 @@ #!/bin/env python import os import logging -import json +import ConfigParser from common.utils import Util @@ -13,7 +13,7 @@ def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes): self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type,processes) def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes): - + # get logger instance. self._logger = Util.get_logger('SPOT.INGEST.WRK.PROXY') @@ -23,43 +23,40 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type,proc # read proxy configuration. self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._spark_conf = conf["spark-streaming"] - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) self._processes = processes def start(self): - self._logger.info("Creating Spark Job for topic: {0}".format(self._kafka_consumer.Topic)) + self._logger.info("Creating Spark Job for topic: {0}".format(self._kafka_consumer.Topic)) # parser - parser = self._conf["parser"] + parser = self._conf.get('proxy', "parser") + + # spark conf + diver_memory = self._conf.get('ingest', "driver_memory") + num_exec = self._conf.get('ingest', "spark_exec") + exec_memory = self._conf.get('ingest', "spark_executor_memory") + exec_cores = self._conf.get('ingest', "spark_executor_cores") + batch_size = self._conf.get('ingest', "spark_batch_size") - #spark conf - diver_memory = self._spark_conf["driver_memory"] - num_exec = self._spark_conf["spark_exec"] - exec_memory = self._spark_conf["spark_executor_memory"] - exec_cores = self._spark_conf["spark_executor_cores"] - batch_size = self._spark_conf["spark_batch_size"] - jar_path = os.path.dirname(os.path.dirname(self._script_path)) - # spark job command. + # spark job command. 
     spark_job_cmd = ("spark-submit --master yarn "
-                        "--driver-memory {0} "
-                        "--num-executors {1} "
-                        "--conf spark.executor.memory={2} "
-                        "--conf spark.executor.cores={3} "
-                        "--jars {4}/common/spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar "
-                        "{5}/{6} "
-                        "-zk {7} "
-                        "-t {8} "
-                        "-db {9} "
-                        "-dt {10} "
-                        "-w {11} "
-                        "-bs {12}".format(diver_memory,num_exec,exec_memory,exec_cores,jar_path,self._script_path,parser,self._kafka_consumer.ZookeperServer,self._kafka_consumer.Topic,self._db_name,"proxy",self._processes,batch_size))
-
+                         "--driver-memory {0} "
+                         "--num-executors {1} "
+                         "--conf spark.executor.memory={2} "
+                         "--conf spark.executor.cores={3} "
+                         "--jars {4}/common/spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar "
+                         "{5}/{6} "
+                         "-zk {7} "
+                         "-t {8} "
+                         "-db {9} "
+                         "-dt {10} "
+                         "-w {11} "
+                         "-bs {12}".format(diver_memory,num_exec,exec_memory,exec_cores,jar_path,self._script_path,parser,self._kafka_consumer.ZookeperServer,self._kafka_consumer.Topic,self._db_name,"proxy",self._processes,batch_size))
+
     # start spark job.
     Util.execute_cmd(spark_job_cmd,self._logger)
-
-
diff --git a/spot-ingest/start_ingest_standalone.sh b/spot-ingest/start_ingest_standalone.sh
index 5a0a2de..dd9d2f5 100755
--- a/spot-ingest/start_ingest_standalone.sh
+++ b/spot-ingest/start_ingest_standalone.sh
@@ -26,8 +26,8 @@ fi
 # Validate type (ingest conf).
 #-----------------------------------------------------------------------------------
 CONF_FILE="ingest_conf.json"
-CONF_ING=`python -c "import json,sys;obj=json.loads(open('ingest_conf.json').read());print obj['pipelines']['${INGEST_CONF}'];"`
-TYPE=`python -c "import json,sys;obj=json.loads(open('ingest_conf.json').read());print obj['pipelines']['${INGEST_CONF}']['type'];"`
+CONF_ING=`python -c "import ConfigParser; conf = ConfigParser.SafeConfigParser(); conf.read('/etc/spot.conf'); result = '${INGEST_CONF}' if conf.has_section('${INGEST_CONF}') == True else ''; print result"`
+TYPE=`python -c "import ConfigParser; conf = ConfigParser.SafeConfigParser(); conf.read('/etc/spot.conf'); print(conf.get('${INGEST_CONF}','type'))"`

 if [ -z "$CONF_ING" ]; then
     echo "Provided type is not part of ${CONF_FILE}"
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index a01559e..4dc0128 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -9,9 +9,9 @@
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer

-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+conf_file = '/etc/spot.conf'
+worker_conf = ConfigParser.SafeConfigParser()
+worker_conf.read(conf_file)

 def main():

@@ -31,39 +31,42 @@ def start_worker(type,topic,id,processes=None):

     logger = Util.get_logger("SPOT.INGEST.WORKER")

-    # validate the given configuration exists in ingest_conf.json.
-    if not type in worker_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    # validate the given configuration exists in spot.conf
+    try:
+        worker_conf.get(type, 'type')
+    except ConfigParser.NoSectionError:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)

     # validate the type is a valid module.
-    if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
-        logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+    if not Util.validate_data_source(worker_conf.get(type, "type")):
+        logger.error("'{0}' type is not configured. Please check /etc/spot.conf".format(type))
+        sys.exit(1)

-    # validate if kerberos authentication is requiered.
-    if os.getenv('KRB_AUTH'):
+    # validate if kerberos authentication is required.
+    if worker_conf.get('kerberos', 'KRB_AUTH'):
         kb = Kerberos()
         kb.authenticate()

     # create a worker instance based on the data source type.
-    module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+    module = __import__("pipelines.{0}.worker".format(worker_conf.get(type, 'type')),fromlist=['Worker'])

     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = worker_conf["kafka"]['kafka_server']
-    k_port = worker_conf["kafka"]['kafka_port']
+    k_server = worker_conf.get('ingest', 'kafka_server')
+    k_port = worker_conf.get('ingest', 'kafka_port')

     # required zookeeper info.
-    zk_server = worker_conf["kafka"]['zookeper_server']
-    zk_port = worker_conf["kafka"]['zookeper_port']
+    zk_server = worker_conf.get('ingest', 'zookeper_server')
+    zk_port = worker_conf.get('ingest', 'zookeper_port')

     topic = topic

     # create kafka consumer.
     kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)

     # start worker.
-    db_name = worker_conf['dbname']
-    app_path = worker_conf['hdfs_app_path']
+    db_name = worker_conf.get('database', 'DBNAME')
+    app_path = worker_conf.get('ingest', 'hdfs_app_path')
     ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes)
     ingest_worker.start()
diff --git a/spot-ml/README.md b/spot-ml/README.md
index a4eab66..75c4cb3 100644
--- a/spot-ml/README.md
+++ b/spot-ml/README.md
@@ -102,11 +102,11 @@ The Hive tables containing DNS data for spot-ml analyses have the following sche

 ### Run a suspicious connects analysis

-To run a suspicious connects analysis, execute the `ml_ops.sh` script in the ml directory of the MLNODE.
+To run a suspicious connects analysis, execute the `ml_ops.py` script in the ml directory of the MLNODE.
 ```
-./ml_ops.sh YYYMMDD
+./ml_ops.py -d YYYYMMDD -t <type> -T <tol> -m <max results>
 ```
-
+`-T` and `-m` are optional; if they are not provided, the values from spot.conf are used.

 For example:

 ```
diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py
new file mode 100755
index 0000000..7b6988e
--- /dev/null
+++ b/spot-ml/ml_ops.py
@@ -0,0 +1,217 @@
+#!/usr/bin/python
+
+import os
+import sys
+import logging
+import argparse
+import subprocess
+import ConfigParser
+
+
+def main():
+
+    # initialize ConfigParser
+    conf_file = '/etc/spot.conf'
+    conf = ConfigParser.SafeConfigParser()
+    spot_conf = conf.read(conf_file)
+
+    # initialize logging
+    logger = get_logger('SPOT.ML.OPS', create_file=False)
+
+    # check for file
+    if len(spot_conf) < 1:
+        logger.info("Failed to open /etc/spot.conf, check file location and try again")
+        raise SystemExit
+
+    # parse and validate arguments
+    tol = None
+
+    # input Parameters
+    parser = argparse.ArgumentParser(description="ML Operations script")
+    parser.add_argument('-t','--type', dest='type', required=True, help='Type of data that will be processed', metavar='')
+    parser.add_argument('-d','--date', dest='fdate', required=True, help='Specify date to be analyzed by ML', metavar='')
+    parser.add_argument('-T','--tol', dest='tol', required=False, help='If present will override default TOL from conf file', metavar='')
+    parser.add_argument('-m','--maxResults', dest='MAXRESULTS', required=False, help='If defined sets max results returned', metavar='')
+    args = parser.parse_args()
+
+    YR = args.fdate[0:4]
+    MH = args.fdate[4:6]
+    DY = args.fdate[6:8]
+
+    # getting defaults for ConfigParser interpolation
+    DEFAULTS = vars(args)
+    DEFAULTS.update({'YR': YR, 'MH': MH, 'DY': DY})
+
+    # prepare parameters pipeline stages
+    HPATH = conf.get('DEFAULT', 'HPATH', vars=DEFAULTS)
+    LPATH = conf.get('DEFAULT', 'LPATH', vars=DEFAULTS)
+
+    MAXRESULTS = args.MAXRESULTS
+    if not MAXRESULTS:
+        MAXRESULTS = conf.get('DEFAULT', 'MAXRESULTS')
+
+    RAWDATA_PATH = conf.get(args.type, '{0}_PATH'.format(args.type.upper()), vars=DEFAULTS)
+
+    FEEDBACK_PATH = "{0}/{1}_scores.csv".format(conf.get('DEFAULT', 'LPATH', vars=DEFAULTS), args.type)
+    DUPFACTOR = conf.get('DEFAULT', 'DUPFACTOR')
+
+    PREPROCESS_STEP = "{0}_pre_lda".format(args.type)
+    POSTPROCESS_STEP = "{0}_post_lda".format(args.type)
+
+    HDFS_WORDCOUNTS = "{0}/word_counts".format(HPATH)
+
+    # paths for intermediate files
+    HDFS_DOCRESULTS = "{0}/doc_results.csv".format(HPATH)
+    LOCAL_DOCRESULTS = "{0}/doc_results.csv".format(LPATH)
+
+    HDFS_WORDRESULTS = "{0}/word_results.csv".format(HPATH)
+    LOCAL_WORDRESULTS = "{0}/word_results.csv".format(LPATH)
+
+    HDFS_SCORED_CONNECTS = "{0}/scores".format(HPATH)
+    HDFS_MODEL = "{0}/model".format(HPATH)
+
+    LDA_OUTPUT_DIR = "{0}/{1}".format(args.type, args.fdate)
+
+    # get nodes and create comma separated list
+    NODES = conf.get('DEFAULT', 'NODES').split()
+    nodes_csl = ','.join(NODES)
+
+    cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_WORDCOUNTS)
+    execute_cmd(cmd, logger)
+
+    cmd = "mkdir -p {0}".format(LPATH)
+    execute_cmd(cmd, logger)
+
+    # protect the flow_scores.csv file
+    cmd = "rm -f {0}/*.{{dat,beta,gamma,other,pkl}}".format(LPATH)
+    execute_cmd(cmd, logger)
+
+    cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_SCORED_CONNECTS)
+    execute_cmd(cmd, logger)
+
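ml_ops.py leans on SafeConfigParser interpolation: HPATH, LPATH and the *_PATH options are fetched with `vars=DEFAULTS` so that `%(YR)s`-style references resolve against the parsed arguments and date parts. The option values below are hypothetical, only to show the mechanism:

```python
# Hypothetical HUSER/HPATH values; the real ones live in /etc/spot.conf.
import ConfigParser
from StringIO import StringIO

conf = ConfigParser.SafeConfigParser()
conf.readfp(StringIO(
    "[DEFAULT]\n"
    "HUSER = /user/spot\n"
    "HPATH = %(HUSER)s/%(type)s/hpath/%(YR)s%(MH)s%(DY)s\n"))

DEFAULTS = {'type': 'flow', 'YR': '2016', 'MH': '01', 'DY': '22'}
print(conf.get('DEFAULT', 'HPATH', vars=DEFAULTS))
# -> /user/spot/flow/hpath/20160122
```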
+    # A pre-MPI command can be configured in /etc/spot.conf (mpi section, MPI_PREP_CMD);
+    # when it is set, it runs before the Spark job is submitted.
+
+    if conf.get('mpi', "MPI_PREP_CMD"):
+        cmd = conf.get('mpi', "MPI_PREP_CMD")
+        execute_cmd(cmd, logger)
+
+    SPK_CONFIG = conf.get('spark', 'SPK_CONFIG')
+    SPK_DRIVER_MEM = conf.get('spark', 'SPK_DRIVER_MEM')
+    SPK_EXEC = conf.get('spark', 'SPK_EXEC')
+    SPK_EXEC_CORES = conf.get('spark', 'SPK_EXEC_CORES')
+    SPK_EXEC_MEM = conf.get('spark', 'SPK_EXEC_MEM')
+    SPK_EXEC_MEM_OVERHEAD = conf.get('spark', 'SPK_EXEC_MEM_OVERHEAD')
+    SPK_DRIVER_MAX_RESULTS = conf.get('spark', 'SPK_DRIVER_MAX_RESULTS')
+    SPK_DRIVER_MEM_OVERHEAD = conf.get('spark', 'SPK_DRIVER_MEM_OVERHEAD')
+    LDAPATH = conf.get('DEFAULT', 'LDAPATH')
+    LUSER = conf.get('DEFAULT', 'LUSER')
+    MPI_CMD = conf.get('mpi', 'MPI_CMD')
+    PROCESS_COUNT = conf.get('mpi', 'PROCESS_COUNT')
+    TOPIC_COUNT = conf.get('DEFAULT', 'TOPIC_COUNT')
+
+    if args.tol:
+        TOL = args.tol
+    else:
+        TOL = conf.get('DEFAULT', 'TOL')
+
+    # prepare options for spark-submit
+    spark_cmd = ["time", "spark-submit",
+        "--class org.apache.spot.SuspiciousConnects",
+        "--master yarn-client", "--conf spark.driver.maxPermSize=512m",
+        "--conf spark.driver.cores=1",
+        "--conf spark.dynamicAllocation.enabled=true",
+        "--conf spark.dynamicAllocation.minExecutors=1",
+        '--conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M"',
+        "--conf spark.shuffle.io.preferDirectBufs=false",
+        "--conf spark.kryoserializer.buffer.max=512m",
+        "--conf spark.shuffle.service.enabled=true",
+        "--conf spark.yarn.am.waitTime=100s"]
+
+    spark_extras = [
+        "--driver-memory " + SPK_DRIVER_MEM,
+        "--conf spark.dynamicAllocation.maxExecutors=" + SPK_EXEC,
+        "--conf spark.executor.cores=" + SPK_EXEC_CORES,
+        "--conf spark.executor.memory=" + SPK_EXEC_MEM,
+        "--conf spark.driver.maxResultSize=" + SPK_DRIVER_MAX_RESULTS,
+        "--conf spark.yarn.am.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD,
+        "--conf spark.yarn.executor.memoryOverhead=" + SPK_EXEC_MEM_OVERHEAD]
+
+    if SPK_CONFIG:
+        logger.info('Adding Spark Configurations from spot.conf')
+        spark_cmd.extend(spark_extras)
+
+    spot_jar = [
+        "target/scala-2.10/spot-ml-assembly-1.1.jar",
+        "--analysis " + args.type,
+        "--input " + RAWDATA_PATH, "--dupfactor " + DUPFACTOR,
+        "--feedback " + FEEDBACK_PATH,
+        "--model " + LPATH + "/model.dat",
+        "--topicdoc " + LPATH + "/final.gamma",
+        "--topicword " + LPATH + "/final.beta",
+        "--lpath " + LPATH, "--ldapath " + LDAPATH,
+        "--luser " + LUSER, "--mpicmd " + MPI_CMD,
+        "--proccount " + PROCESS_COUNT,
+        "--topiccount " + TOPIC_COUNT,
+        "--nodes " + nodes_csl,
+        "--scored " + HDFS_SCORED_CONNECTS,
+        "--tempmodel " + HDFS_MODEL,
+        "--threshold " + TOL,
+        "--maxresults " + MAXRESULTS]
+
+    spark_cmd.extend(spot_jar)
+
+    # execute_cmd runs the command through the shell, so pass one command string
+    execute_cmd(" ".join(spark_cmd), logger)
+    # process = subprocess.Popen(spark_cmd, stdout=subprocess.PIPE, stderr=None)
+
+    # move results to hdfs.
+    os.chdir(LPATH)
+    cmd = "hadoop fs -getmerge {0}/part-* {1}_results.csv && hadoop fs -moveFromLocal {1}_results.csv {0}/{1}_results.csv".format(HDFS_SCORED_CONNECTS, args.type)
+    execute_cmd(cmd, logger)
+
+def execute_cmd(command, logger):
+
+    try:
+        logger.info("Executing: {0}".format(command))
+        subprocess.call(command,shell=True)
+
+    except subprocess.CalledProcessError as e:
+        logger.error("There was an error executing: {0}".format(e.cmd))
+        sys.exit(1)
+
+def validate_parameter(parameter, message, logger):
+    if parameter is None or parameter == "":
+        logger.error(message)
+        sys.exit(1)
+
+def get_logger(logger_name,create_file=False):
+
+    # create logger for prd_ci
+    log = logging.getLogger(logger_name)
+    log.setLevel(level=logging.INFO)
+
+    # create formatter and add it to the handlers
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    if create_file:
+        # create file handler for logger.
+        fh = logging.FileHandler('SPOT.log')
+        fh.setLevel(level=logging.DEBUG)
+        fh.setFormatter(formatter)
+    # create console handler for logger.
+    ch = logging.StreamHandler()
+    ch.setLevel(level=logging.DEBUG)
+    ch.setFormatter(formatter)
+
+    # add handlers to logger.
+    if create_file:
+        log.addHandler(fh)
+
+    log.addHandler(ch)
+    return log
+
+
+
+if __name__ =='__main__':
+    main()
diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh
deleted file mode 100755
index 8faef3b..0000000
--- a/spot-ml/ml_ops.sh
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/bin/bash
-
-# parse and validate arguments
-
-FDATE=$1
-DSOURCE=$2
-
-YR=${FDATE:0:4}
-MH=${FDATE:4:2}
-DY=${FDATE:6:2}
-
-if [[ "${#FDATE}" != "8" || -z "${DSOURCE}" ]]; then
-    echo "ml_ops.sh syntax error"
-    echo "Please run ml_ops.sh again with the correct syntax:"
-    echo "./ml_ops.sh YYYYMMDD TYPE [TOL]"
-    echo "for example:"
-    echo "./ml_ops.sh 20160122 dns 1e-6"
-    echo "./ml_ops.sh 20160122 flow"
-    exit
-fi
-
-
-# read in variables (except for date) from etc/.conf file
-# note: FDATE and DSOURCE *must* be defined prior sourcing this conf file
-
-source /etc/spot.conf
-
-# third argument if present will override default TOL from conf file
-
-if [ -n "$3" ]; then TOL=$3 ; fi
-
-if [ -n "$4" ]; then
-    MAXRESULTS=$4
-else
-    MAXRESULTS=-1
-fi
-
-# prepare parameters pipeline stages
-
-if [ "$DSOURCE" == "flow" ]; then
-    RAWDATA_PATH=${FLOW_PATH}
-elif [ "$DSOURCE" == "dns" ]; then
-    RAWDATA_PATH=${DNS_PATH}
-else
-    RAWDATA_PATH=${PROXY_PATH}
-fi
-
-FEEDBACK_PATH=${LPATH}/${DSOURCE}_scores.csv
-DUPFACTOR=1000
-
-PREPROCESS_STEP=${DSOURCE}_pre_lda
-POSTPROCESS_STEP=${DSOURCE}_post_lda
-
-HDFS_WORDCOUNTS=${HPATH}/word_counts
-
-# paths for intermediate files
-HDFS_DOCRESULTS=${HPATH}/doc_results.csv
-LOCAL_DOCRESULTS=${LPATH}/doc_results.csv
-
-HDFS_WORDRESULTS=${HPATH}/word_results.csv
-LOCAL_WORDRESULTS=${LPATH}/word_results.csv
-
-HDFS_SCORED_CONNECTS=${HPATH}/scores
-HDFS_MODEL=${HPATH}/model
-
-LDA_OUTPUT_DIR=${DSOURCE}/${FDATE}
-
-TOPIC_COUNT=20
-
-nodes=${NODES[0]}
-for n in "${NODES[@]:1}" ; do nodes+=",${n}"; done
-
-hdfs dfs -rm -R -f ${HDFS_WORDCOUNTS}
-wait
-
-mkdir -p ${LPATH}
-rm -f ${LPATH}/*.{dat,beta,gamma,other,pkl} # protect the flow_scores.csv file
-
-hdfs dfs -rm -R -f ${HDFS_SCORED_CONNECTS}
-
-# Add -p to execute pre MPI command.
-# Pre MPI command can be configured in /etc/spot.conf -# In this script, after the line after --mpicmd ${MPI_CMD} add: -# --mpiprep ${MPI_PREP_CMD} - -${MPI_PREP_CMD} - -time spark-submit --class "org.apache.spot.SuspiciousConnects" \ - --master yarn-client \ - --driver-memory ${SPK_DRIVER_MEM} \ - --num-executors ${SPK_EXEC} \ - --conf spark.driver.maxResultSize=${SPK_DRIVER_MAX_RESULTS} \ - --conf spark.driver.maxPermSize=512m \ - --conf spark.dynamicAllocation.enabled=true \ - --conf spark.executor.cores=${SPK_EXEC_CORES} \ - --conf spark.executor.memory=${SPK_EXEC_MEM} \ - --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M" \ - --conf spark.kryoserializer.buffer.max=512m \ - --conf spark.yarn.am.waitTime=100s \ - --conf spark.yarn.am.memoryOverhead=${SPK_DRIVER_MEM_OVERHEAD} \ - --conf spark.yarn.executor.memoryOverhead=${SPAK_EXEC_MEM_OVERHEAD} target/scala-2.10/spot-ml-assembly-1.1.jar \ - --analysis ${DSOURCE} \ - --input ${RAWDATA_PATH} \ - --dupfactor ${DUPFACTOR} \ - --feedback ${FEEDBACK_PATH} \ - --model ${LPATH}/model.dat \ - --topicdoc ${LPATH}/final.gamma \ - --topicword ${LPATH}/final.beta \ - --lpath ${LPATH} \ - --ldapath ${LDAPATH} \ - --luser ${LUSER} \ - --mpicmd ${MPI_CMD} \ - --proccount ${PROCESS_COUNT} \ - --topiccount ${TOPIC_COUNT} \ - --nodes ${nodes} \ - --scored ${HDFS_SCORED_CONNECTS} \ - --tempmodel ${HDFS_MODEL} \ - --threshold ${TOL} \ - --maxresults ${MAXRESULTS} - -wait - -# move results to hdfs. -cd ${LPATH} -hadoop fs -getmerge ${HDFS_SCORED_CONNECTS}/part-* ${DSOURCE}_results.csv && hadoop fs -moveFromLocal \ - ${DSOURCE}_results.csv ${HDFS_SCORED_CONNECTS}/${DSOURCE}_results.csv \ No newline at end of file diff --git a/spot-oa/oa/components/data/impala.py b/spot-oa/oa/components/data/impala.py index 5b6c8c1..5cdc378 100644 --- a/spot-oa/oa/components/data/impala.py +++ b/spot-oa/oa/components/data/impala.py @@ -3,13 +3,13 @@ class Engine(object): def __init__(self,db,conf, pipeline): - - self._daemon_node = conf['impala_daemon'] + + self._daemon_node = conf.get('database', 'IMPALA_DEM') self._db = db self._pipeline = pipeline impala_cmd = "impala-shell -i {0} --quiet -q 'INVALIDATE METADATA {1}.{2}'".format(self._daemon_node,self._db, self._pipeline) check_output(impala_cmd,shell=True) - + impala_cmd = "impala-shell -i {0} --quiet -q 'REFRESH {1}.{2}'".format(self._daemon_node,self._db, self._pipeline) check_output(impala_cmd,shell=True) diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py index 884d46c..844fec0 100644 --- a/spot-oa/oa/dns/dns_oa.py +++ b/spot-oa/oa/dns/dns_oa.py @@ -12,19 +12,19 @@ from utils import Util from components.data.data import Data from components.iana.iana_transform import IanaTransform -from components.nc.network_context import NetworkContext +from components.nc.network_context import NetworkContext from multiprocessing import Process import time class OA(object): - + def __init__(self,date,limit=500,logger=None): self._initialize_members(date,limit,logger) def _initialize_members(self,date,limit,logger): - + # get logger if exists. if not, create new instance. 
self._logger = logging.getLogger('OA.DNS') if logger else Util.get_logger('OA.DNS',create_file=False) @@ -50,8 +50,8 @@ def _initialize_members(self,date,limit,logger): self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') - self._engine = Data(self._db,self._table_name ,self._logger) + self._db = self._spot_conf.get('database', 'DBNAME') + self._engine = Data(self._db,self._table_name ,self._logger) def start(self): @@ -79,9 +79,9 @@ def start(self): def _create_folder_structure(self): # create date folder structure if it does not exist. - self._logger.info("Creating folder structure for OA (data and ipynb)") + self._logger.info("Creating folder structure for OA (data and ipynb)") self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("dns",self._date) - + def _add_ipynb(self): if os.path.isdir(self._ipynb_path): @@ -102,7 +102,7 @@ def _get_dns_results(self): dns_results = "{0}/dns_results.csv".format(self._data_path) # get hdfs path from conf file. - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/dns/scored_results/{1}/scores/dns_results.csv".format(HUSER,self._date) # get results file from hdfs. @@ -121,70 +121,70 @@ def _get_dns_results(self): self._logger.error("There was an error getting ML results from HDFS") sys.exit(1) - # add headers. + # add headers. self._logger.info("Adding headers") self._dns_scores_headers = [ str(key) for (key,value) in self._conf['dns_score_fields'].items() ] # add dns content. - self._dns_scores = [ conn[:] for conn in self._dns_results][:] + self._dns_scores = [ conn[:] for conn in self._dns_results][:] def _move_time_stamp(self,dns_data): - + for dns in dns_data: time_stamp = dns[1] dns.remove(time_stamp) dns.append(time_stamp) - - return dns_data + + return dns_data def _create_dns_scores_csv(self): - + dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path) dns_scores_final = self._move_time_stamp(self._dns_scores) dns_scores_final.insert(0,self._dns_scores_headers) - Util.create_csv_file(dns_scores_csv,dns_scores_final) + Util.create_csv_file(dns_scores_csv,dns_scores_final) # create bk file dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path) - Util.create_csv_file(dns_scores_bu_csv,dns_scores_final) + Util.create_csv_file(dns_scores_bu_csv,dns_scores_final) def _add_tld_column(self): qry_name_col = self._conf['dns_results_fields']['dns_qry_name'] - self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] - + self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] + def _add_reputation(self): # read configuration. reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file)) rep_conf = json.loads(open(reputation_conf_file).read()) - + # initialize reputation services. 
self._rep_services = [] self._logger.info("Initializing reputation services.") - for service in rep_conf: + for service in rep_conf: config = rep_conf[service] module = __import__("components.reputation.{0}.{0}".format(service), fromlist=['Reputation']) self._rep_services.append(module.Reputation(config,self._logger)) - + # get columns for reputation. rep_cols = {} - indexes = [ int(value) for key, value in self._conf["add_reputation"].items()] + indexes = [ int(value) for key, value in self._conf["add_reputation"].items()] self._logger.info("Getting columns to add reputation based on config file: dns_conf.json".format()) for index in indexes: col_list = [] for conn in self._dns_scores: - col_list.append(conn[index]) + col_list.append(conn[index]) rep_cols[index] = list(set(col_list)) # get reputation per column. - self._logger.info("Getting reputation for each service in config") + self._logger.info("Getting reputation for each service in config") rep_services_results = [] for key,value in rep_cols.items(): rep_services_results = [ rep_service.check(None,value) for rep_service in self._rep_services] - rep_results = {} - for result in rep_services_results: + rep_results = {} + for result in rep_services_results: rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)} self._dns_scores = [ conn + [ rep_results[conn[key]] ] for conn in self._dns_scores ] @@ -209,9 +209,9 @@ def _add_iana(self): dns_qry_type_index = self._conf["dns_results_fields"]["dns_qry_type"] dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"] self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [ dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode") ] for conn in self._dns_scores ] - - else: - self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] + + else: + self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] def _add_network_context(self): @@ -226,13 +226,13 @@ def _add_network_context(self): def _get_oa_details(self): - - self._logger.info("Getting OA DNS suspicious details/chord diagram") + + self._logger.info("Getting OA DNS suspicious details/chord diagram") # start suspicious connects details process. p_sp = Process(target=self._get_suspicious_details) - p_sp.start() + p_sp.start() - # start chord diagram process. + # start chord diagram process. 
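The reputation merge above folds one dict per service into a single value per indicator, joining non-empty results with "::". A worked example of that comprehension with two hypothetical service outputs:

```python
# Worked example of the rep_results merge above (service names and values are made up).
gti = {'bad.example.com': 'gti:HIGH'}
fb = {'bad.example.com': 'fb:MEDIUM', 'other.example.net': 'fb:LOW'}

rep_results = {}
for result in [gti, fb]:
    rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::')
                   for k in set(rep_results) | set(result)}

print(rep_results)
# {'other.example.net': 'fb:LOW', 'bad.example.com': 'gti:HIGH::fb:MEDIUM'}
```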
p_dn = Process(target=self._get_dns_dendrogram) p_dn.start() @@ -245,7 +245,7 @@ def _get_suspicious_details(self): if os.path.isfile(iana_conf_file): iana_config = json.loads(open(iana_conf_file).read()) dns_iana = IanaTransform(iana_config["IANA"]) - + for conn in self._dns_scores: # get data to query date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ") @@ -254,24 +254,24 @@ def _get_suspicious_details(self): if len(date) == 5: year=date[2] month=datetime.datetime.strptime(date[0], '%b').strftime('%m') - day=date[1] + day=date[1] hh=conn[self._conf["dns_score_fields"]["hh"]] dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]] self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana) def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana): - + limit = self._details_limit edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh) edge_tmp ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh) if not os.path.isfile(edge_file): - + dns_qry = ("SELECT frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit) - + # execute query self._engine.query(dns_qry,edge_tmp) - + # add IANA to results. if dns_iana: update_rows = [] @@ -296,18 +296,18 @@ def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana): writer = csv.writer(dns_details_edge, quoting=csv.QUOTE_ALL) if update_rows: writer.writerows(update_rows) - else: - shutil.copy(edge_tmp,edge_file) - + else: + shutil.copy(edge_tmp,edge_file) + try: os.remove(edge_tmp) except OSError: pass - + def _get_dns_dendrogram(self): limit = self._details_limit - for conn in self._dns_scores: + for conn in self._dns_scores: date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ") date = filter(None,date) diff --git a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb index 5573c9a..e2c1f8b 100644 --- a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb @@ -129,15 +129,33 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": { "collapsed": false }, - "outputs": [], + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'assign_btn' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m\u001b[0m", + "\u001b[0;31mNameError\u001b[0mTraceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 86\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 88\u001b[0;31m \u001b[0massign_btn\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mon_click\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0massign_score\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 89\u001b[0m \u001b[0msave_btn\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mon_click\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msave\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 90\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mNameError\u001b[0m: name 'assign_btn' is not defined" + ] + } + ], "source": [ "import csv\n", "import datetime\n", - "import subprocess \n", + "import subprocess\n", + "import 
ConfigParser\n", + "\n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", "\n", "def assign_score(b):\n", " score_values = []\n", @@ -222,14 +240,11 @@ "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} &&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name) \n", - "\n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", + " \n", " subprocess.call(str_fb, shell=True)" ] }, @@ -261,7 +276,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb index 8c665a8..140f993 100644 --- a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb @@ -23,6 +23,7 @@ "import datetime\n", "import operator\n", "import itertools\n", + "import ConfigParser\n", "\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", @@ -30,10 +31,13 @@ " from IPython.html import widgets\n", "from IPython.display import display, HTML, clear_output, Javascript \n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines():\n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", "\n", "path = os.getcwd().split(\"/\") \n", "t_date = path[len(path)-1] \n", @@ -333,7 +337,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py index 1fbd503..55f825d 100644 --- a/spot-oa/oa/flow/flow_oa.py +++ b/spot-oa/oa/flow/flow_oa.py @@ -20,12 +20,12 @@ class OA(object): - def __init__(self,date,limit=500,logger=None): - + def __init__(self,date,limit=500,logger=None): + self._initialize_members(date,limit,logger) - + def _initialize_members(self,date,limit,logger): - + # get logger if exists. if not, create new instance. 
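In the Edge_Investigation notebook above, the scp feedback command is now assembled from spot.conf values instead of sourcing the file in a shell one-liner; the user name is taken as the third path component of LUSER, matching the old `cut -f3 -d'/'`. A tiny check with a hypothetical LUSER value:

```python
# Hypothetical LUSER value; mirrors usr=$(echo $LUSER | cut -f3 -d'/') from the old notebook cell.
LUSER = '/home/solution-user'
usr = LUSER.split('/')[2]
print(usr)   # solution-user

# str_fb as built in the notebook, shown here with example data for the other fields
str_fb = "scp {0} {1}@{2}:{3}/{4}".format(
    '/tmp/flow_scores_fb.csv', usr, 'ml-node.example.com',
    '/home/solution-user/ml/flow/20160122', 'flow_scores.csv')
print(str_fb)
```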
self._logger = logging.getLogger('OA.Flow') if logger else Util.get_logger('OA.Flow',create_file=False) @@ -46,24 +46,24 @@ def _initialize_members(self,date,limit,logger): # get scores fields conf conf_file = "{0}/flow_conf.json".format(self._scrtip_path) - self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) - + self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) + # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') + self._db = self._spot_conf.get('database', 'DBNAME') self._engine = Data(self._db, self._table_name,self._logger) - - def start(self): - + + def start(self): + #################### start = time.time() #################### self._create_folder_structure() - self._add_ipynb() + self._add_ipynb() self._get_flow_results() self._add_network_context() self._add_geo_localization() - self._add_reputation() + self._add_reputation() self._create_flow_scores_csv() self._get_oa_details() @@ -71,14 +71,14 @@ def start(self): end = time.time() print(end - start) ################## - + def _create_folder_structure(self): # create date folder structure if it does not exist. - self._logger.info("Creating folder structure for OA (data and ipynb)") + self._logger.info("Creating folder structure for OA (data and ipynb)") self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("flow",self._date) - def _add_ipynb(self): + def _add_ipynb(self): if os.path.isdir(self._ipynb_path): @@ -90,16 +90,16 @@ def _add_ipynb(self): else: self._logger.error("There was a problem adding the IPython Notebooks, please check the directory exists.") - + def _get_flow_results(self): - + self._logger.info("Getting {0} Machine Learning Results from HDFS".format(self._date)) flow_results = "{0}/flow_results.csv".format(self._data_path) - # get hdfs path from conf file - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + # get hdfs path from conf file + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/flow/scored_results/{1}/scores/flow_results.csv".format(HUSER,self._date) - + # get results file from hdfs get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path) self._logger.info("{0}".format(get_command)) @@ -116,14 +116,14 @@ def _get_flow_results(self): self._logger.error("There was an error getting ML results from HDFS") sys.exit(1) - # add headers. + # add headers. self._logger.info("Adding headers based on configuration file: score_fields.json") self._flow_scores = [ [ str(key) for (key,value) in self._conf['flow_score_fields'].items()] ] # filter results add sev and rank. self._logger.info("Filtering required columns based on configuration") self._flow_scores.extend([ [0] + [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ]) - + def _create_flow_scores_csv(self): flow_scores_csv = "{0}/flow_scores.csv".format(self._data_path) @@ -131,11 +131,11 @@ def _create_flow_scores_csv(self): # create bk file flow_scores_bu_csv = "{0}/flow_scores_bu.csv".format(self._data_path) - Util.create_csv_file(flow_scores_bu_csv,self._flow_scores) + Util.create_csv_file(flow_scores_bu_csv,self._flow_scores) def _add_network_context(self): - # use ipranges to see if the IPs are internals. + # use ipranges to see if the IPs are internals. 
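The network-context check that follows compares each IP, converted to an integer, against the [start, end] ranges loaded from context/ipranges.csv. A standalone sketch of that conversion and check; `Util.ip_to_int` is the project helper, and the struct/socket version below is an assumed equivalent for illustration only:

```python
# Assumed equivalent of Util.ip_to_int plus the range test used by _is_ip_internal.
import socket
import struct

def ip_to_int(ip):
    return struct.unpack("!I", socket.inet_aton(ip))[0]

# example range; the real ranges come from ipranges.csv
internal_ranges = [(ip_to_int("10.0.0.0"), ip_to_int("10.255.255.255"))]

def is_internal(ip, ranges):
    value = ip_to_int(ip)
    return any(start <= value <= end for start, end in ranges)

print(is_internal("10.1.2.3", internal_ranges))   # True
print(is_internal("8.8.8.8", internal_ranges))    # False
```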
ip_ranges_file = "{0}/context/ipranges.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # add new headers (srcIpInternal/destIpInternal). @@ -158,38 +158,38 @@ def _add_network_context(self): # get src and dst IPs src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] - + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + # add networkcontext per connection. - ip_internal_ranges = filter(None,nc_ranges) + ip_internal_ranges = filter(None,nc_ranges) self._logger.info("Adding networkcontext to suspicious connections.") self._flow_scores = [ conn + [ self._is_ip_internal(conn[src_ip_index],ip_internal_ranges)]+[ self._is_ip_internal(conn[dst_ip_index],ip_internal_ranges)] for conn in flow_scores] - + else: - self._flow_scores = [ conn + ["",""] for conn in flow_scores ] + self._flow_scores = [ conn + ["",""] for conn in flow_scores ] self._logger.info("WARNING: Network context was not added because the file ipranges.csv does not exist.") - + self._flow_scores.insert(0,flow_headers) def _is_ip_internal(self,ip, ranges): result = 0 for row in ranges: - if Util.ip_to_int(ip) >= row[0] and Util.ip_to_int(ip) <= row[1]: + if Util.ip_to_int(ip) >= row[0] and Util.ip_to_int(ip) <= row[1]: result = 1 break return result - + def _add_geo_localization(self): - # use ipranges to see if the IPs are internals. + # use ipranges to see if the IPs are internals. iploc_file = "{0}/context/iploc.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - # add new headers (srcIpInternal/destIpInternal). + # add new headers (srcIpInternal/destIpInternal). self._logger.info("Adding geo localization headers") flow_headers = self._flow_scores[0] - flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) + flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) # add values to srcIpInternal and destIpInternal. flow_scores = iter(self._flow_scores) @@ -199,9 +199,9 @@ def _add_geo_localization(self): self._logger.info("Initializing geo localization component") geo = GeoLocalization(iploc_file,self._logger) - + src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] self._logger.info("Adding geo localization...") self._flow_scores = [] @@ -217,27 +217,27 @@ def _add_geo_localization(self): # adding columns to the current connection list. conn.extend([src_geo_dict["geo_loc"],dst_geo_dict["geo_loc"],src_geo_dict["domain"],dst_geo_dict["domain"]]) - self._flow_scores.extend([conn]) + self._flow_scores.extend([conn]) else: - self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ] + self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ] self._logger.info("WARNING: IP location was not added because the file {0} does not exist.".format(iploc_file)) - self._flow_scores.insert(0,flow_headers) + self._flow_scores.insert(0,flow_headers) def _add_reputation(self): - + reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - + # add new headers (gtiSrcRep/gtiDstRep). self._logger.info("Adding reputation headers") flow_headers_rep = self._flow_scores[0] flow_headers_rep.extend(["srcIp_rep","dstIp_rep"]) - + # read configuration. 
self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file)) rep_conf = json.loads(open(reputation_conf_file).read()) - + if "gti" in rep_conf and os.path.isfile(rep_conf['gti']['refclient']): rep_conf = rep_conf['gti'] # initialize gti module. @@ -253,14 +253,14 @@ def _add_reputation(self): next(flow_scores_src) # getting reputation for src IPs - src_ips = [ conn[src_ip_index] for conn in flow_scores_src ] + src_ips = [ conn[src_ip_index] for conn in flow_scores_src ] src_rep_results = flow_gti.check(src_ips) self._logger.info("Getting GTI reputation for dst IPs") flow_scores_dst = iter(self._flow_scores) next(flow_scores_dst) - # getting reputation for dst IPs + # getting reputation for dst IPs dst_ips = [ conn[dst_ip_index] for conn in flow_scores_dst ] dst_rep_results = flow_gti.check(dst_ips) @@ -269,18 +269,18 @@ def _add_reputation(self): self._flow_scores = [] flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ] - self._flow_scores = flow_scores - + self._flow_scores = flow_scores + else: # add values to gtiSrcRep and gtiDstRep. flow_scores = iter(self._flow_scores) next(flow_scores) - self._flow_scores = [ conn + ["",""] for conn in flow_scores ] - self._logger.info("WARNING: IP reputation was not added. No refclient configured") + self._flow_scores = [ conn + ["",""] for conn in flow_scores ] + self._logger.info("WARNING: IP reputation was not added. No refclient configured") - self._flow_scores.insert(0,flow_headers_rep) + self._flow_scores.insert(0,flow_headers_rep) def _get_oa_details(self): @@ -296,27 +296,27 @@ def _get_oa_details(self): p_sp.join() p_ch.join() - + def _get_suspicious_details(self,bar=None): - + # skip header sp_connections = iter(self._flow_scores) next(sp_connections) - + # loop connections. - connections_added = [] + connections_added = [] for conn in sp_connections: - - # validate if the connection's details are not already extracted. + + # validate if the connection's details are not already extracted. if conn in connections_added: continue else: connections_added.append(conn) - + src_ip_index = self._conf["flow_score_fields"]["srcIP"] dst_ip_index = self._conf["flow_score_fields"]["dstIP"] - # get src ip + # get src ip sip = conn[src_ip_index] # get dst ip dip = conn[dst_ip_index] @@ -326,16 +326,16 @@ def _get_suspicious_details(self,bar=None): date_array_1 = date_array[0].split('-') date_array_2 = date_array[1].split(':') - yr = date_array_1[0] + yr = date_array_1[0] dy = date_array_1[2] mh = date_array_1[1] hr = date_array_2[0] mm = date_array_2[1] - + # connection details query. sp_query = ("SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,stos as TOS,ibyt as ibytes,ipkt as ipkts,input as input, output as output,rip as rip, obyt as obytes, opkt as opkts from {0}.{1} where ((sip='{2}' AND dip='{3}') or (sip='{3}' AND dip='{2}')) AND y={8} AND m={4} AND d={5} AND h={6} AND trminute={7} order by tstart limit 100") - + # sp query. 
sp_query = sp_query.format(self._db,self._table_name,sip,dip,mh,dy,hr,mm,yr) @@ -344,15 +344,15 @@ def _get_suspicious_details(self,bar=None): # execute query self._engine.query(sp_query,output_file=edge_file,delimiter="\\t") - + def _get_chord_details(self,bar=None): # skip header sp_connections = iter(self._flow_scores) - next(sp_connections) + next(sp_connections) src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] # get date parameters. yr = self._date[:4] @@ -362,29 +362,29 @@ def _get_chord_details(self,bar=None): # get number of times each IP appears. srcdict = {} for conn in sp_connections: - if conn[src_ip_index] in srcdict:srcdict[conn[src_ip_index]] += 1 + if conn[src_ip_index] in srcdict:srcdict[conn[src_ip_index]] += 1 else:srcdict[conn[src_ip_index]] = 1 if conn[dst_ip_index] in srcdict:srcdict[conn[dst_ip_index]] += 1 else:srcdict[conn[dst_ip_index]] = 1 - - for (ip,n) in srcdict.items(): + + for (ip,n) in srcdict.items(): if n > 1: - ip_list = [] + ip_list = [] sp_connections = iter(self._flow_scores) next(sp_connections) - for row in sp_connections: + for row in sp_connections: if ip == row[2] : ip_list.append(row[3]) - if ip == row[3] :ip_list.append(row[2]) + if ip == row[3] :ip_list.append(row[2]) ips = list(set(ip_list)) - + if len(ips) > 1: ips_filter = (",".join(str("'{0}'".format(ip)) for ip in ips)) - chord_file = "{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_")) + chord_file = "{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_")) ch_query = ("SELECT sip as srcip, dip as dstip, SUM(ibyt) as ibytes, SUM(ipkt) as ipkts from {0}.{1} where y={2} and m={3} \ and d={4} and ( (sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}') ) group by sip,dip") self._engine.query(ch_query.format(self._db,self._table_name,yr,mn,dy,ip,ips_filter),chord_file,delimiter="\\t") - - + + diff --git a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb index e916471..927571a 100644 --- a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb @@ -10,7 +10,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "import struct, socket\n", @@ -20,7 +22,8 @@ "import linecache, bisect\n", "import csv, json\n", "import operator\n", - "import os, time, subprocess \n", + "import os, time, subprocess\n", + "import ConfigParser\n", "from collections import OrderedDict\n", "\n", "try:\n", @@ -30,6 +33,11 @@ "\n", "from IPython.display import display, Javascript, clear_output\n", "\n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", "path = os.getcwd().split(\"/\") \n", "date = path[len(path)-1] \n", "dsource = path[len(path)-2] \n", @@ -55,7 +63,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "def apply_css_to_select(select):\n", @@ -325,16 +335,13 @@ " except:\n", " print \"Key Error for ip: \" + srcip\n", " \n", - " \n", + "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} &&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') 
&&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name) \n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", " \n", " subprocess.call(str_fb, shell=True)" ] @@ -349,7 +356,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# set_rules()" @@ -358,7 +367,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# attack_heuristics()" @@ -367,7 +378,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "displaythis()" @@ -376,7 +389,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# !cp $sconnectbu $sconnect" @@ -392,16 +407,16 @@ "language_info": { "codemirror_mode": { "name": "ipython", - "version": 2.0 + "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 0 -} \ No newline at end of file +} diff --git a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb index 6393846..e4fe415 100644 --- a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb @@ -15,16 +15,20 @@ "import operator\n", "import json\n", "import os\n", + "import ConfigParser\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", "except ImportError:\n", " from IPython.html import widgets\n", "from IPython.display import display, Javascript, clear_output\n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines(): \n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", "\n", "spath = os.getcwd()\n", "path = spath.split(\"/\") \n", @@ -770,7 +774,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb index 86d6fef..6521ea4 100644 --- a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb @@ -124,6 +124,7 @@ "import csv\n", "import datetime\n", "import subprocess \n", + "import ConfigParser\n", "\n", "def assign_score(b):\n", " scored_threats = []\n", @@ -183,13 +184,11 @@ "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} 
&&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name)\n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", + " \n", " subprocess.call(str_fb, shell=True)" ] }, @@ -221,7 +220,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb index 5cd89db..cc6a855 100644 --- a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb @@ -20,6 +20,7 @@ "import itertools\n", "import md5\n", "from collections import defaultdict \n", + "import ConfigParser\n", "\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", @@ -27,11 +28,14 @@ " from IPython.html import widgets\n", "from IPython.display import display, HTML, clear_output, Javascript \n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines():\n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", + "\n", "path = os.getcwd().split(\"/\") \n", "date = path[len(path)-1] \n", "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n", @@ -396,7 +400,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" }, "widgets": { "state": { diff --git a/spot-oa/oa/proxy/proxy_oa.py b/spot-oa/oa/proxy/proxy_oa.py index d9b0362..7fbd189 100644 --- a/spot-oa/oa/proxy/proxy_oa.py +++ b/spot-oa/oa/proxy/proxy_oa.py @@ -50,7 +50,7 @@ def _initialize_members(self,date,limit,logger): self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') + self._db = self._spot_conf.get('DEFAULT', 'DBNAME') self._engine = Data(self._db, self._table_name,self._logger) @@ -104,7 +104,7 @@ def _get_proxy_results(self): proxy_results = "{0}/proxy_results.csv".format(self._data_path) # get hdfs path from conf file. - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/proxy/scored_results/{1}/scores/proxy_results.csv".format(HUSER,self._date) # get results file from hdfs. @@ -276,7 +276,7 @@ def _get_proxy_details(self,fulluri,clientip,conn_hash,year,month,day,hh,proxy_i # due an issue with the output of the query. update_rows = [ [ w.replace('"','') for w in l ] for l in update_rows ] - + # create edge file. 
self._logger.info("Creating edge file:{0}".format(edge_file)) diff --git a/spot-oa/oa/utils.py b/spot-oa/oa/utils.py index 553315a..b430746 100644 --- a/spot-oa/oa/utils.py +++ b/spot-oa/oa/utils.py @@ -6,18 +6,18 @@ import ConfigParser class Util(object): - + @classmethod def get_logger(cls,logger_name,create_file=False): - + # create logger for prd_ci log = logging.getLogger(logger_name) log.setLevel(level=logging.INFO) - + # create formatter and add it to the handlers formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - + if create_file: # create file handler for logger. fh = logging.FileHandler('oa.log') @@ -37,15 +37,15 @@ def get_logger(cls,logger_name,create_file=False): @classmethod def get_spot_conf(cls): - + conf_file = "/etc/spot.conf" - config = ConfigParser.ConfigParser() - config.readfp(SecHead(open(conf_file))) + config = ConfigParser.SafeConfigParser() + config.read(conf_file) return config - + @classmethod - def create_oa_folders(cls,type,date): + def create_oa_folders(cls,type,date): # create date and ingest summary folder structure if they don't' exist. root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -59,9 +59,9 @@ def create_oa_folders(cls,type,date): # retun path to folders. data_path = data_type_folder.format(root_path,type,date) - ingest_path = data_type_folder.format(root_path,type,"ingest_summary") + ingest_path = data_type_folder.format(root_path,type,"ingest_summary") return data_path,ingest_path,ipynb_folder - + @classmethod def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path): @@ -72,7 +72,7 @@ def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path): @classmethod def read_results(cls,file,limit, delimiter=','): - + # read csv results. result_rows = [] with open(file, 'rb') as results_file: @@ -87,17 +87,17 @@ def read_results(cls,file,limit, delimiter=','): @classmethod def ip_to_int(self,ip): - + try: o = map(int, ip.split('.')) res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3] - return res + return res except ValueError: return None - + @classmethod - def create_csv_file(cls,full_path_file,content,delimiter=','): + def create_csv_file(cls,full_path_file,content,delimiter=','): with open(full_path_file, 'w+') as u_file: writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter) @@ -110,11 +110,11 @@ def __init__(self, fp): def readline(self): if self.sechead: - try: + try: return self.sechead - finally: + finally: self.sechead = None - else: + else: return self.fp.readline() class ProgressBar(object): @@ -131,26 +131,26 @@ def __init__(self,total,prefix='',sufix='',decimals=2,barlength=60): def start(self): self._move_progress_bar(0) - + def update(self,iterator): - + self._move_progress_bar(iterator) def auto_update(self): - self._auto_iteration_status += 1 + self._auto_iteration_status += 1 self._move_progress_bar(self._auto_iteration_status) - + def _move_progress_bar(self,iteration): filledLength = int(round(self._bar_length * iteration / float(self._total))) percents = round(100.00 * (iteration / float(self._total)), self._decimals) - bar = '#' * filledLength + '-' * (self._bar_length - filledLength) - sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix)) + bar = '#' * filledLength + '-' * (self._bar_length - filledLength) + sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix)) sys.stdout.flush() - + if iteration == self._total:print("\n") - - + + diff --git a/spot-setup/hdfs_setup.py 
b/spot-setup/hdfs_setup.py new file mode 100755 index 0000000..b5889cd --- /dev/null +++ b/spot-setup/hdfs_setup.py @@ -0,0 +1,96 @@ +#!/usr/bin/python + +import os +import sys +import logging +import subprocess +import ConfigParser + +def main(): + + # initialize logging + logger = get_logger('SPOT.HDFS.SETUP',create_file=False) + + # initialize ConfigParser + conf_file = '/etc/spot.conf' + conf = ConfigParser.SafeConfigParser() + spot_conf = conf.read(conf_file) + # check for file + if len(spot_conf) < 1: + logger.info("Failed to open /etc/spot.conf, check file location and try again") + raise SystemExit + + # Get configuration + DSOURCES = conf.get('DEFAULT','DSOURCES').split() + DFOLDERS = conf.get('DEFAULT','DFOLDERS').split() + HUSER = conf.get('DEFAULT','HUSER') + USER = os.environ.get('USER') + DBNAME = conf.get('database','DBNAME') + + # create hdfs folders + mkdir = "sudo -u hdfs hadoop fs -mkdir " + HUSER + execute_cmd(mkdir,logger) + chown = "sudo -u hdfs hadoop fs -chown " + USER + ":supergroup " + HUSER + execute_cmd(chown,logger) + + for source in DSOURCES: + cmd = "hadoop fs -mkdir {0}/{1}".format(HUSER,source) + execute_cmd(cmd,logger) + for folder in DFOLDERS: + cmd = "hadoop fs -mkdir {0}/{1}/{2}".format(HUSER,source,folder) + execute_cmd(cmd,logger) + + # Create hive tables + # create catalog + cmd = "hive -e 'CREATE DATABASE {0}'".format(DBNAME) + execute_cmd(cmd,logger) + + for source in DSOURCES: + cmd = "hive -hiveconf huser={0} -hiveconf dbname={1} -f create_{2}_avro_parquet.hql".format(HUSER,DBNAME,source) + execute_cmd(cmd,logger) + +def execute_cmd(command,logger): + + try: + logger.info("Executing: {0}".format(command)) + subprocess.call(command,shell=True) + + except subprocess.CalledProcessError as e: + logger.error("There was an error executing: {0}".format(e.cmd)) + sys.exit(1) + +def validate_parameter(parameter,message,logger): + if parameter == None or parameter == "": + logger.error(message) + sys.exit(1) + +def get_logger(logger_name,create_file=False): + + # create logger for prd_ci + log = logging.getLogger(logger_name) + log.setLevel(level=logging.INFO) + + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + if create_file: + # create file handler for logger. + fh = logging.FileHandler('SPOT.log') + fh.setLevel(level=logging.DEBUG) + fh.setFormatter(formatter) + # create console handler for logger. + ch = logging.StreamHandler() + ch.setLevel(level=logging.DEBUG) + ch.setFormatter(formatter) + + # add handlers to logger. 
+ if create_file: + log.addHandler(fh) + + log.addHandler(ch) + return log + + + +if __name__=='__main__': + main() diff --git a/spot-setup/hdfs_setup.sh b/spot-setup/hdfs_setup.sh deleted file mode 100755 index 4d61574..0000000 --- a/spot-setup/hdfs_setup.sh +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash - -DSOURCES=('flow' 'dns' 'proxy') -DFOLDERS=('binary' 'hive' 'stage') -source /etc/spot.conf - -# -# creating HDFS user's folder -# -hadoop fs -mkdir ${HUSER} -hadoop fs -chown ${USER}:supergroup ${HUSER} - -for d in "${DSOURCES[@]}" -do - echo "creating /$d" - hadoop fs -mkdir ${HUSER}/$d - for f in "${DFOLDERS[@]}" - do - echo "creating $d/$f" - hadoop fs -mkdir ${HUSER}/$d/$f - done -done - -# -# create hive tables -# -#configure / create catalog -hive -e "CREATE DATABASE ${DBNAME}" - -for d in "${DSOURCES[@]}" -do - hive -hiveconf huser=${HUSER} -hiveconf dbname=${DBNAME} -f create_${d}_avro_parquet.hql -done - - diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index f393689..0a1fd66 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -1,53 +1,107 @@ -#node configuration -NODES=('node-01' 'node-02') -UINODE='node03' -MLNODE='node04' -GWNODE='node16' -DBNAME='spot' - -#hdfs - base user and data source config -HUSER='/user/spot' -DSOURCES='flow' -DFOLDERS=('binary' 'csv' 'hive' 'stage') -DNS_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -PROXY_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -FLOW_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -HPATH=${HUSER}/${DSOURCE}/scored_results/${FDATE} - -#impala config -IMPALA_DEM='node04' +#spot.conf should always be located in /etc/spot.conf +#file formated as standard .INI +[DEFAULT] +#base user and data source config +#HDFS +HUSER=/user/spot +DSOURCES=flow dns proxy +DFOLDERS=binary csv hive stage +HPATH=%(HUSER)s/%(TYPE)s/scored_results/%(FDATE)s +#local fs +LUSER=/home/spot +LPATH=%(LUSER)s/ml/%(TYPE)s/%(FDATE)s +RPATH=%(LUSER)s/ipython/user/%(FDATE)s +LDAPATH=%(LUSER)s/ml/oni-lda-c +LIPATH=%(LUSER)s/ingest +#Spot Components +UINODE=node03 +MLNODE=node04 +GWNODE=node16 +#list of Worker nodes, excluding MLNODE +NODES= + node-01 + node-02 + node-03 + +#Default values affecting Machine Learning Results +MAXRESULTS=3000 +TOL=1e-6 +DUPFACTOR=1000 +TOPIC_COUNT=20 + +[database] +IMPALA_DEM=node04 +DBNAME=spot + +[kerberos] KRB_AUTH=false KINITPATH= KINITOPTS= KEYTABPATH= KRB_USER= -#local fs base user and data source config -LUSER='/home/spot' -LPATH=${LUSER}/ml/${DSOURCE}/${FDATE} -RPATH=${LUSER}/ipython/user/${FDATE} -LDAPATH=${LUSER}/ml/oni-lda-c -LIPATH=${LUSER}/ingest -SPK_EXEC='400' -SPK_EXEC_MEM='2048m' -SPK_DRIVER_MEM='' -SPK_DRIVER_MAX_RESULTS='' -SPK_EXEC_CORES='' -SPK_DRIVER_MEM_OVERHEAD='' -SPAK_EXEC_MEM_OVERHEAD='' -TOL='1e-6' +#Set SPK_CONFIG to True to use "Optional Values" in spark-submit command +#for more information see ./spot-ml/SPARKCONF.md +[spark] +#default options +SPK_EXEC=400 +SPK_EXEC_MEM=2048m +#Optional Values +SPK_CONFIG= +SPK_DRIVER_MEM= +SPK_DRIVER_MAX_RESULTS= +SPK_EXEC_CORES= +SPK_DRIVER_MEM_OVERHEAD= +SPK_EXEC_MEM_OVERHEAD= -# MPI configuration +[mpi] +#command to run MPI +MPI_CMD=mpiexec +#command to prepare system for MPI, eg. 
load environment variables +MPI_PREP_CMD= +#number of processes to run in MPI +PROCESS_COUNT=20 -# command to run MPI -MPI_CMD='mpiexec' +[ingest] +hdfs_app_path=hdfs application path +collector_processes=5 +ingestion_interval=1 +kafka_server=kafka ip +kafka_port=9183 +zookeeper_server=localhost +zookeeper_port=2181 +message_size=999999 +# spark streaming options +driver_memory= +spark_exec= +spark_executor_memory= +spark_executor_cores= +spark_batch_size= -# command to prepare system for MPI, eg. load environment variables -MPI_PREP_CMD='' +[flow] +type=flow +collector_path=/path_to_flow_collector +local_staging=/tmp/ +process_opt="" +FLOW_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=nfcapd. -# number of processes to run in MPI -PROCESS_COUNT=20 +[dns] +type=dns +collector_path=/path_to_dns_collector +local_staging=/tmp/ +pcap_split_staging=/tmp/ +process_opt="-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" +DNS_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=.pcap +[proxy] +type=proxy +collector_path=/path_to_proxy_collector +supported_files=["log"] +parser=bro_parser.py +PROXY_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=.log
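For reference, a short sketch of how the list-style and interpolated options in this new spot.conf are consumed, mirroring the reads in hdfs_setup.py above; the year, month and day values are illustrative placeholders.

# Sketch (Python 2): consuming the new INI-style spot.conf.
import ConfigParser

conf = ConfigParser.SafeConfigParser()
conf.read('/etc/spot.conf')

# Space-separated lists are split by the caller, as in hdfs_setup.py.
DSOURCES = conf.get('DEFAULT', 'DSOURCES').split()   # ['flow', 'dns', 'proxy']
DFOLDERS = conf.get('DEFAULT', 'DFOLDERS').split()   # ['binary', 'csv', 'hive', 'stage']
HUSER = conf.get('DEFAULT', 'HUSER')

for source in DSOURCES:
    for folder in DFOLDERS:
        print "hadoop fs -mkdir {0}/{1}/{2}".format(HUSER, source, folder)

# Per-source hive paths interpolate %(TYPE)s, %(YR)s, %(MH)s and %(DY)s; TYPE is
# supplied by the [flow] section itself, the date parts come from vars.
flow_path = conf.get('flow', 'FLOW_PATH',
                     vars={'YR': '2016', 'MH': '01', 'DY': '01'})
print flow_path   # /user/spot/flow/hive/y=2016/m=01/d=01/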