数据同步是希望,当我们商品修改了数据库中的商品信息,索引库中的信息也会跟着改。在微服务中数据库和索引库是在两个不同的服务中。如果,商品的服务,向es的服务中发个消息,通知ES服务就可以实现数据的同步。此时我们利用MQ接收商品服务的消息,实现ES服务对消息的监听就可以了。
org.springframework.boot
spring-boot-starter-amqp
rabbitmq:
host: 124.223.41.137
port: 5672
virtual-host: /
username: rabbitmq
password: 123321
@Service
public class ItemService extends ServiceImpl implements IItemService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
@Override
public void updateStatus(Long id, Integer status) {
// update tb_item set status = ? where id = ?
this.update().set("status", status).eq("id", id).update();
// 根据上下架判断RoutingKey
String routingKey = status == 1 ? "item.up" : "item.down";
// 发送消息
rabbitTemplate.convertAndSend("item.topic", routingKey, id);
}
}
org.springframework.boot
spring-boot-starter-amqp
rabbitmq:
host: 124.223.41.137
port: 5672
virtual-host: /
username: rabbitmq
password: 123321
@Component //注册成一个Bean
public class ItemListener {
@Autowired
private ISearchService searchService;
//基于注解的方式现实交换机和队列的绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "up.queue"),
exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
key = "item.up"
))
//新增业务
public void listenItemUp(Long id){
searchService.saveItemById(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "down.queue"),
exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
key = "item.down"
))
//删除业务
public void listenItemDown(Long id){
searchService.deleteItemById(id);
}
}
//消息同步:上架——新增索引库数据
void deleteItemById(Long id);
//消息同步:下架——删除索引库数据
void saveItemById(Long id);
//删除方法
@Override
public void deleteItemById(Long id) {
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("item", id.toString());
// 2.发请求
restHighLevelClient.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//注入——实现远程调用(Item的服务中有商品的查询方法)
@Autowired
private ItemClient itemClient;
//新增方法
@Override
public void saveItemById(Long id) {
try {
// 1.查询商品数据
Item item = itemClient.queryItemById(id);
// 2.准备Request
IndexRequest request = new IndexRequest("item").id(id.toString());
// 3.准备DSL
request.source(MAPPER.writeValueAsString(new ItemDoc(item)), XContentType.JSON);
// 4.发送请求
restHighLevelClient.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
留言与评论(共有 0 条评论) “” |