本文共 3038 字,大约阅读时间需要 10 分钟。
com.rabbitmq amqp-client 5.7.0
Ps:执行下面的Demo的时候,建议优先执行消费端的代码,RabbitMQ会创建对应的exchange和queue并对应绑定**
当然,也可以在RabbitMQ的管理UI上进行add和绑定消息生产者Code:
import java.io.IOException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); // 2 获取Connection Connection connection = connectionFactory.newConnection(); // 3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); // 4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; // 5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); // 6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); }}
消息消费者 Code:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer { public static void main(String[] args) throws Exception { // 1、创建connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("rabbitadmin"); connectionFactory.setPassword("123456"); // 2 获取Connection Connection connection = connectionFactory.newConnection(); // 3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.#"; String queueName = "test_confirm_queue"; // 4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while (true) { Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); } }}
转载地址:http://zsuws.baihongyu.com/