import json
import time
import logging
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from datetime import datetime
from typing import Optional
from config import config
# Configure root logging once at import time so every handler shares one format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Kafka Consumer')


class Consumer:
    """
    Base Kafka consumer.

    Polls messages in batches from a hard-coded topic, JSON-decodes each
    payload via :meth:`format_message` and hands the decoded dicts to
    :meth:`batch_message_jobs` (the hook subclasses override).  The broker
    connection is created lazily with at most 10 retries and is closed when
    consumption stops.
    """

    def __init__(self):
        """Initialize consumer state; the network connection is created lazily."""
        self.consumer = None    # KafkaConsumer instance, built in _create_consumer()
        self.message_count = 0  # running total of consumed messages
        self.start_time = None  # datetime set when start_consuming() begins
        self.bootstrap_servers = config.KAFKA_SERVERS
        # Topic / group are hard-coded; promote them to __init__ parameters if
        # this class needs to serve more than one topic.
        self.topic = 'bg_spider_flow_news'
        self.group_id = 'python-consumer-group-1'

    def _create_consumer(self):
        """
        Create the Kafka consumer, retrying the connection at most 10 times.

        Raises:
            Exception: the last connection error, re-raised after the 10th failure.
        """
        num = 0
        logger.info("开始创建Kafka消费者...")
        while self.consumer is None:
            num += 1
            try:
                self.consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    client_id=self.group_id,
                    auto_offset_reset='earliest',
                    value_deserializer=lambda x: x.decode('utf-8') if x else None,
                    key_deserializer=lambda x: x.decode('utf-8') if x else None
                    # consumer_timeout_ms is deliberately NOT set: it would make
                    # the consumer object exit as soon as the topic had no messages.
                )
                logger.info("✅ 成功创建Kafka消费者")
            except Exception as e:
                if num < 10:
                    logger.warning(f"⚠️ 第 {num} 次尝试创建Kafka消费者失败,重试中...")
                    time.sleep(2)
                else:
                    logger.error(f"❌ 创建Kafka消费者失败: {e}")
                    raise

    def start_consuming(self, max_messages: Optional[int] = None, show_stats_interval: int = 100,
                        batch_size: int = 10, batch_timeout_ms: int = 5000):
        """
        Consume messages in batches until ``max_messages`` is reached (if given).

        Args:
            max_messages: stop after this many messages; None means run forever.
            show_stats_interval: log throughput statistics every N messages.
            batch_size: maximum records fetched per poll().
            batch_timeout_ms: poll() timeout in milliseconds.
        """
        self._create_consumer()
        if not self.consumer:
            logger.error("❌ 消费者未初始化,无法开始消费")
            return
        self.start_time = datetime.now()
        logger.info("=" * 60)
        logger.info("🚀 开始消费Kafka消息")
        logger.info(f"⏰ 开始时间: {self.start_time}")
        logger.info(f"📦 批量大小: {batch_size}, 超时时间: {batch_timeout_ms}ms")
        logger.info("=" * 60)
        logger.info(f"⏳ 等待消费 Topic '{self.topic}' 的消息...")
        try:
            while True:
                # poll() returns {TopicPartition: [ConsumerRecord, ...]}.
                message_batch = self.consumer.poll(
                    timeout_ms=batch_timeout_ms,
                    max_records=batch_size
                )
                if message_batch:
                    # Flatten the per-partition lists into one batch.
                    batch_messages = []
                    for topic_partition, messages in message_batch.items():
                        for message in messages:
                            self.message_count += 1
                            batch_messages.append(message)
                    if batch_messages:
                        logger.info(f"🔥 获取到 {len(batch_messages)} 条消息,开始批量处理")
                        self._process_message_batch(batch_messages)
                    if self.message_count % show_stats_interval == 0:
                        self._show_statistics()
                    if max_messages and self.message_count >= max_messages:
                        logger.info(f"✅ 已消费{max_messages}条消息,停止消费")
                        break
                else:
                    # Idle: no records arrived within batch_timeout_ms.
                    logger.info("⏳ 等待新消息...")
                    time.sleep(2)
        except KafkaError as e:
            logger.error(f"❌ Kafka错误: {e}")
        except Exception as e:
            logger.error(f"❌ 消费过程中发生错误: {e}")
        finally:
            # FIX: the original code never closed the consumer, leaking the
            # connection and leaving the group membership to expire via session
            # timeout.  Close it here and reset to None so a later call to
            # start_consuming() reconnects cleanly.
            if self.consumer is not None:
                try:
                    self.consumer.close()
                except Exception as e:
                    logger.warning(f"⚠️ 关闭消费者时出错: {e}")
                self.consumer = None

    def _process_message_batch(self, messages):
        """
        Format a batch of raw records and dispatch the valid ones.

        Args:
            messages: List[ConsumerRecord] as returned by poll().
        """
        try:
            logger.info(f"开始批量处理 {len(messages)} 条消息")
            formatted_messages = []
            for i, message in enumerate(messages):
                # message_count already includes this whole batch, so back-compute
                # the 1-based global index of the current record.
                logger.info(f"处理第 {self.message_count - len(messages) + i + 1} 条消息,分区: {message.partition}, 偏移量: {message.offset}")
                formatted_message = self.format_message(message.value)
                if formatted_message:
                    formatted_messages.append(formatted_message)
            if formatted_messages:
                # Hand the decoded batch to the business hook.
                self.batch_message_jobs(formatted_messages)
                logger.info(f"✅ 批量处理完成,成功处理 {len(formatted_messages)} 条有效消息")
        except Exception as e:
            logger.error(f"❌ 批量处理消息失败: {e}")

    def batch_message_jobs(self, messages):
        """
        Business hook for a batch of decoded messages; override in subclasses.

        Args:
            messages: List[dict] of JSON-decoded payloads.
        """
        try:
            logger.info(f"📦 批量处理 {len(messages)} 条消息")
            # Put concrete batch business logic here (bulk DB insert, API calls, ...).
            for i, message in enumerate(messages, 1):
                logger.info(f"批量消息 {i}: {message}")
        except Exception as e:
            logger.error(f"❌ 批量消费者处理失败: {e}")

    def _show_statistics(self):
        """Log messages consumed so far and average throughput (messages/second)."""
        if self.start_time:
            elapsed = datetime.now() - self.start_time
            rate = self.message_count / elapsed.total_seconds() if elapsed.total_seconds() > 0 else 0
            logger.info(f"📊 统计信息: 已消费消息: {self.message_count} 消费速率: {rate:.2f} 条/秒")

    def format_message(self, message_value):
        """
        Decode a raw message payload into a Python object.

        Args:
            message_value: JSON text (already utf-8 decoded by the value
                deserializer), or None/empty.

        Returns:
            The parsed object on success; None for empty or non-JSON input
            (both are logged and skipped rather than raised).
        """
        message = None
        try:
            if message_value:
                try:
                    message = json.loads(message_value)
                except json.JSONDecodeError:
                    # Not JSON: log a truncated preview and return None.
                    logger.info(f" 📝 文本内容: {str(message_value)[:100]}{'...' if len(str(message_value)) > 100 else ''}")
            else:
                logger.info(" ⚪ 空消息")
        except Exception as e:
            logger.error(f"❌ 业务逻辑处理失败: {e}")
        return message
# NOTE(review): the two lines that followed here ("For further actions, you may
# consider blocking this person and/or reporting abuse" / "Top comments (0)")
# were web-page footer residue captured when this file was scraped; they are
# not part of the program and have been commented out so the module parses.