日本黄色一级经典视频|伊人久久精品视频|亚洲黄色色周成人视频九九九|av免费网址黄色小短片|黄色Av无码亚洲成年人|亚洲1区2区3区无码|真人黄片免费观看|无码一级小说欧美日免费三级|日韩中文字幕91在线看|精品久久久无码中文字幕边打电话

當(dāng)前位置:首頁(yè) > 單片機(jī) > 架構(gòu)師社區(qū)
[導(dǎo)讀]最近部門(mén)號(hào)召大伙多組織一些技術(shù)分享會(huì),說(shuō)是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過(guò),話說(shuō)回來(lái)這的確是件好事,與其開(kāi)那些沒(méi)味的扯皮會(huì),多做技術(shù)交流還是很有助于個(gè)人成長(zhǎng)的。 于是乎我主動(dòng)報(bào)名參加了分享,咳咳咳~ ,

最近部門(mén)號(hào)召大伙多組織一些技術(shù)分享會(huì),說(shuō)是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過(guò),話說(shuō)回來(lái)這的確是件好事,與其開(kāi)那些沒(méi)味的扯皮會(huì),多做技術(shù)交流還是很有助于個(gè)人成長(zhǎng)的。

于是乎我主動(dòng)報(bào)名參加了分享,咳咳咳~ ,真的不是為了那點(diǎn)KPI,就是想和大伙一起學(xué)習(xí)學(xué)習(xí)!

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了
在這里插入圖片描述

這次我分享的是 springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,以及在實(shí)際開(kāi)發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),其實(shí)整體的內(nèi)容比較簡(jiǎn)單,有時(shí)候事情就是這么神奇,越是簡(jiǎn)單的東西就越容易出錯(cuò)。

可以看到使用了 RabbitMQ 以后,我們的業(yè)務(wù)鏈路明顯變長(zhǎng)了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場(chǎng)景也增加了。例如:

  • 消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失敗)

  • rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失

  • 消息消費(fèi)者 - > rabbitmq服務(wù)(消費(fèi)消息失?。?/p>

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了所以說(shuō)能不使用中間件就盡量不要用,如果為了用而用只會(huì)徒增煩惱。開(kāi)啟消息確認(rèn)機(jī)制以后,盡管很大程度上保證了消息的準(zhǔn)確送達(dá),但由于頻繁的確認(rèn)交互,rabbitmq 整體效率變低,吞吐量下降嚴(yán)重,不是非常重要的消息真心不建議你用消息確認(rèn)機(jī)制。


下邊我們先來(lái)實(shí)現(xiàn)springboot + rabbitmq消息確認(rèn)機(jī)制,再對(duì)遇到的問(wèn)題做具體分析。

一、準(zhǔn)備環(huán)境

1、引入 rabbitmq 依賴包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、修改 application.properties 配置

配置中需要開(kāi)啟 發(fā)送端消費(fèi)端 的消息確認(rèn)。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 發(fā)送者開(kāi)啟 confirm 確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開(kāi)啟 return 確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true
####################################################
# 設(shè)置消費(fèi)端手動(dòng) ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

3、定義 Exchange 和 Queue

定義交換機(jī) confirmTestExchange 和隊(duì)列 confirm_test_queue ,并將隊(duì)列綁定在交換機(jī)上。

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue"truefalsefalse);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}

rabbitmq 的消息確認(rèn)分為兩部分:發(fā)送消息確認(rèn) 和 消息接收確認(rèn)。

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了
在這里插入圖片描述

二、消息發(fā)送確認(rèn)

發(fā)送消息確認(rèn):用來(lái)確認(rèn)生產(chǎn)者 producer 將消息發(fā)送到 broker ,broker 上的交換機(jī) exchange 再投遞給隊(duì)列 queue的過(guò)程中,消息是否成功投遞。

消息從 producerrabbitmq broker有一個(gè) confirmCallback 確認(rèn)模式。

消息從 exchangequeue 投遞失敗有一個(gè) returnCallback 退回模式。

我們可以利用這兩個(gè)Callback來(lái)確保消的100%送達(dá)。

1、 ConfirmCallback確認(rèn)模式

消息只要被 rabbitmq broker 接收到就會(huì)觸發(fā) confirmCallback 回調(diào) 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息發(fā)送異常!");
        } else {
            log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

實(shí)現(xiàn)接口 ConfirmCallback ,重寫(xiě)其confirm()方法,方法內(nèi)有三個(gè)參數(shù)correlationData、ack、cause。

  • correlationData:對(duì)象內(nèi)部只有一個(gè) id 屬性,用來(lái)表示當(dāng)前消息的唯一性。
  • ack:消息投遞到 broker 的狀態(tài), true表示成功。
  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會(huì)被投遞到目標(biāo) queue 里。所以接下來(lái)需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標(biāo) queue 里將觸發(fā)回調(diào) returnCallback ,一旦向 queue 投遞消息未成功,這里一般會(huì)記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

實(shí)現(xiàn)接口ReturnCallback,重寫(xiě) returnedMessage() 方法,方法有五個(gè)參數(shù)message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊(duì)列)。

下邊是具體的消息發(fā)送,在rabbitTemplate中設(shè)置 ConfirmReturn 回調(diào),我們通過(guò)setDeliveryMode()對(duì)消息做持久化處理,為了后續(xù)測(cè)試創(chuàng)建一個(gè) CorrelationData對(duì)象,添加一個(gè)id10000000000。

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 確保消息發(fā)送失敗后可以重新返回到隊(duì)列中
         * 注意:yml需要配置 publisher-returns: true
         */

        rabbitTemplate.setMandatory(true);

        /**
         * 消費(fèi)者確認(rèn)收到消息后,手動(dòng)ack回執(zhí)回調(diào)處理
         */

        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投遞到隊(duì)列失敗回調(diào)處理
         */

        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 發(fā)送消息
         */

        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

三、消息接收確認(rèn)

消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡(jiǎn)單一點(diǎn),因?yàn)橹挥幸粋€(gè)消息回執(zhí)(ack)的過(guò)程。使用@RabbitHandler注解標(biāo)注的方法要增加 channel(信道)、message 兩個(gè)參數(shù)。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具體業(yè)務(wù)
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            
            if (message.getMessageProperties().getRedelivered()) {
                
                log.error("消息已重復(fù)處理失敗,拒絕再次接收...");
                
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                
                log.error("消息即將再次返回隊(duì)列處理...");
                
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue); 
            }
        }
    }
}

消費(fèi)消息有三種回執(zhí)方法,我們來(lái)分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認(rèn),使用此回執(zhí)方法后,消息會(huì)被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投遞序號(hào),每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會(huì)增加。手動(dòng)消息確認(rèn)模式下,我們可以對(duì)指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。

multiple:是否批量確認(rèn),值為 true 則會(huì)一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

舉個(gè)栗子: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒(méi)有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會(huì)將5、6、7、8的消息全部進(jìn)行確認(rèn)。

2、basicNack

basicNack :表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號(hào)。

multiple:是否批量確認(rèn)。

requeue:值為 true 消息將重新入隊(duì)列。

3、basicReject

basicReject:拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號(hào)。

requeue:值為 true 消息將重新入隊(duì)列。

四、測(cè)試

發(fā)送消息測(cè)試一下消息確認(rèn)機(jī)制是否生效,從執(zhí)行結(jié)果上看發(fā)送者發(fā)消息后成功回調(diào),消費(fèi)端成功的消費(fèi)了消息。springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了用抓包工具Wireshark 觀察一下rabbitmq amqp協(xié)議交互的變化,也多了 ack 的過(guò)程。springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了

五、踩坑日志

1、別忘確認(rèn)消息

這是一個(gè)非常沒(méi)技術(shù)含量的坑,但卻是非常容易犯錯(cuò)的地方。

開(kāi)啟消息確認(rèn)機(jī)制,消費(fèi)消息別忘了channel.basicAck,否則消息會(huì)一直存在,導(dǎo)致重復(fù)消費(fèi)。springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了

2、消息無(wú)限投遞

在我最開(kāi)始接觸消息確認(rèn)機(jī)制的時(shí)候,消費(fèi)端代碼就像下邊這樣寫(xiě)的,思路很簡(jiǎn)單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0 發(fā)生異常后將消息重新投入隊(duì)列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消費(fèi)者 2 號(hào)收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
        }
    }

但是有個(gè)問(wèn)題是,業(yè)務(wù)代碼一旦出現(xiàn) bug 99.9%的情況是不會(huì)自動(dòng)修復(fù),一條消息會(huì)被無(wú)限投遞進(jìn)隊(duì)列,消費(fèi)端無(wú)限執(zhí)行,導(dǎo)致了死循環(huán)。

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了
在這里插入圖片描述

本地的CPU被瞬間打滿了,大家可以想象一下當(dāng)時(shí)在生產(chǎn)環(huán)境導(dǎo)致服務(wù)死機(jī),我是有多慌。

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了而且rabbitmq management 只有一條未被確認(rèn)的消息。

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了
在這里插入圖片描述

經(jīng)過(guò)測(cè)試分析發(fā)現(xiàn),當(dāng)消息重新投遞到消息隊(duì)列時(shí),這條消息不會(huì)回到隊(duì)列尾部,仍是在隊(duì)列頭部。

消費(fèi)者會(huì)立刻消費(fèi)這條消息,業(yè)務(wù)處理再拋出異常,消息再重新入隊(duì),如此反復(fù)進(jìn)行。導(dǎo)致消息隊(duì)列處理出現(xiàn)阻塞,導(dǎo)致正常消息也無(wú)法運(yùn)行。

而我們當(dāng)時(shí)的解決方案是,先將消息進(jìn)行應(yīng)答,此時(shí)消息隊(duì)列會(huì)刪除該條消息,同時(shí)我們?cè)俅伟l(fā)送該消息到消息隊(duì)列,異常消息就放在了消息隊(duì)列尾部,這樣既保證消息不會(huì)丟失,又保證了正常業(yè)務(wù)的進(jìn)行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發(fā)送消息到隊(duì)尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

但這種方法并沒(méi)有解決根本問(wèn)題,錯(cuò)誤消息還是會(huì)時(shí)不時(shí)報(bào)錯(cuò),后面優(yōu)化設(shè)置了消息重試次數(shù),達(dá)到了重試上限以后,手動(dòng)確認(rèn),隊(duì)列刪除此消息,并將消息持久化入MySQL并推送報(bào)警,進(jìn)行人工處理和定時(shí)任務(wù)做補(bǔ)償。

3、重復(fù)消費(fèi)

如何保證 MQ 的消費(fèi)是冪等性,這個(gè)需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL、或者redis 將消息持久化,通過(guò)再消息中的唯一性屬性校驗(yàn)。

特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒(méi)關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了

長(zhǎng)按訂閱更多精彩▼

springboot + rabbitmq 用了消息確認(rèn)機(jī)制,感覺(jué)掉坑里了

如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝

免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

LED驅(qū)動(dòng)電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關(guān)鍵字: 驅(qū)動(dòng)電源

在工業(yè)自動(dòng)化蓬勃發(fā)展的當(dāng)下,工業(yè)電機(jī)作為核心動(dòng)力設(shè)備,其驅(qū)動(dòng)電源的性能直接關(guān)系到整個(gè)系統(tǒng)的穩(wěn)定性和可靠性。其中,反電動(dòng)勢(shì)抑制與過(guò)流保護(hù)是驅(qū)動(dòng)電源設(shè)計(jì)中至關(guān)重要的兩個(gè)環(huán)節(jié),集成化方案的設(shè)計(jì)成為提升電機(jī)驅(qū)動(dòng)性能的關(guān)鍵。

關(guān)鍵字: 工業(yè)電機(jī) 驅(qū)動(dòng)電源

LED 驅(qū)動(dòng)電源作為 LED 照明系統(tǒng)的 “心臟”,其穩(wěn)定性直接決定了整個(gè)照明設(shè)備的使用壽命。然而,在實(shí)際應(yīng)用中,LED 驅(qū)動(dòng)電源易損壞的問(wèn)題卻十分常見(jiàn),不僅增加了維護(hù)成本,還影響了用戶體驗(yàn)。要解決這一問(wèn)題,需從設(shè)計(jì)、生...

關(guān)鍵字: 驅(qū)動(dòng)電源 照明系統(tǒng) 散熱

根據(jù)LED驅(qū)動(dòng)電源的公式,電感內(nèi)電流波動(dòng)大小和電感值成反比,輸出紋波和輸出電容值成反比。所以加大電感值和輸出電容值可以減小紋波。

關(guān)鍵字: LED 設(shè)計(jì) 驅(qū)動(dòng)電源

電動(dòng)汽車(EV)作為新能源汽車的重要代表,正逐漸成為全球汽車產(chǎn)業(yè)的重要發(fā)展方向。電動(dòng)汽車的核心技術(shù)之一是電機(jī)驅(qū)動(dòng)控制系統(tǒng),而絕緣柵雙極型晶體管(IGBT)作為電機(jī)驅(qū)動(dòng)系統(tǒng)中的關(guān)鍵元件,其性能直接影響到電動(dòng)汽車的動(dòng)力性能和...

關(guān)鍵字: 電動(dòng)汽車 新能源 驅(qū)動(dòng)電源

在現(xiàn)代城市建設(shè)中,街道及停車場(chǎng)照明作為基礎(chǔ)設(shè)施的重要組成部分,其質(zhì)量和效率直接關(guān)系到城市的公共安全、居民生活質(zhì)量和能源利用效率。隨著科技的進(jìn)步,高亮度白光發(fā)光二極管(LED)因其獨(dú)特的優(yōu)勢(shì)逐漸取代傳統(tǒng)光源,成為大功率區(qū)域...

關(guān)鍵字: 發(fā)光二極管 驅(qū)動(dòng)電源 LED

LED通用照明設(shè)計(jì)工程師會(huì)遇到許多挑戰(zhàn),如功率密度、功率因數(shù)校正(PFC)、空間受限和可靠性等。

關(guān)鍵字: LED 驅(qū)動(dòng)電源 功率因數(shù)校正

在LED照明技術(shù)日益普及的今天,LED驅(qū)動(dòng)電源的電磁干擾(EMI)問(wèn)題成為了一個(gè)不可忽視的挑戰(zhàn)。電磁干擾不僅會(huì)影響LED燈具的正常工作,還可能對(duì)周圍電子設(shè)備造成不利影響,甚至引發(fā)系統(tǒng)故障。因此,采取有效的硬件措施來(lái)解決L...

關(guān)鍵字: LED照明技術(shù) 電磁干擾 驅(qū)動(dòng)電源

開(kāi)關(guān)電源具有效率高的特性,而且開(kāi)關(guān)電源的變壓器體積比串聯(lián)穩(wěn)壓型電源的要小得多,電源電路比較整潔,整機(jī)重量也有所下降,所以,現(xiàn)在的LED驅(qū)動(dòng)電源

關(guān)鍵字: LED 驅(qū)動(dòng)電源 開(kāi)關(guān)電源

LED驅(qū)動(dòng)電源是把電源供應(yīng)轉(zhuǎn)換為特定的電壓電流以驅(qū)動(dòng)LED發(fā)光的電壓轉(zhuǎn)換器,通常情況下:LED驅(qū)動(dòng)電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關(guān)鍵字: LED 隧道燈 驅(qū)動(dòng)電源
關(guān)閉