Kafka Monitoring

Kafka is a distributed, publish/subscribe-based messaging system. It was originally developed at LinkedIn as the foundation of LinkedIn's activity stream and operational data processing pipelines. Many companies use Kafka these days, so I went looking for a Kafka exporter to collect Kafka metrics.

This Kafka Exporter lives at https://github.com/danielqsj/kafka_exporter . The author had not updated the project for two years, then suddenly resumed this year. I had assumed the tool was unmaintained and abandoned, but it turns out it is still perfectly usable. The latest release is v1.4.2, published on 2021-09-16.

This Kafka Exporter supports Apache Kafka v0.10.1.0 and later.

Installation

Kafka Exporter can be run directly as a binary or inside Docker.

Binaries can be downloaded from https://github.com/danielqsj/kafka_exporter/releases

Docker images are available at https://hub.docker.com/r/danielqsj/kafka-exporter/tags . The image tagged latest was pushed 17 days ago, i.e. around early November, well after the stable v1.4.2 release, so there may be another release before the end of the year.

To start the binary, run:

kafka_exporter --kafka.server=kafka:9092 [--kafka.server=another-server ...]

Note that this must be a reachable address (host:port) of the Kafka cluster's brokers.

To start the Docker image, run:

docker run -ti --rm -p 9308:9308 danielqsj/kafka-exporter:v1.4.2 --kafka.server=kafka:9092 [--kafka.server=another-server ...]

I printed Kafka Exporter's help output; it exposes quite a few flags:

[Erdong@kafka01 kafka_exporter-1.4.2.linux-amd64]$ ./kafka_exporter --help
usage: kafka_exporter [<flags>]

Flags:
  -h, --help                     Show context-sensitive help (also try --help-long and --help-man).
      --web.listen-address=":9308"
                                 Address to listen on for web interface and telemetry.
      --web.telemetry-path="/metrics"
                                 Path under which to expose metrics.
      --topic.filter=".*"        Regex that determines which topics to collect.
      --group.filter=".*"        Regex that determines which consumer groups to collect.
      --log.enable-sarama        Turn on Sarama logging.
      --kafka.server=kafka:9092 ...
                                 Address (host:port) of Kafka server.
      --sasl.enabled             Connect using SASL/PLAIN.
      --sasl.handshake           Only set this to false if using a non-Kafka SASL proxy.
      --sasl.username=""         SASL user name.
      --sasl.password=""         SASL user password.
      --sasl.mechanism=""        The SASL SCRAM SHA algorithm sha256 or sha512 or gssapi as mechanism
      --sasl.service-name=""     Service name when using kerberos Auth
      --sasl.kerberos-config-path=""
                                 Kerberos config path
      --sasl.realm=""            Kerberos realm
      --sasl.kerberos-auth-type=""
                                 Kerberos auth type. Either 'keytabAuth' or 'userAuth'
      --sasl.keytab-path=""      Kerberos keytab file path
      --tls.enabled              Connect to Kafka using TLS.
      --tls.server-name=""       Used to verify the hostname on the returned certificates unless
                                 tls.insecure-skip-tls-verify is given. The kafka server's name should be given.
      --tls.ca-file=""           The optional certificate authority file for Kafka TLS client authentication.
      --tls.cert-file=""         The optional certificate file for Kafka client authentication.
      --tls.key-file=""          The optional key file for Kafka client authentication.
      --server.tls.enabled       Enable TLS for web server.
      --server.tls.mutual-auth-enabled
                                 Enable TLS client mutual authentication.
      --server.tls.ca-file=""    The certificate authority file for the web server.
      --server.tls.cert-file=""  The certificate file for the web server.
      --server.tls.key-file=""   The key file for the web server.
      --tls.insecure-skip-tls-verify
                                 If true, the server's certificate will not be checked for validity. This will
                                 make your HTTPS connections insecure.
      --kafka.version="2.0.0"    Kafka broker version
      --use.consumelag.zookeeper
                                 if you need to use a group from zookeeper
      --zookeeper.server=localhost:2181 ...
                                 Address (hosts) of zookeeper server.
      --kafka.labels=""          Kafka cluster name
      --refresh.metadata="30s"   Metadata refresh interval
      --offset.show-all          Whether show the offset/lag for all consumer group, otherwise, only show
                                 connected consumer groups
      --concurrent.enable        If true, all scrapes will trigger kafka operations otherwise, they will share
                                 results. WARN: This should be disabled on large clusters
      --topic.workers=100        Number of topic workers
      --verbosity=0              Verbosity log level
      --log.level=info           Only log messages with the given severity or above. One of: [debug, info, warn,
                                 error]
      --log.format=logfmt        Output format of log messages. One of: [logfmt, json]
      --version                  Show application version.
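With the exporter running on its default port 9308, Prometheus needs a scrape job pointing at it. A minimal sketch of such a configuration follows; the job name and target host are placeholders for your environment:

```yaml
scrape_configs:
  - job_name: "kafka-exporter"     # illustrative job name
    static_configs:
      - targets: ["kafka01:9308"]  # host running kafka_exporter, default port
```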

Metrics

This exporter collects metrics in several categories: Brokers, Topics, and Consumer Groups.

For Brokers there is essentially one metric: the number of brokers in the Kafka cluster.

# HELP kafka_brokers Number of Brokers in the Kafka Cluster.
# TYPE kafka_brokers gauge
kafka_brokers 3
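Since kafka_brokers is a plain gauge, a simple alerting rule can fire when the broker count drops below the expected cluster size. A sketch of such a rule; the threshold of 3, the rule name, and labels are assumptions for a three-broker cluster like the sample above:

```yaml
groups:
  - name: kafka-brokers            # illustrative group name
    rules:
      - alert: KafkaBrokerDown
        expr: kafka_brokers < 3    # expected cluster size; adjust to yours
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka cluster has fewer brokers than expected"
```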

There are considerably more metrics for Topics:

  • kafka_topic_partitions Number of partitions for this Topic
  • kafka_topic_partition_current_offset Current Offset of a Broker at Topic/Partition
  • kafka_topic_partition_oldest_offset Oldest Offset of a Broker at Topic/Partition
  • kafka_topic_partition_in_sync_replica Number of In-Sync Replicas for this Topic/Partition
  • kafka_topic_partition_leader Leader Broker ID of this Topic/Partition
  • kafka_topic_partition_leader_is_preferred 1 if Topic/Partition is using the Preferred Broker
  • kafka_topic_partition_replicas Number of Replicas for this Topic/Partition
  • kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated

Sample output for these metrics:

# HELP kafka_topic_partitions Number of partitions for this Topic
# TYPE kafka_topic_partitions gauge
kafka_topic_partitions{topic="__consumer_offsets"} 50

# HELP kafka_topic_partition_current_offset Current Offset of a Broker at Topic/Partition
# TYPE kafka_topic_partition_current_offset gauge
kafka_topic_partition_current_offset{partition="0",topic="__consumer_offsets"} 0

# HELP kafka_topic_partition_oldest_offset Oldest Offset of a Broker at Topic/Partition
# TYPE kafka_topic_partition_oldest_offset gauge
kafka_topic_partition_oldest_offset{partition="0",topic="__consumer_offsets"} 0

# HELP kafka_topic_partition_in_sync_replica Number of In-Sync Replicas for this Topic/Partition
# TYPE kafka_topic_partition_in_sync_replica gauge
kafka_topic_partition_in_sync_replica{partition="0",topic="__consumer_offsets"} 3

# HELP kafka_topic_partition_leader Leader Broker ID of this Topic/Partition
# TYPE kafka_topic_partition_leader gauge
kafka_topic_partition_leader{partition="0",topic="__consumer_offsets"} 0

# HELP kafka_topic_partition_leader_is_preferred 1 if Topic/Partition is using the Preferred Broker
# TYPE kafka_topic_partition_leader_is_preferred gauge
kafka_topic_partition_leader_is_preferred{partition="0",topic="__consumer_offsets"} 1

# HELP kafka_topic_partition_replicas Number of Replicas for this Topic/Partition
# TYPE kafka_topic_partition_replicas gauge
kafka_topic_partition_replicas{partition="0",topic="__consumer_offsets"} 3

# HELP kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated
# TYPE kafka_topic_partition_under_replicated_partition gauge
kafka_topic_partition_under_replicated_partition{partition="0",topic="__consumer_offsets"} 0
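Of the topic metrics above, the under-replicated flag is the one most worth watching. A PromQL query like the following (metric and label names taken from the samples above) lists the topic/partition pairs that are currently under-replicated:

```promql
# Topic/partition pairs flagged as under-replicated (ISR count below replica count)
sum(kafka_topic_partition_under_replicated_partition) by (topic, partition) > 0
```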

For Consumer Groups there are two metrics:

  • kafka_consumergroup_current_offset Current Offset of a ConsumerGroup at Topic/Partition
  • kafka_consumergroup_lag Current Approximate Lag of a ConsumerGroup at Topic/Partition

Sample output for these two metrics:

# HELP kafka_consumergroup_current_offset Current Offset of a ConsumerGroup at Topic/Partition
# TYPE kafka_consumergroup_current_offset gauge
kafka_consumergroup_current_offset{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} -1

# HELP kafka_consumergroup_lag Current Approximate Lag of a ConsumerGroup at Topic/Partition
# TYPE kafka_consumergroup_lag gauge
kafka_consumergroup_lag{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} 1
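Per-partition lag is noisy, so in practice it is usually aggregated per consumer group and topic before graphing or alerting. A PromQL sketch using the metric from the sample above:

```promql
# Total approximate lag per consumer group and topic
sum(kafka_consumergroup_lag) by (consumergroup, topic)
```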

As for visualizing these metrics in Grafana, the author uploaded a dashboard to Grafana Dashboards three years ago, but it feels outdated to me; I'll build my own when I have time.

Troubleshooting

1. client has run out of available brokers to talk to (Is your cluster reachable?)

You may occasionally hit the error above when starting Kafka Exporter. It has several possible causes; one is not specifying the Kafka version at startup, which is fixed by passing --kafka.version=0.x.x.x. Kafka itself provides no version command, but you can look in the kafka/libs directory for a file like kafka_2.10-0.8.2-beta.jar, where 2.10 is the Scala version and 0.8.2-beta is the Kafka version.
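Reading the version off the jar file name can be done mechanically. A small shell sketch, using the example file name from the text above; on a real broker you would list the kafka/libs directory instead:

```shell
# Example jar name from the text above; on a real broker, find it with e.g.:
#   ls /usr/local/kafka/libs/ | grep '^kafka_'
jar="kafka_2.10-0.8.2-beta.jar"

base="${jar#kafka_}"          # strip leading "kafka_"  -> 2.10-0.8.2-beta.jar
base="${base%.jar}"           # strip trailing ".jar"   -> 2.10-0.8.2-beta
scala_version="${base%%-*}"   # up to the first "-"     -> 2.10
kafka_version="${base#*-}"    # after the first "-"     -> 0.8.2-beta

echo "Scala: $scala_version, Kafka: $kafka_version"
```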

Monitoring Kafka via JMX

Add the following to Kafka's startup script:

export PROMETHEUS_JMX=/usr/local/prometheus-jmx/kafka
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:$PROMETHEUS_JMX/jmx_prometheus_javaagent-0.16.1.jar=7071:$PROMETHEUS_JMX/kafka-0-8-2.yml"

The kafka-0-8-2.yml configuration file comes from the example_configs directory of the jmx_exporter repository; you can pick a different one from there to match your Kafka version.

Once Kafka is started, visit 127.0.0.1:7071 to fetch the monitoring data.
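The JMX endpoint is scraped the same way as the exporter above; a minimal job for the agent port, with the job name and target being illustrative:

```yaml
scrape_configs:
  - job_name: "kafka-jmx"            # illustrative job name
    static_configs:
      - targets: ["127.0.0.1:7071"]  # jmx_prometheus_javaagent port
```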
