vivo Pulsar萬億級消息處理實踐(1)-數(shù)據(jù)發(fā)送原理解析和性能調(diào)優(yōu)

一、Pulsar 簡要介紹

Pulsar是新一代的云原生消息中間件,由Apache軟件基金會孵化和開源。它的設計目的是為了滿足現(xiàn)代數(shù)據(jù)處理和計算應用程序?qū)蓴U展性、可靠性和高性能的需求,具備存儲與計算分離、節(jié)點對等、獨立擴展、實時均衡、節(jié)點故障快速恢復等特性。
Pulsar由四個核心模塊組成:broker、bookKeeper和client(Producer和Consumer)、zk(元數(shù)據(jù)管理和節(jié)點協(xié)調(diào))。broker接受來自Producer的消息,將消息路由到對應的topic;bookKeeper用于數(shù)據(jù)持久化存儲和數(shù)據(jù)復制;Consumer消費topic上的數(shù)據(jù)。Pulsar支持多種編程語言和協(xié)議(如Java、C++、Go、Python等),可以運行在云、本地和混合環(huán)境中,擴展性好,支持多租戶和跨數(shù)據(jù)中心復制等特性。因此,Pulsar被廣泛應用于云計算、大數(shù)據(jù)、物聯(lián)網(wǎng)等領(lǐng)域的實時消息傳遞和處理應用中。
二、Pulsar Producer解析
首先需要了解Producer的數(shù)據(jù)發(fā)送流程,這里以“開啟壓縮、batch發(fā)送消息給partitioned topic“這樣的一個線上常規(guī)場景為例,解析數(shù)據(jù)的發(fā)送的關(guān)鍵環(huán)節(jié)。
tips:
在Pulsar中有無分區(qū)(Non-Partitioned)Topic 和有分區(qū) (Partitioned) 的 Topic之分,Partitioned topic最小分區(qū)數(shù)為1,為滿足任務的拓展性,在線上一般使用Partitioned topic。
2.1 消息生產(chǎn)與發(fā)送的詳細流程

Producer發(fā)送數(shù)據(jù)主要分為12個步驟:
① 創(chuàng)建Producer:
partitioned topic創(chuàng)建的是一個Partitioned-
ProducerImpl對象,該對象包含了所有分區(qū)及其對應的ProducerImpl對象,ProducerImpl對象負責所對應分區(qū)數(shù)據(jù)的維護和發(fā)送。
② 構(gòu)造消息:
一條消息被發(fā)送前首先會被封裝成為一個Message對象,對象中包含了所發(fā)送的topic name、消息體、消息大小、schema類型、metadata(是否指定key等)等信息。
③ 確定目標分區(qū):
在發(fā)送消息前需要通過路由策略決定發(fā)往哪一個分區(qū),選擇對應分區(qū)的ProducerImpl對象進行進一步處理。
④ 攔截器:
Producer可以設置自定義的攔截器,攔截器需要實現(xiàn)producerInterceptor接口,在消息發(fā)送前可對消息進行攔截修改。
⑤ 消息堆積控制:
Producer可以處理的消息是有限的,接收新的消息時會分別進行信號量和內(nèi)存使用率校驗,控制接收消息的速率,防止消息無限在本地堆積。
⑥ batch容器管理:
默認情況下分好區(qū)的消息不是直接被發(fā)送,而是放入了生產(chǎn)者的一個batch緩存容器中里面。在這個緩存里面,多條消息會被封裝成為一個批次(batch)。
⑦ 消息序列化:
Pulsar 的消息需要從客戶端傳到服務端,涉及到網(wǎng)絡傳輸,因此Producer將batch緩沖區(qū)中的所有消息逐一進行序列化。
⑧ 壓縮:
Pulsar內(nèi)置了多種壓縮算法,在發(fā)送前會根據(jù)所選擇的壓縮算法對batch整體進行壓縮,這將優(yōu)化網(wǎng)絡傳輸以提高Pulsar消息傳輸?shù)男阅堋?/p>
⑨ 構(gòu)建消息發(fā)送對象:
無論是開啟batch的批次消息,還是關(guān)閉batch的單條消息,都會被包裝為一個OpSendMsg對象,OpSendMsg也是Producer發(fā)送和pulsar broker接收處理的最小單位。
⑩ pending隊列:
所有構(gòu)建好的OpSendMsg在發(fā)送前都會被放入pendingMessages隊列中,消息處理完成后才會從隊列中移除。
? 消息傳輸:
Pulsar 使用netty將消息異步的從客戶端發(fā)送到服務端,Broker節(jié)點將在收到消息后對其進行確認,并將其存儲在指定主題的持久存儲中。
? 響應處理:
Pulsar Broker 在收到消息時會返回一個響應,如果寫入成功,消息將會從pendingMessages隊列中移除。如果寫入失敗,會返回一個錯誤,生產(chǎn)者在收到可重試錯誤之后會嘗試重新發(fā)送消息,直到重試成功或超時。
2.2 關(guān)鍵環(huán)節(jié)原理分析
接下來會對上述流程中關(guān)鍵環(huán)節(jié)的設計和原理作進一步的剖析,幫助讀者更好的理解Producer。
2.2.1 創(chuàng)建Producer

在Pulsar中,PartitionedProducerImpl用于將多個ProducerImpl對象包裝成為一個邏輯生產(chǎn)者,以便向Partitioned Topic發(fā)送消息時能夠批量操作。其中,PartitionedProducerImpl.producers成員變量維護了每個分區(qū)及其對應的ProducerImpl對象,該設計主要有以下3個好處:
① 每個分區(qū)對應一個單獨的生產(chǎn)者:
在Pulsar中,Partitioned Topic按照分區(qū)(Partition)將多個 ProducerImpl 對象進行分配,以便能夠同時發(fā)往多個 Broker 節(jié)點,因此對于每個分區(qū),需要擁有一個單獨的生產(chǎn)者以便進行發(fā)送操作。在 PartitionedProducerImpl 類中,需要為每個分區(qū)維護一個 ProducerImpl 對象,以便在消息被分配好“目標分區(qū)”后可以調(diào)用對應的ProducerImpl進行處理。
②簡化代碼邏輯:
在PartitionedProducerImpl中,將每個分區(qū)及其對應的ProducerImpl對象維護在一個HashMap中,能夠更加方便的維護并管理不同分區(qū)的生產(chǎn)者,使得代碼邏輯更加清晰簡明。
③ 提高容錯性:
當某個分區(qū)的ProducerImpl對象無法工作時,可以選擇其他可用的ProducerImpl對象,從而保證系統(tǒng)整體的可用性。由于將不同分區(qū)的ProducerImpl對象分別進行維護,因此具備更加靈活的容錯處理策略。
在線上實踐中我們也基于該設計,在PartitionedProducerImpl層做了進一步優(yōu)化,通過感知下一層每個ProducerImpl的阻塞狀態(tài)(信號量的使用情況)來決定新的消息發(fā)送,避免將消息持續(xù)發(fā)往阻塞較為嚴重的分區(qū),規(guī)避了topic被某一個分區(qū)阻塞而影響到整體發(fā)送性能的情況,也提高了線上系統(tǒng)的穩(wěn)定性,具體的實現(xiàn)可以詳見這篇文章《構(gòu)建下一代萬億級云原生消息架構(gòu):Apache Pulsar 在 vivo 的探索與實踐》。
關(guān)鍵代碼:
//對每一個分區(qū)都創(chuàng)建一個ProducerImpl對象
private void start(List<Integer> indexList) {
AtomicReference<Throwable> createFail = new AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger();
for (int partitionIndex : indexList) {
createProducer(partitionIndex).producerCreatedFuture().handle((prod, createException) -> {
.......
});
}
}
private ProducerImpl<T> createProducer(final int partitionIndex) {
return producers.computeIfAbsent(partitionIndex, (idx) -> {
String partitionName = TopicName.get(topic).getPartition(idx).toString();
return client.newProducerImpl(partitionName, idx,
conf, schema, interceptors, new CompletableFuture<>());
});
}2.2.2 確定目標分區(qū)
在發(fā)送消息前需要決定發(fā)往哪一個分區(qū),確定好分區(qū)后便調(diào)用對應分區(qū)的ProducerImpl對象進一步處理,目標分區(qū)的確定主要跟“路由策略”和“是否指定key”有關(guān):
(1)如果消息沒有指定key:則按照三種路由策略的效果選擇分區(qū)進行發(fā)送,三種路由策略如下:
- SinglePartition:
如果消息沒有指定Key,Producer會隨機選擇一個 Partition,然后把所有的消息都發(fā)送到這個 Partition上。 - RoundRobinPartition:
生產(chǎn)者將以輪詢方式在所有 Partition之間發(fā)布消息,以實現(xiàn)最大吞吐量。需要注意的是如果開啟了batch發(fā)送,則輪詢將會以批為單位進行消息發(fā)送,批次發(fā)送時每隔partitionSwitchMs會輪詢一個 Partition。如果關(guān)閉了批量發(fā)送,那么每條消息發(fā)送都會輪詢一個Partition。(partitionSwitchMs至少為一個batchingMaxPublishDelay時間)。 - CustomPartition:
使用用戶自定義的消息路由實現(xiàn),根據(jù)自定義的 Router實現(xiàn)決定消息要發(fā)往哪個分區(qū)。用戶自定義的 Router可以通過 messageRoute參數(shù)設置。自定義的 Router需要實現(xiàn) MessageRouter接口的 choosePartition方法。
(2)如果消息指定key:則會對Key做哈希處理,然后找到對應的 Partition,把key所對應的消息都發(fā)送到同一個分區(qū):
對消息的Key進行哈希處理后如何找到對應的 Partition的?即使用Key的哈希值對總的 Partition數(shù)取模:N=(Key的哈希值%總的 Partition數(shù)),得到的N就是第N個 Partition,Producer可以通過設置 hashingscheme來使用不同的哈希算法 ,現(xiàn)在已經(jīng)支持 JavastringHash和 Murmur3_32Hash兩種哈希算法,前者直接調(diào)用String.hash.Code(),后者使用Murmur3。
路由策略的關(guān)鍵代碼如下:
//SinglePartition路由策略:
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
// If the message has a key, it supersedes the single partition routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());
}
return partitionIndex;
}//RoundRobin路由策略:
public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}
}2.2.3 消息堆積控制

Producer不可能無限接收新的消息,如果某些分區(qū)數(shù)據(jù)發(fā)送較慢,消息就會堆積在Prouducer緩存中,導致已經(jīng)阻塞的分區(qū)堆積大量的消息,又無法重新發(fā)往其他分區(qū),同時也可能因為無限堆積的消息占用了大量的內(nèi)存,使得任務頻繁GC甚至OOM。
在Pulsar提供了兩個核心的速率限制策略和一個阻塞時的消息處理策略:
- 消息數(shù)量限制:
maxPendingMessages控制每個分區(qū)某一時刻最大可處理消息數(shù)量,通過信號量的方式控制“新進入的消息”的信號量分配和“處理完成消息“的信號量釋放,防止某個分區(qū)的消息嚴重堆積。 - 消息占用內(nèi)存大小限制:
memoryLimit控制整個Pulsar client的消息最大占用內(nèi)存大小,通過計數(shù)器方式控制“新進入的消息”有效載荷的內(nèi)存分配和“處理完成消息“有效載荷的內(nèi)存釋放,這里需要特殊說明的是memoryLimit是client的參數(shù),針對的是該client對象下的所有topic,因此并不建議一個Pulsar client對象new多個Producer topic ,因為很容易出現(xiàn)某一個topic占用內(nèi)存過多,導致另一個topic無空間可分配的情況。 - 阻塞處理策略:由blockIfQueueFull進行控制,當blockIfQueueFull為true時,代表阻塞等待,Producer會等待獲取信號量;當blockIfQueueFull為false時,一旦獲取不到信號量,就會立刻失敗,需要注意的是如果blockIfQueueFull為false,業(yè)務需要處理好消息失敗后的回調(diào)策略,否則會導致數(shù)據(jù)在Producer上“丟失”。
關(guān)鍵代碼如下:
public void sendAsync(Message<?> message, SendCallback callback) {
......
MessageImpl<?> msg = (MessageImpl<?>) message;
MessageMetadata msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
int uncompressedSize = payload.readableBytes();
//對發(fā)送隊列大小以及client memory進行判斷是否有空間放入新的消息
if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
return;
}
......
}
private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
try {
if (conf.isBlockIfQueueFull()) {
//當blockIfQueueFull為true時,等待獲取信號量
if (semaphore.isPresent()) {
semaphore.get().acquire();
}
//分配消息有效載荷所需要的內(nèi)存空間
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
//當blockIfQueueFull為false時,如果無法獲取到信號量,則快速失敗
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
return false;
}
//如果沒有如何的內(nèi)存空間用于消息分配,則報錯
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
callback.sendComplete(new PulsarClientException(e, sequenceId));
return false;
}
return true;
}2.2.4 消息batch容器打包

(1)batch關(guān)鍵組成信息
- Messages: 保存消息的list,保存跟這個batch相關(guān)所有的MessageImpl對象。
- Metadata:保存batch相關(guān)的元數(shù)據(jù),如批量消息的序列號、消息發(fā)送的時間戳等信息。
- Callback:保存消息回調(diào)邏輯的集合,記錄了每一條消息對應的callback策略,在batch消息發(fā)送并等到服務端響應后,依次對消息的回調(diào)進行處理。
(2)batch打包條件
batch打包條件的三個關(guān)鍵參數(shù):滿足其一數(shù)據(jù)就會被打包發(fā)送出去。
- 批次大?。?/strong>batchingMaxBytes
- 批次條數(shù):batchingMaxMessages
- 批次延遲發(fā)送時間:
batchingMaxPublishDelay
Pulsar使用兩個模塊設計來實現(xiàn)上面的參數(shù)控制:
- accumulator:在 BatchMessage-
ContainerImpl 中通過計數(shù)器的方式去控制batch的大小和條數(shù),numMessages-
InBatch記錄已經(jīng)緩存的消息數(shù)量,currentBatchSizeBytes用于記錄已緩存的消息的大小。當這些變量達到閾值時,BatchMessageContainerImpl 將會觸發(fā)批量消息的發(fā)送。 - batchTimerTask:當生產(chǎn)者使用批量消息發(fā)送模式時,Producer將會創(chuàng)建一個定時器任務(batchTimerTask),并通過計時器的方式定時將BatchMessageContainer容器中的消息進行批量發(fā)送。
2.2.5 消息壓縮
如果開啟了消息壓縮,在發(fā)送前都需要進行壓縮處理。對于單條消息發(fā)送的場景,是對每一條消息進行單獨壓縮后進行發(fā)送;而如果開啟了batch則是對整個batch進行壓縮后再整個進行發(fā)送。
在線上實踐中,推薦在不影響業(yè)務延遲的情況下batch越大越好,主要有兩個理由:
- 可以優(yōu)化網(wǎng)絡IO降低CPU負載:
不論Producer發(fā)送的是一條消息還是一批消息,在pulsar客戶端都會被構(gòu)建為一個OpSendMsg對象,同時pulsar broker接收到消息進行寫入處理時,也是按照OpSendMsg為一個處理單位將消息寫入磁盤,因此當消息數(shù)量一定時,batch越大,則代表需要處理的OpSendMsg越少。 - batch越大“壓縮效果則越好”:
壓縮算法對應的壓縮率并不固定,它通常取決于所要壓縮的數(shù)據(jù)對象的內(nèi)容和壓縮算法本身,壓縮的本質(zhì)在于通過消除或利用數(shù)據(jù)中存在的冗余來實現(xiàn)數(shù)據(jù)的壓縮和重構(gòu)。而Pulsar是以batch來進行打包的,batch越大,壓縮的目標包體越大壓縮效果則可能越好,同時也能夠盡可能避免單條消息因為包體較小導致越壓縮后包體越大的情況出現(xiàn)。
以下是開啟了batch情況下,構(gòu)建發(fā)送消息和壓縮的關(guān)鍵代碼:
public OpSendMsg createOpSendMsg() throws IOException {
//對數(shù)據(jù)進行壓縮、加密等操作
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
......
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);
//對整個batch構(gòu)建一個OpSendMsg
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);
......
return op;
}
//對batch進行壓縮,并將壓縮后信息更新到messageMetadata中
private ByteBuf getCompressedBatchMetadataAndPayload() {
......
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
......
return compressedPayload;
}2.2.6 pending隊列

Pulsar 中的 pendingMessages隊列是客戶端用來暫存“未處理完成的消息”的一個緩存隊列。用于存儲當Producer連接到 Broker 服務器后,還未發(fā)送或尚未得到 Broker 系統(tǒng)的 ACK 確認的所有生產(chǎn)者(Producer)的消息。在發(fā)送消息之前,Producer 首先會將消息緩存到 pendingMessages 隊列中,并記錄保存緩存消息的OpSendMsg對象,直到它被成功發(fā)送到了 Broker 端并收到 Broker 發(fā)送的ACK 確認之后,相關(guān)的元信息和消息信息才會從隊列中移除。
需要注意的是:pending隊列的本質(zhì)是一個回調(diào)處理隊列,而不是發(fā)送隊列,消息在放入pending隊列的同時就被異步發(fā)送到服務端了,所以這里需要重點理解什么是“未處理完成的消息”。
pendingMessages 隊列的作用在于:對于已經(jīng)發(fā)送但尚未收到 ACK 確認的消息,防止在連接出現(xiàn)異常時丟失消息。當連接中斷時,緩存在 pendingMessages 隊列中的未確認消息將被認為是需要重發(fā)的,當連接恢復時,緩存的消息將重新發(fā)送到 Broker 端,以確保生產(chǎn)者生產(chǎn)的消息不會丟失。
總的來說,pendingMessages 隊列是 Pulsar 客戶端保證消息可靠性和一致性的關(guān)鍵功能組件,在 Pulsar 的生產(chǎn)者(Producer)和消息確認的機制中擔任著非常重要的角色。
關(guān)鍵代碼如下:
add() 方法用于在追加消息時將指定元素插入隊列中的隊尾,remove() 用于消息在完成后移除隊列頭部的元素。
protected void processOpSendMsg(OpSendMsg op) {
if (op == null) {
return;
}
try {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
}
//將消息放入“待處理消息隊列”
pendingMessages.add(op);
......
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
// connection is established
op.cmd.retain();
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
......
}
//添加消息到pendingMessages隊列
public boolean add(OpSendMsg o) {
// postpone adding to the queue while forEach iteration is in progress
//batch的計數(shù)是按照batch中消息的總量進行計數(shù)
messagesCount.addAndGet(o.numMessagesInBatch);
if (forEachDepth > 0) {
if (postponedOpSendMgs == null) {
postponedOpSendMgs = new ArrayList<>();
}
return postponedOpSendMgs.add(o);
} else {
return delegate.add(o);
}
}
//將消息從pendingMessages隊列移除
public void remove() {
OpSendMsg op = delegate.remove();
if (op != null) {
messagesCount.addAndGet(-op.numMessagesInBatch);
}
}2.2.7 消息傳輸

Producer和broker都維護了分區(qū)維度的pending隊列來保證消息處理的順序性,以及實現(xiàn)消息重新發(fā)送、重新寫入持久化存儲的能力。在Producer端,消息被順序追加到pending隊列并異步發(fā)送到服務端,服務端的pending隊列在接收到消息后,按照順序追加到隊列中,并按照順序?qū)?shù)據(jù)寫入bookie進行持久化處理,處理完成后按照順序返回響應Producer,并將消息從broker pending和producer pending隊列中移除。
另外在數(shù)據(jù)傳輸過程中,無論是使用Pulsar Producer的同步發(fā)送還是異步發(fā)送,在消息傳輸環(huán)節(jié)本質(zhì)上都是使用netty將消息異步的從客戶端發(fā)送到服務端,區(qū)別在于send() 方法封裝了 sendAsync() 方法,使其可以在向服務器發(fā)送 Pulsar 消息時阻塞等待 Broker 的響應,直到確認消息已經(jīng)被 Broker 成功處理后才會返回,常規(guī)情況下,建議使用異步的方式發(fā)送 Pulsar 消息,因為同步方式必須在 Broker 端成功接收到消息之后才會返回,因此會帶來較大的性能損耗和延遲。但是在部分場景下,需要使用同步方式來保證可靠性,以防 Broker 端接收失敗,可以考慮使用 send() 方法實現(xiàn)同步方式的方式發(fā)送 Pulsar 消息。
使用netty執(zhí)行的代碼:
private static final class WriteInEventLoopCallback implements Runnable {
......
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
sequenceId);
}
try {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
} finally {
recycle();
}
}
......
}2.2.8 處理響應

Pulsar Producer 使用“ACK 跟蹤機制”來實現(xiàn)對 Broker 返回的 ACK 確認消息的處理,用于檢測和處理到達生產(chǎn)者的全部消息狀態(tài)信息。
對于Producer發(fā)送的消息,Pulsar會對每個消息分配一個唯一的 sequenceId 序號,并記錄該消息的創(chuàng)建時間(createdAt)等元數(shù)據(jù)信息。當 Broker 確認收到某個消息時,Producer 會依據(jù)返回的 ACK 序號和 Broker 返回的確認時間來判斷當前 ACK 是否有效,并從已緩存的 pendingMessages 隊列中找到對應的消息元數(shù)據(jù)信息,以進行確認處理,在 Broker 確認消息接收成功時,Producer 將從等待確認的消息隊列中刪除對應的消息元數(shù)據(jù)信息,如果 Broker 返回的 ACK 消息不符合生產(chǎn)者預期的消息狀態(tài)信息,Producer 將會重發(fā)消息,直到重試成功或多次重試失敗后拋出異常后再從隊列中移除對應消息元數(shù)據(jù)信息并釋放對應內(nèi)存、信號量等資源。
消息重發(fā)的關(guān)鍵代碼如下:
private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {
//判斷連接狀態(tài):當連接正在關(guān)閉或者已經(jīng)關(guān)閉則不進行重發(fā)
if (getState() == State.Closing || getState() == State.Closed) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
return;
}
......
//調(diào)用重發(fā)消息方法
recoverProcessOpSendMsgFrom(cnx, null, expectedEpoch);
}
});
}
// Must acquire a lock on ProducerImpl.this before calling method.
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
......
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
OpSendMsg pendingRegisteringOp = null;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
......
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
cnx.channel(), op.sequenceId);
}
//發(fā)送消息
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
cnx.ctx().flush();
......
}三、Pulsar 數(shù)據(jù)發(fā)送端參數(shù)調(diào)優(yōu)實踐
根據(jù)以上對原理解析,我們對Producer已經(jīng)有了一個大致理解,下面通過一個Producer參數(shù)調(diào)優(yōu)實踐案例來幫助讀者基于原理進一步理解客戶端參數(shù)之間的聯(lián)系。
3.1 調(diào)優(yōu)目的
首先要清楚為什么要進行參數(shù)調(diào)優(yōu),有以下兩個目的:
- 降低參數(shù)使用門檻:
Pulsar client和Producer的幾十個配置參數(shù),參數(shù)多且聯(lián)系緊密,需要花費較多的時間成本去理解,同時參數(shù)之間存在協(xié)同生效互相影響的情況,對普通使用者而言場景復雜理解門檻高,我們希望能夠有一套較為通用的參數(shù)配置,或有公式化的參數(shù)配置方法論。 - 提升單機處理性能:
站在客戶端的角度,相同時間內(nèi)處理的數(shù)據(jù)量越多,則認為單機處理性能更強。作為中間件系統(tǒng)的提供者,我們經(jīng)常認為性能提升是服務端的事情,想盡辦法在pulsar的broker和bookie上去提升單機處理性能,但pulsar client作為整個消息中間件系統(tǒng)的核心組件,它能否發(fā)送好一份數(shù)據(jù),對整個消息中間件系統(tǒng)的性能和穩(wěn)定性也發(fā)揮著至關(guān)重要的作用。
3.2 調(diào)優(yōu)實踐
下面就圍繞“參數(shù)通用模版化”和“提升單機處理性能”兩個目的出發(fā)并結(jié)合上述講解的數(shù)據(jù)發(fā)送原理,來分享一些實踐經(jīng)驗。
3.2.1 關(guān)聯(lián)與場景相關(guān)的重點參數(shù)
Pulsar客戶端參數(shù)雖多但都提供了默認值,不需要一一調(diào)整。只需要對業(yè)務場景相關(guān)的針對性的去調(diào)整即可,如我們本次的參數(shù)調(diào)優(yōu)目的是提升單機處理性能,則重點關(guān)注哪些場景哪些參數(shù)可以提升客戶端的發(fā)送速率、降低服務端的壓力,讓服務端可以處理更多的數(shù)據(jù),有以下四點最為關(guān)鍵:
- batch打包發(fā)送:
消息多條批次發(fā)送,在降低客戶端和服務端網(wǎng)絡IO的同時也降低了兩者的cpu的負載。這里需強調(diào)的是我們希望batch是一個均勻的、“完整”的包,如pending隊列被打滿,batch只能空等到延遲發(fā)送時間過后被發(fā)送,沒有構(gòu)建出預期中的batch,那么可以認為這個batch是一個不完整的包,這種batch包含的數(shù)據(jù)量少,對發(fā)送效率有著極大的影響。 - 數(shù)據(jù)壓縮:
Pulsar是IO密集型系統(tǒng),常規(guī)情況下磁盤是系統(tǒng)的主要瓶頸,開啟壓縮可以有效降低網(wǎng)絡I/O,提升處理相同數(shù)據(jù)量下的讀寫能力。由于壓縮是針對batch的,在發(fā)送時間一定的情況下,batch越大其壓縮效果也越好,代表著處理的消息量也更多。 - RoundRobin發(fā)送:
將數(shù)據(jù)均勻地分配到多個分區(qū)中。它的基本思想是輪詢將新的數(shù)據(jù)寫入到不同的分區(qū)中,以均衡地分散負載。 - 消息堆積控制:maxPendingMessages信號量和memoryLimit限制不直接提升發(fā)送速率,但它能夠有效保障我們客戶端的穩(wěn)定,也是控制或限制發(fā)送效率的重要參數(shù)之一。
涉及的客戶端關(guān)鍵參數(shù)以及默認值和我們線上調(diào)優(yōu)后設置的數(shù)值如下表:

3.2.2 結(jié)合Producer發(fā)送原理分析參數(shù)的效果
接下來我們以參數(shù)的效用角度來描述一條消息從構(gòu)建到發(fā)送的過程,進一步解釋參數(shù)如此設置的意義:
(1)選擇分區(qū)
構(gòu)建消息后,通過messageRoutingMode參數(shù)所設置的路由策略來選擇分區(qū),這里以RoundRobinPartition為路由策略,開啟batch時則每間隔partitionSwitchMs時間換一個分區(qū)進行數(shù)據(jù)發(fā)送,partitionSwitchMs的值為“batchingPartitionSwitchFrequencyByPublish
-Delay、batchingMaxPublishDelayMicros”這兩個Producer參數(shù)之積,也就是每batchingPartition
-SwitchFrequencyByPublishDelay個batch的最大打包時間,消息就會輪換一個分區(qū)發(fā)送。
為了能在batchingMaxPublishDelayMicros內(nèi)得到一個較大的包,我們希望這個batch接收的消息是連續(xù)的,因此batchingPartitionSwitchFrequency-
ByPublishDelay不能小于1,同時也希望一個分區(qū)之間數(shù)據(jù)是較為均勻的,所以batchingPartition-
SwitchFrequencyByPublishDelay也要盡量小,否則分區(qū)對應的信號量maxPendingMessages耗盡還沒有切換分區(qū),就會導致batch必須等待一個batchingMaxPublishDelayMicros時間。因此將batchingPartitionSwitchFrequencyByPublishDelay修改成了1,保證打包了一個batch之后就切換分區(qū),這也極大的避免了分區(qū)信號量耗盡,出現(xiàn)發(fā)送阻塞。
(2)消息堆積控制
maxPendingMessages作為分區(qū)的信號量,也是“pending隊列”的大小,代表著每個分區(qū)能夠同時處理的最大消息上限,而maxPendingMessages-
AcrossPartitions則是針對整個topic生效的,maxPendingMessages=min( maxPending-
Messages,maxPendingMessagesAcrossPartitions/Partition),由于線上分區(qū)可能會變化,有不確定性,因此就使用上而言除非有特殊的使用場景,建議將maxPendingMessagesAcrossPartitions設置的比較大,讓maxPendingMessages生效即可。
除了maxPendingMessages以外,消息能否接收被放入pending隊列中,還要看當前正在處理的消息體大小總和是否超過了memoryLimit參數(shù)的限制,memoryLimit控制了消息待處理隊列中未壓縮前的消息有效荷載總和,可以避免在消息有效荷載非常大時,還未觸發(fā)maxPendingMessages限制,就導致內(nèi)存占用過多出現(xiàn)頻繁GC和oom的問題。由于memoryLimit是client級別的策略,因此也建議一個client對應一個Poducer。
總而言之maxPendingMessages控制了每個分區(qū)可以處理消息數(shù)量的上限,memoryLimit控制了所有分區(qū)可以消息占用內(nèi)存的上限,兩者相輔相成。
(3)消息batch容器打包
決定一個batch是否打包完成有三個條件控制,batchingMaxBytes、batchingMaxMessages、batchingMaxPublishDelayMicros滿足其一即可,根據(jù)這三個參數(shù)的含義去設置值看似是容易的,但容易忽略的是batch中用來打包的消息也是受memoryLimit和maxPendingMessages制約的,應該避免出現(xiàn)batch中消息的數(shù)量超過memoryLimit和maxPendingMessages導致batch打包效率受影響。舉個例子,當maxPendingMessages設置為500,而batchingMaxMessages設置1000時,batch就永遠無法滿足消息條數(shù)達到1000的條件,只能空等batchingMaxPublishDelayMicros或者batchingMaxBytes兩者生效。
3.2.3 公式化模版
通過上述分析,大致了解了關(guān)鍵參數(shù)的生效效果,且彼此相互關(guān)聯(lián),根據(jù)這些關(guān)系就能夠輸出一個較為簡單的參數(shù)調(diào)優(yōu)模版。
假設我們發(fā)送的單條消息大小為:messageByte;分區(qū)數(shù)量為:partitionNum。
那么對應參數(shù)調(diào)整公式如下:
//業(yè)務發(fā)送速率越大,這里設置的值越大
maxPendingMessages:一般1000-2000之間
//這里值也可以設置大一些,讓maxPendingMessages生效即可
maxPendingMessagesAcrossPartitions = maxPendingMessages * partitionNum
//memoryLimit的值就是打算阻塞總消息大小,這與消息體和maxPendingMessages有關(guān)
memoryLimit=(maxPendingMessages * partitionNum * messageByte)
//batch的條數(shù)不超過“待處理消息隊列”大小的一半
batchingMaxMessages=maxPendingMessages/2,這樣可以保證在消息發(fā)送等待ack的時候,該分區(qū)剩下一半的空間還能用來構(gòu)建一個batch
//batch大小同理,batch大小不超過“待處理消息隊列”消息大小的一半
batchingMaxBytes= Math.min(memoryLimit * 1024 * 1024 /partitionNum/2,1048576)
//業(yè)務能夠接受的延遲大小,一般延遲時間越大,batch越大
batchingMaxPublishDelayMicros=1ms-100m皆可
//每構(gòu)建一個batch就轉(zhuǎn)換一個分區(qū)
batchingPartitionSwitchFrequencyByPublishDelay=1可以看到根據(jù)上面的分析,參數(shù)之間是有一個模版化的公式,但這也不是唯一的,讀者可以根據(jù)自己的業(yè)務場景進行調(diào)整。在真實使用過程中線上的消息大小以及分區(qū)數(shù)量實際上是會變化的,因此真正的參數(shù)設置還需要根據(jù)實際情況來確定,比如我們線上通常的做法是根據(jù)機器配置將memoryLimit直接設置為64M-256M,分區(qū)數(shù)量我們線上不會超過1000,那么這里就假設為1000,確定了這兩個參數(shù),其他的參數(shù)的值也就確定了。
3.2.4 效果對比
以線上一個業(yè)務參數(shù)調(diào)優(yōu)為例,前后都開啟壓縮的情況下調(diào)整上述參數(shù)后的一個效果。
服務端(Pulsar):


優(yōu)化前后對比數(shù)據(jù):

相同的寫入速率,Pulsar服務端網(wǎng)卡流量縮減約50%(batch包體增加,壓縮效果提升),cpu負載降低約90%,Pulsar服務端總體成本相較優(yōu)化前至少可降低50%以上,客戶端也有一定程度的負載降低。
參數(shù)調(diào)整后,CPU負載得到明顯降低,一定程度上避免了CPU成為系統(tǒng)的瓶頸,同時由于壓縮效果的提升,Pulsar 的磁盤IO負載得到顯著降低,可以用更少的機器處理更多的數(shù)據(jù)。
四、總結(jié)
理解Producer發(fā)送原理以及核心參數(shù)是寫好數(shù)據(jù)發(fā)送程序最為有效的手段,最簡單的客戶端參數(shù)優(yōu)化反而隱藏了巨大的收益。本文通過對Producer原理進行剖析、對消息的流轉(zhuǎn)過程中參數(shù)效用進行講解,并配合參數(shù)調(diào)優(yōu)實踐案例,介紹了具體的分析思路和調(diào)優(yōu)的方法,在實際使用過程中通過對核心的幾個上游系統(tǒng)進行調(diào)優(yōu),服務端單機處理能力至少提升了一倍以上,成本得到了極大的降低。
參考文章:


























