挑戰Kafka!Redis5.0重量級特性Stream嘗鮮

尋夢新聞LINE@每日推播熱門推薦文章,趣聞不漏接❤️

加入LINE好友

作者簡介:錢文品(老錢),互聯網分布式高並發技術十年老兵,目前任掌閱科技資深後端工程師。熟練使用 Java、Python、Golang 等多種計算機語言,開發過遊戲,製作過網站,寫過消息推送系統和MySQL 中間件,做到過開源的 ORM 框架、Web 框架、RPC 框架等

Redis5.0最近被作者突然放出來了,增加了很多新的特色功能。而Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,作者坦言Redis Stream狠狠地借鑒了Kafka的設計。

Redis Stream的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟後,內容還在。

每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用xadd指令追加消息時自動創建。

每個Stream都可以掛多個消費組,每個消費組會有個遊標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。

每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。

同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使遊標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。

消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量在Redis官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網路傳輸的中途丟失了沒處理。

消息ID

消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,並且是該毫秒內產生的第5條消息。消息ID可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是後面加入的消息的ID要大於前面的消息ID。

消息內容

消息內容就是鍵值對,形如hash結構的鍵值對,這沒什麼特別之處。

增刪改查

xadd 追加消息xdel 刪除消息,這里的刪除僅僅是設置了標誌位,不影響消息總長度xrange 獲取消息列表,會自動過濾已經刪除的消息xlen 消息長度del 刪除Stream

# *號表示服務器自動生成ID,後面順序跟著一堆key/value127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲1527849609889-0 # 生成的消息ID127.0.0.1:6379> xadd codehole * name xiaoyu age 291527849629172-0127.0.0.1:6379> xadd codehole * name xiaoqian age 11527849637634-0127.0.0.1:6379> xlen codehole(integer) 3127.0.0.1:6379> xrange codehole – + # -表示最小值, +表示最大值127.0.0.1:6379> xrange codehole – +1) 1) 1527849609889-02) 1) “name”2) “laoqian”3) “age”4) “30”2) 1) 1527849629172-02) 1) “name”2) “xiaoyu”3) “age”4) “29”3) 1) 1527849637634-02) 1) “name”2) “xiaoqian”3) “age”4) “1”127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小消息ID的列表1) 1) 1527849629172-02) 1) “name”2) “xiaoyu”3) “age”4) “29”2) 1) 1527849637634-02) 1) “name”2) “xiaoqian”3) “age”4) “1”127.0.0.1:6379> xrange codehole – 1527849629172-0 # 指定最大消息ID的列表1) 1) 1527849609889-02) 1) “name”2) “laoqian”3) “age”4) “30”2) 1) 1527849629172-02) 1) “name”2) “xiaoyu”3) “age”4) “29”127.0.0.1:6379> xdel codehole 1527849609889-0(integer) 1127.0.0.1:6379> xlen codehole # 長度不受影響(integer) 3127.0.0.1:6379> xrange codehole – + # 被刪除的消息沒了1) 1) 1527849629172-02) 1) “name”2) “xiaoyu”3) “age”4) “29”2) 1) 1527849637634-02) 1) “name”2) “xiaoqian”3) “age”4) “1”127.0.0.1:6379> del codehole # 刪除整個Stream(integer) 1

獨立消費

我們可以在不定義消費組的情況下進行Stream消息的獨立消費,當Stream沒有新消息時,甚至可以阻塞等待。Redis設計了一個單獨的消費指令xread,可以將Stream當成普通的消息隊列(list)來使用。使用xread時,我們可以完全忽略消費組(Consumer Group)的存在,就好比Stream就是一個普通的列表(list)。

# 從Stream頭部讀取兩條消息127.0.0.1:6379> xread count 2 streams codehole 0-01) 1) “codehole”2) 1) 1) 1527851486781-02) 1) “name”2) “laoqian”3) “age”4) “30”2) 1) 1527851493405-02) 1) “name”2) “yurui”3) “age”4) “29”# 從Stream尾部讀取一條消息,毫無疑問,這里不會返回任何消息127.0.0.1:6379> xread count 1 streams codehole $(nil)# 從尾部阻塞等待新消息到來,下面的指令會堵住,直到新消息到來127.0.0.1:6379> xread block 0 count 1 streams codehole $# 我們從新打開一個窗口,在這個窗口往Stream里塞消息127.0.0.1:6379> xadd codehole * name youming age 601527852774092-0# 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內容# 而且還顯示了一個等待時間,這里我們等待了93s127.0.0.1:6379> xread block 0 count 1 streams codehole $1) 1) “codehole”2) 1) 1) 1527852774092-02) 1) “name”2) “youming”3) “age”4) “60”(93.11s)

客戶端如果想要使用xread進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息ID。下次繼續調用xread時,將上次返回的最後一個消息ID作為參數傳遞進去,就可以繼續消費後續的消息。

block 0表示永遠阻塞,直到消息到來,block 1000表示阻塞1s,如果1s內沒有任何消息到來,就返回nil

127.0.0.1:6379> xread block 1000 count 1 streams codehole $(nil)(1.07s)

創建消費組

Stream通過xgroup create指令創建消費組(Consumer Group),需要傳遞起始消息ID參數用來初始化last_delivered_id變量。

127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開始消費OK# $表示從尾部開始消費,只接受新消息,當前Stream消息會全部忽略127.0.0.1:6379> xgroup create codehole cg2 $OK127.0.0.1:6379> xinfo codehole # 獲取Stream信息1) length2) (integer) 3 # 共3個消息3) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 2 # 兩個消費組9) first-entry # 第一個消息10) 1) 1527851486781-02) 1) “name”2) “laoqian”3) “age”4) “30”11) last-entry # 最後一個消息12) 1) 1527851498956-02) 1) “name”2) “xiaoqian”3) “age”4) “1”127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組信息1) 1) name2) “cg1″3) consumers4) (integer) 0 # 該消費組還沒有消費者5) pending6) (integer) 0 # 該消費組沒有正在處理的消息2) 1) name2) “cg2″3) consumers # 該消費組還沒有消費者4) (integer) 05) pending6) (integer) 0 # 該消費組沒有正在處理的消息

消費

Stream提供了xreadgroup指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息ID。它同xread一樣,也可以阻塞等待新消息。讀到新消息後,對應的消息ID就會進入消費者的PEL(正在處理的消息)結構里,客戶端處理完畢後使用xack指令通知服務器,本條消息已經處理完畢,該消息ID就會從PEL中移除。

# >號表示從當前消費組的last_delivered_id後面開始讀# 每當消費者讀取一條消息,last_delivered_id變量就會前進127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >1) 1) “codehole”2) 1) 1) 1527851486781-02) 1) “name”2) “laoqian”3) “age”4) “30”127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >1) 1) “codehole”2) 1) 1) 1527851493405-02) 1) “name”2) “yurui”3) “age”4) “29”127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >1) 1) “codehole”2) 1) 1) 1527851498956-02) 1) “name”2) “xiaoqian”3) “age”4) “1”2) 1) 1527852774092-02) 1) “name”2) “youming”3) “age”4) “60”# 再繼續讀取,就沒有新消息了127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >(nil)# 那就阻塞等待吧127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole ># 開啟另一個窗口,往里塞消息127.0.0.1:6379> xadd codehole * name lanying age 611527854062442-0# 回到前一個窗口,發現阻塞解除,收到新消息了127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >1) 1) “codehole”2) 1) 1) 1527854062442-02) 1) “name”2) “lanying”3) “age”4) “61”(36.54s)127.0.0.1:6379> xinfo groups codehole # 觀察消費組信息1) 1) name2) “cg1″3) consumers4) (integer) 1 # 一個消費者5) pending6) (integer) 5 # 共5條正在處理的信息還有沒有ack2) 1) name2) “cg2″3) consumers4) (integer) 0 # 消費組cg2沒有任何變化,因為前面我們一直在操縱cg15) pending6) (integer) 0# 如果同一個消費組有多個消費者,我們可以通過xinfo consumers指令觀察每個消費者的狀態127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個消費者1) 1) name2) “c1″3) pending4) (integer) 5 # 共5條待處理消息5) idle6) (integer) 418715 # 空閒了多長時間ms沒有讀取消息了# 接下來我們ack一條消息127.0.0.1:6379> xack codehole cg1 1527851486781-0(integer) 1127.0.0.1:6379> xinfo consumers codehole cg11) 1) name2) “c1″3) pending4) (integer) 4 # 變成了5條5) idle6) (integer) 668504# 下面ack所有消息127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0(integer) 4127.0.0.1:6379> xinfo consumers codehole cg11) 1) name2) “c1″3) pending4) (integer) 0 # pel空了5) idle6) (integer) 745505

Stream消息太多怎麼辦

讀者很容易想到,要是消息積累太多,Stream的鏈表豈不是很長,內容會不會爆掉就是個問題了。xdel指令又不會刪除消息,它只是給消息做了個標誌位。

Redis自然考慮到了這一點,所以它提供了一個定長Stream功能。在xadd的指令提供一個定長長度maxlen,就可以將老的消息幹掉,確保最多不超過指定長度。

127.0.0.1:6379> xlen codehole(integer) 5127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 11527855160273-0127.0.0.1:6379> xlen codehole(integer) 3

我們看到Stream的長度被砍掉了。

消息如果忘記ACK會怎樣

Stream在每個消費者結構中保存了正在處理中的消息ID列表PEL,如果消費者收到了消息處理完了但是沒有回復ack,就會導致PEL列表不斷增長,如果有很多消費組的話,那麼這個PEL占用的記憶體就會放大。

PEL如何避免消息丟失

在客戶端消費者讀取Stream消息時,Redis服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是PEL里已經保存了發出去的消息ID。待客戶端重新連上之後,可以再次收到PEL中的消息ID列表。不過此時xreadgroup的起始消息ID不能為參數>,而必須是任意有效的消息ID,一般將參數設為0-0,表示讀取所有的PEL消息以及自last_delivered_id之後的新消息。

結論

Stream的消費模型借鑒了kafka的消費分組的概念,它彌補了Redis Pub/Sub不能持久化消息的缺陷。但是它又不同於kafka,kafka的消息可以分partition,而Stream不行。如果非要分parition的話,得在客戶端做,提供不同的Stream名稱,對消息進行hash取模來選擇往哪個Stream里塞。如果讀者稍微研究過Redis作者的另一個開源項目Disque的話,這極可能是作者意識到Disque項目的活躍程度不夠,所以將Disque的內容移植到了Redis里面。這只是本人的猜測,未必是作者的初衷。如果讀者有什麼不同的想法,可以在評論區一起參與討論。