# RabbitMQ 入门

官网: https://www.rabbitmq.com/

Java 简单模式文档参考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

需求:使用简单模式完成消息传递

步骤:

① 创建工程(生成者、消费者)

② 分别添加依赖

③ 编写生产者发送消息

④ 编写消费者接收消息

# 1. 搭建示例工程

# 1.1. 创建工程

创建项目: rabbitmq-producer

image-20220902094203897

创建项目: rabbitmq-consumer

image-20220902094209384

# 1.2. 添加依赖

往两个 rabbitmq 的 pom.xml 文件中添加如下依赖:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

# 2. 编写生产者

编写消息生产者 com.atguigu.rabbitmq.simple.Producer

package com.atguigu.rabbitmq.simple;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Map;
 
public class Producer {
 
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 主机地址
        connectionFactory.setHost("192.168.6.100");
        // 连接端口;默认为 5672
        connectionFactory.setPort(5672);
        // 虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/");
        // 连接用户名;默认为 guest
        connectionFactory.setUsername("admin");
        // 连接密码;默认为 guest
        connectionFactory.setPassword("123456");
 
        // 创建连接
        Connection connection = connectionFactory.newConnection();
 
        // 创建频道
        Channel channel = connection.createChannel();
 
        // 声明(创建)队列
        /**
         * queue      参数 1:队列名称
         * durable    参数 2:是否定义持久化队列,当 mq 重启之后,还在
         * exclusive  参数 3:是否独占本次连接
         *            ① 是否独占,只能有一个消费者监听这个队列
         *            ② 当 connection 关闭时,是否删除队列
         * autoDelete 参数 4:是否在不使用的时候自动删除队列,当没有 consumer 时,自动删除
         * arguments  参数 5:队列其它参数
         */
        channel.queueDeclare("simple_queue", true, false, false, null);
 
        // 要发送的信息
        String message = "你好;小兔子!";
        /**
         * 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
         * 参数 2:路由 key, 简单模式可以传递队列名称
         * 参数 3:配置信息
         * 参数 4:消息内容
         */
        channel.basicPublish("", "simple_queue", null, message.getBytes());
        System.out.println("已发送消息:" + message);
 
        // 关闭资源
        channel.close();
        connection.close();
    }
}

运行程序:http://192.168.6.100:15672

在执行上述的消息发送之后;可以登录 rabbitMQ 的管理控制台,可以发现队列和其消息: image-20220902094308205

# 3. 编写消费者

编写消息的消费者 com.atguigu.rabbitmq.simple.Consumer

package com.atguigu.rabbitmq.simple;
 
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
public class Consumer {
    public static void main(String[] args) throws Exception {
 
        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.6.100");//ip
        factory.setPort(5672); // 端口  默认值 5672
        factory.setVirtualHost("/");// 虚拟机 默认值 /
        factory.setUsername("admin");// 用户名
        factory.setPassword("123456");// 密码
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建 Channel
        Channel channel = connection.createChannel();
        //5. 创建队列 Queue
        /*
        queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当 mq 重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当 Connection 关闭时,是否删除队列
            4. autoDelete:是否自动删除。当没有 Consumer 时,自动删除掉
            5. arguments:参数。
         */
        // 如果没有一个名字叫 simple_queue 的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("simple_queue",true,false,false,null);
 
        // 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
               回调方法,当收到消息后,会自动执行该方法
               1. consumerTag:标识
               2. envelope:获取一些信息,交换机,路由 key...
               3. properties:配置信息
               4. body:数据
            */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        /*
        basicConsume (String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
            3. callback:回调对象
         */
        // 消费者类似一个监听程序,主要是用来监听消息
        channel.basicConsume("simple_queue",true,consumer);
   }
}

运行程序

image-20220917235627876

# 4. 小结

上述的入门案例中中其实使用的是如下的简单模式:

image-20220917235608323

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

# 简易聊天室

效果:

image-20220917235819321

image-20220917235835334

因为创建连接部分是公共部分,就不重复写了,可以写一个工具类。

main 方法也差不多,只不过改改队列名而已。也是瞎写的,反正实现了功能,如果有什么不对的望指点。

聊天者 1:

public class worker1 {
    private static final String DABING_QUEUE="dabing";
    private static final String CZH_QUEUE="czh";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 生产者
        Channel channel = ConnUtil.getChannel();
        channel.queueDeclare(DABING_QUEUE,true,false,false,null);
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        System.out.println("请在控制台输入信息,回车确认!");
        Scanner scanner = new Scanner(System.in);
        do{
            channel.basicConsume(CZH_QUEUE,true, consumer);
            String message="小冰冰 : "+scanner.nextLine();
            channel.basicPublish("",DABING_QUEUE,null,message.getBytes());
        }while (scanner.hasNextLine());
        System.out.println("消息发送完毕!");
    }
}

聊天者 2:

public class worker2 {
    private static final String DABING_QUEUE="dabing";
    private static final String CZH_QUEUE="czh";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 生产者
        Channel channel = ConnUtil.getChannel();
        channel.queueDeclare(CZH_QUEUE,true,false,false,null);
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        System.out.println("请在控制台输入信息,回车确认!");
        Scanner scanner = new Scanner(System.in);
        do{
            channel.basicConsume(DABING_QUEUE,true, consumer);
            String message="小辉辉 : "+scanner.nextLine();
            channel.basicPublish("",CZH_QUEUE,null,message.getBytes());
        }while (scanner.hasNextLine());
        System.out.println("消息发送完毕!");
    }
}