使用Redis的Stream结构作为消息队列,实现的异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\
修改了lua表达式
新增以下内容
1
| redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
|
在扣减库存成功后,向消息队列中发送订单消息,更改了之前,在查询订单后封装订单信息,放入阻塞队列中的操作
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
@Override public Result seckillById(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long flag = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) );
int r = flag.intValue();
if(r!=0){ return Result.fail(r==1?"库存不足":"不能重复购买"); }
this.proxy= (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
|
在这里将订单id作为参数,传到lua脚本中,在lua脚本中将完整的订单信息,传到stream.orders消息队列中。消费者消费时只需要从消息队列中取出。
信息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle()); }
private IVoucherOrderService proxy;
private class VoucherOrderHandle implements Runnable{
String queueName="stream.orders";
@Override public void run() {
while(true){
try{ List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if(list==null||list.isEmpty()){ continue; } MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){ log.error("处理订单异常",e); handlePendingList(); } }
}
private void handlePendingList() {
while(true){
try{ List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.from("0")) ); if(list==null||list.isEmpty()){ break; } MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){ log.error("处理Pending订单异常",e); try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } } } }
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();
if(!isLock){ log.error("不能重复下单!"); return; } try{ proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
}
|
还是创建一个线程池,类初始化之后执行,不断监听消息队列,从中取出信息,进行消费。
详细解释都在注释中,
创建订单的逻辑没有变
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
@Transactional public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
int count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){ log.error("用户已经购买过一次"); return ; }
boolean flag = seckillVoucherService.update() .eq("voucher_id", voucherId) .gt("stock", 0) .setSql("stock = stock - 1") .update();
if(!flag){ throw new RuntimeException("秒杀券库存不足"); }
save(voucherOrder);
}
|
特别注意的是:在启动服务前,一定要先在redis客户端中创建好消息队列,否则会出现循环报错:监听不到stream.orders的消息队列
1000个线程jemeter压测结果;
由于本地redis版本过低,所以更改为远程服务器上的Redis,本次服务用的是远程服务器上的redis
