Pub/Sub模式相较于工作队列和简单模式而言可以有多个消费者来消费同一条消息。Pub/Sub较上面两种引入了一个新的概念为交换机,图中的X就是交换机,生产者先会把消息发送给交换机X,然后通过交换机X在将消息分发给对应的消息队列,各个消费者消费自己的队列的消息。实际上以上两种方式也是有交换机的,只不过他的交换机为默认的,我们用的就是默认的交换机。
交换机常见的有三种类型,Fanout广播类型,将消息发送给所有绑定交换机的队列。Direct定向类型,将消息交给符合指定routing key的队列。Topic通配符,将消息交给符合routing pattern(路由模式)的队列。
交换机只负责转发消息,不具备储存消息的能力,因为没有任何队列或者任何符号条件的路由规则的队列与交换机绑定,那么消息就会消失。
Pub/Sub模式的生产者实现,可以通过代码看到,这次我们声明了两个队列,我们将两个队列与交换机进行了一个绑定,但是发送的时候我们没有直接的向队列发送消息,我们向交换机发送了一条消息。
public class PubSubProducer {
@SneakyThrows
public static void main(String[] args) {
// 1、建立连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数
connectionFactory.setHost("192.168.133.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认就是/
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 3、创建连接
Connection connection = connectionFactory.newConnection();
// 4、创建channel
Channel channel = connection.createChannel();
// 5、创建交换机
/**
* 参数介绍
* String exchange 交换机名称
* BuiltinExchangeType type 交换机类型 direct:定向方式
* fanout:扇形(广播) ,发送消息给每一个与之绑定的队列
* topic: 通配符方式 headers:参数匹配方式
* boolean durable 是否持久化
* boolean autoDelete 是否自动删除
* boolean internal 内部使用
* Map arguments 参数列表
*/
String exchangeName = "PubSub_Fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
// 6、创建队列
String queueName1 = "fanout_queue_name_1";
String queueName2 = "fanout_queue_name_2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
// 7、绑定交换机和队列
/**
* 参数介绍
* String queue 队列名称
* String exchange 交换机名称
* String routingKey 路由名称,交换机类型为fanout,routingKey默认为空字符串
* Map arguments 参数列表
*/
channel.queueBind(queueName1, exchangeName, "", null);
channel.queueBind(queueName2, exchangeName, "", null);
// 8、发送消息
String body = "hello com.dream house";
channel.basicPublish(exchangeName, "", null, body.getBytes());
// 9、关闭资源
channel.close();
connection.close();
}
}
Pub/Sub模式消费端的实现,只写了一个代码,可以看到消费者监听了一个队列的名称而非是交换机的名字,两个消费者分别监听不同的消息队列。
public class PubSubConsumer1 {
@SneakyThrows
public static void main(String[] args) {
// 1、建立连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数
connectionFactory.setHost("192.168.133.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认就是/
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 3、创建连接
Connection connection = connectionFactory.newConnection();
// 4、创建channel
Channel channel = connection.createChannel();
// 5、创建队列
/**
* 参数介绍
* queue 队列名称
* durable 是否持久化,当MQ重启后还在
* exclusive 是否独占(只能有一个消费者监听这个队列),当Connection关闭时是否删除队列,
* autoDelete 是否自动删除,当没有消费者的时候会自动删除掉
* arguments 参数信息
*/
// 如果没有一个叫dream_house的队列就会自动创建一个队列
// channel.queueDeclare("dream_house", true, false, false, null);
// 6、接收消息
/**
* 参数介绍
* String queue 队列名称
* boolean autoAck 是否自动确认
* Consumer callback 回调函数
*/
Consumer consumer = new DefaultConsumer(channel) {
// 回调方法当收到消息后自动执行方法,匿名内部类
/**
* 参数介绍
* @param consumerTag 消息标识
* @param envelope 获取一些信息,比如交换机,路由
* @param properties 配置信息
* @param body 真实的报文信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("consumerTag:" + consumerTag + ",envelope:" + envelope.toString() + ",properties:" + properties + ",body:" + new String(body));
}
};
channel.basicConsume("fanout_queue_name_1", true, consumer);
}
}
Pub/Sub类型使用的交换机类型是广播类型Fanout,它需要声明一个交换机Exchange,将多个队列与这个交换机进行绑定,生产者通过向交换机发送消息的方式来进行生产。消费者则通过绑定的队列来各自消费各自队列的消息。
留言与评论(共有 0 条评论) “” |