偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Disruptor 有哪些典型的使用場(chǎng)景?

開(kāi)發(fā) 前端
通過(guò)消費(fèi)者的靈活組合,Disruptor 的使用場(chǎng)景非常豐富。本文介紹了 Disruptor 的 5 個(gè)典型使用場(chǎng)景。在選型的時(shí)候,除了使用場(chǎng)景,更多地要考慮到 Disruptor 作為高性能內(nèi)存隊(duì)列的這個(gè)特點(diǎn)。

大家好,我是君哥。

Disruptor 是一款高性能的內(nèi)存有界隊(duì)列,它通過(guò)內(nèi)存預(yù)分配、無(wú)鎖并發(fā)、解決偽共享問(wèn)題、使用 RingBuffer 取代阻塞隊(duì)列等措施來(lái)大幅提升隊(duì)列性能。

但開(kāi)發(fā)者們往往對(duì)它的使用場(chǎng)景不太了解,到底應(yīng)該在哪些場(chǎng)景使用呢?今天咱們就來(lái)聊一聊 Disruptor 的使用場(chǎng)景。

Disruptor 是一個(gè)生產(chǎn)-消費(fèi)模式的隊(duì)列,這里我們使用官網(wǎng)的示例,生產(chǎn)者發(fā)送一個(gè) long 類型的變量,消費(fèi)者收到消息后把變量打印出來(lái)。首先定義消息體:

public class LongEvent {
    private long value;
    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString()
    {
        return "LongEvent{" + "value=" + value + '}';
    }
}

為了讓 Disruptor 給消息預(yù)先分配內(nèi)存,定義一個(gè) EventFactory,代碼如下:

public class LongEventFactory implements EventFactory<LongEvent>
{
    @Override
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

下面定義個(gè)消費(fèi)者 LongEventHandler:

public class LongEventHandler implements EventHandler<LongEvent>
{
    private String consumer;

    public LongEventHandler(String consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

1.廣播場(chǎng)景

廣播場(chǎng)景在我們的開(kāi)發(fā)工作中并不少見(jiàn),比如系統(tǒng)收到上游系統(tǒng)的一個(gè)請(qǐng)求消息,然后把這個(gè)消息發(fā)送給多個(gè)下游系統(tǒng)來(lái)處理。Disruptor 支持廣播模式。比如消費(fèi)者生產(chǎn)的消息由三個(gè)消費(fèi)者來(lái)消費(fèi):

public class Broadcast {
    public static void main(String[] args) throws InterruptedException {
        int bufferSize = 1024;

        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
        EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
        EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

        disruptor.handleEventsWith(consumer1, consumer2, consumer3);
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

2.日志收集

再來(lái)看一個(gè)日志收集的例子。這里我們假設(shè)一個(gè)場(chǎng)景,業(yè)務(wù)系統(tǒng)集群有 3 個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)打印的業(yè)務(wù)日志發(fā)送到 Disruptor,Disruptor 下游有 3 個(gè)消費(fèi)者負(fù)責(zé)日志收集。

這里我們需要重新定義一個(gè)日志收集處理類,代碼如下:

public class LogCollectHandler implements WorkHandler<LongEvent> {
    public LogCollectHandler(String consumer) {
        this.consumer = consumer;
    }

    private String consumer;


    @Override
    public void onEvent(LongEvent event)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

下面這個(gè)代碼是綁定消費(fèi)者的代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.start();
}

需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消費(fèi)者不是 EventHandler,而是 WorkHandler。消費(fèi)者組里面的消費(fèi)者如果是 WorkHandler,那消費(fèi)者之間就是有競(jìng)爭(zhēng)的,比如一個(gè) Event 已經(jīng)被 consumer1 消費(fèi)過(guò),那就不再會(huì)被其他消費(fèi)者消費(fèi)了。消費(fèi)者組里面的消費(fèi)者如果是 EventHandler,那消費(fèi)者之間是沒(méi)有競(jìng)爭(zhēng)的,所有消息都會(huì)消費(fèi)。

3.責(zé)任鏈

責(zé)任鏈這種設(shè)計(jì)模式我們都比較熟悉了,同一個(gè)對(duì)象的處理有多個(gè)不同的邏輯,每個(gè)邏輯作為一個(gè)節(jié)點(diǎn)組成責(zé)任鏈,比如收到一條告警消息,處理節(jié)點(diǎn)分為:給開(kāi)發(fā)人員發(fā)送郵件、給運(yùn)維人員發(fā)送短信、給業(yè)務(wù)人員發(fā)送 OA 消息。

Disruptor 支持鏈?zhǔn)教幚硐ⅲ聪旅娴氖纠a:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.start();
}

Disruptor 也支持多個(gè)并行責(zé)任鏈,下圖是 2 條責(zé)任鏈的場(chǎng)景:

這里給出一個(gè)示例代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");
 EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");
 EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);
 disruptor.start();
}

4.多任務(wù)協(xié)作

一個(gè)經(jīng)典的例子,我們?cè)谂菘Х戎?,需要燒水、洗被子、磨咖啡粉,這三個(gè)步驟可以并行,但是需要等著三步都完成之后,才可以泡咖啡。

當(dāng)然,這個(gè)例子可以用 Java 中的 CompletableFuture 來(lái)實(shí)現(xiàn),代碼如下:

public static void main(String[] args){
    ExecutorService executor = ...;
    CompletableFuture future1 = CompletableFuture.runAsync(() -> {
        try {
            washCup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future2 = CompletableFuture.runAsync(() -> {
        try {
            hotWater();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future3 = CompletableFuture.runAsync(() -> {
        try {
            grindCoffee();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture.allOf(future1, future2, future3).thenAccept(
            r -> {
                System.out.println("泡咖啡");
            }
    );
    System.out.println("我是主線程");
}

同樣,使用 Disruptor 也可以實(shí)現(xiàn)這個(gè)場(chǎng)景,看下面代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");

 disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);
 disruptor.start();
}

5.多消費(fèi)者組

類比主流消息隊(duì)列的場(chǎng)景,Disruptor 也可以實(shí)現(xiàn)多消費(fèi)者組的場(chǎng)景,組間并行消費(fèi)互不影響,組內(nèi)消費(fèi)者競(jìng)爭(zhēng)消息,如下圖:

示例代碼如下:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");
 WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");
 WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");
 WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);
 disruptor.start();
}

6.總結(jié)

通過(guò)消費(fèi)者的靈活組合,Disruptor 的使用場(chǎng)景非常豐富。本文介紹了 Disruptor 的 5 個(gè)典型使用場(chǎng)景。在選型的時(shí)候,除了使用場(chǎng)景,更多地要考慮到 Disruptor 作為高性能內(nèi)存隊(duì)列的這個(gè)特點(diǎn)。

責(zé)任編輯:姜華 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2023-04-03 11:01:26

低代碼平臺(tái)場(chǎng)景

2022-10-17 00:27:20

二叉樹(shù)數(shù)組索引

2025-02-11 09:49:12

2021-03-16 06:47:47

Python

2020-11-20 10:53:46

邊緣計(jì)算

2023-01-30 11:27:57

人工智能高性能計(jì)算CPU

2020-02-25 22:08:02

ZooKeeper典型應(yīng)用場(chǎng)景

2015-08-04 15:21:17

SDN公有云軟件定義網(wǎng)絡(luò)

2024-12-30 08:32:36

2023-12-29 10:28:24

SPIJava靈活性

2025-01-15 07:54:02

2024-01-03 10:32:36

2022-07-24 21:56:38

元宇宙

2020-10-16 09:09:20

機(jī)器學(xué)習(xí)銀行技術(shù)

2015-10-09 10:12:23

ZooKeeper

2023-05-16 07:47:18

RabbitMQ消息隊(duì)列系統(tǒng)

2024-05-29 14:34:07

2022-12-08 10:40:06

聲明式事務(wù)AOP

2013-07-27 20:11:27

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)