-> Download Kafka and untar it:
wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xvf ./kafka_2.11-2.1.0.tgz
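: The remaining commands assume you run them from the extracted directory:
cd kafka_2.11-2.1.0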
-> Start the ZooKeeper service
bin/zookeeper-server-start.sh config/zookeeper.properties
-> Start the Kafka service
bin/kafka-server-start.sh config/server.properties
-> Create a topic named obs3-meta
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic obs3-meta --partitions 2 --replication-factor 1
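: To confirm the topic exists, describe it:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic obs3-meta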
-> Start a Producer to send messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic obs3-meta
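: Each line you type is sent as one message; for example, a JSON payload carrying the resource uuid that the egest routine below filters on (the router field is hypothetical):
{"uuid": "f70ab2e6-ca07-43b4-9586-de1c9fb45584", "router": "edge-router-01"}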
-> Start a Consumer to receive messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic obs3-meta
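: To replay messages already in the topic, add --from-beginning:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic obs3-meta --from-beginning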
-> Ingest
: To write into the queue:
"""
Routines to write router info into Kafka Message Queue through resource uuid.
"""
from confluent_kafka import Producer
import logging
from settings import kafka_host, kafka_port, kafka_topic
logger = logging.getLogger(__name__)
def ingest(parsed_json):
def delivery_msg(err, msg):
if err:
logger.error(f'obs3-meta kafka message failed delivery: {err}\n')
p = Producer({'bootstrap.servers': f'{kafka_host}:{kafka_port}'})
p.poll(0)
p.produce(kafka_topic, str(msg), callback=delivery_msg)
p.flush()
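: A hypothetical call, assuming parsed_json is the JSON string produced by an upstream parser (the record fields are illustrative):
import json

record = {'uuid': 'f70ab2e6-ca07-43b4-9586-de1c9fb45584', 'router': 'edge-router-01'}  # illustrative payload
ingest(json.dumps(record))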
-> Egest
: To read from the queue:
from confluent_kafka import Consumer
import sys
import logging
from settings import kafka_host, kafka_port, kafka_topic

logger = logging.getLogger(__name__)


def egest(uuid):
    c = Consumer({
        'bootstrap.servers': f'{kafka_host}:{kafka_port}',
        'group.id': 'console-consumer-8436',
        'auto.offset.reset': 'earliest'
    })
    c.subscribe([kafka_topic])
    while True:
        msg = c.poll(1.0)  # wait up to one second for a message
        if msg is None:
            continue
        if msg.error():
            logger.error(f'Consumer error: {msg.error()}')
            continue
        value = msg.value().decode('utf-8')
        # Stop at the first message that mentions the requested resource uuid.
        if uuid in value:
            parsed_json = value
            break
    c.close()
    return parsed_json
if __name__ == '__main__':
    try:
        uuid = 'f70ab2e6-ca07-43b4-9586-de1c9fb45584'
        egest(uuid)
    except Exception as e:
        logger.error(f'Processor exited with: {e}', exc_info=True)
        sys.exit(1)  # exit with error
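: Both snippets read the broker address and topic from a settings module; a minimal sketch of settings.py, assuming the local broker and the topic created above:
kafka_host = 'localhost'
kafka_port = 9092
kafka_topic = 'obs3-meta'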