微服务之路(十一)spring cloud stream
2021-05-12 14:29
标签:自动化 复制 exce windows @param selector factory 方式 cep 当客户端向服务端请求,服务端返回出现了异常,对于客户端1返回为NULL,而对于客户端2返回的是正常数据。而服务端并不知道返回给客户端们的数据对不对,只能通过用户反馈来证实返回的错误性,显然是不正确的。 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。 官方地址:http://kafka.apache.org E:\JavaEE\kafka-2.5.0-src\kafka-2.5.0-src\bin\windows 1.下载并解压kafka压缩包。 2.下载并解压zookeeper压缩包,这里官方它的quickstart就是以zookeeper保证强一致性。zookeeper官方地址:https://zookeeper.apache.org/ 3.以windows为例,我们先到zookeeper的conf目录下,把zoo_sample.cfg文件复制一份重命名为zoo.cfg。现在目录如下所示: 然后打开cmd,进入bin目录,启动服务。 4.启动kafka。进入到kafka的window文件夹,执行启动命令。 5.创建kafka主题。再次打开一个cmd窗口,进入到windows文件夹 6.生产者发送消息/生产消息 然后输入要发送的消息: 7.消费者接收消息/消费消息 重新打开一个cmd,输入接收命令。 当我在生产者端输入消息,消费者端马上就接收到了消息。 如果消费命令后面加上--from beginning参数,那么他会接收到从开始就生产的消息。 那么被消费后的消息能否被其他消费者消费?我们再开一个cmd,利用新的消费者消费。答案是可以的。 1.从start.spring.io构建项目。 2.新建包raw.api,创建类KafkaProducerDemo,这里就是让生产者通过java api形式进行发送消息。 3.运行以上代码,然后你会发现,cmd窗口的消费者会接收消息。 那么接下来我们使用Spring整合的kafka。 Spring社区对data数据操作,有一个基本的模式,Template模式: XXXTemplate一定实现XXXOpeations 其中KafkaTemplate会被自动装配: 依赖 1.我们继续在上面的项目动刀子,我们先在application.properties文件转移之前demo类中配置。 2.然后编写一个controller类。 3.通过postman访问http://localhost:8080/message/send。 4.打开cmd消费端窗口,发现消息成功接收。 5.同样地,我们开始配置消费者,先去application.properties文件增加消费者配置。 6.因为消费者它是以监听的形式监听消息的,所以我们创建一个KafkaConsumerListener监听类。通过@KafkaListener来监听改主题的消息。 7.随后postman访问http://localhost:8080/message/send。控制台则会打印出: 加上本章中的stream,上一篇中的架构图又丰富了些东西。 其中 Source:来源,近义词:Producer,Publisher Sink:接收器,近义词:Consumer,Subcriber Processor:对于上流而言是Sink,对于下流而言是Source Reactive Streams 1.我们拷贝上面的spring cloud stream kafka项目,导入IDEA。 2.启动zookeeper,参考以上。 3.启动kafka,参考以上。 4.我们需要引入spring cloud stream依赖。 5.创建一个stream包,包下再创建producer包,创建一个类MessageProducerBean 消息大致分为两个部分,消息头和消息体。 改写一下我们之前写的controller,增加另一种方式的接口。 6.我们需要给引入依赖 之前没有加上spring cloud 版本,现在要加上: 7.这时我们再启动cmd中的consumer消费者。 8.注释掉之前配置的生产者序列化。 9.Postman分别以GET,POST方式访问http://localhost:8080/send/message,发现消费者正常收到消息。 拓展:如果想要多主题怎么办,那么我能不能仿造Source接口,搭建一个属于自己的管道呢?我们也在stream包下创建一个message包,message包下创建一个接口(仿Source)MyMessageSource. 然后我们仿造之前写的MessageProducerBean,再整一套自己的,也就是自定义消息发送源。 在application.properties文件增加一行属于自己的主题配置 这时去消费者监听类里面增加监听主题。 我们去cmd黑窗口,把刚刚主题为gupao的停掉,改成mygupao主题监听。 最后去controller类增加一个接口,用于发送消息至我们新创建的管道。 由于我之前杀死过8080端口,导致zookeeper进程被杀了(它也是8080端口),所以我将stream项目改成8081,重新启动了zookeeper,postman测试一下我们http://localhost:8081/message/sendToGupao。控制台信息如下: cmd窗口如下: 同样地,我们也可以创建一个消息消费Bean用于接收消息。 在stream包下继续创建一个consumer包,包下创建名为MessageConsumerBean的Bean。用来实现标准Sink监听, application.properties下也要增加对应的input主题项了。 我们复制一下上面的项目,准备为stream rabbitmq做准备。重命名为spring-cloud-stream-rabbitmq重新导入IDEA,里面pom文件的artifactId也要修改。清除掉有关kafka的代码,application.properties清除关于kafka的生产者,消费者配置。 现在项目结构如下: 修改依赖为 其中MessageConsumerBean MyMessagesSource MessageProducerBean MessageProducerController application.properties 1.当时用Future时,异步调用都可以使用get()方式强制执行吗? 解答:是的,get等待当前线程执行完毕,并且获取返回接口。 2.@KafkaListener和kafka consumer有啥区别? 解答:没有实质区别,主要是编程模式。 @KafkaListener采用注解驱动 kafka consumer API 采用接口编程。 3.消费者接收消息的地方在哪? 解答:订阅并且处理后就消失了。 4.生产环境配置多个生产者和消费者只需要定义不同的group就可以了吗? 解答:group是一种,要看是不是相同topic。 5.为了不丢失数据,消息队列的容错,和排错后的处理,如何实现的? 解答这个依赖于zookeeper。 6.异步接收除了打印还有什么办法处理消息吗? 解答:可以处理其他逻辑,比如存储数据库。 7.kafka适合什么场景下使用? 解答:高性能的Stream处理。 8.Kafka消息一直都在,内存占用会很多吧,消息量不停产生消息咋办? 解答:kafka还是会删除的,并不是一直存在。 9.怎么没看到broker配置? 解答:broker不需要设置,它是单独启动。 10.consumer为什么要分组? 解答:consumer需要定义不同逻辑分组,相同主题里面不同分组,便于管理。 11.@EnableBinding有什么用? 解答:@EnableBinding将Source、Sink以及Processor提升成相应的代理. 12.@Autowired Source source 这种写法是默认用官方的实现? 解答:是官方的实现。 13.这么多消息框架,各自有点是什么,怎么选取? 解答:RabbitMQ:AMQP,JMS规范 kafka:相对松散的消息队列协议 ActiveMQ:AMQP,JMS规范 14.如果中间件有问题怎么办,我们只管用,不用维护吗?现在遇到的很多问题不是使用,而是俄日胡,中间件一有问题,消息堵塞或者丢失,只有重启? 解答:消息中间件无法保证不丢消息,多数高一致性的消息背会还是有持久化的。 15.@EnableBinder,@EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗? 解答:不完全对,主要处理接口在@Import: 16.我对流式处理还是懵懵的,到底啥事流式处理,怎样才能称为流式处理,一般应用在什么场景? 解答:Stream处理简单的说,异步处理,消息是一种处理方式。 提交申请,机器生产,对于高密度提交任务,多数场景采用异步处理,Stream,Evnet-Driven。举例说明:审核流程,鉴别黄图。 17.如果是大量消息,怎么快速消费,用多线程吗? 解答:确实是使用多线程,不过不一定奏效,依赖于处理的具体内容,比如:一个线程使用了25%的CPU,四个线程就将cpu耗尽,因此,并发100个处理,实际上还是4个线程在处理。I/O密集型,CPU密集型。大多数是多线程,其实也单线程,流式非阻塞。 18.购物车的价格计算可以使用流式计算来处理吗?能说下思路吗?有没有什么高性能的方式推荐? 解答:当商品添加到购物车的时候,就可以开始计算了。 微服务之路(十一)spring cloud stream 标签:自动化 复制 exce windows @param selector factory 方式 cep 原文地址:https://www.cnblogs.com/jmy520/p/13138658.html前言
场景描述
Stream简介
主要议题
主体内容
一、Kafka
主要用途
执行脚本目录bin
同类产品比较
快速上手步骤
zkServer.cmd
kafka-server-start.bat ../../config/server.properties
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao
kafka-console-producer.bat --broker-list localhost:9092 --topic gupao
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
使用Kafka标准API
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @ClassName
* @Describe Kafka Producer Demo使用Kafka原始API
* @Author 66477
* @Date 2020/6/1417:15
* @Version 1.0
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
//初始化配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包
//创建Kafka Producer
KafkaProducer
二、Spring Kafka
官方文档
设计模式
Maven依赖
三、Spring Boot Kafka
Maven依赖
自动装配器
KafkaAutoConfiguration
@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate, ?> kafkaTemplate(ProducerFactory
关闭Spring Security
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/**");
}
}
创建生产者
#定义应用名称
spring.application.name=spring-cloud-stream-kafka
#配置端口
server.port=8080
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao
#生产者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate
创建消费者
#消费者配置
spring.kafka.consumer.group-id=gupao-1
spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe Kafka消费者监听器
* @Author 66477
* @Date 2020/6/1418:25
* @Version 1.0
*/
@Component
public class KafkaConsumerListener {
@KafkaListener(topics ="${kafka.topic}" )
public void onMessage(String message){
System.out.println("Kafka消费者监听器接收到消息:"+message);
}
}
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-14 18:32:30.971 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592130750970
2020-06-14 18:32:30.976 INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到消息:hello world
四、Spring Cloud Stream
基本概念
代码示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
}
import com.example.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
#生产者配置
#spring.kafka.producer.bootstrap-servers=localhost:9092
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息来源的管道名称
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
import com.example.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名称
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息
* @param message 消息内容
*/
public void sendToGupao(String message){
//通过消息管道发送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
spring.cloud.stream.bindings.gupao.destination=mygupao
@KafkaListener(topics ="mygupao" )
public void onGupaoMessage(String message){
System.out.println("Kafka消费者监听器接收到主题mygupao消息:"+message);
}
E:\JavaEE\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao
/**
* 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592150424170
2020-06-15 00:00:24.174 INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消费者监听器接收到主题mygupao消息:mygupaoaaa
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消费Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//那么订阅消息有多种方式
//方式一:通过SubscribableChannel订阅消息
//当字段注入完成后的回调
@PostConstruct
public void init(){
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println(message.getPayload());
}
});
}
//方式二:通过@ServiceActivator方式订阅消息
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通过@StreamListener实现
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
spring.cloud.stream.bindings.input.destination=${kafka.topic}
五、Spring Cloud Stream Kafka Binder(RabbitMQ)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消费Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//方式一:通过Subcribe订阅消息
//当字段注入完成后的回调
@PostConstruct
public void init(){
//实现异步回调
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println("subscribe:"+message.getPayload());
}
});
}
//方式二:通过@ServiceActivator
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通过@StreamListener实现
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息来源的管道名称
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
import com.example.rabbitmq.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名称
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
//通过消息管道发送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息
* @param message 消息内容
*/
public void sendToGupao(String message){
//通过消息管道发送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
import com.example.rabbitmq.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Rabbitmq生产者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
class MessageProducerController {
private final MessageProducerBean messageProducerBean;
private final String topic;
MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
this.messageProducerBean = messageProducerBean;
this.topic = topic;
}
/**
* 通过消息生产者Bean发送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/messageProducer/send")
public Boolean send(@RequestParam String message){
messageProducerBean.send(message);
return true;
}
/**
* 通过消息生产者Bean发送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
}
#定义应用名称
spring.application.name=spring-cloud-stream-rabbitmq
#配置端口
server.port=8081
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主题
kafka.topic = gupao
#定义Spring Cloud Stream Source消息去向
#针对kafka而言,基本模式如下
#spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic}
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.gupao.destination=mygupao
spring.cloud.stream.bindings.input.destination=${kafka.topic}
六、问题总结
上一篇:CentOS 8配置Java环境
文章标题:微服务之路(十一)spring cloud stream
文章链接:http://soscw.com/index.php/essay/84724.html