From 4d9b78e05abc993624f928ea4498533380b4af1b Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Thu, 3 Nov 2016 17:04:07 -0700 Subject: [PATCH 01/22] moving spot.conf to .INI file structure --- spot-setup/spot.conf | 59 +++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index f393689..a35951e 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -1,35 +1,42 @@ -#node configuration -NODES=('node-01' 'node-02') +#spot.conf should always be located in /etc/spot.conf +#file formated as standard .INI +[DEFAULT] +#base user and data source config +#HDFS +HUSER='/user/spot' +DSOURCES='flow','dns','proxy' +DFOLDERS='binary','csv','hive','stage' +DNS_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +PROXY_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +FLOW_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +##removing FDATE for now********* +HPATH=%(HUSER)s/%(DSOURCE)s/scored_results/ +#local fs +LUSER='/home/spot' +LPATH=%(LUSER)s/ml/%(DSOURCE)s/%(FDATE)s +RPATH=%(LUSER)s/ipython/user/%(FDATE)s +LDAPATH=%(LUSER)s/ml/oni-lda-c +LIPATH=%(LUSER)s/ingest + +[nodes] +NODES='node-01','node-02' UINODE='node03' MLNODE='node04' GWNODE='node16' DBNAME='spot' -#hdfs - base user and data source config -HUSER='/user/spot' -DSOURCES='flow' -DFOLDERS=('binary' 'csv' 'hive' 'stage') -DNS_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -PROXY_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -FLOW_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/ -HPATH=${HUSER}/${DSOURCE}/scored_results/${FDATE} - -#impala config +[impala_config] IMPALA_DEM='node04' +[kerberos] KRB_AUTH=false KINITPATH= KINITOPTS= KEYTABPATH= KRB_USER= -#local fs base user and data source config -LUSER='/home/spot' -LPATH=${LUSER}/ml/${DSOURCE}/${FDATE} -RPATH=${LUSER}/ipython/user/${FDATE} -LDAPATH=${LUSER}/ml/oni-lda-c -LIPATH=${LUSER}/ingest - +[spark] +SPK_CONFIG=false SPK_EXEC='400' SPK_EXEC_MEM='2048m' SPK_DRIVER_MEM='' @@ -40,14 +47,10 @@ SPAK_EXEC_MEM_OVERHEAD='' TOL='1e-6' -# MPI configuration - -# command to run MPI +[mpi] +#command to run MPI MPI_CMD='mpiexec' - -# command to prepare system for MPI, eg. load environment variables +#command to prepare system for MPI, eg. 
load environment variables MPI_PREP_CMD='' - -# number of processes to run in MPI -PROCESS_COUNT=20 - +#number of processes to run in MPI +PROCESS_COUNT=20 \ No newline at end of file From 98b933a1760f596f8f224041c4cd501b1bdcecea Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Fri, 4 Nov 2016 22:30:18 -0700 Subject: [PATCH 02/22] convert hdfs_set.sh to .py --- spot-setup/hdfs_setup.py | 91 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 spot-setup/hdfs_setup.py diff --git a/spot-setup/hdfs_setup.py b/spot-setup/hdfs_setup.py new file mode 100644 index 0000000..a6311a4 --- /dev/null +++ b/spot-setup/hdfs_setup.py @@ -0,0 +1,91 @@ +import os +import logging +import subprocess +import ConfigParser + +def main(): + + #initialize logging + logger = get_logger('SPOT.HDFS.SETUP',create_file=False) + #initialize ConfigParser + conf_file = '/etc/spot.conf' + conf = ConfigParser.SafeConfigParser() + spot_conf = conf.read(conf_file) + + #check for file + if len(spot_conf) < 1: + logger("Failed to open /etc/spot.conf, check file location and try again") + raise SystemExit + + #Get configuration + DSOURCES = conf.get('DEFAULT','DSOURCES').split() + DFOLDERS = conf.get('DEFAULT','DFOLDERS').split() + HUSER = conf.get('DEFAULT','HUSER') + USER = os.environ.get('USER') + DBNAME = conf.get('DATABASE','DBNAME') + + #create hdfs folders + mkdir = "sudo -u hdfs hadoop fs -mkdir " + HUSER + execute_cmd(mkdir,logger) + chown = "sudo -u hdfs hadoop fs -chown " + USER + ":supergroup " + HUSER + execute_cmd(chown,logger) + + for source in DSOURCES: + cmd = "hadoop fs -mkdir {0}/{1}".format(HUSER,source) + execute_cmd(cmd,logger) + for folder in DFOLDERS: + cmd = "hadoop fs -mkdir {0}/{1}/{2}".format(HUSER,source,folder) + execute_cmd(cmd,logger) + + #Create hive tables + #create catalog + cmd = hive -e "CREATE DATABASE ${DBNAME}" + for source in DSOURCES: + cmd = "hive -hiveconf huser={0} -hiveconf dbname={1} -f create_{2}_avro_parquet.hql".format(HUSER,DBNAME,source) + execute_cmd(cmd,logger) + +def execute_cmd(command,logger): + + try: + logger.info("Executing: {0}".format(command)) + subprocess.call(command,shell=True) + + except subprocess.CalledProcessError as e: + logger.error("There was an error executing: {0}".format(e.cmd)) + sys.exit(1) + +def validate_parameter(parameter,message,logger): + if parameter == None or parameter == "": + logger.error(message) + sys.exit(1) + +def get_logger(logger_name,create_file=False): + + # create logger for prd_ci + log = logging.getLogger(logger_name) + log.setLevel(level=logging.INFO) + + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + if create_file: + # create file handler for logger. + fh = logging.FileHandler('SPOT.log') + fh.setLevel(level=logging.DEBUG) + fh.setFormatter(formatter) + # reate console handler for logger. + ch = logging.StreamHandler() + ch.setLevel(level=logging.DEBUG) + ch.setFormatter(formatter) + + # add handlers to logger. 
+ if create_file: + log.addHandler(fh) + + log.addHandler(ch) + return log + + + +if __name__=='__main__': + main() \ No newline at end of file From 6462293314c20590860bfac39cd3fe9f6b24305f Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Fri, 4 Nov 2016 22:30:33 -0700 Subject: [PATCH 03/22] reformat spot.conf --- spot-setup/spot.conf | 48 ++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index a35951e..862c0a5 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -1,32 +1,36 @@ #spot.conf should always be located in /etc/spot.conf #file formated as standard .INI + [DEFAULT] #base user and data source config #HDFS -HUSER='/user/spot' -DSOURCES='flow','dns','proxy' -DFOLDERS='binary','csv','hive','stage' +HUSER=/user/spot +DSOURCES=flow dns proxy +DFOLDERS=binary csv hive stage DNS_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ PROXY_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ FLOW_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ ##removing FDATE for now********* HPATH=%(HUSER)s/%(DSOURCE)s/scored_results/ #local fs -LUSER='/home/spot' +LUSER=/home/spot LPATH=%(LUSER)s/ml/%(DSOURCE)s/%(FDATE)s RPATH=%(LUSER)s/ipython/user/%(FDATE)s LDAPATH=%(LUSER)s/ml/oni-lda-c LIPATH=%(LUSER)s/ingest +#Spot architecture +#list where each spot component will run from +UINODE=node03 +MLNODE=node04 +GWNODE=node16 -[nodes] -NODES='node-01','node-02' -UINODE='node03' -MLNODE='node04' -GWNODE='node16' -DBNAME='spot' +NODES= + node-01 + node-02 -[impala_config] -IMPALA_DEM='node04' +[database] +IMPALA_DEM=node04 +DBNAME=spot [kerberos] KRB_AUTH=false @@ -37,20 +41,20 @@ KRB_USER= [spark] SPK_CONFIG=false -SPK_EXEC='400' -SPK_EXEC_MEM='2048m' -SPK_DRIVER_MEM='' -SPK_DRIVER_MAX_RESULTS='' -SPK_EXEC_CORES='' -SPK_DRIVER_MEM_OVERHEAD='' -SPAK_EXEC_MEM_OVERHEAD='' -TOL='1e-6' +SPK_EXEC=400 +SPK_EXEC_MEM=2048m +SPK_DRIVER_MEM=None +SPK_DRIVER_MAX_RESULTS=None +SPK_EXEC_CORES=None +SPK_DRIVER_MEM_OVERHEAD=None +SPAK_EXEC_MEM_OVERHEAD=None +TOL=1e-6 [mpi] #command to run MPI -MPI_CMD='mpiexec' +MPI_CMD=mpiexec #command to prepare system for MPI, eg. 
load environment variables -MPI_PREP_CMD='' +MPI_PREP_CMD= #number of processes to run in MPI PROCESS_COUNT=20 \ No newline at end of file From 773e2a1d29dfe2bf8942e16de26e222b805d7580 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Mon, 7 Nov 2016 14:36:42 -0800 Subject: [PATCH 04/22] organized by components, changed DSOURCE to TYPE --- spot-setup/spot.conf | 47 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index 862c0a5..884c390 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -7,9 +7,6 @@ HUSER=/user/spot DSOURCES=flow dns proxy DFOLDERS=binary csv hive stage -DNS_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ -PROXY_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ -FLOW_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ ##removing FDATE for now********* HPATH=%(HUSER)s/%(DSOURCE)s/scored_results/ #local fs @@ -19,11 +16,12 @@ RPATH=%(LUSER)s/ipython/user/%(FDATE)s LDAPATH=%(LUSER)s/ml/oni-lda-c LIPATH=%(LUSER)s/ingest #Spot architecture -#list where each spot component will run from +#list where each Spot component will run from UINODE=node03 MLNODE=node04 GWNODE=node16 - +MAXRESULTS=3000 +TOL=1e-6 NODES= node-01 node-02 @@ -39,17 +37,16 @@ KINITOPTS= KEYTABPATH= KRB_USER= + [spark] -SPK_CONFIG=false +SPK_CONFIG=False SPK_EXEC=400 SPK_EXEC_MEM=2048m SPK_DRIVER_MEM=None SPK_DRIVER_MAX_RESULTS=None SPK_EXEC_CORES=None SPK_DRIVER_MEM_OVERHEAD=None -SPAK_EXEC_MEM_OVERHEAD=None -TOL=1e-6 - +SPK_EXEC_MEM_OVERHEAD=None [mpi] #command to run MPI @@ -57,4 +54,34 @@ MPI_CMD=mpiexec #command to prepare system for MPI, eg. load environment variables MPI_PREP_CMD= #number of processes to run in MPI -PROCESS_COUNT=20 \ No newline at end of file +PROCESS_COUNT=20 + +[ingest] +hdfs_app_path=hdfs application path +kafka_server=kafka ip +kafka_port=9183 +zookeeper_server=localhost +zookeeper_port=2181 +message_size=999999 + +[flow] +type=flow +collector_path=/path_to_flow_collector +local_staging=/tmp/ +process_opt="" +FLOW_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ + +[dns] +type=dns +collector_path=/path_to_dns_collector +local_staging=/tmp/ +pcap_split_staging=/tmp/ +process_opt="-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" +DNS_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ + +[proxy] +type=proxy +collector_path=/path_to_proxy_collector +supported_files=["log"] +parser=bro_parser.py +PROXY_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ From 975e364322662a944a9ba290d0a1350419327a81 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Mon, 7 Nov 2016 14:39:00 -0800 Subject: [PATCH 05/22] fixed DSOURCE-->TYPE --- spot-setup/spot.conf | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index 884c390..237b2e6 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -8,10 +8,10 @@ HUSER=/user/spot DSOURCES=flow dns proxy DFOLDERS=binary csv hive stage ##removing FDATE for now********* -HPATH=%(HUSER)s/%(DSOURCE)s/scored_results/ +HPATH=%(HUSER)s/%(TYPE)s/scored_results/ #local fs LUSER=/home/spot -LPATH=%(LUSER)s/ml/%(DSOURCE)s/%(FDATE)s +LPATH=%(LUSER)s/ml/%(TYPE)s/%(FDATE)s RPATH=%(LUSER)s/ipython/user/%(FDATE)s LDAPATH=%(LUSER)s/ml/oni-lda-c 
LIPATH=%(LUSER)s/ingest @@ -69,7 +69,7 @@ type=flow collector_path=/path_to_flow_collector local_staging=/tmp/ process_opt="" -FLOW_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +FLOW_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ [dns] type=dns @@ -77,11 +77,11 @@ collector_path=/path_to_dns_collector local_staging=/tmp/ pcap_split_staging=/tmp/ process_opt="-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" -DNS_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +DNS_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ [proxy] type=proxy collector_path=/path_to_proxy_collector supported_files=["log"] parser=bro_parser.py -PROXY_PATH=%(HUSER)s/%(DSOURCE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +PROXY_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ From d60c48775e7b941b46f948df7f7c432f71c5af8c Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 15 Nov 2016 18:02:53 -0800 Subject: [PATCH 06/22] switched python cmd to ConfigParser --- spot-ingest/start_ingest_standalone.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spot-ingest/start_ingest_standalone.sh b/spot-ingest/start_ingest_standalone.sh index 5a0a2de..dd9d2f5 100755 --- a/spot-ingest/start_ingest_standalone.sh +++ b/spot-ingest/start_ingest_standalone.sh @@ -26,8 +26,8 @@ fi # Validate type (ingest conf). #----------------------------------------------------------------------------------- CONF_FILE="ingest_conf.json" -CONF_ING=`python -c "import json,sys;obj=json.loads(open('ingest_conf.json').read());print obj['pipelines']['${INGEST_CONF}'];"` -TYPE=`python -c "import json,sys;obj=json.loads(open('ingest_conf.json').read());print obj['pipelines']['${INGEST_CONF}']['type'];"` +CONF_ING=`python -c "import ConfigParser; conf = ConfigParser.SafeConfigParser(); conf.read('spot.conf'); result = '${INGEST_CONF}' if conf.has_section('${INGEST_CONF}') == True else ''; print result"` +TYPE=`python -c "import ConfigParser; conf = ConfigParser.SafeConfigParser(); conf.read('spot.conf'); print(conf.get('${INGEST_CONF}','type'))"` if [ -z "$CONF_ING" ]; then echo "Provided type is not part of ${CONF_FILE}" From 4138a47a86857be0e1a78af96ef00c527d1c89a3 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 15 Nov 2016 18:09:13 -0800 Subject: [PATCH 07/22] added hdfs_setup.py --- spot-setup/hdfs_setup.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/spot-setup/hdfs_setup.py b/spot-setup/hdfs_setup.py index a6311a4..484644e 100644 --- a/spot-setup/hdfs_setup.py +++ b/spot-setup/hdfs_setup.py @@ -4,26 +4,26 @@ import ConfigParser def main(): - + #initialize logging logger = get_logger('SPOT.HDFS.SETUP',create_file=False) + #initialize ConfigParser conf_file = '/etc/spot.conf' conf = ConfigParser.SafeConfigParser() spot_conf = conf.read(conf_file) - #check for file if len(spot_conf) < 1: - logger("Failed to open /etc/spot.conf, check file location and try again") + logger.info("Failed to open /etc/spot.conf, check file location and try again") raise SystemExit - + #Get configuration DSOURCES = conf.get('DEFAULT','DSOURCES').split() DFOLDERS = conf.get('DEFAULT','DFOLDERS').split() HUSER = conf.get('DEFAULT','HUSER') USER = os.environ.get('USER') DBNAME = conf.get('DATABASE','DBNAME') - + #create hdfs folders mkdir = "sudo -u hdfs hadoop fs -mkdir " + HUSER 
execute_cmd(mkdir,logger) @@ -39,7 +39,7 @@ def main(): #Create hive tables #create catalog - cmd = hive -e "CREATE DATABASE ${DBNAME}" + cmd = "hive -e 'CREATE DATABASE {0}'".format(DBNAME) for source in DSOURCES: cmd = "hive -hiveconf huser={0} -hiveconf dbname={1} -f create_{2}_avro_parquet.hql".format(HUSER,DBNAME,source) execute_cmd(cmd,logger) @@ -58,7 +58,7 @@ def validate_parameter(parameter,message,logger): if parameter == None or parameter == "": logger.error(message) sys.exit(1) - + def get_logger(logger_name,create_file=False): # create logger for prd_ci @@ -73,7 +73,7 @@ def get_logger(logger_name,create_file=False): fh = logging.FileHandler('SPOT.log') fh.setLevel(level=logging.DEBUG) fh.setFormatter(formatter) - # reate console handler for logger. + # create console handler for logger. ch = logging.StreamHandler() ch.setLevel(level=logging.DEBUG) ch.setFormatter(formatter) @@ -88,4 +88,4 @@ def get_logger(logger_name,create_file=False): if __name__=='__main__': - main() \ No newline at end of file + main() From 68c3ceee5758152bbf405b67a10dd94480fe0e01 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 15 Nov 2016 18:11:20 -0800 Subject: [PATCH 08/22] added ml_ops.py for more advanced useage --- spot-ml/ml_ops.py | 200 +++++++++++++++++++++++++++++++++++++++++++ spot-setup/spot.conf | 33 ++++--- 2 files changed, 221 insertions(+), 12 deletions(-) create mode 100755 spot-ml/ml_ops.py diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py new file mode 100755 index 0000000..d4a1828 --- /dev/null +++ b/spot-ml/ml_ops.py @@ -0,0 +1,200 @@ +import os +import logging +import argparse +import subprocess +import ConfigParser + +def main(): + + #initialize ConfigParser + conf_file = '../spot-setup/spot.conf' + conf = ConfigParser.SafeConfigParser() + spot_conf = conf.read(conf_file) + + #initialize logging + logger = get_logger('SPOT.ML.OPS',create_file=False) + + #check for file + if len(spot_conf) < 1: + logger.info("Failed to open /etc/spot.conf, check file location and try again") + raise SystemExit + + ## parse and validate arguments + tol = None + + # input Parameters + parser = argparse.ArgumentParser(description="ML Operations script") + parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be processed',metavar='') + parser.add_argument('-d','--date',dest='fdate',required=True,help='Specify date to be analyzed by ML',metavar='') + parser.add_argument('-T','--tol',dest='tol',required=False,help='If present will override default TOL from conf file',metavar='') + parser.add_argument('-m','--maxResults',dest='MAXRESULTS',required=False,help='If defined sets max results returned',metavar='') + args = parser.parse_args() + + YR=args.fdate[0:4] + MH=args.fdate[4:6] + DY=args.fdate[6:8] + + #getting defaults for ConfigParser interpolation + DEFAULTS = vars(args) + DEFAULTS.update({'YR':YR,'MH':MH,'DY':DY}) + + ## prepare parameters pipeline stages + HPATH = conf.get('DEFAULT','HPATH',vars=DEFAULTS) + LPATH = conf.get('DEFAULT','LPATH',vars=DEFAULTS) + MAXRESULTS = conf.get('DEFAULT','MAXRESULTS') + DUPFACTOR = conf.get('DEFAULT','DUPFACTOR') + RAWDATA_PATH = conf.get(args.type,'{0}_PATH'.format(args.type.upper()), vars=DEFAULTS) + FEEDBACK_PATH = "{0}/{1}_scores.csv".format(conf.get('DEFAULT','LPATH',vars=DEFAULTS),args.type) + PREPROCESS_STEP = "{0}_pre_lda".format(args.type) + POSTPROCESS_STEP = "{0}_post_lda".format(args.type) + HDFS_WORDCOUNTS = "{0}/word_counts".format(HPATH) + + ## paths for intermediate files + HDFS_DOCRESULTS = 
"{0}/doc_results.csv".format(HPATH) + LOCAL_DOCRESULTS = "{0}/doc_results.csv".format(LPATH) + HDFS_WORDRESULTS = "{0}/word_results.csv".format(HPATH) + LOCAL_WORDRESULTS = "{0}/word_results.csv".format(LPATH) + + HDFS_SCORED_CONNECTS = "{0}/scores".format(HPATH) + + LDA_OUTPUT_DIR = "{0}/{1}".format(args.type,args.fdate) + + #get nodes and create comma seperated list + NODES = conf.get('DEFAULT','NODES').split() + nodes_csl = ','.join(NODES) + + cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_WORDCOUNTS) + execute_cmd(cmd,logger) + + cmd = "mkdir -p {0}".format(LPATH) + execute_cmd(cmd,logger) + + # protect the flow_scores.csv file + cmd = "rm -f {0}/*.{dat,beta,gamma,other,pkl}".format(LPATH) + execute_cmd(cmd,logger) + + cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_SCORED_CONNECTS) + execute_cmd(cmd,logger) + + # Add -p to execute pre MPI command. + # Pre MPI command can be configured in /etc/spot.conf + # In this script, after the line after --mpicmd ${MPI_CMD} add: + # --mpiprep ${MPI_PREP_CMD} + + if conf.get('mpi',"MPI_PREP_CMD"): + cmd = conf.get('mpi',"MPI_PREP_CMD") + execute_cmd(cmd,logger) + + SPK_CONFIG = conf.get('spark','SPK_CONFIG') + SPK_DRIVER_MEM = conf.get('spark','SPK_DRIVER_MEM') + SPK_EXEC = conf.get('spark','SPK_EXEC') + SPK_EXEC_CORES = conf.get('spark','SPK_EXEC_CORES') + SPK_EXEC_MEM = conf.get('spark','SPK_EXEC_MEM') + SPK_EXEC_MEM_OVERHEAD = conf.get('spark', 'SPK_EXEC_MEM_OVERHEAD') + SPK_DRIVER_MAX_RESULTS = conf.get('spark', 'SPK_DRIVER_MAX_RESULTS') + SPK_DRIVER_MEM_OVERHEAD = conf.get('spark', 'SPK_DRIVER_MEM_OVERHEAD') + LDAPATH = conf.get('DEFAULT','LDAPATH') + LUSER = conf.get('DEFAULT','LUSER') + MPI_CMD = conf.get('mpi','MPI_CMD') + PROCESS_COUNT = conf.get('mpi','PROCESS_COUNT') + TOPIC_COUNT = conf.get('DEFAULT','TOPIC_COUNT') + + if tol: + TOL = tol + else: + TOL = conf.get('DEFAULT','TOL') + + #prepare options for spark-submit + spark_cmd = [ + "time", "spark-submit", "--class org.apache.spot.SuspiciousConnects", + "--master yarn-client", "--conf spark.driver.maxPermSize=512m", "--conf spark.driver.cores=1", + "--conf spark.dynamicAllocation.enabled=true", "--conf spark.dynamicAllocation.minExecutors=1", + "--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M", + "--conf spark.shuffle.io.preferDirectBufs=false", "--conf spark.kryoserializer.buffer.max=512m", + "--conf spark.shuffle.service.enabled=true", "--conf spark.yarn.am.waitTime=1000000"] + + spark_extras = [ + "--driver-memory " + SPK_DRIVER_MEM, + "--conf spark.dynamicAllocation.maxExecutors=" + SPK_EXEC, + "--conf spark.executor.cores=" + SPK_EXEC_CORES, + "--conf spark.executor.memory=" + SPK_EXEC_MEM, + "--conf spark.driver.maxResultSize=" + SPK_DRIVER_MAX_RESULTS, + "--conf spark.yarn.driver.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD, + "--conf spark.yarn.executor.memoryOverhead=" + SPK_EXEC_MEM_OVERHEAD] + + if SPK_CONFIG: + logger.info('Adding Spark Configurations from spot.conf') + spark_cmd.extend(spark_extras) + + spot_jar = [ + "target/scala-2.10/spot-ml-assembly-1.1.jar", + "--analysis " + args.type, + "--input " + RAWDATA_PATH, "--dupfactor " + DUPFACTOR, + "--feedback " + FEEDBACK_PATH, + "--model " + LPATH + "/model.dat", + "--topicdoc " + LPATH + "/final.gamma", + "--topicword " + LPATH + "/final.beta", + "--lpath " + LPATH, "--ldapath" + LDAPATH, + "--luser " + LUSER, "--mpicmd " + MPI_CMD, + "--proccount " + PROCESS_COUNT, + "--topiccount " + TOPIC_COUNT, + "--nodes " + nodes_csl, + "--scored " + HDFS_SCORED_CONNECTS, + "--threshold " + TOL, + 
"--maxresults " + MAXRESULTS] + + spark_cmd.extend(spot_jar) + + execute_cmd(spark_cmd,logger) + #process = subprocess.Popen(spark_cmd, stdout=subprocess.PIPE, stderr=None) + + ## move results to hdfs. + os.chdir(LPATH) + cmd = "hadoop fs -getmerge {0}/part-* {1}_results.csv && hadoop fs -moveFromLocal {1}_results.csv {0}/${1}_results.csv".format(HDFS_SCORED_CONNECTS,args.type) + execute_cmd(cmd,logger) + +def execute_cmd(command,logger): + + try: + logger.info("Executing: {0}".format(command)) + subprocess.call(command,shell=True) + + except subprocess.CalledProcessError as e: + logger.error("There was an error executing: {0}".format(e.cmd)) + sys.exit(1) + +def validate_parameter(parameter,message,logger): + if parameter == None or parameter == "": + logger.error(message) + sys.exit(1) + +def get_logger(logger_name,create_file=False): + + # create logger for prd_ci + log = logging.getLogger(logger_name) + log.setLevel(level=logging.INFO) + + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + if create_file: + # create file handler for logger. + fh = logging.FileHandler('SPOT.log') + fh.setLevel(level=logging.DEBUG) + fh.setFormatter(formatter) + # reate console handler for logger. + ch = logging.StreamHandler() + ch.setLevel(level=logging.DEBUG) + ch.setFormatter(formatter) + + # add handlers to logger. + if create_file: + log.addHandler(fh) + + log.addHandler(ch) + return log + + + +if __name__=='__main__': + main() \ No newline at end of file diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index 237b2e6..50c333a 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -7,24 +7,28 @@ HUSER=/user/spot DSOURCES=flow dns proxy DFOLDERS=binary csv hive stage -##removing FDATE for now********* -HPATH=%(HUSER)s/%(TYPE)s/scored_results/ +HPATH=%(HUSER)s/%(TYPE)s/scored_results/%(FDATE)s #local fs LUSER=/home/spot LPATH=%(LUSER)s/ml/%(TYPE)s/%(FDATE)s RPATH=%(LUSER)s/ipython/user/%(FDATE)s LDAPATH=%(LUSER)s/ml/oni-lda-c LIPATH=%(LUSER)s/ingest -#Spot architecture -#list where each Spot component will run from +#Spot Components UINODE=node03 MLNODE=node04 GWNODE=node16 -MAXRESULTS=3000 -TOL=1e-6 +#list of Worker nodes, excluding MLNODE NODES= node-01 node-02 + node-03 + +#Default values affecting Machine Learning Results +MAXRESULTS=3000 +TOL=1e-6 +DUPFACTOR=1000 +TOPIC_COUNT=20 [database] IMPALA_DEM=node04 @@ -38,15 +42,20 @@ KEYTABPATH= KRB_USER= +#Set SPK_CONFIG to True to use "Optional Values" in spark-submit command +#for more information see ./spot-ml/SPARKCONF.md + [spark] -SPK_CONFIG=False +#default options SPK_EXEC=400 SPK_EXEC_MEM=2048m -SPK_DRIVER_MEM=None -SPK_DRIVER_MAX_RESULTS=None -SPK_EXEC_CORES=None -SPK_DRIVER_MEM_OVERHEAD=None -SPK_EXEC_MEM_OVERHEAD=None +#Optional Values +SPK_CONFIG= +SPK_DRIVER_MEM= +SPK_DRIVER_MAX_RESULTS= +SPK_EXEC_CORES= +SPK_DRIVER_MEM_OVERHEAD= +SPK_EXEC_MEM_OVERHEAD= [mpi] #command to run MPI From 2e8b8fce6adf7c632f9df78e1a9a9904d80aa746 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Wed, 16 Nov 2016 15:01:44 -0800 Subject: [PATCH 09/22] switched ingest to spot.conf and ConfigParser --- spot-ingest/ingest_conf.json | 33 ----------------------- spot-ingest/master_collector.py | 46 +++++++++++++++++---------------- spot-ingest/worker.py | 37 ++++++++++++++------------ 3 files changed, 44 insertions(+), 72 deletions(-) delete mode 100644 spot-ingest/ingest_conf.json diff --git a/spot-ingest/ingest_conf.json b/spot-ingest/ingest_conf.json deleted file 
mode 100644 index 3abb43a..0000000 --- a/spot-ingest/ingest_conf.json +++ /dev/null @@ -1,33 +0,0 @@ -{ - "dbname" : "database name", - "hdfs_app_path" : "hdfs application path", - "kafka":{ - "kafka_server":"kafka ip", - "kafka_port":"kafka port", - "zookeper_server":"zk ip", - "zookeper_port":"zk port", - "message_size":999999 - }, - "pipelines":{ - "flow":{ - "type":"flow", - "collector_path":"/path_to_flow_collector", - "local_staging":"/tmp/", - "process_opt":"" - }, - "dns":{ - "type":"dns", - "collector_path":"/path_to_dns_collector", - "local_staging":"/tmp/", - "pkt_num":"650000", - "pcap_split_staging":"/tmp", - "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" - }, - "proxy":{ - "type":"proxy", - "collector_path":"/path_to_proxy_collecor", - "supported_files":["log"], - "parser":"bro_parser.py" - } - } -} diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py index 07c5925..48349af 100755 --- a/spot-ingest/master_collector.py +++ b/spot-ingest/master_collector.py @@ -1,18 +1,17 @@ #!/bin/env python import argparse -import os -import json import sys from common.utils import Util from common.kerberos import Kerberos from common.kafka_client import KafkaTopic -import datetime +import datetime +import ConfigParser -# get master configuration. -script_path = os.path.dirname(os.path.abspath(__file__)) -conf_file = "{0}/ingest_conf.json".format(script_path) -master_conf = json.loads(open (conf_file).read()) +# initialize ConfigParser +conf_file = '/etc/spot.conf' +master_conf = ConfigParser.SafeConfigParser() +master_conf.read(conf_file) def main(): @@ -30,43 +29,46 @@ def start_collector(type,workers_num,id=None): # generate ingest id ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_") - + # create logger. logger = Util.get_logger("SPOT.INGEST") # validate the given configuration exists in ingest_conf.json. - if not type in master_conf["pipelines"]: - logger.error("'{0}' type is not a valid configuration.".format(type)); + try: + master_conf.get(type, 'type') + except ConfigParser.NoSectionError: + logger.error("'{0}' type is not a valid configuration.".format(type)) sys.exit(1) # validate the type is a valid module. - if not Util.validate_data_source(master_conf["pipelines"][type]["type"]): - logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"])); + if not Util.validate_data_source(master_conf.get(type, "type")): + logger.error("{0} type is not configured. Please check /etc/spot.conf".format(type)) sys.exit(1) - + # validate if kerberos authentication is required. - if os.getenv('KRB_AUTH'): + if master_conf.get('kerberos', 'KRB_AUTH'): kb = Kerberos() kb.authenticate() - + # kafka server info. logger.info("Initializing kafka instance") - k_server = master_conf["kafka"]['kafka_server'] - k_port = master_conf["kafka"]['kafka_port'] + k_server = master_conf.get('ingest', 'kafka_server') + k_port = master_conf.get('ingest', 'kafka_port') # required zookeeper info. 
-    zk_server = master_conf["kafka"]['zookeper_server']
-    zk_port = master_conf["kafka"]['zookeper_port']
-
+    zk_server = master_conf.get('ingest', 'zookeeper_server')
+    zk_port = master_conf.get('ingest', 'zookeeper_port')
+
     topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
     kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".format(master_conf.get(type, 'type')),fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
+    hdfs_app_path = master_conf.get('ingest', 'hdfs_app_path')
+    ingest_collector = module.Collector(hdfs_app_path,kafka,type)
     ingest_collector.start()
 
 if __name__=='__main__':
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index a01559e..4dc0128 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -9,9 +9,9 @@
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
 
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+conf_file = '/etc/spot.conf'
+worker_conf = ConfigParser.SafeConfigParser()
+worker_conf.read(conf_file)
 
 def main():
 
@@ -31,39 +31,42 @@
 
     logger = Util.get_logger("SPOT.INGEST.WORKER")
 
-    # validate the given configuration exists in ingest_conf.json.
-    if not type in worker_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    # validate the given configuration exists in spot.conf
+    try:
+        worker_conf.get(type, 'type')
+    except ConfigParser.NoSectionError:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
-        logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+    if not Util.validate_data_source(worker_conf.get(type, "type")):
+        logger.error("'{0}' type is not configured. Please check /etc/spot.conf".format(type))
+        sys.exit(1)
 
-    # validate if kerberos authentication is requiered.
-    if os.getenv('KRB_AUTH'):
+    # validate if kerberos authentication is required.
+    if worker_conf.get('kerberos', 'KRB_AUTH'):
         kb = Kerberos()
         kb.authenticate()
 
     # create a worker instance based on the data source type.
-    module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+    module = __import__("pipelines.{0}.worker".format(worker_conf.get(type, 'type')),fromlist=['Worker'])
 
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = worker_conf["kafka"]['kafka_server']
-    k_port = worker_conf["kafka"]['kafka_port']
+    k_server = worker_conf.get('ingest', 'kafka_server')
+    k_port = worker_conf.get('ingest', 'kafka_port')
 
     # required zookeeper info.
-    zk_server = worker_conf["kafka"]['zookeper_server']
-    zk_port = worker_conf["kafka"]['zookeper_port']
+    zk_server = worker_conf.get('ingest', 'zookeeper_server')
+    zk_port = worker_conf.get('ingest', 'zookeeper_port')
 
     topic = topic
 
     # create kafka consumer.
     kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)
 
     # start worker.
- db_name = worker_conf['dbname'] - app_path = worker_conf['hdfs_app_path'] + db_name = worker_conf.get('database','DBNAME'] + app_path = worker_conf.get('ingest', 'hdfs_app_path') ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes) ingest_worker.start() From 7ba5e24a3196d58e5e0c0daeecb5ab7b1437357b Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Wed, 16 Nov 2016 15:37:47 -0800 Subject: [PATCH 10/22] change OA utils.py to standard ConfigParser, remove .replace --- spot-ml/ml_ops.py | 90 ++++++++-------- spot-oa/oa/components/data/impala.py | 6 +- spot-oa/oa/dns/dns_oa.py | 88 +++++++-------- spot-oa/oa/flow/flow_oa.py | 156 +++++++++++++-------------- spot-oa/oa/proxy/proxy_oa.py | 6 +- spot-oa/oa/utils.py | 56 +++++----- 6 files changed, 201 insertions(+), 201 deletions(-) diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py index d4a1828..2f9f690 100755 --- a/spot-ml/ml_ops.py +++ b/spot-ml/ml_ops.py @@ -7,18 +7,18 @@ def main(): #initialize ConfigParser - conf_file = '../spot-setup/spot.conf' + conf_file = '/etc/spot.conf' conf = ConfigParser.SafeConfigParser() spot_conf = conf.read(conf_file) #initialize logging logger = get_logger('SPOT.ML.OPS',create_file=False) - + #check for file if len(spot_conf) < 1: logger.info("Failed to open /etc/spot.conf, check file location and try again") raise SystemExit - + ## parse and validate arguments tol = None @@ -29,11 +29,11 @@ def main(): parser.add_argument('-T','--tol',dest='tol',required=False,help='If present will override default TOL from conf file',metavar='') parser.add_argument('-m','--maxResults',dest='MAXRESULTS',required=False,help='If defined sets max results returned',metavar='') args = parser.parse_args() - + YR=args.fdate[0:4] MH=args.fdate[4:6] DY=args.fdate[6:8] - + #getting defaults for ConfigParser interpolation DEFAULTS = vars(args) DEFAULTS.update({'YR':YR,'MH':MH,'DY':DY}) @@ -54,37 +54,37 @@ def main(): LOCAL_DOCRESULTS = "{0}/doc_results.csv".format(LPATH) HDFS_WORDRESULTS = "{0}/word_results.csv".format(HPATH) LOCAL_WORDRESULTS = "{0}/word_results.csv".format(LPATH) - + HDFS_SCORED_CONNECTS = "{0}/scores".format(HPATH) - - LDA_OUTPUT_DIR = "{0}/{1}".format(args.type,args.fdate) - + + LDA_OUTPUT_DIR = "{1}/{1}".format(args.type,args.fdate) + #get nodes and create comma seperated list NODES = conf.get('DEFAULT','NODES').split() nodes_csl = ','.join(NODES) - + cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_WORDCOUNTS) execute_cmd(cmd,logger) cmd = "mkdir -p {0}".format(LPATH) execute_cmd(cmd,logger) - + # protect the flow_scores.csv file cmd = "rm -f {0}/*.{dat,beta,gamma,other,pkl}".format(LPATH) execute_cmd(cmd,logger) cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_SCORED_CONNECTS) execute_cmd(cmd,logger) - + # Add -p to execute pre MPI command. 
# Pre MPI command can be configured in /etc/spot.conf # In this script, after the line after --mpicmd ${MPI_CMD} add: # --mpiprep ${MPI_PREP_CMD} - + if conf.get('mpi',"MPI_PREP_CMD"): cmd = conf.get('mpi',"MPI_PREP_CMD") execute_cmd(cmd,logger) - + SPK_CONFIG = conf.get('spark','SPK_CONFIG') SPK_DRIVER_MEM = conf.get('spark','SPK_DRIVER_MEM') SPK_EXEC = conf.get('spark','SPK_EXEC') @@ -98,49 +98,49 @@ def main(): MPI_CMD = conf.get('mpi','MPI_CMD') PROCESS_COUNT = conf.get('mpi','PROCESS_COUNT') TOPIC_COUNT = conf.get('DEFAULT','TOPIC_COUNT') - + if tol: TOL = tol else: TOL = conf.get('DEFAULT','TOL') - + #prepare options for spark-submit spark_cmd = [ "time", "spark-submit", "--class org.apache.spot.SuspiciousConnects", - "--master yarn-client", "--conf spark.driver.maxPermSize=512m", "--conf spark.driver.cores=1", - "--conf spark.dynamicAllocation.enabled=true", "--conf spark.dynamicAllocation.minExecutors=1", - "--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M", - "--conf spark.shuffle.io.preferDirectBufs=false", "--conf spark.kryoserializer.buffer.max=512m", + "--master yarn-client", "--conf spark.driver.maxPermSize=512m", "--conf spark.driver.cores=1", + "--conf spark.dynamicAllocation.enabled=true", "--conf spark.dynamicAllocation.minExecutors=1", + "--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M", + "--conf spark.shuffle.io.preferDirectBufs=false", "--conf spark.kryoserializer.buffer.max=512m", "--conf spark.shuffle.service.enabled=true", "--conf spark.yarn.am.waitTime=1000000"] - + spark_extras = [ - "--driver-memory " + SPK_DRIVER_MEM, - "--conf spark.dynamicAllocation.maxExecutors=" + SPK_EXEC, - "--conf spark.executor.cores=" + SPK_EXEC_CORES, - "--conf spark.executor.memory=" + SPK_EXEC_MEM, - "--conf spark.driver.maxResultSize=" + SPK_DRIVER_MAX_RESULTS, - "--conf spark.yarn.driver.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD, + "--driver-memory " + SPK_DRIVER_MEM, + "--conf spark.dynamicAllocation.maxExecutors=" + SPK_EXEC, + "--conf spark.executor.cores=" + SPK_EXEC_CORES, + "--conf spark.executor.memory=" + SPK_EXEC_MEM, + "--conf spark.driver.maxResultSize=" + SPK_DRIVER_MAX_RESULTS, + "--conf spark.yarn.driver.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD, "--conf spark.yarn.executor.memoryOverhead=" + SPK_EXEC_MEM_OVERHEAD] - + if SPK_CONFIG: logger.info('Adding Spark Configurations from spot.conf') spark_cmd.extend(spark_extras) - spot_jar = [ - "target/scala-2.10/spot-ml-assembly-1.1.jar", + spot_jar = [ + "target/scala-2.10/spot-ml-assembly-1.1.jar", "--analysis " + args.type, - "--input " + RAWDATA_PATH, "--dupfactor " + DUPFACTOR, - "--feedback " + FEEDBACK_PATH, - "--model " + LPATH + "/model.dat", - "--topicdoc " + LPATH + "/final.gamma", - "--topicword " + LPATH + "/final.beta", - "--lpath " + LPATH, "--ldapath" + LDAPATH, - "--luser " + LUSER, "--mpicmd " + MPI_CMD, - "--proccount " + PROCESS_COUNT, - "--topiccount " + TOPIC_COUNT, - "--nodes " + nodes_csl, - "--scored " + HDFS_SCORED_CONNECTS, - "--threshold " + TOL, + "--input " + RAWDATA_PATH, "--dupfactor " + DUPFACTOR, + "--feedback " + FEEDBACK_PATH, + "--model " + LPATH + "/model.dat", + "--topicdoc " + LPATH + "/final.gamma", + "--topicword " + LPATH + "/final.beta", + "--lpath " + LPATH, "--ldapath" + LDAPATH, + "--luser " + LUSER, "--mpicmd " + MPI_CMD, + "--proccount " + PROCESS_COUNT, + "--topiccount " + TOPIC_COUNT, + "--nodes " + nodes_csl, + "--scored " + HDFS_SCORED_CONNECTS, + "--threshold " + TOL, "--maxresults " + MAXRESULTS] 
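    # Note on the call below: execute_cmd() hands its argument to
    # subprocess.call(..., shell=True), which expects a single command string,
    # while spark_cmd here is a Python list. A minimal bridge, assuming the
    # individual options are already shell-safe, would be to flatten the list
    # first, e.g. execute_cmd(" ".join(spark_cmd), logger).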
spark_cmd.extend(spot_jar) @@ -158,7 +158,7 @@ def execute_cmd(command,logger): try: logger.info("Executing: {0}".format(command)) subprocess.call(command,shell=True) - + except subprocess.CalledProcessError as e: logger.error("There was an error executing: {0}".format(e.cmd)) sys.exit(1) @@ -167,7 +167,7 @@ def validate_parameter(parameter,message,logger): if parameter == None or parameter == "": logger.error(message) sys.exit(1) - + def get_logger(logger_name,create_file=False): # create logger for prd_ci @@ -197,4 +197,4 @@ def get_logger(logger_name,create_file=False): if __name__=='__main__': - main() \ No newline at end of file + main() diff --git a/spot-oa/oa/components/data/impala.py b/spot-oa/oa/components/data/impala.py index 5b6c8c1..5cdc378 100644 --- a/spot-oa/oa/components/data/impala.py +++ b/spot-oa/oa/components/data/impala.py @@ -3,13 +3,13 @@ class Engine(object): def __init__(self,db,conf, pipeline): - - self._daemon_node = conf['impala_daemon'] + + self._daemon_node = conf.get('database', 'IMPALA_DEM') self._db = db self._pipeline = pipeline impala_cmd = "impala-shell -i {0} --quiet -q 'INVALIDATE METADATA {1}.{2}'".format(self._daemon_node,self._db, self._pipeline) check_output(impala_cmd,shell=True) - + impala_cmd = "impala-shell -i {0} --quiet -q 'REFRESH {1}.{2}'".format(self._daemon_node,self._db, self._pipeline) check_output(impala_cmd,shell=True) diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py index 884d46c..844fec0 100644 --- a/spot-oa/oa/dns/dns_oa.py +++ b/spot-oa/oa/dns/dns_oa.py @@ -12,19 +12,19 @@ from utils import Util from components.data.data import Data from components.iana.iana_transform import IanaTransform -from components.nc.network_context import NetworkContext +from components.nc.network_context import NetworkContext from multiprocessing import Process import time class OA(object): - + def __init__(self,date,limit=500,logger=None): self._initialize_members(date,limit,logger) def _initialize_members(self,date,limit,logger): - + # get logger if exists. if not, create new instance. self._logger = logging.getLogger('OA.DNS') if logger else Util.get_logger('OA.DNS',create_file=False) @@ -50,8 +50,8 @@ def _initialize_members(self,date,limit,logger): self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') - self._engine = Data(self._db,self._table_name ,self._logger) + self._db = self._spot_conf.get('database', 'DBNAME') + self._engine = Data(self._db,self._table_name ,self._logger) def start(self): @@ -79,9 +79,9 @@ def start(self): def _create_folder_structure(self): # create date folder structure if it does not exist. - self._logger.info("Creating folder structure for OA (data and ipynb)") + self._logger.info("Creating folder structure for OA (data and ipynb)") self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("dns",self._date) - + def _add_ipynb(self): if os.path.isdir(self._ipynb_path): @@ -102,7 +102,7 @@ def _get_dns_results(self): dns_results = "{0}/dns_results.csv".format(self._data_path) # get hdfs path from conf file. - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/dns/scored_results/{1}/scores/dns_results.csv".format(HUSER,self._date) # get results file from hdfs. 
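The OA changes in this patch all follow the same pattern: options that used to be read from a pseudo 'conf' section and stripped of quotes with .replace("'", "").replace('"', '') become plain INI lookups. A minimal sketch of that pattern, assuming /etc/spot.conf with the [DEFAULT] and [database] sections introduced earlier in this series:

import ConfigParser

conf = ConfigParser.SafeConfigParser()
conf.read('/etc/spot.conf')

huser = conf.get('DEFAULT', 'HUSER')                 # /user/spot, no quote stripping needed
dbname = conf.get('database', 'DBNAME')              # spot
impala_daemon = conf.get('database', 'IMPALA_DEM')   # node04
dsources = conf.get('DEFAULT', 'DSOURCES').split()   # ['flow', 'dns', 'proxy']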
@@ -121,70 +121,70 @@ def _get_dns_results(self): self._logger.error("There was an error getting ML results from HDFS") sys.exit(1) - # add headers. + # add headers. self._logger.info("Adding headers") self._dns_scores_headers = [ str(key) for (key,value) in self._conf['dns_score_fields'].items() ] # add dns content. - self._dns_scores = [ conn[:] for conn in self._dns_results][:] + self._dns_scores = [ conn[:] for conn in self._dns_results][:] def _move_time_stamp(self,dns_data): - + for dns in dns_data: time_stamp = dns[1] dns.remove(time_stamp) dns.append(time_stamp) - - return dns_data + + return dns_data def _create_dns_scores_csv(self): - + dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path) dns_scores_final = self._move_time_stamp(self._dns_scores) dns_scores_final.insert(0,self._dns_scores_headers) - Util.create_csv_file(dns_scores_csv,dns_scores_final) + Util.create_csv_file(dns_scores_csv,dns_scores_final) # create bk file dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path) - Util.create_csv_file(dns_scores_bu_csv,dns_scores_final) + Util.create_csv_file(dns_scores_bu_csv,dns_scores_final) def _add_tld_column(self): qry_name_col = self._conf['dns_results_fields']['dns_qry_name'] - self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] - + self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] + def _add_reputation(self): # read configuration. reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file)) rep_conf = json.loads(open(reputation_conf_file).read()) - + # initialize reputation services. self._rep_services = [] self._logger.info("Initializing reputation services.") - for service in rep_conf: + for service in rep_conf: config = rep_conf[service] module = __import__("components.reputation.{0}.{0}".format(service), fromlist=['Reputation']) self._rep_services.append(module.Reputation(config,self._logger)) - + # get columns for reputation. rep_cols = {} - indexes = [ int(value) for key, value in self._conf["add_reputation"].items()] + indexes = [ int(value) for key, value in self._conf["add_reputation"].items()] self._logger.info("Getting columns to add reputation based on config file: dns_conf.json".format()) for index in indexes: col_list = [] for conn in self._dns_scores: - col_list.append(conn[index]) + col_list.append(conn[index]) rep_cols[index] = list(set(col_list)) # get reputation per column. 
- self._logger.info("Getting reputation for each service in config") + self._logger.info("Getting reputation for each service in config") rep_services_results = [] for key,value in rep_cols.items(): rep_services_results = [ rep_service.check(None,value) for rep_service in self._rep_services] - rep_results = {} - for result in rep_services_results: + rep_results = {} + for result in rep_services_results: rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)} self._dns_scores = [ conn + [ rep_results[conn[key]] ] for conn in self._dns_scores ] @@ -209,9 +209,9 @@ def _add_iana(self): dns_qry_type_index = self._conf["dns_results_fields"]["dns_qry_type"] dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"] self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [ dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode") ] for conn in self._dns_scores ] - - else: - self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] + + else: + self._dns_scores = [ conn + ["","",""] for conn in self._dns_scores ] def _add_network_context(self): @@ -226,13 +226,13 @@ def _add_network_context(self): def _get_oa_details(self): - - self._logger.info("Getting OA DNS suspicious details/chord diagram") + + self._logger.info("Getting OA DNS suspicious details/chord diagram") # start suspicious connects details process. p_sp = Process(target=self._get_suspicious_details) - p_sp.start() + p_sp.start() - # start chord diagram process. + # start chord diagram process. p_dn = Process(target=self._get_dns_dendrogram) p_dn.start() @@ -245,7 +245,7 @@ def _get_suspicious_details(self): if os.path.isfile(iana_conf_file): iana_config = json.loads(open(iana_conf_file).read()) dns_iana = IanaTransform(iana_config["IANA"]) - + for conn in self._dns_scores: # get data to query date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ") @@ -254,24 +254,24 @@ def _get_suspicious_details(self): if len(date) == 5: year=date[2] month=datetime.datetime.strptime(date[0], '%b').strftime('%m') - day=date[1] + day=date[1] hh=conn[self._conf["dns_score_fields"]["hh"]] dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]] self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana) def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana): - + limit = self._details_limit edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh) edge_tmp ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh) if not os.path.isfile(edge_file): - + dns_qry = ("SELECT frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit) - + # execute query self._engine.query(dns_qry,edge_tmp) - + # add IANA to results. 
if dns_iana: update_rows = [] @@ -296,18 +296,18 @@ def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana): writer = csv.writer(dns_details_edge, quoting=csv.QUOTE_ALL) if update_rows: writer.writerows(update_rows) - else: - shutil.copy(edge_tmp,edge_file) - + else: + shutil.copy(edge_tmp,edge_file) + try: os.remove(edge_tmp) except OSError: pass - + def _get_dns_dendrogram(self): limit = self._details_limit - for conn in self._dns_scores: + for conn in self._dns_scores: date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ") date = filter(None,date) diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py index 8d198d6..652f6eb 100644 --- a/spot-oa/oa/flow/flow_oa.py +++ b/spot-oa/oa/flow/flow_oa.py @@ -20,12 +20,12 @@ class OA(object): - def __init__(self,date,limit=500,logger=None): - + def __init__(self,date,limit=500,logger=None): + self._initialize_members(date,limit,logger) - + def _initialize_members(self,date,limit,logger): - + # get logger if exists. if not, create new instance. self._logger = logging.getLogger('OA.Flow') if logger else Util.get_logger('OA.Flow',create_file=False) @@ -46,24 +46,24 @@ def _initialize_members(self,date,limit,logger): # get scores fields conf conf_file = "{0}/flow_conf.json".format(self._scrtip_path) - self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) - + self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) + # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') + self._db = self._spot_conf.get('DEFAULT', 'DBNAME') self._engine = Data(self._db, self._table_name,self._logger) - - def start(self): - + + def start(self): + #################### start = time.time() #################### self._create_folder_structure() - self._add_ipynb() + self._add_ipynb() self._get_flow_results() self._add_network_context() self._add_geo_localization() - self._add_reputation() + self._add_reputation() self._create_flow_scores_csv() self._get_oa_details() @@ -71,14 +71,14 @@ def start(self): end = time.time() print(end - start) ################## - + def _create_folder_structure(self): # create date folder structure if it does not exist. - self._logger.info("Creating folder structure for OA (data and ipynb)") + self._logger.info("Creating folder structure for OA (data and ipynb)") self._data_path,self._ingest_summary_path,self._ipynb_path = Util.create_oa_folders("flow",self._date) - def _add_ipynb(self): + def _add_ipynb(self): if os.path.isdir(self._ipynb_path): @@ -90,16 +90,16 @@ def _add_ipynb(self): else: self._logger.error("There was a problem adding the IPython Notebooks, please check the directory exists.") - + def _get_flow_results(self): - + self._logger.info("Getting {0} Machine Learning Results from HDFS".format(self._date)) flow_results = "{0}/flow_results.csv".format(self._data_path) - # get hdfs path from conf file - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + # get hdfs path from conf file + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/flow/scored_results/{1}/scores/flow_results.csv".format(HUSER,self._date) - + # get results file from hdfs get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path) self._logger.info("{0}".format(get_command)) @@ -116,14 +116,14 @@ def _get_flow_results(self): self._logger.error("There was an error getting ML results from HDFS") sys.exit(1) - # add headers. + # add headers. 
self._logger.info("Adding headers based on configuration file: score_fields.json") self._flow_scores = [ [ str(key) for (key,value) in self._conf['flow_score_fields'].items()] ] # filter results add sev and rank. self._logger.info("Filtering required columns based on configuration") self._flow_scores.extend([ [0] + [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ]) - + def _create_flow_scores_csv(self): flow_scores_csv = "{0}/flow_scores.csv".format(self._data_path) @@ -131,11 +131,11 @@ def _create_flow_scores_csv(self): # create bk file flow_scores_bu_csv = "{0}/flow_scores_bu.csv".format(self._data_path) - Util.create_csv_file(flow_scores_bu_csv,self._flow_scores) + Util.create_csv_file(flow_scores_bu_csv,self._flow_scores) def _add_network_context(self): - # use ipranges to see if the IPs are internals. + # use ipranges to see if the IPs are internals. ip_ranges_file = "{0}/context/ipranges.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # add new headers (srcIpInternal/destIpInternal). @@ -158,38 +158,38 @@ def _add_network_context(self): # get src and dst IPs src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] - + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + # add networkcontext per connection. - ip_internal_ranges = filter(None,nc_ranges) + ip_internal_ranges = filter(None,nc_ranges) self._logger.info("Adding networkcontext to suspicious connections.") self._flow_scores = [ conn + [ self._is_ip_internal(conn[src_ip_index],ip_internal_ranges)]+[ self._is_ip_internal(conn[dst_ip_index],ip_internal_ranges)] for conn in flow_scores] - + else: - self._flow_scores = [ conn + ["",""] for conn in flow_scores ] + self._flow_scores = [ conn + ["",""] for conn in flow_scores ] self._logger.info("WARNING: Network context was not added because the file ipranges.csv does not exist.") - + self._flow_scores.insert(0,flow_headers) def _is_ip_internal(self,ip, ranges): result = 0 for row in ranges: - if Util.ip_to_int(ip) >= row[0] and Util.ip_to_int(ip) <= row[1]: + if Util.ip_to_int(ip) >= row[0] and Util.ip_to_int(ip) <= row[1]: result = 1 break return result - + def _add_geo_localization(self): - # use ipranges to see if the IPs are internals. + # use ipranges to see if the IPs are internals. iploc_file = "{0}/context/iploc.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - # add new headers (srcIpInternal/destIpInternal). + # add new headers (srcIpInternal/destIpInternal). self._logger.info("Adding geo localization headers") flow_headers = self._flow_scores[0] - flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) + flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) # add values to srcIpInternal and destIpInternal. flow_scores = iter(self._flow_scores) @@ -199,9 +199,9 @@ def _add_geo_localization(self): self._logger.info("Initializing geo localization component") geo = GeoLocalization(iploc_file,self._logger) - + src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] self._logger.info("Adding geo localization...") self._flow_scores = [] @@ -217,29 +217,29 @@ def _add_geo_localization(self): # adding columns to the current connection list. 
conn.extend([src_geo_dict["geo_loc"],dst_geo_dict["geo_loc"],src_geo_dict["domain"],dst_geo_dict["domain"]]) - self._flow_scores.extend([conn]) + self._flow_scores.extend([conn]) else: - self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ] + self._flow_scores = [ conn + ["","","",""] for conn in flow_scores ] self._logger.info("WARNING: IP location was not added because the file {0} does not exist.".format(iploc_file)) - self._flow_scores.insert(0,flow_headers) + self._flow_scores.insert(0,flow_headers) def _add_reputation(self): - + reputation_conf_file = "{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - + # add new headers (gtiSrcRep/gtiDstRep). self._logger.info("Adding reputation headers") flow_headers_rep = self._flow_scores[0] flow_headers_rep.extend(["srcIp_rep","dstIp_rep"]) - + # read configuration. self._logger.info("Reading reputation configuration file: {0}".format(reputation_conf_file)) rep_conf = json.loads(open(reputation_conf_file).read())["gti"] if os.path.isfile(rep_conf['refclient']): - + # initialize gti module. self._logger.info("Initializing GTI component") flow_gti = gti.Reputation(rep_conf,self._logger) @@ -253,33 +253,33 @@ def _add_reputation(self): next(flow_scores_src) # getting reputation for src IPs - src_ips = [ conn[src_ip_index] for conn in flow_scores_src ] + src_ips = [ conn[src_ip_index] for conn in flow_scores_src ] src_rep_results = flow_gti.check(src_ips) self._logger.info("Getting GTI reputation for dst IPs") flow_scores_dst = iter(self._flow_scores) next(flow_scores_dst) - # getting reputation for dst IPs + # getting reputation for dst IPs dst_ips = [ conn[dst_ip_index] for conn in flow_scores_dst ] dst_rep_results = flow_gti.check(dst_ips) - + flow_scores_final = iter(self._flow_scores) next(flow_scores_final) - + self._flow_scores = [] flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + [dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ] - self._flow_scores = flow_scores - + self._flow_scores = flow_scores + else: # add values to gtiSrcRep and gtiDstRep. flow_scores = iter(self._flow_scores) next(flow_scores) - self._flow_scores = [ conn + ["",""] for conn in flow_scores ] - self._logger.info("WARNING: IP reputation was not added. No refclient configured".format(reputation_conf_file)) - - self._flow_scores.insert(0,flow_headers_rep) + self._flow_scores = [ conn + ["",""] for conn in flow_scores ] + self._logger.info("WARNING: IP reputation was not added. No refclient configured".format(reputation_conf_file)) + + self._flow_scores.insert(0,flow_headers_rep) def _get_oa_details(self): @@ -295,27 +295,27 @@ def _get_oa_details(self): p_sp.join() p_ch.join() - + def _get_suspicious_details(self,bar=None): - + # skip header sp_connections = iter(self._flow_scores) next(sp_connections) - + # loop connections. - connections_added = [] + connections_added = [] for conn in sp_connections: - - # validate if the connection's details are not already extracted. + + # validate if the connection's details are not already extracted. 
if conn in connections_added: continue else: connections_added.append(conn) - + src_ip_index = self._conf["flow_score_fields"]["srcIP"] dst_ip_index = self._conf["flow_score_fields"]["dstIP"] - # get src ip + # get src ip sip = conn[src_ip_index] # get dst ip dip = conn[dst_ip_index] @@ -325,16 +325,16 @@ def _get_suspicious_details(self,bar=None): date_array_1 = date_array[0].split('-') date_array_2 = date_array[1].split(':') - yr = date_array_1[0] + yr = date_array_1[0] dy = date_array_1[2] mh = date_array_1[1] hr = date_array_2[0] mm = date_array_2[1] - + # connection details query. sp_query = ("SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,stos as TOS,ibyt as bytes,ipkt as pkts,input as input, output as output,rip as rip from {0}.{1} where ((sip='{2}' AND dip='{3}') or (sip='{3}' AND dip='{2}')) AND y={8} AND m={4} AND d={5} AND h={6} AND trminute={7} order by tstart limit 100") - + # sp query. sp_query = sp_query.format(self._db,self._table_name,sip,dip,mh,dy,hr,mm,yr) @@ -343,15 +343,15 @@ def _get_suspicious_details(self,bar=None): # execute query self._engine.query(sp_query,output_file=edge_file,delimiter="\\t") - + def _get_chord_details(self,bar=None): # skip header sp_connections = iter(self._flow_scores) - next(sp_connections) + next(sp_connections) src_ip_index = self._conf["flow_score_fields"]["srcIP"] - dst_ip_index = self._conf["flow_score_fields"]["dstIP"] + dst_ip_index = self._conf["flow_score_fields"]["dstIP"] # get date parameters. yr = self._date[:4] @@ -361,28 +361,28 @@ def _get_chord_details(self,bar=None): # get number of times each IP appears. srcdict = {} for conn in sp_connections: - if conn[src_ip_index] in srcdict:srcdict[conn[src_ip_index]] += 1 + if conn[src_ip_index] in srcdict:srcdict[conn[src_ip_index]] += 1 else:srcdict[conn[src_ip_index]] = 1 if conn[dst_ip_index] in srcdict:srcdict[conn[dst_ip_index]] += 1 else:srcdict[conn[dst_ip_index]] = 1 - - for (ip,n) in srcdict.items(): + + for (ip,n) in srcdict.items(): if n > 1: - ip_list = [] + ip_list = [] sp_connections = iter(self._flow_scores) next(sp_connections) - for row in sp_connections: + for row in sp_connections: if ip == row[2] : ip_list.append(row[3]) - if ip == row[3] :ip_list.append(row[2]) + if ip == row[3] :ip_list.append(row[2]) ips = list(set(ip_list)) - + if len(ips) > 1: ips_filter = (",".join(str("'{0}'".format(ip)) for ip in ips)) - chord_file = "{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_")) + chord_file = "{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_")) ch_query = ("SELECT sip as srcip, dip as dstip, MAX(ibyt) as maxbyte, AVG(ibyt) as avgbyte, MAX(ipkt) as maxpkt, AVG(ipkt) as avgpkt from {0}.{1} where y={2} and m={3} and d={4} and ( (sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}') ) group by sip,dip") self._engine.query(ch_query.format(self._db,self._table_name,yr,mn,dy,ip,ips_filter),chord_file,delimiter="\\t") - - + + diff --git a/spot-oa/oa/proxy/proxy_oa.py b/spot-oa/oa/proxy/proxy_oa.py index d9b0362..7fbd189 100644 --- a/spot-oa/oa/proxy/proxy_oa.py +++ b/spot-oa/oa/proxy/proxy_oa.py @@ -50,7 +50,7 @@ def _initialize_members(self,date,limit,logger): self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) # initialize data engine - self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') + self._db = self._spot_conf.get('DEFAULT', 'DBNAME') self._engine = Data(self._db, self._table_name,self._logger) @@ -104,7 +104,7 @@ def 
_get_proxy_results(self): proxy_results = "{0}/proxy_results.csv".format(self._data_path) # get hdfs path from conf file. - HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '') + HUSER = self._spot_conf.get('DEFAULT', 'HUSER') hdfs_path = "{0}/proxy/scored_results/{1}/scores/proxy_results.csv".format(HUSER,self._date) # get results file from hdfs. @@ -276,7 +276,7 @@ def _get_proxy_details(self,fulluri,clientip,conn_hash,year,month,day,hh,proxy_i # due an issue with the output of the query. update_rows = [ [ w.replace('"','') for w in l ] for l in update_rows ] - + # create edge file. self._logger.info("Creating edge file:{0}".format(edge_file)) diff --git a/spot-oa/oa/utils.py b/spot-oa/oa/utils.py index 553315a..b430746 100644 --- a/spot-oa/oa/utils.py +++ b/spot-oa/oa/utils.py @@ -6,18 +6,18 @@ import ConfigParser class Util(object): - + @classmethod def get_logger(cls,logger_name,create_file=False): - + # create logger for prd_ci log = logging.getLogger(logger_name) log.setLevel(level=logging.INFO) - + # create formatter and add it to the handlers formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - + if create_file: # create file handler for logger. fh = logging.FileHandler('oa.log') @@ -37,15 +37,15 @@ def get_logger(cls,logger_name,create_file=False): @classmethod def get_spot_conf(cls): - + conf_file = "/etc/spot.conf" - config = ConfigParser.ConfigParser() - config.readfp(SecHead(open(conf_file))) + config = ConfigParser.SafeConfigParser() + config.read(conf_file) return config - + @classmethod - def create_oa_folders(cls,type,date): + def create_oa_folders(cls,type,date): # create date and ingest summary folder structure if they don't' exist. root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -59,9 +59,9 @@ def create_oa_folders(cls,type,date): # retun path to folders. data_path = data_type_folder.format(root_path,type,date) - ingest_path = data_type_folder.format(root_path,type,"ingest_summary") + ingest_path = data_type_folder.format(root_path,type,"ingest_summary") return data_path,ingest_path,ipynb_folder - + @classmethod def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path): @@ -72,7 +72,7 @@ def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path): @classmethod def read_results(cls,file,limit, delimiter=','): - + # read csv results. 
result_rows = [] with open(file, 'rb') as results_file: @@ -87,17 +87,17 @@ def read_results(cls,file,limit, delimiter=','): @classmethod def ip_to_int(self,ip): - + try: o = map(int, ip.split('.')) res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3] - return res + return res except ValueError: return None - + @classmethod - def create_csv_file(cls,full_path_file,content,delimiter=','): + def create_csv_file(cls,full_path_file,content,delimiter=','): with open(full_path_file, 'w+') as u_file: writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter) @@ -110,11 +110,11 @@ def __init__(self, fp): def readline(self): if self.sechead: - try: + try: return self.sechead - finally: + finally: self.sechead = None - else: + else: return self.fp.readline() class ProgressBar(object): @@ -131,26 +131,26 @@ def __init__(self,total,prefix='',sufix='',decimals=2,barlength=60): def start(self): self._move_progress_bar(0) - + def update(self,iterator): - + self._move_progress_bar(iterator) def auto_update(self): - self._auto_iteration_status += 1 + self._auto_iteration_status += 1 self._move_progress_bar(self._auto_iteration_status) - + def _move_progress_bar(self,iteration): filledLength = int(round(self._bar_length * iteration / float(self._total))) percents = round(100.00 * (iteration / float(self._total)), self._decimals) - bar = '#' * filledLength + '-' * (self._bar_length - filledLength) - sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix)) + bar = '#' * filledLength + '-' * (self._bar_length - filledLength) + sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix)) sys.stdout.flush() - + if iteration == self._total:print("\n") - - + + From 651c0b1b55c4c3d8680065688ef1e91497c304d3 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Sat, 19 Nov 2016 10:42:06 -0800 Subject: [PATCH 11/22] updated notebooks with ConfigParser --- .../Edge_Investigation_master.ipynb | 39 +++++++++----- .../Threat_Investigation_master.ipynb | 14 +++-- .../Edge_Investigation_master.ipynb | 51 ++++++++++++------- .../Threat_Investigation_master.ipynb | 14 +++-- .../Edge_Investigation_master.ipynb | 15 +++--- .../Threat_Investigation_master.ipynb | 16 +++--- 6 files changed, 95 insertions(+), 54 deletions(-) diff --git a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb index 5573c9a..e2c1f8b 100644 --- a/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/dns/ipynb_templates/Edge_Investigation_master.ipynb @@ -129,15 +129,33 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": { "collapsed": false }, - "outputs": [], + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'assign_btn' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m\u001b[0m", + "\u001b[0;31mNameError\u001b[0mTraceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 86\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 88\u001b[0;31m \u001b[0massign_btn\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mon_click\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0massign_score\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 89\u001b[0m 
\u001b[0msave_btn\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mon_click\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msave\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 90\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mNameError\u001b[0m: name 'assign_btn' is not defined" + ] + } + ], "source": [ "import csv\n", "import datetime\n", - "import subprocess \n", + "import subprocess\n", + "import ConfigParser\n", + "\n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", "\n", "def assign_score(b):\n", " score_values = []\n", @@ -222,14 +240,11 @@ "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} &&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name) \n", - "\n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", + " \n", " subprocess.call(str_fb, shell=True)" ] }, @@ -261,7 +276,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb index 8c665a8..140f993 100644 --- a/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/dns/ipynb_templates/Threat_Investigation_master.ipynb @@ -23,6 +23,7 @@ "import datetime\n", "import operator\n", "import itertools\n", + "import ConfigParser\n", "\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", @@ -30,10 +31,13 @@ " from IPython.html import widgets\n", "from IPython.display import display, HTML, clear_output, Javascript \n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines():\n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", "\n", "path = os.getcwd().split(\"/\") \n", "t_date = path[len(path)-1] \n", @@ -333,7 +337,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb index e916471..927571a 100644 --- a/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/flow/ipynb_templates/Edge_Investigation_master.ipynb @@ -10,7 +10,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "import struct, socket\n", @@ -20,7 +22,8 @@ "import linecache, bisect\n", "import csv, json\n", "import operator\n", - "import os, 
time, subprocess \n", + "import os, time, subprocess\n", + "import ConfigParser\n", "from collections import OrderedDict\n", "\n", "try:\n", @@ -30,6 +33,11 @@ "\n", "from IPython.display import display, Javascript, clear_output\n", "\n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", "path = os.getcwd().split(\"/\") \n", "date = path[len(path)-1] \n", "dsource = path[len(path)-2] \n", @@ -55,7 +63,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "def apply_css_to_select(select):\n", @@ -325,16 +335,13 @@ " except:\n", " print \"Key Error for ip: \" + srcip\n", " \n", - " \n", + "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} &&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name) \n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", " \n", " subprocess.call(str_fb, shell=True)" ] @@ -349,7 +356,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# set_rules()" @@ -358,7 +367,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# attack_heuristics()" @@ -367,7 +378,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "displaythis()" @@ -376,7 +389,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "collapsed": true + }, "outputs": [], "source": [ "# !cp $sconnectbu $sconnect" @@ -392,16 +407,16 @@ "language_info": { "codemirror_mode": { "name": "ipython", - "version": 2.0 + "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 0 -} \ No newline at end of file +} diff --git a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb index 6393846..e4fe415 100644 --- a/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/flow/ipynb_templates/Threat_Investigation_master.ipynb @@ -15,16 +15,20 @@ "import operator\n", "import json\n", "import os\n", + "import ConfigParser\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", "except ImportError:\n", " from IPython.html import widgets\n", "from IPython.display import display, Javascript, clear_output\n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines(): \n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = 
ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", "\n", "spath = os.getcwd()\n", "path = spath.split(\"/\") \n", @@ -770,7 +774,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb index 86d6fef..6521ea4 100644 --- a/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb +++ b/spot-oa/oa/proxy/ipynb_templates/Edge_Investigation_master.ipynb @@ -124,6 +124,7 @@ "import csv\n", "import datetime\n", "import subprocess \n", + "import ConfigParser\n", "\n", "def assign_score(b):\n", " scored_threats = []\n", @@ -183,13 +184,11 @@ "\n", "def ml_feedback():\n", " dst_name = os.path.basename(sconnect)\n", - " str_fb=\"DSOURCE={0} &&\\\n", - " FDATE={1} &&\\\n", - " source /etc/spot.conf &&\\\n", - " usr=$(echo $LUSER | cut -f3 -d'/') &&\\\n", - " mlnode=$MLNODE &&\\\n", - " lpath=$LPATH &&\\\n", - " scp {2} $usr@$mlnode:$lpath/{3}\".format(dsource,date,score_fbk,dst_name)\n", + " mlnode = conf.get('DEFAULT', 'MLNODE')\n", + " lpath = conf.get('DEFAULT','LPATH', vars={'DSOURCE': dsource,'FDATE': date, })\n", + " usr = conf.get('DEFAULT','LUSER').split('/')[2]\n", + " str_fb=\"scp {0} {1}@{2}:{3}/{4}\".format(score_fbk,usr,mlnode,lpath,dst_name) \n", + " \n", " subprocess.call(str_fb, shell=True)" ] }, @@ -221,7 +220,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" } }, "nbformat": 4, diff --git a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb index 5cd89db..cc6a855 100644 --- a/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb +++ b/spot-oa/oa/proxy/ipynb_templates/Threat_Investigation_master.ipynb @@ -20,6 +20,7 @@ "import itertools\n", "import md5\n", "from collections import defaultdict \n", + "import ConfigParser\n", "\n", "try:\n", " import ipywidgets as widgets # For jupyter/ipython >= 1.4\n", @@ -27,11 +28,14 @@ " from IPython.html import widgets\n", "from IPython.display import display, HTML, clear_output, Javascript \n", "\n", - "with open('/etc/spot.conf') as conf:\n", - " for line in conf.readlines():\n", - " if \"DBNAME=\" in line: DBNAME = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " elif \"IMPALA_DEM=\" in line: IMPALA_DEM = line.split(\"=\")[1].strip('\\n').replace(\"'\",\"\"); \n", - " \n", + "# initialize ConfigParser \n", + "conf_file = '/etc/spot.conf' \n", + "conf = ConfigParser.SafeConfigParser() \n", + "conf.read(conf_file) \n", + "\n", + "DBNAME = conf.get('database', 'DBNAME')\n", + "IMPALA_DEM = conf.get('database', 'IMPALA_DEM')\n", + "\n", "path = os.getcwd().split(\"/\") \n", "date = path[len(path)-1] \n", "dpath = '/'.join(['data' if var == 'ipynb' else var for var in path]) + '/'\n", @@ -396,7 +400,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.5" + "version": "2.7.11" }, "widgets": { "state": { From 3e4f2d44f390c96099690bfb70de034a0320457a Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Sat, 19 Nov 2016 10:44:32 -0800 Subject: [PATCH 12/22] bug fixed --- spot-oa/oa/flow/flow_oa.py | 2 +- spot-setup/hdfs_setup.py | 4 +++- 2 files changed, 4 
insertions(+), 2 deletions(-) diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py index 652f6eb..a7f070a 100644 --- a/spot-oa/oa/flow/flow_oa.py +++ b/spot-oa/oa/flow/flow_oa.py @@ -49,7 +49,7 @@ def _initialize_members(self,date,limit,logger): self._conf = json.loads(open (conf_file).read(),object_pairs_hook=OrderedDict) # initialize data engine - self._db = self._spot_conf.get('DEFAULT', 'DBNAME') + self._db = self._spot_conf.get('database', 'DBNAME') self._engine = Data(self._db, self._table_name,self._logger) def start(self): diff --git a/spot-setup/hdfs_setup.py b/spot-setup/hdfs_setup.py index 484644e..fa6067d 100644 --- a/spot-setup/hdfs_setup.py +++ b/spot-setup/hdfs_setup.py @@ -22,7 +22,7 @@ def main(): DFOLDERS = conf.get('DEFAULT','DFOLDERS').split() HUSER = conf.get('DEFAULT','HUSER') USER = os.environ.get('USER') - DBNAME = conf.get('DATABASE','DBNAME') + DBNAME = conf.get('database','DBNAME') #create hdfs folders mkdir = "sudo -u hdfs hadoop fs -mkdir " + HUSER @@ -40,6 +40,8 @@ def main(): #Create hive tables #create catalog cmd = "hive -e 'CREATE DATABASE {0}'".format(DBNAME) + execute_cmd(cmd,logger) + for source in DSOURCES: cmd = "hive -hiveconf huser={0} -hiveconf dbname={1} -f create_{2}_avro_parquet.hql".format(HUSER,DBNAME,source) execute_cmd(cmd,logger) From 480891ef5c0679865bae4f3cb63691377be540a4 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 07:56:43 -0800 Subject: [PATCH 13/22] updated ml_ops.py with model variable --- spot-ml/ml_ops.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py index 2f9f690..d45a2ad 100755 --- a/spot-ml/ml_ops.py +++ b/spot-ml/ml_ops.py @@ -52,10 +52,12 @@ def main(): ## paths for intermediate files HDFS_DOCRESULTS = "{0}/doc_results.csv".format(HPATH) LOCAL_DOCRESULTS = "{0}/doc_results.csv".format(LPATH) + HDFS_WORDRESULTS = "{0}/word_results.csv".format(HPATH) LOCAL_WORDRESULTS = "{0}/word_results.csv".format(LPATH) HDFS_SCORED_CONNECTS = "{0}/scores".format(HPATH) + HDFS_MODEL = "{0}/model".format(HPATH) LDA_OUTPUT_DIR = "{1}/{1}".format(args.type,args.fdate) @@ -140,13 +142,14 @@ def main(): "--topiccount " + TOPIC_COUNT, "--nodes " + nodes_csl, "--scored " + HDFS_SCORED_CONNECTS, + "--tempmodel " + HDFS_MODEL, "--threshold " + TOL, "--maxresults " + MAXRESULTS] spark_cmd.extend(spot_jar) - execute_cmd(spark_cmd,logger) - #process = subprocess.Popen(spark_cmd, stdout=subprocess.PIPE, stderr=None) + execute_cmd(spark_cmd, logger) + # process = subprocess.Popen(spark_cmd, stdout=subprocess.PIPE, stderr=None) ## move results to hdfs. 
os.chdir(LPATH) From eb2df8863691e82cd03f45d21ea91e774125c381 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 08:23:23 -0800 Subject: [PATCH 14/22] added new ingest confs --- spot-setup/spot.conf | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index 50c333a..1aaaf5d 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -72,6 +72,12 @@ kafka_port=9183 zookeeper_server=localhost zookeeper_port=2181 message_size=999999 +# spark streaming options +driver_memory= +spark_exec= +spark_executor_memory= +spark_executor_cores +spark_batch_size [flow] type=flow From 29f4c190ecb7fa6b0470fd732afe0f657f39e8b4 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 13:24:02 -0800 Subject: [PATCH 15/22] made ml_ops.py executable --- spot-ml/ml_ops.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py index d45a2ad..9f3fa3f 100755 --- a/spot-ml/ml_ops.py +++ b/spot-ml/ml_ops.py @@ -1,9 +1,12 @@ +#!/usr/bin/python + import os import logging import argparse import subprocess import ConfigParser + def main(): #initialize ConfigParser From 6f2be08b01f83ad7228b0f433b0dea26e63f5aad Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 16:40:17 -0800 Subject: [PATCH 16/22] updated dns pipeline to ConfigParser --- spot-ingest/pipelines/dns/collector.py | 79 +++++++++++++------------- spot-ingest/pipelines/dns/worker.py | 21 ++++--- 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py index 1eae7d2..24282b5 100755 --- a/spot-ingest/pipelines/dns/collector.py +++ b/spot-ingest/pipelines/dns/collector.py @@ -1,24 +1,25 @@ -#/bin/env python +#!/bin/env python import time import os import subprocess -import json import logging +import ConfigParser from multiprocessing import Process from common.utils import Util from common.file_collector import FileWatcher from multiprocessing import Pool from common.kafka_client import KafkaTopic + class Collector(object): - def __init__(self,hdfs_app_path,kafka_topic,conf_type): - - self._initialize_members(hdfs_app_path,kafka_topic,conf_type) + def __init__(self, hdfs_app_path, kafka_topic, conf_type): + + self._initialize_members(hdfs_app_path, kafka_topic, conf_type) + + def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type): - def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - # getting parameters. self._logger = logging.getLogger('SPOT.INGEST.DNS') self._hdfs_app_path = hdfs_app_path @@ -28,63 +29,64 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read dns configuration. - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) # set configuration. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') self._dsource = 'dns' self._hdfs_root_path = "{0}/{1}".format(hdfs_app_path, self._dsource) # set configuration. 
- self._pkt_num = self._conf['pkt_num'] - self._pcap_split_staging = self._conf['pcap_split_staging'] - self._supported_files = self._conf['supported_files'] + self._pkt_num = self._conf.get(conf_type, 'pkt_num') + self._pcap_split_staging = self._conf.get(conf_type, 'pcap_split_staging') + self._supported_files = self._conf.get(conf_type, 'supported_files') - # create collector watcher - self._watcher = FileWatcher(self._collector_path,self._supported_files) + # create collector watcher + self._watcher = FileWatcher(self._collector_path, self._supported_files) - # Multiprocessing. - self._processes = conf["collector_processes"] - self._ingestion_interval = conf["ingestion_interval"] + # Multiprocessing. + self._processes = self._conf.get(conf_type, 'collector_processes') + self._ingestion_interval = self._conf('ingest', 'ingestion_interval') self._pool = Pool(processes=self._processes) def start(self): - self._logger.info("Starting DNS ingest") + self._logger.info("Starting DNS ingest") self._watcher.start() try: while True: - self._ingest_files_pool() + self._ingest_files_pool() time.sleep(self._ingestion_interval) except KeyboardInterrupt: - self._logger.info("Stopping DNS collector...") - Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) + self._logger.info("Stopping DNS collector...") + Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) self._watcher.stop() self._pool.terminate() - self._pool.close() + self._pool.close() self._pool.join() SystemExit("Ingest finished...") - def _ingest_files_pool(self): - + def _ingest_files_pool(self): + if self._watcher.HasFiles: - + for x in range(0,self._processes): file = self._watcher.GetNextFile() resutl = self._pool.apply_async(ingest_file,args=(file,self._pkt_num,self._pcap_split_staging,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,)) #resutl.get() # to debug add try and catch. - if not self._watcher.HasFiles: break + if not self._watcher.HasFiles: break return True + def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers): logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid())) - + try: # get file name and date. org_file = file @@ -107,27 +109,26 @@ def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic, pcap_date_path = file_date[-14:-6] # hdfs path with timestamp. - hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour) + hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour) # create hdfs path. - Util.creat_hdfs_folder(hdfs_path,logger) + Util.creat_hdfs_folder(hdfs_path, logger) # load file to hdfs. - hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file) - Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger) + hadoop_pcap_file = "{0}/{1}".format(hdfs_path, file) + Util.load_to_hdfs(os.path.join(currdir, file), hadoop_pcap_file, logger) # create event for workers to process the file. 
logger.info( "Sending split file to worker number: {0}".format(partition)) - KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition) - logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic)) + KafkaTopic.SendMessage(hadoop_pcap_file, kafka_servers, topic, partition) + logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file, topic)) logger.info("Removing file: {0}".format(org_file)) rm_big_file = "rm {0}".format(org_file) - Util.execute_cmd(rm_big_file,logger) - + Util.execute_cmd(rm_big_file, logger) + except Exception as err: - + logger.error("There was a problem, please check the following error message:{0}".format(err.message)) logger.error("Exception: {0}".format(err)) - diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py index 06a381a..10b753f 100755 --- a/spot-ingest/pipelines/dns/worker.py +++ b/spot-ingest/pipelines/dns/worker.py @@ -3,7 +3,7 @@ import logging import datetime import subprocess -import json +import ConfigParser import os from multiprocessing import Process from common.utils import Util @@ -12,7 +12,7 @@ class Worker(object): def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None): - + self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type) def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): @@ -24,13 +24,12 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): self._hdfs_app_path = hdfs_app_path # read proxy configuration. - self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) - self._process_opt = self._conf['process_opt'] - self._local_staging = self._conf['local_staging'] + self._process_opt = self._conf.get(conf_type, 'process_opt') + self._local_staging = self._conf.get(conf_type, 'local_staging') self.kafka_consumer = kafka_consumer def start(self): @@ -42,9 +41,9 @@ def start(self): def _new_file(self,file): self._logger.info("-------------------------------------- New File received --------------------------------------") - self._logger.info("File: {0} ".format(file)) + self._logger.info("File: {0} ".format(file)) p = Process(target=self._process_new_file, args=(file,)) - p.start() + p.start() def _process_new_file(self,file): @@ -81,7 +80,7 @@ def _process_new_file(self,file): self._logger.info("Moving data to staging: {0}".format(mv_to_staging)) Util.execute_cmd(mv_to_staging,self._logger) - #load to avro + # load to avro load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path) self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd)) From 83691e5005bcf81de66dbe9ead00b3a9bb2c854c Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 16:40:50 -0800 Subject: [PATCH 17/22] update flow pipeline to ConfigParser --- spot-ingest/pipelines/flow/collector.py | 53 +++++++++++++------------ spot-ingest/pipelines/flow/worker.py | 19 +++++---- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git 
a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py index 1f45f16..f76699b 100755 --- a/spot-ingest/pipelines/flow/collector.py +++ b/spot-ingest/pipelines/flow/collector.py @@ -3,21 +3,22 @@ import time import logging import os -import json +import ConfigParser from multiprocessing import Process from common.utils import Util from common.file_collector import FileWatcher from multiprocessing import Pool from common.kafka_client import KafkaTopic + class Collector(object): def __init__(self,hdfs_app_path,kafka_topic,conf_type): - + self._initialize_members(hdfs_app_path,kafka_topic,conf_type) def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - + # getting parameters. self._logger = logging.getLogger('SPOT.INGEST.FLOW') self._hdfs_app_path = hdfs_app_path @@ -27,55 +28,55 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read flow configuration. - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = '/etc/spot.conf' + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) # set configuration. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') self._dsource = 'flow' self._hdfs_root_path = "{0}/{1}".format(hdfs_app_path, self._dsource) - self._supported_files = self._conf['supported_files'] + self._supported_files = self._conf.get(conf_type, 'supported_files') # create collector watcher self._watcher = FileWatcher(self._collector_path,self._supported_files) - - # Multiprocessing. - self._processes = conf["collector_processes"] - self._ingestion_interval = conf["ingestion_interval"] + + # Multiprocessing. + self._processes = self._conf.get('ingest', 'collector_processes') + self._ingestion_interval = self._conf.get('ingest', 'ingestion_interval') self._pool = Pool(processes=self._processes) def start(self): - self._logger.info("Starting FLOW ingest") + self._logger.info("Starting FLOW ingest") self._watcher.start() - + try: - while True: - self._ingest_files_pool() + while True: + self._ingest_files_pool() time.sleep(self._ingestion_interval) except KeyboardInterrupt: - self._logger.info("Stopping FLOW collector...") - Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) + self._logger.info("Stopping FLOW collector...") + Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) self._watcher.stop() self._pool.terminate() - self._pool.close() + self._pool.close() self._pool.join() SystemExit("Ingest finished...") - - def _ingest_files_pool(self): - + + def _ingest_files_pool(self): + if self._watcher.HasFiles: - + for x in range(0,self._processes): file = self._watcher.GetNextFile() resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,)) #resutl.get() # to debug add try and catch. - if not self._watcher.HasFiles: break + if not self._watcher.HasFiles: break return True - + def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers): @@ -102,7 +103,7 @@ def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers): # create event for workers to process the file. 
logger.info("Sending file to worker number: {0}".format(partition)) - KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition) + KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition) logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic)) except Exception as err: diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py index 8d2efd9..04ad9b2 100755 --- a/spot-ingest/pipelines/flow/worker.py +++ b/spot-ingest/pipelines/flow/worker.py @@ -5,7 +5,7 @@ import datetime import logging import os -import json +import ConfigParser from multiprocessing import Process from common.utils import Util @@ -24,13 +24,12 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type): self._hdfs_app_path = hdfs_app_path # read proxy configuration. - self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) - self._process_opt = self._conf['process_opt'] - self._local_staging = self._conf['local_staging'] + self._process_opt = self._conf.get(conf_type, 'process_opt') + self._local_staging = self._conf.get(conf_type, 'local_staging') self.kafka_consumer = kafka_consumer def start(self): @@ -42,9 +41,9 @@ def start(self): def _new_file(self,file): self._logger.info("-------------------------------------- New File received --------------------------------------") - self._logger.info("File: {0} ".format(file)) + self._logger.info("File: {0} ".format(file)) p = Process(target=self._process_new_file, args=(file,)) - p.start() + p.start() def _process_new_file(self,file): @@ -66,7 +65,7 @@ def _process_new_file(self,file): # build process cmd. process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt) self._logger.info("Processing file: {0}".format(process_cmd)) - Util.execute_cmd(process_cmd,self._logger) + Util.execute_cmd(process_cmd,self._logger) # create hdfs staging. hdfs_path = "{0}/flow".format(self._hdfs_app_path) From 1c892d6b0535456020887394e0f1160dba04aac3 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 16:41:29 -0800 Subject: [PATCH 18/22] update proxy pipeline with ConfigParser --- spot-ingest/pipelines/proxy/collector.py | 65 ++++++++++++------------ spot-ingest/pipelines/proxy/worker.py | 59 ++++++++++----------- 2 files changed, 61 insertions(+), 63 deletions(-) diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py index 4cffa12..4e6424a 100644 --- a/spot-ingest/pipelines/proxy/collector.py +++ b/spot-ingest/pipelines/proxy/collector.py @@ -1,7 +1,7 @@ #!/bin/env python import logging -import json +import ConfigParser import os import sys import copy @@ -11,14 +11,15 @@ from common.file_collector import FileWatcher import time + class Collector(object): - + def __init__(self,hdfs_app_path,kafka_topic,conf_type): - + self._initialize_members(hdfs_app_path,kafka_topic,conf_type) - + def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): - + # getting parameters. 
self._logger = logging.getLogger('SPOT.INGEST.PROXY') self._hdfs_app_path = hdfs_app_path @@ -28,75 +29,75 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type): self._script_path = os.path.dirname(os.path.abspath(__file__)) # read proxy configuration. - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._message_size = conf["kafka"]["message_size"] - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) + self._message_size = self._conf.get('ingest', "message_size") # get collector path. - self._collector_path = self._conf['collector_path'] + self._collector_path = self._conf.get(conf_type, 'collector_path') #get supported files - self._supported_files = self._conf['supported_files'] + self._supported_files = self._conf.get(conf_type, 'supported_files') # create collector watcher self._watcher = FileWatcher(self._collector_path,self._supported_files) - # Multiprocessing. - self._processes = conf["collector_processes"] - self._ingestion_interval = conf["ingestion_interval"] + # Multiprocessing. + self._processes = self._conf.get('ingest', "collector_processes") + self._ingestion_interval = self._conf.get('ingest', "ingestion_interval") self._pool = Pool(processes=self._processes) def start(self): self._logger.info("Starting PROXY collector") - self._watcher.start() - + self._watcher.start() + try: while True: #self._ingest_files() - self._ingest_files_pool() + self._ingest_files_pool() time.sleep(self._ingestion_interval) except KeyboardInterrupt: - self._logger.info("Stopping Proxy collector...") - Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) + self._logger.info("Stopping Proxy collector...") + Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger) self._watcher.stop() self._pool.terminate() - self._pool.close() + self._pool.close() self._pool.join() - + def _ingest_files_pool(self): - - + + if self._watcher.HasFiles: - + for x in range(0,self._processes): file = self._watcher.GetNextFile() resutl = self._pool.apply_async(ingest_file,args=(file,self._message_size,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers)) - #resutl.get() # to debug add try and catch. - if not self._watcher.HasFiles: break + # resutl.get() # to debug add try and catch. + if not self._watcher.HasFiles: break return True def ingest_file(file,message_size,topic,kafka_servers): - + logger = logging.getLogger('SPOT.INGEST.PROXY.{0}'.format(os.getpid())) - try: + try: message = "" - logger.info("Ingesting file: {0} process:{1}".format(file,os.getpid())) + logger.info("Ingesting file: {0} process:{1}".format(file,os.getpid())) with open(file,"rb") as f: for line in f: message += line if len(message) > message_size: KafkaTopic.SendMessage(message,kafka_servers,topic,0) message = "" - #send the last package. - KafkaTopic.SendMessage(message,kafka_servers,topic,0) + #send the last package. 
+ KafkaTopic.SendMessage(message,kafka_servers,topic,0) rm_file = "rm {0}".format(file) Util.execute_cmd(rm_file,logger) logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic)) - except Exception as err: + except Exception as err: logger.error("There was a problem, please check the following error message:{0}".format(err.message)) logger.error("Exception: {0}".format(err)) diff --git a/spot-ingest/pipelines/proxy/worker.py b/spot-ingest/pipelines/proxy/worker.py index bc8d131..7e54a85 100644 --- a/spot-ingest/pipelines/proxy/worker.py +++ b/spot-ingest/pipelines/proxy/worker.py @@ -1,7 +1,7 @@ #!/bin/env python import os import logging -import json +import ConfigParser from common.utils import Util @@ -13,7 +13,7 @@ def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes): self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type,processes) def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes): - + # get logger instance. self._logger = Util.get_logger('SPOT.INGEST.WRK.PROXY') @@ -23,43 +23,40 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type,proc # read proxy configuration. self._script_path = os.path.dirname(os.path.abspath(__file__)) - conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path))) - conf = json.loads(open(conf_file).read()) - self._spark_conf = conf["spark-streaming"] - self._conf = conf["pipelines"][conf_type] + conf_file = "/etc/spot.conf" + self._conf = ConfigParser.SafeConfigParser() + self._conf.read(conf_file) self._processes = processes def start(self): - self._logger.info("Creating Spark Job for topic: {0}".format(self._kafka_consumer.Topic)) + self._logger.info("Creating Spark Job for topic: {0}".format(self._kafka_consumer.Topic)) # parser - parser = self._conf["parser"] + parser = self._conf.get('proxy', "parser") + + # spark conf + diver_memory = self._conf.get('ingest', "driver_memory") + num_exec = self._conf.get('ingest', "spark_exec") + exec_memory = self._conf.get('ingest', "spark_executor_memory") + exec_cores = self._conf.get('ingest', "spark_executor_cores") + batch_size = self._conf.get('ingest', "spark_batch_size") - #spark conf - diver_memory = self._spark_conf["driver_memory"] - num_exec = self._spark_conf["spark_exec"] - exec_memory = self._spark_conf["spark_executor_memory"] - exec_cores = self._spark_conf["spark_executor_cores"] - batch_size = self._spark_conf["spark_batch_size"] - jar_path = os.path.dirname(os.path.dirname(self._script_path)) - # spark job command. + # spark job command. 
spark_job_cmd = ("spark-submit --master yarn " - "--driver-memory {0} " - "--num-executors {1} " - "--conf spark.executor.memory={2} " - "--conf spark.executor.cores={3} " - "--jars {4}/common/spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar " - "{5}/{6} " - "-zk {7} " - "-t {8} " - "-db {9} " - "-dt {10} " - "-w {11} " - "-bs {12}".format(diver_memory,num_exec,exec_memory,exec_cores,jar_path,self._script_path,parser,self._kafka_consumer.ZookeperServer,self._kafka_consumer.Topic,self._db_name,"proxy",self._processes,batch_size)) - + "--driver-memory {0} " + "--num-executors {1} " + "--conf spark.executor.memory={2} " + "--conf spark.executor.cores={3} " + "--jars {4}/common/spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar " + "{5}/{6} " + "-zk {7} " + "-t {8} " + "-db {9} " + "-dt {10} " + "-w {11} " + "-bs {12}".format(diver_memory,num_exec,exec_memory,exec_cores,jar_path,self._script_path,parser,self._kafka_consumer.ZookeperServer,self._kafka_consumer.Topic,self._db_name,"proxy",self._processes,batch_size)) + # start spark job. Util.execute_cmd(spark_job_cmd,self._logger) - - From bdd5faf7997fc0183cbc1bdeb3addb41b2ee052c Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 16:42:00 -0800 Subject: [PATCH 19/22] add new options for ingest --- spot-setup/spot.conf | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index f83267b..74d454a 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -85,6 +85,7 @@ collector_path=/path_to_flow_collector local_staging=/tmp/ process_opt="" FLOW_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=nfcapd. [dns] type=dns @@ -93,6 +94,7 @@ local_staging=/tmp/ pcap_split_staging=/tmp/ process_opt="-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'" DNS_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=.pcap [proxy] type=proxy @@ -100,3 +102,4 @@ collector_path=/path_to_proxy_collector supported_files=["log"] parser=bro_parser.py PROXY_PATH=%(HUSER)s/%(TYPE)s/hive/y=%(YR)s/m=%(MH)s/d=%(DY)s/ +supported_files=.log From 4f6af020ae46b3772850a807ad2b8656115894f7 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 16:43:14 -0800 Subject: [PATCH 20/22] added lines to spot.conf --- spot-setup/spot.conf | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf index 74d454a..0a1fd66 100755 --- a/spot-setup/spot.conf +++ b/spot-setup/spot.conf @@ -67,6 +67,8 @@ PROCESS_COUNT=20 [ingest] hdfs_app_path=hdfs application path +collector_processes=5 +ingestion_interval=1 kafka_server=kafka ip kafka_port=9183 zookeeper_server=localhost From 7f99a67614e4d148c542d55fae6c7c2b8707c833 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 17:40:02 -0800 Subject: [PATCH 21/22] code check of ml_ops.py, updated spark options --- spot-ml/README.md | 6 +-- spot-ml/ml_ops.py | 131 +++++++++++++++++++++++++--------------------- spot-ml/ml_ops.sh | 126 -------------------------------------------- 3 files changed, 74 insertions(+), 189 deletions(-) delete mode 100755 spot-ml/ml_ops.sh diff --git a/spot-ml/README.md b/spot-ml/README.md index a4eab66..75c4cb3 100644 --- a/spot-ml/README.md +++ b/spot-ml/README.md @@ -102,11 +102,11 @@ The Hive tables containing DNS data for spot-ml analyses have the following sche ### Run a suspicious 
connects analysis -To run a suspicious connects analysis, execute the `ml_ops.sh` script in the ml directory of the MLNODE. +To run a suspicious connects analysis, execute the `ml_ops.py` script in the ml directory of the MLNODE. ``` -./ml_ops.sh YYYMMDD +./ml_ops.py -d YYYMMDD -t -T -m ``` - +-T and -m are optional and will use the values from the spot.conf if not provided For example: ``` diff --git a/spot-ml/ml_ops.py b/spot-ml/ml_ops.py index 9f3fa3f..7b6988e 100755 --- a/spot-ml/ml_ops.py +++ b/spot-ml/ml_ops.py @@ -1,6 +1,7 @@ #!/usr/bin/python import os +import sys import logging import argparse import subprocess @@ -9,50 +10,56 @@ def main(): - #initialize ConfigParser + # initialize ConfigParser conf_file = '/etc/spot.conf' conf = ConfigParser.SafeConfigParser() spot_conf = conf.read(conf_file) - #initialize logging - logger = get_logger('SPOT.ML.OPS',create_file=False) + # initialize logging + logger = get_logger('SPOT.ML.OPS', create_file=False) - #check for file + # check for file if len(spot_conf) < 1: logger.info("Failed to open /etc/spot.conf, check file location and try again") raise SystemExit - ## parse and validate arguments + # parse and validate arguments tol = None # input Parameters parser = argparse.ArgumentParser(description="ML Operations script") - parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be processed',metavar='') - parser.add_argument('-d','--date',dest='fdate',required=True,help='Specify date to be analyzed by ML',metavar='') - parser.add_argument('-T','--tol',dest='tol',required=False,help='If present will override default TOL from conf file',metavar='') - parser.add_argument('-m','--maxResults',dest='MAXRESULTS',required=False,help='If defined sets max results returned',metavar='') + parser.add_argument('-t','--type', dest='type', required=True, help='Type of data that will be processed', metavar='') + parser.add_argument('-d','--date', dest='fdate', required=True, help='Specify date to be analyzed by ML', metavar='') + parser.add_argument('-T','--tol', dest='tol', required=False, help='If present will override default TOL from conf file', metavar='') + parser.add_argument('-m','--maxResults', dest='MAXRESULTS', required=False, help='If defined sets max results returned', metavar='') args = parser.parse_args() - YR=args.fdate[0:4] - MH=args.fdate[4:6] - DY=args.fdate[6:8] + YR = args.fdate[0:4] + MH = args.fdate[4:6] + DY = args.fdate[6:8] - #getting defaults for ConfigParser interpolation + # getting defaults for ConfigParser interpolation DEFAULTS = vars(args) - DEFAULTS.update({'YR':YR,'MH':MH,'DY':DY}) - - ## prepare parameters pipeline stages - HPATH = conf.get('DEFAULT','HPATH',vars=DEFAULTS) - LPATH = conf.get('DEFAULT','LPATH',vars=DEFAULTS) - MAXRESULTS = conf.get('DEFAULT','MAXRESULTS') - DUPFACTOR = conf.get('DEFAULT','DUPFACTOR') - RAWDATA_PATH = conf.get(args.type,'{0}_PATH'.format(args.type.upper()), vars=DEFAULTS) - FEEDBACK_PATH = "{0}/{1}_scores.csv".format(conf.get('DEFAULT','LPATH',vars=DEFAULTS),args.type) + DEFAULTS.update({'YR': YR, 'MH': MH, 'DY': DY}) + + # prepare parameters pipeline stages + HPATH = conf.get('DEFAULT', 'HPATH', vars=DEFAULTS) + LPATH = conf.get('DEFAULT', 'LPATH', vars=DEFAULTS) + + if not MAXRESULTS: + MAXRESULTS = conf.get('DEFAULT', 'MAXRESULTS') + + RAWDATA_PATH = conf.get(args.type, '{0}_PATH'.format(args.type.upper()), vars=DEFAULTS) + + FEEDBACK_PATH = "{0}/{1}_scores.csv".format(conf.get('DEFAULT', 'LPATH', vars=DEFAULTS), args.type) + DUPFACTOR = 
conf.get('DEFAULT', 'DUPFACTOR') + PREPROCESS_STEP = "{0}_pre_lda".format(args.type) POSTPROCESS_STEP = "{0}_post_lda".format(args.type) + HDFS_WORDCOUNTS = "{0}/word_counts".format(HPATH) - ## paths for intermediate files + # paths for intermediate files HDFS_DOCRESULTS = "{0}/doc_results.csv".format(HPATH) LOCAL_DOCRESULTS = "{0}/doc_results.csv".format(LPATH) @@ -62,61 +69,65 @@ def main(): HDFS_SCORED_CONNECTS = "{0}/scores".format(HPATH) HDFS_MODEL = "{0}/model".format(HPATH) - LDA_OUTPUT_DIR = "{1}/{1}".format(args.type,args.fdate) + LDA_OUTPUT_DIR = "{1}/{1}".format(args.type, args.fdate) - #get nodes and create comma seperated list - NODES = conf.get('DEFAULT','NODES').split() + # get nodes and create comma seperated list + NODES = conf.get('DEFAULT', 'NODES').split() nodes_csl = ','.join(NODES) cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_WORDCOUNTS) - execute_cmd(cmd,logger) + execute_cmd(cmd, logger) cmd = "mkdir -p {0}".format(LPATH) - execute_cmd(cmd,logger) + execute_cmd(cmd, logger) # protect the flow_scores.csv file cmd = "rm -f {0}/*.{dat,beta,gamma,other,pkl}".format(LPATH) - execute_cmd(cmd,logger) + execute_cmd(cmd, logger) cmd = "hdfs dfs -rm -R -f {0}".format(HDFS_SCORED_CONNECTS) - execute_cmd(cmd,logger) + execute_cmd(cmd, logger) # Add -p to execute pre MPI command. # Pre MPI command can be configured in /etc/spot.conf # In this script, after the line after --mpicmd ${MPI_CMD} add: # --mpiprep ${MPI_PREP_CMD} - if conf.get('mpi',"MPI_PREP_CMD"): - cmd = conf.get('mpi',"MPI_PREP_CMD") - execute_cmd(cmd,logger) + if conf.get('mpi', "MPI_PREP_CMD"): + cmd = conf.get('mpi', "MPI_PREP_CMD") + execute_cmd(cmd, logger) - SPK_CONFIG = conf.get('spark','SPK_CONFIG') - SPK_DRIVER_MEM = conf.get('spark','SPK_DRIVER_MEM') - SPK_EXEC = conf.get('spark','SPK_EXEC') - SPK_EXEC_CORES = conf.get('spark','SPK_EXEC_CORES') - SPK_EXEC_MEM = conf.get('spark','SPK_EXEC_MEM') + SPK_CONFIG = conf.get('spark', 'SPK_CONFIG') + SPK_DRIVER_MEM = conf.get('spark', 'SPK_DRIVER_MEM') + SPK_EXEC = conf.get('spark', 'SPK_EXEC') + SPK_EXEC_CORES = conf.get('spark', 'SPK_EXEC_CORES') + SPK_EXEC_MEM = conf.get('spark', 'SPK_EXEC_MEM') SPK_EXEC_MEM_OVERHEAD = conf.get('spark', 'SPK_EXEC_MEM_OVERHEAD') SPK_DRIVER_MAX_RESULTS = conf.get('spark', 'SPK_DRIVER_MAX_RESULTS') SPK_DRIVER_MEM_OVERHEAD = conf.get('spark', 'SPK_DRIVER_MEM_OVERHEAD') - LDAPATH = conf.get('DEFAULT','LDAPATH') - LUSER = conf.get('DEFAULT','LUSER') - MPI_CMD = conf.get('mpi','MPI_CMD') - PROCESS_COUNT = conf.get('mpi','PROCESS_COUNT') - TOPIC_COUNT = conf.get('DEFAULT','TOPIC_COUNT') + LDAPATH = conf.get('DEFAULT', 'LDAPATH') + LUSER = conf.get('DEFAULT', 'LUSER') + MPI_CMD = conf.get('mpi', 'MPI_CMD') + PROCESS_COUNT = conf.get('mpi', 'PROCESS_COUNT') + TOPIC_COUNT = conf.get('DEFAULT', 'TOPIC_COUNT') if tol: TOL = tol else: - TOL = conf.get('DEFAULT','TOL') - - #prepare options for spark-submit - spark_cmd = [ - "time", "spark-submit", "--class org.apache.spot.SuspiciousConnects", - "--master yarn-client", "--conf spark.driver.maxPermSize=512m", "--conf spark.driver.cores=1", - "--conf spark.dynamicAllocation.enabled=true", "--conf spark.dynamicAllocation.minExecutors=1", - "--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M", - "--conf spark.shuffle.io.preferDirectBufs=false", "--conf spark.kryoserializer.buffer.max=512m", - "--conf spark.shuffle.service.enabled=true", "--conf spark.yarn.am.waitTime=1000000"] + TOL = conf.get('DEFAULT', 'TOL') + + # prepare options for spark-submit + spark_cmd = ["time", 
"spark-submit", + "--class org.apache.spot.SuspiciousConnects", + "--master yarn-client", "--conf spark.driver.maxPermSize=512m", + "--conf spark.driver.cores=1", + "--conf spark.dynamicAllocation.enabled=true", + "--conf spark.dynamicAllocation.minExecutors=1", + "--conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M", + "--conf spark.shuffle.io.preferDirectBufs=false", + "--conf spark.kryoserializer.buffer.max=512m", + "--conf spark.shuffle.service.enabled=true", + "--conf spark.yarn.am.waitTime=100s"] spark_extras = [ "--driver-memory " + SPK_DRIVER_MEM, @@ -124,7 +135,7 @@ def main(): "--conf spark.executor.cores=" + SPK_EXEC_CORES, "--conf spark.executor.memory=" + SPK_EXEC_MEM, "--conf spark.driver.maxResultSize=" + SPK_DRIVER_MAX_RESULTS, - "--conf spark.yarn.driver.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD, + "--conf spark.yarn.am.memoryOverhead=" + SPK_DRIVER_MEM_OVERHEAD, "--conf spark.yarn.executor.memoryOverhead=" + SPK_EXEC_MEM_OVERHEAD] if SPK_CONFIG: @@ -154,12 +165,12 @@ def main(): execute_cmd(spark_cmd, logger) # process = subprocess.Popen(spark_cmd, stdout=subprocess.PIPE, stderr=None) - ## move results to hdfs. + # move results to hdfs. os.chdir(LPATH) - cmd = "hadoop fs -getmerge {0}/part-* {1}_results.csv && hadoop fs -moveFromLocal {1}_results.csv {0}/${1}_results.csv".format(HDFS_SCORED_CONNECTS,args.type) - execute_cmd(cmd,logger) + cmd = "hadoop fs -getmerge {0}/part-* {1}_results.csv && hadoop fs -moveFromLocal {1}_results.csv {0}/${1}_results.csv".format(HDFS_SCORED_CONNECTS, args.type) + execute_cmd(cmd, logger) -def execute_cmd(command,logger): +def execute_cmd(command, logger): try: logger.info("Executing: {0}".format(command)) @@ -169,7 +180,7 @@ def execute_cmd(command,logger): logger.error("There was an error executing: {0}".format(e.cmd)) sys.exit(1) -def validate_parameter(parameter,message,logger): +def validate_parameter(parameter, message, logger): if parameter == None or parameter == "": logger.error(message) sys.exit(1) @@ -202,5 +213,5 @@ def get_logger(logger_name,create_file=False): -if __name__=='__main__': +if __name__ =='__main__': main() diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh deleted file mode 100755 index 8faef3b..0000000 --- a/spot-ml/ml_ops.sh +++ /dev/null @@ -1,126 +0,0 @@ -#!/bin/bash - -# parse and validate arguments - -FDATE=$1 -DSOURCE=$2 - -YR=${FDATE:0:4} -MH=${FDATE:4:2} -DY=${FDATE:6:2} - -if [[ "${#FDATE}" != "8" || -z "${DSOURCE}" ]]; then - echo "ml_ops.sh syntax error" - echo "Please run ml_ops.sh again with the correct syntax:" - echo "./ml_ops.sh YYYYMMDD TYPE [TOL]" - echo "for example:" - echo "./ml_ops.sh 20160122 dns 1e-6" - echo "./ml_ops.sh 20160122 flow" - exit -fi - - -# read in variables (except for date) from etc/.conf file -# note: FDATE and DSOURCE *must* be defined prior sourcing this conf file - -source /etc/spot.conf - -# third argument if present will override default TOL from conf file - -if [ -n "$3" ]; then TOL=$3 ; fi - -if [ -n "$4" ]; then - MAXRESULTS=$4 -else - MAXRESULTS=-1 -fi - -# prepare parameters pipeline stages - -if [ "$DSOURCE" == "flow" ]; then - RAWDATA_PATH=${FLOW_PATH} -elif [ "$DSOURCE" == "dns" ]; then - RAWDATA_PATH=${DNS_PATH} -else - RAWDATA_PATH=${PROXY_PATH} -fi - -FEEDBACK_PATH=${LPATH}/${DSOURCE}_scores.csv -DUPFACTOR=1000 - -PREPROCESS_STEP=${DSOURCE}_pre_lda -POSTPROCESS_STEP=${DSOURCE}_post_lda - -HDFS_WORDCOUNTS=${HPATH}/word_counts - -# paths for intermediate files -HDFS_DOCRESULTS=${HPATH}/doc_results.csv 
-LOCAL_DOCRESULTS=${LPATH}/doc_results.csv - -HDFS_WORDRESULTS=${HPATH}/word_results.csv -LOCAL_WORDRESULTS=${LPATH}/word_results.csv - -HDFS_SCORED_CONNECTS=${HPATH}/scores -HDFS_MODEL=${HPATH}/model - -LDA_OUTPUT_DIR=${DSOURCE}/${FDATE} - -TOPIC_COUNT=20 - -nodes=${NODES[0]} -for n in "${NODES[@]:1}" ; do nodes+=",${n}"; done - -hdfs dfs -rm -R -f ${HDFS_WORDCOUNTS} -wait - -mkdir -p ${LPATH} -rm -f ${LPATH}/*.{dat,beta,gamma,other,pkl} # protect the flow_scores.csv file - -hdfs dfs -rm -R -f ${HDFS_SCORED_CONNECTS} - -# Add -p to execute pre MPI command. -# Pre MPI command can be configured in /etc/spot.conf -# In this script, after the line after --mpicmd ${MPI_CMD} add: -# --mpiprep ${MPI_PREP_CMD} - -${MPI_PREP_CMD} - -time spark-submit --class "org.apache.spot.SuspiciousConnects" \ - --master yarn-client \ - --driver-memory ${SPK_DRIVER_MEM} \ - --num-executors ${SPK_EXEC} \ - --conf spark.driver.maxResultSize=${SPK_DRIVER_MAX_RESULTS} \ - --conf spark.driver.maxPermSize=512m \ - --conf spark.dynamicAllocation.enabled=true \ - --conf spark.executor.cores=${SPK_EXEC_CORES} \ - --conf spark.executor.memory=${SPK_EXEC_MEM} \ - --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:PermSize=512M" \ - --conf spark.kryoserializer.buffer.max=512m \ - --conf spark.yarn.am.waitTime=100s \ - --conf spark.yarn.am.memoryOverhead=${SPK_DRIVER_MEM_OVERHEAD} \ - --conf spark.yarn.executor.memoryOverhead=${SPAK_EXEC_MEM_OVERHEAD} target/scala-2.10/spot-ml-assembly-1.1.jar \ - --analysis ${DSOURCE} \ - --input ${RAWDATA_PATH} \ - --dupfactor ${DUPFACTOR} \ - --feedback ${FEEDBACK_PATH} \ - --model ${LPATH}/model.dat \ - --topicdoc ${LPATH}/final.gamma \ - --topicword ${LPATH}/final.beta \ - --lpath ${LPATH} \ - --ldapath ${LDAPATH} \ - --luser ${LUSER} \ - --mpicmd ${MPI_CMD} \ - --proccount ${PROCESS_COUNT} \ - --topiccount ${TOPIC_COUNT} \ - --nodes ${nodes} \ - --scored ${HDFS_SCORED_CONNECTS} \ - --tempmodel ${HDFS_MODEL} \ - --threshold ${TOL} \ - --maxresults ${MAXRESULTS} - -wait - -# move results to hdfs. -cd ${LPATH} -hadoop fs -getmerge ${HDFS_SCORED_CONNECTS}/part-* ${DSOURCE}_results.csv && hadoop fs -moveFromLocal \ - ${DSOURCE}_results.csv ${HDFS_SCORED_CONNECTS}/${DSOURCE}_results.csv \ No newline at end of file From d643c53feff6183059a7ef8e4d6c472471d0e217 Mon Sep 17 00:00:00 2001 From: natedogs911 Date: Tue, 22 Nov 2016 17:41:05 -0800 Subject: [PATCH 22/22] removed hdfs_setup.sh --- spot-setup/hdfs_setup.sh | 35 ----------------------------------- 1 file changed, 35 deletions(-) delete mode 100755 spot-setup/hdfs_setup.sh diff --git a/spot-setup/hdfs_setup.sh b/spot-setup/hdfs_setup.sh deleted file mode 100755 index 4d61574..0000000 --- a/spot-setup/hdfs_setup.sh +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash - -DSOURCES=('flow' 'dns' 'proxy') -DFOLDERS=('binary' 'hive' 'stage') -source /etc/spot.conf - -# -# creating HDFS user's folder -# -hadoop fs -mkdir ${HUSER} -hadoop fs -chown ${USER}:supergroup ${HUSER} - -for d in "${DSOURCES[@]}" -do - echo "creating /$d" - hadoop fs -mkdir ${HUSER}/$d - for f in "${DFOLDERS[@]}" - do - echo "creating $d/$f" - hadoop fs -mkdir ${HUSER}/$d/$f - done -done - -# -# create hive tables -# -#configure / create catalog -hive -e "CREATE DATABASE ${DBNAME}" - -for d in "${DSOURCES[@]}" -do - hive -hiveconf huser=${HUSER} -hiveconf dbname=${DBNAME} -f create_${d}_avro_parquet.hql -done - -
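
For reference, a minimal sketch of the /etc/spot.conf access pattern these patches converge on, using the Python 2 ConfigParser module the same way the patched scripts and notebooks do. It is an illustration only: the section and option names ([DEFAULT], [ingest], HUSER, LPATH, ingestion_interval) are taken from the spot-setup/spot.conf changes above, and the 'flow' / '20160122' values are placeholder inputs for the %(DSOURCE)s / %(FDATE)s interpolation, not the output of any script.

```python
# Sketch only: read the .INI-style /etc/spot.conf with Python 2 ConfigParser,
# mirroring the pattern used in hdfs_setup.py, ml_ops.py and the ipynb templates.
import ConfigParser

conf_file = '/etc/spot.conf'
conf = ConfigParser.SafeConfigParser()

# conf.read() returns the list of files it could parse; an empty list means
# the file was not found or not readable.
if not conf.read(conf_file):
    raise SystemExit("Failed to open {0}, check file location and try again".format(conf_file))

# Options defined under [DEFAULT] are visible from every section.
huser = conf.get('DEFAULT', 'HUSER')
interval = conf.getint('ingest', 'ingestion_interval')

# Options that embed %(DSOURCE)s / %(FDATE)s placeholders are resolved by
# passing the runtime values through the vars argument, e.g. for LPATH:
lpath = conf.get('DEFAULT', 'LPATH', vars={'DSOURCE': 'flow', 'FDATE': '20160122'})
```

ConfigParser returns every option as a string, so numeric settings such as ingestion_interval or collector_processes need an explicit getint/int conversion wherever an integer is required.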