Flink無法將聚合結(jié)果直接寫入Kafka怎么辦?
原創(chuàng)拋出疑無路?
【Flink 1.10】- 有一種情況是所有的系統(tǒng)或應(yīng)用之間的橋梁都是Kafka,而這個(gè)時(shí)候恰恰是上游需要做Unbound的聚合統(tǒng)計(jì)。From @PyFlink 企業(yè)用戶。
示例代碼:
INSERT INTO kafkaSink
SELECT
id,
SUM(cnt)
FROM csvSource
GROUP BY id
執(zhí)行這個(gè)SQL,在【Flink 1.10】版本會(huì)拋出如下異常:

再現(xiàn)又一村!
【Flink-1.10】這個(gè)問題是因Flink內(nèi)部Retract機(jī)制導(dǎo)致,在沒有考慮對(duì)Chanage log全鏈路支持之前,無法在Kafka這樣的Append only的消息隊(duì)列增加對(duì)Retract/Upsert的支持。這個(gè)做法是出于語義完整性考慮做出的決定。但現(xiàn)實(shí)業(yè)務(wù)場(chǎng)景總是有著這樣或那樣的實(shí)際業(yè)務(wù)需求,業(yè)務(wù)不關(guān)心你語義是否okay,業(yè)務(wù)關(guān)心我不改變我原有的技術(shù)選型。
在這個(gè)基礎(chǔ)之上只要你告訴我Sink到Kafka的行為就行,我會(huì)根據(jù)你的產(chǎn)出行為,在業(yè)務(wù)上面做適配,所以這個(gè)時(shí)候就是實(shí)用為主,不管什么語義不語義了......,所以這個(gè)時(shí)候應(yīng)該怎么辦呢?
我們的做法是將 Kafka的sink由原有的AppendStreamTableSink變成UpsertStreamTableSink或者RetractStreamTableSink。但出于性能考慮,我們改變成UpsertStreamTableSink,這個(gè)改動(dòng)不多,但是對(duì)于初學(xué)者來講還是不太愿意動(dòng)手改代碼,所以為大家提供一份:
- KafkaTableSinkBase.java
https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
- KafkaTableSourceSinkFactoryBase.java
https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
在你的項(xiàng)目創(chuàng)建 org.apache.flink.streaming.connectors.kafka包 并把上面的兩個(gè)類放入該包,用于覆蓋官方KafkaConnector里面的實(shí)現(xiàn)。
特別強(qiáng)調(diào):這樣的變化會(huì)導(dǎo)致寫入Kafka的結(jié)果不會(huì)是每個(gè)Group Key只有一條結(jié)果,而是每個(gè)Key可能有很多條結(jié)果。這個(gè)大家可以自行測(cè)試一下:
package cdc
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
/**
* Test for sink data to Kafka with upsert mode.
*/
object UpsertKafka {
def main(args: Array[String]): Unit = {
val sourceData = "file:///Users/jincheng.sunjc/work/know_how_know_why/QA/upsertKafka/src/main/scala/cdc/id_cnt_data.csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val sourceDDL = "CREATE TABLE csvSource (" +
" id VARCHAR," +
" cnt INT" +
") WITH (" +
"'connector.type' = 'filesystem'," +
"'connector.path' = '" + sourceData + "'," +
"'format.type' = 'csv'" +
")"
val sinkDDL = "CREATE TABLE kafkaSink (" +
" id VARCHAR," +
" cnt INT " +
") WITH (" +
"'connector.type' = 'kafka'," +
"'connector.version' = '0.10'," +
"'connector.topic' = 'test'," +
"'connector.properties.zookeeper.connect' = 'localhost:2181'," +
"'connector.properties.bootstrap.servers' = 'localhost:9092'," +
"'connector.properties.group.id' = 'data_Group'," +
"'format.type' = 'json')"
tEnv.sqlUpdate(sourceDDL)
tEnv.sqlUpdate(sinkDDL)
val sql = "INSERT INTO kafkaSink" +
" SELECT id, SUM(cnt) FROM csvSource GROUP BY id"
tEnv.sqlUpdate(sql)
env.execute("RetractKafka")
}
}
當(dāng)然,也可以clone我的git代碼【https://github.com/sunjincheng121/know_how_know_why/tree/master/QA/upsertKafka】直觀體驗(yàn)一下。由于本系列文章只關(guān)注解決問題,不論述細(xì)節(jié)原理,有關(guān)原理性知識(shí),我會(huì)在我的視頻課程《Apache 知其然,知其所以然》中進(jìn)行介紹。
Flink 的鍋?...
看到上面的問題有些朋友可能會(huì)問,既然知道問題,知道有實(shí)際業(yè)務(wù)需求,為啥Flink不改進(jìn),不把這種情況支持掉呢?問的好,就這個(gè)問題而言,F(xiàn)link是委屈的,F(xiàn)link已經(jīng)在努力支持這個(gè)場(chǎng)景了,預(yù)期Flink-1.12的版本大家會(huì)體驗(yàn)到完整的CDC(change data capture)支持。
眾人拾柴
期待你典型問題的拋出... 我將知無不言...言無不盡... 我在又一村等你...
作者介紹
孫金城,51CTO社區(qū)編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導(dǎo)師,Apache 軟件基金會(huì)成員。關(guān)注技術(shù)領(lǐng)域流計(jì)算和時(shí)序數(shù)據(jù)存儲(chǔ)。


























