• 正文
    • 一、關(guān)于 Disruptor
    • 二、實(shí)戰(zhàn)案例
    • 三、總結(jié)
  • 相關(guān)推薦
申請(qǐng)入駐 產(chǎn)業(yè)圖譜

技術(shù)亮點(diǎn):Disruptor 高性能環(huán)形消息隊(duì)列應(yīng)用,Log4j 2 也用到了這套技術(shù)

2024/10/31
2926
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

作者:小傅哥,博客:https://bugstack.cn

大家好,我是技術(shù)UP主小傅哥。

說(shuō)到底,無(wú)論是晉升述職還是面試考察,編程技能的展現(xiàn)總是在那些技術(shù)的橫向?qū)Ρ群蜕疃鹊牧私膺\(yùn)用。知其一,也知其二。一個(gè)場(chǎng)景的問(wèn)題,往往也會(huì)對(duì)應(yīng)著多種的解決方案,從沒(méi)有絕對(duì)的好和不好,都是是否適合而已。所以,往往技術(shù)越好的,也越低調(diào),不那么咋咋呼呼的。

什么是柔性事務(wù)?

在分布式軟件系統(tǒng)架構(gòu)設(shè)計(jì)中,所有的并發(fā)資源的競(jìng)爭(zhēng),都會(huì)往無(wú)鎖化非獨(dú)占競(jìng)爭(zhēng),以及柔性事務(wù)設(shè)計(jì)。柔性事務(wù)用于替代傳統(tǒng)事務(wù)管理中(如ACID屬性:原子性、一致性、隔離性、持久性),在分布式架構(gòu)系統(tǒng)中的使用場(chǎng)景。通過(guò)消息、補(bǔ)償,協(xié)調(diào)不同服務(wù)間的一致性。

那么在消息的使用中,除了有 MQ 消息,使用于微服務(wù)之間。還有本地消息,可以作用在各個(gè)領(lǐng)域間驅(qū)動(dòng)流程。關(guān)于本地消息可以用,Spring 的監(jiān)聽(tīng)、Redis 發(fā)布訂閱、Guava EventBus 事件總線(xiàn),這些內(nèi)容在小傅哥博客 bugstack.cn 《路書(shū)》中有相關(guān)的案例。之后本節(jié)咱們介紹一個(gè)新的高性能組件 Disruptor 的使用。

一、關(guān)于 Disruptor

Disruptor 是一種高性能的并發(fā)框架,最初由 LMAX 開(kāi)發(fā),用于解決高吞吐量、低延遲的消息處理問(wèn)題。它提供了一種無(wú)鎖的、有序的事件處理模型,非常適合處理需要高性能的場(chǎng)景。Disruptor 本身并不是用于實(shí)現(xiàn)事務(wù)的框架,而是一個(gè)事件處理器。因此,要在 Disruptor 上實(shí)現(xiàn)柔性事務(wù),需要結(jié)合其事件處理能力與柔性事務(wù)的模式。

    源碼:https://github.com/LMAX-Exchange/disruptor文檔:https://lmax-exchange.github.io/disruptor/ - 谷歌瀏覽器右鍵點(diǎn)翻譯為中文。

二、實(shí)戰(zhàn)案例

1. 工程結(jié)構(gòu)

小傅哥準(zhǔn)備好了一份基于 Disruptor 事件消息的使用案例工程,你可以直接上手體現(xiàn)。

    app 是使用的啟動(dòng)層、trigger 是提供接口、監(jiān)聽(tīng)消息、處理任務(wù)的觸發(fā)器層。在這里我們通過(guò) trigger 下的 event 包,監(jiān)聽(tīng)事件消息。之后把這個(gè) XxxEventHandler 讓 app 層下的 Disruptor 進(jìn)行實(shí)例化。

2. 引入POM

<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
    引入 disruptor pom 包。

3. 監(jiān)聽(tīng)消息

@Slf4j
public?class?XxxEventHandler?implements?EventHandler<XxxEventHandler.Message>?{

????@Override
????public?void?onEvent(Message?longEvent,?long?l,?boolean?b)?throws?Exception?{
????????log.info("接收消息:{}",?longEvent.getValue());
????}

????@Data
????public?static?class?Message?{
????????private?String?value;
????}

}
    在 trigger 下 event 包內(nèi),加一個(gè)實(shí)現(xiàn)了 disruptor EventHandler 的監(jiān)聽(tīng)實(shí)現(xiàn)類(lèi),消息體類(lèi)型我們定義到 XxxEventHandler 中,也就是 Message。具體生產(chǎn)使用的時(shí)候,按需調(diào)整。這個(gè)接收消息的過(guò)程和使用 MQ 的方式是一樣的。

4. 實(shí)例化監(jiān)聽(tīng)

@Configuration
public?class?DisruptorConfig?{

????private?final?ExecutorService?executor?=?Executors.newCachedThreadPool();

????@Bean("xxxEventDisruptor")
????public?Disruptor<XxxEventHandler.Message>?disruptor()?{
????????//?環(huán)形隊(duì)列的大小,注意要是2的冪
????????int?bufferSize?=?1024;

????????//?創(chuàng)建Disruptor
????????Disruptor<XxxEventHandler.Message>?disruptor?=?new?Disruptor<>(XxxEventHandler.Message::new,?bufferSize,?executor);

????????//?連接事件處理器
????????disruptor.handleEventsWith(new?XxxEventHandler());

????????//?開(kāi)始Disruptor
????????disruptor.start();

????????return?disruptor;
????}

}
    在 App 模塊下,有一個(gè) config 專(zhuān)門(mén)的配置類(lèi),在這里配置下消息監(jiān)聽(tīng)。這個(gè)過(guò)程和我們之前使用的 Redis 發(fā)布訂閱是一樣的。

5. 推送消息(Test)

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes?=?Application.class)
public?class?DisruptorTest?{

????@Resource
????private?Disruptor<XxxEventHandler.Message>?xxxEventDisruptor;

????@Test
????public?void?test_publishEvent()?throws?InterruptedException?{

????????for?(int?i?=?0;?i?<?10;?i++)?{
????????????xxxEventDisruptor.publishEvent((event,?sequence)?->?event.setValue("你好,我是?Disruptor?Message"));
????????}

????????//?暫停?-?測(cè)試完手動(dòng)關(guān)閉程序
????????new?CountDownLatch(1).await();
????}

}
24-10-26.11:55:55.827?[main????????????]?INFO??DisruptorTest??????????-?Starting?DisruptorTest?using?Java?1.8.0_311?on?MacBook-Pro.local?with?PID?92827?(started?by?fuzhengwei?in?/Users/fuzhengwei/1024/KnowledgePlanet/road-map/xfg-dev-tech-disruptor/xfg-dev-tech-app)
24-10-26.11:55:55.829?[main????????????]?INFO??DisruptorTest??????????-?The?following?1?profile?is?active:?"dev"
24-10-26.11:55:57.749?[main????????????]?INFO??DisruptorTest??????????-?Started?DisruptorTest?in?2.526?seconds?(JVM?running?for?3.741)
24-10-26.11:55:58.125?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
24-10-26.11:55:58.128?[pool-2-thread-1?] INFO  XxxEventHandler ???????-?接收消息:你好,我是 Disruptor Message
    提供一個(gè)單測(cè)來(lái)測(cè)試消息推送,這樣你就可以監(jiān)聽(tīng)到消息了。

三、總結(jié)

在美團(tuán)、京東、阿里,等各個(gè)大廠(chǎng)中都有很多這樣的組件使用,在美團(tuán)發(fā)布過(guò)的文章中《高性能隊(duì)列——Disruptor》 還有一個(gè)對(duì)應(yīng)的壓測(cè)數(shù)據(jù)。CPU:Intel Core i7-2720QM,JVM:Java 1.6.0_25 64-bit,OS:Ubuntu 11.04

- ABQ Disruptor
Unicast: 1P – 1C 4,057,453 22,381,378
Pipeline: 1P – 3C 2,006,903 15,857,913
Sequencer: 3P – 1C 2,056,118 14,540,519
Multicast: 1P – 3C 260,733 10,860,121
Diamond: 1P – 3C 2,082,725 15,295,197
    依據(jù)并發(fā)競(jìng)爭(zhēng)的激烈程度的不同,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。

另外,Log4j 2 采用了 Disruptor(一種無(wú)鎖的線(xiàn)程間通信庫(kù)),提高吞吐量降低延遲。在生產(chǎn)使用中,大并發(fā)的系統(tǒng)注意 Log4j 版本。官網(wǎng)說(shuō)明:https://logging.apache.org/log4j/2.12.x/manual/async.html

    異步 Logger是 Log4j 2 中的新增功能。其目的是盡快從對(duì) Logger.log 的調(diào)用返回到應(yīng)用程序。您可以選擇使所有 Logger 異步,或使用同步和異步 Logger 的混合。使所有 Logger 異步將提供最佳性能,而混合使用則可為您提供更大的靈活性。LMAX Disruptor 技術(shù)。異步記錄器內(nèi)部使用 Disruptor(一種無(wú)鎖的線(xiàn)程間通信庫(kù))而不是隊(duì)列,從而實(shí)現(xiàn)更高的吞吐量和更低的延遲。作為異步日志記錄器工作的一部分,異步附加器已得到增強(qiáng),可以在批處理結(jié)束時(shí)(當(dāng)隊(duì)列為空時(shí))刷新到磁盤(pán)。這會(huì)產(chǎn)生與配置“immediateFlush=true”相同的結(jié)果,即所有收到的日志事件始終在磁盤(pán)上可用,但效率更高,因?yàn)樗恍枰诿總€(gè)日志事件上都接觸磁盤(pán)。(異步附加器在內(nèi)部使用 ArrayBlockingQueue,不需要類(lèi)路徑上的 Disruptor jar。)

四、技術(shù)實(shí)戰(zhàn)

加入小傅哥的星球「碼農(nóng)會(huì)鎖」閱讀450+份簡(jiǎn)歷和評(píng)審,學(xué)習(xí)6個(gè)業(yè)務(wù)項(xiàng)目;MVC+DDD,雙架構(gòu)開(kāi)發(fā)小型電商大營(yíng)銷(xiāo)(超級(jí)大課)、OpenAI 大模型應(yīng)用Lottery、IMAI 問(wèn)答助手。7個(gè)組件項(xiàng)目;OpenAI 代碼評(píng)審、BCP 透視業(yè)務(wù)監(jiān)控動(dòng)態(tài)線(xiàn)程池、支付SDK設(shè)計(jì)和開(kāi)發(fā)API網(wǎng)關(guān)、SpringBoot StarterIDEA Plugin 插件開(kāi)發(fā)。1套源碼課程、1套基礎(chǔ)教程、1到云服務(wù)器教程以及各類(lèi)場(chǎng)景解決方案。

如果大家希望通過(guò)做有價(jià)值的編程項(xiàng)目,提高自己的編程思維和編碼能力,可以加入小傅哥的【星球:碼農(nóng)會(huì)鎖】。加入后解鎖所有往期項(xiàng)目,還可以學(xué)習(xí)后續(xù)新開(kāi)發(fā)的項(xiàng)目。

??加入學(xué)習(xí)這樣一套項(xiàng)目,放在一些平臺(tái)售賣(mài),一個(gè)至少都是上千塊。但小傅哥的星球,只需要100多,就可以獲得全部的學(xué)習(xí)項(xiàng)目!重要的是學(xué)習(xí)小傅哥的經(jīng)驗(yàn)!

相關(guān)推薦

登錄即可解鎖
  • 海量技術(shù)文章
  • 設(shè)計(jì)資源下載
  • 產(chǎn)業(yè)鏈客戶(hù)資源
  • 寫(xiě)文章/發(fā)需求
立即登錄

作者小傅哥多年從事一線(xiàn)互聯(lián)網(wǎng)Java開(kāi)發(fā),從19年開(kāi)始編寫(xiě)工作和學(xué)習(xí)歷程的技術(shù)匯總,旨在為大家提供一個(gè)較清晰詳細(xì)的核心技能學(xué)習(xí)文檔。如果本文能為您提供幫助,請(qǐng)給予支持(關(guān)注、點(diǎn)贊、分享)!