Implementing JMS with ActiveMQ (plain JMS API)

2021-03-28 02:25


1. JMS: Java Message Service. It is not a protocol but an API, and it is the API that applications use to talk to message middleware.

 

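2. Message middleware: it transfers messages asynchronously and reliably between two systems (two in the general sense; there can be more), which lets a project be split into loosely coupled services. For example, suppose we have a login system. If it calls the points service, the logging service, and so on before returning the success prompt to the user, then a failure in any one of those services may delay the prompt and hurt the user experience. Instead, we can send the login information to those auxiliary services asynchronously and return only the login result the user cares about.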
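The login example above can be sketched with an in-memory queue standing in for the middleware. This is only an illustration under stated assumptions: the class `AsyncLoginSketch`, the methods `login` and `runSideServices`, and the message strings are hypothetical names, and a real broker would sit between separate processes rather than inside one JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class AsyncLoginSketch {
    // Stands in for the message middleware (assumption: a real broker is out of process).
    static final Queue<String> LOGIN_EVENTS = new ConcurrentLinkedQueue<>();

    // Returns the login result immediately; side services are only notified via the queue.
    static String login(String user) {
        LOGIN_EVENTS.offer(user);
        return "login ok: " + user;
    }

    // Side services (points, logging) drain the queue later, independently of login().
    static List<String> runSideServices() {
        List<String> handled = new ArrayList<>();
        String user;
        while ((user = LOGIN_EVENTS.poll()) != null) {
            handled.add("points and log recorded for " + user);
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(login("alice"));
        // The worker thread plays the role of the auxiliary services.
        Thread worker = new Thread(() -> System.out.println(runSideServices()));
        worker.start();
        worker.join();
    }
}
```

The point of the design is that `login` never waits for the side services, so a slow or failed points service cannot delay the login response.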

 

3. Commonly used message middleware: ActiveMQ, RabbitMQ, Kafka

 

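4. The following implements messaging on ActiveMQ using the plain JMS API (javax.jms), without Spring. The Spring-integrated version is covered in a separate post.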

The project structure is as follows:

[Figure: project structure]

 

 

  pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxc.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

</project>

log4j.properties (logging goes through SLF4J). In IntelliJ IDEA the file must be placed under the resources directory; in Eclipse, place it under src.

### Settings ###
log4j.rootLogger = debug,stdout,D,E

### Output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### Write logs at DEBUG level and above to D://logs/log.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = D://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### Write logs at ERROR level and above to D://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =D://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

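ActiveMQ supports two messaging models:

  The first is queue mode (point-to-point): messages produced by a producer are stored in a queue. Several consumers, each with its own connection, can read from that queue, and the messages are distributed roughly evenly among them, so each message is consumed only once.

  The second is publish/subscribe mode: subscribers (that is, consumers) must subscribe to the topic first; the publisher then publishes messages to the topic. Every subscriber receives every message published to the topic; the messages are not split among subscribers.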
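The difference between the two models can be sketched without a broker. The following is a simplified in-memory illustration, not ActiveMQ's actual dispatch logic: the class `DeliveryModelSketch` and its methods are hypothetical, and strict round-robin is an assumption standing in for "roughly even" distribution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliveryModelSketch {
    // Queue semantics: each message goes to exactly one consumer (round-robin assumed here).
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic semantics: every subscriber receives its own copy of every message.
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + deliverQueue(messages, 2)); // queue: [[m1, m3], [m2, m4]]
        System.out.println("topic: " + deliverTopic(messages, 2)); // topic: [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

With two consumers, the queue splits four messages two and two, while the topic hands all four to both subscribers.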

 

Model 1: queue mode

  appProducer:

package com.zxc.jms.queue;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class appProducer {

    private static String url = "tcp://localhost:61616";
    private static String queuename = "queue-test";
    private static Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException {
        // Connect to the broker and start the connection
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queuename);
        MessageProducer producer = session.createProducer(destination);
        // The loop bound was lost in the source; 100 is an assumed value
        for (int i = 1; i <= 100; i++) {
            TextMessage message = session.createTextMessage("Producer message " + i);
            producer.send(message);
            logger.info("Producer sent message {}", i);
        }
        connection.close();
    }
}

  

  appConsumer:

  

package com.zxc.jms.queue;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appConsumer {
    private static String url = "tcp://localhost:61616";
    private static String queuename = "queue-test";
    private static Logger logger = LoggerFactory.getLogger(appConsumer.class);

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queuename);
        MessageConsumer consumer = session.createConsumer(destination);
        // The connection is left open on purpose so the listener keeps receiving
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    logger.info("Consumer received message: {}", textMessage.getText());
                } catch (JMSException e) {
                    logger.error("Failed to read message text", e);
                }
            }
        });
    }
}

 

Model 2: publish/subscribe mode

  appProducer:

package com.zxc.topic;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appProducer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";
    private static final Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Publish to a topic instead of a queue
        Topic topic = session.createTopic(topicName);
        MessageProducer producer = session.createProducer(topic);
        // The loop bound was lost in the source; 100 is an assumed value
        for (int i = 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("Message " + i);
            producer.send(message);
            logger.info("Producer broadcast message {}", i);
            // Pause between messages so subscribers can be watched receiving them
            Thread.sleep(2000);
        }
        connection.close();
    }
}

  appConsumer:

  

package com.zxc.topic;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appConsumer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";
    private static final Logger logger = LoggerFactory.getLogger(appConsumer.class);

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        // Subscribe to the topic; start this before the producer publishes
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener((message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                logger.info("Received message: {}", textMessage.getText());
            } catch (JMSException e) {
                logger.error("Failed to read message text", e);
            }
        }));
    }
}
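Because the subscriber above is a plain (non-durable) topic consumer, it only sees messages published while it is subscribed, which is why the consumer should be started before the producer. The sketch below imitates that timing in memory; `LateSubscriberSketch` and `InMemoryTopic` are hypothetical names, and the code illustrates the semantics only, not how ActiveMQ is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LateSubscriberSketch {
    // Minimal stand-in for a topic with non-durable subscribers (assumption).
    static final class InMemoryTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        // A new subscriber starts with an empty inbox; earlier messages are not replayed.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        // Delivers the message to everyone subscribed at this moment.
        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    static List<List<String>> demo() {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> early = topic.subscribe();
        topic.publish("msg0");
        List<String> late = topic.subscribe(); // joins after msg0 was published
        topic.publish("msg1");
        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[msg0, msg1], [msg1]]
    }
}
```

The late subscriber never receives `msg0`. A queue behaves differently: messages wait in the queue until some consumer picks them up.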

 

  

    


Original article: https://www.cnblogs.com/television/p/9344530.html

