SpringCloud Stream消息驅動
簡介
什麼是SpringCloudStream
- 官方定義 SpringCloud Stream 是一個構建消息驅動微服務對框架。
- 應用程序通過 inputs 或者 outputs 來 與Springcloud Stream 中 binder 對象交互 通過我們配置來 binding(綁定),而 SpringCloud Stream 的 binder 對象負責與消息中間件交互,所以我們只需要搞清楚如何與 Spring Coud Stream 交互就可以方便使用消息驅動的方式。
- 通過使用SpringIntegration 來連接消息代理中間件實現消息事件驅動。
- Spring Cloud Stream 為一些供應商的消息中間件產品提供來個性化的自動化配置實現,引用來發布-訂閱、消費組、分區的三個核心概念。
- ==目前值支持 RabbitMQ、Kafka==
- 官網 https://spring.io/projects/spring-cloud-stream#overview
Spring Cloud Stream 是用於構建與消息傳遞系統的高度可伸縮的事件驅動微服務架構,該框架提供來一個靈活的編程模型,它簡歷在已經建立和熟悉的Spring 熟語和最佳實踐上,包括支持持久化的發佈訂閱、消費組以及消息分區這三個核心概念
API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
中文指導手冊:https://m.wang1314.com/doc/webapp/topic/20971999.html
設計思想
標準MQ
為什麼用Cloud Stream
- 比方說我們用到了RabbitMQ 和 Kafka ,由於這兩個消息中間件的架構上的不同,像RabbitMQ 有 exchange,Kafka有 Topic 和 Partitions 分區,
-
這些中間件的差異性導致我們實際項目開發給我們造成一定困擾,我們如果用了兩個消息隊列的其中一種,後面業務需求,我們想往另一個消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都需要推到重新做,因為它跟我們系統耦合度很高,這時候SpringCloud Stream 給我們提供了一種解耦合的方式
stream憑什麼可以統一底層差異
Biinder
- INPUT 對應於消費者
- OUTPUT 對應於生產者
- 在沒有綁定器這個概念的情況下,我們springBoot 應用要直接與消息中間件進行交互的時候,由於消息中間件構造不同,他們的實現細節上會有較大的差異性,通過定義綁定器作為中間層,完美地實現了應用與消息中間件細節之間的隔離。通過嚮應用程序暴露統一的Channel 通道,使得應用程序不需要再考慮各種不同的消息中間件實現
- 通過綁定器Binder作為中間層,實現了應用程序與消息中間件的隔離
- 通過定義綁定器Binder 作為中間層,實現了應用程序與消息中間件之間細節的隔離。
- Stream 中的消息通信方式遵循了發佈訂閱 Topic主題進行廣播
Spring Cloud Stream標準流程套路
編碼API和常用註解
案例:
消息驅動生產者
- 新建模塊 cloud-stream-rabbitmq-provider8801
- pom
<dependencies>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--監控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--熱部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務信息
defaultRabbit: #表示定義的名稱,用於binding整合
type: rabbit #消息組件類型
environment: #設置rabbitmq的相關環境配置
spring:
rabbitmq:
host: ip端口號 #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址
port: 5672
username: guest
password: guest
bindings: #服務的整合處理
output: #這個名字是一個通道的名稱
destination: studyExchange #表示要使用的Exchange名稱定義
content-type: application/json #設置消息類型,本次為json
binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)
lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就註銷節點 默認是90s
instance-id: send-8801.com #在信息列表時顯示主機名稱
prefer-ip-address: true #訪問的路徑變為IP地址
啟動類
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
- 新建業務類
新建service包 --- IMessageProvider接口 與實現類
public interface IMessageProvider {
public String send();
}
@EnableBinding(Source.class) //定義消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; //消息發送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build()); //MessageBuilder是spring的integration.support.MessageBuilder
System.out.println("*******serial: " + serial);
return null;
}
}
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return iMessageProvider.send();
}
}
消費者
- 新建模塊cloud-stream-rabbitmq-consumer8802
- pom
<dependencies>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--監控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--熱部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- yml
server:
port: 8802
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務信息
defaultRabbit: #表示定義的名稱,用於binding整合
type: rabbit #消息組件類型
environment: #設置rabbitmq的相關環境配置
spring:
rabbitmq:
host: ip端口 #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址
port: 5672
username: 用戶名
password: 密碼
bindings: #服務的整合處理
input: #這個名字是一個通道的名稱
destination: studyExchange #表示要使用的Exchange名稱定義
content-type: application/json #設置消息類型,本次為json,本文要設置為“text/plain”
binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)
lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就註銷節點 默認是90s
instance-id: receive-8802.com #在信息列表時顯示主機名稱
prefer-ip-address: true #訪問的路徑變為IP地址
- 主啟動類
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
- 新建 Controller
@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT) //監聽
public void input(Message<String> message){
System.out.println("消費者1號------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
啟動Eureka 消息驅動生產者,消費者
-
http://localhost:8801/sendMessage(8801發送消息)
- 8802接收到消息:
- 個人博客: http://blog.yanxiaolong.cn/.