报告(Reporter)
通过 conf/flink-conf.yaml
文件配置一个或多个 Reporters 来暴露度量值给外部系统,这些 Reporter 将在作业和任务启动的时候实例化。
-
metrics.reporter.<name>.<config>
:名字为<name>
的 Reporter 的通用设置 -
metrics.reporter.<name>.class
:名字为<name>
的 Reporter class -
metrics.reporter.<name>.interval
:名字为<name>
的 Reporter 的间隔时间 -
metrics.reporter.<name>.scope.delimiter
:名字为<name>
的 Reporter 的标识符的分隔符(默认使用metrics.scope.delimiter
) -
metrics.reporters
:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。
所有的 Reporter 配置至少需要配置 class
属性,还有一些允许配置记录间隔。下面是一些 Reporter 的配置实例:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
包含 Reporter 的 jar 必须放到 /lib
文件夹,这样 Flink 就可以访问到这些 jar。
可以通过继承 org.apache.flink.metrics.reporter.MetricReporter
接口来实现自己的 Reporter,如果需要定期发送记录,需要继承 Scheduled
接口。
下面是一些支持的 Reporter:
JMX(org.apache.flink.metrics.jmx.JMXReporter)
不需要添加额外的依赖就可以支持 JMX Reporter,默认是不激活的。
参数:
- port - (可选)JMX 连接监听的端口。为了能够在一个主机上运行多个 Reporter 实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议端口范围(如 9250-9260),实际端口将显示在相关作业或 TaskManager 日志中。如果设置此设置,Flink 将为给定的端口/范围启动额外的 JMX 连接器。度量指标将在本地默认的JMX实例上显示。
配置示例:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
通过 JMX 公开的度量由域(domain)和键属性列表(key-properties)标识,这些属性一起构成对象名。
域始终以 org.apache.flink
开头,后跟一个通用的度量标识符。与通常的标识符不同,它不受作用域格式的影响,不包含任何变量,并且在跨作业时也是常量。例子:org.apache.flink.job.task.numbytesout
。
键属性列表包含与给定指标关联的所有变量的值,无论配置的作用域格式如何。例子:host=localhost,job_name=myjob,task_name=mytask
。
因此,域标识一个度量类,键属性列表标识该度量的一个(或多个)实例。
Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
参数:
- host - 在 gmond.conf 中的 udp_recv_channel.bind 下配置的 gmond 主机地址
- port - 在 gmond.conf 的 udp_recv_channel.port 下配置的 gmond 端口
- tmax - 旧指标应保留多长时间的软限制
- dmax - 旧指标应保留多长时间的硬限制
- ttl - 传输的 UDP 包的生存时间
- addressingMode - 要使用的 UDP 寻址模式(单播/多播)
配置示例:
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST
Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
参数:
- host - Graphite 服务器主机地址
- port - Graphite 服务器端口
- protocol - 使用协议(TCP / UDP)
配置示例:
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
参数:
- port - (可选)Prometheus exporter 监听的端口,默认为 9249。为了能够在一个主机上运行多个报告实例(例如,当一个 TaskManager 与 JobManager 共同使用时),建议使用端口范围(如:9250-9260)。
配置示例:
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink 度量类型映射到 Prometheus 度量类型,如下所示:
Flink | Prometheus | Description |
---|---|---|
Counter | Gauge | Prometheus 计数器不能减 |
Gauge | Gauge | Prometheus 仅支持数字和布尔类型 |
Histogram | Summary | 分位数 .5,.75,.95,.98,.99 和 .999 |
Meter | Gauge | The gauge exports the meter’s rate |
PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
参数:
键 | 默认值 | 描述 |
---|---|---|
deleteOnShutdown | true | 指定是否在关闭时从 PushGateway 中删除指标。 |
Host | (none) | PushGateway 服务器主机。 |
jobName | (none) | 将推送指标的作业名称。 |
port | -1 | PushGateway 服务器端口。 |
randomJobNameSuffix | true | 指定是否应将随机后缀附加到作业名称。 |
配置示例:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
PrometheusPushGatewayReporter 将指标推送到 Pushgateway,可由 Prometheus 抓取。
StatsD(org.apache.flink.metrics.statsd.StatsDReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
参数:
- host - StatsD 服务器主机
- port - StatsD 服务器端口
配置示例:
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
Flink 指标,如任何变量 <host>
,<job_name>
,<tm_id>
,<subtask_index>
,<task_name>
和 <operator_name>
,将被发送到 Datadog 作为标签。标签看起来像 host:localhost
和 job_name:myjobname
。
参数:
- apikey - Datadog APIKeys
- tags - (可选)发送到 Datadog 时将应用于度量标准的全局标记。标签应仅以逗号分隔
配置示例:
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)
要使用此 Reporter,必须复制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar
到 Flink 的 /lib
文件夹下。
配置示例:
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
系统指标
Flink 默认会收集当前状态的指标,下文的表格中包括以下5列:
- “Scope”列描述了生成系统范围的范围格式,比如,如果表格里面的值为“Operator”,那么“metrics.scope.operator”将作为指标的范围格式。如果表格包含使用斜线分割的多个值,那么系统将根据不同的值分别报告多个指标,比如同时包含 job- 和 taskmanagers 两个。
- “Infix”(可选)列描述了附加哪个中缀到系统范围之后。
- “Metrics” 列出了此系统范围和中缀注册的所有特性的名字。
- “Description”列描述了指标测量的信息。
- “Type”描述了指标的类型。
请注意,“infix” 和 “Metrics” 列中所有的点根据 “metrics.delimiter” 设置变化。
因此,为了推断指标的标识符:
- 先从“Scope”列获取范围格式。
- 如果“Infix”列有值的话,附加到范围格式后面,并根据“metrices.delimiter”设置附加相应的分隔符。
- 附加指标的名称。
CPU
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | Load | JVM CPU使用情况。 | Gauge |
- | - | Time | JVM CPU时间。 | Gauge |
Memory
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Heap.Used | 当前使用的堆内存量(bytes)。 | Gauge |
- | - | Heap.Committed | 保证可供 JVM 使用的堆内存量(bytes)。 | Gauge |
- | - | Heap.Max | 可用于内存管理的最大堆内存量(bytes)。 | Gauge |
- | - | NonHeap.Used | 当前使用的非堆内存量(bytes)。 | Gauge |
- | - | NonHeap.Committed | 保证 JVM 可用的非堆内存量(bytes)。 | Gauge |
- | - | NonHeap.Max | 可用于内存管理的最大非堆内存量(bytes)。 | Gauge |
- | - | Direct.Count | 直接缓冲池中的缓冲区数。 | Gauge |
- | - | Direct.MemoryUsed | JVM 用于直接缓冲池的内存量(bytes)。 | Gauge |
- | - | Direct.TotalCapacity | 直接缓冲池中所有缓冲区的总容量(bytes)。 | Gauge |
- | - | Mapped.Count | 映射缓冲池中的缓冲区数。 | Gauge |
- | - | Mapped.MemoryUsed | JVM 用于映射缓冲池的内存量(bytes)。 | Gauge |
- | - | Mapped.TotalCapacity | 映射缓冲池中的缓冲区数(bytes)。 | Gauge |
Threads
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Threads | Count | 活动线程总数。 | Gauge |
GarbageCollection
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector |
<GarbageCollector> .Count |
已发生的集合总数。 | Gauge |
- | - |
<GarbageCollector> .Time |
执行垃圾收集所花费的总时间。 | Gauge |
ClassLoader
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | JVM 启动以来加载的类总数。 | Gauge |
- | - | ClassesUnloaded | JVM 启动以来卸载的类总数。 | Gauge |
Network
Scope | Infix | Metrics | Description | Type |
---|---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | 未使用的内存段数。 | Gauge |
- | - | TotalMemorySegments | 分配的内存段数。 | Gauge |
Task | buffers | inputQueueLength | 排队的输入缓冲区数。 | Gauge |
- | - | outputQueueLength | 排队输出缓冲区的数量。 | Gauge |
- | - | inPoolUsage | 估计输入缓冲区的使用情况。 | Gauge |
- | - | outPoolUsage | 估计输出缓冲区的使用情况。 | Gauge |
- | Network.<Input/Output> .<gate>
|
totalQueueLen | 所有输入/输出通道中排队缓冲区的总数。 | Gauge |
- | - | minQueueLen | 所有输入/输出通道中的最小排队缓冲区数。 | Gauge |
- | - | maxQueueLen | 所有输入/输出通道中的最大排队缓冲区数。 | Gauge |
- | - | avgQueueLen | 所有输入/输出通道中的平均缓冲区数。 | Gauge |
Cluster
Scope | Metrics | Description | Type |
---|---|---|---|
JobManager | numRegisteredTaskManagers | 注册 TaskManager 的数量。 | Gauge |
- | numRunningJobs | 正在运行的作业数量。 | Gauge |
- | taskSlotsAvailable | 可用任务槽的数量。 | Gauge |
- | taskSlotsTotal | 任务槽的总数。 | Gauge |
Availability
Scope | Metrics | Description | Type |
---|---|---|---|
Job | restartingTime | 重新启动作业所花费的时间,或当前重新启动的持续时间(ms)。 | Gauge |
- | uptime | 作业运行的时间不间断。对于已完成的作业,返回-1(ms)。 | Gauge |
- | downtime | 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(ms)。 | Gauge |
- | fullRestarts | 自提交此作业以来完全重新启动的总次数。 | Gauge |
Checkpointing
Scope | Metrics | Description | Type |
---|---|---|---|
Job | lastCheckpointDuration | 完成最后一个检查点所花费的时间(ms)。 | Gauge |
- | lastCheckpointSize | 最后一个检查点的总大小(bytes)。 | Gauge |
- | lastCheckpointExternalPath | 存储最后一个外部检查点的路径。 | Gauge |
- | lastCheckpointRestoreTimestamp | 在协调器上恢复最后一个检查点时的时间戳(ms)。 | Gauge |
- | lastCheckpointAlignmentBuffered | 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(ms)。 | Gauge |
- | numberOfInProgressCheckpoints | 进行中检查点的数量。 | Gauge |
- | numberOfCompletedCheckpoints | 成功完成检查点的数量。 | Gauge |
- | numberOfFailedCheckpoints | 失败检查点的数量。 | Gauge |
- | totalNumberOfCheckpoints | 总检查点的数量(正在进行,已完成,失败)。 | Gauge |
Task | checkpointAlignmentTime | 最后一次屏障对齐完成所花费的时间(nanoseconds),或当前对齐到目前为止所用的时间(nanoseconds)。 | Gauge |
IO
Scope | Metrics | Description | Type |
---|---|---|---|
Job |
<SOURCE_ID> .<source_subtask_index> .<operator_id> .<operator_subtask_index> .latency |
从给定源子任务到算子子任务的延迟分布(ms)。 | Histogram |
Task | numBytesInLocal | 此任务从本地源读取的总字节数。 | Counter |
- | numBytesInLocalPerSecond | 此任务每秒从本地源读取的字节数。 | Meter |
- | numBytesInRemote | 此任务从远程源读取的总字节数。 | Counter |
- | numBytesInRemotePerSecond | 此任务每秒从远程源读取的字节数。 | Meter |
- | numBuffersInLocal | 此任务从本地源读取的网络缓冲区总数。 | Counter |
- | numBuffersInLocalPerSecond | 此任务每秒从本地源读取的网络缓冲区数。 | Meter |
- | numBuffersInRemote | 此任务从远程源读取的网络缓冲区总数。 | Counter |
- | numBuffersInRemotePerSecond | 此任务每秒从远程源读取的网络缓冲区数。 | Meter |
- | numBytesOut | 此任务已发出的总字节数。 | Counter |
- | numBytesOutPerSecond | 此任务每秒发出的字节数。 | Meter |
- | numBuffersOut | 此任务已发出的网络缓冲区总数。 | Counter |
- | numBuffersOutPerSecond | 此任务每秒发出的网络缓冲区数。 | Meter |
Task/Operator | numRecordsIn | 此算子/任务已收到的记录总数。 | Counter |
- | numRecordsInPerSecond | 此算子/任务每秒接收的记录数。 | Meter |
- | numRecordsOut | 此算子/任务已发出的记录总数。 | Counter |
- | numRecordsOutPerSecond | 此算子/任务每秒发送的记录数。 | Meter |
- | numLateRecordsDropped | 此算子/任务因迟到而丢失的记录数。 | Counter |
- | currentInputWatermark | 此算子/任务收到的最后一个水印(ms)。注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。 | Gauge |
Operator | currentInput1Watermark | 此算子在其第一个输入(ms)中收到的最后一个水印。注意:仅适用于具有2个输入的算子。 | Gauge |
- | currentInput2Watermark | 此算子在其第二个输入中接收的最后一个水印(ms)。注意:仅适用于具有2个输入的算子。 | Gauge |
- | currentOutputWatermark | 此算子发出的最后一个水印(ms)。 | Gauge |
- | numSplitsProcessed | 此数据源已处理的InputSplits总数。 | Gauge |
Connectors
Kafka 连接器
Scope | Metrics | User Variables | Description | Type |
---|---|---|---|---|
Operator | commitsSucceeded | N / A | 如果启用了偏移提交并且启用了检查点,则成功向 Kafka 提交的偏移提交总数。 | Counter |
- | commitsFailed | N / A | 如果启用了偏移提交并且启用了检查点,则 Kafka 的偏移提交失败总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方法,因此提交失败不会影响 Flink 的检查点分区偏移的完整性。 | Counter |
- | committedOffsets | Topic,分区 | 对于每个分区,最后成功提交到 Kafka 的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | Gauge |
- | currentOffsets | Topic,分区 | 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | Gauge |
Kinesis 连接器
Scope | Metrics | User Variables | Description | Type |
---|---|---|---|---|
Operator | millisBehindLatest | stream,shardId | 对于每个 Kinesis 分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 | Gauge |
- | sleepTimeMillis | stream,shardId | 消费者在从 Kinesis 获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 | Gauge |
- | maxNumberOfRecordsPerFetch | stream,shardId | 消费者在单个 getRecords 调用 Kinesis 时请求的最大记录数。 | Gauge |
- | numberOfAggregatedRecordsPerFetch | stream,shardId | 消费者在单个 getRecords 调用 Kinesis 时获取的聚合 Kinesis 记录数。 | Gauge |
- | numberOfDeggregatedRecordsPerFetch | stream,shardId | 消费者在单个 getRecords 调用 Kinesis 时获取的分解 Kinesis 记录的数量。 | Gauge |
- | averageRecordSizeBytes | stream,shardId | Kinesis 记录的平均大小(bytes),由消费者在单个 getRecords 调用中获取。 | Gauge |
- | runLoopTimeNanos | stream,shardId | 消费者在运行循环中花费的实际时间(ns)。 | Gauge |
- | loopFrequencyHz | stream,shardId | 一秒钟内调用 getRecords 的次数。 | Gauge |
- | bytesRequestedPerFetch | stream,shardId | 在一次调用 getRecords 中请求的字节数。 | Gauge |
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html