開發與維運

Watermill(Golang 事件驅動庫)Message Router 解析

Configuration

// ...
type RouterConfig struct {
   // CloseTimeout 確定路由在關閉時應為處理程序工作多長時間。
   CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
    if c.CloseTimeout == 0 {
        c.CloseTimeout = time.Second * 30
    }
}
func (c RouterConfig) Validate() error {
    return nil
}
// ...

Handler

首先,您需要實現 HandlerFunc:

// ...
// HandlerFunc 是在收到消息時調用的函數。
//
// msg.Ack() 會在 HandlerFunc 沒有返回錯誤時自動調用。
// 當 HandlerFunc 返回錯誤時,msg.Nack() 被調用。
// 當 msg.Ack() 在 handler 中被調用並且 HandlerFunc 返回錯誤時,
// msg.Nack() 將不會被髮送,因為 Ack 已經發送了。
//
// HandlerFunc 在接收到多條消息時並行執行
// (因為 msg.Ack() 是在 HandlerFunc 中發送的,或者訂閱者支持多個消費者)
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...

接下來,您必須使用 Router.AddHandler 添加新的處理程序:

// ...
// AddHandler 添加一個新的處理程序。
//
// handlerName 必須唯一。目前,它僅用於調試。
//
// subscribeTopic 是一個處理程序將從其中接收消息的 topic。
//
// publishTopic 是一個 router 將生成 handlerFunc 返回的消息的 topic。
// 
// 當處理程序需要發佈到多個主題時,
// 建議僅向處理程序注入 Publisher 或實現中間件,
// 該中間件將捕獲消息並基於例如元數據發佈到主題。
func (r *Router) AddHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    publishTopic string,
    publisher Publisher,
    handlerFunc HandlerFunc,
) *Handler {
    r.logger.Info("Adding handler", watermill.LogFields{
        "handler_name": handlerName,
        "topic":        subscribeTopic,
    })
    if _, ok := r.handlers[handlerName]; ok {
        panic(DuplicateHandlerNameError{handlerName})
    }
    publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
    newHandler := &handler{
        name:   handlerName,
        logger: r.logger,
        subscriber:     subscriber,
        subscribeTopic: subscribeTopic,
        subscriberName: subscriberName,
        publisher:     publisher,
        publishTopic:  publishTopic,
        publisherName: publisherName,
        handlerFunc:       handlerFunc,
        runningHandlersWg: r.runningHandlersWg,
        messagesCh:        nil,
        closeCh:           r.closeCh,
    }
    r.handlers[handlerName] = newHandler
    return &Handler{
        router:  r,
        handler: newHandler,
    }
}
// AddNoPublisherHandler 添加一個新的 handler。
// 該 handler 無法返回消息。
// 當消息返回時,它將發生一個錯誤,Nack 將被髮送。
//
// handlerName 必須唯一。目前,它僅用於調試。
//
// subscribeTopic 是一個 handler 將從其中接收消息的 topic。
//
// subscriber 是將從其消費消息的 Subscriber。
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...

參見入門的示例用法:

// ...
   handler := router.AddHandler(
        "struct_handler",          // handler 名稱,必須是唯一的
       "incoming_messages_topic", // 我們將從中讀取事件的 topic
       pubSub,
        "outgoing_messages_topic", // 我們將向其發佈事件的 topic
       pubSub,
        structHandler{}.Handler,
    )
// ...

No publisher handler

並非每個處理程序都會產生新消息。您可以使用 Router.AddNoPublisherHandler 添加此類處理程序:

// ...
// AddNoPublisherHandler 添加一個新的 handler。
// 該 handler 無法返回消息。
// 當消息返回時,它將發生一個錯誤,Nack 將被髮送。
//
// handlerName 必須唯一。目前,它僅用於調試。
//
// subscribeTopic 是一個 handler 將從其中接收消息的 topic。
//
// subscriber 是將從其消費消息的 Subscriber。
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...

Ack

默認情況下,當 handfunc 沒有返回錯誤時,會調用 msg.Ack()。如果返回一個錯誤,msg.Nack() 將被調用。因此,您不必在處理消息後調用 msg.Ack() 或 msg.Nack() (當然,如果您願意,也可以這樣做)。

Producing messages

從處理程序返回多條消息時,請注意,大多數 Publisher 實現都不支持消息的原子發佈。如果代理或存儲不可用,它可能最終僅產生一些消息併發送 msg.Nack()。

如果這是一個問題,考慮使用每個處理程序只發布一條消息。

Running the Router

要運行 Router,你需要調用 run()。

// ...
// Run 運行所有插件和處理程序,並開始訂閱所提供的 topic。
// 當 router 正在運行時,此調用被阻塞。
//
// 當所有處理程序都停止時(例如,因為訂閱已關閉),router 也將停止。
//
// 要停止 Run(),你應該在路由器上調用 Close()。
//
// ctx 將傳播給所有訂閱者。
//
// 當所有處理程序都停止時(例如:因為關閉連接),Run() 也將停止。
func (r *Router) Run(ctx context.Context) (err error) {
// ...

Ensuring that the Router is running

知道 router 是否在運行是很有用的。對此,您可以使用 Running() 方法。

// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//        fmt.Println("Starting router")
//        go r.Run(ctx)
//        <- r.Running()
//        fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...

Execution models

訂閱者可以一次使用一條消息,也可以並行使用多條消息。

  • 單消息流是最簡單的方法,這意味著在調用msg.Ack()之前,訂閱者不會收到任何新消息。
  • 只有某些訂閱者支持多個消息流。通過一次訂閱多個主題分區,多個消息可以同時被使用,即使是之前沒有被ack的消息(例如,Kafka訂閱器是這樣工作的)。路由器通過運行併發處理函數來處理這個模型,每個分區一個處理函數。

請參閱所選的 Pub/Sub 文檔以獲取受支持的執行模型。

Middleware

// ...
// HandlerMiddleware 允許我們編寫類似 HandlerFunc 的裝飾器。
// 它可以在處理程序之前執行某些事情(例如:修改已消費的消息)
// 或之後(修改產生的消息,對被消費的消息進行 ack/nack,處理錯誤,記錄日誌,等等)執行一些事情。
//
// 它可以通過 `AddMiddleware` 方法附加到路由器上。
//
// Example:
//        func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//            return func(message *message.Message) ([]*message.Message, error) {
//                fmt.Println("executed before handler")
//                producedMessages, err := h(message)
//                fmt.Println("executed after handler")
//
//                return producedMessages, err
//            }
//        }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

在 Middlewares 中可以找到標準中間件的完整列表。

Plugin

// ...
// RouterPlugin 是一個函數,在 Router 啟動時執行。
type RouterPlugin func(*Router) error
// ...

完整的標準插件列表可以在 message/router/plugin 中找到。

Context

handler 接收到的每條消息在上下文中都保存著一些有用的值:

// ...
// HandlerNameFromCtx 返回使用該消息的路由中的消息處理程序的名稱。
func HandlerNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx 返回在路由中發佈消息的消息發佈者類型的名稱。
// For example, for Kafka it will be `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx 返回在路由中訂閱該消息的消息訂閱者類型的名稱。
// For example, for Kafka it will be `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx 返回從路由接收到消息的主題。
func SubscribeTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx 返回路由將向其發佈消息的主題。
func PublishTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publishTopicKey)
}
// ...

Leave a Reply

Your email address will not be published. Required fields are marked *