RabbitMQ詳解
RabbitMQ的優點:
- 開源, 功能好效, 安定性好
- 提供可靠性消息投遞形式(confirm), 前往形式(return)等
- 與Spring完善整合, API豐厚
- 集群形式豐厚, 支持表達式設置, 高可用HA形式, 鏡像行列模子
- 可以確保數據不喪失的條件下做到高可靠性, 可用性
RabbitMQ高功能緣故:
- 由Erlang言語開發,承繼其天生的并發性,安定性和寧靜性有保證
RabbitMQ的協議:
AMQP(Advanced Message Queuing Protocol)高等消息行列協議,是一個異步消息轉達所使用使用層協議標準,為面向消息正中件計劃,基于此協議的客戶端與消息正中件可以無視消息泉源轉達消息,不受客戶端、消息正中件、不同的開發言語情況等條件的限定。
計劃看法表明:
- Server : 又稱Broker, 承受客戶端毗連, 完成AMQP實體辦事
- Connection : 毗連, 使用步驟與Broker的網絡毗連
- Channel : 網絡信道, 幾乎一切的利用都在Channel中舉行, Channel是舉行消息讀寫的通道??蛻舳丝梢詣摻ǘ鄠€Channel, 每個Channel代表一個會話職責。
- Message : 消息, 辦事器和使用步驟之間傳送的數據, 有Properties和Body構成。Properties可以抵消息舉行修飾, 好比消息的優先級, 延長等高等特性; Body就是消息體內容。
- Virtual Host : 假造地點, 用于舉行邏輯斷絕, 最表層的消息路由。一個Virtual Host內里可以有多少個Exchange和Queue, 同一個Virtual Host內里不克不及有相反稱呼的Exchange或Queue
- Exchange : 互換機, 用于吸收消息, 依據路由鍵轉發消息到綁定的行列
- Binding : Exchange和Queue之間的假造毗連, binding中可以包含routing key
- Routing Key : 一個路由端正, 假造機可用它來確定怎樣路由一個特定消息
- Queue : 也成Message Queue, 消息行列, 用于保存消息并將它們轉發給消耗者
RabbitMQ全體架構
RabbitMQ成員簡介
Binding-綁定
- Exchange和Exchange, Queue之間的毗連干系
- 綁定中可以包含RoutingKey大概參數
Queue-消息行列
- 消息行列, 實踐存儲消息數據
- Durability : 對否歷久化
- Auto delete : 如選yes,代表當最初一個監聽被移除之后, 該Queue會主動被刪除
Message-消息
- 辦事和使用步驟之間傳送的數據
- 實質上就是一段數據, 由Properties和Payload(Body)構成
- 常用屬性 : delivery mode, headers(自界說屬性)
- 其他屬性content_type, content_encoding, prioritycorrelation_id : 可以以為是消息的唯一idreplay_to : 重回行列設定expiration : 消息過時時間message_id : 消息idtimestamp, type, user_id, app_id, cluster_id
Virtual Host-假造主機
- 假造地點, 用于舉行邏輯斷絕, 最表層的消息路由
- 一個Virtual Host內里可以有多少個Exchange和Queue
- 同一個Virtual Host內里不克不及有相反稱呼的Exchange或Queue
Exchange互換機
吸收消息,并依據路由鍵轉發消息到所綁定的行列
注:互換機不會存儲消息,假如消息發送到沒有綁定消耗行列的互換機,消息則喪失。
互換機的屬性
- Name : 互換機稱呼
- Type : 互換機典范, direct, topic, fanout, headers
- Durability : 對否必要歷久化, true為歷久化
- Auto Delete : 當最初一個綁定到Exchange上的行列刪除后, 主動刪除該Exchange
- Internal : 如今Exchange對否用于RabbitMQ內里使用, 默以為False, 這個屬性很少會用到
- Arguments : 擴展參數, 用于擴展AMQP協議訂定化使用
互換機的四品種型
- Direct exchange(直連互換機)是依據消息攜帶的路由鍵(routing key)將消息投遞給對應行列的注意 : Direct形式可以使用RabbitMQ自帶的Exchange(default Exchange), 以是不必要將Exchange舉行任何綁定(binding)利用, 消息轉達時, RoutingKey必需完全婚配才會被行列吸收, 不然該消息會被丟棄
- Fanout exchange(扇型互換機)將消息路由給綁定到它身上的一切行列不處理路由鍵, 只必要簡便的將行列綁定到互換機上發送到互換機的消息都市被轉發到與該互換機綁定的一切行列上Fanout互換機轉發消息是最快的
- Topic exchange(主題互換機)行列經過路由鍵綁定到互換機上,然后,互換機依據消息里的路由值,將消息路由給一個或多個綁定行列(含糊婚配)“#” : 婚配一個或多個詞“*” : 婚配一個詞
- Headers exchange(頭互換機)相似主題互換機,但是頭互換機使用多個消息屬性來代替路由鍵創建路由端正。經過推斷消息頭的值可否與指定的綁定相婚配來建立路由端正。
RabbitMQ常用的5種事情形式
1、點對點(簡便)的行列
2、事情行列(公平性)
- 不必要互換機
- 一個消費者,多個消耗者,但是一個消息只會發送給一個行列(競爭的消耗者形式)
- 默許是輪詢,即會將消息輪替發給多個消耗者,但如此抵消耗得比力慢的消耗者不公平
- 可接納公中分派,即能者多勞channel.basicQos(1);// 限定:發送一條信息給消耗者A,消耗者A未反應處理后果之前,不會再次發送信息給消耗者Aboolean autoAck = false;// 取消主動反應 channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 吸收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反應消息處理終了
3、公布/訂閱
- 一個消費者,多個消耗者
- 每一個消耗者都有本人的一個行列
- 消費者沒有直接發消息到行列中,而是發送到互換機
- 每個消耗者的行列都綁定到互換機上
- 消息經過互換機抵達每個消耗者的行列
該形式就是Fanout Exchange(扇型互換機)將消息路由給綁定到它身上的一切行列
4、路由
消費者發送消息到互換機并指定一個路由key,消耗者行列綁定到互換機時要訂定路由key(key婚配就能承受消息,key不婚配就不克不及承受消息)
該形式接納Direct exchange(直連互換機)
5、主題(通配符)
此形式真實路由key形式的基本上,使用了通配符來辦理消耗者吸收消息。消費者P發送消息到互換機X,互換機依據綁定行列的routing key的值舉行通配符婚配
標記#:婚配一個大概多個詞lazy.# 可以婚配lazy.irs大概lazy.irs.cor
標記*:只能婚配一個詞lazy.* 可以婚配lazy.irs大概lazy.cor
該形式接納Topic exchange(主題互換機)
消息可靠性轉達或回退(消費者端)
消費者發送消息出去之后,不曉得畢竟有沒有發送到RabbitMQ辦事器, 默許是不曉得的。并且有的時分我們在發送消息之后,后方的邏輯出成績了,我們不想要發送之前的消息了,必要撤回該怎樣做。
AMQP 事件機制
- txSelect 將如今channel設置為transaction形式
- txCommit 提交如今事件
- txRollback 事件回滾
Confirm 形式
消息的確認, 是指消費者投遞消息后, 假如Broker收到消息, 則會給我們產生一個應對
消費者舉行吸收應對, 用來確定這條消息對否正常發送到Broker, 這種辦法也是消息的可靠性投遞的中心保證
- 在channel上開啟確認形式 : channel.confirmSelect()
- 在channel上添加監聽 : addConfirmListener, 監聽告捷和失敗的前往后果, 依據具體的后果抵消息舉行重新發送, 或紀錄日志等后續處理
Return消息機制
Return Listener用于處理一些不成路由的消息
正常情況下消息消費者經過指定一個Exchange和RoutingKey, 把消息送到某一個行列中去, 然后消耗者監聽行列, 舉行消耗,但在某些情況下, 假如在發送消息的時分, 如今的exchange不存在大概指定的路由key路由不到,這個時分假如我們必要監聽這種不成達的消息, 就要使用Return Listener。
在基本API中有一個緊張的設置項Mandatory : 假如為true, 則監聽器會吸收到路由不成達的消息, 然后舉行后續處理(補償或人工處理), 假如為false, 那么broker端主動刪除該消息。
怎樣保證消息可靠轉達
- 保證消息的告捷發射
- 保證MQ節點的告捷吸收
- 發送端收到MQ節點(Broker)的確認應對
- 完滿的消息補償機制
方案:
1、消息落庫, 抵消息形態舉行標志
- step1:消息入庫
- step2:消息發送
- step3:消耗端消息確認
- step4:更新庫中消息形態為已確認
- step5:定時職責讀取數據庫中未確認的消息
- step6:未收到確認后果的消息重新發送
- step7:假如重試多次之后仍舊失敗, 則將消息形態變動為投遞失敗的終態, 后方必要人工到場
2、消息的延長投遞, 做二次確認, 回調反省
- step1 : 第一次消息發送, 必需業務數據落庫之后才干舉行消息發送
- step2 : 第二次消息延長發送, 設定延長一段時間發送第二次check消息
- step3 : 消耗端監聽Broker, 舉行消息消耗
- step4 : 消耗告捷之后, 發送確認消息到確認消息行列
- step5 : Callback Service監聽step4中的確認消息行列, 維護消息形態, 對否消耗告捷等形態
- step6 : Callback Service監聽step2發送的Delay Check的消息行列, 檢測內里的消息形態, 假如消息是發送告捷形態, 則流程完畢, 假如消息是失敗形態, 大概查不到如今消息形態時, 會關照消費者, 舉行消息重發, 重新上述步調
重試機制和冪等性保證(消耗者端)
重試機制
消耗者在消耗消息的時分,假如消耗者業務邏輯顯現步驟特別,會使用消息重試機制。
- 情況1: 消耗者獲取到消息后,調用第三方接口,但接口暫且無法拜候,對否必要重試? (必要重試機制)
- 情況2: 消耗者獲取到消息后,拋出數據轉換特別,對否必要重試?(不必要重試機制)必要公布舉行處理。
關于情況2,假如消耗者代碼拋出特別是必要公布新版本才干處理的成績,那么不必要重試,重試也于事無補。應該接納日志紀錄+定時職責job康健反省+人工舉行補償
重試機制的完成
在SpringBoot中,@RabbitListener(queue="")用于消耗者監聽行列。底層使用Aop舉行攔阻,假如步驟沒有拋出特別,則主動提交事件。假如拋出特別,該消息會緩存到RabbitMQ辦事器,主動實行重試機制,不休到告捷為止??梢栽O置重試距離時間和重試的次數。
冪等性保證
冪等性:多次實行, 后果堅持一律
網絡延長傳輸中,消耗顯現特別大概是消耗延長消耗,會形成MQ舉行重試補償,在重試歷程中,約莫會形成反復消耗。
處理方案:
- 唯一ID+指紋碼機制唯一ID + 指紋碼機制,使用數據庫主鍵去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指紋碼利益:完成簡便壞處:高并發下多數據庫寫入的功能瓶頸處理方案:跟進ID舉行分庫分表舉行算法路由
- 使用Redis的原子性去完成在吸收到消息后將消息ID作為key實行 setnx 下令,假如實行告捷就表現沒有處理過這條消息,可以舉行消耗了,實行失敗表現消息以前被消耗了。
主動簽收與手動簽收(消耗端)
默許是主動簽收
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
channel.basicAck(envelope.getDeliveryTag(), false);
消耗端限流
消息行列中囤積了多量的消息, 大概某些時候消費的消息遠宏大于消耗者處理才能的時分, 這個時分假如消耗者一次取出多量的消息, 但是客戶端又無法處理, 就會顯現成績, 乃至約莫招致辦事崩潰, 以是必要抵消耗端舉行限流
RabbitMQ提供了一種qos(辦事質量確保)功效, 即在非主動確認消息的條件下, 假如一定數目標消息(經過consumer大概channel設置qos的值)未被確認前, 不舉行消耗新的消息
- 主動簽收要設置成false, 發起實踐事情中也設置成false
- void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息輕重限定, 尋常設置為0, 消耗端不做限定prefetchCount : 會報告RabbitMQ不要同時給一個消耗者推送多于N個消息, 即一旦有N個消息還沒有ack, 則該consumer將block(壅閉), 直到有消息ackglobal : true/false 對否將外表設置使用于channel, 簡便來說就是外表的限定是channel級別的照舊consumer級別 注意 :
prefetchSize和global這兩項,RabbitMQ沒有完成,臨時不眷注,prefetchCount在autoAck設置false的情況下奏效,即在主動確認的情況下這個值是不奏效的
限流可完成公平行列。
