有赞实时计算 Flink 1.13 升级实践

一、背景

​ 随着有赞实时计算业务场景全部以 Flink SQL 的方式接入,对有赞现有的引擎版本——Flink 1.10 的 SQL 能力提出了越来越多无法满足的需求以及可以优化的功能点。目前有赞的 Flink SQL 是在 Yarn 上运行,但是在公司应用容器化的背景下,可以统一使用公司 K8S 资源池,同时考虑到任务之间的隔离性以及任务的弹性调度,Flink SQL 任务 K8S 化是必须进行的,所以我们也希望通过这次升级直接利社区的 on K8S 能力,直接将Flink SQL 集群迁移到 K8S 上。特别是社区在 Flink 1.13 中 on Native K8S 能力的支持完善,为了紧跟社区同时提升有赞实时计算引擎的能力,经过一些列调研,我们决定将有赞实时计算引擎由 Flink 1.10 升级到 Flink 1.13.2。

二、有赞业务场景下的升级到 Flink 1.13 收益评估

​ 社区在发布 Flink 1.13 后相比于 Flink 1.10 有了很多的新特性和优化,有些新特性在有赞场景下可能并未用到,所以接下来将主要从以下几个方面介绍一下在有赞业务场景下升级到 Flink 1.13 的一些收益。

2.1 Flink SQL 相关收益

​ 由于目前几乎所有的实时计算任务都通过 Flink SQL 方式实现,所以升级后关于 Flink SQL 上的一些优化是我们十分关注的,其中下面几点在升级后在有赞的实时计算业务场景下有很大的收益的:

(1) Flink SQL 语法更为简洁,提高开发效率

​ Flink 1.10 之后,社区提出了新的 connector 属性 key,SQL开发更为简洁,可以提升实时用户的开发作业效率。

(2)时区和时间函数相关优化

​ 由于 Flink 1.10 的时间函数在时区问题的不完善,用户在使用 currenttimestamp 和 currentday 等函数时由于时区问题需要额外的转换。而在 Flink 1.13 中对时区和时间函数进行纠正和优化,包括:

  • 相关时间函数考虑了时区问题

    • CURRENTTIMESTAMP/CURRENTTIME/CURRENT_DATE/NOW() 在 Flink 1.13 中考虑了时区问题,且为本地时区。
    • PROCTIME() 考虑了时区问题,且为本地时区。

    函数对比

  • 支持了TIMESTAMP_LTZ类型

    例如 CURRENTTIMESTAMP 函数返回值为 TIMESTAMPLTZ 类型,而不是 TIMESTAMP 类型。

    类型介绍

  • 夏令时支持 Flink 支持在 TIMESTAMPLTZ 列上定义时间属性,Flink SQL 在window 处理时结合 TIMESTAMP 和 TIMESTAMPLTZ, 优雅地支持了夏令时。

(3)支持 Window TVF 语法标准化

​ 在官方的介绍中,关于 Window TVF 包含四部分内容:Window TVF 语法,近实时累计计算,Window 性能优化,多维数据分析。 ​ 其中 Window TVF 语法在Flink 1.13 中用 Table-Valued Function 进行了语法标准化,在新的语法中支持 TUMBLE 和 HOP 窗口,我们通过以下两个例子来展示这一特性在某些场景下的应用:

  • 用户在 table-valued 窗口函数中可以访问窗口的起始和终止时间,从而使用户可以实现新的功能。例如,除了常规的基于窗口的聚合和 Join 之外,用户现在也可以实现基于窗口的 Top-K 聚合:
SELECT *  
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;
  • 新增了CUMULATE WINDOW 窗口,它可以支持按特定步长扩展的窗口,直到达到最大窗口大小,例如计算一段时间内的 PV, UV 等指标:

  SELECT window_time, window_start, window_end, SUM(price) AS total_price 
    FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, window_time;
  • window1 统计的是第一个区间的数据;
  • window2 统计的是第一区间和第二个区间的数据;
  • window3 统计的是第一区间,第二个区间和第三个区间的数据。

​ 累积窗口可以对迟到的数据进行处理,比如某一个数据是 window1 的迟到数据,无法被 window1 统计进去,但是触发 window2 时,会把 window1 迟到的数据统计进去,而且 window2 会复用 window1 的统计结果,而不是重新计算一遍。

(4)Flink On Hive 的能力

​ 目前在有赞已经开始有部分实时业务方希望 Flink 能够支持 Hive,比如 Flink-Hive 近实时的数仓中间层【小时表可更快产出】,以及 Flink 实时任务和离线数据对比功能。而在 Flink 1.12 中,已经支持生产级别 Flink On Hive 任务运行(社区 Commitor 说),所以基于这次 Flink 1.13 引擎版本升级,能够支持 Flink on hive 生产功能。因此本次升级可以解决部分实时业务方, Flink On Hive 的业务需求,下面是 Flink 1.13具体 Hive 相关功能:

  • 支持 Hive 的 SQL 语法,支持常用的 Hive DML,DQL语法。
  • Hive 写入:FLIP-115 完善扩展了 FileSystem connector 的基础能力和实现,Table/SQL 层的 sink 可以支持各种格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
  • FLIP-123 通过 Hive Dialect 为用户提供语法兼容,可以直接迁移 Hive 脚本到 Flink 中执行等等。

(5)其他功能及需求

  • 支持查看 SQL 的执行计划
  • 解决复杂 Quary 语句无法推断出 primary key 的问题
  • SQL Connectors 中的 Metadata 处理。如果可以将某些 source(和 format)的元数据作为额外字段暴露给用户,对于需要将元数据与记录数据一起处理的用户来说很有意义。一个常见的例子是 Kafka,用户可能需要访问 offset、partition 或 topic 信息、读写 kafka 消息中的 key 或 使用消息 metadata 中的时间戳进行时间相关的操作。
CREATEF TABLE table (  
id BIGINT,  
name STRING,  
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata  
headers MAP METADATA -- access Kafka 'headers' metadata  
) WITH (
'connector' = 'kafka',  
'topic' = 'test-topic',  
'format' = 'avro'  
);

2.2 Flink on K8S 相关收益

​ 在 on K8S 层面考虑升级到 Flink 1.13 主要有以下几个方面收益:

(1) Flink 1.13 on K8S 更成熟稳定

​ 相比于Flink 1.11 和 Flink 1.12,在Flink 1.13 版本中 on K8S 模式上更加丰富,更为成熟稳定。而且社区后续肯定是在 Native K8S 或者 Application Level K8S 上面发力。目前 社区在Flink 1.13 中关于 K8S 已经有了下面一些优化和新特性:

  • 基于 Kubernetes 的高可用 (HA) 方案 Flink 可以利用 Kubernetes 提供的内置功能来实现 JobManager 的 failover,而不用依赖 ZooKeeper。为了实现不依赖于 ZooKeeper 的高可用方案,社区在 Flink 1.12(FLIP-144)中实现了基于 Kubernetes 的高可用方案。

  • 引入 Application 模式 按照 application 粒度来启动一个集群,属于这个 application 的所有 job 在这个集群中运行。核心是 Job Graph 的生成以及作业的提交不在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会分散到集群中,不再有上述 client 单点上的瓶颈。

(2)实时离线弹性扩缩容

​ 目前有赞的离线任务已经实现了较好的弹性扩缩容,当 Flink SQL 任务 K8S 化之后,可以和离线任务之间实现更好的弹性扩缩容,节省集群资源成本,这是十分有意义的。

2.3 状态保留和恢复相关收益

(1)基于Savepoint跨集群迁移的能力

​ 在 Flink 1.10 版本中,Savepoint 中 meta 数据和 state 数据存放的是绝对路径,这就造成了不能进行集群迁移,否则会造成任务状态丢失。而在 Flink 1.10 以后 savepoint 中 meta 数据和 state 数据保存在同一目录,方便整体转移和复用;把 state 引用改成了相对路径,这样即使迁移后路径发生变化依然可用。

(2)生产可用的 Unaligned Checkpoint

​ 用户现在使用 Unaligned Checkpoint 时也可以扩缩容应用。如果用户需要因为性能原因不能使用 Savepoint 而必须使用 Retained checkpoint 时,这一功能会非常方便。 ​
收益:这一特性极大提升我们 checkpoint 的性能,同时也优化了在反压场景下 checkpoint超时失败的问题,解决目前一些大状态任务经常 checkpoint 超时的问题。同时也符合我们利用 checkpoint 来做重启状态恢复的场景。

(3)优化失败 Checkpoint 的异常和失败原因的汇报

​ Flink 1.13 现在提供了失败或被取消的 Checkpoint 的统计,从而使用户可以更简单的判断 Checkpoint 失败的原因,而不需要去查看日志。Flink 1.13 之前的版本只有在 Checkpoint 成功的时候才会汇报指标(例如持久化数据的大小、触发时间等)。

2.4 支持 upsert kafka 和 更丰富 Format 格式

(1)支持 upsert kafka

​ 在 Flink 1.12 中支持了 Upsert kafka,这一特性在有赞的实时计算业务场景中可以在某些数据链路中保障数据一致性。对于公司现有的一些场景,Upsert-kafka可以解决一些典型场景的数据重复问题:比如下图展示的在有赞很常见的一条实时链路,上游数据可以能是 MySQL binlog 或者NSQ -> Kafka 进行数据同步,然后下游对 Kafka 数据进行按照 key 聚合,将聚合数据存到 mysql , tidb 等等。这是很容易产生的问题就是在中间环节写入 Kafka 时很可能因为容错恢复等一些原因造成数据重复,特别是在 checkpoint 时间比较大时,造成的重复的数据量会很大,在现有的解决方案中,往往需要业务方在写入Kafka 时进行幂等操作,比如存入ZanKV等方式进行幂等。但是现有的方式问题就是现在的幂等方式性能有限,同时不能做到完全幂等。 img

​ 而接入 Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。

​ 作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中,实现像 Hbase 一样的幂等写入。 img

(2)支持更丰富的 Format 格式

​ 在 Flink1.10 版本中对 Source 和 Sink 的 Format 支持是有限的,这也造成了我们业务方有些任务需要 Source 段支持更多的格式,比如 Kafka 支持 Raw、本地调试功能中Filesystem 需要支持 Json 等,这在 Flink 1.10 版本中是无法做到的。但是如果升级到 Flink 1.13 则可以完美解决这些问题。

2.5 其他相关收益

(1)查看JM和TM的内存相关指标

​ Flink 1.12 在 WebUI 上暴露了 JobManager 内存相关的指标和配置参数(FLIP-104)。对于 TaskManager 的指标页面也进行了更新,为 Managed Memory、Network Memory 和 Metaspace 添加了新的指标,以反映自 Flink 1.10(FLIP-102)开始引入的 TaskManager 内存模型的更改。

img

(2)WebUI界面查看被压情况

​ Flink 1.13 带来了一个改进的背压度量系统(使用任务邮箱计时而不是线程堆栈采样),以及一个重新设计的作业数据流图形表示,用颜色编码和繁忙度和背压比率表示。

img

(3)CPU 火焰图查看

​ 可以直观的看 CPU 的火焰图来确定以下指标:当前哪些方法在消耗 CPU 的资源、各个方法消耗的 CPU 的资源的多少对比、堆栈上的哪些调用会导致执行特定的方法。

(4)Web UI 支持历史异常

​ Flink Web UI 现在可以展示导致作业失败的 n 次历史异常,从而提升在一个异常导致多个后续异常的场景下的调试体验。用户可以在异常历史中找到根异常。

三、Flink 1.13 升级过程实践与踩坑

​ 实时计算平台 Flink 引擎从 Flink 1.10 升级到 Flink 1.13 的主要工作将主要集中在自定义 connector 的升级、SQL 语法升级转换、任务迁移验证等几个方面的实践和踩坑来介绍此次升级过程。

3.1 自定义 connector 升级

​ 目前有赞的实时计算平台的数据流如下图所示,包括有赞自研的 NSQ、Kafka、Mysql、TiDB、Clickhouse、Habse 等大数据组件。那么此次升级需要将一些官方没有提供以及一些已经定制化的 connector 升级,其中包括 NSQ connector,定制的无用户名密码的 jdbc connector, clickhouse connector, 定制的高可用 hbase connector等。

​ 本次升级 connector 的主要工作是在 Flink 1.10 中 DataStream 和 Table connector 都统一是用到的是 Row 这种数据结构。而Flink 1.11 在 FLIP-95 对 TableSource 和 TableSink API 进行了重构,新增了 Flink SQL 内部数据结构 RowData, 在一些场景的序列化有一定的提升。为此,我们需要对上述四种定制或者自定义的 table connector 进行升级重构,对于无用户名密码的 jdbc connector 的链接方式采用的是连接池构建链接的方式,但是采用链接池的方式构建链接时,如果对于 Flink 任务长时间没有数据流入则链接会被释放掉,如果再次过来数据用原来的链接去写入数据时会抛出链接被关闭的异常,导致任务出现频繁的重启:

​ 为解决上述问题,需要在 flush 前检查链接是否有效,如果连接失效需要重新构建链接:

3.2 UDF 兼容

​ 在 Flink 1.10 版本有赞实时计算平台根据业务需求提供了很多通用的 UDF, 如 Dubbo 调用,JSON 转换,动态过滤条件。同时用户也自定义了一部分 UDF。所以在升级的过程中需要保证 UDF 的兼容性。好在 Flink 本身对 UDF 做了良好的兼容性,我们只需要将 maven 中 flink-table-common 改成对应的 Flink 版本即可。其中要注意的一点是,在 Flink 1.13 版本中如果 UDF 的参数是 Object 需要加上注解 @DataTypeHint(inputGroup = InputGroup.ANY) 帮助 Flink 做类型推荐。

3.3 SQL 语法转换实践

​ 在 Flink 1.13 SQL 用法中相比于 Flink 1.10 的 SQL 用法主要有以下几部分存在差异:建表语句的配置项简化 、时间函数的优化导致类型不匹配、存在 upsert 操作的的建表语句中需要指定 primary key。为保证任务可以平滑的从 Flink 1.10 升级到 Flink 1.13,我们对目前集群已有的数百个 Flink 1.10 语法的 SQL 任务进行转换,自动生成 Flink 1.13 版本的语法。`

(1)建表语句的配置项转换

​ 在Flink 1.13 中社区提出了新的 connector 属性 key,SQL 开发更为简洁,如下图分别展示了 Kafka 作为数据源时在 Flink 1.10 语法中的 connector 属性配置以及转换后在 Flink 1.13 语法中的属性配置。

​ 从上图的对比可以看出 Flink 1.13 语法中的 connector 属性配置 相比于 Flink 1.10 语法更为简洁易懂。虽然 Flink 本身对老版本的 SQL connector 的配置依然兼容,但是为了让用户使用新版的语法,我们对 用户在 Flink 1.10 的任务 SQL 进行配置了转换。

值得注意的是:在一些 connector 的属性配置中,一些属性的 key 进行了改变,以 Kafka connector 为例,其中在 Flink 1.10 中 format.fail-on-not-json-record = false 要对应 Flink 1.13 中的 json.ignore-parse-errors = true 表示的是按照 JSON 格式解析数据失败则跳过。同样 connector.startup-mode = earliest-offsetscan.startup.mode = earliest-offset 都表示从 consumer 的最早的点位开始消费,但是配置的 key 已经改变了,这是大家在做新老版本语法转换需要注意的事情。

(2)时间函数类型逻辑转换及时间数据类型转换

在 Flink 1.13 中对一些时间函数进行了优化正如上一章的第一节所介绍的,那么在现有的 Flink 1.10 SQL 业务中,有些用户用到了相关的时间函数比如最常见的 currenttimestamp 函数,那么我们要对任务进行平滑升级时需要对使用 currenttimestamp 等时间函数进行相应的逻辑转换,主要是时区变更的转化和类型不匹配的转换。

  • 时间函数时区逻辑转换以 currenttimestamp 函数为例,在 Flink 1.10 版本中 currenttimestamp 未考虑时区是 UTC+0 时间,而升级 Flink 1.13 之后 current_timestamp 考虑时区,且是本地时区时间。因此在之前的任务中,有些任务为了解决时区问题在任务中加了8小时或者减了16小时(前一天时间)。那么针对已经进行了时区转换的任务,我们需要将对应的 8 小时 时差减去,因此关于这一点我们对 SQL 任务进行匹配分析,对已经做了时区转换的任务逻辑减去 8 小时的时差。
  • 时间函数类型转换还是以 currenttimestamp 函数为例,在 Flink 1.10 版本中 currenttimestamp 返回值类型为 timestamp 而在 Flink 1.13 中 currenttimestamp 返回值类型为 timestampltz 的格式。那么在 Flink 1.10 中的 current_timestamp 一些函数使用在 Flink 1.13中会报错,比如 TIMESTAMPDIFF 和 TIMESTAMPADD。简单举个例子
TIMESTAMPDIFF(MINUTE, (current_timestamp - INTERVAL '10' MINUTE), TO_TIMESTAMP(FROM_UNIXTIME(orderTime / 1000, 'yyyy-MM-dd HH:mm:ss')))  

​ 上述语句在 Flink 1.13 中会因为 TIMESTAMPDIFF 函数中一个是 timestampltz 格式 一个是 timestamp 而出现异常,为此需要转换成同一种类型,比如将后面的时间转为 timestampltz 类型,才能应用 TIMESTAMPDIFF、TIMESTAMPADD 等函数。

TIMESTAMPDIFF(MINUTE, (current_timestamp - INTERVAL '10' MINUTE), TO_TIMESTAMP_LTZ(mainOrderInfo.orderTime , 3))  

​ 如果需要将 CURRENTTIMESTAMP 的 TIMESTAMPLTZ 类型转为 TIMESTAMP 类型,可以使用下面的方式进行转换:

TO_TIMESTAMP(CAST(CURRENT_TIMESTAMP AS STRING))  

​ 因此在升级到 Flink 1.13 中关于时间函数的使用转换是尤为需要注意的,否则会因为逻辑不对造成数据不准确,或者任务异常无法启动。

(3)Primary key 自动生成

​ 在 Flink 1.10 以后对于存在 upsert 操作时比如写 mysql,tidb 时出现了聚合等操作需要在建表语句中指定 primary key, 这也是为了解决在 Flink 1.10 中对于一个复杂的 SQL 语句无法通过优化器 从Quary 语句中自动推断出 primary key而产生异常的问题。因此,为了平滑升级,我们需要对 upsert 流的建表语句中指定 primary key,否则会提示异常:

please declare primary key for sink table when query contains update/delete record.

​ 我们采用优化器推断 Quary 语句推断的方式实现了一套 primary key 自动生成的逻辑,然后判断任务是为 upsert 流 来为需要添加 primary key 的建表语句自动生成对应的 primary key。当然对于一些过于复杂的 SQL 任务如果生成失败会进行提示,联系用户自己去手动添加 primary key, 我们的 primary key 生成逻辑满足 95%的任务的 primary key 的自动生成。

3.4 任务平滑迁移实践与踩坑

​ 在 Flink 1.10 SQL 任务升级到 Flink 1.13 版本的过程中,我们除了做了语法转换之外,还有批量按照 Flink 1.13 语法检查,数据准确性验证,批量重启等工作。整个工作过程如下流程图所示:

其中有几点需要关注的是:

  • 在迁移之前我们对各种任务构建了测试任务,并在第二天将测试任务的数据与老版本的实时任务和离线任务进行数据准确性验证;
  • 同时关于 SQL 转换后关于 current_timestamp 这种时间函数的逻辑转换以及 primary key 的自动生成,需要在 SQL 转换后让用户进行 double check,反正升级后数据不准产生问题。
  • 任务迁移尽量选择流量较小的时间段,防止重启异常时对业务产生很大的数据延迟影响。同时按照任务优先级的高低,以及根据实时任务血缘确定任务的重启顺序,比如在有赞的实时计算任务中,我们会优先重启低优先级和数据链路中下游的任务,在保证任务升级重启稳定运行一段时间后再去重启高优先级的任务,反正一些未发现的异常对升级后的任务产生大的影响。

3.5 其他踩坑和注意

​ 关于本次有赞实时计算平台引擎升级到 Flink 1.13 过程中也遇到过一些问题和踩过一些坑,一些问题已经在对应的实践中提及过了,那么还有遇到其他的一些升级过程中遇到一些问题在这里可以分享一下:

(1)任务升级后从之前版本的 checkpoint 文件恢复失败

当我们升级 Flink 1.13 后的任务想通过之前的任务的 checkpoint 文件进行状态恢复时,会偶尔出现下面的异常:

​ 通过社区邮件和源码阅读发现根本原因是在 Flink 1.11之后 BaseRowSerializer 改名成 RowDataSerializer了,即使用 state-processor-API 也没办法处理当前不存在的类。目前关于这一个问题社区也没有专门去处理的 Jira。

​ 这种问题并不是所有的任务重启时从之前的状态文件恢复都会出现的,所以面对这种问题的比较好的办法就是升级重启的时间尽量选择在流量小的时间段,对于一些按天维度做聚合的任务最好在凌晨的时候重启,这样出现问题也不会对第二天的数据有很大的影响,同时对于恢复异常的任务做好数据重放的处理。

(2)Mysql 维表关联出现类型转换异常报错

在升级 Flink 1.13 过程中,我们发现有几个 mysql 维表关联的任务升级重启后抛出如下异常:

​ 在1.13中由于对 Table connector 数据类型统一为 RowData,在维表关联时如果业务方的 mysql 的字段类型定义为 BIGINT,当 mysql 中是 BIGINT UNSIGNED 时,如果用Flink 的BIGINT 去转成 mysql 的 BIGINT UNSIGNED 时会出现上述的报错。因为最终维表关联的数据要转换成RowData格式,所以不能将mysql 的 BIGINT UNSIGNED 与Flink 的 BIGINT 进行相互转换。

​ 为了解决上述问题,在 Flink 1.11 中提出的一个Jira : FLINK-18580 ,官方建议在Flink 的建维表时将 BIGINT 定义为 DECIMAL(20,0)。

(3)执行多条 insert 语句任务异常

​ 在 Flink 1.10 中我们底层真正执行 SQL 的是 executeSql() 方法,对于 Flink 1.10 版本去调用该方法不会出现任何异常,且每条 insert 语句均有输出。但是升级到 Flink 1.13 之后,如果依然采用 executeSql() 方法去执行一个任务内的多条 insert 语句时会出现问题,我们发现只有第一条 insert 语句是有结果的,同时集群上出现多个相同的 job 被提交。如下面例子所示:

insert into max_realtimet select guangBusinessId,st_hour,'orderCountHourAll',orderCountHourAll from order_hour_cnt_all_view;

insert into max_realtime select guangBusinessId,st_hour,'orderPaidAmountHourAll',orderPaidAmountHourAll from order_hour_cnt_all_view;

insert into max_realtime select guangBusinessId,st_hour,'orderPaidUserCountHourAll',orderPaidUserCountHourAll from order_hour_cnt_all_view;  

​ 在 Flink 1.10的任务中 print 的结果是正常的:

​ 但是在 Flink 1.13 中可以明显只有第一条 insert 语句的输出:

​ 通过官方文档的解释我们发现执在 Flink 1.13 版本中 executeSql() 方法每执行一条 insert 语句的会立即提交一个 Flink 作业,并返回一个与提交的作业相关联的 TableResult 实例。这也验证了为什么发现确实当启动一个多 insert 语句的任务时在集群会起来了多个 job。 ​ 为此需要采用 StatementSet 将 insert 语句添加到 StatementSet 中,最后执行 StatementSet.execute(),如下代码所示:

StatementSet statementSet = streamTableEnv.createStatementSet();  
sqls.forEach(sql -> {  
  if(isInsertSql(sql)){
    statementSet.addInsertSql(sql);
  }else{
    streamTableEnv.executeSql(sql);
  }
});
statementSet.execute();  

​ 上述是我们从 Flink 1.10 升级到 Flink 1.13 中间遇到的一些问题,因为在 Flink 1.10 以后社区的代码架构改动还是很大的,中间踩了一些坑,也遇到一些问题,其实好多问题在社区邮件和社区的 jira 里面都给出了好的解决方案,我们更多的介绍了实践过程中踩过的一些坑来分享。

四、总结

​ 目前有赞实时计算平台已经将 Flink 引擎从 Flink 1.10 升级到了 Flink 1.13,并将所有的 Flink SQL 任务平滑迁移升级到 Flink 1.13 版本中,并成功运行了近三个月。随着有赞更多的业务场景不断接入实时任务,目前 Flink SQL 任务接近整体实时任务体量的 60%,实时任务 SQL 化是我们的目标,因此升级到 Flink 1.13 后对于 Flink SQL 开发的简化以及特性增加与性能优化对我们来说是十分有价值的。

​ 同时随着实时集群任务体量的增大,对资源的管控以及弹性扩缩容的需求也越来越大。而社区在 Flink on K8S 的投入也在不断增加,后续肯定是在 Native K8S 或者 Application Level K8S 有更多的优化,为此升级 Flink 1.13 之后我们将所有 Flink SQL 任务全部迁移到 K8S 集群,采用 Flink on Native 的 Application 模式运行任务,实现整个集群容器化,为后续的实时任务弹性扩缩容做好准备,目前我们已经完成 Flink on Native 的 Application 模式任务的测试阶段。后面将紧跟 Flink 社区的发展,为有赞的更多业务场景提供更多实践的可能。

欢迎关注我们的公众号