KafkaMQ 日志采集最佳实践

    banner.jpg

    概述

    Kafka 是由 LinkedIn 开发、后由Apache软件基金会维护的分布式流处理平台,采用Scala和Java编写。它本质是一个高吞吐、持久化的发布-订阅消息系统,专注于处理实时数据流(如用户行为日志、点击流等)。在收集日志的场景中,Kafka 可以作为一个消息中间件,用于接收、存储和转发大量的日志,链路,指标数据。

    观测云

    观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

    本实践主要是通过观测云消费 Kafka 队列收集到的日志数据,并将数据通过 Pipeline 进行字段提取和分类,便于用户对日志数据进行可视化分析。

    部署 Kafka

    目前 DataKit 支持的 Kafka 版本有 [ version:0.8.2 ~ 3.2.0 ]。

    下载 3.2.0 版本,解压即可使用。

    wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
    

    1、启动 Zookeeper 服务

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    

    2、启动 KafkaServer

    $ bin/kafka-server-start.sh config/server.properties
    

    3、创建 Topic

    创建名为 testlog 的 Topic 。

    $ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
    

    4、启动 Producer

    $ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092
    

    部署 DataKit

    DataKit 是一个开源的、跨平台的数据收集和监控工具,由观测云开发并维护。它旨在帮助用户收集、处理和分析各种数据源,如日志、指标和事件,以便进行有效的监控和故障排查。DataKit 支持多种数据输入和输出格式,可以轻松集成到现有的监控系统中。

    登录观测云控制台,在「集成」 - 「DataKit」选择对应安装方式,当前采用 Linux 主机部署 DataKit。

    开启 KafkaMQ 采集器

    进入 DataKit 安装目录下 (默认是 /usr/local/datakit/conf.d/ ) 的 conf.d/kafkamq 目录,复制 kafkamq.conf.sample 并命名为 kafkamq.conf

    类似如下:

    -rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
    -rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample
    

    调整 kafkamq 采集器配置如下:

    • addrs = ["localhost:9092"],该文采集器 DataKit 和 Kafka 安装到同一台操作系统中,localhost 即可。
    • kafka_version = "3.2.0",该文使用 Kafka 的版本。
    • [inputs.kafkamq.custom],删除注释符号“#”。
    • [inputs.kafkamq.custom.log_topic_map],删除注释符号“#”。
    • "testlog"="log.p",testlog 为 Topic 的名字,log.p 为观测云 Pipeline 可编程数据处理器的日志字段提取规则配置。涉及的业务日志和 log.p 的内容详细见下面的《使用 Pipeline》。
    • 其他一些配置说明:
      • group_id = "datakit-group":消费者组名称,相同组内消费者共享分区消费进度。不同消费者组可独立消费同一主题。
      • assignor = "roundrobin":分区轮询分配给消费者,​适合组内消费者订阅相同主题列表​,实现负载均衡。

    注意:开启或调整 DataKit 的配置,需重启采集器(shell 下执行 datakit service -R)。

    [[inputs.kafkamq]]
      addrs = ["localhost:9092"]
      # your kafka version:0.8.2 ~ 3.2.0
      kafka_version = "3.2.0"
      group_id = "datakit-group"
      # consumer group partition assignment strategy (range, roundrobin, sticky)
      
    
      ## rate limit.
      #limit_sec = 100
      ## sample
      # sampling_rate = 1.0
    
      ## kafka tls config
      # tls_enable = true
      ## PLAINTEXT/SASL_SSL/SASL_PLAINTEXT
      # tls_security_protocol = "SASL_PLAINTEXT"
      ## PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/OAUTHBEARER,default is PLAIN.
      # tls_sasl_mechanism = "PLAIN"
      # tls_sasl_plain_username = "user"
      # tls_sasl_plain_password = "pw"
      ## If tls_security_protocol is SASL_SSL, then ssl_cert must be configured.
      # ssl_cert = "/path/to/host.cert"
    
      ## -1:Offset Newest, -2:Offset Oldest
      offsets=-1
    
      ## skywalking custom
      #[inputs.kafkamq.skywalking]
      ## Required: send to datakit skywalking input.
      #  dk_endpoint="http://localhost:9529"
      #  thread = 8 
      #  topics = [
      #    "skywalking-metrics",
      #    "skywalking-profilings",
      #    "skywalking-segments",
      #    "skywalking-managements",
      #    "skywalking-meters",
      #    "skywalking-logging",
      #  ]
      #  namespace = ""
    
      ## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open!
      #[inputs.kafkamq.jaeger]
      ## Required: ipv6 is "[::1]:9529"
      #  dk_endpoint="http://localhost:9529"
      #  thread = 8 
      #  source: agent,otel,others...
      #  source = "agent"
      #  # Required: topics
      #  topics=["jaeger-spans","jaeger-my-spans"]
    
      ## user custom message with PL script.
      [inputs.kafkamq.custom]
        #spilt_json_body = true
        #thread = 8
        #storage_index = "" # NOTE: only working on logging collection
    
        ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
        #[inputs.kafkamq.custom.spilt_topic_map]
        #  "log_topic"=true
        #  "log01"=false
        [inputs.kafkamq.custom.log_topic_map]
           "test_log"="log.p"
        #  "log01"="log_01.p"
        #[inputs.kafkamq.custom.metric_topic_map]
        #  "metric_topic"="metric.p"
        #  "metric01"="rum_apm.p"
        #[inputs.kafkamq.custom.rum_topic_map]
        #  "rum_topic"="rum_01.p"
        #  "rum_02"="rum_02.p"
    
      #[inputs.kafkamq.remote_handle]
        ## Required
        #endpoint="http://localhost:8080"
        ## Required topics
        #topics=["spans","my-spans"]
        # send_message_count = 100
        # debug = false
        # is_response_point = true
        # header_check = false
    
      ## Receive and consume OTEL data from kafka.
      #[inputs.kafkamq.otel]
        #dk_endpoint="http://localhost:9529"
        #trace_api="/otel/v1/traces"
        #metric_api="/otel/v1/metrics"
        #trace_topics=["trace1","trace2"]
        #metric_topics=["otel-metric","otel-metric1"]
        #thread = 8 
    
      ## todo: add other input-mq
    

    编写 Pipeline

    log.p 规则内容:

    data = load_json(message)
    protocol = data["protocol"]
    response_code = data["response_code"]
    set_tag(protocol,protocol)set_tag(response_code,response_code)
    group_between(response_code,[200,300],"info","status")
    group_between(response_code,[400,499],"warning","status")
    group_between(response_code,[500,599],"error","status")
    time = data["start_time"]
    set_tag(time,time)
    default_time(time)
    

    效果展示

    发送业务日志样例

    业务日志样例文件如下:

    #info
    {"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    #error
    {"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    #warn
    {"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    

    日志发送命令

    在 Producer 启动后,分别发送如下三条日志内容,三条日志一条为 info 级别("response_code":204),另一条为 error 级别("response_code":504),最后一条为 warn 级别日志("response_code":404)。

    >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    >{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
    

    效果

    • 通过 DataKit 采集到 Kafka 的三条业务日志

    • 使用 Pipeline 对日志进行字段提取的效果展示

    下图 protocol、response_code 以及 time 都是使用 Pipeline 提取后的效果。

    结语

    观测云通过集成 KafkaMQ ,实现了 Kafka 队列日志数据的高效采集和处理,并结合观测云的 Pipeline 功能,能够实时采集业务日志并进行字段提取和分类,便于后续分析和可视化;此外,DataKit 的 KafkaMQ 采集器可扩展应用于其他数据处理场景,如还支持链路(如开源 otel,skywalking,jaeger),指标,RUM 等数据的消费,这种集成方案提升了系统的可观测性,同时反映了观测云平台的开放和包容性,加速了企业的数字化转型。

    联系我们

    加入社区

    微信扫码
    加入官方交流群

    立即体验

    在线开通,按量计费,真正的云服务!

    立即开始

    选择观测云版本

    代码托管平台