延迟队列技术解读
2025/12/3大约 6 分钟
延迟队列技术解读
1. 技术概述
延迟队列是一种能够将消息延迟一定时间后再投递的队列机制,在淘票票项目中广泛应用于订单超时处理、定时任务调度等场景。本技术基于Redis实现,提供高效可靠的延迟消息处理能力。
2. 应用场景
- 订单超时关闭:用户下单后在规定时间内未支付,自动关闭订单并释放库存
- 短信延迟发送:用户操作后延迟一段时间发送提醒短信
- 定时任务调度:按照设定的时间执行特定任务
- 重试机制实现:接口调用失败后,按照指数退避策略进行重试
- 会话超时处理:用户会话超时后,清理相关资源
3. 实现原理
3.1 基于Redis的基础实现
基于Redis的Sorted Set(有序集合)实现延迟队列:
- Score字段:存储消息的到期时间戳(毫秒)
- Member字段:存储消息内容(通常是JSON字符串)
- 实现思路:
- 生产者:将消息以当前时间+延迟时间作为score放入ZSET
- 消费者:定时查询ZSET中score小于当前时间的消息
- 处理:获取到到期消息后,从ZSET中删除并进行业务处理
3.2 Redisson高级实现
淘票票项目的延迟队列模块基于Redisson实现,位于taopiaopiao-service-delay-queue-framework目录下,提供了更完善的功能:
- 基于RDelayedQueue:利用Redisson的RDelayedQueue实现可靠的延迟队列功能
- 分片机制:通过IsolationRegionSelector实现队列分片,提高并发处理能力
- 线程池隔离:生产者和消费者使用独立的线程池,避免相互影响
4. 核心组件设计
4.1 核心组件列表
- DelayQueueContext - 延迟队列发送者上下文,用于发送延迟消息
- ConsumerTask接口 - 延迟队列消费者任务接口,业务方需要实现该接口处理延迟消息
- DelayQueueInitHandler - 延迟队列初始化处理器,在应用启动时自动注册消费者
- DelayQueueProperties - 延迟队列配置属性,支持自定义线程池参数和分区数
- DelayProduceQueue - 延迟队列生产者实现
- DelayConsumerQueue - 延迟队列消费者实现
- IsolationRegionSelector - 分片选择器,用于实现队列分区
4.2 消费者任务接口定义
package com.taopiaopiao.core;
/**
* 延迟队列消费者接口
*/
public interface ConsumerTask {
/**
* 消费任务
* @param content 具体参数
*/
void execute(String content);
/**
* 主题
* @return 主题
*/
String topic();
}4.3 延迟队列生产者实现
package com.taopiaopiao.core;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
/**
* 延迟队列生产者实现
*/
public class DelayProduceQueue extends DelayBaseQueue{
private final RDelayedQueue<String> delayedQueue;
public DelayProduceQueue(RedissonClient redissonClient, final String relTopic) {
super(redissonClient, relTopic);
this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
}
public void offer(String content, long delayTime, TimeUnit timeUnit) {
delayedQueue.offer(content, delayTime, timeUnit);
}
}5. 消息生产与消费实现
5.1 消息生产者核心逻辑
@Component
public class DelayMessageProducer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 发送延迟消息
* @param queueName 队列名称
* @param message 消息内容
* @param delayTime 延迟时间
* @param timeUnit 时间单位
*/
public void sendDelayMessage(String queueName, String message, long delayTime, TimeUnit timeUnit) {
// 计算到期时间戳
long expireTime = System.currentTimeMillis() + timeUnit.toMillis(delayTime);
// 添加到有序集合
redisTemplate.opsForZSet().add(queueName, message, expireTime);
}
}5.2 消息消费者核心逻辑
@Component
public class DelayMessageConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ServiceLockTool serviceLockTool;
/**
* 处理到期消息
* @param queueName 队列名称
* @param consumer 消息处理函数
*/
public void processExpireMessages(String queueName, Consumer<String> consumer) {
while (true) {
try {
// 获取当前时间戳
long now = System.currentTimeMillis();
// 查找到期的消息(score <= now)
Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, now, 0, 1);
if (messages != null && !messages.isEmpty()) {
String message = messages.iterator().next();
// 使用分布式锁确保消息只被一个消费者处理
String lockKey = "delay_queue:lock:" + queueName + ":" + message;
if (serviceLockTool.tryLock(lockKey, 3, TimeUnit.SECONDS)) {
try {
// 二次检查消息是否存在,防止重复消费
Double score = redisTemplate.opsForZSet().score(queueName, message);
if (score != null && score <= now) {
// 从队列中删除消息
redisTemplate.opsForZSet().remove(queueName, message);
// 处理消息
consumer.accept(message);
}
} finally {
// 释放锁
serviceLockTool.unlock(lockKey);
}
}
} else {
// 无到期消息,短暂休眠
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("处理延迟消息异常", e);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
}6. 高级特性与优化策略
6.1 性能优化
- 批量处理:一次性获取多条到期消息进行批量处理
- 分片设计:将一个大的延迟队列拆分为多个小队列,提高并行处理能力
- 异步处理:消息出队后放入线程池异步处理,减少阻塞时间
- 自定义线程池:根据业务需求配置合适的线程池参数
6.2 可靠性保障
- 消息去重:利用Redis的Set结构确保消息唯一性
- 持久化保障:配置Redis的AOF或RDB持久化,防止消息丢失
- 并发处理:使用分布式锁确保同一消息只被一个消费者处理
- 消息确认:采用手动确认机制,确保消息被正确处理
- 死信队列:处理失败的消息放入死信队列,进行重试或人工干预
6.3 可扩展性设计
- 配置化参数:支持自定义线程池参数和分区数
- SPI机制:通过实现ConsumerTask接口扩展消费者任务
- 自动注册:应用启动时自动扫描并注册消费者任务
7. 最佳实践
- 合理设置轮询间隔:根据业务需求设置合适的轮询频率,避免过于频繁导致性能问题
- 控制消息大小:消息体不宜过大,可只存储ID,业务数据从数据库查询
- 使用命名空间:为不同业务场景的队列设置不同的命名空间,避免冲突
- 设置最大延迟时间:避免消息延迟时间过长,影响系统稳定性
- 监控告警:对队列长度、处理延迟等指标进行监控,异常情况及时告警
- 消息幂等:确保消息重复处理不会产生副作用
- 合理设置线程池参数:根据消息处理量和处理耗时调整线程池配置
8. 集成使用示例
8.1 引入依赖
在项目的pom.xml中添加延迟队列框架依赖:
<dependency>
<groupId>com.taopiaopiao</groupId>
<artifactId>taopiaopiao-service-delay-queue-framework</artifactId>
<version>${project.version}</version>
</dependency>8.2 实现消费者任务
@Component
public class OrderTimeoutConsumerTask implements ConsumerTask {
@Autowired
private OrderService orderService;
@Override
public void execute(String content) {
// 处理订单超时逻辑
orderService.closeTimeoutOrder(content);
}
@Override
public String topic() {
return "order:timeout";
}
}8.3 发送延迟消息
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private DelayQueueContext delayQueueContext;
public void createOrder(Order order) {
// 创建订单逻辑
// 发送延迟消息,30分钟后检查订单是否支付
delayQueueContext.send("order:timeout", order.getOrderNo(), 30, TimeUnit.MINUTES);
}
}