三、入门案例、MQ标准、API详解

2021-01-08 13:28

阅读:639

标签:生产者和消费者   ges   print   pen   形式   item   增加   The   div   

一、pom.xml导入依赖

dependencies>
  
  dependency>
    groupId>org.apache.activemqgroupId>
    artifactId>activemq-allartifactId>
    version>5.15.9version>
  dependency>
  
  dependency>
    groupId>org.apache.xbeangroupId>
    artifactId>xbean-springartifactId>
    version>3.16version>
  dependency>
dependencies>

二、JMS编码总体规范

技术图片

  JMS开发的基本步骤:

  1. 创建一个connection factory。

  2. 通过connection factory 来创建JMS connection。

  3. 启动JMS connection。

  4. 通过connection创建JMS session。

  5. 创建JMS destination。

  6. 创建JMS producer 或者创建JMS message 并设置destination。

  7. 创建JMS consumer 或者是注册一个 JMS message listener。

  8. 发送或者接受JMS message(s)。

  9. 关闭所有的JMS资源(connection、session、producer 、consumer )。

三、Destination简介

   Destination是目的地,分为两种:队列和主题。下图介绍:

技术图片

四、队列消息生产者的入门案例

public class JmsProduce {
    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVEMQ_URL = "tcp://192.168.126.133:61616";
    // 目的地的名称
    public static final String QUEUE_NAME = "jdy01";

    public static void main(String[] args) throws  Exception{
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。
        //     该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        for (int i = 1; i ) {
            // 7  创建消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            // 8  通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("**** 消息发送到MQ完成 ****");
    }
}

五、ActiveMQ控制台之队列

  运行上面代码,控制台显示如下:

技术图片

  • Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

  • Number Of Consumers:消费者数量,消费者端的消费者数量。

  • Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

  • Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。

  总结:当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

六、队列消息消费者的入门案例

// 消息的消费者
public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.126.133:61616";
    public static final String QUEUE_NAME = "jdy01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while(true){
            // reveive()一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
       // reveive(Long time):等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
            TextMessage message = (TextMessage)messageConsumer.receive(); 
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

  控制台显示:

技术图片

 

七、异步监听式消费者(MessageListener)

// 消息的消费者  也就是回答消息的系统
public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
           通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
         */
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message)  {
            //  instanceof 判断是否A对象是否是B类的子类
                    if (null != message  && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println("****消费者的消息:"+textMessage.getText());
                        }catch (JMSException e) {
                            e.printStackTrace();
                        }
                }
            }
        });
        // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
        // 实际开发中,我们的程序会一直运行,这句代码都会省略。
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

八、队列消息总结

8.1、两种消费方式

  • 同步阻塞方式(receive):订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

  • 异步非阻塞方式:订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

8.2、队列的特点

  在点对点的消息传递中,目的地被称为队列。点对点消息传递域的特点如下:

  1. 每个消息只能有一个消费者,类似1对1的关系,好比个人快递自己领取自己的。

  2. 消息的生产者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息,好比我们发送短息,发送者发送后不见得的接受者即收即看。

  3. 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。

8.3、消息消费情况

技术图片

  • 情况1:只启动消费者1。
    结果:消费者1会消费所有的数据。

  • 情况2:先启动消费者1,再启动消费者2。
    结果:消费者1消费所有的数据。消费者2不会消费到消息。

  • 情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。
    结果:消费者1和消费者2平摊了消息。各自消费3条消息。

  疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

九、Topic介绍、入门案例、控制台

9.1、topic介绍

  在发布订阅消息传递域中,目的地被称为主题(topic)。发布/订阅消息传递域的特点如下:

  1. 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;

  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。

  3. 生产者生产时,topic不保存消息,它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

  默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。

技术图片

9.2、生产者案例

public class JmsProduce_topic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
             ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(TOPIC_NAME);

        MessageProducer messageProducer = session.createProducer(topic);
        for (int i = 1; i ) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
} 

9.3、消费者入门案例

public class JmsConsummer_topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4 创建目的地 (两种 : 队列/主题   这里用主题)
        Topic topic = session.createTopic(TOPIC_NAME);

        MessageConsumer messageConsumer = session.createConsumer(topic);
// MessageListener接口只有一个方法,可以使用lambda表达式
        messageConsumer.setMessageListener( (message) -> {
            if (null != message  && message instanceof TextMessage){
                     TextMessage textMessage = (TextMessage)message;
                    try {
                      System.out.println("****消费者text的消息:"+textMessage.getText());
                    }catch (JMSException e) {
                    }
                }
        });

        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

  存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。

9.4、ActiveMQ控制台

  topic有多个消费者时,消费消息的数量 ≈ 在线消费者数量*生产消息的数量。
  下图展示了:我们先启动了3个消费者,再启动一个生产者,并生产了3条消息。

技术图片

比较项目 Topic模式队列 Queue模式队列
工作模式 "订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都还会收到消息。 "负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送其中一个消费者,并且要求消费者ack信息。
有无状态 无状态 Queue数据默认会在mq服务商以文件形式保存,比如ActiveMQ 一般保存在$AMQ_HOME\data\kr-srore\data下面,也可以配置成DB存储,
传递完整性 如果没有订阅者消息将会被丢弃。 消息不会丢失
处理效率 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会明显降低,当然不同消息协议的具体性能也是有差异的。

三、入门案例、MQ标准、API详解

标签:生产者和消费者   ges   print   pen   形式   item   增加   The   div   

原文地址:https://www.cnblogs.com/jdy1022/p/14238663.html


评论


亲,登录后才可以留言!