Install kafka-python package
Python client for the Apache Kafka distributed stream processing system.
pip install kafka-python
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers=[ipaddress:port])
topic_names = ['topic1', 'topic2', 'topic3' , 'topic3']
def create_topics(topic_names):
existing_topic_list = consumer.topics()
print(list(consumer.topics()))
topic_list = []
for topic in topic_names:
if topic not in existing_topic_list:
print('Topic : {} added '.format(topic))
topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=3))
else:
print('Topic : {topic} already exist ')
try:
if topic_list:
admin_client.create_topics(new_topics=topic_list, validate_only=False)
print("Topic Created Successfully")
else:
print("Topic Exist")
except TopicAlreadyExistsError as e:
print("Topic Already Exist")
except Exception as e:
print(e)
def delete_topics(topic_names):
try:
admin_client.delete_topics(topics=topic_names)
print("Topic Deleted Successfully")
except UnknownTopicOrPartitionError as e:
print("Topic Doesn't Exist")
except Exception as e:
print(e)
consumer = KafkaConsumer(
bootstrap_servers = "ip_address",
)
create_topics(topic_names)
Top comments (3)
Nice snip but not sure if u missed it but u didnt initialize consumer and didnt assign existing_topic_list to receive the topics from consumer
The code snippet has been modified.
Sorry for missing that code snippet.