RocketMQ結(jié)合源碼告訴你消息量大為啥不需要手動(dòng)壓縮消息
背景
最近同事發(fā)現(xiàn)線上發(fā)送的RocketMQ消息太大,同事為了節(jié)省網(wǎng)絡(luò)帶寬和存儲(chǔ)空間,手動(dòng)壓縮消息然后再進(jìn)行消息發(fā)送,發(fā)現(xiàn)磁盤也沒有明顯的縮減。
所以我打算結(jié)合源碼告訴他RocketMQ自帶的消息壓縮。
RocketMQ版本
- 5.1.0
為什么需要壓縮消息
首先說一下為什么需要消息壓縮,原因其實(shí)很簡(jiǎn)單。就是為了節(jié)省網(wǎng)絡(luò)帶寬和存儲(chǔ)空間。
在哪里壓縮消息
我們的消息壓縮可以在很多個(gè)地方進(jìn)行。
有兩種方案
在client端進(jìn)行壓縮
比如我們可以在Producer發(fā)送消息的時(shí)候進(jìn)行消息壓縮。
然后將壓縮后的消息發(fā)送到Broker,broker只管存儲(chǔ)。
等到consumer需要消息的時(shí)候,原封不動(dòng)的推送給消費(fèi)者,由consumer自己進(jìn)行解壓縮。
這種方式的好處是broker不需要關(guān)心消息的壓縮和解壓縮,只需要存儲(chǔ)消息即可。
在broker端進(jìn)行壓縮
這種方式就是Producer發(fā)送消息的時(shí)候,不進(jìn)行壓縮。
RocketMQ在存儲(chǔ)的時(shí)候自己進(jìn)行消息壓縮,consumer進(jìn)行消息拉取的時(shí)候,broker進(jìn)行消息解壓縮,然后推送給consumer。
這種方式就是消耗broker cpu,也不能節(jié)省網(wǎng)絡(luò)帶寬,只能節(jié)省存儲(chǔ)空間。
所以很明顯是在client端進(jìn)行壓縮比較好。
源碼分析
這里我們來具體結(jié)合源碼分析下:
在消息發(fā)送org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl方法中會(huì)對(duì)消息進(jìn)行壓縮判斷。
圖片
tryToCompressMessage 消息壓縮
什么消息會(huì)被壓縮呢?
private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = compressor.compress(body, compressLevel);
if (data != null) {
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
- 批量消息不支持壓縮。
- 消息體長(zhǎng)度大于defaultMQProducer.getCompressMsgBodyOverHowmuch()的時(shí)候進(jìn)行壓縮。默認(rèn)1024 * 4 = 4kb。
- 壓縮算法是什么呢?
RocketMQ目前提供三種壓縮算法
- LZ4
- ZSTD
- ZLIB
默認(rèn)壓縮算法為ZLIB。
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
壓縮等級(jí)為5。
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
- 消息壓縮完后會(huì)通過sysFlag進(jìn)行標(biāo)記,表示消息進(jìn)行了壓縮,方便后續(xù)解壓。
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
消息解壓
消息解壓主要是在方法org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)中進(jìn)行的。
在client拉取到消息成功后對(duì)PullResult對(duì)象進(jìn)行處理執(zhí)行decodesBatch方法。
圖片
消息解析decodesBatch方法會(huì)調(diào)用org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)方法。
decode方法會(huì)對(duì)消息進(jìn)行解壓。
總結(jié)
- 消息壓縮主要是為了節(jié)省網(wǎng)絡(luò)帶寬和存儲(chǔ)空間。
- RocketMQ提供了三種壓縮算法,分別是LZ4、ZSTD、ZLIB,默認(rèn)為ZLIB。
- 消息壓縮主要是在Producer發(fā)送消息的時(shí)候進(jìn)行壓縮,broker只管存儲(chǔ)。
- 消息解壓主要是在Consumer拉取消息的時(shí)候進(jìn)行解壓。
- RocketMQ消息壓縮僅支持單條消息壓縮,不支持批量消息壓縮。
- 一般消息壓縮都會(huì)選擇在client端進(jìn)行壓縮,這樣可以節(jié)省broker的cpu。