from kafka import KafkaConsumer
import json
import logging
from typing import Optional
# Logging setup: root logger at INFO level, plus a logger named after this module.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)
class KafkaMessageConsumer:
    """Consume messages from one Kafka topic, log them, and dispatch the payload.

    Values are decoded as JSON when possible, otherwise as UTF-8 text,
    otherwise left as raw bytes; keys are decoded as UTF-8 text when possible.
    Offsets are auto-committed (``enable_auto_commit=True``), and a message
    whose processing raises is logged and skipped.
    """

    def __init__(self,
                 topic: str,
                 bootstrap_servers: str = 'localhost:9092',
                 group_id: str = 'my-consumer-group',
                 auto_offset_reset: str = 'earliest'):
        """Create the underlying KafkaConsumer subscribed to ``topic``.

        Args:
            topic: Name of the topic to consume.
            bootstrap_servers: Kafka broker address(es).
            group_id: Consumer group ID.
            auto_offset_reset: Where to start when the group has no committed
                offset ('earliest' or 'latest').
        """
        self.topic = topic
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            # Tolerant decoders: an exception raised by a deserializer escapes
            # the consumer iterator and would end consume_messages() entirely.
            value_deserializer=self._deserialize_value,
            key_deserializer=self._deserialize_key,
        )
        logger.info(f"Kafka消费者已初始化,订阅topic: {topic}")

    @staticmethod
    def _deserialize_value(raw):
        """Decode a message value: JSON -> object, else UTF-8 text, else raw bytes.

        Empty or missing payloads decode to None.
        """
        if not raw:
            return None
        try:
            text = raw.decode('utf-8')
        except UnicodeDecodeError:
            return raw  # binary payload: pass through untouched
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text  # plain-text payload

    @staticmethod
    def _deserialize_key(raw):
        """Decode a message key as UTF-8 text.

        Empty or missing keys decode to None; keys that are not valid UTF-8
        are returned as raw bytes.
        """
        if not raw:
            return None
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            return raw

    def consume_messages(self, max_messages: Optional[int] = None):
        """Consume and process messages, then close the consumer.

        Args:
            max_messages: Stop after this many messages; None means no limit.
                A value <= 0 consumes nothing.

        Ctrl-C and unexpected errors are logged rather than raised. The
        consumer is always closed on exit, so this method can only be run once
        per instance.
        """
        message_count = 0
        try:
            logger.info("开始消费消息...")
            if max_messages is not None and max_messages <= 0:
                # Checked before iterating so we never pull (and auto-commit)
                # a message we will not process.
                return
            for message in self.consumer:
                self.process_message(message)
                message_count += 1
                if max_messages is not None and message_count >= max_messages:
                    logger.info(f"已消费{max_messages}条消息,停止消费")
                    break
        except KeyboardInterrupt:
            logger.info("接收到中断信号,停止消费")
        except Exception as e:
            logger.exception(f"消费消息时发生错误: {e}")
        finally:
            self.consumer.close()
            logger.info("Kafka消费者已关闭")

    def process_message(self, message):
        """Log one message's metadata and payload, then run the business logic.

        Args:
            message: A consumed record exposing topic, partition, offset, key,
                value and timestamp attributes.

        Errors are logged with a traceback and not re-raised, so one bad
        message does not stop consumption.
        """
        try:
            logger.info("收到消息:")
            logger.info(" Topic: %s", message.topic)
            logger.info(" Partition: %s", message.partition)
            logger.info(" Offset: %s", message.offset)
            logger.info(" Key: %s", message.key)
            logger.info(" Value: %s", message.value)
            logger.info(" Timestamp: %s", message.timestamp)
            # Add your business logic here.
            self.handle_business_logic(message.value)
        except Exception as e:
            logger.exception(f"处理消息时发生错误: {e}")

    def handle_business_logic(self, message_data):
        """Example dispatch on the decoded payload.

        Args:
            message_data: Decoded value. Dicts are routed by their 'type' key;
                anything else (text, bytes, lists, None) is handled as text.
        """
        if isinstance(message_data, dict):
            message_type = message_data.get('type')
            if message_type == 'user_action':
                logger.info(f"处理用户行为: {message_data.get('action')}")
            elif message_type == 'system_event':
                logger.info(f"处理系统事件: {message_data.get('event')}")
            else:
                logger.info(f"处理通用消息: {message_data}")
        else:
            logger.info(f"处理文本消息: {message_data}")
# Usage example
if __name__ == "__main__":
    # Basic usage: one topic, default consumer group, no message limit.
    demo = KafkaMessageConsumer(
        topic='my-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-consumer-group',
    )
    # To stop after a fixed number of messages instead, pass e.g.
    # max_messages=10.
    demo.consume_messages(max_messages=None)
# Advanced configuration example
class AdvancedKafkaConsumer:
    """Config-driven Kafka consumer with tolerant decoding and batch processing."""

    def __init__(self, topics, config=None):
        """Create a KafkaConsumer from a settings dict.

        Args:
            topics: A single topic name, or an iterable of topic names.
            config: Settings dict; any missing key falls back to the default
                shown below. None is treated as an empty dict.
        """
        if isinstance(topics, str):
            # ``*topics`` on a bare string would subscribe to one topic per
            # character.
            topics = (topics,)
        if config is None:
            config = {}
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.get('bootstrap_servers', 'localhost:9092'),
            group_id=config.get('group_id', 'default-group'),
            auto_offset_reset=config.get('auto_offset_reset', 'earliest'),
            enable_auto_commit=config.get('enable_auto_commit', True),
            auto_commit_interval_ms=config.get('auto_commit_interval_ms', 1000),
            session_timeout_ms=config.get('session_timeout_ms', 30000),
            heartbeat_interval_ms=config.get('heartbeat_interval_ms', 10000),
            max_poll_records=config.get('max_poll_records', 500),
            fetch_min_bytes=config.get('fetch_min_bytes', 1),
            fetch_max_wait_ms=config.get('fetch_max_wait_ms', 500),
            value_deserializer=self.deserialize_value,
            key_deserializer=self.deserialize_key,
        )

    def deserialize_value(self, value):
        """Decode a raw value: JSON -> object, else UTF-8 text, else raw bytes.

        Empty or missing payloads decode to None. This method never raises on
        bad input: a deserializer exception would propagate out of the
        consumer loop.
        """
        if not value:
            return None
        try:
            text = value.decode('utf-8')
        except UnicodeDecodeError:
            return value  # binary payload: pass through untouched
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    def deserialize_key(self, key):
        """Decode a raw key as UTF-8 text.

        Empty or missing keys decode to None; keys that are not valid UTF-8
        stay as bytes.
        """
        if not key:
            return None
        try:
            return key.decode('utf-8')
        except UnicodeDecodeError:
            return key

    def consume_with_batch_processing(self, batch_size=100):
        """Group incoming messages into lists of ``batch_size`` for process_batch().

        A trailing partial batch is flushed when the message stream ends or on
        Ctrl-C. Other exceptions propagate without flushing. The consumer is
        always closed on exit.

        NOTE(review): with auto-commit enabled (the default here), offsets of
        buffered messages may be committed before process_batch() runs; confirm
        that at-most-once delivery is acceptable.
        """
        batch = []
        try:
            try:
                for message in self.consumer:
                    batch.append(message)
                    if len(batch) >= batch_size:
                        self.process_batch(batch)
                        batch = []
            except KeyboardInterrupt:
                pass  # stop consuming, but still flush what was already pulled
            if batch:  # process the remaining messages
                self.process_batch(batch)
        finally:
            self.consumer.close()

    def process_batch(self, messages):
        """Handle one batch of consumed messages (placeholder: only logs the size)."""
        logger.info(f"批量处理 {len(messages)} 条消息")
        for message in messages:
            # Per-message handling goes here.
            pass
# Error handling and retry example
class RobustKafkaConsumer:
    """Kafka consumer that reconnects with exponential backoff and retries
    per-message processing."""

    # Config keys read by this class itself; they are not KafkaConsumer
    # settings, so they must not be forwarded to it.
    _OWN_CONFIG_KEYS = frozenset({'max_retries'})

    def __init__(self, topic, config=None):
        """
        Args:
            topic: Topic name to consume.
            config: KafkaConsumer settings, optionally with 'max_retries'
                (connection retry limit, default 3). None means no settings.
        """
        if config is None:
            config = {}
        self.topic = topic
        self.config = config
        self.consumer = None
        self.max_retries = config.get('max_retries', 3)

    def create_consumer(self):
        """Build a KafkaConsumer for ``self.topic`` from ``self.config``.

        Keys this class reads itself (``max_retries``) are stripped. A
        ``value_deserializer`` in the config overrides the JSON default
        instead of causing a duplicate-keyword error.
        """
        options = {key: value for key, value in self.config.items()
                   if key not in self._OWN_CONFIG_KEYS}
        options.setdefault(
            'value_deserializer',
            lambda x: json.loads(x.decode('utf-8')) if x else None,
        )
        return KafkaConsumer(self.topic, **options)

    def consume_with_retry(self):
        """Consume until the stream ends, reconnecting on connection failures.

        Up to ``self.max_retries`` consecutive failures are retried, with
        exponential backoff sleeps of 2, 4, 8, ... seconds. The failure counter
        resets once a message is received. The consumer is closed after every
        attempt, before any backoff sleep.
        """
        import time

        retry_count = 0
        while True:
            # Cleared first so a failed create_consumer() doesn't leave a
            # stale, already-closed consumer to be closed again below.
            self.consumer = None
            try:
                self.consumer = self.create_consumer()
                for message in self.consumer:
                    # Connection is healthy: only consecutive failures count.
                    retry_count = 0
                    try:
                        self.process_message_with_retry(message)
                    except Exception as e:
                        logger.error(f"处理消息失败: {e}")
                # The iterator ended normally (e.g. consumer_timeout_ms set in
                # config). That is end of stream, not a failure to retry.
                return
            except Exception as e:
                retry_count += 1
                logger.exception(f"消费者连接失败 (重试 {retry_count}/{self.max_retries}): {e}")
            finally:
                if self.consumer is not None:
                    self.consumer.close()
            if retry_count > self.max_retries:
                logger.error("达到最大重试次数,停止消费")
                return
            time.sleep(2 ** retry_count)  # exponential backoff

    def process_message_with_retry(self, message, max_retries=3):
        """Run handle_message(), retrying immediately up to ``max_retries`` times.

        After the final failure the error is logged with its traceback and the
        message is dropped; handler errors are not re-raised.
        """
        for attempt in range(max_retries + 1):
            try:
                self.handle_message(message)
                return
            except Exception as e:
                if attempt == max_retries:
                    logger.exception(f"消息处理最终失败: {e}")
                    # The message could be sent to a dead-letter queue here.
                else:
                    logger.warning(f"消息处理失败,重试 {attempt + 1}/{max_retries}: {e}")

    def handle_message(self, message):
        """Actual per-message business logic (placeholder; override as needed)."""
        pass
# For further actions, you may consider blocking this person and/or reporting abuse
# Top comments (0)