關(guān)于 Flink Regular Join 與 TTL 的理解
對(duì)于流查詢(xún),Regular Join 的語(yǔ)法是最靈活的,它允許任何類(lèi)型的更新(插入、更新、刪除)輸入表。
Regular Join 包含以下幾種(以 L 作為左流中的數(shù)據(jù)標(biāo)識(shí),R 作為右流中的數(shù)據(jù)標(biāo)識(shí)):
- Inner Join(Inner Equal Join):當(dāng)兩條流 Join 到才會(huì)輸出 +[L, R]
- Left Join(Outer Equal Join):左流數(shù)據(jù)到達(dá)之后 Join 到 R 流數(shù)據(jù)則輸出 +[L, R],沒(méi) Join 到輸出 +[L, null])。如果右流之后數(shù)據(jù)到達(dá)之后,發(fā)現(xiàn)左流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則會(huì)發(fā)起回撤流,先輸出 -[L, null],然后輸出 +[L, R]。
- Right Join(Outer Equal Join):與 Left Join 邏輯相反。
- Full Join(Outer Equal Join):流任務(wù)中,左流或者右流的數(shù)據(jù)到達(dá)之后,無(wú)論有沒(méi)有 Join 到另外一條流的數(shù)據(jù),都會(huì)輸出(對(duì)右流來(lái)說(shuō):Join 到輸出 +[L, R],沒(méi) Join 到輸出 +[null, R];對(duì)左流來(lái)說(shuō):Join 到輸出 +[L, R],沒(méi) Join 到輸出 +[L, null])。如果一條流的數(shù)據(jù)到達(dá)之后,發(fā)現(xiàn)之前另一條流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則會(huì)發(fā)起回撤流(左流數(shù)據(jù)到達(dá)為例:回撤 -[null, R],輸出 +[L, R],右流數(shù)據(jù)到達(dá)為例:回撤 -[L, null],輸出 +[L, R])。
Regular Inner Join
Flink SQL?:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
INNER JOIN readRecord ON matchResult.guid = readRecord.guid;
輸出結(jié)果解析?:
-- L 流數(shù)據(jù)達(dá)到,由于沒(méi)有 Join 到 R 流數(shù)據(jù)而且是 inner join 便不輸出結(jié)果
+I[111, book1] -- R 流數(shù)據(jù)達(dá)到, Join 到 L 流數(shù)據(jù),便輸出 +I[111, book1]
-- R 流數(shù)據(jù)達(dá)到,由于沒(méi)有 Join 到 L 流數(shù)據(jù)而且是 inner join 便不輸出結(jié)果
+I[222, book2] -- L 流數(shù)據(jù)達(dá)到, Join 到 R 流數(shù)據(jù)便輸出結(jié)果
Regular Left Join(Right join 則相反)
Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
LEFT JOIN readRecord ON matchResult.guid = readRecord.guid;
輸出結(jié)果解析:
+I[111, null] -- L 流數(shù)據(jù)達(dá)到,沒(méi)有 Join 到 R 流數(shù)據(jù),便輸出 +[L, null]
-D[111, null] -- R 流的數(shù)據(jù)到達(dá),發(fā)現(xiàn) L 流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則會(huì)發(fā)起回撤流,先輸出 -[L, null]
+I[111, book1] -- 再輸出 +[L, R]
-- 這里模擬一條 R 流 guid = 222 的數(shù)據(jù)到達(dá),由于是 left join 且沒(méi)有 join 到 L 流,因此不做輸出
+I[222, book2] -- 當(dāng) L 流 guid = 222 的數(shù)據(jù)達(dá)到 join R 流 后輸出結(jié)果 +[L, R]
Regular Full Join
Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
FULL JOIN readRecord ON matchResult.guid = readRecord.guid;
輸出結(jié)果解析:
+I[111, null] -- L 流數(shù)據(jù)達(dá)到,沒(méi)有 Join 到 R 流數(shù)據(jù),便輸出 +I[L, null]
+I[null, book2] -- R 流數(shù)據(jù)達(dá)到,沒(méi)有 Join 到 R 流數(shù)據(jù),便輸出 +I[null, R]
-D[null, book2] -- L 流新數(shù)據(jù)到達(dá),發(fā)現(xiàn)之前 R 流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則發(fā)起回撤流,先輸出 -D[null, R]
+I[222, book2] -- 再輸出 +I[L, R]
-D[111, null] -- 反之同理
+I[111, book1]
TTL 概念
在 Regular Join 時(shí) Flink 會(huì)將兩條沒(méi)有時(shí)間窗口限制的流的所有數(shù)據(jù)存儲(chǔ)在 State 中,由于流是無(wú)窮無(wú)盡持續(xù)流入的,隨著時(shí)間的不斷推進(jìn),內(nèi)存中積累的狀態(tài)會(huì)越來(lái)越多。
針對(duì)這個(gè)問(wèn)題,F(xiàn)link 提出了空閑狀態(tài)保留時(shí)間(Idle State Retention Time)的概念。通過(guò)為每個(gè)狀態(tài)設(shè)置 Timer,如果這個(gè)狀態(tài)中途被訪問(wèn)過(guò),則重新設(shè)置 Timer;否則(如果狀態(tài)一直未被訪問(wèn),長(zhǎng)期處于 Idle 狀態(tài))則在 Timer 到期時(shí)做狀態(tài)清理。這樣,就可以確保每個(gè)狀態(tài)都能得到及時(shí)的清理,可以通過(guò) table.exec.state.ttl 參數(shù)進(jìn)行控制(注意:這同時(shí)也會(huì)對(duì)結(jié)果的準(zhǔn)確性有所影響,因此需要合理的權(quán)衡)。