Skip to content
This repository was archived by the owner on May 15, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions oni/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
126 changes: 114 additions & 12 deletions oni/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@
from kafka.partitioner.roundrobin import RoundRobinPartitioner
from kafka.common import TopicPartition

# Kerberos (SASL/GSSAPI) options for librdkafka clients.
# Producer and consumer configurations are updated with this mapping when the
# KRB_AUTH environment variable is set.  The principal and keytab path are read
# from the environment once, when this module is imported; either may be None
# if the corresponding variable is not exported.
krb_conf_options = {
    'sasl.mechanisms': 'gssapi',
    'security.protocol': 'sasl_plaintext',
    'sasl.kerberos.service.name': 'kafka',
    # The %{...} placeholders refer to the two options that follow.
    'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
    'sasl.kerberos.principal': os.environ.get('KRB_USER'),
    'sasl.kerberos.keytab': os.environ.get('KEYTABPATH'),
    'sasl.kerberos.min.time.before.relogin': 60000,
}

class KafkaTopic(object):


Expand All @@ -28,6 +37,7 @@ def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
self._num_of_partitions = partitions
self._partitions = []
self._partitioner = None
self._librdkafka_debug = {'debug': 'all'}

# create topic with partitions
self._create_topic()
Expand All @@ -37,10 +47,10 @@ def _create_topic(self):
self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))

# Create partitions for the workers.
self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]
#self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]

# create partitioner
self._partitioner = RoundRobinPartitioner(self._partitions)
#self._partitioner = RoundRobinPartitioner(self._partitions)

# get script path
zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
Expand All @@ -53,19 +63,42 @@ def send_message(self,message,topic_partition):

# Body of send_message(self, message, topic_partition): publish one message to
# self._topic on the broker at self._server:self._port.
# NOTE(review): this is a rendered diff without +/- markers; the KafkaProducer
# lines appear to be the removed (kafka-python) side and everything from
# self._producer_conf onwards the added (confluent/librdkafka) side.
self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
kafka_brokers = '{0}:{1}'.format(self._server,self._port)
# Pre-change implementation: sends to the explicit partition passed by the caller.
producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
future = producer.send(self._topic,message,partition=topic_partition)
# Post-change implementation: base librdkafka producer settings, kept on the
# instance and rebuilt on every call.
self._producer_conf = {'bootstrap.servers': kafka_brokers,
'session.timeout.ms': 6000,
'api.version.request': 'false',
'internal.termination.signal': 0,
'broker.version.fallback': '0.9.0.0',
'log.connection.close': 'false',
'socket.keepalive.enable': 'false',
'default.topic.config': {'request.required.acks': 'all'}}

# Any non-empty value of ingest_kafka_debug adds {'debug': 'all'} to the config.
if os.getenv('ingest_kafka_debug'):
self._logger.info("librdkafka debug: all")
self._producer_conf.update(self._librdkafka_debug)

# Any non-empty value of KRB_AUTH merges in the module-level Kerberos options.
# NOTE(review): the log text says "Consumer" although this is the producer path.
if os.getenv('KRB_AUTH'):
self._logger.info("Updating Consumer Configuration with Kerberos options")
self._producer_conf.update(krb_conf_options)

# Delivery report handler: logs either the error or the topic/partition the
# message was delivered to.
def delivery_callback (err, msg):
if err:
self._logger.info('Message failed delivery: {0}'.format(err))
else:
self._logger.info('Message delivered to topic {0} on {1}'.format(msg.topic(), msg.partition()))

# A new producer is created for every message sent.
producer = confluent_kafka_Producer(**self._producer_conf)
# NOTE(review): topic_partition is logged above but not passed to produce(),
# so the caller's partition choice is not applied here.  `future` is not used
# again in the visible code.  message.encode assumes a text message - confirm.
future = producer.produce(self._topic, message.encode('utf-8'), callback=delivery_callback)
producer.poll(0)
producer.flush()
# NOTE(review): confirm that the confluent-kafka Producer exposes close(); if
# it does not, this line raises AttributeError after the message was flushed.
producer.close()

@property
def Topic(self):
return self._topic

# Partition that callers should send the next message to.
# NOTE(review): rendered diff without +/- markers; the first return appears to
# be the removed line and the commented-out line plus `return 0` the added ones.
@property
def Partition(self):
# Pre-change behaviour: ask the partitioner for the next partition.
return self._partitioner.partition(self._topic).partition

# Post-change behaviour: always partition 0.  The same change comments out the
# partitioner creation in _create_topic, so self._partitioner stays None.
#return self._partitioner.partition(self._topic).partition
return 0


class KafkaConsumer(object):
Expand All @@ -76,21 +109,90 @@ def __init__(self,topic,server,port,zk_server,zk_port,partition):

def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
    """Record the connection settings on the instance.

    Stores the topic, the Kafka broker host/port, the ZooKeeper host/port and
    the partition value (kept as ``self._id``), creates the
    "ONI.INGEST.KAFKA" logger, and prepares the extra librdkafka debug
    settings that ``start`` merges in on request.
    """
    # The logger is created first, exactly as before.
    self._logger = Util.get_logger("ONI.INGEST.KAFKA")

    # Kafka endpoint and topic.
    self._topic, self._server, self._port = topic, server, port

    # ZooKeeper endpoint.
    self._zk_server, self._zk_port = zk_server, zk_port

    # The partition argument is kept under the name `_id`.
    self._id = partition

    # Optional librdkafka debug override.
    self._librdkafka_debug = dict(debug='all')

# Connect to the broker at self._server:self._port and consume from self._topic.
# NOTE(review): this is a rendered diff without +/- markers and with indentation
# stripped.  The first block (KC ... return consumer) appears to be the removed
# kafka-python implementation, which returned the consumer object itself; the
# rest is the added confluent/librdkafka implementation, which returns a single
# message per call.
def start(self):

# --- pre-change implementation -------------------------------------------
kafka_brokers = '{0}:{1}'.format(self._server,self._port)
consumer = KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
partition = [TopicPartition(self._topic,int(self._id))]
consumer.assign(partitions=partition)
consumer.poll()
return consumer
# --- post-change implementation ------------------------------------------
kafka_brokers = '{0}:{1}'.format(self._server,self._port)
# NOTE(review): _consumer_test is built but not referenced again in the
# visible code.
self._consumer_test = {'bootstrap.servers': kafka_brokers,
'group.id': self._id}
# group.id is self._id, i.e. the `partition` value given to the constructor
# (the pre-change code used the topic name as the group id).
# NOTE(review): client.id is a hard-coded machine name, and 'debug': 'generic'
# is always enabled.
self._consumer_conf = {'bootstrap.servers': kafka_brokers,
'group.id': self._id,
'internal.termination.signal': 0,
'client.id': 'npsmithx-mac',
'socket.timeout.ms': 30000,
'socket.keepalive.enable': 'true',
'reconnect.backoff.jitter.ms': '6000',
'api.version.request': 'false', 'debug': 'generic',
'broker.version.fallback': '0.9.0.0', 'log.connection.close': 'false',
'default.topic.config': {'auto.commit.enable': 'true', 'auto.commit.interval.ms': '60000', 'auto.offset.reset': 'smallest'}}

# Any non-empty value of ingest_kafka_debug replaces 'debug' with 'all'.
if os.getenv('ingest_kafka_debug'):
self._logger.info("librdkafka debug: all")
self._consumer_conf.update(self._librdkafka_debug)

# Any non-empty value of KRB_AUTH merges in the module-level Kerberos options.
if os.getenv('KRB_AUTH'):
self._logger.info("Updating Consumer Configuration with Kerberos options")
self._consumer_conf.update(krb_conf_options)

consumer = confluent_kafka_Consumer(self._consumer_conf)
# NOTE(review): `subscribed` is not used again in the visible code.
subscribed = None

# Rebalance callback: logs and prints the assigned partitions, sets each
# partition's offset to -1, then applies the assignment.
def on_assign (consumer, partitions):
self._logger.info('Assigned: {0}, {1}'.format(len(partitions), partitions))
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
p.offset=-1
consumer.assign(partitions)

# Rebalance callback: logs and prints the revoked partitions, then drops the
# current assignment.
def on_revoke (consumer, partitions):
self._logger.info('Revoked: {0} {1}'.format(len(partitions), partitions))
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.unassign()

# Local loop flag; nothing in this function sets it to False, so the loop
# only ends through `return` or an exception.
running = True

try:

consumer.subscribe([self._topic], on_assign=on_assign, on_revoke=on_revoke)
self._logger.info('subscribing to ' + self._topic)

while running:
# Python 2 print statement (a syntax error under Python 3).
print "polling"
# Wait up to one second for a message; None means nothing arrived.
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
# End-of-partition is logged and polling continues.
if msg.error().code() == KafkaError._PARTITION_EOF:
self._logger.info('{0} {1} reached end at offset {2}'.format(msg.topic(), msg.partition(), msg.offset()))
continue
# NOTE(review): presumably paired with the inner `if`; the condition repeats
# the enclosing check, and the bare `SystemExit` expression after `raise`
# has no effect.
elif msg.error():
raise KafkaException(msg.error())
SystemExit
# Presumably paired with `if msg.error()`: the first error-free message is
# returned, so each call to start() yields exactly one message.
else:
return msg

except KeyboardInterrupt:
self._logger.info('User interrupted')
raise SystemExit
# NOTE(review): bare except - every other exception, including the
# KafkaException raised above, is logged by type only and turned into SystemExit.
except:
self._logger.info('Unexpected error:, {0}'.format(sys.exc_info()[0]))
raise SystemExit
# The consumer is closed on every exit path, including the normal `return msg`.
finally:
self._logger.info('closing down consumer')
consumer.close()

# Intended to stop the polling loop in start().
# NOTE(review): the assignment below only binds a local name inside stop();
# start() uses its own local `running`, so calling stop() does not end the loop.
def stop(self):
running = False

@property
def Topic(self):
Expand Down
4 changes: 2 additions & 2 deletions oni/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def __init__(self):

self._kinit_args = [self._kinit,self._kinitopts,self._keytab,self._krb_user]

def authenticate(self):
def authenticate(self):

kinit = subprocess.Popen(self._kinit_args, stderr = subprocess.PIPE)
kinit = subprocess.Popen(self._kinit_args, shell=True, stderr = subprocess.PIPE)
output,error = kinit.communicate()
if not kinit.returncode == 0:
if error:
Expand Down
4 changes: 2 additions & 2 deletions pipelines/dns/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
# Log the topic being listened to and hand received messages to _new_file.
# NOTE(review): rendered diff without +/- markers ("2 additions & 2 deletions");
# the first for/_new_file pair appears to be the removed side and the second
# pair the added side.
def start(self):

self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
# Pre-change: iterate whatever kafka_consumer.start() returns and pass the
# `value` attribute of each message.
for message in self.kafka_consumer.start():
self._new_file(message.value)
# Post-change: start() is called once and its single result wrapped in a list,
# so the loop body runs exactly once per call; `value` is now called as a method.
for message in [self.kafka_consumer.start()]:
self._new_file(message.value())

def _new_file(self,file):

Expand Down
4 changes: 2 additions & 2 deletions pipelines/flow/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
# Log the topic being listened to and hand received messages to _new_file.
# NOTE(review): rendered diff without +/- markers ("2 additions & 2 deletions");
# the first for/_new_file pair appears to be the removed side and the second
# pair the added side.
def start(self):

self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
# Pre-change: iterate whatever kafka_consumer.start() returns and pass the
# `value` attribute of each message.
for message in self.kafka_consumer.start():
self._new_file(message.value)
# Post-change: start() is called once and its single result wrapped in a list,
# so the loop body runs exactly once per call; `value` is now called as a method.
for message in [self.kafka_consumer.start()]:
self._new_file(message.value())

def _new_file(self,file):

Expand Down
8 changes: 8 additions & 0 deletions start_ingest_standalone.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#!/bin/bash

# Load the site configuration into this shell.
source /etc/duxbay.conf

# Mark the Kerberos settings for export so that child processes can read them
# from the environment (oni/kafka_client.py reads KRB_AUTH, KRB_USER and
# KEYTABPATH via os.getenv).
# NOTE(review): presumably these variables are defined in /etc/duxbay.conf - confirm.
export KRB_AUTH
export KINITPATH
export KINITOPTS
export KEYTABPATH
export KRB_USER

#-----------------------------------------------------------------------------------
# Validate parameters.
#-----------------------------------------------------------------------------------
Expand Down