spring:
rabbitmq:
host: 192.168.133.128
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 进行手动ACK
prefetch: 2 # 每次处理两条消息
server:
port: 8081
该案例主要是使用Topic模式进行介绍。下面这段代码主要是定义了交换机的名称、队列名称、路由Key的表达式。将队列和交换机进行了一个绑定通过routing key
/**
* rabbitmq 配置类
*/
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "TOPIC_BOOT_EXCHANGE";
public static final String QUEUE_NAME = "TOPIC_BOOT_QUEUE";
public static final String ROUTEING_KEY_NAME = "TOPIC_BOOT.*";
// 1、交换机
@Bean(name = "bootExChange")
public Exchange bootExChange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 2、队列
@Bean(name = "bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 3、队列和交换机绑定
/**
* 需要知道要哪个交换机和哪个队列
* 需要知道这个交换机和队列是和谁一起绑定
* 设置routeingKey
*/
@Bean
public Binding bootBindingExchangeAndQueue(@Qualifier("bootExChange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTEING_KEY_NAME).noargs();
}
}
import com.dream.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringRabbitProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试topic发送
*/
@Test
public void sender() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "TOPIC_BOOT.DREAM", "com.dream topic spring rabbitmq ....");
}
}
要做到消费者进行消息限流必须要满足下面几个条件,必须开启手动ACK,同时配置prefetch对每次处理多少条数据进行限制,如果不进行调用channel.basicAck(tag, false);进行确认消息被消费那么就不会继续进行下面的消费。
/**
* 消费者对收取消息进行限流,
* 消费者限流需要满足一下条件:
* 1、必须开启手动ACK
* 2、配置prefetch: 2 意思是每次消费者都从队列里面拉取两条消息,手动ACK确认完成后才会继续拉取下面的
*/
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
@SneakyThrows
public void listenerTopicQueueCurrentLimiting(Message message, Channel channel) {
// 获取消息标记
long tag = message.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(2 * 1000);
System.out.println(new Date() + " listenerTopicQueue ===> " + new String(message.getBody()));
/**
* 参数介绍:
* long deliveryTag 消息标记
* boolean multiple 是否允许一次处理多条消息
*/
// 这里不手动确认,消费者就不会继续取出下一个数据
// channel.basicAck(tag, false);
} catch (Exception ex) {
// ex.printStackTrace();
/**
* 参数介绍:
* long deliveryTag 消息标记
* boolean multiple 是否允许一次处理多条消息
* boolean requeue 表示是否消息重新回到消息队列,如果是true,broker会重新发送消息给消费端
*/
channel.basicNack(tag, false, true);
// channel.basicReject(tag,true);
// 上面这个basicReject方法跟basicNack差不多,但是basicReject不允许多个一起处理
}
}
留言与评论(共有 0 条评论) “” |