RabbitMQ基础(消费端的限流)

10.1 YAML配置

spring:
  rabbitmq:
    host: 192.168.133.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # 进行手动ACK
        prefetch: 2  # 每次处理两条消息
server:
  port: 8081

10.2 代码实现

10.2.1 队列配置

该案例主要是使用Topic模式进行介绍。下面这段代码主要是定义了交换机的名称、队列名称、路由Key的表达式。将队列和交换机进行了一个绑定通过routing key

/**
 * rabbitmq 配置类
 */
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "TOPIC_BOOT_EXCHANGE";
    public static final String QUEUE_NAME = "TOPIC_BOOT_QUEUE";
    public static final String ROUTEING_KEY_NAME = "TOPIC_BOOT.*";

    // 1、交换机
    @Bean(name = "bootExChange")
    public Exchange bootExChange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // 2、队列
    @Bean(name = "bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    // 3、队列和交换机绑定
    /**
     * 需要知道要哪个交换机和哪个队列
     * 需要知道这个交换机和队列是和谁一起绑定
     * 设置routeingKey
     */
    @Bean
    public Binding bootBindingExchangeAndQueue(@Qualifier("bootExChange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTEING_KEY_NAME).noargs();
    }
    }


10.2.2 生产者

import com.dream.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringRabbitProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试topic发送
     */
    @Test
    public void sender() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "TOPIC_BOOT.DREAM", "com.dream topic spring rabbitmq ....");
    }
}

10.2.3 消费者

要做到消费者进行消息限流必须要满足下面几个条件,必须开启手动ACK,同时配置prefetch对每次处理多少条数据进行限制,如果不进行调用channel.basicAck(tag, false);进行确认消息被消费那么就不会继续进行下面的消费。

    /**
     * 消费者对收取消息进行限流,
     * 消费者限流需要满足一下条件:
     * 1、必须开启手动ACK
     * 2、配置prefetch: 2 意思是每次消费者都从队列里面拉取两条消息,手动ACK确认完成后才会继续拉取下面的
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    @SneakyThrows
    public void listenerTopicQueueCurrentLimiting(Message message, Channel channel) {
        // 获取消息标记
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(2 * 1000);
            System.out.println(new Date() + "  listenerTopicQueue ===> " + new String(message.getBody()));

            /**
             * 参数介绍:
             * long deliveryTag  消息标记
             * boolean multiple  是否允许一次处理多条消息
             */
            // 这里不手动确认,消费者就不会继续取出下一个数据
            // channel.basicAck(tag, false);
        } catch (Exception ex) {
//            ex.printStackTrace();
            /**
             * 参数介绍:
             * long deliveryTag  消息标记
             * boolean multiple  是否允许一次处理多条消息
             * boolean requeue   表示是否消息重新回到消息队列,如果是true,broker会重新发送消息给消费端
             */
            channel.basicNack(tag, false, true);
            // channel.basicReject(tag,true);
            // 上面这个basicReject方法跟basicNack差不多,但是basicReject不允许多个一起处理
        }
    }
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章