偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

深度剖析 Kafka 日志保留與數(shù)據(jù)清理策略

開發(fā)
本文從原理到源碼詳細分析了 Kafka 的日志保留和數(shù)據(jù)清理策略,Kafka 能夠在保證數(shù)據(jù)持久化的同時,最大限度地利用存儲資源。。

Log 是Kafka的核心組件之一,用于持久化存儲消息,為了有效管理存儲空間和保證系統(tǒng)性能,Kafka 提供了日志保留和數(shù)據(jù)清理策略。這篇文章,我將詳細分析它們的工作原理。

一、日志保留

Kafka 的日志保留策略決定了消息在 Kafka 中存儲的時間長度,保留策略可以基于時間或日志大小來配置。當(dāng)消息超過指定的保留時間或日志大小限制時,Kafka 將自動清理這些消息以釋放存儲空間。

1.日志保留配置

Kafka 提供了多種配置選項以控制日志保留策略:

  • log.retention.hours: 定義消息在日志中保留的時間(以小時為單位),默認值為 168 小時(7 天)。
  • log.retention.minutes: 以分鐘為單位的保留時間。
  • log.retention.ms: 以毫秒為單位的保留時間。
  • log.retention.bytes: 定義每個日志分區(qū)允許使用的最大存儲空間,當(dāng)達到此限制時,最早的消息將被刪除。

需要注意的是,時間和大小限制是互斥的,Kafka 將依據(jù)首先滿足的條件來清理日志。

2.日志清理策略

Kafka 提供兩種主要的日志清理策略:

  • 刪除策略(delete): 在達到保留期后刪除舊數(shù)據(jù)。
  • 壓縮策略(compact): 針對具有相同鍵的記錄,只保留最新版本。

默認情況下,Kafka 使用刪除策略。日志清理策略可以通過 log.cleanup.policy 配置,其中 delete 和 compact 都可以作為其值。

二、日志清理

Kafka 的日志清理是在后臺運行的,它并不影響正常的讀寫操作,日志清理策略主要包含刪除策略和壓縮策略 2種類型:

1.刪除策略

刪除策略是最簡單的日志清理機制,Kafka 定期檢查日志分區(qū)的時間戳或大小,當(dāng)某個分區(qū)超過指定的保留時間或大小時,系統(tǒng)會刪除該分區(qū)的舊日志段(Log Segment)。具體過程如下:

  • 檢查條件: Kafka 定期比較當(dāng)前時間與日志段創(chuàng)建時間的差值,或檢查日志分區(qū)的大小是否超過配置的限制。
  • 標記刪除: 符合刪除條件的日志段被標記為刪除。
  • 物理刪除: 在下一個清理周期中,Kafka 將實際刪除這些標記的日志段以釋放磁盤空間。

2.壓縮策略

壓縮策略主要用于僅保留每個鍵的最新消息版本,它適用于更新頻繁的場景,例如數(shù)據(jù)庫變更日志。壓縮策略的工作流程如下:

  • 收集日志段: Kafka 定期掃描日志段,識別出需要壓縮的段。
  • 構(gòu)建索引: 為每個日志段構(gòu)建一個映射,記錄每個鍵的最新偏移量。
  • 合并日志段: 確定每個鍵的最新消息后,Kafka 將這些消息寫入新的日志段。
  • 替換舊日志段: 新日志段生成后,Kafka 替換舊的日志段,并在下次清理時刪除舊段。

三、核心源碼分析

為了更深入理解 Kafka 的日志清理機制,接下來會分析幾個相關(guān)的核心源碼類:

1.LogCleaner 類

LogCleaner 是 Kafka 中負責(zé)日志壓縮(compaction)的核心組件之一,它的主要功能是定期掃描 Kafka 日志,并對其進行壓縮,以確保每個鍵只保留最新的值。下面是對 LogCleaner 源碼的詳細分析。

(1) LogCleaner 的基本結(jié)構(gòu)

LogCleaner 繼承自 ShutdownableThread,這意味著它是一個可以安全關(guān)閉的后臺線程,其主要職責(zé)是從需要壓縮的日志中清除冗余消息。

public class LogCleaner extends ShutdownableThread {
    // 主要成員變量
    private final CleanerConfig config;
    private final OffsetCheckpoint checkpoint;
    private final Time time;
    private final Cleaner cleaner;

    public LogCleaner(String name, CleanerConfig config, OffsetCheckpoint checkpoint, Time time) {
        super(name, true);
        this.config = config;
        this.checkpoint = checkpoint;
        this.time = time;
        this.cleaner = new Cleaner(config, time);
    }

    @Override
    public void doWork() {
        // 核心清理邏輯
    }

(2) 核心方法分析

① doWork()

doWork() 是 LogCleaner 的核心方法,它被定期調(diào)用以執(zhí)行日志壓縮任務(wù)。

@Override
public void doWork() {
    // 從清理隊列中獲取下一個需要清理的日志
    LogToClean logToClean = cleanerManager.grabFilthiestLog();
    if (logToClean != null) {
        try {
            // 執(zhí)行壓縮
            cleaner.clean(logToClean);
        } finally {
            // 釋放資源
            cleanerManager.doneCleaning(logToClean);
        }
    } else {
        // 如果沒有日志需要清理,則線程休眠一段時間
        time.sleep(config.backOffMs);
    }
}

該方法的主要步驟包括:

  • 從 cleanerManager 中獲取下一個需要清理的日志。
  • 調(diào)用 cleaner.clean() 方法對日志進行壓縮。
  • 完成后,釋放資源并更新清理狀態(tài)。

② clean()

clean() 方法是 Cleaner 類中的一個重要方法,負責(zé)具體的日志壓縮操作。

public void clean(LogToClean logToClean) {
    // 獲取需要壓縮的日志段
    List<LogSegment> segments = logToClean.segments();
    
    // 創(chuàng)建一個新的日志段用于存儲壓縮后的數(shù)據(jù)
    LogSegment newSegment = new LogSegment(...);

    // 遍歷舊段,壓縮數(shù)據(jù)并寫入新段
    for (LogSegment segment : segments) {
        // 讀取每個消息
        for (MessageAndOffset message : segment) {
            // 檢查是否是最新的消息
            if (isLatest(message)) {
                newSegment.append(message);
            }
        }
    }

    // 替換舊段
    logToClean.replaceSegments(newSegment);
}

clean() 方法的主要步驟包括:

  • 獲取需要壓縮的日志段。
  • 創(chuàng)建新的日志段以存儲壓縮后的數(shù)據(jù)。
  • 遍歷舊日志段,選出每個鍵的最新消息并寫入新段。
  • 替換舊日志段為新段。

2.LogSegment 類

LogSegment 是 Kafka 中表示日志文件的基本單位。每個 Kafka 主題分區(qū)由多個日志段(LogSegment)組成。每個日志段包括一個日志文件和一個索引文件。下面是對 LogSegment 類的源碼分析,幫助理解其結(jié)構(gòu)和功能。

(1) LogSegment 的基本結(jié)構(gòu)

LogSegment 類位于 Kafka 的 log 包中,表示一個日志段。它包含兩個主要文件:數(shù)據(jù)文件(存儲消息)和索引文件(存儲消息的偏移量)。

public class LogSegment {
    private final File log;
    private final FileMessageSet messageSet;
    private final OffsetIndex index;
    private final TimeIndex timeIndex;
    private final long baseOffset;
    private final long created;
    private final AtomicLong nextOffset;
    private final AtomicLong nextTimeIndexEntry;
    // 其他成員變量和方法
}

(2) 核心構(gòu)造函數(shù)

LogSegment 的構(gòu)造函數(shù)負責(zé)初始化日志段的各個組件,包括數(shù)據(jù)文件和索引文件。

public LogSegment(File logFile,
                  FileMessageSet messageSet,
                  OffsetIndex offsetIndex,
                  TimeIndex timeIndex,
                  long baseOffset,
                  long created) {
    this.log = logFile;
    this.messageSet = messageSet;
    this.index = offsetIndex;
    this.timeIndex = timeIndex;
    this.baseOffset = baseOffset;
    this.created = created;
    this.nextOffset = new AtomicLong(baseOffset);
    this.nextTimeIndexEntry = new AtomicLong(baseOffset);
}

(3) 主要方法分析

① append()

append() 方法用于向日志段追加消息,它將消息寫入數(shù)據(jù)文件,并在索引文件中記錄偏移量信息。

public void append(long offset, RecordBatch batch) {
    // 將消息追加到數(shù)據(jù)文件
    int physicalPosition = messageSet.append(batch);

    // 更新偏移量索引
    index.append(offset, physicalPosition);

    // 更新時間索引
    if (batch.maxTimestamp() > 0) {
        timeIndex.maybeAppend(batch.maxTimestamp(), offset);
    }

    // 更新下一個可用偏移量
    nextOffset.set(offset + 1);
}

② read()

read() 方法用于從日志段讀取消息,它根據(jù)給定的偏移量和大小,返回相應(yīng)的消息集合。

public FileMessageSet read(long startOffset, int maxSize) {
    // 計算讀取的起始位置和大小
    int startPosition = index.lookup(startOffset).position;
    return messageSet.read(startPosition, maxSize);
}

③ delete()

delete() 方法用于刪除日志段的物理文件,它會刪除數(shù)據(jù)文件和索引文件。

public void delete() {
    boolean deletedLog = log.delete();
    boolean deletedIndex = index.delete();
    boolean deletedTimeIndex = timeIndex.delete();
    if (!deletedLog || !deletedIndex || !deletedTimeIndex) {
        throw new KafkaException("Failed to delete log segment files.");
    }
}

四、優(yōu)化建議

Kafka 的日志清理機制可以通過多種配置進行優(yōu)化,以適應(yīng)不同的業(yè)務(wù)需求。以下是一些常見的優(yōu)化建議:

  • 合理設(shè)置保留時間:根據(jù)數(shù)據(jù)的重要性和訪問頻率,合理設(shè)置日志的保留時間。對于不常訪問的數(shù)據(jù),可以適當(dāng)縮短保留時間,以節(jié)省存儲空間。
  • 調(diào)整日志段大?。和ㄟ^設(shè)置 log.segment.bytes,可以控制每個日志段的大小。適當(dāng)?shù)娜罩径未笮】梢蕴岣咔謇硇?,避免頻繁的段切換。
  • 配置清理線程:Kafka 允許配置清理線程的數(shù)量和頻率。通過 log.cleaner.threads 和 log.cleaner.interval.ms 配置,可以優(yōu)化清理線程的性能。

五、總結(jié)

本文,我們從原理到源碼詳細分析了 Kafka 的日志保留和數(shù)據(jù)清理策略,在日常工作種,通過合理配置和優(yōu)化這些策略,Kafka 能夠在保證數(shù)據(jù)持久化的同時,最大限度地利用存儲資源。

責(zé)任編輯:趙寧寧 來源: 猿java
相關(guān)推薦

2024-08-07 10:54:27

MySQL日志策略

2010-02-05 15:33:29

Android JDK

2022-05-05 10:00:53

Kafka分區(qū)分配Linux

2010-05-20 18:05:38

2024-12-24 14:01:10

2022-11-07 09:25:02

Kafka存儲架構(gòu)

2012-02-17 10:50:10

Java

2016-11-25 20:52:14

Linux

2024-07-29 00:01:00

RabbitMQ消息堆積

2011-11-21 15:04:30

2022-09-27 18:56:28

ArrayList數(shù)組源代碼

2025-06-04 08:30:00

seata分布式事務(wù)開發(fā)

2024-02-05 19:06:04

DartVMGC流程

2025-05-27 01:20:00

向量數(shù)據(jù)庫HNSWANN索引算法

2025-02-07 12:11:52

2023-10-12 19:41:55

2025-01-02 10:19:18

2018-10-29 13:07:15

HBase存儲遷移

2017-06-23 18:25:51

kafka數(shù)據(jù)可靠性

2025-06-09 07:45:00

點贊
收藏

51CTO技術(shù)棧公眾號