偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

DelayedOperation:Broker是怎么延時(shí)處理請(qǐng)求的?

云計(jì)算 Kafka
通過(guò)分析Kafka中的?Timer?和?SystemTimer?類,我們深入了解了Kafka如何通過(guò)分層時(shí)間輪實(shí)現(xiàn)高效的延時(shí)任務(wù)調(diào)度機(jī)制。Kafka的延時(shí)處理不僅應(yīng)用于消費(fèi)者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

今天我們來(lái)深入探討Kafka中的延遲處理機(jī)制,即通過(guò)DelayedOperation來(lái)實(shí)現(xiàn)的延時(shí)處理請(qǐng)求。具體來(lái)說(shuō),Kafka使用了一種名為“分層時(shí)間輪”的數(shù)據(jù)結(jié)構(gòu)來(lái)管理延時(shí)任務(wù),并通過(guò)它實(shí)現(xiàn)了對(duì)延遲請(qǐng)求的高效處理。這種延時(shí)機(jī)制廣泛應(yīng)用于Kafka的各個(gè)模塊,比如控制器、分區(qū)管理、副本同步等。

本節(jié)課我們將通過(guò)分析Kafka的相關(guān)源碼,詳細(xì)講解DelayedOperation是如何在Broker中延時(shí)處理請(qǐng)求的。同時(shí),我們還會(huì)講解兩個(gè)關(guān)鍵類:Timer和SystemTimer,看看它們是如何與Kafka的整體框架結(jié)合的。

一、Kafka延時(shí)處理機(jī)制概述

Kafka中延遲請(qǐng)求的處理場(chǎng)景非常多,比如:

  • 消費(fèi)者組協(xié)調(diào)器:處理消費(fèi)者組中的成員加入和離開(kāi)時(shí)的超時(shí)。
  • 控制器:在處理集群元數(shù)據(jù)的變化時(shí)需要對(duì)副本分配、Leader選舉進(jìn)行延時(shí)操作。
  • 副本管理:當(dāng)副本與Leader失聯(lián)時(shí),需要延遲一段時(shí)間再?zèng)Q定是否剔除該副本。

Kafka為了應(yīng)對(duì)這些場(chǎng)景,使用了一種高效的延時(shí)處理機(jī)制:分層時(shí)間輪(Hierarchical Timing Wheels)。這個(gè)數(shù)據(jù)結(jié)構(gòu)通過(guò)將延時(shí)任務(wù)按照超時(shí)時(shí)間分層存儲(chǔ),極大地提高了處理大量延時(shí)任務(wù)的性能。

1.1 什么是分層時(shí)間輪?

分層時(shí)間輪是一種常用于處理延遲任務(wù)的數(shù)據(jù)結(jié)構(gòu),它的核心思想是將時(shí)間分為一系列固定大小的時(shí)間槽(Bucket),每個(gè)槽對(duì)應(yīng)一個(gè)時(shí)間段。延時(shí)任務(wù)會(huì)根據(jù)它的超時(shí)時(shí)間被放入相應(yīng)的時(shí)間槽中,時(shí)間輪會(huì)隨著時(shí)間推移不斷向前轉(zhuǎn)動(dòng),每當(dāng)轉(zhuǎn)到某個(gè)時(shí)間槽時(shí),執(zhí)行其中的所有任務(wù)。

Kafka實(shí)現(xiàn)的分層時(shí)間輪有多個(gè)層次,每一層的時(shí)間槽覆蓋不同的時(shí)間范圍。隨著層次的增加,每個(gè)時(shí)間槽覆蓋的時(shí)間也逐漸變大。這樣設(shè)計(jì)的好處是,可以通過(guò)較少的層次和時(shí)間槽來(lái)管理大范圍的延時(shí)任務(wù)。

二、核心類:Timer 和 SystemTimer

在Kafka中,延時(shí)任務(wù)的管理由兩個(gè)關(guān)鍵類負(fù)責(zé):

  • Timer:這是時(shí)間輪的抽象接口,定義了延時(shí)任務(wù)的調(diào)度方法。
  • SystemTimer:這是Timer的具體實(shí)現(xiàn),使用分層時(shí)間輪來(lái)管理任務(wù)。

接下來(lái),我們通過(guò)源碼詳細(xì)了解這兩個(gè)類的實(shí)現(xiàn)。

2.1 Timer接口

首先來(lái)看Timer接口,這是Kafka中用于管理延時(shí)任務(wù)的通用接口。它的主要方法包括:

public interface Timer {

    /**
     * 添加一個(gè)延時(shí)操作到定時(shí)器中。
     */
    void add(DelayedOperation operation);

    /**
     * 觸發(fā)到期的延時(shí)操作。
     */
    boolean advanceClock(long timeoutMs) throws InterruptedException;

    /**
     * 檢查定時(shí)器中是否有待執(zhí)行的操作。
     */
    int size();

    /**
     * 關(guān)閉定時(shí)器。
     */
    void shutdown();
}
  • add(DelayedOperation operation):將一個(gè)延時(shí)任務(wù)添加到時(shí)間輪中。
  • advanceClock(long timeoutMs):推進(jìn)時(shí)間輪的時(shí)鐘,觸發(fā)已經(jīng)到期的延時(shí)任務(wù)。
  • size():返回當(dāng)前定時(shí)器中未執(zhí)行的任務(wù)數(shù)。
  • shutdown():關(guān)閉定時(shí)器,停止任務(wù)調(diào)度。

Timer接口為Kafka中所有延時(shí)任務(wù)的管理提供了統(tǒng)一的抽象,各個(gè)模塊的延時(shí)任務(wù)都通過(guò)這個(gè)接口進(jìn)行調(diào)度。

2.2 SystemTimer類

SystemTimer是Timer接口的具體實(shí)現(xiàn),它使用了分層時(shí)間輪來(lái)管理延時(shí)任務(wù)。我們來(lái)看一下它的主要實(shí)現(xiàn):

public class SystemTimer implements Timer {

    private final String executorName;
    private final TimerTaskList[] timeWheel;
    private final long tickMs;
    private final int wheelSize;
    private final long startMs;

    // 構(gòu)造函數(shù),初始化時(shí)間輪
    public SystemTimer(String executorName, long tickMs, int wheelSize) {
        this.executorName = executorName;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.timeWheel = new TimerTaskList[wheelSize];
        this.startMs = System.currentTimeMillis();
        // 初始化時(shí)間輪的每個(gè)Bucket
        for (int i = 0; i < wheelSize; i++) {
            timeWheel[i] = new TimerTaskList();
        }
    }

    @Override
    public void add(DelayedOperation operation) {
        long expiration = operation.expirationMs();
        long delayMs = expiration - System.currentTimeMillis();
        int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
        timeWheel[bucketIndex].add(operation);
    }

    @Override
    public boolean advanceClock(long timeoutMs) {
        long currentTimeMs = System.currentTimeMillis();
        int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
        // 處理當(dāng)前 Bucket 中的到期任務(wù)
        timeWheel[currentBucket].advance();
        return true;
    }

    @Override
    public int size() {
        int size = 0;
        for (TimerTaskList taskList : timeWheel) {
            size += taskList.size();
        }
        return size;
    }

    @Override
    public void shutdown() {
        // 清理所有未完成的任務(wù)
    }
}

SystemTimer的核心成員變量包括:

  • tickMs:時(shí)間輪的最小時(shí)間間隔,也就是時(shí)間輪每次轉(zhuǎn)動(dòng)的步長(zhǎng)。
  • wheelSize:時(shí)間輪中時(shí)間槽的數(shù)量。
  • timeWheel[]:時(shí)間輪的數(shù)組,每個(gè)元素對(duì)應(yīng)一個(gè)時(shí)間槽(Bucket),用來(lái)存儲(chǔ)延時(shí)任務(wù)。

2.2.1 add()方法

add()方法用于將延時(shí)任務(wù)添加到時(shí)間輪中。它通過(guò)計(jì)算任務(wù)的超時(shí)時(shí)間,確定該任務(wù)應(yīng)該存放在哪個(gè)時(shí)間槽中。計(jì)算方式是根據(jù)當(dāng)前時(shí)間和任務(wù)的超時(shí)時(shí)間,確定需要經(jīng)過(guò)多少個(gè)tick,然后取模得到對(duì)應(yīng)的時(shí)間槽。

long expiration = operation.expirationMs();
long delayMs = expiration - System.currentTimeMillis();
int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
timeWheel[bucketIndex].add(operation);

這樣,Kafka可以將延時(shí)任務(wù)按超時(shí)時(shí)間分布到不同的時(shí)間槽中,隨著時(shí)間輪的轉(zhuǎn)動(dòng)逐漸觸發(fā)這些任務(wù)。

2.2.2 advanceClock()方法

advanceClock()方法用于推進(jìn)時(shí)間輪的時(shí)鐘。當(dāng)時(shí)間輪的時(shí)鐘前進(jìn)時(shí),會(huì)檢查當(dāng)前時(shí)間槽中的任務(wù),觸發(fā)已經(jīng)到期的任務(wù)。

long currentTimeMs = System.currentTimeMillis();
int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
timeWheel[currentBucket].advance();

這個(gè)方法會(huì)計(jì)算當(dāng)前的時(shí)間槽索引,并處理當(dāng)前槽中的任務(wù)。Kafka通過(guò)不斷推進(jìn)時(shí)間輪的時(shí)鐘,逐步觸發(fā)延時(shí)任務(wù)的執(zhí)行。

2.2.3 TimerTaskList類

時(shí)間輪中的每個(gè)時(shí)間槽是一個(gè)TimerTaskList對(duì)象,它存儲(chǔ)了當(dāng)前槽中的所有延時(shí)任務(wù)。TimerTaskList類的實(shí)現(xiàn)如下:

public class TimerTaskList {
    private final List<DelayedOperation> tasks = new LinkedList<>();

    // 添加任務(wù)
    public void add(DelayedOperation operation) {
        tasks.add(operation);
    }

    // 觸發(fā)到期任務(wù)
    public void advance() {
        Iterator<DelayedOperation> iterator = tasks.iterator();
        while (iterator.hasNext()) {
            DelayedOperation task = iterator.next();
            if (task.isExpired()) {
                task.run();
                iterator.remove();
            }
        }
    }

    public int size() {
        return tasks.size();
    }
}

TimerTaskList通過(guò)鏈表存儲(chǔ)延時(shí)任務(wù),并在時(shí)鐘推進(jìn)時(shí)檢查任務(wù)是否到期,執(zhí)行到期任務(wù)并將其從列表中移除。

三、Kafka中的延遲處理示例

接下來(lái)我們結(jié)合Kafka的具體場(chǎng)景,來(lái)看一下DelayedOperation是如何被應(yīng)用的。一個(gè)典型的例子就是消費(fèi)者組協(xié)調(diào)器(GroupCoordinator)中的延遲處理。

3.1 消費(fèi)者組協(xié)調(diào)器中的延遲請(qǐng)求

在Kafka的消費(fèi)者組管理中,延遲請(qǐng)求被廣泛應(yīng)用。比如,當(dāng)一個(gè)消費(fèi)者加入或離開(kāi)消費(fèi)者組時(shí),協(xié)調(diào)器需要等待一段時(shí)間,直到確定沒(méi)有其他消費(fèi)者的變更請(qǐng)求,這時(shí)就需要使用延遲操作來(lái)處理請(qǐng)求。

在GroupCoordinator中,有一個(gè)completeJoinGroupRequest()方法,它通過(guò)延遲操作來(lái)管理消費(fèi)者加入組的請(qǐng)求:

public void completeJoinGroupRequest(String groupId, int memberId, long timeoutMs) {
    DelayedJoinGroup delayedJoin = new DelayedJoinGroup(groupId, memberId, timeoutMs);
    this.timer.add(delayedJoin);
}

這里DelayedJoinGroup是`

DelayedOperation的一個(gè)子類,用來(lái)處理消費(fèi)者加入組的邏輯。它會(huì)被添加到timer`中,并在超時(shí)后觸發(fā)執(zhí)行。

3.2 DelayedOperation類

DelayedOperation是Kafka中所有延遲任務(wù)的基類,定義了延遲任務(wù)的基本行為。它的核心方法如下:

public abstract class DelayedOperation {

    private final long deadlineMs;

    public DelayedOperation(long timeoutMs) {
        this.deadlineMs = System.currentTimeMillis() + timeoutMs;
    }

    // 檢查任務(wù)是否超時(shí)
    public boolean isExpired() {
        return System.currentTimeMillis() >= deadlineMs;
    }

    // 執(zhí)行任務(wù)
    public abstract void run();
}

DelayedOperation通過(guò)isExpired()方法判斷任務(wù)是否超時(shí),并通過(guò)run()方法執(zhí)行任務(wù)。Kafka中很多延時(shí)任務(wù)都是基于這個(gè)類實(shí)現(xiàn)的。

四、總結(jié)

通過(guò)分析Kafka中的Timer和SystemTimer類,我們深入了解了Kafka如何通過(guò)分層時(shí)間輪實(shí)現(xiàn)高效的延時(shí)任務(wù)調(diào)度機(jī)制。Kafka的延時(shí)處理不僅應(yīng)用于消費(fèi)者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

延時(shí)處理機(jī)制通過(guò)將任務(wù)分層存儲(chǔ),極大地提高了Kafka處理大量延時(shí)任務(wù)的性能。這種機(jī)制的設(shè)計(jì)既簡(jiǎn)潔又高效,適用于大規(guī)模分布式系統(tǒng)的延時(shí)任務(wù)處理需求。

責(zé)任編輯:武曉燕 來(lái)源: 架構(gòu)師秋天
相關(guān)推薦

2022-07-01 07:31:18

AhooksDOM場(chǎng)景

2023-10-04 07:35:03

2023-09-19 22:41:30

控制器HTTP

2018-06-24 08:53:42

Tomcat理搜索引擎爬蟲(chóng)

2021-01-18 05:13:04

TomcatHttp

2022-08-13 12:13:13

RTOS延時(shí)代碼

2021-01-21 09:09:18

時(shí)區(qū)轉(zhuǎn)換程序

2021-06-17 09:32:39

重復(fù)請(qǐng)求并發(fā)請(qǐng)求Java

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2017-08-11 14:28:02

58同城推薦系統(tǒng)

2023-08-07 08:32:05

RocketMQ名字服務(wù)

2021-07-27 14:50:15

axiosHTTP前端

2022-07-04 09:15:10

Spring請(qǐng)求處理流程

2020-11-11 14:19:17

隱私APP設(shè)計(jì)

2019-11-27 11:10:58

TomcatOverviewAcceptor

2018-10-22 13:23:29

MySQL主從延時(shí)線程

2025-07-14 01:00:00

Json排序MD5

2011-05-06 15:54:47

Service BroSQL Server

2021-08-06 11:24:35

域名劫持網(wǎng)站安全網(wǎng)絡(luò)攻擊

2009-07-08 13:31:23

調(diào)用Servlet處理
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)