springcloud-消息驱动Stream01(十)

2021-04-27 14:30

阅读:422

标签:流程   type   resource   可伸缩   产品   input   client   订阅   就是   

消息驱动概述

  • 是什么
  1. 简单的说, 消息驱动屏蔽底层消息中间件的差异, 降低切换成本, 统一消息的编程模型.
  2. 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架.
    • 应用程序inputs或outputs来与Spring Cloud Stream中binder对象交互, 通过我们配置来binding(绑定), 而Stream的binder对象负责与消息中间件交互. 所以我们只需要搞清楚如何与Stream交互就可以方便使用消息驱动的方式.
    • 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.
    • Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现, 引用了发布-订阅, 消费组, 分区的三个核心概念, 但目前仅支持RabbitMQ和Kafka
  3. Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架, 该框架提供了一个灵活的编程模型.
  4. 中文指导手册: https://m.wang1314.com/doc/webapp/topic/20971999.html
  •  设计思想
  1. 标准的MQ
    • 生产者/消费者之间靠消息媒介传递信息内容 -> Message
    • 消息必须走特定的通道 -> MessageChannel
    • 消息通道里的消息如何被消费, 谁负责收发处理? -> 消息通道MessageChannel的子接口SubscribableChannel, 由MessageHandler消息处理器订阅.
    • 技术图片
  2. 为什么用Stream
    • 中间件的差异问题
      • 比如说RabbitMQ有交换机(exchange), 而Kafka有Topic和Partitions分区.
      • 中间件的差异会造成一定困扰, 我们如果用了两个消息队列的其中一种, 如果后面需要网另一个消息队列迁移, 一大堆东西需要推倒重做, 因为它跟我们的系统耦合了, 而Stream给我们提供了解耦的方式.
    • Stream如何统一底层差异
      • 通过定义绑定器(binder)作为中间层, 完美地实现了应用程序与消息中间件细节之间的隔离.
      • 通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现.
      • Stream对消息中间件的进一步封装, 可以做到代码层面对中间件的无感知, 甚至于动态的切换中间件, 使得微服务开发的高度解耦, 服务可以关注更多自己的业务流程.
      • 技术图片
    • Binder
      • INPUT对应消费者
      • OUTPUT对应生产者
      • 技术图片
  3. Stream中的消息通信方式遵循了发布-订阅模式
    • Topic主题进行广播
  • Stream的标准流程

技术图片

    • Binder: 很方便的连接中间件, 屏蔽差异
    • Channel: 通道, 是队列Queue的一种抽象, 在消息通讯系统中就是实现存储与转发的媒介, 通过Channel对队列进行配置.
    • Source和Sink: 简单的可理解为参照对象是Stream自身, 从Stream发布消息就是输出, 接受消息就是输入.
  • 编码API和常用注解

技术图片

消息驱动之生产者

  1. 新建Module: cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块.
  2. pom中新加
    dependency>
            groupId>org.springframework.cloudgroupId>
            artifactId>spring-cloud-starter-stream-rabbitartifactId>
        dependency>
  3. yml
    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: send-8801.com  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址
  4. 主启动类
    @SpringBootApplication
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class, args);
        }
    }
  5. 业务类
    • 发送消息接口
      public interface IMessageProvider {
      
          String send();
      }
    • 发送消息接口实现类
      @EnableBinding(Source.class) //定义消息的推送管道.
      public class IMessageProviderImpl implements IMessageProvider {
      
          @Resource
          private MessageChannel output; //消息发送管道
      
          @Override
          public String send() {
              String serial = UUID.randomUUID().toString();
              output.send(MessageBuilder.withPayload(serial).build());
              System.out.println("***********serial: " + serial);
              return null;
          }
      }
    • controller
      @RestController
      public class SendMessageController {
      
          @Resource
          private IMessageProvider messageProvider;
      
          @GetMapping("/sendMessage")
          public String sendMessage() {
              return messageProvider.send();
          }
      }
  6. 测试
    • 启动7001, 7002, rabbitmq
    • 启动8001后: http://localhost:8801/sendMessage

消息驱动之消费者

  1. 新建Module: cloud-stream-rabbitmq-provider8801
  2. 新加pom
    dependency>
         groupId>org.springframework.cloudgroupId>
         artifactId>spring-cloud-starter-stream-rabbitartifactId>
    dependency>
  3. yml
    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
        instance-id: receive-8802.com  # 在信息列表时显示主机名称
        prefer-ip-address: true     # 访问的路径变为IP地址
  4. 主启动类
    @SpringBootApplication
    public class StreamMQMain8802 {
    
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class, args);
        }
    }
  5. 业务类
    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListenerController {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message message) {
            System.out.println("消费者1号, ----> 接收的消息: " + message.getPayload() + "\t port" + serverPort);
        }
    }
  6. 测试: 8801发送消息, 8802接收消息
    • http://localhost:8801/sendMessage
    • 在8802的控制台会打出接收的消息.

springcloud-消息驱动Stream01(十)

标签:流程   type   resource   可伸缩   产品   input   client   订阅   就是   

原文地址:https://www.cnblogs.com/binwenhome/p/13244211.html


评论


亲,登录后才可以留言!