Kafka4.0 可观测性最佳实践

    Kafka4.0 介绍

    Kafka4.0 的重大变革 —— KRaft 模式。Kafka4.0 最具革命性的变化,默认运行在 KRaft(Kafka Raft)模式下,彻底摒弃了对 Apache ZooKeeper 的依赖。KRaft 模式的引入,可谓是 Kafka 架构演进的一次重大飞跃。它基于 Raft 一致性算法构建共识机制,将元数据管理功能巧妙地集成到 Kafka 自身的体系之中,从而实现了对 ZooKeeper 的无缝替换。

    主要优势:

    • 简化部署与运维流程:运维人员从此无需再为搭建和维护复杂的 ZooKeeper 集群耗费大量精力,大大降低了整体的运营开销。新的架构设计极大地简化了系统的复杂性,使得 Kafka 的安装、配置以及日常管理工作变得更加直观、高效,即使是新手也能轻松上手。
    • 显著增强可扩展性:在 KRaft 模式下,Kafka 集群的扩展性得到了进一步的提升。新增 Broker 节点的操作变得更加简便快捷,能够更好地适应大规模数据处理场景下,对系统资源进行动态调整的需求。无论是应对业务高峰期的数据洪峰,还是随着业务增长逐步扩展集群规模,KRaft 模式都能游刃有余。
    • 提升系统性能与稳定性:去除 ZooKeeper 这一外部依赖后,Kafka 在元数据操作的响应速度和一致性方面表现得更加出色。特别是在高并发写入和读取的场景中,系统的稳定性和可靠性得到了显著增强,有效减少了因外部组件故障而可能引发的单点问题,为企业级应用提供了更加坚实可靠的底层支撑。

    观测云

    观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

    部署 DataKit

    DataKit 是一个开源的、跨平台的数据收集和监控工具,由观测云开发并维护。它旨在帮助用户收集、处理和分析各种数据源,如日志、指标和事件,以便进行有效的监控和故障排查。DataKit 支持多种数据输入和输出格式,可以轻松集成到现有的监控系统中。

    登录观测云控制台,在「集成」 - 「DataKit」选择对应安装方式,当前采用 Linux 主机部署 DataKit。

    采集步骤

    下载 JMX Exporter

    下载地址:https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0

    配置 JMX 脚本和启动参数

    注意:采集 Producer、Consumer、Streams、Connect 指标需要开各自独立进程,启动各自进程时注意替换对应的 yaml 文件和对应的启动脚本,如下可参考。

    KRaft Metrics

    • 创建 KRaft Metrics 配置文件 kafka.yml
    # ------------------------------------------------------------
    # Kafka 4 Prometheus JMX Exporter Configuration
    # ------------------------------------------------------------
    lowercaseOutputName: false
    lowercaseOutputLabelNames: true
    cacheRules: true
    rules:
    
    # 1. Broker / Topic / Partition Metrics
      - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
        name: kafka_server_broker_topic_metrics_$1
        type: GAUGE
        labels:
          topic: "$2"
    
    # 2. Request / Network Metrics
      - pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
        name: kafka_network_request_metrics_$1
        type: GAUGE
    
    # 3. Socket Server Metrics
      - pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)
        name: kafka_network_socket_server_metrics_$1
        type: GAUGE
    
    # 4. Log / Segment / Cleaner Metrics
      - pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
        name: kafka_log_$1_$2
        type: GAUGE
    
    # 5. Controller (KRaft) Metrics
      - pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)
        name: kafka_controller_$1
        type: GAUGE
    
    # 6. Group / Coordinator Metrics
      - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)
        name: kafka_coordinator_group_metadata_manager_$1
        type: GAUGE
    
    # 7. KRaft Specific Metrics
      - pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)
        name: kafka_controller_$1
        type: GAUGE
    
    # 8. New Generation Consumer Rebalance Protocol Metrics
      - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)
        name: kafka_coordinator_group_metadata_manager_$1
        type: GAUGE
    
    # 9. Queue Metrics
      - pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)
        name: kafka_server_queue_$1
        type: GAUGE
    
    # 10. Client Metrics
      - pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
        name: kafka_network_request_metrics_$1
        type: GAUGE
    
    # 11. Log Flush Rate and Time
      - pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
        name: kafka_log_log_flush_rate_and_time_ms
        type: GAUGE
    
    • 启动参数
    export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \
      -Dcom.sun.management.jmxremote.rmi.port=9999 \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Dcom.sun.management.jmxremote.ssl=false \
      -Djava.rmi.server.hostname=127.0.0.1"
    export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
    
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
    

    Producer Metrics

    • 创建 Procucer Metrics 配置文件 producer.yml
    ---
    lowercaseOutputName: true
    rules:
      # 新增:producer-node-metrics
      - pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)
        name: kafka_producer_node_$3
        labels:
          client_id: "$1"
          node_id: "$2"
        type: GAUGE
    
      - pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
        name: 'kafka_producer_metrics_$2'
        labels:
          client_id: "$1"
        type: GAUGE
    
      # 抓取 Selector 全部指标(Kafka 4.0 新增)
      - pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'
        name: 'kafka_${1}_${4}'
        labels:
          client_id: '$3'
        type: GAUGE
    
    • 启动参数
    export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"
    
    /opt/kafka/kafka/bin/kafka-console-producer.sh \
      --broker-list localhost:9092 \
      --topic xxxx \
      --producer-property bootstrap.servers=localhost:9092
    

    Consumer Metrics

    • 创建 Consumer Metrics 配置文件consumer.yml
    lowercaseOutputName: true
    rules:
      # consumer-coordinator-metrics
      - pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'
        name: 'kafka_consumer_coordinator_metrics_$2'
        labels:
          client_id: "$1"
        type: GAUGE
    
      - pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
        name: 'kafka_consumer_metrics_$2'
        labels:
          client_id: "$1"
    
    • 启动参数
    export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"
    
    /opt/kafka/kafka/bin/kafka-console-consumer.sh \
      --broker-list localhost:9092 \
      --topic xxxx \
      --producer-property bootstrap.servers=localhost:9092
    

    Streams Metrics

    • 创建 Streams Metrics 配置文件stream.yml
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    
    rules:
      # Kafka Streams 应用指标 - 移除特殊字符
      - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'
        name: kafka_streams_$2
        labels:
          client_id: "$1"
          
      # 处理包含特殊字符的属性名
      - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'
        name: kafka_streams_$2_$3
        labels:
          client_id: "$1"
    
      # Processor Node 指标
      - pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'
        name: kafka_streams_processor_$4
        labels:
          client_id: "$1"
          task_id: "$2"
          processor_node_id: "$3"
          
      # Task 指标
      - pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'
        name: kafka_streams_task_$3
        labels:
          client_id: "$1"
          task_id: "$2"
          
      # 线程指标
      - pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'
        name: kafka_streams_thread_$3
        labels:
          client_id: "$1"
          thread_id: "$2"
          
      # JVM 指标
      - pattern: 'java.lang<type=Memory><>(.+)'
        name: jvm_memory_$1
        
      - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'
        name: jvm_gc_$2
        labels:
          gc: "$1"
          
      # 线程池指标
      - pattern: 'java.lang<type=Threading><>(.+)'
        name: jvm_threads_$1
        
      # 默认规则
      - pattern: '(.*)'
    
    • 启动参数
    export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \
      -Dcom.sun.management.jmxremote.rmi.port=9996 \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Dcom.sun.management.jmxremote.ssl=false \
      -Djava.rmi.server.hostname=127.0.0.1"
    export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"
    
    java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo
    

    Connect Metrics

    • 创建 Connect Metrics 配置文件 connect.yml
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
      # 1) connect-worker-metrics(全局)
      - pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'
        name: 'kafka_connect_worker_$1'
        type: GAUGE
    
      # 2) connect-worker-metrics,connector=xxx
      - pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'
        name: 'kafka_connect_worker_$2'
        labels:
          connector: "$1"
        type: GAUGE
    
      # 3) connect-worker-rebalance-metrics
      - pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
        name: 'kafka_connect_worker_rebalance_$1'
        type: GAUGE
    
      # 4) connector-task-metrics
      - pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
        name: 'kafka_connect_task_$3'
        labels:
          connector: "$1"
          task_id: "$2"
        type: GAUGE
    
      # 5) sink-task-metrics
      - pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
        name: 'kafka_connect_sink_task_$3'
        labels:
          connector: "$1"
          task_id: "$2"
    
    • 启动参数
    export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Dcom.sun.management.jmxremote.ssl=false \
      -Dcom.sun.management.jmxremote.port=9995 \
      -Dcom.sun.management.jmxremote.rmi.port=9995 \
      -Djava.rmi.server.hostname=127.0.0.1"
    export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"
    
    # 启动 Kafka Connect
    /opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties
    

    启动成功后,可通过 curl http://IP:端口号/metrics 查看获取到的监控数据。

    配置 DataKit

    • 进入 datakit 安装目录下的 conf.d/prom 目录,复制 prom.conf.sample 并命名为 kafka.conf
    cp prom.conf.sample kafka.conf
    
    • 调整 kafka.conf
    [[inputs.prom]]
      ## Exporter URLs.
      urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]
    
      ## Collector alias.
      source = "kafka"
    
      ## Prioritier over 'measurement_name' configuration.
      [[inputs.prom.measurements]]
        prefix = "kafka_controller_"
        name = "kafka_controller"
        
      [[inputs.prom.measurements]]
        prefix = "kafka_network_"
        name = "kafka_network"
    
      [[inputs.prom.measurements]]
        prefix = "kafka_log_"
        name = "kafka_log"
    
      [[inputs.prom.measurements]]
        prefix = "kafka_server_"
        name = "kafka_server"
    
      [[inputs.prom.measurements]]
        prefix = "kafka_connect_"
        name = "kafka_connect"
    
      [[inputs.prom.measurements]]
        prefix = "kafka_stream_"
        name = "kafka_stream"
    

    重启 DataKit

    执行以下命令:

    datakit service -R
    

    指标集

    以下是 kafka4.0 部分指标说明,更多指标可参考 Kafka 指标详情

    kafka_server指标集

    指标名 描述 单位
    Fetch_queue_size 当前活跃的 Broker 数量 Count
    Produce_queue_size 当前活跃的 Broker 数量 Count
    Request_queue_size 当前活跃的 Broker 数量 Count
    broker_topic_metrics_BytesInPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_BytesOutPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_FailedProduceRequestsPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_FetchMessageConversionsPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_MessagesInPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_ProduceMessageConversionsPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_TotalFetchRequestsPerSec 当前活跃的 Broker 数量 Count
    broker_topic_metrics_TotalProduceRequestsPerSec 当前活跃的 Broker 数量 Count
    socket_server_metrics_connection_count 当前活跃的 Broker 数量 Count
    socket_server_metrics_connection_close_total 当前活跃的 Broker 数量 Count
    socket_server_metrics_incoming_byte_rate 当前活跃的 Broker 数量 Count

    kafka_network 指标集

    指标名 描述 单位
    request_metrics_RequestBytes_request_AddOffsetsToTxn AddOffsetsToTxn 请求大小 bytes
    request_metrics_RequestBytes_request_Fetch Fetch 请求大小 count
    request_metrics_RequestBytes_request_FetchConsumer FetchConsumer 请求大小 bytes
    request_metrics_RequestBytes_request_FetchFollower FetchFollower 请求大小 bytes
    request_metrics_TotalTimeMs_request_CreateTopics CreateTopics 请求总时间 ms
    request_metrics_TotalTimeMs_request_CreatePartitions CreatePartitions 请求总时间 ms
    request_metrics_RequestQueueTimeMs_request_CreatePartitions CreatePartitions 在请求对列等待时间 ms
    request_metrics_RequestQueueTimeMs_request_Produce Produce 在请求对列的等待时间 ms

    kafka_controller 指标集

    指标名 描述 单位
    ActiveBrokerCount 当前活跃的 Broker 数量 Count
    ActiveControllerCount 活跃控制器数量 Count
    GlobalPartitionCount 分区数量 Count
    GlobalTopicCount 主题数量 Count
    IgnoredStaticVoters bytes Kong的带宽使用量,单位为字节
    OfflinePartitionsCount 离线分区数量 Count
    PreferredReplicaImbalanceCount Preferred Leader 选举条件的分区数 Count
    TimedOutBrokerHeartbeatCount Broker 心跳超时的次数 Count

    kafka_producer 指标集

    指标名 描述 单位
    producer_metrics_batch_split_rate 批次分割率 count/s
    producer_metrics_buffer_available_bytes 未使用的缓冲区内存总量 bytes
    producer_metrics_buffer_exhausted_rate 缓冲区耗尽而丢弃的每秒平均记录发送数量 count/s
    producer_metrics_buffer_total_bytes 缓冲区总字节大小 bytes
    producer_metrics_bufferpool_wait_ratio 缓冲池等待比率 %
    producer_metrics_bufferpool_wait_time_ns_total 缓冲池等待时间 ms
    producer_metrics_connection_close_rate 关闭连接率 count/s
    producer_metrics_connection_count 关闭连接数量 count

    kafka_consumer 指标集

    指标名 描述 单位
    consumer_coordinator_metrics_failed_rebalance_total 再平衡失败数量 count
    consumer_coordinator_metrics_heartbeat_rate 每秒平均心跳次数 count/s
    consumer_coordinator_metrics_heartbeat_response_time_max 心跳响应最大时间 count
    consumer_coordinator_metrics_join_rate Group 每秒加入速率 count/s
    consumer_coordinator_metrics_join_total Group 加入总数 count
    consumer_coordinator_metrics_last_rebalance_seconds_ago 自上次重新平衡事件以来的秒数 ms
    consumer_coordinator_metrics_rebalance_latency_total 重新平衡延迟总计 ms
    consumer_fetch_manager_metrics_bytes_consumed_rate 每秒消耗的字节数 bytes/s
    consumer_fetch_manager_metrics_fetch_latency_avg Fetch 请求延迟 ms
    consumer_metrics_connection_count 连接数 count
    consumer_metrics_incoming_byte_rate 输入字节数率 bytes/s

    kafka_connect 指标集

    指标名 描述 单位
    worker_connector_count Connector 数量 count
    worker_task_startup_attempts_total 任务启动重试次数 count
    worker_connector_startup_attempts_total 连接器尝试启动次数 count
    worker_task_startup_failure_total 任务启动失败数量 count
    worker_connector_startup_failure_percentage 连接失败率 %
    worker_rebalance_completed_rebalances_total 再平衡完成总数 count
    worker_task_startup_failure_percentage 任务启动失败占比 %
    worker_rebalance_time_since_last_rebalance_ms 自上次重新平衡以来的时间 ms
    worker_task_startup_attempts_total 任务尝试启动次数 count

    kafka_stream 指标集

    指标名 描述 单位
    stream_thread_metrics_thread_start_time 线程启动时间 时间戳 ms
    stream_thread_metrics_task_created_total 任务创建总数 count
    stream_state_metrics_block_cache_capacity 块缓存大小 bytes
    stream_state_metrics_all_rate 所有操作率 count/s
    stream_state_metrics_block_cache_usage 块缓存使用率 %
    stream_state_metrics_bytes_read_compaction_rate 字节读取压缩率 bytes/s
    stream_state_metrics_bytes_written_compaction_rate 字节写入压缩率 bytes/s
    stream_state_metrics_block_cache_index_hit_ratio 块缓存索引命中率 %
    stream_state_metrics_block_cache_data_hit_ratio 块缓存数据命中率 %

    场景视图

    登录观测云控制台,点击「场景」 -「新建仪表板」,输入 “Kafka 4”, 选择 “Kafka 4”,点击 “确定” 即可添加视图。

    监控器(告警)

    登录观测云控制台,点击「监控」 -「新建监控器」,输入 “kafka”, 选择对应的监控器,点击 “确定” 即可添加。

    Kafka 连接过期被关闭客户端连接

    Kafka 集群在处理消费者拉取请求时的延迟过高

    Kafka 集群在处理生产者请求时的延迟过高

    Kafka 集群 ActiveController 为0异常

    Kafka 离线分区数量过高异常

    总结

    通过监控 Kafka,我们可以实时掌握消息吞吐、消费者滞后、Broker 健康等关键指标,提前发现副本缺失、网络拥塞或消费延迟,保障系统稳定;也能结合历史基线做容量预测与异常检测,为扩缩容、参数调优提供量化依据,让数据持续高效、可观测、可演进。

    联系我们

    加入社区

    微信扫码
    加入官方交流群

    立即体验

    在线开通,按量计费,真正的云服务!

    立即开始

    选择观测云版本

    代码托管平台