Spring Cloud Stream
2021-02-05 18:16
标签:spec 模拟器 访问 gre 协议 qualifier intro png lis Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。 是一款用于构建消息驱动的微服务应用程序的轻量级框架
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的 binder 交互,Spring Cloud Stream 的 binder 负责与消息中间件交互。支持的 Binder:RabbitMQ、Apache Kafka、Kafka Streams、Amazon Kinesis、RocketMQ。 1.RabbitMQ RabbitMQ是一个基于AMQP协议的高级消息中间件,它主要的技术特点是可用性、安全性、集群、多协议支持、可视化的客户端、活跃的社区。 2.docker安装rabbitmq 使用的是带 web 管理插件的镜像:docker pull rabbitmq:management 运行后,5672 为应用访问端口,15672 为控制台Web端口号。访问管理端:http://宿主机IP:15672 eg: http://192.168.99.100:15672/
说明: 默认创建了一个 guest 用户,密码也是 guest;这里在启动的时候指定了用户名 spring、密码 spring。在线的RabbitMQ模拟器:http://tryrabbitmq.com
3.示例 (1)service-provider 添加依赖 配置 启动类 使用 监听通道创建一个绑定 消费消息的类 控制器 消费消息的类 service-comsumer 依赖 配置 启动类 消费消息的类 使用 使用 监听通道创建一个绑定 启动rabbitmq 启动consul 启动service-provider 启动service-comsumer GET http://172.27.0.17:8010/rabbitmq?num=1 返回OK 控制台消息也输出了 说明: 如果出现 ERROR 13220 --- [ main] o.s.boot.SpringApplication : Application run failed org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class [com.xyz.provider.ProviderApplication]; nested exception is java.lang.IllegalStateException: Failed to introspect annotated methods on class org.springframework.cloud.stream.config.BinderFactoryConfiguration 及 Spring Cloud Stream 标签:spec 模拟器 访问 gre 协议 qualifier intro png lis 原文地址:https://www.cnblogs.com/baby123/p/12777378.html通过Spring Cloud Stream访问 RabbitMQ
RabbitMQ is the most widely deployed open source message broker
docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=spring -e RABBITMQ_DEFAULT_PASS=spring rabbitmq:management
# HTTP port of the provider application.
server.port=8010
# Actuator: expose all web endpoints and always show health details.
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
# Application name; reused below as the Consul service name.
spring.application.name=service-provider
# Consul agent location and service-registration settings.
spring.cloud.consul.host=192.168.99.100
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.health-check-path=/actuator/health
spring.cloud.consul.discovery.service-name=${spring.application.name}
spring.cloud.consul.discovery.heartbeat.enabled=true
spring.cloud.consul.discovery.prefer-ip-address=true
# RabbitMQ connection; user/password match the values passed to "docker run" above.
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=spring
# Consumer group used by the finishedOrders binding.
spring.cloud.stream.bindings.finishedOrders.group=service-provider
使用 @EnableBinding 注解,传入 Barista 告诉 Spring 加载该绑定接口:
package com.xyz.provider;
import com.xyz.provider.integration.Barista;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * Bootstrap class of the provider application.
 *
 * <p>Enables discovery-client support and registers {@link Barista} as the
 * Spring Cloud Stream binding interface.
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Barista.class)
public class ProviderApplication {

    /**
     * Starts the Spring application context for this class.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Class, String...) shortcut.
        SpringApplication application = new SpringApplication(ProviderApplication.class);
        application.run(args);
    }
}
使用 SubscribableChannel 和 @Input 注解连接到 finishedOrders,消息数据将被推送到这里:
package com.xyz.provider.integration;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface of the provider: it publishes to {@code newOrders} and
 * subscribes to {@code finishedOrders}.
 */
public interface Barista {

    /** Name of the output binding used to publish new orders. */
    String NEW_ORDERS = "newOrders";

    /** Name of the input binding on which finished orders arrive. */
    String FINISHED_ORDERS = "finishedOrders";

    /**
     * Input channel for finished orders.
     *
     * <p>The binding name is given explicitly so it cannot drift from
     * {@link #FINISHED_ORDERS}, which listeners reference.
     *
     * @return the subscribable channel bound to {@link #FINISHED_ORDERS}
     */
    @Input(FINISHED_ORDERS)
    SubscribableChannel finishedOrders();

    /**
     * Output channel for new orders.
     *
     * @return the message channel bound to {@link #NEW_ORDERS}
     */
    @Output(NEW_ORDERS)
    MessageChannel newOrders();
}
package com.xyz.provider.integration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the {@link Barista#FINISHED_ORDERS} binding and logs every
 * payload it receives.
 */
@Component
@Slf4j
public class OrderListener {

    /**
     * Logs a finished order.
     *
     * @param num payload received on the finished-orders binding
     */
    @StreamListener(Barista.FINISHED_ORDERS)
    public void listenFinishedOrders(Integer num) {
        // The original literal contained a mis-encoded quote (U+2018) in place of the apostrophe.
        log.info("We've finished an order [{}].", num);
    }
}
package com.xyz.provider.controller;
import com.xyz.provider.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that hands the received number to {@link OrderService}.
 */
@RestController
@Slf4j
public class demoController {

    @Autowired
    private OrderService orderService;

    /**
     * Handles {@code /rabbitmq}: logs the request parameter and forwards it
     * to {@link OrderService#updateNum(Integer)}.
     *
     * @param num number taken from the request parameter of the same name
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/rabbitmq")
    public String rabbitmq(Integer num) {
        // The placeholder was missing, so the value was silently dropped from the log line.
        log.info("msq num: {}", num);
        orderService.updateNum(num);
        return "ok";
    }
}
package com.xyz.provider.service;
import com.xyz.provider.integration.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Publishes an incremented order number on the {@code newOrders} channel of
 * {@link Barista}.
 */
@Service
@Transactional
@Slf4j
public class OrderService {

    @Autowired
    private Barista barista;

    /**
     * Increments {@code num} by one and sends the result as a message payload
     * on {@link Barista#newOrders()}.
     *
     * @param num the number to increment; must not be {@code null}
     * @return always {@code true} once the message has been handed to the channel
     * @throws IllegalArgumentException if {@code num} is {@code null}
     */
    public boolean updateNum(Integer num) {
        // Fail with a clear message instead of an anonymous NPE from auto-unboxing.
        if (num == null) {
            throw new IllegalArgumentException("num must not be null");
        }
        Integer next = num + 1;
        // Use the class logger rather than System.out so output follows the logging configuration.
        log.info("Sending new order [{}]", next);
        barista.newOrders().send(MessageBuilder.withPayload(next).build());
        return true;
    }
}
# HTTP port of the consumer application.
server.port=8015
# Application name; reused below as the Consul service name.
spring.application.name=service-comsumer
# Actuator: expose all web endpoints and always show health details.
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
# Consul agent location and service-registration settings.
spring.cloud.consul.host=192.168.99.100
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.health-check-path=/actuator/health
spring.cloud.consul.discovery.service-name=${spring.application.name}
spring.cloud.consul.discovery.heartbeat.enabled=true
# RabbitMQ connection; user/password match the values passed to "docker run" above.
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=spring
# Consumer group used by the newOrders binding.
spring.cloud.stream.bindings.newOrders.group=service-comsumer
使用 @EnableBinding 注解,传入 Waiter 告诉 Spring 加载该绑定接口:
package com.xyz.comsumer;
import com.xyz.comsumer.integration.Waiter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * Bootstrap class of the consumer application.
 *
 * <p>Enables Feign clients and registers {@link Waiter} as the Spring Cloud
 * Stream binding interface.
 */
@SpringBootApplication
@EnableFeignClients
@EnableBinding(Waiter.class)
public class ComsumerApplication {

    /**
     * Starts the Spring application context for this class.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Class, String...) shortcut.
        SpringApplication application = new SpringApplication(ComsumerApplication.class);
        application.run(args);
    }
}
使用 MessageBuilder 创建一个 Integer 类型的消息,再使用 MessageChannel 上的 .send() 方法来发布消息:
package com.xyz.comsumer.integration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
 * Consumes payloads from the {@link Waiter#NEW_ORDERS} binding, increments
 * them, and publishes the result on the {@link Waiter#FINISHED_ORDERS} channel.
 */
@Component
@Slf4j
@Transactional
public class OrderListener {

    @Autowired
    @Qualifier(Waiter.FINISHED_ORDERS)
    private MessageChannel finishedOrdersMessageChannel;

    /**
     * Handles one new order: logs it, adds one, and sends the incremented
     * value as the payload of a finished-order message.
     *
     * @param num payload received on the new-orders binding
     */
    @StreamListener(Waiter.NEW_ORDERS)
    public void processNewOrder(Integer num) {
        // The original call had no placeholder, so the order number never reached the log.
        log.info("Receive a new order [{}]", num);
        Integer finished = num + 1;
        // Logged through SLF4J instead of System.out so it follows the logging configuration.
        log.info("Sending finished order [{}]", finished);
        finishedOrdersMessageChannel.send(MessageBuilder.withPayload(finished).build());
    }
}
package com.xyz.comsumer.integration;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Binding interface of the consumer: it subscribes to {@code newOrders} and
 * publishes to {@code finishedOrders}.
 */
public interface Waiter {

    /** Name of the output binding used to publish finished orders. */
    String FINISHED_ORDERS = "finishedOrders";

    /** Name of the input binding on which new orders arrive. */
    String NEW_ORDERS = "newOrders";

    /**
     * Output channel for finished orders.
     *
     * @return the message channel bound to {@link #FINISHED_ORDERS}
     */
    @Output(FINISHED_ORDERS)
    MessageChannel finishedOrders();

    /**
     * Input channel for new orders.
     *
     * @return the subscribable channel bound to {@link #NEW_ORDERS}
     */
    @Input(NEW_ORDERS)
    SubscribableChannel newOrders();
}
可以换下
下一篇:二.Java中的基本语法