Msg依赖
incloud 平台选用RocketMQ做为消息中间件,RocketMQ是一款高性能、高可靠、的消息中间件。 卓越的性能、高可用性和可靠性、支持分布式、多语言客户端。支持 Java、C++、Paython等等开发语言。
依赖
<dependency>
<groupId>com.netwisd</groupId>
<artifactId>incloud-starter-common-mq-rocket</artifactId>
<version>${version}</version>
</dependency>
配置信息
spring:
rocketmq:
namesrvAddr: ${INCLOUD_ROCKETMQ_SERVICE_HOST}:${INCLOUD_ROCKETMQ_SERVICE_PORT}
生产者
@Component
public class demo {
private final DefaultMQProducer defaultMQProducer;
public void send(){
Message msg = new Message(topic,tag,content);
defaultMQProducer.sendOneway(msg);
}
}
消费者
public class UpInterfaceReleaseConsumer implements ApplicationRunner {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Override
public void run(ApplicationArguments args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 设置NameServer的地址
consumer.setNamesrvAddr(namesrvAddr);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(topic, tag);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
log.info("消息内容:{}", msg);
}
} catch (Exception e) {
log.error("数据建模获取到离线接口处理异常:", e);
} finally {
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
}
Last modified: 20 一月 2025