Kafka consumer batch processing
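
This post shows a small batch-processing Kafka consumer built on kafka-python: it fetches records in batches with poll(), parses each payload as JSON, and hands the batch to an overridable business-logic hook.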


import json
import time
import logging
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from datetime import datetime
from typing import Optional
from config import config

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Kafka Consumer')

class Consumer:
    """
    Kafka消费者基类
    当Topic被消费完之后会自动断开连接,结束进程
    """
    def __init__(self):
        """
        初始化消费者
        """
        self.consumer = None
        self.message_count = 0
        self.start_time = None
        self.bootstrap_servers = config.KAFKA_SERVERS
        self.topic = 'bg_spider_flow_news'  # 直接写死topic名称,或者可以通过参数传入
        self.group_id = 'python-consumer-group-1'  # 可以根据需要设置消费者组ID

    def _create_consumer(self):
        """
        创建Kafka消费者(连接服务只重试10次)
        """
        num = 0
        logger.info("开始创建Kafka消费者...")
        while self.consumer is None:
            num += 1
            try:
                self.consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    client_id=self.group_id,
                    auto_offset_reset='earliest',
                    # enable_auto_commit=True,
                    value_deserializer=lambda x: x.decode('utf-8') if x else None,
                    key_deserializer=lambda x: x.decode('utf-8') if x else None
                    # When the topic has no messages, this parameter makes the consumer exit
                    # consumer_timeout_ms=1000
                )

                logger.info("✅ 成功创建Kafka消费者")

            except Exception as e:
                if num < 10:
                    logger.warning(f"⚠️ Attempt {num} to create the Kafka consumer failed ({e}), retrying...")
                    time.sleep(2)
                else:
                    logger.error(f"❌ Failed to create Kafka consumer: {e}")
                    raise

    def start_consuming(self, max_messages: Optional[int] = None, show_stats_interval: int = 100, 
                       batch_size: int = 10, batch_timeout_ms: int = 5000):
        """
        批处理消费者
        max_messages: 限制最大消费消息数
        show_stats_interval: 每多少条消息显示一次统计信息
        batch_size: 批量处理的消息数量
        batch_timeout_ms: 批量获取消息的超时时间(毫秒)
        """
        # 创建消费者
        self._create_consumer()
        if not self.consumer:
            logger.error("❌ 消费者未初始化,无法开始消费")
            return

        self.start_time = datetime.now()
        logger.info("=" * 60)
        logger.info("🚀 开始消费Kafka消息")
        logger.info(f"⏰ 开始时间: {self.start_time}")
        logger.info(f"📦 批量大小: {batch_size}, 超时时间: {batch_timeout_ms}ms")
        logger.info("=" * 60)

        logger.info(f"⏳ 等待消费 Topic '{self.topic}' 的消息...")

        try:
            while True:
                # 使用poll方法批量获取消息
                message_batch = self.consumer.poll(
                    timeout_ms=batch_timeout_ms, 
                    max_records=batch_size
                )

                if message_batch:
                    # Process this batch of messages
                    batch_messages = []
                    for topic_partition, messages in message_batch.items():
                        for message in messages:
                            self.message_count += 1
                            batch_messages.append(message)

                    if batch_messages:
                        logger.info(f"🔥 Fetched {len(batch_messages)} messages; starting batch processing")
                        self._process_message_batch(batch_messages)

                        # Show statistics
                        if self.message_count % show_stats_interval == 0:
                            self._show_statistics()

                        # Check the max-message limit
                        if max_messages and self.message_count >= max_messages:
                            logger.info(f"✅ Consumed {max_messages} messages; stopping")
                            break
                else:
                    # No messages available right now; wait briefly (optional)
                    logger.info("⏳ Waiting for new messages...")
                    time.sleep(2)

        except KafkaError as e:
            logger.error(f"❌ Kafka error: {e}")
        except Exception as e:
            logger.error(f"❌ Error while consuming: {e}")
        finally:
            # Close the consumer so its group rebalances promptly on exit
            if self.consumer:
                self.consumer.close()

    def _process_message_batch(self, messages):
        """
        批量处理消息
        messages: List[ConsumerRecord], 消息列表
        """
        try:
            logger.info(f"开始批量处理 {len(messages)} 条消息")

            # 批量格式化消息
            formatted_messages = []
            for i, message in enumerate(messages):
                logger.info(f"处理第 {self.message_count - len(messages) + i + 1} 条消息,分区: {message.partition}, 偏移量: {message.offset}")
                formatted_message = self.format_message(message.value)
                if formatted_message:
                    formatted_messages.append(formatted_message)

            if formatted_messages:
                # 调用批量业务处理逻辑
                self.batch_message_jobs(formatted_messages)

            logger.info(f"✅ 批量处理完成,成功处理 {len(formatted_messages)} 条有效消息")

        except Exception as e:
            logger.error(f"❌ 批量处理消息失败: {e}")

    def batch_message_jobs(self, messages):
        """
        批量消费消息的特定处理逻辑
        messages: List[Dict], 消息内容字典列表
        """
        try:
            logger.info(f"📦 批量处理 {len(messages)} 条消息")

            # 这里可以添加具体的批量业务逻辑处理代码
            # 例如:批量插入数据库、批量调用API等
            for i, message in enumerate(messages, 1):
                logger.info(f"批量消息 {i}: {message}")

            # 示例:如果需要批量插入数据库
            # self._batch_insert_to_database(messages)

        except Exception as e:
            logger.error(f"❌ 批量消费者处理失败: {e}")

    # def start_consuming(self, max_messages: Optional[int] = None, show_stats_interval: int = 100):
    #     """
    #     Start consuming messages (single-message version).
    #     max_messages: maximum number of messages to consume
    #     show_stats_interval: show statistics every N messages
    #     """
    #     # Create the consumer
    #     self._create_consumer()
    #     if not self.consumer:
    #         logger.error("❌ Consumer not initialized; cannot start consuming")
    #         return

    #     self.start_time = datetime.now()
    #     logger.info("=" * 60)
    #     logger.info("🚀 Starting to consume Kafka messages")
    #     logger.info(f"⏰ Start time: {self.start_time}")
    #     logger.info("=" * 60)
    #     logger.info(f"📦 self.consumer: {self.consumer}")
    #     if not self.consumer:
    #         logger.error("❌ Consumer not initialized; cannot start consuming")
    #         return
    #     # Use poll() to check whether any messages are available
    #     message_batch = self.consumer.poll(timeout_ms=1000, max_records=1)
    #     if not message_batch:
    #         logger.info(f"⚪ Topic '{self.topic}' has no messages to consume right now")
    #         return
    #     try:
    #         for message in self.consumer:
    #             self.message_count += 1
    #             self._process_message(message)

    #             # Show statistics
    #             if self.message_count % show_stats_interval == 0:
    #                 self._show_statistics()

    #             # Check the max-message limit
    #             if max_messages and self.message_count >= max_messages:
    #                 logger.info(f"✅ Consumed {max_messages} messages; stopping")
    #                 break

    #     except KafkaError as e:
    #         logger.error(f"❌ Kafka error: {e}")
    #     except Exception as e:
    #         logger.error(f"❌ Error while consuming: {e}")

    def _show_statistics(self):
        """显示统计信息"""
        if self.start_time:
            elapsed = datetime.now() - self.start_time
            rate = self.message_count / elapsed.total_seconds() if elapsed.total_seconds() > 0 else 0
            logger.info(f"📊 统计信息: 已消费消息: {self.message_count} 消费速率: {rate:.2f} 条/秒")

    # def _process_message(self, message):
    #     """
    #     Process a single message.
    #     message: the raw Kafka message object
    #     """
    #     try:
    #         logger.info(f"Processing message {self.message_count}, partition: {message.partition}, offset: {message.offset}")
    #         # Format the message into a standard dict
    #         message_formated_to_dict = self.format_message(message.value)
    #         if message_formated_to_dict:
    #             # Detailed per-message handling logic
    #             self.consumer_per_message(message_formated_to_dict)
    #     except Exception as e:
    #         logger.error(f"❌ Failed to process message: {e}")

    def format_message(self, message_value):
        """
        格式化消息内容为标准的字典格式
        message_value: Json 等待序列化的json或字符串
        Returns:
            Dict: 格式化后的消息字典
            None: 如果消息内容为空或格式不正确
        """
        message = None
        try:
            if message_value:
                # 尝试解析JSON
                try:
                    message = json.loads(message_value)
                except json.JSONDecodeError:
                    logger.info(f"   📝 文本内容: {str(message_value)[:100]}{'...' if len(str(message_value)) > 100 else ''}")
            else:
                logger.info("   ⚪ 空消息")

        except Exception as e:
            logger.error(f"❌ 业务逻辑处理失败: {e}")
        return message

    # def consumer_per_message(self, message):
    #     """
    #     Per-message handling logic.
    #     message: Dict, the message content
    #     """
    #     try:
    #         # Add concrete business logic here
    #         logger.info(f"Message content: {message}")
    #     except Exception as e:
    #         logger.error(f"❌ Per-message handler failed: {e}")
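

# A minimal usage sketch (assumes config.KAFKA_SERVERS points at your brokers):
# subclass Consumer and override batch_message_jobs with real business logic.
class NewsConsumer(Consumer):
    def batch_message_jobs(self, messages):
        # Hypothetical handler: just log the batch size; replace with bulk
        # inserts, API calls, etc.
        logger.info(f"Handling {len(messages)} news items")


if __name__ == '__main__':
    consumer = NewsConsumer()
    # Consume at most 100 messages, 20 per batch, polling up to 3s per batch
    consumer.start_consuming(max_messages=100, batch_size=20, batch_timeout_ms=3000)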

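A note on the batching mechanics: in kafka-python, poll() returns a dict mapping each TopicPartition to a list of ConsumerRecord objects, which is why start_consuming flattens message_batch.items() before handing the records to _process_message_batch.

The code imports config for the broker list, but that module is not shown in the post. Below is a minimal stand-in, assuming only that it must expose KAFKA_SERVERS (the one attribute the consumer reads); the broker address is a placeholder.

# config.py -- hypothetical stand-in for the module imported above
class _Config:
    # Placeholder broker list; replace with your real bootstrap servers
    KAFKA_SERVERS = ['localhost:9092']

config = _Config()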
