问题:为什么需要消息队列?
–解答:当前项目中使用java中提供的阻塞队列来存储订单,该阻塞队列有两个问题:1. 内存受限,不能超过机器的内存;2. 一旦机器宕机,位于队列中的信息会全部丢失。而消息队列就是专门用于存放中间信息的,相当于一个中转站,它不受限于机器本身,是一个第三方的组件。

问题:如何使用Redis实现消息队列?为什么不使用List以及PubSub?
–解答:目前市面上有专门的消息队列组件比如:MQ、RabbitMQ、Kafaka等,但其实也可以使用Redis的Stream数据结构实现。

问题:什么情况下会发生Stream消息漏读的情况?如何解决?
–解答:当消费者进程由于某种原因停止运行,那么在这段时间内生产者发送的消息就会被遗漏。可以使用消费者组机制解决,具体来说就是将消费者归为某个组,当消费者出现问题后,恢复时可以选择从pending-list中读取消息,pending-list中存放的是那些未处理的消息。并且采用这种机制后,每个消息处理后消费者需要发送ACK表示已经处理完了该消息,该消息才能从pending-list中移除。
总结一句话:采用消息确认机制来解决消息漏读问题。

实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
@Service @Slf4j public class SeckillVoucherServiceImpl extends ServiceImpl<SeckillVoucherMapper, SeckillVoucher> implements ISeckillVoucherService { @Resource private UniqueIdGenerator uniqueIdGenerator; @Resource private VoucherOrderMapper voucherOrderMapper; private static DefaultRedisScript redisScript; private String messageStream = "stream.orders";
static{ redisScript = new DefaultRedisScript(); redisScript.setLocation(new ClassPathResource("/RedisWork.lua")); redisScript.setResultType(Long.class); } @Resource private StringRedisTemplate stringRedisTemplate;
private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); private ISeckillVoucherService currentProxy;
@PostConstruct public void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }
private class VoucherOrderHandler implements Runnable{ public void run(){ handleVoucherOrder(); } } public Result seckillVoucher(Long id){ SeckillVoucher seckillVoucher = getById(id); if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("活动未开始"); } if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("活动已结束"); } List<String> keys = new ArrayList<>(); keys.add(RedisConstants.SECKILL_STOCK_KEY + id); keys.add(RedisConstants.SECKILL_VOUCHER_KEY + id); Long userid = UserHolder.getUser().getId(); long seckillVoucherOrderId = uniqueIdGenerator.generatorId(RedisConstants.SECKILL_VOUCHER_ID_PREFIX); Long flag = (Long)stringRedisTemplate.execute(redisScript, keys, userid + "",id + "",seckillVoucherOrderId + "");
if(flag == 1){ return Result.fail("库存不足"); } if(flag == 2){ return Result.fail("不能重复购买"); } VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(seckillVoucherOrderId); voucherOrder.setUserId(userid); voucherOrder.setVoucherId(id); currentProxy = (ISeckillVoucherService) AopContext.currentProxy();
return Result.ok(voucherOrder); }
public void handleVoucherOrder(){ while(true){ try { List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(1), StreamOffset.create(messageStream, ReadOffset.lastConsumed())); if(read.isEmpty() || read == null){ continue; }
MapRecord<String, Object, Object> message = read.get(0); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(message.getValue(), new VoucherOrder(),true); currentProxy.createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(messageStream,"g1",message.getId()); } catch (Exception e) { handlePaddinList(); log.error("处理订单异常"); } } }
public void handlePaddinList(){ while(true){ try { List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(messageStream, ReadOffset.from("0"))); if(read.isEmpty() || read == null){ break; }
MapRecord<String, Object, Object> message = read.get(0); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(message.getValue(), new VoucherOrder(),true); currentProxy.createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(messageStream,"g1",message.getId()); } catch (Exception e) { log.error("处理padding-list异常"); } } }
@Transactional public void createVoucherOrder(VoucherOrder voucherOrder){ boolean success = update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getId()).update(); voucherOrderMapper.insert(voucherOrder); } }
|