背景:项目中一个场景需要用java端的处理代码获取php端放到rabbitmq内的消息,然后做相应业务的处理。
前提:rabbitmq服务器已经搭建好,php端的消息发布正常运行。
首先:下载rabbitmq-client对应的java版jar包(spring好像有相应的支持)
开始代码coding的工作,上代码
package com.eelly.imagesearch.common; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.GetResponse; public class RabbitMqControll { /** * 读取RabbitMq中的存储信息 * * @param queue_name 队列名 * @param exchange_name 交换机名 * @param route_key 绑定用到的route_key * @param durable 是否持久化 */ public void readRabbitMqInfo (String queue_name, String exchange_name, String route_key, boolean durable) { ConnectionFactory factory = new ConnectionFactory(); // 设置服务器ip factory.setHost("172.18.107.66"); // 设置rabbitmq服务器运行的端口 factory.setPort(5672); // 设置rabbitmq服务器连接用户 factory.setUsername("guest"); // 设置rabbitmq服务器连接用户密码 factory.setPassword("guest"); // 设置rabbitmq服务器节点目录(个人理解) factory.setVirtualHost("/"); try { // 创建工厂连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); // 声明交换机(设置相关属性时需要和php端的一致) channel.exchangeDeclare(exchange_name, "direct", durable); // 声明消息队列(设置相关属性时需要和php端的一致) channel.queueDeclare(queue_name, durable, false, true, null); // 绑定消息队列(设置相关属性时需要和php端的一致) channel.queueBind(queue_name, exchange_name, route_key); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // basicConsume消费模式 /*channel.basicQos(1);//消息分发处理 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue_name, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 提交消息处理完成回复 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }*/ // basicGet消费模式 while (true) { // get方式主动消费 GetResponse res=channel.basicGet(queue_name, false); if (res != null && res.getMessageCount() >= 0) { System.out.println(res.getMessageCount()); String message = ""; message = new String(res.getBody()); channel.basicAck(res.getEnvelope().getDeliveryTag(), false); System.out.println(" [x] Received '" + message + "'"); } else { System.out.println("消息队列中没有可消费的信息!"); break; } } channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } } |
在开发的过程中,主要报的异常是:
1.创建交换机和消息队列时,设置的属性和消息产生端的php代码设置的不一样,导致不匹配和一直重写属性
2.在调用时一直没有确定到底是用basicConsume的消费模式还是basicGet消费模式(前者带有监控效果,后者没有,不知道是不是因为一者有跳出while循环,一者没有的原因)