消息队列技术解读
2025/12/3大约 6 分钟
淘票票项目消息队列技术实践
1. 技术概述
消息队列是淘票票分布式系统架构中的关键组件,用于实现服务间的解耦、流量削峰、异步通信和数据同步。通过消息队列,系统能够更好地应对高并发流量,提高整体系统的可用性和稳定性。项目中主要采用Kafka和Redis Stream两种消息队列技术,各自应用于不同的业务场景。
2. 应用场景
- 订单异步处理:将订单创建请求通过Kafka异步处理,避免高峰期系统过载
- API数据收集:通过Kafka收集API调用数据,用于监控和分析
- 缓存同步更新:通过Redis Stream实现缓存的异步更新,确保数据一致性
- 延迟消息处理:基于Redis的ZSet实现延迟队列,用于处理订单支付超时等场景
3. 技术选型
淘票票项目主要采用以下消息队列技术:
3.1 Kafka
- 适用场景:高吞吐、异步订单处理、API数据收集
- 优势:高吞吐量、高可靠性、分布式架构、可扩展性强
- 主要用途:订单创建异步处理、API调用数据收集
3.2 Redis Stream
- 适用场景:缓存同步、轻量级消息传递、消费组支持
- 优势:集成Redis、支持消费组、简单易用
- 主要用途:缓存数据同步更新、服务间简单消息通知
3.3 Redis延迟队列(基于ZSet)
- 适用场景:订单超时处理、定时任务
- 优势:精准控制、实现简单、与Redis集成
- 主要用途:订单支付超时关闭、资源定时释放
4. 核心实现
4.1 Kafka消息发布实现
项目中使用Kafka主要用于订单异步处理和API数据收集,下面是订单异步创建的核心实现:
@Slf4j
@AllArgsConstructor
@Component
public class CreateOrderSend {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTopic kafkaTopic;
/**
* 发送订单创建消息到Kafka,支持成功和失败回调
* @param message 消息内容(订单数据JSON字符串)
* @param successCallback 发送成功回调
* @param failureCallback 发送失败回调
*/
public void sendMessage(String message, SuccessCallback<SendResult<String, String>> successCallback,
FailureCallback failureCallback) {
log.info("创建订单kafka发送消息 消息体 : {}", message);
CompletableFuture<SendResult<String, String>> completableFuture =
kafkaTemplate.send(SpringUtil.getPrefixDistinctionName() + "-" + kafkaTopic.getTopic(), message);
completableFuture.whenComplete((result,ex) -> {
if (Objects.isNull(ex)) {
successCallback.onSuccess(result);
}else {
failureCallback.onFailure(ex);
}
});
}
}同时,在网关服务中实现了API数据收集的消息发送:
@Slf4j
@AllArgsConstructor
public class ApiDataMessageSend {
private KafkaTemplate<String, String> kafkaTemplate;
private String topic;
/**
* 发送API数据到Kafka
* @param message API数据消息
*/
public void sendMessage(String message) {
log.info("sendMessage message : {}", message);
kafkaTemplate.send(SpringUtil.getPrefixDistinctionName() + "-" + topic, message);
}
}4.2 Redis Stream实现
Redis Stream主要用于缓存同步更新,下面是核心实现:
@Slf4j
@AllArgsConstructor
public class RedisStreamPushHandler {
private final StringRedisTemplate stringRedisTemplate;
private final RedisStreamConfigProperties redisStreamConfigProperties;
/**
* 发送消息到Redis Stream
* @param msg 消息内容
* @return 记录ID
*/
public RecordId push(String msg){
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in(redisStreamConfigProperties.getStreamName())
.ofObject(msg)
.withId(RecordId.autoGenerate());
RecordId recordId = this.stringRedisTemplate.opsForStream().add(record);
log.info("redis streamName : {} message : {}", redisStreamConfigProperties.getStreamName(), msg);
return recordId;
}
}Redis Stream消费者实现:
@Slf4j
@Component
public class ProgramRedisStreamConsumer implements MessageConsumer {
@Autowired
private ProgramService programService;
@Override
public void accept(ObjectRecord<String, String> message) {
Long programId = Long.parseLong(message.getValue());
programService.delLocalCache(programId);
}
}Redis Stream自动配置:
@Slf4j
@EnableConfigurationProperties(RedisStreamConfigProperties.class)
public class RedisStreamAutoConfig {
@Bean
public RedisStreamPushHandler redisStreamPushHandler(StringRedisTemplate stringRedisTemplate,
RedisStreamConfigProperties redisStreamConfigProperties) {
return new RedisStreamPushHandler(stringRedisTemplate, redisStreamConfigProperties);
}
@Bean
public RedisStreamHandler redisStreamHandler(RedisStreamPushHandler redisStreamPushHandler,
StringRedisTemplate stringRedisTemplate) {
return new RedisStreamHandler(redisStreamPushHandler, stringRedisTemplate);
}
/**
* 配置Redis Stream消息监听器容器,支持消费组模式
*/
@Bean
@ConditionalOnBean(MessageConsumer.class)
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory,
RedisStreamConfigProperties redisStreamConfigProperties,
RedisStreamHandler redisStreamHandler,
MessageConsumer messageConsumer) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(5))
.batchSize(10)
.targetType(String.class)
.errorHandler(t -> log.error("出现异常", t))
.executor(createThreadPool()).build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 配置消费组模式或单独消费者模式
checkConsumerType(redisStreamConfigProperties.getConsumerType());
RedisStreamListener redisStreamListener = new RedisStreamListener(messageConsumer);
if (RedisStreamConstant.GROUP.equals(redisStreamConfigProperties.getConsumerType())) {
redisStreamHandler.streamBindingGroup(redisStreamConfigProperties.getStreamName(),
redisStreamConfigProperties.getConsumerGroup());
container.receiveAutoAck(Consumer.from(redisStreamConfigProperties.getConsumerGroup(),
redisStreamConfigProperties.getConsumerName()),
StreamOffset.create(redisStreamConfigProperties.getStreamName(), ReadOffset.lastConsumed()),
redisStreamListener);
} else {
container.receive(StreamOffset.fromStart(redisStreamConfigProperties.getStreamName()), redisStreamListener);
}
container.start();
return container;
}
}4.3 实际业务应用 - 订单异步创建
在ProgramOrderService中,通过Kafka实现订单的异步创建:
/**
* 通过MQ创建订单
*
* @param orderCreateDto 订单创建数据传输对象
* @param purchaseSeatList 购买座位列表
* @return 订单号
*/
private String createOrderByMq(OrderCreateDto orderCreateDto, List<SeatVo> purchaseSeatList){
CreateOrderMqDomain createOrderMqDomain = new CreateOrderMqDomain();
CountDownLatch latch = new CountDownLatch(1);
createOrderSend.sendMessage(JSON.toJSONString(orderCreateDto), sendResult -> {
createOrderMqDomain.orderNumber = String.valueOf(orderCreateDto.getOrderNumber());
assert sendResult != null;
log.info("创建订单kafka发送消息成功 topic : {}", sendResult.getRecordMetadata().topic());
latch.countDown();
}, ex -> {
log.error("创建订单kafka发送消息失败 error", ex);
log.error("创建订单失败 需人工处理 orderCreateDto : {}", JSON.toJSONString(orderCreateDto));
updateProgramCacheDataResolution(orderCreateDto.getProgramId(), purchaseSeatList, OrderStatus.CANCEL);
createOrderMqDomain.taoPiaoPiaoFrameException = new TaoPiaoPiaoFrameException(ex);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
log.error("createOrderByMq InterruptedException", e);
throw new TaoPiaoPiaoFrameException(e);
}
if (Objects.nonNull(createOrderMqDomain.taoPiaoPiaoFrameException)) {
throw createOrderMqDomain.taoPiaoPiaoFrameException;
}
return createOrderMqDomain.orderNumber;
}5. 优化策略
5.1 Kafka优化策略
- 异步发送模式:使用CompletableFuture实现异步发送,避免阻塞主线程
- 回调机制:实现成功和失败回调,便于异常处理和日志记录
- 前缀区分:通过SpringUtil.getPrefixDistinctionName()添加环境前缀,实现多环境隔离
5.2 Redis Stream优化策略
- 线程池优化:使用自定义线程池处理消息消费,提高并行处理能力
- 批量消费:配置batchSize=10,支持批量处理消息
- 消费组模式:支持Redis Stream的消费组功能,实现消息的负载均衡
- 自动确认:使用receiveAutoAck模式,消息处理成功后自动确认
5.3 高可用保障
- 失败重试:对于Kafka发送失败的情况,通过回调机制捕获异常并进行处理
- 资源释放:在订单创建失败时,通过updateProgramCacheDataResolution方法释放座位资源
- 异常隔离:使用CountDownLatch确保异步操作完成后再返回结果,避免异常丢失
- 日志记录:详细记录消息发送和处理的关键日志,便于问题排查
6. 最佳实践
6.1 消息设计最佳实践
- JSON格式:使用JSON格式序列化消息,便于不同语言之间的互操作
- 异常处理:消息发送失败时必须有明确的异常处理机制,特别是涉及资源占用的场景
- 超时控制:使用CountDownLatch配合超时机制,避免异步操作无限等待
6.2 业务应用最佳实践
- 异步解耦:核心业务如订单创建采用异步处理,提高系统吞吐量
- 缓存同步:使用Redis Stream实现缓存的异步更新,确保数据一致性
- 资源隔离:不同业务场景使用不同的消息队列实例,避免相互影响
6.3 配置与部署建议
- 条件配置:通过@ConditionalOnProperty实现条件化配置,灵活控制功能开关
- 环境隔离:使用前缀区分不同环境的消息队列,避免消息混淆
- 动态配置:消息队列参数如topic、streamName等通过配置文件管理,便于动态调整