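Kafka Monitoring
Kafka is a distributed publish/subscribe messaging system. It was originally developed at LinkedIn as the foundation of LinkedIn's activity stream and operational data processing pipeline. Many companies use Kafka now, so I looked for a Kafka exporter to collect Kafka metrics.
The Kafka Exporter lives at https://github.com/danielqsj/kafka_exporter . The author had not updated the project for 2 years and then suddenly resumed work on it this year. I had assumed the tool was unmaintained and abandoned, but it turns out it is still usable. The latest version at the time of writing is v1.4.2, released on 2021.09.16.
This Kafka Exporter supports Apache Kafka v0.10.1.0 and later.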
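Installation
Kafka Exporter can be run directly as a binary or with Docker.
Binaries can be downloaded from https://github.com/danielqsj/kafka_exporter/releases
Docker images can be downloaded from https://hub.docker.com/r/danielqsj/kafka-exporter/tags . The image tagged latest was pushed 17 days ago, which puts it around early November, well after the stable v1.4.2 release, so there may be another release before the end of the year.
The command to start the binary is: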
kafka_exporter --kafka.server=kafka:9092 [--kafka.server=another-server ...]
Note that the value given here must be the address (host:port) of the Kafka cluster.
The command to start the Docker image is:
docker run -ti --rm -p 9308:9308 danielqsj/kafka-exporter:v1.4.2 --kafka.server=kafka:9092 [--kafka.server=another-server ...]
I printed the Kafka Exporter help output; it lists many flags that can be used.
[Erdong@kafka01 kafka_exporter-1.4.2.linux-amd64]$ ./kafka_exporter --help
usage: kafka_exporter [<flags>]
Flags:
-h, --help Show context-sensitive help (also try --help-long and --help-man).
--web.listen-address=":9308"
Address to listen on for web interface and telemetry.
--web.telemetry-path="/metrics"
Path under which to expose metrics.
--topic.filter=".*" Regex that determines which topics to collect.
--group.filter=".*" Regex that determines which consumer groups to collect.
--log.enable-sarama Turn on Sarama logging.
--kafka.server=kafka:9092 ...
Address (host:port) of Kafka server.
--sasl.enabled Connect using SASL/PLAIN.
--sasl.handshake Only set this to false if using a non-Kafka SASL proxy.
--sasl.username="" SASL user name.
--sasl.password="" SASL user password.
--sasl.mechanism="" The SASL SCRAM SHA algorithm sha256 or sha512 or gssapi as mechanism
--sasl.service-name="" Service name when using kerberos Auth
--sasl.kerberos-config-path=""
Kerberos config path
--sasl.realm="" Kerberos realm
--sasl.kerberos-auth-type=""
Kerberos auth type. Either 'keytabAuth' or 'userAuth'
--sasl.keytab-path="" Kerberos keytab file path
--tls.enabled Connect to Kafka using TLS.
--tls.server-name="" Used to verify the hostname on the returned certificates unless
tls.insecure-skip-tls-verify is given. The kafka server's name should be given.
--tls.ca-file="" The optional certificate authority file for Kafka TLS client authentication.
--tls.cert-file="" The optional certificate file for Kafka client authentication.
--tls.key-file="" The optional key file for Kafka client authentication.
--server.tls.enabled Enable TLS for web server.
--server.tls.mutual-auth-enabled
Enable TLS client mutual authentication.
--server.tls.ca-file="" The certificate authority file for the web server.
--server.tls.cert-file="" The certificate file for the web server.
--server.tls.key-file="" The key file for the web server.
--tls.insecure-skip-tls-verify
If true, the server's certificate will not be checked for validity. This will
make your HTTPS connections insecure.
--kafka.version="2.0.0" Kafka broker version
--use.consumelag.zookeeper
if you need to use a group from zookeeper
--zookeeper.server=localhost:2181 ...
Address (hosts) of zookeeper server.
--kafka.labels="" Kafka cluster name
--refresh.metadata="30s" Metadata refresh interval
--offset.show-all Whether show the offset/lag for all consumer group, otherwise, only show
connected consumer groups
--concurrent.enable If true, all scrapes will trigger kafka operations otherwise, they will share
results. WARN: This should be disabled on large clusters
--topic.workers=100 Number of topic workers
--verbosity=0 Verbosity log level
--log.level=info Only log messages with the given severity or above. One of: [debug, info, warn,
error]
--log.format=logfmt Output format of log messages. One of: [logfmt, json]
--version Show application version.
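The --topic.filter and --group.filter flags in the help output above take a regular expression. As a rough way to preview which names a pattern would keep, here is a minimal Python sketch. It rests on two assumptions that the help text does not confirm: that the exporter applies the pattern as an unanchored match, and that the pattern behaves the same in Python's `re` as in the exporter's own regex engine. The topic names are hypothetical.

```python
import re


def select_names(pattern, names):
    """Return the names that the pattern matches anywhere (unanchored search)."""
    regex = re.compile(pattern)
    return [name for name in names if regex.search(name)]


# Hypothetical topic names, for illustration only.
topics = ["orders", "orders-dlq", "my-orders", "__consumer_offsets"]

print(select_names(".*", topics))       # the default filter keeps everything
print(select_names("^orders", topics))  # anchored at the start of the name
print(select_names("orders", topics))   # unanchored: also keeps "my-orders"
```

If the unanchored assumption holds, anchoring the pattern with ^ and $ is the safer way to pick out an exact set of topics or groups.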
Metrics
This exporter collects metrics in the following categories: Brokers, Topics, and Consumer Groups.
For Brokers, the main metric is the number of Kafka brokers.
# HELP kafka_brokers Number of Brokers in the Kafka Cluster.
# TYPE kafka_brokers gauge
kafka_brokers 3
There are many more metrics for Topics:
- kafka_topic_partitions Number of partitions for this Topic
- kafka_topic_partition_current_offset Current Offset of a Broker at Topic/Partition
- kafka_topic_partition_oldest_offset Oldest Offset of a Broker at Topic/Partition
- kafka_topic_partition_in_sync_replica Number of In-Sync Replicas for this Topic/Partition
- kafka_topic_partition_leader Leader Broker ID of this Topic/Partition
- kafka_topic_partition_leader_is_preferred 1 if Topic/Partition is using the Preferred Broker
- kafka_topic_partition_replicas Number of Replicas for this Topic/Partition
- kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated
Sample output for these metrics:
# HELP kafka_topic_partitions Number of partitions for this Topic
# TYPE kafka_topic_partitions gauge
kafka_topic_partitions{topic="__consumer_offsets"} 50
# HELP kafka_topic_partition_current_offset Current Offset of a Broker at Topic/Partition
# TYPE kafka_topic_partition_current_offset gauge
kafka_topic_partition_current_offset{partition="0",topic="__consumer_offsets"} 0
# HELP kafka_topic_partition_oldest_offset Oldest Offset of a Broker at Topic/Partition
# TYPE kafka_topic_partition_oldest_offset gauge
kafka_topic_partition_oldest_offset{partition="0",topic="__consumer_offsets"} 0
# HELP kafka_topic_partition_in_sync_replica Number of In-Sync Replicas for this Topic/Partition
# TYPE kafka_topic_partition_in_sync_replica gauge
kafka_topic_partition_in_sync_replica{partition="0",topic="__consumer_offsets"} 3
# HELP kafka_topic_partition_leader Leader Broker ID of this Topic/Partition
# TYPE kafka_topic_partition_leader gauge
kafka_topic_partition_leader{partition="0",topic="__consumer_offsets"} 0
# HELP kafka_topic_partition_leader_is_preferred 1 if Topic/Partition is using the Preferred Broker
# TYPE kafka_topic_partition_leader_is_preferred gauge
kafka_topic_partition_leader_is_preferred{partition="0",topic="__consumer_offsets"} 1
# HELP kafka_topic_partition_replicas Number of Replicas for this Topic/Partition
# TYPE kafka_topic_partition_replicas gauge
kafka_topic_partition_replicas{partition="0",topic="__consumer_offsets"} 3
# HELP kafka_topic_partition_under_replicated_partition 1 if Topic/Partition is under Replicated
# TYPE kafka_topic_partition_under_replicated_partition gauge
kafka_topic_partition_under_replicated_partition{partition="0",topic="__consumer_offsets"} 0
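To show how the topic metrics above relate to each other, the following Python sketch parses exporter output and lists the partitions whose in-sync replica count is below their replica count. It is only an illustration: the parser is a simplified one written for this example (it does not handle escaped quotes in label values), and the "orders" topic in the sample is made up. The exporter already reports the same condition through kafka_topic_partition_under_replicated_partition, so this is a cross-check rather than a replacement.

```python
import re

# Matches one labelled sample line such as: name{a="1",b="2"} 3
SAMPLE_RE = re.compile(r'^(\w+)\{(.*)\}\s+(\S+)$')
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def parse_samples(text):
    """Yield (metric_name, labels, value) for each labelled sample line."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_RE.match(line)
        if match:
            name, raw_labels, value = match.groups()
            yield name, dict(LABEL_RE.findall(raw_labels)), float(value)


def under_replicated(text):
    """Return sorted (topic, partition) pairs with fewer in-sync replicas than replicas."""
    replicas, in_sync = {}, {}
    for name, labels, value in parse_samples(text):
        key = (labels.get("topic"), labels.get("partition"))
        if name == "kafka_topic_partition_replicas":
            replicas[key] = value
        elif name == "kafka_topic_partition_in_sync_replica":
            in_sync[key] = value
    return sorted(k for k, n in replicas.items() if k in in_sync and in_sync[k] < n)


# First two lines come from the sample output; the "orders" lines are hypothetical.
SAMPLE = """
kafka_topic_partition_in_sync_replica{partition="0",topic="__consumer_offsets"} 3
kafka_topic_partition_replicas{partition="0",topic="__consumer_offsets"} 3
kafka_topic_partition_in_sync_replica{partition="1",topic="orders"} 2
kafka_topic_partition_replicas{partition="1",topic="orders"} 3
"""

print(under_replicated(SAMPLE))
```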
For Consumer Groups there are 2 metrics:
- kafka_consumergroup_current_offset Current Offset of a ConsumerGroup at Topic/Partition
- kafka_consumergroup_lag Current Approximate Lag of a ConsumerGroup at Topic/Partition
Sample output for these two metrics:
# HELP kafka_consumergroup_current_offset Current Offset of a ConsumerGroup at Topic/Partition
# TYPE kafka_consumergroup_current_offset gauge
kafka_consumergroup_current_offset{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} -1
# HELP kafka_consumergroup_lag Current Approximate Lag of a ConsumerGroup at Topic/Partition
# TYPE kafka_consumergroup_lag gauge
kafka_consumergroup_lag{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} 1
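kafka_consumergroup_lag is reported per partition, so a per-group view needs the partitions summed. A minimal Python sketch of that aggregation follows, assuming the metrics text has already been fetched from the exporter. The "billing" group and "orders" topic are hypothetical, and the line matching is a simplification written for this example.

```python
import re
from collections import defaultdict

LAG_RE = re.compile(r'^kafka_consumergroup_lag\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)$')
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def lag_by_group(text):
    """Sum partition lag into a {(consumergroup, topic): total_lag} dict."""
    totals = defaultdict(float)
    for line in text.splitlines():
        match = LAG_RE.match(line.strip())
        if not match:
            continue  # not a kafka_consumergroup_lag sample
        labels = dict(LABEL_RE.findall(match.group("labels")))
        totals[(labels["consumergroup"], labels["topic"])] += float(match.group("value"))
    return dict(totals)


# The first line is from the sample output; the "billing" lines are hypothetical.
SAMPLE = """
kafka_consumergroup_lag{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} 1
kafka_consumergroup_lag{consumergroup="billing",partition="0",topic="orders"} 40
kafka_consumergroup_lag{consumergroup="billing",partition="1",topic="orders"} 2
"""

for (group, topic), lag in sorted(lag_by_group(SAMPLE).items()):
    print(group, topic, lag)
```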
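As for displaying these metrics in Grafana, the author uploaded a dashboard to Grafana Dashboards three years ago. It looks outdated to me, so I will build my own when I have time.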
Troubleshooting
1. client has run out of available brokers to talk to (Is your cluster reachable?)
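You may sometimes hit the error above when starting Kafka Exporter. It has several possible causes; one of them is that the Kafka version was not specified at startup. Specifying it with --kafka.version=0.x.x.x resolves that case. Kafka does not provide a version command, so go into the kafka/libs directory and look for a file such as kafka_2.10-0.8.2-beta.jar , where 2.10 is the Scala version and 0.8.2-beta is the Kafka version.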
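The filename rule described above can be written down as a small Python sketch. The regular expression is my own reading of the kafka_<scala>-<kafka>.jar pattern, not something Kafka documents, and it would also pick up a trailing suffix such as "-sources" if such jars are present. Whether --kafka.version accepts a suffix like "-beta" as-is is not covered here.

```python
import re
from pathlib import Path

# kafka_<scala version>-<kafka version>.jar, e.g. kafka_2.10-0.8.2-beta.jar
JAR_RE = re.compile(r"^kafka_(?P<scala>\d+\.\d+)-(?P<kafka>.+)\.jar$")


def versions_from_jar(filename):
    """Return (scala_version, kafka_version), or None if the name does not fit."""
    match = JAR_RE.match(filename)
    return (match.group("scala"), match.group("kafka")) if match else None


def scan_libs(libs_dir):
    """Return {jar_name: (scala_version, kafka_version)} for jars found in libs_dir."""
    found = {}
    for path in sorted(Path(libs_dir).glob("kafka_*.jar")):
        parsed = versions_from_jar(path.name)
        if parsed:
            found[path.name] = parsed
    return found


print(versions_from_jar("kafka_2.10-0.8.2-beta.jar"))
```

Calling scan_libs with the path to the kafka/libs directory applies the same rule to every matching jar on disk.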
Monitoring Kafka with JMX
Add the following to the Kafka startup script:
export PROMETHEUS_JMX=/usr/local/prometheus-jmx/kafka
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:$PROMETHEUS_JMX/jmx_prometheus_javaagent-0.16.1.jar=7071:$PROMETHEUS_JMX/kafka-0-8-2.yml"
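The kafka-0-8-2.yml configuration file comes from the example_configs directory of the jmx_exporter repository; you can pick a different one to suit your needs.
Once Kafka has started, visit 127.0.0.1:7071 to get the monitoring data.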
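As a quick sanity check that the agent is serving data, the endpoint can also be read from a script. The Python sketch below is a minimal example that assumes the agent answers plain HTTP at the address above; the function names are mine, and nothing is fetched until fetch_metrics is called.

```python
import urllib.request


def fetch_metrics(url="http://127.0.0.1:7071/", timeout=5.0):
    """Download the metrics text from the agent's HTTP endpoint."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


def summarize(text):
    """Return (sample_line_count, distinct_metric_name_count)."""
    names = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # The metric name ends at the first "{" (labels) or space (value).
        names.append(line.split("{", 1)[0].split(" ", 1)[0])
    return len(names), len(set(names))
```

With Kafka running, summarize(fetch_metrics()) gives a rough count of what the agent exposes; a connection error at this point usually means the javaagent option was not picked up by the startup script.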