本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:
- Spring框架管理对象
- 有消息需求,但不想维护mq中间件
- 有使用redis
- 对消息持久化并没有很苛刻的要求
设计方案
设计主要包含以下几点:
- 将整个Redis当做消息池,以kv形式存储消息
- 使用ZSET做优先队列,按照score维持优先级
- 使用LIST结构,以先进先出的方式消费
- zset和list存储消息地址(对应消息池的每个key)
- 自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
- 使用定时器维持路由
- 根据TTL规则实现消息延迟
代码实现
技术说明
示例使用Springboot,gradle,redis,jdk8。
核心代码
核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。
Message对象
1 | package git.yampery.msmq; |
Route(消息路由器)
1 | package git.yampery.msmq; |
RedisMq(消息队列)
1 | package git.yampery.msmq; |
RedisMq消息队列配置:
- mq.properties文件
1 | # 队列的监听数量 |
- MqConfig.java文件
1 | package git.yampery.config; |
如果使用的是xml配置 请参考:
1 | <bean id="redisMQ" class="git.yampery.msmq.RedisMQ"> |
消费者
并没有内置消费者监听器来实现,可以直接使用定时器实现
1 | package git.yampery.task; |
测试
测试设置20秒延迟,发布消息到queue:1,在list:1消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 package git.yampery.mq;
import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.Message;
import git.yampery.msmq.RedisMQ;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @decription TestMQ
* <p>测试</p>
* @author Yampery
* @date 2018/2/9 18:43
*/
(SpringRunner.class)
public class TestMQ {
private RedisMQ redisMQ;
"${mq.queue.first}") (
private String MQ_QUEUE_FIRST;
public void testMq() {
JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");
String seqId = UUID.randomUUID().toString();
// 将有效信息放入消息队列和消息池中
Message message = new Message();
message.setBody(jObj.toJSONString());
// 可以添加延迟配置
message.setDelay(20);
message.setTopic("SMS");
message.setCreateTime(System.currentTimeMillis());
message.setId(seqId);
// 设置消息池ttl,防止长期占用
message.setTtl(20 * 60);
message.setStatus(0);
message.setPriority(0);
redisMQ.addMsgPool(message);
redisMQ.enMessage(MQ_QUEUE_FIRST,
message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId());
}
}
总结
文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。 需要使用rabbitmq实现延迟消息请参考这里
源码连接
文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。 https://github.com/Yampery/rdsmq.git