Spring Cloud 系列之 Stream 消息驱动(二)
2021-02-01 10:16
标签:接收 public java 场景 虚拟主机 sage 索引 default string 本篇文章为系列文章,未读第一集的同学请猛戳这里:Spring Cloud 系列之 Stream 消息驱动(一) 本篇文章讲解 Stream 如何实现消息分组和消息分区。 点击链接观看:Stream 消息分组视频(获取更多请关注公众号「哈喽沃德先生」) 如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统做集群部署,都会从 RabbitMQ 中获取订单信息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分组来解决该问题。 在 Stream 中处于同一个 在 项目代码使用入门案例中消息消费者的代码。 单元测试代码如下: 运行单元测试发送消息,两个消息消费者控制台打印结果如下: stream-consumer 的控制台: stream-consumer02 的控制台: 通过结果可以看到消息被两个消费者同时消费了,原因是因为它们属于不同的分组,默认情况下分组名称是随机生成的,通过 RabbitMQ 也可以得知: stream-consumer 的分组配置为: stream-consumer02 的分组配置为: 运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费。RabbitMQ 结果如下: 点击链接观看:Stream 消息分区视频(获取更多请关注公众号「哈喽沃德先生」) 通过消息分组可以解决消息被重复消费的问题,但在某些场景下分组还不能满足我们的需求。比如,同时有多条同一个用户的数据发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑使用消息分区了。 当生产者将消息发送给多个消费者时,保证同一消息始终由同一个消费者实例接收和处理。消息分区是对消息分组的一种补充。 先给大家演示一下消息未分区的效果,单元测试代码如下: 运行单元测试发送消息,两个消息消费者控制台打印结果如下: stream-consumer 的控制台: stream-consumer02 的控制台: 假设这 10 条消息都来自同一个用户,正确的方式应该都由一个消费者消费所有消息,否则系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分区来解决该问题。 消息生产者配置分区键的表达式规则和消息分区的数量。 通过 该表达式作用于传递给 源码 MessageChannel.java 源码 GenericMessage.java 如果 如果 消息消费者配置消费者总数和当前消费者的索引并开启分区支持。 stream-consumer 的 application.yml stream-consumer02 的 application.yml 运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费所有消息。RabbitMQ 结果如下: 至此 Stream 消息驱动所有的知识点就讲解结束了。 本文采用 大家可以通过 ?? 您的 ?? 扫码关注 Spring Cloud 系列之 Stream 消息驱动(二) 标签:接收 public java 场景 虚拟主机 sage 索引 default string 原文地址:https://www.cnblogs.com/mrhelloworld/p/stream2.html消息分组
group
中的多个消费者是竞争关系,能够保证消息只会被其中一个应用消费。不同的组是可以消费的,同一个组会发生竞争关系,只有其中一个可以消费。通过 spring.cloud.stream.bindings.
属性指定组名。问题演示
stream-demo
项目下创建 stream-consumer02
子项目。package com.example;
import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSend() {
messageProducer.send("hello spring cloud stream");
}
}
测试
message = hello spring cloud stream
message = hello spring cloud stream
配置分组
group-A
。server:
port: 8002 # 端口
spring:
application:
name: stream-consumer # 应用名称
rabbitmq:
host: 192.168.10.101 # 服务器 IP
port: 5672 # 服务器端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机地址
cloud:
stream:
bindings:
# 消息接收通道
# 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
input:
destination: stream.message # 绑定的交换机名称
group: group-A
group-A
。server:
port: 8003 # 端口
spring:
application:
name: stream-consumer # 应用名称
rabbitmq:
host: 192.168.10.101 # 服务器 IP
port: 5672 # 服务器端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机地址
cloud:
stream:
bindings:
# 消息接收通道
# 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
input:
destination: stream.message # 绑定的交换机名称
group: group-A
测试
消息分区
问题演示
package com.example;
import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSend() {
for (int i = 1; i
测试
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
配置分区
server:
port: 8001 # 端口
spring:
application:
name: stream-producer # 应用名称
rabbitmq:
host: 192.168.10.101 # 服务器 IP
port: 5672 # 服务器端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机地址
cloud:
stream:
bindings:
# 消息发送通道
# 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同
output:
destination: stream.message # 绑定的交换机名称
producer:
partition-key-expression: payload # 配置分区键的表达式规则
partition-count: 2 # 配置消息分区的数量
partition-key-expression
参数指定分区键的表达式规则,用于区分每个消息被发送至对应分区的输出 channel
。MessageChannel
的 send
方法的参数,该参数实现 org.springframework.messaging.Message
接口的 GenericMessage
类。package org.springframework.messaging;
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1L;
default boolean send(Message> message) {
return this.send(message, -1L);
}
boolean send(Message> var1, long var2);
}
package org.springframework.messaging.support;
import java.io.Serializable;
import java.util.Map;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
public class GenericMessage
partition-key-expression
的值是 payload
,将会使用所有放在 GenericMessage
中的数据作为分区数据。payload
是消息的实体类型,可以为自定义类型比如 User
,Role
等等。partition-key-expression
的值是 headers["xxx"]
,将由 MessageBuilder
类的 setHeader()
方法完成赋值,比如:package com.example.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 消息生产者
*/
@Component
@EnableBinding(Source.class)
public class MessageProducer {
@Autowired
private Source source;
/**
* 发送消息
*
* @param message
*/
public void send(String message) {
source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build());
}
}
server:
port: 8002 # 端口
spring:
application:
name: stream-consumer # 应用名称
rabbitmq:
host: 192.168.10.101 # 服务器 IP
port: 5672 # 服务器端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机地址
cloud:
stream:
instance-count: 2 # 消费者总数
instance-index: 0 # 当前消费者的索引
bindings:
# 消息接收通道
# 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
input:
destination: stream.message # 绑定的交换机名称
group: group-A
consumer:
partitioned: true # 开启分区支持
server:
port: 8003 # 端口
spring:
application:
name: stream-consumer # 应用名称
rabbitmq:
host: 192.168.10.101 # 服务器 IP
port: 5672 # 服务器端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机地址
cloud:
stream:
instance-count: 2 # 消费者总数
instance-index: 1 # 当前消费者的索引
bindings:
# 消息接收通道
# 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
input:
destination: stream.message # 绑定的交换机名称
group: group-A
consumer:
partitioned: true # 开启分区支持
测试
知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议
。分类
查看更多关于 Spring Cloud
的文章。点赞
和转发
是对我最大的支持。哈喽沃德先生
「文档 + 视频」每篇文章都配有专门视频讲解,学习更轻松噢 ~
上一篇:Java并发编程-JMM内存模型与volatile关键字
下一篇:贪心算法
文章标题:Spring Cloud 系列之 Stream 消息驱动(二)
文章链接:http://soscw.com/essay/49443.html