多读书多实践,勤思考善领悟

Flink完整配置

本文于1823天之前发表,文中内容可能已经过时。

对于单节点设置,Flink已准备好开箱即用,您无需更改默认配置即可开始使用.

开箱即用的配置将使用您的默认Java安装.您可以手动设置环境变量JAVA_HOME或配置项env.java.home中conf/flink-conf.yaml,如果你想手动覆盖Java运行时使用.

此页面列出了设置性能良好(分布式)安装通常所需的最常用选项.此外,此处还列出了所有可用配置参数的完整列表.

所有配置都已完成conf/flink-conf.yaml,预计将是具有格式的YAML键值对的扁平集合key: value.

系统和运行脚本在启动时解析配置.对配置文件的更改需要重新启动Flink JobManager和TaskManagers.

TaskManagers的配置文件可能不同,Flink不承担集群中的统一机器.

常见选项

默认 描述
jobmanager.heap.size “1024m” JobManager的JVM堆大小.
taskmanager.heap.size “1024m” TaskManagers的JVM堆大小,它是系统的并行工作者.在YARN设置中,此值自动配置为TaskManager的YARN容器的大小,减去一定的容差值.
parallelism.default 1
taskmanager.numberOfTaskSlots 1 单个TaskManager可以运行的并行算子或用户函数实例的数量.如果此值大于1,则单个TaskManager将获取函数或 算子的多个实例.这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半).
state.backend (none) 状态后台用于存储和检查点状态.
state.checkpoints.dir (none) 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录.必须可以从所有参与的进程/节点(即所有TaskManagers和JobManagers)访问存储路径.
state.savepoints.dir (none) 保存点的默认目录.由将后台写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用.
high-availability “no / not” 定义用于群集执行的高可用性模式.要启用高可用性,请将此模式设置为“ZOOKEEPER”.
high-availability.storageDir (none) 文件系统路径(URI)Flink在高可用性设置中持久保存元数据.
security.ssl.internal.enabled false 打开SSL以进行内部网络通信.可选地,特定组件可以通过它们自己的设置(rpc,数据传输,REST等)覆盖它.
security.ssl.rest.enabled false 通过REST端点打开SSL以进行外部通信.

完整参考

HDFS

注意:不推荐使用这些Keys,建议使用环境变量配置Hadoop路径HADOOP_CONF_DIR.

这些参数配置Flink使用的默认HDFS.未指定HDFS配置的设置必须指定HDFS文件的完整路径(hdfs://address:port/path/to/files)文件也将使用默认HDFS参数(块大小,复制因子)编写.

  • fs.hdfs.hadoopconf:Hadoop文件系统(HDFS)配置目录的绝对路径(可选值).指定此值允许程序使用短URI引用HDFS文件(hdfs:///path/to/files不包括文件URI中NameNode的地址和端口).如果没有此选项,则可以访问HDFS文件,但需要完全限定的URI hdfs://address:port/path/to/files.此选项还会导致文件编写者获取HDFS的块大小和复制因子的默认值.Flink将在指定目录中查找“core-site.xml”和“hdfs-site.xml”文件.
  • fs.hdfs.hdfsdefault:Hadoop自己的配置文件“hdfs-default.xml”的绝对路径(DEFAULT:null).
  • fs.hdfs.hdfssite:Hadoop自己的配置文件“hdfs-site.xml”的绝对路径(DEFAULT:null).

核心

默认 描述
classloader.parent-first-patterns.additional (none) 一个(以分号分隔的)模式列表,指定应始终首先通过父ClassLoader解析哪些类.模式是一个简单的前缀,它根据完全限定的类名进行检查.这些模式附加到“classloader.parent-first-patterns.default”.
classloader.parent-first-patterns.default “java .; scala .; org.apache.flink .; com.esotericsoftware.kryo; org.apache.hadoop .; javax.annotation .; org.slf4j; org.apache.log4j; org.apache.logging; org. apache.commons.logging; ch.qos.logback“ 一个(以分号分隔的)模式列表,指定应始终首先通过父ClassLoader解析哪些类.模式是一个简单的前缀,它根据完全限定的类名进行检查.通常不应修改此设置.要添加其他模式,我们建议使用“classloader.parent-first-patterns.additional”.
classloader.resolve-order “child-first” 从用户代码加载类时定义类解析策略,这意味着是首先检查用户代码jar(“child-first”)还是应用程序类路径(“parent-first”).默认设置指示首先从用户代码jar加载类,这意味着用户代码jar可以包含和加载不同于Flink使用的(依赖)依赖项.
io.tmp.dirs YARN上的’LOCAL_DIRS’.Mesos上的’FLINKTMP_DIR’.独立的System.getProperty(“java.io.tmpdir”).
mode “new” 切换到选择执行模式.可能的值为“new”和“legacy”.
parallelism.default 1  

JobManager

默认 描述
jobmanager.archive.fs.dir (none)
jobmanager.execution.attempts-history-size 16 历史记录中保存的最大执行尝试次数.
jobmanager.execution.failover-strategy full 此选项指定作业计算如何从任务失败中恢复.可接受的值是:
‘full’:重新启动所有任务.
‘individual’:仅重新启动失败的任务.仅当所有任务都是独立组件时才应使用.
‘region’:重新启动可能受任务失败影响的所有任务.
jobmanager.heap.size “1024m” JobManager的JVM堆大小.
jobmanager.resourcemanager.reconnect-interval 2000 此选项指定在与资源管理器的连接丢失时触发资源管理器重新连接的时间间隔.此选项仅供内部使用.
jobmanager.rpc.address (none) config参数定义要连接的网络地址以与JobManager进行通信.此值仅在具有静态名称或地址的单个JobManager存在的设置中解释(简单的独立设置或具有动态服务名称解析的容器设置).当使用Leader选举服务(如ZooKeeper)从潜在的多个Slave JobManagers中选择和发现JobManagerLeader时,它不会在许多高可用性设置中使用.
jobmanager.rpc.port 6123 config参数定义要连接的网络端口以与JobManager进行通信.与jobmanager.rpc.address一样,此值仅在设置中解释,其中存在具有静态名称/地址和端口的单个JobManager(简单的独立设置或具有动态服务名称解析的容器设置).当使用Leader选举服务(如ZooKeeper)从潜在的多个Slave JobManagers中选择和发现JobManagerLeader时,此配置选项不会用于许多高可用性设置.
jobstore.cache-size 52428800 作业存储缓存大小(以字节为单位),用于将已完成的作业保存在内存中.
jobstore.expiration-time 3600 完成作业到期并从作业库中清除的时间(以秒为单位).
slot.idle.timeout 50000 Slot Pool中空闲槽的超时时间(以ms为单位).
slot.request.timeout 300000 从Slot Pool请求插槽的超时(以ms为单位).

TaskManager

默认 描述
task.cancellation.interval 30000 两次连续任务取消尝试之间的时间间隔(以ms为单位).
task.cancellation.timeout 180000 超时(以ms为单位),在此之后任务取消超时并导致致命的TaskManager错误.值为0将禁用看门狗.
task.cancellation.timers.timeout 7500
task.checkpoint.alignment.max-size -1 检查点对齐可以缓冲的最大字节数.如果检查点对齐缓冲超过配置的数据量,则中止检查点(跳过).值-1表示没有限制.
taskmanager.data.port 0 TaskManager的端口用于数据交换 算子操作.
taskmanager.data.ssl.enabled true 为taskmanager数据传输启用SSL支持.仅当内部SSL的全局标志(security.ssl.internal.enabled)设置为true时,此选项才适用
taskmanager.debug.memory.log false 指示是否启动线程的标志,该线程重复记录JVM的内存使用情况.
taskmanager.debug.memory.log-interval 5000 日志线程记录当前内存使用情况的时间间隔(以ms为单位).
taskmanager.exit-on-fatal-akka-error false 是否应启动TaskManager的隔离监视器.如果隔离监视器检测到它已隔离另一个actor系统或者它已被另一个actor系统隔离,则会关闭该actor系统.
taskmanager.heap.size “1024m” TaskManagers的JVM堆大小,它是系统的并行工作者.在YARN设置中,此值自动配置为TaskManager的YARN容器的大小,减去一定的容差值.
taskmanager.host (none) TaskManager绑定到的网络接口的主机名.默认情况下,TaskManager搜索可以连接到JobManager和其他TaskManagers的网络接口.如果该策略由于某种原因失败,则此选项可用于定义主机名.由于不同的TaskManagers需要此选项的不同值,因此通常在其他非共享的特定于TaskManager的配置文件中指定.
taskmanager.jvm-exit-on-oom false 是否在任务线程抛出OutOfMemoryError时终止TaskManager.
taskmanager.memory.fraction 0.7 TaskManager为排序,哈希表和中间结果的缓存预留的相对内存量(在减去网络缓冲区使用的内存量之后).例如,值“0.8”表示TaskManager为内部数据缓冲区保存80%的内存,为TaskManager的堆留下20%的可用内存,用于由用户定义的函数创建的对象.仅当未设置taskmanager.memory.size时,才会评估此参数.
taskmanager.memory.off-heap false 内存分配方法(JVM堆或堆外),用于TaskManager的托管内存以及网络缓冲区.
taskmanager.memory.preallocate false 在TaskManager启动时是否应预先分配TaskManager托管内存.
taskmanager.memory.segment-size “32KB” 网络堆栈和内存管理器使用的内存缓冲区的大小.
taskmanager.memory.size “0” TaskManager的内存管理器分配的内存量.如果未设置,将分配相对分数.
taskmanager.network.detailed-metrics false 布尔标志,用于启用/禁用有关入站/出站网络队列长度的更详细指标.
taskmanager.network.memory.buffers-per-channel 2 每个传出/传入通道(子分区/输入通道)使用的最大网络缓冲区数.在基于信用的流量控制模式下,这表示每个输入通道中有多少信用.它应配置至少2以获得良好的性能.1个缓冲区用于接收子分区中的飞行中数据,1个缓冲区用于并行序列化.
taskmanager.network.memory.floating-buffers-per-gate 8 每个输出/输入门(结果分区/输入门)使用的额外网络缓冲区数.在基于信用的流量控制模式中,这表示在所有输入通道之间共享多少浮动信用.浮动缓冲区基于积压(子分区中的实时输出缓冲区)反馈来分布,并且可以帮助减轻由子分区之间的不平衡数据分布引起的背压.如果节点之间的往返时间较长和/或群集中的机器数量较多,则应增加此值.
taskmanager.network.memory.fraction 0.1 用于网络缓冲区的JVM内存的分数.这决定了TaskManager可以同时拥有多少流数据交换通道以及通道缓冲的程度.如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值.另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数.
taskmanager.network.memory.max “1GB” 网络缓冲区的最大内存大小.
taskmanager.network.memory.min “64MB” 网络缓冲区的最小内存大小.
taskmanager.network.request-backoff.initial 100 输入通道的分区请求的最小退避.
taskmanager.network.request-backoff.max 10000 输入通道的分区请求的最大退避.
taskmanager.numberOfTaskSlots 1 单个TaskManager可以运行的并行算子或用户函数实例的数量.如果此值大于1,则单个TaskManager将获取函数或 算子的多个实例.这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半).
taskmanager.registration.initial-backoff “500ms” 两次连续注册尝试之间的初始注册退避.每次新注册尝试的退避加倍,直到达到最大注册退避.
taskmanager.registration.max-backoff “30s” 两次连续注册尝试之间的最大注册退避.最大注册退避需要时间单位指定符(ms / s / min / h / d).
taskmanager.registration.refused-backoff “10s” 注册后的退避已被作业管理员拒绝,然后重试连接.
taskmanager.registration.timeout “5m” 定义TaskManager注册的超时.如果在未成功注册的情况下超过持续时间,则TaskManager将终止.
taskmanager.rpc.port “0” TaskManager的IPC端口.接受端口列表(“50100,50101”),范围(“50100-50200”)或两者的组合.建议在同一台计算机上运行多个TaskManagers时设置一系列端口以避免冲突.

分布式协调(通过Akka)

默认 描述
akka.ask.timeout “10s” 超时用于所有期货并阻止Akka通话.如果Flink由于超时而失败,那么您应该尝试增加此值.超时可能是由于机器速度慢或网络拥挤造成的.超时值需要时间单位指定符(ms / s / min / h / d).
akka.client-socket-worker-pool.pool-size-factor 1.0 池大小因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,结果大小由pool-size-min和pool-size-max值限制.
akka.client-socket-worker-pool.pool-size-max 2 将基于因子的数量限制为的最大线程数.
akka.client-socket-worker-pool.pool-size-min 1 将基于因子的数量限制为的最小线程数.
akka.client.timeout “60s” 客户端的所有阻塞调用超时.
akka.fork-join-executor.parallelism-factor 2.0 并行因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,得到的大小受parallelism-min和parallelism-max值的限制.
akka.fork-join-executor.parallelism-max 64 将基于因子的并行数量限制为的最大线程数.
akka.fork-join-executor.parallelism-min 8 将基于因子的并行数量限制为的最小线程数.
akka.framesize “10485760b” 在JobManager和TaskManager之间发送的消息的最大大小.如果Flink由于消息超出此限制而失败,那么您应该增加它.邮件大小需要大小单位说明符.
akka.jvm-exit-on-fatal-error true 退出JVM致命的Akka错误.
akka.log.lifecycle.events false 打开Akka远程记录事件.在调试时将此值设置为“true”.
akka.lookup.timeout “10s” 用于查找JobManager的超时.超时值必须包含时间单位说明符(ms / s / min / h / d).
akka.retry-gate-closed-for 50 断开远程连接后,应关闭门的ms数.
akka.server-socket-worker-pool.pool-size-factor 1.0 池大小因子用于使用以下公式确定线程池大小:ceil(可用处理器*因子).然后,结果大小由pool-size-min和pool-size-max值限制.
akka.server-socket-worker-pool.pool-size-max 2 将基于因子的数量限制为的最大线程数.
akka.server-socket-worker-pool.pool-size-min 1 将基于因子的数量限制为的最小线程数.
akka.ssl.enabled true 为Akka的远程通信打开SSL.仅当全局ssl标志security.ssl.enabled设置为true时,这才适用.
akka.startup-timeout (none) 超时之后,远程组件的启动被视为失败.
akka.tcp.timeout “20s” 所有出站连接超时.如果由于网络速度较慢而导致连接到TaskManager时遇到问题,则应增加此值.
akka.throughput 15 在将线程返回到池之前批处理的消息数.较低的值表示公平的调度,而较高的值可以以不公平为代价来提高性能.
akka.transport.heartbeat.interval “1000s” Akka传输故障检测器的心跳间隔.由于Flink使用TCP,因此不需要检测器.因此,通过将间隔设置为非常高的值来禁用检测器.如果您需要传输故障检测器,请将间隔设置为某个合理的值.间隔值需要时间单位指定符(ms / s / min / h / d).
akka.transport.heartbeat.pause “6000s” Akka的传输故障检测器可接受的心跳暂停.由于Flink使用TCP,因此不需要检测器.因此,通过将暂停设置为非常高的值来禁用检测器.如果您需要传输故障检测器,请将暂停设置为某个合理的值.暂停值需要时间单位指定符(ms / s / min / h / d).
akka.transport.threshold 300.0 传输故障检测器的阈值.由于Flink使用TCP,因此检测器不是必需的,因此阈值被设置为高值.
akka.watch.heartbeat.interval “10s” Akka的DeathWatch机制检测死亡TaskManagers的心跳间隔.如果由于心跳消息丢失或延迟而导致TaskManagers被错误地标记为死亡,那么您应该减小此值或增加akka.watch.heartbeat.pause.可在此处找到Akka的DeathWatch的详尽描述
akka.watch.heartbeat.pause “60s” Akka的DeathWatch机制可接受的心跳暂停.较低的值不允许心律不齐.如果由于心跳消息丢失或延迟而导致TaskManagers被错误地标记为死亡,那么您应该增加此值或Reduceakka.watch.heartbeat.interval.较高的值会增加检测死的TaskManager的时间.可在此处找到Akka的DeathWatch的详尽描述
akka.watch.threshold 12 DeathWatch故障检测器的阈值.较低的值容易出现误报,而较高的值会增加检测死的TaskManager的时间.可在此处找到Akka的DeathWatch的详尽描述

REST

默认 描述
rest.address (none) 客户端应该用于连接到服务器的地址.
rest.await-leader-timeout 30000 客户端等待Leader地址的时间(以ms为单位),例如Dispatcher或WebMonitorEndpoint
rest.bind-address (none) 服务器绑定自身的地址.
rest.client.max-content-length 104857600 客户端将处理的最大内容长度(以字节为单位).
rest.connection-timeout 15000 客户端建立TCP连接的最长时间(以ms为单位).
rest.port 8081 服务器侦听的端口/客户端连接到的端口.
rest.retry.delay 3000 客户端在重试之间等待的时间(以ms为单位)(另请参阅“rest.retry.max-attempts”).
rest.retry.max-attempts 20 如果可重试 算子操作失败,客户端将尝试重试的次数.
rest.server.max-content-length 104857600 服务器将处理的最大内容长度(以字节为单位).

Blob服务器

默认 描述
blob.fetch.backlog 1000 config参数定义JobManager上BLOB提取的积压.
blob.fetch.num-concurrent 50 config参数定义JobManager服务的最大并发BLOB提取数.
blob.fetch.retries 5 config参数定义失败的BLOB提取的退出次数.
blob.offload.minsize 1048576 要卸载到BlobServer的消息的最小大小.
blob.server.port “0” config参数定义blob服务的服务器端口.
blob.service.cleanup.interval 3600 TaskManager中blob缓存的清理间隔(以秒为单位).
blob.service.ssl.enabled true 用于覆盖blob服务传输的ssl支持的标志.
blob.storage.directory (none) config参数,用于定义blob服务器使用的存储目录.

心跳管理器

默认 描述
heartbeat.interval 10000 从发送方请求心跳的时间间隔.
heartbeat.timeout 50000 为发送方和接收方双方请求和接收心跳的超时.

SSL设置

默认 描述
security.ssl.algorithms “TLS_RSA_WITH_AES_128_CBC_SHA” 要支持的标准SSL算法的逗号分隔列表.在这里阅读更多
security.ssl.internal.enabled false 打开SSL以进行内部网络通信.可选地,特定组件可以通过它们自己的设置(rpc,数据传输,REST等)覆盖它.
security.ssl.internal.key-password (none) 解密Keys库中Flink内部端点(rpc,数据传输,blob服务器)Keys的密钥.
security.ssl.internal.keystore (none) 带有SSLKeys和证书的JavaKeys库文件,用于Flink的内部端点(rpc,数据传输,blob服务器).
security.ssl.internal.keystore-password (none) 为Flink的内部端点(rpc,数据传输,blob服务器)解密Flink的Keys库文件的密钥.
security.ssl.internal.truststore (none) 包含公共CA证书的信任库文件,用于验证Flink内部端点(rpc,数据传输,blob服务器)的对等方.
security.ssl.internal.truststore-password (none) 用于解密Flink内部端点(rpc,数据传输,blob服务器)的信任库的密码.
security.ssl.key-password (none) 解密Keys库中的服务器Keys的密钥.
security.ssl.keystore (none) flink端点用于其SSLKeys和证书的JavaKeys库文件.
security.ssl.keystore-password (none) 解密Keys库文件的密钥.
security.ssl.protocol “TLSv1.2” ssl传输支持的SSL协议版本.请注意,它不支持以逗号分隔的列表.
security.ssl.rest.enabled false 通过REST端点打开SSL以进行外部通信.
security.ssl.rest.key-password (none) 解密Flink外部REST端点的Keys库中的Keys的密钥.
security.ssl.rest.keystore (none) 带有SSLKeys和证书的JavaKeys库文件,用于Flink的外部REST端点.
security.ssl.rest.keystore-password (none) 为Flink的外部REST端点解密Flink的Keys库文件的密钥.
security.ssl.rest.truststore (none) 包含公共CA证书的信任库文件,用于验证Flink的外部REST端点的对等方.
security.ssl.rest.truststore-password (none) 用于解密Flink外部REST端点的信任库的密码.
security.ssl.truststore (none) 信任库文件,包含flink端点用于验证对等方证书的公共CA证书.
security.ssl.truststore-password (none) 解密信任库的秘诀.
security.ssl.verify-hostname true 标记以在ssl握手期间启用对等方的主机名验证.

网络通讯(通过Netty)

这些参数允许高级调整.在大型群集上运行并发高吞吐量作业时,默认值就足够了.

默认 描述
taskmanager.network.netty.client.connectTimeoutSec 120 Netty客户端连接超时.
taskmanager.network.netty.client.numThreads -1 Netty客户端线程的数量.
taskmanager.network.netty.num-arenas -1 Netty竞技场的数量.
taskmanager.network.netty.sendReceiveBufferSize 0 Netty发送和接收缓冲区大小.这默认为系统缓冲区大小(cat / proc / sys / net / ipv4 / tcp_ [rw] mem),在现代Linux中为4 MiB.
taskmanager.network.netty.server.backlog 0 netty服务器连接积压.
taskmanager.network.netty.server.numThreads -1 Netty服务器线程数.
taskmanager.network.netty.transport “nio” Netty传输类型,“nio”或“epoll”

Web前端

默认 描述
web.access-control-allow-origin “*”
web.address (none)
web.backpressure.cleanup-interval 600000
web.backpressure.delay-between-samples 50
web.backpressure.num-samples 100
web.backpressure.refresh-interval 60000
web.checkpoints.history 10
web.history 5
web.log.path (none)
web.refresh-interval 3000
web.ssl.enabled true
web.submit.enable true
web.timeout 10000
web.tmpdir System.getProperty( “java.io.tmpdir”)
web.upload.dir (none)

文件系统

默认 描述
fs.default-scheme (none) 默认文件系统方案,用于未明确声明方案的路径.可能包含权限,例如,在HDFS NameNode的情况下为host:port.
fs.output.always-create-directory false 以大于1的并行度运行的文件编写器为输出文件路径创建目录,并将不同的结果文件(每个并行编写器任务一个)放入该目录中.如果此选项设置为“true”,则并行度为1的编写器也将创建一个目录并将单个结果文件放入其中.如果该选项设置为“false”,则编写器将直接在输出路径上直接创建文件,而不创建包含目录.
fs.overwrite-files false 指定默认情况下文件输出编写器是否应覆盖现有文件.设置为“true”以默认覆盖,否则设置为“false”.

编译/优化

默认 描述
compiler.delimited-informat.max-line-samples 10 编译器为分隔输入采用的最大行样本数.样本用于估计记录数.可以使用输入格式的参数覆盖特定输入的此值.
compiler.delimited-informat.max-sample-len 2097152 编译器用于分隔输入的行样本的最大长度.如果单个样本的长度超过此值(可能是因为解析器配置错误),则取样将中止.可以使用输入格式的参数覆盖特定输入的此值.
compiler.delimited-informat.min-line-samples 2 编译器为分隔输入采用的最小行样本数.样本用于估计记录数.可以使用输入格式的参数覆盖特定输入的此值

运行时算法

默认 描述
taskmanager.runtime.hashjoin-bloom-filters false 用于在混合散列连接实现中激活/停用bloom过滤器的标志.如果散列连接需要溢出到磁盘(数据集大于保存的内存部分),这些布隆过滤器可以大大Reduce溢出记录的数量,但需要花费一些CPU周期.
taskmanager.runtime.max-fan 128 外部合并的最大扇入连接和扇出用于溢出哈希表.限制每个 算子的文件句柄数,但如果设置得太小,可能会导致中间合并/分区.
taskmanager.runtime.sort-spilling-threshold 0.8 当这部分内存预算已满时,排序 算子操作开始溢出.

Resource Manager

本节中的配置键独立于使用的资源管理框架(YARN,Mesos,Standalone,…)

默认 描述
containerized.heap-cutoff-min 600 作为安全边际,要在容器中删除的最小堆内存量.
containerized.heap-cutoff-ratio 0.25 要从容器中删除的堆空间百分比(YARN / Mesos),以补偿其他JVM内存使用情况.
local.number-resourcemanager 1
resourcemanager.job.timeout “5m” 没有TaskManager作为Leader的工作超时.
resourcemanager.rpc.port 0 定义要连接的网络端口以与资源管理器进行通信.默认情况下,JobManager的端口,因为使用了相同的ActorSystem.无法使用此配置键定义端口范围.
resourcemanager.taskmanager-timeout 30000 释放空闲TaskManager的超时.

YARN

默认 描述
yarn.application-attempts (none) ApplicationMaster重启次数.请注意,整个Flink群集将重新启动,YARN客户端将断开连接.此外,JobManager地址将更改,您需要手动设置JM主机:port.建议将此选项保存为1.
yarn.application-master.port “0” 使用此配置选项,用户可以为Application Master(和JobManager)RPC端口指定端口,一系列端口或端口列表.默认情况下,我们建议使用默认值(0)让 算子操作系统选择适当的端口.特别是当多个AM在同一物理主机上运行时,固定端口分配会阻止AM启动.例如,在具有限制性防火墙的环境中在YARN上运行Flink时,此选项允许指定一系列允许的端口.
yarn.appmaster.rpc.address (none) 应用程序主RPC系统正在侦听的主机名或地址.
yarn.appmaster.rpc.port -1 应用程序主RPC系统正在侦听的端口.
yarn.containers.vcores -1 每个YARN容器的虚拟核心数(vcores).默认情况下,vcores的数量设置为每个TaskManager的插槽数(如果已设置),或者设置为1,否则设置为1.为了使用此参数,您的群集必须启用CPU调度.您可以通过设置来完成此 算子操作org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
yarn.heartbeat-delay 5 使用ResourceManager的心跳之间的时间,以秒为单位.
yarn.maximum-failed-containers (none) 系统在发生故障时将重新分配的最大容器数.
yarn.per-job-cluster.include-user-jar “ORDER” 定义用户jar是否包含在每个作业集群的系统类路径中以及它们在路径中的位置.它们可以位于开头(“FIRST”),末尾(“LAST”),或者根据其名称(“ORDER”)定位.将此参数设置为“DISABLED”会导致jar包含在用户类路径中.
yarn.properties-file.location (none) 将Flink作业提交给YARN时,JobManager的主机和可用处理槽的数量将写入属性文件,以便Flink客户端能够选择这些详细信息.此配置参数允许更改该文件的默认位置(例如,对于在用户之间共享Flink安装的环境).
yarn.tags (none) 要应用于Flink YARN应用程序的以逗号分隔的标记列表.

Mesos

默认 描述
mesos.failover-timeout 604800 Mesos调度程序的故障转移超时(以秒为单位),之后将自动关闭正在运行的任务.
mesos.initial-tasks 0 最初的工人在主人开始时提出来.除非Flink处于传统模式,否则将忽略此选项.
mesos.master (none) Mesos主URL.该值应采用以下形式之一:
主持人:port
ZK://主机1:端口1,主机2:端口2,… /路径
ZK://用户名:密码@主机1:端口1,主机2:端口2,… /路径
文件:///路径/到/文件
mesos.maximum-failed-tasks -1 群集失败前失败的最大工作数.可以设置为-1以禁用此函数.除非Flink处于传统模式,否则将忽略此选项.
mesos.resourcemanager.artifactserver.port 0 config参数定义要使用的Mesos工件服务器端口.将端口设置为0将允许 算子操作系统选择可用端口.
mesos.resourcemanager.artifactserver.ssl.enabled true 为Flink工件服务器启用SSL.请注意,security.ssl.enabled也需要设置为true加密才能启用加密.
mesos.resourcemanager.framework.name “Flink” Mesos框架名称
mesos.resourcemanager.framework.principal (none) Mesos框架主体
mesos.resourcemanager.framework.role “*” Mesos框架角色定义
mesos.resourcemanager.framework.secret (none) Mesos框架密钥
mesos.resourcemanager.framework.user (none) Mesos框架用户
mesos.resourcemanager.tasks.port-assignments (none) 以逗号分隔的配置键列表,表示可配置端口.所有端口Keys将动态获取通过Mesos分配的端口.

Mesos TaskManager

默认 描述
mesos.constraints.hard.hostattribute (none) 基于代理属性在Mesos上放置任务的约束.采用逗号分隔的键:值对列表,对应于目标介质代理公开的属性.示例:az:eu-west-1a,系列:t2
mesos.resourcemanager.tasks.bootstrap-cmd (none) 在TaskManager启动之前执行的命令.
mesos.resourcemanager.tasks.container.docker.force-pull-image false 指示docker containerizer强制拉动镜像,而不是重用缓存版本.
mesos.resourcemanager.tasks.container.docker.parameters (none) 使用docker容器时,要传递给docker run命令的自定义参数.逗号分隔的“key = value”对列表.“值”可能包含“=”.
mesos.resourcemanager.tasks.container.image.name (none) 用于容器的映像名称.
mesos.resourcemanager.tasks.container.type “mesos” 使用的集装箱类型:“mesos”或“docker”.
mesos.resourcemanager.tasks.container.volumes (none) 逗号分隔的[host_path:] container_path [:RO | RW]列表.这允许将额外的卷安装到容器中.
mesos.resourcemanager.tasks.cpus 0.0 要分配给Mesos工作者的CPU.
mesos.resourcemanager.tasks.gpus 0 要分配给Mesos工作者的GPU.
mesos.resourcemanager.tasks.hostname (none) 用于定义TaskManager主机名的可选值.模式TASK由Mesos任务的实际ID替换.这可用于配置TaskManager以使用Mesos DNS(例如TASK.flink-service.mesos)进行名称查找.
mesos.resourcemanager.tasks.mem 1024 要以MB为单位分配给Mesos worker的内存.
mesos.resourcemanager.tasks.taskmanager-cmd “$FLINK_HOME/bin/mesos-taskmanager.sh”
mesos.resourcemanager.tasks.uris (none) 以逗号分隔的自定义工件URI列表,这些URI将下载到Mesos工作者的沙箱中.
taskmanager.numberOfTaskSlots 1 单个TaskManager可以运行的并行算子或用户函数实例的数量.如果此值大于1,则单个TaskManager将获取函数或 算子的多个实例.这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的算子或函数实例之间划分.此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半).

高可用性(HA)

默认 描述
high-availability “NONE” 定义用于群集执行的高可用性模式.要启用高可用性,请将此模式设置为“ZOOKEEPER”.
high-availability.cluster-id “/default” Flink集群的ID,用于将多个Flink集群彼此分开.需要为独立群集设置,但在YARN和Mesos中自动推断.
high-availability.job.delay (none) 故障转移后JobManager之前的时间恢复当前作业.
high-availability.jobmanager.port “0” JobManager在高可用性模式下使用的可选端口(范围).
high-availability.storageDir (none) 文件系统路径(URI)Flink在高可用性设置中持久保存元数据.

基于ZooKeeper的HA模式

默认 描述
high-availability.zookeeper.client.acl “open” 定义要在ZK节点上配置的ACL(open | creator).如果ZooKeeper服务器配置将“authProvider”属性映射为使用SASLAuthenticationProvider并且群集配置为以安全模式(Kerberos)运行,则可以将配置值设置为“creator”.
high-availability.zookeeper.client.connection-timeout 15000 定义ZooKeeper的连接超时(以ms为单位).
high-availability.zookeeper.client.max-retry-attempts 3 定义客户端放弃之前的连接重试次数.
high-availability.zookeeper.client.retry-wait 5000 定义以ms为单位的连续重试之间的暂停.
high-availability.zookeeper.client.session-timeout 60000 以ms为单位定义ZooKeeper会话的会话超时.
high-availability.zookeeper.path.checkpoint-counter “/checkpoint-counter” ZooKeeper根路径(ZNode)用于检查点计数器.
high-availability.zookeeper.path.checkpoints “/checkpoints” 已完成检查点的ZooKeeper根路径(ZNode).
high-availability.zookeeper.path.jobgraphs “/jobgraphs” 作业图的ZooKeeper根路径(ZNode)
high-availability.zookeeper.path.latch “/leaderlatch” 定义用于选择Leader的Leader锁存器的znode.
high-availability.zookeeper.path.leader “/leader” 定义Leader的znode,其中包含Leader的URL和当前Leader会话ID.
high-availability.zookeeper.path.mesos工 “/mesos-workers” ZooKeeper根路径,用于保存Mesos工作者信息.
high-availability.zookeeper.path.root “/flink” Flink在ZooKeeper中存储其条目的根路径.
high-availability.zookeeper.path.running的注册表 “/running_job_registry/”
high-availability.zookeeper.quorum (none) 使用ZooKeeper在高可用性模式下运行Flink时要使用的ZooKeeper quorum.

ZooKeeper安全

默认 描述
zookeeper.sasl.disable false
zookeeper.sasl.login-context-name “Client”
zookeeper.sasl.service-name “zookeeper”  

基于Kerberos的安全性

默认 描述
security.kerberos.login.contexts (none) 以逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient使用凭证进行ZooKeeper身份验证和Kafka身份验证)
security.kerberos.login.keytab (none) 包含用户凭据的KerberosKeys表文件的绝对路径.
security.kerberos.login.principal (none) 与keytab关联的Kerberos主体名称.
security.kerberos.login.use-ticket-cache true 指示是否从Kerberos票证缓存中读取.

环境

默认 描述
env.hadoop.conf.dir (none) hadoop配置目录的路径.需要读取HDFS和/或YARN配置.您也可以通过环境变量进行设置.
env.java.opts (none)
env.java.opts.jobmanager (none)
env.java.opts.taskmanager (none)
env.log.dir (none) 定义保存Flink日志的目录.它必须是一条绝对的道路.(默认为Flink主页下的日志目录)
env.log.max 5 要保存的最大旧日志文件数.
env.ssh.opts (none) 启动或停止JobManager,TaskManager和Zookeeper服务时,其他命令行选项传递给SSH客户端(start-cluster.sh,stop-cluster.sh,start-zookeeper-quorum.sh,stop-zookeeper-quorum.sh).
env.yarn.conf.dir (none) YARN配置目录的路径.它需要在YARN上运行flink.您也可以通过环境变量进行设置.

检查点

默认 描述
state.backend (none) 状态后台用于存储和检查点状态.
state.backend.async true 选择状态后台是否应在可能和可配置的情况下使用异步SNAPSHOT方法.某些状态后台可能不支持异步SNAPSHOT,或者仅支持异步SNAPSHOT,并忽略此选项.
state.backend.fs.memory-threshold 1024 状态数据文件的最小大小.小于该值的所有状态块都内联存储在根检查点元数据文件中.
state.backend.incremental false 如果可能,选择状态后台是否应创建增量检查点.对于增量检查点,仅存储来自先前检查点的差异,而不是完整的检查点状态.某些状态后台可能不支持增量检查点并忽略此选项.
state.backend.local-recovery false
state.checkpoints.dir (none) 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录.必须可以从所有参与的进程/节点(即所有TaskManagers和JobManagers)访问存储路径.
state.checkpoints.num-retained 1 要保存的已完成检查点的最大数量.
state.savepoints.dir (none) 保存点的默认目录.由将后台写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用.
taskmanager.state.local.root-dirs (none)  

可查询状态

默认 描述
query.client.network-threads 0 网络数(Netty的事件循环)可查询状态客户端的线程.
query.proxy.network-threads 0 网络数(Netty的事件循环)可查询状态代理的线程.
query.proxy.ports “9069” 可查询状态代理的端口范围.指定范围可以是单个端口:“9123”,一系列端口:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”.
query.proxy.query-threads 0 可查询状态代理的查询线程数.如果设置为0,则使用插槽数.
query.server.network-threads 0 网络数(Netty的事件循环)可查询状态服务器的线程.
query.server.ports “9067” 可查询状态服务器的端口范围.指定范围可以是单个端口:“9123”,一系列端口:“50100-50200”,或范围和端口列表:“50100-50200,50300-50400,51234”.
query.server.query-threads 0 可查询状态服务器的查询线程数.如果设置为0,则使用插槽数.

度量

默认 描述
metrics.latency.granularity “operator” 定义延迟指标的粒度.可接受的值是:
单一 - 跟踪延迟,无需区分源和子任务.
operator - 跟踪延迟,同时区分源,但不区分子任务.
子任务 - 在区分源和子任务时跟踪延迟.
metrics.latency.history-size 128 定义每个算子维护的测量延迟数.
metrics.latency.interval 0 定义从源发出延迟跟踪标记的间隔.如果设置为0或负值,则禁用延迟跟踪.启用此函数会显着影响群集的性能.
metrics.reporter.<name> .<parameter> (none) 为名为的报告器配置参数.
metrics.reporter.<name>.class (none) 报告类用于为报告命名.
metrics.reporter.<name>.interval (none) 报告间隔用于报告名为.
metrics.reporters (none)
metrics.scope.delimiter “”
metrics.scope.jm .jobmanager” 定义应用于作用于JobManager的所有度量标准的范围格式字符串.
metrics.scope.jm.job .jobmanager.<job_name>” 定义范围格式字符串,该字符串应用于作用于JobManager上作业的所有度量标准.
metrics.scope.operator .taskmanager.<tm_id> <job_name> <operator_name> <subtask_index>” 定义应用于作用于 算子的所有度量标准的范围格式字符串.
metrics.scope.task .taskmanager.<tm_id> <job_name> <task_name> <subtask_index>” 定义应用于作用于任务的所有度量标准的范围格式字符串.
metrics.scope.tm .taskmanager.<tm_id>” 定义应用于作用于TaskManager的所有度量标准的范围格式字符串.
metrics.scope.tm.job .taskmanager.<tm_id> <job_name>” 定义范围格式字符串,该字符串应用于作用于TaskManager上作业的所有度量标准.
metrics.system-resource false
metrics.system-resource-probing-interval 5000  

历史服务器

如果要通过HistoryServer的Web前端显示它们,则必须进行配置jobmanager.archive.fs.dir以存档已终止的作业并将其添加到受监视目录列表中historyserver.archive.fs.dir.

  • jobmanager.archive.fs.dir:将有关已终止作业的信息上载到的目录.您必须将此目录添加到历史服务器的受监视目录列表中historyserver.archive.fs.dir.
默认 描述
historyserver.archive.fs.dir (none) 以逗号分隔的目录列表,用于从中获取已归档的作业.历史服务器将监视这些目录以获取已存档的作业.您可以将JobManager配置为通过jobmanager.archive.fs.dir将作业存档到目录.
historyserver.archive.fs.refresh-interval 10000 刷新已归档作业目录的时间间隔(以ms为单位).
historyserver.web.address (none) HistoryServer的Web界面的地址.
historyserver.web.port 8082 HistoryServers的Web界面的端口.
historyserver.web.refresh-interval 10000
historyserver.web.ssl.enabled false 启用对HistoryServer Web前端的HTTP访问.仅当全局SSL标志security.ssl.enabled设置为true时,此选项才适用.
historyserver.web.tmpdir (none) 此配置参数允许定义历史服务器Web界面使用的Flink Web目录.Web界面将其静态文件复制到目录中.

留存

  • mode:Flink的执行模式.可能的值是legacy和new.要启动旧组件,您必须指定legacy(DEFAULT:)new.

背景

配置网络缓冲区

如果您看到异常java.io.IOException: Insufficient number of network buffers,则需要调整用于网络缓冲区的内存量,以便程序在您的TaskManager上运行.

网络缓冲区是通信层的关键资源.它们用于在通过网络传输之前缓冲记录,并在将传入数据解析为记录并将其传递给应用程序之前缓冲传入数据.足够数量的网络缓冲区对于实现良好的吞吐量至关重要.

从Flink 1.3开始,你可以遵循“越多越好”的成语而不会对延迟造成任何惩罚(我们通过限制每个通道使用的实际缓冲区数量来防止每个传出和传入通道中的过度缓冲,即缓冲膨胀) .

通常,将TaskManager配置为具有足够的缓冲区,以使您希望同时打开的每个逻辑网络连接都具有专用缓冲区.对于网络上的每个点对点数据交换存在逻辑网络连接,这通常发生在重新分区或广播步骤(混洗阶段).在那些中,TaskManager中的每个并行任务必须能够与所有其他并行任务进行通信.

注意:从Flink 1.5开始,网络缓冲区将始终在堆外分配,即在JVM堆之外,而不管其值是多少taskmanager.memory.off-heap.这样,我们可以将这些缓冲区直接传递给底层网络堆栈层.

设置内存分数

以前,手动设置网络缓冲区的数量,这成为一个非常容易出错的任务(见下文).从Flink 1.3开始,可以使用以下配置参数定义用于网络缓冲区的一小部分内存:

  • taskmanager.network.memory.fraction:用于网络缓冲区的JVM内存的分数(DEFAULT:0.1),
  • taskmanager.network.memory.min:网络缓冲区的最小内存大小(默认值:64MB),
  • taskmanager.network.memory.max:网络缓冲区的最大内存大小(默认值:1GB),和
  • taskmanager.memory.segment-size:内存管理器和网络堆栈使用的内存缓冲区大小(以字节为单位)(默认值:32KB).

直接设置网络缓冲区的数量

注意:不建议使用这种配置网络缓冲区使用的内存量的方法.请考虑使用上述方法定义要使用的内存部分.

缓冲器的上一个TaskManager所要求数量为 总度的平行度(数的目标) 节点内并行性(源在一个TaskManager数)× NN 是限定多少repartitioning-恒定/您希望同时处于活动状态的广播步骤.由于节点内并行*性通常是核心数量,并且超过4个重新分区或广播频道很少并行活动,因此它经常归结为

#slots-per-TM^2 #TMs 4

哪里#slots per TM是每个TaskManager插槽数量和#TMs是TaskManager的总数.

例如,为了支持20个8插槽机器的集群,您应该使用大约5000个网络缓冲区来获得最佳吞吐量.

默认情况下,每个网络缓冲区的大小为32 KiBytes.在上面的示例中,系统因此将为网络缓冲区分配大约300 MiBytes.

可以使用以下参数配置网络缓冲区的数量和大小:

  • taskmanager.network.numberOfBuffers,和
  • taskmanager.memory.segment-size.

配置临时I / O目录

虽然Flink的目标是尽可能多地处理主内存中的数据,但是需要处理的内存比内存更多的数据并不少见.Flink的运行时用于将临时数据写入磁盘以处理这些情况.

该taskmanager.tmp.dirs参数指定Flink写入临时文件的目录列表.目录的路径需要用’:’(冒号字符)分隔.Flink将同时向(从)每个配置的目录写入(或读取)一个临时文件.这样,临时I / O可以均匀地分布在多个独立的I / O设备(如硬盘)上,以提高性能.要利用快速I / O设备(例如,SSD,RAID,NAS),可以多次指定目录.

如果taskmanager.tmp.dirs未显式指定参数,Flink会将临时数据写入 算子操作系统的临时目录,例如Linux系统中的/ tmp.

配置TaskManager处理槽

Flink通过将程序拆分为子任务并将这些子任务调度到处理槽来并行执行程序.

每个Flink TaskManager都在集群中提供处理插槽.插槽数通常与每个 TaskManager 的可用CPU核心数成比例.作为一般建议,可用的CPU核心数量是一个很好的默认值taskmanager.numberOfTaskSlots.

启动Flink应用程序时,用户可以提供用于该作业的默认插槽数.因此调用命令行值-p(用于并行).此外,可以为整个应用程序和各个算子设置编程API中的插槽数.

img