rocketMq开始使用
rocketmq是怎么使用的
public class ASimpleTest {
@Test
public void simpleProduce() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");//定义生产者组(可以有多个生产者一起往主题里发)
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.setSendMsgTimeout(30000); // 设置超时为30秒
producer.start();
try {
Message message = new Message("testTopic", "我是一个简单的消息".getBytes());
SendResult sendResult = producer.send(message); // 使用默认超时
System.out.println("发送结果: " + sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace(); // 打印异常信息
} finally {
producer.shutdown();
}
}
@Test
public void simpleConsume() throws Exception{
//创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
//连接namesrv
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//订阅一个主题*标识订阅这个主题中所有的消息 后续会有消息过滤
consumer.subscribe("testTopic", "*");
//设置一个监听器(一直监听,异步回调)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//这个就是消费的方法(业务逻辑)
System.out.println("我是消费者");
System.out.println(msgs.get(0).toString());
System.out.println("消息内容"+new String(msgs.get(0).getBody()));
System.out.println("消费上下文"+context);
//返回值,CONSUME_SUCCESS成功,消息会从mq出队
//RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起jvm
System.in.read();//一直读,就不停了
}
}
消费模式
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
异步消息
public class BASyncTest {
@Test
public void asyncProducer()throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("asyncTopic", "我是一个异步消息".toString().getBytes());
producer.send(message, new SendCallback() {//我们发送完消息之后,不是等待他返回,而是先去执行其他任务,如果收到消息,则执行回调函数
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable e) {
System.err.println("发送失败:"+e.getMessage());
}
});
System.out.println("我先执行");
System.in.read();//挂起
}
}
单项消息:使用mq处理日志
代码
public class COnewayTest {
@Test
public void onewayProducer()throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "日志XXX".getBytes());
producer.sendOneway(message);//单项消息,没有返回值
System.out.println("发送成功");
producer.shutdown();
}
}
延迟任务(占座买票)
批量
详细代码:
public class EBatchTest {
@Test
public void testBatchProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
// 设置nameServer地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动实例
producer.start();
List<Message> msgs = Arrays.asList(
new Message("batchTopic", "我是一组消息的A消息".getBytes()),
new Message("batchTopic", "我是一组消息的B消息".getBytes()),
new Message("batchTopic", "我是一组消息的C消息".getBytes())
);
SendResult send = producer.send(msgs);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
@Test
public void msConsumer()throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");//DefaultMQPushConsumer!!PUSh
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("batchTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("收到信息了"+new Date());
System.out.println(msgs.size());//发的时候是捆绑一起发,消费的时候是单个消费
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » rocketMq的使用和消费模式(异步消息,单项消息(使用mq处理日志),推迟任务(占座买票),批量消息)
发表评论 取消回复