SpringCloud Stream消息驱动

2021-02-18 03:19

阅读:416

标签:user   定义   cal   pac   project   问题:   XML   password   void   

问题:

技术图片

 

 

 技术图片

 

 

 技术图片

 

 

 技术图片

 

 

 

 

技术图片

 

 

 

8801

POM:

xml version="1.0" encoding="UTF-8"?>
project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    parent>
        artifactId>cloud2020artifactId>
        groupId>com.atguigu.springcloudgroupId>
        version>1.0-SNAPSHOTversion>
    parent>
    modelVersion>4.0.0modelVersion>

    artifactId>cloud-stream-rabbitmq-provider8801artifactId>

    dependencies>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-webartifactId>
        dependency>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-actuatorartifactId>
        dependency>

        dependency>
            groupId>org.springframework.cloudgroupId>
            artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
        dependency>

        dependency>
            groupId>org.springframework.cloudgroupId>
            spring-cloud-starter-stream-rabbit
        dependency>

        dependency>
            groupId>org.projectlombokgroupId>
            artifactId>lombokartifactId>
            optional>trueoptional>
        dependency>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-testartifactId>
            scope>testscope>
        dependency>
    dependencies>

project>

YML:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

主启动类:

package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @author wsk
 * @date 2020/3/17 11:16
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

技术图片

 

package com.atguigu.springcloud.service.impl;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * @author wsk
 * @date 2020/3/17 11:20
 */
//这不是传统的service,这是和rabbitmq打交道的,不需要加注解@Service
//这里不掉dao,掉消息中间件的service
//信道channel和exchange绑定在一起
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("serial = " + serial);
        return null;
    }
}

controller:

package com.atguigu.springcloud.controller;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author wsk
 * @date 2020/3/17 13:14
 */
@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

技术图片

 

 

 

 8802

xml version="1.0" encoding="UTF-8"?>
project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    parent>
        artifactId>cloud2020artifactId>
        groupId>com.atguigu.springcloudgroupId>
        version>1.0-SNAPSHOTversion>
    parent>
    modelVersion>4.0.0modelVersion>

    artifactId>cloud-stream-rabbitmq-consumer8802artifactId>

    dependencies>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-webartifactId>
        dependency>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-actuatorartifactId>
        dependency>

        dependency>
            groupId>org.springframework.cloudgroupId>
            artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
        dependency>

        dependency>
            groupId>org.springframework.cloudgroupId>
            artifactId>spring-cloud-starter-stream-rabbitartifactId>
        dependency>

        dependency>
            groupId>org.projectlombokgroupId>
            artifactId>lombokartifactId>
            optional>trueoptional>
        dependency>
        dependency>
            groupId>org.springframework.bootgroupId>
            artifactId>spring-boot-starter-testartifactId>
            scope>testscope>
        dependency>
    dependencies>
project>

yml:

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息中间件类型
          environment: # 设置rabbitMQ的相关环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          #group: atguiguA
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: receive-8802.com #主机名
    prefer-ip-address: true # 显示ip

主启动类:

package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @author wsk
 * @date 2020/3/17 14:15
 */
@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

 

package com.atguigu.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author wsk
 * @date 2020/3/17 14:24
 */
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message message){
        System.out.println("消费者1号,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
    }
}

 

 

分组消费与持化

 

 技术图片

 

 

 

 

 

 

技术图片  技术图片

 

 

 

8802修改YML

技术图片

 

 

8803修改YML

技术图片

 

 结果

技术图片

 

 

解决:

技术图片

 

 

 

 

持久化 (分组后,有持久化性能)

技术图片

 

SpringCloud Stream消息驱动

标签:user   定义   cal   pac   project   问题:   XML   password   void   

原文地址:https://www.cnblogs.com/leeego-123/p/12694312.html


评论


亲,登录后才可以留言!