基于Redis的消息队列

本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

需要使用rabbitmq实现延迟消息请参考这里


设计方案

设计主要包含以下几点:

  • 将整个Redis当做消息池,以kv形式存储消息
  • 使用ZSET做优先队列,按照score维持优先级
  • 使用LIST结构,以先进先出的方式消费
  • zset和list存储消息地址(对应消息池的每个key)
  • 自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
  • 使用定时器维持路由
  • 根据TTL规则实现消息延迟

基于Redis有消息队列设计方案

代码实现

技术说明

示例使用Springboot,gradle,redis,jdk8。

核心代码

核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。

Message对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package git.yampery.msmq;

/**
* @decription Message
* <p>封装消息元数据</p>
* @author Yampery
* @date 2017/11/2 15:50
*/
public class Message {

/**
* 消息主题
*/
private String topic;
/**
* 消息id
*/
private String id;
/**
* 消息延迟
*/
private long delay;
/**
* 消息优先级
*/
private int priority;
/**
* 消息存活时间
*/
private int ttl;
/**
* 消息体,对应业务内容
*/
private String body;
/**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* 用来消除时间的影响
*/
private long createTime;
/**
* 消息状态(延迟-0;待发送-1;已发送-2;发送失败-3)
*/
private int status;

/**
* getset略...
*/
}

Route(消息路由器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package git.yampery.msmq;

/**
* @decription Route
* <p>消息路由器,主要控制将消息从指定的队列路由到待消费的list<br>
* 通过这种方式实现自定义延迟以及优先级发送</p>
* @author Yampery
* @date 2017/11/3 14:33
*/
public class Route {

/**
* 存放消息的队列
*/
private String queue;

/**
* 待消费的列表
*/
private String list;

public Route(String queue, String list) {
this.queue = queue;
this.list = list;
}

/**
* getset略...
*/
}

RedisMq(消息队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package git.yampery.msmq;

import git.yampery.utils.JedisUtils;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* @decription RedisMQ
* <p>基于redis的消息队列</p>
* <p>将整个redis作为消息池存储消息体,以ZSET为消息队列,LIST作为待消费列表<br>
* 用Spring定时器作为监听器,每次监听ZSET中指定数量的消息<br>
* 根据SCORE确定是否达到发送要求,如果达到,利用消息路由{@link Route}将消息路由到待消费list</p>
* @author Yampery
* @date 2017/11/2 15:49
*/
public class RedisMQ {

/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
* 的消息体body作为值存储
*/
private static final String MSG_POOL = "Message:Pool:";
/**
* 默认监听数量,对应监听zset队列前多少个元素
*/
private static final int DEFAUT_MONITOR = 10;
@Resource private JedisUtils jedisUtils;


/**
* 每次监听queue中元素的数量,可配置
*/
private int monitorCount = DEFAUT_MONITOR;


/**
* 消息路由
*/
private List<Route> routes;

/**
* 存入消息池
* @param message
* @return
*/
public boolean addMsgPool(Message message) {

if (null != message) {
return jedisUtils.setex(MSG_POOL + message.getId(), message.getBody(), message.getTtl());
}
return false;
}

/**
* 从消息池中删除消息
* @param id
* @return
*/
public boolean deMsgPool(String id) {

return jedisUtils.del(MSG_POOL + id);
}

/**
* 像队列中添加消息
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/
public String enMessage(String key, long score, String val) {

if (jedisUtils.zadd(key, score, val)) {
return val;
}
return "";
}

/**
* 从队列删除消息
* @param id
* @return
*/
public boolean deMessage(String key, String id) {

return jedisUtils.zdel(key, id);
}

/**
* 消费
* @return
*/
public List<String> consume(String key) {

long count = jedisUtils.countList(key);
if (0 < count) {
// 可根据需求做限制
List<String> ids = jedisUtils.rangeList(key, 0, count - 1);
if (ids != null) {
List<String> result = new ArrayList<>();
ids.forEach(l -> result.add(jedisUtils.get(MSG_POOL + l, "")));
jedisUtils.removeListValue(key, ids);
return result;
} /// if end~
}

return null;
}

/**
* 消息队列监听器<br>
* 监听所有路由器,将消息队列中的消息路由到待消费列表
*/
@Scheduled(cron="*/5 * * * * *")
public void monitor() {
// 获取消息路由
int route_size;
if (null == routes || 1 > (route_size = routes.size())) return;
String queue, list;
Set<String> set;
for (int i = 0; i < route_size; i++) {
queue = routes.get(i).getQueue();
list = routes.get(i).getList();
set = jedisUtils.getSoredSetByRange(queue, 0, monitorCount, true);
if (null != set) {
long current = System.currentTimeMillis();
long score;
for (String id : set) {
score = jedisUtils.getScore(queue, id).longValue();
if (current >= score) {
// 添加到list
if (jedisUtils.insertList(list, id)) {
// 删除queue中的元素
deMessage(queue, id);
} /// if end~
} /// if end~
} /// for end~
} /// if end~
} /// for end~
}

public int getMonitorCount() {
return monitorCount;
}

public void setMonitorCount(int monitorCount) {
this.monitorCount = monitorCount;
}

public List<Route> getRoutes() {
return routes;
}

public void setRoutes(List<Route> routes) {
this.routes = routes;
}
}

RedisMq消息队列配置:

  • mq.properties文件
1
2
3
4
5
6
7
8
9
10
# 队列的监听数量
mq.monitor.count =30
# 队列一
mq.queue.first =queue:1
# 队列二
mq.queue.second =queue:2
# 消费列表一
mq.consumer.first =list:1
# 消费列表二
mq.consumer.second =list:2
  • MqConfig.java文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package git.yampery.config;

import git.yampery.msmq.RedisMQ;
import git.yampery.msmq.Route;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.ArrayList;
import java.util.List;

/**
* @decription MqConfig
* <p>消息队列配置</p>
* @author Yampery
* @date 2018/2/9 14:26
*
* 根据不同的架构可选择使用XML配置
* ---------------------------------------------------
*
<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
<property name="monitorCount" value="15"/>
<property name="routes">
<list>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.first}"/>
<property name="list" value="${mq.consumer.first}"/>
</bean>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.second}"/>
<property name="list" value="${mq.consumer.second}"/>
</bean>
</list>
</property>
</bean>

* ----------------------------------------------------
*/
@Configuration
public class MqConfig {

@Bean(name = "redisMQ")
@Primary
public RedisMQ getRedisMq() {
RedisMQ redisMQ = new RedisMQ();
// 配置监听队列元素数量
redisMQ.setMonitorCount(monitorCount);
// 配置路由表
redisMQ.setRoutes(routeList());
return redisMQ;
}

/**
* 返回路由表
* @return
*/
public List<Route> routeList() {
List<Route> routeList = new ArrayList<>();
Route routeFirst = new Route(queueFirst, listFirst);
Route routeSecond = new Route(queueSecond, listSecond);
routeList.add(routeFirst);
routeList.add(routeSecond);
return routeList;
}

@Value("${mq.monitor.count}")
private int monitorCount;
@Value("${mq.queue.first}")
private String queueFirst;
@Value("${mq.queue.second}")
private String queueSecond;
@Value("${mq.consumer.first}")
private String listFirst;
@Value("${mq.consumer.second}")
private String listSecond;
}

如果使用的是xml配置 请参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
<property name="monitorCount" value="15"/>
<property name="routes">
<list>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.first}"/>
<property name="list" value="${mq.consumer.first}"/>
</bean>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.second}"/>
<property name="list" value="${mq.consumer.second}"/>
</bean>
</list>
</property>
</bean>

消费者

并没有内置消费者监听器来实现,可以直接使用定时器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package git.yampery.task;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.RedisMQ;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.List;

/**
* @decription MsgTask
* <p>发送消息</p>
* @author Yampery
* @date 2018/2/9 18:04
*/
@Component
public class MsgTask {

@Resource private RedisMQ redisMQ;
// @Value("${mq.list.first}") private String MQ_LIST_FIRST;

@Scheduled(cron="*/5 * * * * *")
public void sendMsg() {
// 消费
List<String> msgs = redisMQ.consume(redisMQ.getRoutes().get(0).getList());
int len;
if (null != msgs && 0 < (len = msgs.size())) {
// 将每一条消息转为JSONObject
JSONObject jObj;
for (int i = 0; i < len; i++) {
if (!StringUtils.isEmpty(msgs.get(i))) {
jObj = JSONObject.parseObject(msgs.get(i));
// 取出消息
System.out.println(jObj.toJSONString());
}
}
}
}
}

测试

测试设置20秒延迟,发布消息到queue:1,在list:1消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.Message;
import git.yampery.msmq.RedisMQ;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.UUID;

/**
* @decription TestMQ
* <p>测试</p>
* @author Yampery
* @date 2018/2/9 18:43
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMQ {

@Resource
private RedisMQ redisMQ;
@Value("${mq.queue.first}")
private String MQ_QUEUE_FIRST;

@Test
public void testMq() {

JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");

String seqId = UUID.randomUUID().toString();

// 将有效信息放入消息队列和消息池中
Message message = new Message();
message.setBody(jObj.toJSONString());
// 可以添加延迟配置
message.setDelay(20);
message.setTopic("SMS");
message.setCreateTime(System.currentTimeMillis());
message.setId(seqId);
// 设置消息池ttl,防止长期占用
message.setTtl(20 * 60);
message.setStatus(0);
message.setPriority(0);
redisMQ.addMsgPool(message);
redisMQ.enMessage(MQ_QUEUE_FIRST,
message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId());
}
}

总结

文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。 需要使用rabbitmq实现延迟消息请参考这里

源码连接

文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。 https://github.com/Yampery/rdsmq.git