SpringCloud(4) rabbitmq使用
2021-01-24 16:12
标签:server des cat term frame org rev bsp queue 一。rabbitmq基本知识 exchange: 交换器,接收生产者发送的消息并路由给对应的队列。三种常用的交换器类型:1.direct(发布订阅,完全匹配) 2。广播型 3.topic(主题,规则匹配) queue: 消息队列,用来保存消息直到发送给消费者。消息一直在队列中,知道消费者链接到队列将它取走 binding: 绑定。用于消息队列和交换器之间的关联,一个绑定就是基于路由键将两者连接起来的路由规则。 routingkey: 路由键。1.队列通过路由键和交换器绑定。2.消息带着路由键发送到交换器,交换器根据路由键发到匹配的队列 二。代码示例 2.1 使用amqp (1).pom依赖 (2).消费端 (3).生产者 2.2 使用stream 2.2.1 生产者 (1) pom依赖 (2)application.yml定义 (3)定义mq连接源 (4)发送类, @EnableBinding(StreamClient.class)声明 (5)接收返回的mq消息 2.2.2 消费者 (1) pom依赖 (2)application.yml定义 (3)连接源 (4)接收端。并范松返回mq消息 SpringCloud(4) rabbitmq使用 标签:server des cat term frame org rev bsp queue 原文地址:https://www.cnblogs.com/t96fxi/p/12865507.html
//1. 只指定队列@RabbitListener(queues = "myQueue")
//2.自动创建队列
//@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("myQueue"))
//3.自动创建, exchange和queue绑定
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "fruit",
value = @Queue("fruitQueue")
))
public void process(String message) {
log.info("fruitProcess receive mq message: {}", message);
}
/**
* Producer(生产者): 将消息发送到Exchange
Exchange(交换器):将从生产者接收到的消息路由到Queue
Queue(队列):存放供消费者消费的消息
BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
Consumer(消费者):从Queue中获取消息
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "computer",
value = @Queue("computerQueue")
))
public void computerProcess(String message) {
log.info("computerProcess receive mq message: {}", message);
}
@SpringBootTest
public class MQSenderTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void send() {
amqpTemplate.convertAndSend("myQueue", "hello mq!!");
}
@Test
public void send3() {
amqpTemplate.convertAndSend("myOrder", "computer", "computerMsgsg");
}
}
spring:
cloud:
stream:
bindings:
#连接源定义的input和output
messagesSend:
#交换器名称
destination: msg-topic
backRec:
destination: back-topic
##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息
group:
back-group
rabbitmq:
addresses: 127.0.0.1:5672
username: guest
password: guestimport org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface StreamClient {
public static final String INPUT = "messagesSend";
public static final String BACK = "backRec";
// 发送
@Output(INPUT)
MessageChannel output();
// 回调接收
@Input(BACK)
SubscribableChannel input();
}
@RestController
@EnableBinding(StreamClient.class)
public class SendMessageController {
@Autowired
StreamClient streamClient;
@RequestMapping("/sendMsg")
public void sendMsg() {
OrderDTO order = new OrderDTO();
order.setOrderId("123");
streamClient.output().send(MessageBuilder.withPayload(order).build());
}
}
@Component
@Slf4j
@EnableBinding(StreamClient.class)
public class StreamReciever {
// 接收对象
@StreamListener(StreamClient.BACK)
public void process(Object message) {
log.info("StreamReciever:{}", message);
}
}
spring:
cloud:
stream:
bindings:
messageRec:
#交换器exchange名称
destination: msg-topic
##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息
group: msg-group
backSend:
destination: back-topic
#我没有配置mq信息,自动用的默认配置
rabbitmq:
addresses: 127.0.0.1:5672
username: guest
password: guestpublic interface StreamServer {
public static final String INPUT = "messageRec";
public static final String BACK = "backSend";
@Input(INPUT)
SubscribableChannel input();
@Output(BACK)
MessageChannel output();
}
@Component
@Slf4j
@EnableBinding(StreamServer.class)
public class StreamReciever {
// 接收对象
@StreamListener(StreamServer.INPUT)
@SendTo(StreamServer.BACK)
public String process(Object message) {
log.info("StreamReciever:{}", message);
// 返回的消息
return "Revieved..";
}
}
文章标题:SpringCloud(4) rabbitmq使用
文章链接:http://soscw.com/index.php/essay/46385.html