问题:为什么需要消息队列?

​ –解答:当前项目中使用java中提供的阻塞队列来存储订单,该阻塞队列有两个问题:1. 内存受限,不能超过机器的内存;2. 一旦机器宕机,位于队列中的信息会全部丢失。而消息队列就是专门用于存放中间信息的,相当于一个中转站,它不受限于机器本身,是一个第三方的组件。

image-20240315212305725

问题:如何使用Redis实现消息队列?为什么不使用List以及PubSub?

​ –解答:目前市面上有专门的消息队列组件比如:MQ、RabbitMQ、Kafaka等,但其实也可以使用Redis的Stream数据结构实现。

image-20240315220322697

问题:什么情况下会发生Stream消息漏读的情况?如何解决?

​ –解答:当消费者进程由于某种原因停止运行,那么在这段时间内生产者发送的消息就会被遗漏。可以使用消费者组机制解决,具体来说就是将消费者归为某个组,当消费者出现问题后,恢复时可以选择从pending-list中读取消息,pending-list中存放的是那些未处理的消息。并且采用这种机制后,每个消息处理后消费者需要发送ACK表示已经处理完了该消息,该消息才能从pending-list中移除。

​ 总结一句话:采用消息确认机制来解决消息漏读问题。

image-20240315214657878

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
* <p>
* 秒杀优惠券表,与优惠券是一对一关系 服务实现类
* </p>
*
* @author dch
* @since 2024-03-08
*/
@Service
@Slf4j
public class SeckillVoucherServiceImpl extends ServiceImpl<SeckillVoucherMapper, SeckillVoucher> implements ISeckillVoucherService {
@Resource
private UniqueIdGenerator uniqueIdGenerator;
@Resource
private VoucherOrderMapper voucherOrderMapper;
private static DefaultRedisScript redisScript;
private String messageStream = "stream.orders";

static{
redisScript = new DefaultRedisScript();
redisScript.setLocation(new ClassPathResource("/RedisWork.lua"));
redisScript.setResultType(Long.class);
}
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 创建一个线程
*/
private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private ISeckillVoucherService currentProxy;

@PostConstruct
public void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable{
//项目一启动就检查阻塞队列中是否有未处理的订单,有就进行处理
public void run(){
handleVoucherOrder();
}
}
public Result seckillVoucher(Long id){
//1.根据id从数据库中查询对应的优惠券
SeckillVoucher seckillVoucher = getById(id);
//2.判断该优惠券是否开始以及结束
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("活动未开始");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("活动已结束");
}
List<String> keys = new ArrayList<>();
keys.add(RedisConstants.SECKILL_STOCK_KEY + id);
keys.add(RedisConstants.SECKILL_VOUCHER_KEY + id);
Long userid = UserHolder.getUser().getId();
long seckillVoucherOrderId = uniqueIdGenerator.generatorId(RedisConstants.SECKILL_VOUCHER_ID_PREFIX);
//调用lua脚本判断库存是否充足以及该用户是否已经购买过该秒杀券,并将满足条件的订单加入消息队列
Long flag = (Long)stringRedisTemplate.execute(redisScript, keys, userid + "",id + "",seckillVoucherOrderId + "");

if(flag == 1){
return Result.fail("库存不足");
}
if(flag == 2){
return Result.fail("不能重复购买");
}
//2.生成订单
VoucherOrder voucherOrder = new VoucherOrder();
//2.1 填入订单id,使用全局唯一id生成器生成
voucherOrder.setId(seckillVoucherOrderId);
//2.2 填入用户id
voucherOrder.setUserId(userid);
//2.3 填入优惠券id
voucherOrder.setVoucherId(id);
currentProxy = (ISeckillVoucherService) AopContext.currentProxy();

return Result.ok(voucherOrder);
}

/**
* 不断从消息队列中读取消息
*/
public void handleVoucherOrder(){
while(true){
try {
//读取消息
List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(1),
StreamOffset.create(messageStream, ReadOffset.lastConsumed()));
if(read.isEmpty() || read == null){
//未读到消息
continue;
}

MapRecord<String, Object, Object> message = read.get(0);
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(message.getValue(), new VoucherOrder(),true);
//ACK确认,注意:第137行和138行不可交换顺序,否则当更新数据库操作失败时,该订单会不翼而飞
currentProxy.createVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(messageStream,"g1",message.getId());
} catch (Exception e) {
handlePaddinList();
log.error("处理订单异常");
}
}
}

/**
* 处理订单过程发送异常,恢复时需要处理paddin-list中的消息
*/
public void handlePaddinList(){
while(true){
try {
//读取消息
List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(messageStream, ReadOffset.from("0")));
if(read.isEmpty() || read == null){
//未读到消息
break;
}

MapRecord<String, Object, Object> message = read.get(0);
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(message.getValue(), new VoucherOrder(),true);
//ACK确认
currentProxy.createVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(messageStream,"g1",message.getId());
} catch (Exception e) {
log.error("处理padding-list异常");
}
}
}
/**
* 更新数据库
* @param voucherOrder
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){
//更新库存
boolean success = update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getId()).update();
voucherOrderMapper.insert(voucherOrder);
}
}