hornetq 入门(1)

2020-12-13 02:12

阅读:283

标签:des   style   blog   class   code   java   

Hornetq 版本2.4.0final  需要JDK7及以上

Hornetq官网

Hornetq2.1中文手册 

step1.启动服务端

  1.1准备配置文件(配置说明参考官网手册)

  hornetq-configuration.xml

  

soscw.com,搜素材
configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    
    name>HornetQ.main.configname>
    
     bindings-directory>F:/hornetq/data/messaging/bindingsbindings-directory>

     large-messages-directory>F:/hornetq/data/messaging/largemessageslarge-messages-directory>

     paging-directory>F:/hornetq/data/messaging/pagingpaging-directory>
    
    
    journal-directory>F:/hornetq/journaljournal-directory>
    journal-min-files>10journal-min-files>
    
    id-cache-size>9000id-cache-size>
    jmx-management-enabled>truejmx-management-enabled>   
    
    message-counter-enabled>truemessage-counter-enabled>
    
    message-counter-max-day-history>7message-counter-max-day-history>            
    
    message-counter-sample-period>60000message-counter-sample-period>
    persistence-enabled>truepersistence-enabled>    
     
    cluster-user>HORNETQ.CLUSTER.ADMIN.USERcluster-user> cluster-password>test65525cluster-password> connectors> connector name="connector-netty"> factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory factory-class> param key="use-nio" value="true" /> param key="host" value="localhost"/> param key="port" value="11212" /> connector> connector name="netty-ssl-connector"> factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactoryfactory-class> param key="host" value="localhost"/> param key="port" value="5500"/> param key="ssl-enabled" value="true"/> param key="key-store-path" value="F:/ssl/keystore"/> param key="key-store-password" value="test"/> connector> connectors> acceptors> acceptor name="netty"> factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory factory-class> param key="use-nio" value="true" /> param key="host" value="0.0.0.0,127.0.0.1,localhost">param> param key="port" value="11212" /> acceptor> acceptor name="netty-ssl-acceptor"> factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactoryfactory-class> param key="host" value="localhost"/> param key="port" value="5500"/> param key="ssl-enabled" value="true"/> param key="key-store-path" value="F:/ssl/keystore"/> param key="key-store-password" value="test"/> param key="trust-store-path" value="F:/ssl/truststore"/> param key="trust-store-password" value="test"/> acceptor> acceptors> address-settings> address-setting match="jms.queue.#"> redelivery-delay>5000redelivery-delay> expiry-address>jms.queue.expiryQueueexpiry-address> last-value-queue>truelast-value-queue> max-size-bytes>100000max-size-bytes> page-size-bytes>20000page-size-bytes> redistribution-delay>0redistribution-delay> address-full-policy>PAGEaddress-full-policy> send-to-dla-on-no-route>truesend-to-dla-on-no-route> dead-letter-address>jms.queue.deadLetterQueuedead-letter-address> max-delivery-attempts>3max-delivery-attempts> address-setting> address-settings> security-settings> security-setting match="jms.queue.#"> permission type="createDurableQueue" roles="guest" /> permission type="deleteDurableQueue" roles="guest" /> permission type="createNonDurableQueue" roles="guest" /> permission type="deleteNonDurableQueue" roles="guest" /> permission type="consume" roles="guest" /> permission type="send" roles="guest" /> security-setting> security-settings> configuration>
soscw.com,搜素材

hornetq-jms.xml

soscw.com,搜素材
configuration xmlns="urn:hornetq"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
  
    connection-factory name="ConnectionFactory">
        connectors>
            
            connector-ref connector-name="connector-netty" />
        connectors>
        entries>
            entry name="ConnectionFactory" />
            entry name="/ConnectionFactory" />
            entry name="XAConnectionFactory" />
            entry name="/XAConnectionFactory" />
            entry name="java:/ConnectionFactory"/>
             entry name="java:/XAConnectionFactory"/>
        entries>
        
        retry-interval>1000retry-interval>
        retry-interval-multiplier>1.5retry-interval-multiplier>
        max-retry-interval>60000max-retry-interval>
        reconnect-attempts>1000reconnect-attempts>
        confirmation-window-size>1048576confirmation-window-size>
    connection-factory>
    
     
    queue name="notificationsQueue">
        entry name="/queue/notificationsQueue">entry>
    queue>
    queue name="testQueue">
          entry name="/queue/testQueue"/>
          selector string="color=‘red‘"/>
          durable>truedurable>
    queue>
    
   queue name="deadLetterQueue">
      entry name="/queue/deadLetterQueue"/>
   queue>
     
configuration>
soscw.com,搜素材

 1.2 启动hornetq服务

 

soscw.com,搜素材
public static void startHornetqServer(){
        try {
            //config  hornetq-configuration.xml
            FileConfiguration config = new FileConfiguration();
            config.start();
            //HornetQServer
            HornetQServer server=HornetQServers.newHornetQServer(config);
            //JNPServer
            StandaloneNamingServer standalone=new StandaloneNamingServer(server);
            standalone.setBindAddress("0.0.0.0");
            standalone.setRmiBindAddress("0.0.0.0");
            standalone.start();
            //JMSServer hornetq-jms.xml
            jmsServer=new JMSServerManagerImpl(server);
            jmsServer.start();
            //start hornetq core server
            server.start();
            System.out.println(jmsServer.isStarted());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
soscw.com,搜素材

 

step2.发送消息客户端

   

soscw.com,搜素材
/**
     * @param args
     */
    public static void main(String[] args) {
        try {
            Properties prop = new Properties();
            prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
            prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
            prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");
            
            Context ctx = new InitialContext(prop);
            System.out.println("+++++++1111ssssssss");
            //查找目标地址
            Destination destination = (Destination)ctx.lookup("/queue/notificationsQueue");
            System.out.println("+++++++2222"+destination);
            
            //根据上下文查找一个连接工厂 QueueConnectionFactory 。
            //该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它;
            //ConnectionFactory 对应hornetq-jms.xml里面的 connection-factory name="ConnectionFactory"
            ConnectionFactory factory = (ConnectionFactory)ctx.lookup("ConnectionFactory");
            System.out.println("+++++++3333"+factory);
            //从连接工厂得到一个连接 create QueueConnection
            Connection    conn = factory.createConnection();
            System.out.println("+++++++4444"+conn);
            conn.start();
            
            //通过连接来建立一个会话(Session); 
            javax.jms.Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
            
            //根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口)
            MessageProducer producer = session.createProducer(destination);
            TextMessage msg = session.createTextMessage("ffffffffffffffffffffffffffffffffffffffffffffff小心呈现出");
            BytesMessage byteMessage=session.createBytesMessage();
            byteMessage.writeBytes("testddddddddd".getBytes("utf-8"));
            producer.send(msg);
            producer.send(byteMessage);
            System.out.println("send over !!!!!");
            session.close();
            conn.close();
            System.out.println("send down===");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
soscw.com,搜素材

 

step3.接受消息客户端

soscw.com,搜素材
public MessageReceive(String ...destinationJNDI){
            
            QueueConnectionFactory factory=(QueueConnectionFactory)getJNDIRemoteObj("ConnectionFactory");
            try {
                if(factory==null)
                    return;
                connection = factory.createConnection();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                connection.start();
                for (int i = 0; i ) {
                    destination = (Queue) getJNDIRemoteObj(destinationJNDI[i]);
                    if(destination==null)
                        continue;
                    producer = session.createConsumer(destination);
                    //接受消息
                    producer.setMessageListener(new ReceiveMessage());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }    
        }
soscw.com,搜素材
soscw.com,搜素材
public static Object getJNDIRemoteObj(String jndiName) {
        try {
            Properties prop = new Properties();
            prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
            prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
            prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
            prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
            prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");
            Context context = new InitialContext(prop);
            return context.lookup(jndiName);
        } catch (NamingException e) {
            e.printStackTrace();
        }
        return null;
    }
soscw.com,搜素材
soscw.com,搜素材
public class ReceiveMessage implements MessageListener {

    @SuppressWarnings("deprecation")
    @Override
    public void onMessage(Message message) {
         System.out.println("Received notification:"+new Date().toLocaleString());
         try
         {
//            Enumeration propertyNames = message.getPropertyNames();
//            while (propertyNames.hasMoreElements())
//            {
//               String propertyName = (String)propertyNames.nextElement();
//               System.err.format("  %s: %s\n", propertyName, message.getObjectProperty(propertyName));
//            }
             HornetQDestination des=(HornetQDestination) message.getJMSDestination();
            if(message instanceof TextMessage){
                TextMessage mesg=(TextMessage)message;
                System.out.println(des.getAddress()+"==received:"+mesg.getText());
            }else  if(message instanceof BytesMessage){
                BytesMessage mesg=(BytesMessage)message;
                ByteArrayOutputStream out=new ByteArrayOutputStream(((Long)mesg.getBodyLength()).intValue());
                try {
                    byte[] r=new byte[2048];
                    int i=0;
                    while((i=mesg.readBytes(r))!=-1)
                        out.write(r,0,i);
                    System.out.println(des.getClass()+"==received:"+new String(out.toByteArray(),"utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(message instanceof HornetQObjectMessage){
                HornetQObjectMessage object=(HornetQObjectMessage)message;
                Object msgObj=object.getObject();
                if(msgObj instanceof ErrorMessageBO){
                    ErrorMessageBO messageBO=(ErrorMessageBO)msgObj;
                    String msg=messageBO.getMessageContent();
                    System.err.println("error:==>"+msg);
                }
            }
            
         }
         catch (JMSException e)
         {
             e.printStackTrace();
         }
         System.out.println("----------end--------------");

    }
soscw.com,搜素材

 

hornetq 入门(1),搜素材,soscw.com

hornetq 入门(1)

标签:des   style   blog   class   code   java   

原文地址:http://www.cnblogs.com/qwj888/p/3716023.html


评论


亲,登录后才可以留言!