DEV Community

drake
drake

Posted on

Python 消费 Kafka

from kafka import KafkaConsumer
import json
import logging
from typing import Optional

# Logging setup: create this module's logger, then apply an INFO-level
# basic configuration to the root logger.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class KafkaMessageConsumer:
    """Single-topic Kafka consumer that decodes each message and logs it.

    Message values are decoded as UTF-8 JSON when possible (falling back to
    plain text, then to the raw bytes); keys are decoded as UTF-8 text.
    Auto-commit is enabled with a 1000 ms interval.
    """

    def __init__(self,
                 topic: str,
                 bootstrap_servers: str = 'localhost:9092',
                 group_id: str = 'my-consumer-group',
                 auto_offset_reset: str = 'earliest'):
        """
        Create the underlying ``KafkaConsumer`` subscribed to ``topic``.

        Args:
            topic: Name of the topic to consume.
            bootstrap_servers: Kafka broker address.
            group_id: Consumer group ID.
            auto_offset_reset: Offset reset policy ('earliest', 'latest').
        """
        self.topic = topic
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            value_deserializer=self._deserialize_value,
            key_deserializer=lambda x: x.decode('utf-8') if x else None
        )
        logger.info(f"Kafka消费者已初始化,订阅topic: {topic}")

    @staticmethod
    def _deserialize_value(raw):
        """
        Decode a raw message value without ever raising on bad payloads.

        A deserializer that raises aborts the whole consume loop, so a single
        malformed message would shut the consumer down. Instead the value
        degrades gracefully and is handed to ``handle_business_logic``, which
        already has a branch for non-dict data.

        Args:
            raw: Raw value bytes, or None/empty for a message without a value.

        Returns:
            None for an empty value; the parsed JSON object when the payload
            is valid UTF-8 JSON; the decoded text when it is UTF-8 but not
            JSON; the unchanged bytes when it is not valid UTF-8.
        """
        if not raw:
            return None
        try:
            text = raw.decode('utf-8')
        except UnicodeDecodeError:
            return raw
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    def consume_messages(self, max_messages: Optional[int] = None):
        """
        Consume messages until the limit is reached, the iterator ends, or
        the process is interrupted. The consumer is always closed on exit,
        so this method can be called only once per instance.

        Args:
            max_messages: Maximum number of messages to consume. None means
                no limit; zero or a negative number consumes nothing.
        """
        try:
            logger.info("开始消费消息...")

            # Compare against None explicitly: a plain truthiness test would
            # treat max_messages=0 as "unlimited".
            if max_messages is not None and max_messages <= 0:
                return

            for message_count, message in enumerate(self.consumer, start=1):
                self.process_message(message)

                if max_messages is not None and message_count >= max_messages:
                    logger.info(f"已消费{max_messages}条消息,停止消费")
                    break

        except KeyboardInterrupt:
            logger.info("接收到中断信号,停止消费")
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"消费消息时发生错误: {e}")
        finally:
            self.consumer.close()
            logger.info("Kafka消费者已关闭")

    def process_message(self, message):
        """
        Log one message's metadata and pass its value to the business logic.

        Failures are logged and swallowed so that one bad message does not
        stop the consume loop.

        Args:
            message: Kafka message object (exposes topic, partition, offset,
                key, value and timestamp attributes).
        """
        try:
            logger.info("收到消息:")
            logger.info(f"  Topic: {message.topic}")
            logger.info(f"  Partition: {message.partition}")
            logger.info(f"  Offset: {message.offset}")
            logger.info(f"  Key: {message.key}")
            logger.info(f"  Value: {message.value}")
            logger.info(f"  Timestamp: {message.timestamp}")

            # Add your business logic here.
            self.handle_business_logic(message.value)

        except Exception as e:
            logger.exception(f"处理消息时发生错误: {e}")

    def handle_business_logic(self, message_data):
        """
        Dispatch on the message payload (example implementation: logs only).

        Args:
            message_data: Deserialized message value. Dicts are routed by
                their 'type' key; anything else is treated as a text message.
        """
        # Example: route to different handling based on the message type.
        if isinstance(message_data, dict):
            message_type = message_data.get('type')

            if message_type == 'user_action':
                logger.info(f"处理用户行为: {message_data.get('action')}")
            elif message_type == 'system_event':
                logger.info(f"处理系统事件: {message_data.get('event')}")
            else:
                logger.info(f"处理通用消息: {message_data}")
        else:
            logger.info(f"处理文本消息: {message_data}")

# Usage example
if __name__ == "__main__":
    # Basic usage: subscribe to one topic on a local broker.
    consumer = KafkaMessageConsumer(
        'my-topic',
        group_id='my-consumer-group',
        bootstrap_servers='localhost:9092',
    )

    # Start consuming with no message limit.
    consumer.consume_messages(max_messages=None)

    # Alternatively, stop after a fixed number of messages:
    # consumer.consume_messages(max_messages=10)


# Advanced configuration example
class AdvancedKafkaConsumer:
    """Multi-topic Kafka consumer with tunable settings and batch processing."""

    def __init__(self, topics, config):
        """
        Create the underlying ``KafkaConsumer`` from a config dict.

        Args:
            topics: Iterable of topic names; a single topic name given as a
                plain string is also accepted.
            config: Dict of consumer settings. Any key that is missing falls
                back to the default shown in the call below.
        """
        if isinstance(topics, str):
            # Unpacking a bare string would subscribe to one "topic" per
            # character, so wrap it in a list first.
            topics = [topics]

        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.get('bootstrap_servers', 'localhost:9092'),
            group_id=config.get('group_id', 'default-group'),
            auto_offset_reset=config.get('auto_offset_reset', 'earliest'),
            enable_auto_commit=config.get('enable_auto_commit', True),
            auto_commit_interval_ms=config.get('auto_commit_interval_ms', 1000),
            session_timeout_ms=config.get('session_timeout_ms', 30000),
            heartbeat_interval_ms=config.get('heartbeat_interval_ms', 10000),
            max_poll_records=config.get('max_poll_records', 500),
            fetch_min_bytes=config.get('fetch_min_bytes', 1),
            fetch_max_wait_ms=config.get('fetch_max_wait_ms', 500),
            value_deserializer=self.deserialize_value,
            key_deserializer=self.deserialize_key
        )

    def deserialize_value(self, value):
        """
        Deserialize a message value, never raising on a bad payload.

        Args:
            value: Raw value bytes, or None/empty.

        Returns:
            None for an empty value; the parsed JSON object for valid UTF-8
            JSON; the decoded text for UTF-8 that is not JSON; the unchanged
            bytes when the payload is not valid UTF-8.
        """
        if not value:
            return None
        try:
            # Decode once; the text is reused as the non-JSON fallback.
            text = value.decode('utf-8')
        except UnicodeDecodeError:
            return value
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    def deserialize_key(self, key):
        """
        Deserialize a message key.

        Args:
            key: Raw key bytes, or None/empty.

        Returns:
            None for an empty key, the UTF-8 decoded text, or the unchanged
            bytes when the key is not valid UTF-8.
        """
        if not key:
            return None
        try:
            return key.decode('utf-8')
        except UnicodeDecodeError:
            return key

    def consume_with_batch_processing(self, batch_size=100):
        """
        Consume messages and hand them to ``process_batch`` in groups.

        A trailing partial batch is flushed both when the message iterator
        ends and when consumption is interrupted with Ctrl-C. The consumer is
        always closed on exit.

        NOTE(review): messages wait in memory until a batch fills; with
        auto-commit enabled their offsets may already be committed before
        they are processed - confirm this is acceptable for the use case.

        Args:
            batch_size: Number of messages that triggers a ``process_batch``
                call.
        """
        batch = []

        try:
            try:
                for message in self.consumer:
                    batch.append(message)

                    if len(batch) >= batch_size:
                        self.process_batch(batch)
                        batch = []
            except KeyboardInterrupt:
                # Stop consuming; the leftover messages are flushed below.
                pass

            if batch:  # flush the remaining messages
                self.process_batch(batch)
        finally:
            self.consumer.close()

    def process_batch(self, messages):
        """
        Process one batch of messages (placeholder: logs the batch size).

        Args:
            messages: List of Kafka message objects.
        """
        logger.info(f"批量处理 {len(messages)} 条消息")
        for message in messages:
            # Handle each message here.
            pass

# Error handling and retry example
class RobustKafkaConsumer:
    """Kafka consumer that reconnects with exponential backoff and retries
    failed message handling."""

    def __init__(self, topic, config):
        """
        Store the settings; the consumer itself is created lazily.

        Args:
            topic: Name of the topic to consume.
            config: Dict of ``KafkaConsumer`` keyword arguments. The extra
                key 'max_retries' (default 3) limits reconnect attempts and
                is not passed on to ``KafkaConsumer``.
        """
        self.topic = topic
        self.config = config
        self.consumer = None
        self.max_retries = config.get('max_retries', 3)

    def _consumer_kwargs(self):
        """Return the keyword arguments for ``KafkaConsumer`` built from the config.

        'max_retries' belongs to this wrapper only, so it is stripped here
        (kafka-python rejects configuration keys it does not recognise). A
        JSON value deserializer is supplied unless the caller's config already
        provides one, which avoids a duplicate-keyword ``TypeError``.
        """
        kwargs = {k: v for k, v in self.config.items() if k != 'max_retries'}
        kwargs.setdefault(
            'value_deserializer',
            lambda x: json.loads(x.decode('utf-8')) if x else None
        )
        return kwargs

    def create_consumer(self):
        """Create a new ``KafkaConsumer`` subscribed to ``self.topic``."""
        return KafkaConsumer(self.topic, **self._consumer_kwargs())

    def consume_with_retry(self):
        """
        Consume messages, recreating the consumer after a failure.

        Each failure increments a retry counter and waits ``2 ** retry_count``
        seconds before reconnecting; once the counter exceeds
        ``self.max_retries`` consumption stops. If the message iterator ends
        without an error the method returns instead of reconnecting.
        """
        import time

        retry_count = 0

        while retry_count <= self.max_retries:
            try:
                self.consumer = self.create_consumer()

                for message in self.consumer:
                    try:
                        self.process_message_with_retry(message)
                    except Exception as e:
                        logger.error(f"处理消息失败: {e}")

                # The iterator finished cleanly: there is nothing to retry,
                # and looping again would reconnect in a tight busy loop.
                return

            except Exception as e:
                retry_count += 1
                logger.error(f"消费者连接失败 (重试 {retry_count}/{self.max_retries}): {e}")
            finally:
                if self.consumer is not None:
                    self.consumer.close()
                    # Drop the reference so a failed create_consumer() on the
                    # next attempt cannot lead to closing this one again.
                    self.consumer = None

            if retry_count > self.max_retries:
                logger.error("达到最大重试次数,停止消费")
                break

            # Exponential backoff, taken after the failed consumer is closed.
            time.sleep(2 ** retry_count)

    def process_message_with_retry(self, message, max_retries=3):
        """
        Call ``handle_message``, retrying immediately when it raises.

        The final failure is logged and swallowed, so this method itself
        does not raise for handler errors.

        Args:
            message: Kafka message object.
            max_retries: Number of retries after the first attempt.
        """
        for attempt in range(max_retries + 1):
            try:
                # Business logic for the message.
                self.handle_message(message)
                break
            except Exception as e:
                if attempt == max_retries:
                    logger.exception(f"消息处理最终失败: {e}")
                    # The message could be sent to a dead-letter queue here.
                else:
                    logger.warning(f"消息处理失败,重试 {attempt + 1}/{max_retries}: {e}")

    def handle_message(self, message):
        """Actual message handling logic (placeholder)."""
        # Implement your business logic here.
        pass

Enter fullscreen mode Exit fullscreen mode

Top comments (0)