Spring Cloud Study Notes, Part 8: Stream (Greenwich.SR1)
2020-12-13 03:17
Original article: https://www.cnblogs.com/yangk1996/p/11072712.html

Verification: start the projects in this order: spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider. The garbled characters in the consumed message are probably other Kafka attributes that were not converted; they are left unhandled here.
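import java.io.Serializable;

import lombok.Data;

// Message payload; Lombok's @Data generates getters, setters, equals/hashCode and toString.
@Data
public class User implements Serializable {
    /** ID */
    private Long id;
    /** User name */
    private String name;
}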
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * @Description: user message output
 * @date: 2019/6/23 13:05
 */
public interface UserMessage {
    // Output channel named "user-message"
    @Output("user-message")
    MessageChannel output();
}
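@Autowired
private UserMessage userMessage;

// Publishes the posted User to the "user-message" output channel.
@PostMapping("/user/save/message/stream")
public boolean saveUserByRabbitMessage(@RequestBody User user) {
    MessageChannel messageChannel = userMessage.output();
    // send() returns true if the message was handed to the channel successfully
    return messageChannel.send(MessageBuilder.withPayload(user).build());
}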
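To exercise the /user/save/message/stream endpoint above during verification, a small client can post a User as JSON. The following is a minimal sketch using only the JDK 11+ HttpClient. The class name UserMessageClient, the base URL http://localhost:8080 and the sample values are assumptions for illustration, not taken from the original project; adjust them to wherever the producer application actually runs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for manual verification; not part of the original project.
public class UserMessageClient {

    // Builds the JSON body for a User by hand to stay dependency-free.
    // Note: no escaping is done, so names with quotes or backslashes are not supported.
    static String toJson(long id, String name) {
        return "{\"id\":" + id + ",\"name\":\"" + name + "\"}";
    }

    // Builds a POST request against the controller path shown above.
    static HttpRequest buildRequest(String baseUrl, long id, String name) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/user/save/message/stream"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(id, name)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed address of the producer application; pass the real one as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl, 1L, "yangk"), HttpResponse.BodyHandlers.ofString());
        // The controller returns the boolean result of MessageChannel.send(...)
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the request succeeds, the consumer side should log the same payload through whichever listener receives it.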
## Kafka producer configuration
spring.kafka.bootstrap-servers=192.168.100.129:9092
spring.kafka.consumer.group-id=yangk
spring.kafka.consumer.client-id=spring-cloud-api-client
## Spring Cloud Stream binding configuration
### user-message is the output channel name; destination specifies the topic
spring.cloud.stream.bindings.user-message.destination=springCloud
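import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(UserMessage.class)
public interface UserMessage {
    @Input("user-message") // channel name
    SubscribableChannel input();
}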
@Autowired
private UserMessage userMessage;
@Autowired
private UserService userService;
@Autowired
private ObjectMapper objectMapper;

// Approach 1: Spring Integration @ServiceActivator on the input channel
@ServiceActivator(inputChannel = "user-message")
public void listen(String data) throws IOException {
    System.out.println("ServiceActivator implementation: " + data);
    saveUser(data);
}

// Approach 2: Spring Cloud Stream @StreamListener
@StreamListener("user-message")
public void onMessage(String data) throws IOException {
    System.out.println("@StreamListener implementation: " + data);
    saveUser(data);
}

// Deserializes the JSON payload and saves the user.
private void saveUser(String data) throws IOException {
    User user = objectMapper.readValue(data, User.class);
    userService.saveUser(user);
}

// Approach 3: subscribe to the SubscribableChannel directly (only logs the message)
@PostConstruct
public void init() {
    SubscribableChannel subscribableChannel = userMessage.input();
    subscribableChannel.subscribe(message -> {
        System.out.println("SubscribableChannel implementation: " + message);
    });
}
## Kafka configuration
spring.kafka.bootstrap-servers=192.168.100.129:9092
spring.kafka.consumer.group-id=yangk
spring.kafka.consumer.client-id=spring-cloud-api-client
## Spring Cloud Stream binding configuration
### user-message is the input channel name; destination specifies the topic
spring.cloud.stream.bindings.user-message.destination=springCloud
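As an optional variant, Spring Cloud Stream also lets a binding declare its own consumer group and lets the Kafka binder declare its brokers. The fragment below is only a sketch of those two properties, reusing the group name and broker address from the configuration above; the original project does not set them, and whether they are needed depends on your setup.

```properties
## Assumed variant, not part of the original configuration
### consumer group declared on the binding itself
spring.cloud.stream.bindings.user-message.group=yangk
### broker list declared on the Kafka binder
spring.cloud.stream.kafka.binder.brokers=192.168.100.129:9092
```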
// Activate the Stream binding for UserMessage
@EnableBinding(UserMessage.class)