開發與維運

Spring Cloud Alibaba 七天訓練營(六)分佈式消息(事件)驅動

文檔目錄

1. 簡介

事件驅動架構(Event-driven 架構,簡稱 EDA)是軟件設計領域內的一套程序設計模型。這套模型的意義是所有的操作通過事件的發送/接收來完成。舉個例子,比如一個訂單的創建在傳統軟件設計中服務端通過接口暴露創建訂單的動作,然後客戶端訪問創建訂單。在事件驅動設計裡,訂單的創建通過接收訂單事件來完成,這個過程中有事件發送者和事件接受者這兩個模塊,事件發送者的作用是發送訂單事件,事件接受者的作用的接收訂單事件。Spring Cloud Stream 是一套基於消息的事件驅動開發框架,它提供了一套全新的消息編程模型,此模型屏蔽了底層具體消息中間件的使用方式。開發者們使用這套模型可以完成基於消息的事件驅動應用開發。

2. 學習目標

  • 掌握 Spring 對消息的編程模型封裝
  • 掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的發送和接收
  • 掌握 RocketMQ 整合 Spring Cloud Bus 完成遠程事件的發送和接收

3. 詳細內容

  • 概念理解:指導讀者理解 Spring 的消息編程模型
  • 消息發送/接收:實戰 Spring Cloud Steam RocketMQ Binder
  • 事件發送/接收: 實戰 Spring Cloud Bus RocketMQ

4. 理解 Spring 消息編程模型

首先我們來看這個場景,不同的消息中間件發送消息的代碼:

1.png

每個消息中間件都有自己的消息模型編程,他們的代碼編寫方式都不一致。同樣地,在消息的訂閱方面,也是不同的代碼。這個時候如果某天想把 Kafka 切換到 RocketMQ,必須得修改大量代碼。

Spring 生態裡有兩個消息相關的模塊和項目,分別是 spring-messaging 模塊和 Spring Integration 項目,它們對消息的編程模型進行了統一,不論是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被統一稱為 org.springframework.messaging.Message 接口。

Message 接口有兩個方法,分別是 getPayload 以及 getHeaders 用於獲取消息體以及消息頭。如圖所示,這也意味著一個消息 Message 由 Header 和 Payload 組成:

2.png

Payload 是一個泛型,意味是消息體可以放任意數據類型。Header 是一個 MessageHeaders 類型的消息頭。

有了消息之後,這個消息被髮送到哪裡呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被髮送到消息通道里,然後再通過消息處理器 MessageHandler 去處理消息通道里的消息:

3.png

消息處理這裡又會遇到一個問題。如果消息通道里只有 1 個消息,但是消息處理器有 N 個,這個時候要被哪個消息處理器處理呢?這裡又涉及一個消息分發器的問題。UnicastingDispatcher 表示單播的處理方式,消息會通過負載均衡被分發到某一個消息處理器上,BroadcastingDispatcher 表示廣播的方式,消息會被所有的消息處理器處理。

4.png

5. Spring Cloud Stream

Spring Cloud Stream 是一套基於消息的事件驅動開發框架。

Spring Cloud Stream 在 Spring Integration 項目的基礎上再進行了一些封裝,提出一些新的概念,讓開發者能夠更簡單地使用這套消息編程模型。如圖所示,這是三者之間的關係:

5.png

如下圖所示,這是 Spring Cloud Stream 的編程模型。通過 RabbitMQ Binder 構建 input Binding 用於讀取 RabbitMQ 上的消息,將 payload 內容轉成大寫再通過 Kafka Binder 構建的 output Binding 寫入到 Kafka 中。圖上中間的 [4 ]()行非常簡單的代碼就可以完成從 RabbitMQ 讀取消息再寫入到 Kafka 的動作。

6.png

以下代碼是使用 Spring Cloud Stream 以最簡單的方式完成消息的發送和接收:

@SpringBootApplication@EnableBinding({Source.class, Sink.class})  // ①
public class SCSApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(SCSApplication.class)
            .web(WebApplicationType.NONE).run(args);
    }
    @Autowired
    Source source;  // ②
    @Bean
    public CommandLineRunner runner() {
        return (args) -> {
            source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build());  // ③
        };
    }
    @StreamListener(Sink.INPUT)  // ④
    @SendTo(Source.OUTPUT)  // ⑤
    public String receive(String msg) {
        return msg.toUpperCase();
    }
}
  1. 使用 @EnableBinding 註解,註解裡面有兩個參數 Source 和 Sink,它們都是接口。Source 接口內部有個 MessageChannel 類型返回值的 output 方法,被 @Output 註解修飾表示這是一個 Output Binding;Sink 接口內部有個 SubscribableChannel 類型返回值的 intput 方法,被 @Input 註解修飾表示這是一個 Input Binding。@EnableBinding 註解會針對這兩個接口生成動態代理。
  2. 注入 @EnableBinding 註解對於 Source 接口生成的動態代理。
  3. 使用 @EnableBinding 註解對於 Source 接口生成的動態代理內部的 MessageChannel 發送一條消息。最終消息會被髮送到消息中間件對應的 topic 裡。
  4. @StreamListener 註解訂閱 @EnableBinding 註解對於 Sink 接口生成的動態代理內部的 SubscribableChannel 中的消息,這裡會訂閱到消息中間件對應的topic 和 group。
  5. 消息處理結果發送到@EnableBinding 註解對於 Source 接口生成的動態代理內部的 MessageChannel。最終消息會被髮送到消息中間件對應的topic 裡。

上述代碼需要配置信息:

spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka

spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq

這裡的 Input Binding 對應的 topic 是 test-input,group 是 test-input-binder,對應的 MQ 是 Kafka,Output Binding 對應的 topic 是 test-output,對應的 MQ 是 RocketMQ。

所以這段代碼的意思是以 test-input-binder 這個 group 去 Kafka 上讀取 test-input 這個 topic 下的消息,把消息的內容轉換成大寫再發送給 RocketMQ 的 test-output topic 上。

當然,你也可以直接通過沙箱環境直接查看案例

Leave a Reply

Your email address will not be published. Required fields are marked *