RocketMQ 基础
https://rocketmq.apache.org/
为什么使用 MQ?-> MQ 的优势和劣势
优势(作用):
- 应用解耦
- 异步提速
- 削峰填谷
劣势:
- 系统可用性降低
- 系统复杂度提高
- 异步消息机制
- 消息顺序性
- 消息丢失
- 消息一致性
- 消息重复使用
应用解耦:消费方存活与否不影响生产方
系统的耦合性越高,容错性就越低,可维护性就越低。
主流 MQ 产品对比
- ActiveMQ:Java 语言实现,万级数据吞吐量,处理速度 ms 级,主从架构,成熟度高
- RabbitMQ:Erlang 语言实现,万级数据吞吐量,处理速度 us 级,主从架构
- RocketMQ:Java 语言实现,十万级数据吞吐量,分布式架构,功能强大,扩展性强
- Kafka:Scala 语言实现,十万级数据吞吐量,处理速度 ms 级,分布式架构,功能较少,应用于大数据较多
环境搭建
略。
消息发送
主要内容
- 基于 Java 环境构建发送与消息接受基础程序
- 单生产者-单消费者
- 单生产者-多消费者
- 多生产者-多消费者
- 发送不同类型的消息
- 同步消息
- 异步消息
- 单向消息
- 特殊的消息发送
- 延时消息
- 批量消息
- 特殊的消息接收
- 消息过滤
- 消息发送与接收顺序控制
- 事务消息
消息发送、消息接收的开发流程
- 谁发
- 发给谁
- 怎么发
- 发什么
- 发的结果是什么
- 打扫战场
关系
- One-To-One(基础发送与基础接收)
- One-To-Many(负载均衡模式与广播模式)
- Many-To-Many
单生产者-单消费者(One-To-One)
- 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Producer { public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("返回结果:" + result); producer.shutdown(); } }
|
- 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Consumer { public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("topic1", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("收到消息:"+msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接受消息服务已开启"); } }
|
单生产者-多消费者(One-To-Many):负载均衡
- 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("返回结果:" + result); }
producer.shutdown();
|
- 消费者(默认模式:负载均衡)
开启多实例运行:
Edit Configurations - Add Run Options - Allow multiple instances
说明:同一个消费者,多个实例,争抢 topic 数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic1", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("收到消息:" + msg); System.out.println("消息是:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start(); System.out.println("接受消息服务已开启");
|
单生产者-多消费者(One-To-Many):广播模式
- 生产者
同上。
- 消费者(广播模式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic1","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("收到消息:" + msg); System.out.println("消息是:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start(); System.out.println("接受消息服务已开启");
|
多生产者-多消费者(Many-To-Many)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费。
消息类别
- 同步消息
- 异步消息
- 单向消息
同步消息
特征:即时性较强,重要,且必须有回执的消息,如短信、通知(转账成功)
1
| SendResult result = producer.send(msg);
|
异步消息
特征:即时性较弱,但需要有回执的消息,如订单中的某些信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("消息" + i + "发完了,我先做业务逻辑去了~"); }
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
|
单向消息
特征:不需要有回执的消息,如日志类消息
1
| producer.sendOneway(msg);
|
延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用。
1 2 3 4 5
| Message msg = new Message("topic3", ("延时消息:hello rocketmq " + i).getBytes("UTF-8"));
msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回结果:" + result);
|
目前支持的消息时间
1
| private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
|
批量消息
批量发送消息能显著提高传递小消息的性能。
1 2 3 4 5 6 7 8 9 10
| List<Message> msgList = new ArrayList<Message>(); Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8")); Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8")); Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));
msgList.add(msg1); msgList.add(msg2); msgList.add(msg3);
SendResult result = producer.send(msgList);
|
注意限制:
- 这些批量消息应该有相同的 topic
- 这些批量消息应该有相同的 waitStoreMsgOK
- 不能是延时消息
- 消息内容总长度不超过 4 MB
消息内容总长度包含如下:
- topic(字符串字节数)
- body(字节数组长度)
- 消息追加的属性(key 与 value 对应字符串字节数)
- 日志(固定 20 字节)
消息过滤
分类过滤
按照 tag 过滤消息。
生产者
1
| Message msg = new Message("topic6", "tag2", ("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
|
消费者
1 2
| consumer.subscribe("topic6", "tag1 || tag2");
|
语法过滤(属性过滤/语法过滤/SQL过滤)
基本语法与 SQL 类似:
- 数值比较,如:>, >=, <, <=, BETWEEN, =
- 字符比较,如:=, <>, IN
- IS NULL 或者 IS NOT NULL
- 逻辑符号:AND, OR, NOT
常量支持类型:
- 数值,如:123, 3.1415
- 字符,如:’abc’(必须用单引号包裹起来)
- NULL(特殊的常量)
- 布尔值:TRUE 或 FALSE
生产者
1 2 3
| msg.putUserProperty("vip", "1"); msg.putUserProperty("age", "20");
|
消费者
1 2 3
| consumer.subscribe("topic7", MessageSelector.bySql("age >= 18")); consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
|
注意:SQL 过滤需要依赖服务器的功能支持,在 broker.conf 配置文件中添加对应的功能项,并开启对应功能
1
| enablePropertyFilter=true
|
重启 broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
或者直接在终端输入
1
| mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
|
然后可以在 RocketMQ 控制台(需安装)查看开启与否
Spring Boot 整合
导包
略。
配置文件
1 2
| rocketmq.name-server=localhost:9876 rocketmq.producer.group=demo_producer
|
实体类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class user implements Serializable { String userName; String userId;
public user(){
}
public user(String userName, String userId) { this.userName = userName; this.userId = userId; }
@Override public String toString() { return "demoEntity{" + "userName='" + userName + '\'' + ", userId='" + userId + '\'' + '}'; } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12
| @RestController public class DemoProducers { @Autowired private RocketMQTemplate template;
@RequestMapping("/producer") public String producersMessage() { User user = new User("fan", "123456789"); template.convertAndSend("demo-topic", user); return JSON.toJSONString(user); } }
|
消费者
1 2 3 4 5 6 7 8
| @Service @RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo_consumer") public class DemoConsumers1 implements RocketMQListener<user> { @Override public void onMessage(user user) { System.out.println("Consumers1接收消息:" + demoEntity.toString()); } }
|
其他消息
异步发送
1 2 3 4 5 6 7 8 9 10 11
| rocketMQTemplate.asyncSend("topic9", user, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); }
@Override public void onException(Throwable throwable) { System.out.println(throwable); } });
|
单向发送
1
| rocketMQTemplate.sendOneWay("topic9",user);
|
延时消息
1
| rocketMQTemplate.syncSend("topic9", MessageBuilder.withPayload("test delay").build(), 2000, 2);
|
批量
1 2 3 4 5
| List<Message> msgList = new ArrayList<>(); msgList.add(new Message("topic6", "tag1", "msg1".getBytes())); msgList.add(new Message("topic6", "tag1", "msg2".getBytes())); msgList.add(new Message("topic6", "tag1", "msg3".getBytes())); rocketMQTemplate.syncSend("topic8", msgList, 1000);
|
tag 过滤
消费者
1
| @RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "tag1")
|
sql 过滤
1
| @RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "age>18" ,selectorType= SelectorType.SQL92)
|
改消息模式
1
| @RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", messageModel = MessageModel.BROADCASTING)
|