Changes from all commits (23 commits)
44 changes: 0 additions & 44 deletions spot-ingest/ingest_conf.json

This file was deleted.

46 changes: 24 additions & 22 deletions spot-ingest/master_collector.py
@@ -1,18 +1,17 @@
#!/bin/env python

import argparse
import os
import json
import sys
from common.utils import Util
from common.kerberos import Kerberos
from common.kafka_client import KafkaTopic
import datetime
import datetime
import ConfigParser

# get master configuration.
script_path = os.path.dirname(os.path.abspath(__file__))
conf_file = "{0}/ingest_conf.json".format(script_path)
master_conf = json.loads(open (conf_file).read())
# initialize ConfigParser
conf_file = '/etc/spot.conf'
master_conf = ConfigParser.SafeConfigParser()
master_conf.read(conf_file)

def main():

@@ -30,43 +29,46 @@ def start_collector(type,workers_num,id=None):

# generate ingest id
ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_")

# create logger.
logger = Util.get_logger("SPOT.INGEST")

# validate the given configuration exists in ingest_conf.json.
if not type in master_conf["pipelines"]:
logger.error("'{0}' type is not a valid configuration.".format(type));
try:
master_conf.get(type, 'type')
except ConfigParser.NoSectionError:
logger.error("'{0}' type is not a valid configuration.".format(type))
sys.exit(1)

# validate the type is a valid module.
if not Util.validate_data_source(master_conf["pipelines"][type]["type"]):
logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"]));
if not Util.validate_data_source(master_conf.get(type, "type")):
logger.error("{0} type is not configured. Please check /etc/spot.conf".format(type))
sys.exit(1)

# validate if kerberos authentication is required.
if os.getenv('KRB_AUTH'):
if master_conf.get('kerberos', 'KRB_AUTH'):
kb = Kerberos()
kb.authenticate()

# kafka server info.
logger.info("Initializing kafka instance")
k_server = master_conf["kafka"]['kafka_server']
k_port = master_conf["kafka"]['kafka_port']
k_server = master_conf.get('ingest', 'kafka_server')
k_port = master_conf.get('ingest', 'kafka_port')

# required zookeeper info.
zk_server = master_conf["kafka"]['zookeper_server']
zk_port = master_conf["kafka"]['zookeper_port']
zk_server = master_conf.get('ingest', 'zookeeper_server')
zk_port = master_conf.get('ingest', 'zookeeper_port')

topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)

# create a collector instance based on data source type.
logger.info("Starting {0} ingest instance".format(topic))
module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
module = __import__("pipelines.{0}.collector".format(master_conf.get(type, 'type')),fromlist=['Collector'])

# start collector.
ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
hdfs_app_path = master_conf.get('ingest', 'hdfs_app_path')
ingest_collector = module.Collector(hdfs_app_path,kafka,type)
ingest_collector.start()

if __name__=='__main__':
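
For orientation, the ConfigParser calls introduced in this diff imply a sectioned /etc/spot.conf roughly like the sketch below. The section and option names are taken from the .get() calls in the changed files; the values are placeholders only, and the layout of the real file may differ.

[ingest]
kafka_server = kafka01.example.com
kafka_port = 9092
zookeeper_server = zk01.example.com
zookeeper_port = 2181
hdfs_app_path = /user/spot
ingestion_interval = 1

[kerberos]
KRB_AUTH = false

[dns]
type = dns
collector_path = /collector/dns
pcap_split_staging = /collector/dns/staging
supported_files = *.pcap
pkt_num = 650000
collector_processes = 2
process_opt =
local_staging = /tmp

One caveat worth flagging: SafeConfigParser.get() always returns a string, so with a layout like this the new check "if master_conf.get('kerberos', 'KRB_AUTH'):" is truthy even when the option is set to false; getboolean('kerberos', 'KRB_AUTH') is probably the intended test, though that is an assumption rather than part of this change.
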
79 changes: 40 additions & 39 deletions spot-ingest/pipelines/dns/collector.py
@@ -1,24 +1,25 @@
#/bin/env python
#!/bin/env python

import time
import os
import subprocess
import json
import logging
import ConfigParser
from multiprocessing import Process
from common.utils import Util
from common.file_collector import FileWatcher
from multiprocessing import Pool
from common.kafka_client import KafkaTopic


class Collector(object):

def __init__(self,hdfs_app_path,kafka_topic,conf_type):

self._initialize_members(hdfs_app_path,kafka_topic,conf_type)
def __init__(self, hdfs_app_path, kafka_topic, conf_type):

self._initialize_members(hdfs_app_path, kafka_topic, conf_type)

def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type):

def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):

# getting parameters.
self._logger = logging.getLogger('SPOT.INGEST.DNS')
self._hdfs_app_path = hdfs_app_path
@@ -28,63 +29,64 @@ def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):
self._script_path = os.path.dirname(os.path.abspath(__file__))

# read dns configuration.
conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
conf = json.loads(open(conf_file).read())
self._conf = conf["pipelines"][conf_type]
conf_file = "/etc/spot.conf"
self._conf = ConfigParser.SafeConfigParser()
self._conf.read(conf_file)

# set configuration.
self._collector_path = self._conf['collector_path']
self._collector_path = self._conf.get(conf_type, 'collector_path')
self._dsource = 'dns'
self._hdfs_root_path = "{0}/{1}".format(hdfs_app_path, self._dsource)

# set configuration.
self._pkt_num = self._conf['pkt_num']
self._pcap_split_staging = self._conf['pcap_split_staging']
self._supported_files = self._conf['supported_files']
self._pkt_num = self._conf.get(conf_type, 'pkt_num')
self._pcap_split_staging = self._conf.get(conf_type, 'pcap_split_staging')
self._supported_files = self._conf.get(conf_type, 'supported_files')

# create collector watcher
self._watcher = FileWatcher(self._collector_path,self._supported_files)
# create collector watcher
self._watcher = FileWatcher(self._collector_path, self._supported_files)

# Multiprocessing.
self._processes = conf["collector_processes"]
self._ingestion_interval = conf["ingestion_interval"]
# Multiprocessing.
self._processes = self._conf.get(conf_type, 'collector_processes')
self._ingestion_interval = self._conf.get('ingest', 'ingestion_interval')
self._pool = Pool(processes=self._processes)

def start(self):

self._logger.info("Starting DNS ingest")
self._logger.info("Starting DNS ingest")
self._watcher.start()

try:
while True:
self._ingest_files_pool()
self._ingest_files_pool()
time.sleep(self._ingestion_interval)

except KeyboardInterrupt:
self._logger.info("Stopping DNS collector...")
Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)
self._logger.info("Stopping DNS collector...")
Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)
self._watcher.stop()
self._pool.terminate()
self._pool.close()
self._pool.close()
self._pool.join()
SystemExit("Ingest finished...")


def _ingest_files_pool(self):
def _ingest_files_pool(self):

if self._watcher.HasFiles:

for x in range(0,self._processes):
file = self._watcher.GetNextFile()
result = self._pool.apply_async(ingest_file,args=(file,self._pkt_num,self._pcap_split_staging,self._kafka_topic.Partition,self._hdfs_root_path,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
#result.get() # to debug, wrap this in try/except.
if not self._watcher.HasFiles: break
if not self._watcher.HasFiles: break
return True


def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):

logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))

try:
# get file name and date.
org_file = file
@@ -107,27 +109,26 @@ def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,
pcap_date_path = file_date[-14:-6]

# hdfs path with timestamp.
hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)

# create hdfs path.
Util.creat_hdfs_folder(hdfs_path,logger)
Util.creat_hdfs_folder(hdfs_path, logger)

# load file to hdfs.
hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
hadoop_pcap_file = "{0}/{1}".format(hdfs_path, file)
Util.load_to_hdfs(os.path.join(currdir, file), hadoop_pcap_file, logger)

# create event for workers to process the file.
logger.info( "Sending split file to worker number: {0}".format(partition))
KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition)
logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
KafkaTopic.SendMessage(hadoop_pcap_file, kafka_servers, topic, partition)
logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file, topic))


logger.info("Removing file: {0}".format(org_file))
rm_big_file = "rm {0}".format(org_file)
Util.execute_cmd(rm_big_file,logger)
Util.execute_cmd(rm_big_file, logger)

except Exception as err:

logger.error("There was a problem, please check the following error message:{0}".format(err.message))
logger.error("Exception: {0}".format(err))

21 changes: 10 additions & 11 deletions spot-ingest/pipelines/dns/worker.py
@@ -3,7 +3,7 @@
import logging
import datetime
import subprocess
import json
import ConfigParser
import os
from multiprocessing import Process
from common.utils import Util
@@ -12,7 +12,7 @@
class Worker(object):

def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):

self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)

def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
@@ -24,13 +24,12 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
self._hdfs_app_path = hdfs_app_path

# read dns configuration.
self._script_path = os.path.dirname(os.path.abspath(__file__))
conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
conf = json.loads(open(conf_file).read())
self._conf = conf["pipelines"][conf_type]
conf_file = "/etc/spot.conf"
self._conf = ConfigParser.SafeConfigParser()
self._conf.read(conf_file)

self._process_opt = self._conf['process_opt']
self._local_staging = self._conf['local_staging']
self._process_opt = self._conf.get(conf_type, 'process_opt')
self._local_staging = self._conf.get(conf_type, 'local_staging')
self.kafka_consumer = kafka_consumer

def start(self):
@@ -42,9 +41,9 @@ def start(self):
def _new_file(self,file):

self._logger.info("-------------------------------------- New File received --------------------------------------")
self._logger.info("File: {0} ".format(file))
self._logger.info("File: {0} ".format(file))
p = Process(target=self._process_new_file, args=(file,))
p.start()
p.start()

def _process_new_file(self,file):

@@ -81,7 +80,7 @@ def _process_new_file(self,file):
self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
Util.execute_cmd(mv_to_staging,self._logger)

#load to avro
# load to avro
load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path)

self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd))
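
To make the load_to_avro_cmd template above concrete, here is how it renders with placeholder values (database spot, a capture dated 2016-07-08 09:00, staging directory /user/spot/dns/stage); the exact zero-padding of the date fields depends on parsing code not shown in this diff:

hive -hiveconf dbname=spot -hiveconf y=2016 -hiveconf m=07 -hiveconf d=08 -hiveconf h=09 -hiveconf data_location='/user/spot/dns/stage' -f pipelines/dns/load_dns_avro_parquet.hql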