Spring cloud stream【消息分组】

2020-12-13 04:07

阅读:628

标签:output   java   receiver   NPU   解决   scope   mes   信息   发送   

??上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决了!

技术图片

Stream消息分组

??消息分组的作用我们已经介绍了。注意在Stream中处于同一个group中的多个消费者是竞争关系。就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。通过案例我们来演示看看,这里我们会创建3个服务,分别如下
|服务|介绍 |
|--|:--|
| stream-group-sender |消息发送者服务 |
| stream-group-receiverA|消息接收者服务 |
|stream-group-receiverB | 消息接收者服务 |

1.创建stream-group-sender 服务

1.1 创建项目

技术图片

1.2 pom文件

4.0.0org.springframework.boot
        spring-boot-starter-parent
        1.5.13.RELEASEcom.bobo
    stream-group-sender
    0.0.1-SNAPSHOTorg.springframework.cloud
                spring-cloud-dependencies
                Dalston.SR5pomimportorg.springframework.boot
            spring-boot-starter-web
        org.springframework.cloud
            spring-cloud-starter-eureka
        org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        org.springframework.boot
            spring-boot-starter-test
            testorg.springframework.boot
                spring-boot-maven-plugin
            

1.3 配置文件

??配置中的“outputProduct”可以自定义,但是我们等会在消息接口中要使用到。

spring.application.name=stream-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

1.4 发送接口

/**
 * 发送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {
    
    String OUTPUT="outputProduct";
    

    /**
     * 指定输出的交换器名称
     * @return
     */
    @Output(OUTPUT)
    SubscribableChannel send();
}

1.5 启动类

@SpringBootApplication
@EnableEurekaClient
// 绑定我们刚刚创建的发送消息的接口类型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamSenderStart.class, args);
    }
}

1.6 创建pojo

??在本案例中我们发送的消息是自定义的对象

package com.bobo.stream.pojo;

import java.io.Serializable;

public class Product implements Serializable{

    private Integer id;
    
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public Product() {
        super();
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
    
}

2.创建stream-group-receiverA服务

2.1 创建项目

技术图片

2.2 pom文件

4.0.0org.springframework.boot
        spring-boot-starter-parent
        1.5.13.RELEASEcom.bobo
    stream-group-receiverA
    0.0.1-SNAPSHOTorg.springframework.cloud
                spring-cloud-dependencies
                Dalston.SR5pomimportorg.springframework.boot
            spring-boot-starter-web
        org.springframework.cloud
            spring-cloud-starter-eureka
        org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        org.springframework.boot
                spring-boot-maven-plugin
            

2.3 配置文件

??配置文件中配置分组“groupProduct”

spring.application.name=stream-group-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct

2.4 接收消息的接口

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {

    String INPUT = "inputProduct";
    /**
     * 指定接收的交换器名称
     * @return
     */
    @Input(INPUT)
    SubscribableChannel receiver();
}

2.5 消息的具体处理类

/**
 * 具体接收消息的处理类
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

    @StreamListener(IReceiverService.INPUT)
    public void onReceiver(Product p){
        System.out.println("消费者A:"+p);
    }
}

注意同样需要添加Product类

package com.bobo.stream.pojo;

import java.io.Serializable;

public class Product implements Serializable{

    private Integer id;
    
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public Product() {
        super();
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
    
}

2.6 启动类

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamReceiverStart.class, args);
    }
}

3.创建stream-group-receiverB服务

??此服务和stream-group-receiverA一样,复制一份只需修改application.properties中的服务名称,端口。我们先将group设置不一样,我们测试来看看

spring.application.name=stream-group-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct1

4.测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
    
    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        Product p = new Product(666, "stream test ...");
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                                .withPayload(p)
                                .build();
        sendService.send().send(message );
    }
}

在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下

技术图片

技术图片

技术图片

改为同组的情况下

技术图片

技术图片

启动服务,发送数据

技术图片

技术图片

通过结果可以看到只有其中一个受到消息。避免了消息重复消费。

案例代码github:https://github.com/q279583842q/springcloud-e-book

技术图片

Spring cloud stream【消息分组】

标签:output   java   receiver   NPU   解决   scope   mes   信息   发送   

原文地址:https://www.cnblogs.com/dengpengbo/p/11104631.html


评论


亲,登录后才可以留言!