废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。
本文介绍怎么使用 flink sql interval join 解决这些问题。
看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。
场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
来一波输入数据:
上节的 flink sql regular join 解决方案如下:
INSERT INTO sink_tableSELECT show_log.log_id as log_id, show_log.timestamp as timestamp, show_log.show_params as show_params, click_log.click_params as click_paramsFROM show_logLEFT JOIN click_log ON show_log.log_id = click_log.log_id;
上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。
对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。
当当当!!!
本文的 flink sql interval join 登场,它就能等。
大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。
interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。
interval join
来看看上述案例的 flink sql interval join sql 怎么写:
INSERT INTO sink_tableSELECT show_log.log_id as log_id, show_log.timestamp as timestamp, show_log.show_params as show_params, click_log.click_params as click_paramsFROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_idAND show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE;
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。
运行结果如下:
+[1 | 2021-11-01 00:01:03 | show_params | click_params]+[2 | 2021-11-01 00:03:00 | show_params | click_params]+[3 | 2021-11-01 00:05:00 | show_params | null]
如上就是我们期望的正确结果了。
flink web ui 算子图如下:
flink web ui
那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?
博主带你们来定位到具体的实现源码。先看一下 transformations。
transformations
可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay。
其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。RowTimeIntervalJoin 重要方法如下图所示。
TimeIntervalJoin
下面详细给大家解释一下。
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子,show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE, 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
Notes:如果你设置了 allowLateness,join 不到的数据的输出和 state 的清理会多保留 allowLateness 时间
以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):
上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2)。
小伙伴萌在使用 interval join 需要注意的两点事项:
本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:
留言与评论(共有 0 条评论) “” |