Configuration
// ... type RouterConfig struct { // CloseTimeout 確定路由在關閉時應為處理程序工作多長時間。 CloseTimeout time.Duration } func (c *RouterConfig) setDefaults() { if c.CloseTimeout == 0 { c.CloseTimeout = time.Second * 30 } } func (c RouterConfig) Validate() error { return nil } // ...
Handler
首先,您需要實現 HandlerFunc:
// ... // HandlerFunc 是在收到消息時調用的函數。 // // msg.Ack() 會在 HandlerFunc 沒有返回錯誤時自動調用。 // 當 HandlerFunc 返回錯誤時,msg.Nack() 被調用。 // 當 msg.Ack() 在 handler 中被調用並且 HandlerFunc 返回錯誤時, // msg.Nack() 將不會被髮送,因為 Ack 已經發送了。 // // HandlerFunc 在接收到多條消息時並行執行 // (因為 msg.Ack() 是在 HandlerFunc 中發送的,或者訂閱者支持多個消費者) type HandlerFunc func(msg *Message) ([]*Message, error) // ...
接下來,您必須使用 Router.AddHandler
添加新的處理程序:
// ... // AddHandler 添加一個新的處理程序。 // // handlerName 必須唯一。目前,它僅用於調試。 // // subscribeTopic 是一個處理程序將從其中接收消息的 topic。 // // publishTopic 是一個 router 將生成 handlerFunc 返回的消息的 topic。 // // 當處理程序需要發佈到多個主題時, // 建議僅向處理程序注入 Publisher 或實現中間件, // 該中間件將捕獲消息並基於例如元數據發佈到主題。 func (r *Router) AddHandler( handlerName string, subscribeTopic string, subscriber Subscriber, publishTopic string, publisher Publisher, handlerFunc HandlerFunc, ) *Handler { r.logger.Info("Adding handler", watermill.LogFields{ "handler_name": handlerName, "topic": subscribeTopic, }) if _, ok := r.handlers[handlerName]; ok { panic(DuplicateHandlerNameError{handlerName}) } publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber) newHandler := &handler{ name: handlerName, logger: r.logger, subscriber: subscriber, subscribeTopic: subscribeTopic, subscriberName: subscriberName, publisher: publisher, publishTopic: publishTopic, publisherName: publisherName, handlerFunc: handlerFunc, runningHandlersWg: r.runningHandlersWg, messagesCh: nil, closeCh: r.closeCh, } r.handlers[handlerName] = newHandler return &Handler{ router: r, handler: newHandler, } } // AddNoPublisherHandler 添加一個新的 handler。 // 該 handler 無法返回消息。 // 當消息返回時,它將發生一個錯誤,Nack 將被髮送。 // // handlerName 必須唯一。目前,它僅用於調試。 // // subscribeTopic 是一個 handler 將從其中接收消息的 topic。 // // subscriber 是將從其消費消息的 Subscriber。 func (r *Router) AddNoPublisherHandler( handlerName string, subscribeTopic string, subscriber Subscriber, handlerFunc NoPublishHandlerFunc, ) { // ...
參見入門的示例用法:
// ... handler := router.AddHandler( "struct_handler", // handler 名稱,必須是唯一的 "incoming_messages_topic", // 我們將從中讀取事件的 topic pubSub, "outgoing_messages_topic", // 我們將向其發佈事件的 topic pubSub, structHandler{}.Handler, ) // ...
No publisher handler
並非每個處理程序都會產生新消息。您可以使用 Router.AddNoPublisherHandler
添加此類處理程序:
// ... // AddNoPublisherHandler 添加一個新的 handler。 // 該 handler 無法返回消息。 // 當消息返回時,它將發生一個錯誤,Nack 將被髮送。 // // handlerName 必須唯一。目前,它僅用於調試。 // // subscribeTopic 是一個 handler 將從其中接收消息的 topic。 // // subscriber 是將從其消費消息的 Subscriber。 func (r *Router) AddNoPublisherHandler( handlerName string, subscribeTopic string, subscriber Subscriber, handlerFunc NoPublishHandlerFunc, ) { // ...
Ack
默認情況下,當 handfunc 沒有返回錯誤時,會調用 msg.Ack()。如果返回一個錯誤,msg.Nack() 將被調用。因此,您不必在處理消息後調用 msg.Ack() 或 msg.Nack() (當然,如果您願意,也可以這樣做)。
Producing messages
從處理程序返回多條消息時,請注意,大多數 Publisher 實現都不支持消息的原子發佈。如果代理或存儲不可用,它可能最終僅產生一些消息併發送 msg.Nack()。
如果這是一個問題,考慮使用每個處理程序只發布一條消息。
Running the Router
要運行 Router,你需要調用 run()。
// ... // Run 運行所有插件和處理程序,並開始訂閱所提供的 topic。 // 當 router 正在運行時,此調用被阻塞。 // // 當所有處理程序都停止時(例如,因為訂閱已關閉),router 也將停止。 // // 要停止 Run(),你應該在路由器上調用 Close()。 // // ctx 將傳播給所有訂閱者。 // // 當所有處理程序都停止時(例如:因為關閉連接),Run() 也將停止。 func (r *Router) Run(ctx context.Context) (err error) { // ...
Ensuring that the Router is running
知道 router 是否在運行是很有用的。對此,您可以使用 Running()
方法。
// ... // Running is closed when router is running. // In other words: you can wait till router is running using // fmt.Println("Starting router") // go r.Run(ctx) // <- r.Running() // fmt.Println("Router is running") func (r *Router) Running() chan struct{} { // ...
Execution models
訂閱者可以一次使用一條消息,也可以並行使用多條消息。
- 單消息流是最簡單的方法,這意味著在調用msg.Ack()之前,訂閱者不會收到任何新消息。
- 只有某些訂閱者支持多個消息流。通過一次訂閱多個主題分區,多個消息可以同時被使用,即使是之前沒有被ack的消息(例如,Kafka訂閱器是這樣工作的)。路由器通過運行併發處理函數來處理這個模型,每個分區一個處理函數。
請參閱所選的 Pub/Sub 文檔以獲取受支持的執行模型。
Middleware
// ... // HandlerMiddleware 允許我們編寫類似 HandlerFunc 的裝飾器。 // 它可以在處理程序之前執行某些事情(例如:修改已消費的消息) // 或之後(修改產生的消息,對被消費的消息進行 ack/nack,處理錯誤,記錄日誌,等等)執行一些事情。 // // 它可以通過 `AddMiddleware` 方法附加到路由器上。 // // Example: // func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc { // return func(message *message.Message) ([]*message.Message, error) { // fmt.Println("executed before handler") // producedMessages, err := h(message) // fmt.Println("executed after handler") // // return producedMessages, err // } // } type HandlerMiddleware func(h HandlerFunc) HandlerFunc // ...
在 Middlewares 中可以找到標準中間件的完整列表。
Plugin
// ... // RouterPlugin 是一個函數,在 Router 啟動時執行。 type RouterPlugin func(*Router) error // ...
完整的標準插件列表可以在 message/router/plugin 中找到。
Context
handler 接收到的每條消息在上下文中都保存著一些有用的值:
// ... // HandlerNameFromCtx 返回使用該消息的路由中的消息處理程序的名稱。 func HandlerNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, handlerNameKey) } // PublisherNameFromCtx 返回在路由中發佈消息的消息發佈者類型的名稱。 // For example, for Kafka it will be `kafka.Publisher`. func PublisherNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, publisherNameKey) } // SubscriberNameFromCtx 返回在路由中訂閱該消息的消息訂閱者類型的名稱。 // For example, for Kafka it will be `kafka.Subscriber`. func SubscriberNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, subscriberNameKey) } // SubscribeTopicFromCtx 返回從路由接收到消息的主題。 func SubscribeTopicFromCtx(ctx context.Context) string { return valFromCtx(ctx, subscribeTopicKey) } // PublishTopicFromCtx 返回路由將向其發佈消息的主題。 func PublishTopicFromCtx(ctx context.Context) string { return valFromCtx(ctx, publishTopicKey) } // ...