大數據

SpringCloud Stream消息驅動

SpringCloud Stream消息驅動

在這裡插入圖片描述

簡介

什麼是SpringCloudStream
  • 官方定義 SpringCloud Stream 是一個構建消息驅動微服務對框架。
  • 應用程序通過 inputs 或者 outputs 來 與Springcloud Stream 中 binder 對象交互 通過我們配置來 binding(綁定),而 SpringCloud Stream 的 binder 對象負責與消息中間件交互,所以我們只需要搞清楚如何與 Spring Coud Stream 交互就可以方便使用消息驅動的方式。
  • 通過使用SpringIntegration 來連接消息代理中間件實現消息事件驅動。
  • Spring Cloud Stream 為一些供應商的消息中間件產品提供來個性化的自動化配置實現,引用來發布-訂閱、消費組、分區的三個核心概念。
  • ==目前值支持 RabbitMQ、Kafka==
    在這裡插入圖片描述
  • 官網 https://spring.io/projects/spring-cloud-stream#overview

Spring Cloud Stream 是用於構建與消息傳遞系統的高度可伸縮的事件驅動微服務架構,該框架提供來一個靈活的編程模型,它簡歷在已經建立和熟悉的Spring 熟語和最佳實踐上,包括支持持久化的發佈訂閱、消費組以及消息分區這三個核心概念

在這裡插入圖片描述
API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

中文指導手冊:https://m.wang1314.com/doc/webapp/topic/20971999.html

設計思想

標準MQ

在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述

為什麼用Cloud Stream
  • 比方說我們用到了RabbitMQ 和 Kafka ,由於這兩個消息中間件的架構上的不同,像RabbitMQ 有 exchange,Kafka有 Topic 和 Partitions 分區,
  • 在這裡插入圖片描述
    這些中間件的差異性導致我們實際項目開發給我們造成一定困擾,我們如果用了兩個消息隊列的其中一種,後面業務需求,我們想往另一個消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都需要推到重新做,因為它跟我們系統耦合度很高,這時候SpringCloud Stream 給我們提供了一種解耦合的方式
stream憑什麼可以統一底層差異

Biinder

  • INPUT 對應於消費者
  • OUTPUT 對應於生產者
  • 在沒有綁定器這個概念的情況下,我們springBoot 應用要直接與消息中間件進行交互的時候,由於消息中間件構造不同,他們的實現細節上會有較大的差異性,通過定義綁定器作為中間層,完美地實現了應用與消息中間件細節之間的隔離。通過嚮應用程序暴露統一的Channel 通道,使得應用程序不需要再考慮各種不同的消息中間件實現
  • 通過綁定器Binder作為中間層,實現了應用程序與消息中間件的隔離

在這裡插入圖片描述
在這裡插入圖片描述

  • 通過定義綁定器Binder 作為中間層,實現了應用程序與消息中間件之間細節的隔離。
  • Stream 中的消息通信方式遵循了發佈訂閱 Topic主題進行廣播

Spring Cloud Stream標準流程套路
在這裡插入圖片描述
在這裡插入圖片描述

編碼API和常用註解

在這裡插入圖片描述

案例:
消息驅動生產者
  1. 新建模塊 cloud-stream-rabbitmq-provider8801
  2. pom
<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--監控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--熱部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  1. yml
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此處配置要綁定的rabbitmq的服務信息
        defaultRabbit: #表示定義的名稱,用於binding整合
          type: rabbit #消息組件類型
          environment: #設置rabbitmq的相關環境配置
            spring:
              rabbitmq:
                host: ip端口號  #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址
                port: 5672
                username: guest
                password: guest
      bindings: #服務的整合處理
        output: #這個名字是一個通道的名稱
          destination: studyExchange #表示要使用的Exchange名稱定義
          content-type: application/json #設置消息類型,本次為json
          binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)
    lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就註銷節點 默認是90s
    instance-id: send-8801.com #在信息列表時顯示主機名稱
    prefer-ip-address: true #訪問的路徑變為IP地址

啟動類

@SpringBootApplication
public class StreamMQMain8801 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }

}
  1. 新建業務類

新建service包 --- IMessageProvider接口 與實現類

public interface IMessageProvider {
    public String send();
}
@EnableBinding(Source.class)    //定義消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;  //消息發送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());     //MessageBuilder是spring的integration.support.MessageBuilder
        System.out.println("*******serial: " + serial);
        return null;
    }
}
@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider iMessageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return iMessageProvider.send();
    }

}
消費者
  1. 新建模塊cloud-stream-rabbitmq-consumer8802
  2. pom
<dependencies>
    <!--stream rabbit -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--eureka client-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--監控-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--熱部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  1. yml
server:
  port: 8802

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此處配置要綁定的rabbitmq的服務信息
        defaultRabbit: #表示定義的名稱,用於binding整合
          type: rabbit #消息組件類型
          environment: #設置rabbitmq的相關環境配置
            spring:
              rabbitmq:
                host: ip端口  #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址
                port: 5672
                username: 用戶名
                password: 密碼
      bindings: #服務的整合處理
        input: #這個名字是一個通道的名稱
          destination: studyExchange #表示要使用的Exchange名稱定義
          content-type: application/json #設置消息類型,本次為json,本文要設置為“text/plain”
          binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)
    lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就註銷節點 默認是90s
    instance-id: receive-8802.com #在信息列表時顯示主機名稱
    prefer-ip-address: true #訪問的路徑變為IP地址
  1. 主啟動類
@SpringBootApplication
public class StreamMQMain8802 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }

}
  1. 新建 Controller
@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) //監聽
    public void input(Message<String> message){
        System.out.println("消費者1號------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
    }

}

啟動Eureka 消息驅動生產者,消費者

在這裡插入圖片描述

在這裡插入圖片描述

Leave a Reply

Your email address will not be published. Required fields are marked *