偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Kafka之消費(fèi)與心跳

系統(tǒng) Linux Kafka
kafka是一個(gè)分布式,分區(qū)的,多副本的,多訂閱者的消息發(fā)布訂閱系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問(wèn)日志等。kafka是一個(gè)分布式,分區(qū)的,多副本的,多訂閱者的消息發(fā)布訂閱系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問(wèn)日志等。今天小編來(lái)領(lǐng)大家一起來(lái)學(xué)習(xí)一下Kafka消費(fèi)與心跳機(jī)制。

 導(dǎo)讀kafka是一個(gè)分布式,分區(qū)的,多副本的,多訂閱者的消息發(fā)布訂閱系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問(wèn)日志等。kafka是一個(gè)分布式,分區(qū)的,多副本的,多訂閱者的消息發(fā)布訂閱系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問(wèn)日志等。今天小編來(lái)領(lǐng)大家一起來(lái)學(xué)習(xí)一下Kafka消費(fèi)與心跳機(jī)制。

1、Kafka消費(fèi)

首先,我們來(lái)看看消費(fèi)。Kafka提供了非常簡(jiǎn)單的消費(fèi)API,使用者只需初始化Kafka的Broker Server地址,然后實(shí)例化KafkaConsumer類即可拿到Topic中的數(shù)據(jù)。一個(gè)簡(jiǎn)單的Kafka消費(fèi)實(shí)例代碼如下所示:

  1. public class JConsumerSubscribe extends Thread {  
  2.     public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers""dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址  
  3.         props.put("group.id""ke");// 指定消費(fèi)者組  
  4.         props.put("enable.auto.commit""true");// 開(kāi)啟自動(dòng)提交  
  5.         props.put("auto.commit.interval.ms""1000");// 自動(dòng)提交的時(shí)間間隔  
  6.         // 反序列化消息主鍵        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");  
  7.         // 反序列化消費(fèi)記錄        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");  
  8.         return props;  
  9.     }    /** 實(shí)現(xiàn)一個(gè)單線程消費(fèi)者. */    @Override    public void run() {        // 創(chuàng)建一個(gè)消費(fèi)者實(shí)例對(duì)象        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());        // 訂閱消費(fèi)主題集合        consumer.subscribe(Arrays.asList("test_kafka_topic"));  
  10.         // 實(shí)時(shí)消費(fèi)標(biāo)識(shí)        boolean flag = true;  
  11.         while (flag) {  
  12.             // 獲取主題消息數(shù)據(jù)            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
  13.             for (ConsumerRecord<String, String> record : records)  
  14.                 // 循環(huán)打印消息記錄                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
  15.         }        // 出現(xiàn)異常關(guān)閉消費(fèi)者對(duì)象        consumer.close();  
  16.     }}  

 上述代碼我們就可以非常便捷地拿到Topic中的數(shù)據(jù)。但是,當(dāng)我們調(diào)用poll方法拉取數(shù)據(jù)的時(shí)候,Kafka Broker Server做了那些事情。接下來(lái),我們可以去看看源代碼的實(shí)現(xiàn)細(xì)節(jié)。核心代碼如下:

org.apache.kafka.clients.consumer.KafkaConsumer

  1. private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {  
  2.         acquireAndEnsureOpen();        try {  
  3.             if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");  
  4.             if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {  
  5.                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");  
  6.             }            // poll for new data until the timeout expires  
  7.             long elapsedTime = 0L;  
  8.             do {  
  9.                 client.maybeTriggerWakeup();                final long metadataEnd;                if (includeMetadataInTimeout) {  
  10.                     final long metadataStart = time.milliseconds();                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {  
  11.                         return ConsumerRecords.empty();  
  12.                     }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd - metadataStart;                } else {  
  13.                     while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {  
  14.                         log.warn("Still waiting for metadata");  
  15.                     }                    metadataEnd = time.milliseconds();                }                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                if (!records.isEmpty()) {  
  16.                     // before returning the fetched records, we can send off the next round of fetches  
  17.                     // and avoid block waiting for their responses to enable pipelining while the user  
  18.                     // is handling the fetched records.  
  19.                     //  
  20.                     // NOTE: since the consumed position has already been updated, we must not allow  
  21.                     // wakeups or any other errors to be triggered prior to returning the fetched records.  
  22.                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {  
  23.                         client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords<>(records));  
  24.                 }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } while (elapsedTime < timeoutMs);  
  25.             return ConsumerRecords.empty();  
  26.         } finally {  
  27.             release();        }    }  

 上述代碼中有個(gè)方法pollForFetches,它的實(shí)現(xiàn)邏輯如下: 

  1. private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {  
  2.         final long startMs = time.milliseconds();  
  3.         long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);  
  4.         // if data is available already, return it immediately  
  5.         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();  
  6.         if (!records.isEmpty()) {  
  7.             return records;  
  8.         }  
  9.         // send any new fetches (won't resend pending fetches)  
  10.         fetcher.sendFetches();  
  11.         // We do not want to be stuck blocking in poll if we are missing some positions  
  12.         // since the offset lookup may be backing off after a failure  
  13.         // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call  
  14.         // updateAssignmentMetadataIfNeeded before this method.  
  15.         if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {  
  16.             pollTimeout = retryBackoffMs;  
  17.         }  
  18.         client.poll(pollTimeout, startMs, () -> {  
  19.             // since a fetch might be completed by the background thread, we need this poll condition  
  20.             // to ensure that we do not block unnecessarily in poll()  
  21.             return !fetcher.hasCompletedFetches();  
  22.         });  
  23.         // after the long poll, we should check whether the group needs to rebalance  
  24.         // prior to returning data so that the group can stabilize faster  
  25.         if (coordinator.rejoinNeededOrPending()) {  
  26.             return Collections.emptyMap();  
  27.         }  
  28.         return fetcher.fetchedRecords();  
  29.     }  

 上述代碼中加粗的位置,我們可以看出每次消費(fèi)者客戶端拉取數(shù)據(jù)時(shí),通過(guò)poll方法,先調(diào)用fetcher中的fetchedRecords函數(shù),如果獲取不到數(shù)據(jù),就會(huì)發(fā)起一個(gè)新的sendFetches請(qǐng)求。而在消費(fèi)數(shù)據(jù)的時(shí)候,每個(gè)批次從Kafka Broker Server中拉取數(shù)據(jù)是有最大數(shù)據(jù)量限制,默認(rèn)是500條,由屬性(max.poll.records)控制,可以在客戶端中設(shè)置該屬性值來(lái)調(diào)整我們消費(fèi)時(shí)每次拉取數(shù)據(jù)的量。

提示:這里需要注意的是,max.poll.records返回的是一個(gè)poll請(qǐng)求的數(shù)據(jù)總和,與多少個(gè)分區(qū)無(wú)關(guān)。因此,每次消費(fèi)從所有分區(qū)中拉取Topic的數(shù)據(jù)的總條數(shù)不會(huì)超過(guò)max.poll.records所設(shè)置的值。

而在Fetcher的類中,在sendFetches方法中有限制拉取數(shù)據(jù)容量的限制,由屬性(max.partition.fetch.bytes),默認(rèn)1MB。可能會(huì)有這樣一個(gè)場(chǎng)景,當(dāng)滿足max.partition.fetch.bytes限制條件,如果需要Fetch出10000條記錄,每次默認(rèn)500條,那么我們需要執(zhí)行20次才能將這一次通過(guò)網(wǎng)絡(luò)發(fā)起的請(qǐng)求全部Fetch完畢。

這里,可能有同學(xué)有疑問(wèn),我們不能將默認(rèn)的max.poll.records屬性值調(diào)到10000嗎?可以調(diào),但是還有個(gè)屬性需要一起配合才可以,這個(gè)就是每次poll的超時(shí)時(shí)間(Duration.ofMillis(100)),這里需要根據(jù)你的實(shí)際每條數(shù)據(jù)的容量大小來(lái)確定設(shè)置超時(shí)時(shí)間,如果你將最大值調(diào)到10000,當(dāng)你每條記錄的容量很大時(shí),超時(shí)時(shí)間還是100ms,那么可能拉取的數(shù)據(jù)少于10000條。

而這里,還有另外一個(gè)需要注意的事情,就是會(huì)話超時(shí)的問(wèn)題。session.timeout.ms默認(rèn)是10s,group.min.session.timeout.ms默認(rèn)是6s,group.max.session.timeout.ms默認(rèn)是30min。當(dāng)你在處理消費(fèi)的業(yè)務(wù)邏輯的時(shí)候,如果在10s內(nèi)沒(méi)有處理完,那么消費(fèi)者客戶端就會(huì)與Kafka Broker Server斷開(kāi),消費(fèi)掉的數(shù)據(jù),產(chǎn)生的offset就沒(méi)法提交給Kafka,因?yàn)镵afka Broker Server此時(shí)認(rèn)為該消費(fèi)者程序已經(jīng)斷開(kāi),而即使你設(shè)置了自動(dòng)提交屬性,或者設(shè)置auto.offset.reset屬性,你消費(fèi)的時(shí)候還是會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,這就是因?yàn)閟ession.timeout.ms超時(shí)的原因?qū)е碌摹?/p>

2、心跳機(jī)制

上面在末尾的時(shí)候,說(shuō)到會(huì)話超時(shí)的情況導(dǎo)致消息重復(fù)消費(fèi),為什么會(huì)有超時(shí)?有同學(xué)會(huì)有這樣的疑問(wèn),我的消費(fèi)者線程明明是啟動(dòng)的,也沒(méi)有退出,為啥消費(fèi)不到Kafka的消息呢?消費(fèi)者組也查不到我的ConsumerGroupID呢?這就有可能是超時(shí)導(dǎo)致的,而Kafka是通過(guò)心跳機(jī)制來(lái)控制超時(shí),心跳機(jī)制對(duì)于消費(fèi)者客戶端來(lái)說(shuō)是無(wú)感的,它是一個(gè)異步線程,當(dāng)我們啟動(dòng)一個(gè)消費(fèi)者實(shí)例時(shí),心跳線程就開(kāi)始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中會(huì)啟動(dòng)一個(gè)HeartbeatThread線程來(lái)定時(shí)發(fā)送心跳和檢測(cè)消費(fèi)者的狀態(tài)。每個(gè)消費(fèi)者都有個(gè)org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每個(gè)ConsumerCoordinator都會(huì)啟動(dòng)一個(gè)HeartbeatThread線程來(lái)維護(hù)心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,聲明的Schema如下所示:

  1. private final int sessionTimeoutMs;  
  2.     private final int heartbeatIntervalMs;  
  3.     private final int maxPollIntervalMs;  
  4.     private final long retryBackoffMs;  
  5.     private volatile long lastHeartbeatSend;   
  6.     private long lastHeartbeatReceive;  
  7.     private long lastSessionReset;  
  8.     private long lastPoll;  
  9.     private boolean heartbeatFailed;  

 心跳線程中的run方法實(shí)現(xiàn)代碼如下: 

  1. public void run() {  
  2.             try {  
  3.                 log.debug("Heartbeat thread started");  
  4.                 while (true) {  
  5.                     synchronized (AbstractCoordinator.this) {  
  6.                         if (closed)  
  7.                             return;  
  8.                         if (!enabled) {  
  9.                             AbstractCoordinator.this.wait();  
  10.                             continue;  
  11.                         }                        if (state != MemberState.STABLE) {  
  12.                             // the group is not stable (perhaps because we left the group or because the coordinator  
  13.                             // kicked us out), so disable heartbeats and wait for the main thread to rejoin.  
  14.                             disable();  
  15.                             continue;  
  16.                         }  
  17.                         client.pollNoWakeup();  
  18.                         long now = time.milliseconds();  
  19.                         if (coordinatorUnknown()) {  
  20.                             if (findCoordinatorFuture != null || lookupCoordinator().failed())  
  21.                                 // the immediate future check ensures that we backoff properly in the case that no  
  22.                                 // brokers are available to connect to.  
  23.                                 AbstractCoordinator.this.wait(retryBackoffMs);  
  24.                         } else if (heartbeat.sessionTimeoutExpired(now)) {  
  25.                             // the session timeout has expired without seeing a successful heartbeat, so we should  
  26.                             // probably make sure the coordinator is still healthy.  
  27.                             markCoordinatorUnknown();  
  28.                         } else if (heartbeat.pollTimeoutExpired(now)) {  
  29.                             // the poll timeout has expired, which means that the foreground thread has stalled  
  30.                             // in between calls to poll(), so we explicitly leave the group.  
  31.                             maybeLeaveGroup();  
  32.                         } else if (!heartbeat.shouldHeartbeat(now)) {  
  33.                             // poll again after waiting for the retry backoff in case the heartbeat failed or the  
  34.                             // coordinator disconnected  
  35.                             AbstractCoordinator.this.wait(retryBackoffMs);  
  36.                         } else {  
  37.                             heartbeat.sentHeartbeat(now);  
  38.                             sendHeartbeatRequest().addListener(new RequestFutureListener() {  
  39.                                 @Override  
  40.                                 public void onSuccess(Void value) {  
  41.                                     synchronized (AbstractCoordinator.this) {  
  42.                                         heartbeat.receiveHeartbeat(time.milliseconds());  
  43.                                     }  
  44.                                 }  
  45.                                 @Override  
  46.                                 public void onFailure(RuntimeException e) {  
  47.                                     synchronized (AbstractCoordinator.this) {  
  48.                                         if (e instanceof RebalanceInProgressException) {  
  49.                                             // it is valid to continue heartbeating while the group is rebalancing. This  
  50.                                             // ensures that the coordinator keeps the member in the group for as long  
  51.                                             // as the duration of the rebalance timeout. If we stop sending heartbeats,  
  52.                                             // however, then the session timeout may expire before we can rejoin.  
  53.                                             heartbeat.receiveHeartbeat(time.milliseconds());  
  54.                                         } else {  
  55.                                             heartbeat.failHeartbeat();  
  56.                                             // wake up the thread if it's sleeping to reschedule the heartbeat  
  57.                                             AbstractCoordinator.this.notify();  
  58.                                         }  
  59.                                     }  
  60.                                 }  
  61.                             });  
  62.                         }  
  63.                     }  
  64.                 }  
  65.             } catch (AuthenticationException e) {  
  66.                 log.error("An authentication error occurred in the heartbeat thread", e);  
  67.                 this.failed.set(e);  
  68.             } catch (GroupAuthorizationException e) {  
  69.                 log.error("A group authorization error occurred in the heartbeat thread", e);  
  70.                 this.failed.set(e);  
  71.             } catch (InterruptedException | InterruptException e) {  
  72.                 Thread.interrupted();  
  73.                 log.error("Unexpected interrupt received in heartbeat thread", e);  
  74.                 this.failed.set(new RuntimeException(e));  
  75.             } catch (Throwable e) {  
  76.                 log.error("Heartbeat thread failed due to unexpected error", e);  
  77.                 if (e instanceof RuntimeException)  
  78.                     this.failed.set((RuntimeException) e);  
  79.                 else  
  80.                     this.failed.set(new RuntimeException(e));  
  81.             } finally {  
  82.                 log.debug("Heartbeat thread has closed");  
  83.             }  
  84.         }  

 在心跳線程中這里面包含兩個(gè)最重要的超時(shí)函數(shù),它們是sessionTimeoutExpired和pollTimeoutExpired。 

  1. public boolean sessionTimeoutExpired(long now) {  
  2.         return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;  
  3. }public boolean pollTimeoutExpired(long now) {  
  4.         return now - lastPoll > maxPollIntervalMs;  
  5. }  

 2.1、sessionTimeoutExpired

如果是sessionTimeout超時(shí),則會(huì)被標(biāo)記為當(dāng)前協(xié)調(diào)器處理斷開(kāi),此時(shí),會(huì)將消費(fèi)者移除,重新分配分區(qū)和消費(fèi)者的對(duì)應(yīng)關(guān)系。在Kafka Broker Server中,Consumer Group定義了5種(如果算上Unknown,應(yīng)該是6種狀態(tài))狀態(tài),org.apache.kafka.common.ConsumerGroupState,如下圖所示:


2.2、pollTimeoutExpired

如果觸發(fā)了poll超時(shí),此時(shí)消費(fèi)者客戶端會(huì)退出ConsumerGroup,當(dāng)再次poll的時(shí)候,會(huì)重新加入到ConsumerGroup,觸發(fā)RebalanceGroup。而KafkaConsumer Client是不會(huì)幫我們重復(fù)poll的,需要我們自己在實(shí)現(xiàn)的消費(fèi)邏輯中不停地調(diào)用poll方法。

3.分區(qū)與消費(fèi)線程

關(guān)于消費(fèi)分區(qū)與消費(fèi)線程的對(duì)應(yīng)關(guān)系,理論上消費(fèi)線程數(shù)應(yīng)該小于等于分區(qū)數(shù)。之前是有這樣一種觀點(diǎn),一個(gè)消費(fèi)線程對(duì)應(yīng)一個(gè)分區(qū),當(dāng)消費(fèi)線程等于分區(qū)數(shù)是最大化線程的利用率。直接使用KafkaConsumer Client實(shí)例,這樣使用確實(shí)沒(méi)有什么問(wèn)題。但是,如果我們有富裕的CPU,其實(shí)還可以使用大于分區(qū)數(shù)的線程,來(lái)提升消費(fèi)能力,這就需要我們對(duì)KafkaConsumer Client實(shí)例進(jìn)行改造,實(shí)現(xiàn)消費(fèi)策略預(yù)計(jì)算,利用額外的CPU開(kāi)啟更多的線程,來(lái)實(shí)現(xiàn)消費(fèi)任務(wù)分片。Linux就該這么學(xué)

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2020-09-30 14:07:05

Kafka心跳機(jī)制API

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2022-03-07 10:15:28

KafkaZookeeper存儲(chǔ)

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2019-09-23 08:27:15

TCP長(zhǎng)連接心跳

2013-08-22 09:36:52

移動(dòng)終端安全移動(dòng)安全移動(dòng)策略

2024-03-20 08:33:00

Kafka線程安全Rebalance

2024-09-23 20:55:04

2021-12-27 08:22:18

Kafka消費(fèi)模型

2023-11-27 17:29:43

Kafka全局順序性

2019-12-16 09:37:19

Kafka架構(gòu)數(shù)據(jù)

2024-08-23 16:04:45

2021-12-28 12:01:59

Kafka 消費(fèi)者機(jī)制

2021-07-08 05:52:34

Kafka架構(gòu)主從架構(gòu)

2021-10-11 07:01:33

混合云多云數(shù)據(jù)

2024-10-31 11:49:41

Kafka管理死信隊(duì)列

2025-07-01 07:21:15

2019-12-03 11:00:08

spring bootspring-kafkJava

2025-06-30 09:20:02

Kafka開(kāi)發(fā)Linux

2024-07-05 11:01:13

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)