mq新api订阅实现
2021-01-02 12:31
标签:nbsp ethos cat def rgs 订阅 try 绑定 null mq连接 定义消息生产者 定义消息消费者 mq新api订阅实现 标签:nbsp ethos cat def rgs 订阅 try 绑定 null 原文地址:https://www.cnblogs.com/wsxmiss/p/13219381.html
public class MessageUtils {
//获取mq的连接
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//设置路径
factory.setVirtualHost("/user_num1");
//设置用户名
factory.setUsername("user1");
//设置密码
factory.setPassword("user1");
return factory.newConnection();
}
}
public class MessageProduction {
private static final String QUER_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException{
//获取一个连接
Connection connection = MessageUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机
//fanout 分发
channel.exchangeDeclare(QUER_NAME,"fanout");
//发送消息
String msg = "liujiang ";
channel.basicPublish(QUER_NAME,"",null,msg.getBytes());
System.out.println("send"+msg);
//关闭通道
channel.close();
connection.close();
}
}
public class MessageConsumer {
private static final String QUER_NAME1 = "test_simple_queue2";
private static final String QUER_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = MessageUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUER_NAME1,false,false,false,null);
//绑定队列到交换机转发器
channel.queueBind(QUER_NAME1,QUER_NAME,"");
//保证每次之分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println("消费者[1] msg:"+message.getBytes());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//自动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUER_NAME1, autoAck, consumer);
}
}