# RabbitMQ 入门
官网: https://www.rabbitmq.com/
Java 简单模式文档参考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
需求:使用简单模式完成消息传递
步骤:
① 创建工程(生成者、消费者)
② 分别添加依赖
③ 编写生产者发送消息
④ 编写消费者接收消息
# 1. 搭建示例工程
# 1.1. 创建工程
创建项目: rabbitmq-producer
创建项目: rabbitmq-consumer
# 1.2. 添加依赖
往两个 rabbitmq 的 pom.xml 文件中添加如下依赖:
<dependencies> | |
<dependency> | |
<groupId>com.rabbitmq</groupId> | |
<artifactId>amqp-client</artifactId> | |
<version>5.6.0</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>3.8.0</version> | |
<configuration> | |
<source>1.8</source> | |
<target>1.8</target> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> |
# 2. 编写生产者
编写消息生产者 com.atguigu.rabbitmq.simple.Producer
package com.atguigu.rabbitmq.simple; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import java.util.Map; | |
public class Producer { | |
public static void main(String[] args) throws Exception { | |
// 创建连接工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
// 主机地址 | |
connectionFactory.setHost("192.168.6.100"); | |
// 连接端口;默认为 5672 | |
connectionFactory.setPort(5672); | |
// 虚拟主机名称;默认为 / | |
connectionFactory.setVirtualHost("/"); | |
// 连接用户名;默认为 guest | |
connectionFactory.setUsername("admin"); | |
// 连接密码;默认为 guest | |
connectionFactory.setPassword("123456"); | |
// 创建连接 | |
Connection connection = connectionFactory.newConnection(); | |
// 创建频道 | |
Channel channel = connection.createChannel(); | |
// 声明(创建)队列 | |
/** | |
* queue 参数 1:队列名称 | |
* durable 参数 2:是否定义持久化队列,当 mq 重启之后,还在 | |
* exclusive 参数 3:是否独占本次连接 | |
* ① 是否独占,只能有一个消费者监听这个队列 | |
* ② 当 connection 关闭时,是否删除队列 | |
* autoDelete 参数 4:是否在不使用的时候自动删除队列,当没有 consumer 时,自动删除 | |
* arguments 参数 5:队列其它参数 | |
*/ | |
channel.queueDeclare("simple_queue", true, false, false, null); | |
// 要发送的信息 | |
String message = "你好;小兔子!"; | |
/** | |
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage | |
* 参数 2:路由 key, 简单模式可以传递队列名称 | |
* 参数 3:配置信息 | |
* 参数 4:消息内容 | |
*/ | |
channel.basicPublish("", "simple_queue", null, message.getBytes()); | |
System.out.println("已发送消息:" + message); | |
// 关闭资源 | |
channel.close(); | |
connection.close(); | |
} | |
} |
运行程序:http://192.168.6.100:15672
在执行上述的消息发送之后;可以登录 rabbitMQ 的管理控制台,可以发现队列和其消息:
# 3. 编写消费者
编写消息的消费者 com.atguigu.rabbitmq.simple.Consumer
package com.atguigu.rabbitmq.simple; | |
import com.rabbitmq.client.*; | |
import java.io.IOException; | |
import java.util.concurrent.TimeoutException; | |
public class Consumer { | |
public static void main(String[] args) throws Exception { | |
//1. 创建连接工厂 | |
ConnectionFactory factory = new ConnectionFactory(); | |
//2. 设置参数 | |
factory.setHost("192.168.6.100");//ip | |
factory.setPort(5672); // 端口 默认值 5672 | |
factory.setVirtualHost("/");// 虚拟机 默认值 / | |
factory.setUsername("admin");// 用户名 | |
factory.setPassword("123456");// 密码 | |
//3. 创建连接 Connection | |
Connection connection = factory.newConnection(); | |
//4. 创建 Channel | |
Channel channel = connection.createChannel(); | |
//5. 创建队列 Queue | |
/* | |
queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) | |
参数: | |
1. queue:队列名称 | |
2. durable:是否持久化,当 mq 重启之后,还在 | |
3. exclusive: | |
* 是否独占。只能有一个消费者监听这队列 | |
* 当 Connection 关闭时,是否删除队列 | |
4. autoDelete:是否自动删除。当没有 Consumer 时,自动删除掉 | |
5. arguments:参数。 | |
*/ | |
// 如果没有一个名字叫 simple_queue 的队列,则会创建该队列,如果有则不会创建 | |
channel.queueDeclare("simple_queue",true,false,false,null); | |
// 接收消息 | |
DefaultConsumer consumer = new DefaultConsumer(channel){ | |
/* | |
回调方法,当收到消息后,会自动执行该方法 | |
1. consumerTag:标识 | |
2. envelope:获取一些信息,交换机,路由 key... | |
3. properties:配置信息 | |
4. body:数据 | |
*/ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
System.out.println("consumerTag:"+consumerTag); | |
System.out.println("Exchange:"+envelope.getExchange()); | |
System.out.println("RoutingKey:"+envelope.getRoutingKey()); | |
System.out.println("properties:"+properties); | |
System.out.println("body:"+new String(body)); | |
} | |
}; | |
/* | |
basicConsume (String queue, boolean autoAck, Consumer callback) | |
参数: | |
1. queue:队列名称 | |
2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 | |
3. callback:回调对象 | |
*/ | |
// 消费者类似一个监听程序,主要是用来监听消息 | |
channel.basicConsume("simple_queue",true,consumer); | |
} | |
} |
运行程序
# 4. 小结
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
# 简易聊天室
效果:
因为创建连接部分是公共部分,就不重复写了,可以写一个工具类。
main 方法也差不多,只不过改改队列名而已。也是瞎写的,反正实现了功能,如果有什么不对的望指点。
聊天者 1:
public class worker1 { | |
private static final String DABING_QUEUE="dabing"; | |
private static final String CZH_QUEUE="czh"; | |
public static void main(String[] args) throws IOException, TimeoutException { | |
// 生产者 | |
Channel channel = ConnUtil.getChannel(); | |
channel.queueDeclare(DABING_QUEUE,true,false,false,null); | |
DefaultConsumer consumer=new DefaultConsumer(channel){ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
System.out.println(new String(body)); | |
} | |
}; | |
System.out.println("请在控制台输入信息,回车确认!"); | |
Scanner scanner = new Scanner(System.in); | |
do{ | |
channel.basicConsume(CZH_QUEUE,true, consumer); | |
String message="小冰冰 : "+scanner.nextLine(); | |
channel.basicPublish("",DABING_QUEUE,null,message.getBytes()); | |
}while (scanner.hasNextLine()); | |
System.out.println("消息发送完毕!"); | |
} | |
} |
聊天者 2:
public class worker2 { | |
private static final String DABING_QUEUE="dabing"; | |
private static final String CZH_QUEUE="czh"; | |
public static void main(String[] args) throws IOException, TimeoutException { | |
// 生产者 | |
Channel channel = ConnUtil.getChannel(); | |
channel.queueDeclare(CZH_QUEUE,true,false,false,null); | |
DefaultConsumer consumer=new DefaultConsumer(channel){ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
System.out.println(new String(body)); | |
} | |
}; | |
System.out.println("请在控制台输入信息,回车确认!"); | |
Scanner scanner = new Scanner(System.in); | |
do{ | |
channel.basicConsume(DABING_QUEUE,true, consumer); | |
String message="小辉辉 : "+scanner.nextLine(); | |
channel.basicPublish("",CZH_QUEUE,null,message.getBytes()); | |
}while (scanner.hasNextLine()); | |
System.out.println("消息发送完毕!"); | |
} | |
} |