用 Golang 實作 NSQ:微服務訊息/事件傳遞系統

NSQ 是和 Event Sourcing 理念有所關聯的即時性分布式訊息傳遞系統,看到這裡你可能會以為是作即時通訊軟體用的,但實際上並不是。額外有趣的一點是 NSQ 也是以 Golang 所撰寫的。

我們會在文章中以實際情況、為什麼需要 NSQ 來解答疑惑並且實作一個真正的範例。

在閱讀本篇之前,也許你可以看看另一個相關的文章:「用 Golang 實作 Event Store:微服務事件儲藏中心」這篇文章提及到了為什麼我們應該用事件,在這裡我們仍會以類似的案例來作為前文導讀。

在微服務結構中,沒有服務應該依賴另一個服務,意思是服務都是自主的,任何事情都是自己處理,很多人會在這點上面開始琢磨該如何實作,但請切記:

請不要把舊有的知識套用在準備學習的事物上。

否則只會讓自己走入死循環中。這方面本篇文章並不會提及,因為相關概念多到可以寫成一篇文章,若有需要理解可以參考:「微服務概念與溝通——Golang 微服務實作教學與範例」一文。

實際案例

假設我們今天有個使用者服務,這個服務會在新使用者註冊時順道發送歡迎信件,然後繼續做接下來該做的事情,這個時候的規劃會像這樣。

但這個結構仔細一看⋯⋯我們需要先發送信件才能夠繼續執行接下來的事情,而且如果信件處理過久,我們的服務就會整個卡死,這個問題可以在接下來順帶獲得解決。

信件跟使用者貌似沒什麼關係,所以我們可以把信件獨立成一個服務,接著在建立使用者時呼叫信件服務來寄信⋯⋯

解決方案

等等!剛才那樣不就違反了微服務的「自主性」原則嗎?微服務不能依賴另一個微服務,這會令你的結構有著過度相依性,之後的維護會越來越困難。而這個時候我們就可以透過訊息傳遞系統來協助我們解決這個問題。

首先信件服務需要向傳遞系統註冊一個 send_email 事件,一但有任何 send_email 事件發生,信件服務就能夠知道。簡單說就是監聽事件。

當新使用者註冊時,使用者服務就可以發送帶有新使用者資料的 send_email 事件給傳遞系統。由於先前信件服務已經註冊了 send_email 事件,所以信件服務就會接收到這個事件,然後我們就可以在信件服務發送歡迎信件給這個使用者。

現在這個方案解決了服務與服務之間的依賴性,也順帶解決了服務可能卡死的問題。當使用者服務發送 send_email 事件之後並不需要管理、等待 send_email 的完成。因為這個事件已經交給傳遞系統處理了,所以我們就能夠繼續處理下個工作,這也叫做異步處理,很常出現在分布式系統上。

為什麼是 NSQ?

NSQ 解決的就是分布式訊息傳遞,讓你能夠在服務之間良好地溝通。在 NSQ 中有兩個重要的元素,一個是「主題(Topics)」,另一個則是「頻道(Channels)」。

一個主題就是像事件那樣(如:send_emailuser_created)等。而一個主題可以有很多個頻道。讓我們假設有五個不同的服務想要監聽同一個主題,這個時候就會開設五個頻道。

有趣的是,如果五個不同的服務只開設一個頻道來監聽的話,那麼這個主題只會被分發到其中一個服務(類似負載平衡)。


1. 安裝與啟動 NSQ

先進入 NSQ 的下載頁面,接著選取符合你系統的壓縮或安裝檔。

接著解壓縮,然後在解壓縮的 /bin 目錄直接執行 nsqlookupd 檔案即可。

nsqlookupd 是 NSQ 的叢集管理中心,我們之後都會發送請求到這裡,接著 nsqlookupd 會回傳一個可用的 NSQ 服務給我們使用。

$ ./nsqlookupd 

當然,這還沒完,開啟另一個終端機執行下列指令來建立單個真正的 NSQ 服務。

$ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1

然後再開另一個終端機輸入下列指令啟動 NSQ 叢集管理的網頁介面。

$ ./nsqadmin --lookupd-http-address=127.0.0.1:4161

這個時候你會有至少三個終端機介面。

2.網頁管理介面

前往 http://127.0.0.1:4171/ 就能夠訪問 NSQ Admin 的介面。

雖然這個界面鳥鳥的(謝謝你,Bootstrap),但之後你可以在這裡管理訊息的傳遞,或是查看有哪些訊息被放置在 RAM、Disk 等待傳送。

3.建立主題與頻道

接著我們來到 Lookups 分頁,建立一個新的 test 主題和 test_channel 頻道,這會在稍後使用。 請注意的是:如果你沒有建立主題就向該主題發送訊息,你會得到錯誤,所以你必須先建立主題才行。

4. 引用套件庫

在這裡我們會用到 go-nsq 套件。在終端機中透過 go get 指令取得。

$ go get -u -v github.com/nsqio/go-nsq

接著在你的程式中引用該套件。

import "github.com/bitly/go-nsq"  

5. 生產者

發送、產生訊息的那一端叫做生產者,首先我們需要透過下列程式碼來連線到 NSQ 並且宣告說我們是生產者。

package main

import (  
    "log"
    "github.com/bitly/go-nsq"
)

func main() {  
    // 建立空白設定檔。
    config := nsq.NewConfig()
    // 用該設定檔新建一個連線到 nsqd。
    w, _ := nsq.NewProducer("127.0.0.1:4150", config)

    // 向 `test` 主題發布一個 `hello, world!` 測試訊息。
    err := w.Publish("test", []byte("hello, world!"))
    if err != nil {
        log.Panic("連線失敗。")
    }

    // 結束與 nsqd 的連線。
    w.Stop()
}

連線之後我們會發佈一則 hello, world! 的訊息到 test 主題中,透過 go run 來測試。

$ go run main.go

接著你就能在網頁介面中看見這則訊息目前被放置在硬碟(或記憶體)中等待送出。

接著我們會需要一個消費者接收並且「消化」這則消息。

6. 消費者

消費者負責「消化」在佇列的訊息,簡單說就是接收訊息的那一端。這意味著當你的服務離線時,你並不會因此就丟失這則訊息,這則訊息會從記憶體中移動至硬碟並且重新排程在之後重新發送訊息直至有消費者消化這則訊息。

要注意的是你不能夠依賴訊息的順序,因為訊息會重新排程,所以順序也就不可靠。

package main

import (  
    "log"

    "github.com/bitly/go-nsq"
)

func main() {

    // 建立「已接收」頻道,作為是否接收到訊息的一個開關。
    received := make(chan bool)
    // 建立空白設定檔。
    config := nsq.NewConfig()

    // 用該設定檔新建一個消費者建構體,然後表明要訂閱 `test` 主題,並訂閱該主題中的 `test_channel` 頻道。
    q, _ := nsq.NewConsumer("test", "test_channel", config)
    // 建立訊息接收函式,當我們接收到訊息就會呼叫這個函式。
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // 顯示我們接收到的訊息。
        log.Printf("接收到了一個訊息:%v", message)
        log.Printf("這個訊息轉換成字串就是:%v", string(message.Body))
        // 對「已接收」頻道傳送 `true` 就能讓程式結束。
        received <- true

        return nil
    }))

    // 連線到 NSQ 叢集,而不是單個 NSQ,這樣更安全與可靠。
    err := q.ConnectToNSQLookupd("127.0.0.1:4161")
    if err != nil {
        log.Panic("連線失敗。")
    }

    // 除非接收到訊息,不然我們就讓程式卡住。
    <-received
}

在上述程式碼中我們連線到 NSQ 叢集,這是一個好習慣,因為這樣比起指連線到單個 NSQ 節點來說更可靠、安全。

接著我們也用了頻道來阻塞程式,避免我們還沒接收到訊息程式就結束了。現在請用下面這行指令執行。

$ go run main.go

你就會看到這樣的輸出。

2017/01/14 19:51:45 接收到了一個訊息:&{[48 55 54 49 52 101 53 54 102 56 53 50 49 48 48 48] [104 101 108 108 111 44 32 119 111 114 108 100 33] 1484393999276267909 2 127.0.0.1:4150 0xc420108010 0 0}  
2017/01/14 19:51:45 這個訊息轉換成字串就是:hello, world!