基于阻塞队列实现的异步秒杀 大体思路
lua脚本
初始化
/**
 * Redis script used by the seckill entry point; loaded from the classpath
 * resource "seckill.lua" and declared to return a Long.
 */
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
    // Configure the script on a local first, then publish it to the constant.
    DefaultRedisScript<Long> script = new DefaultRedisScript<>();
    script.setLocation(new ClassPathResource("seckill.lua"));
    script.setResultType(Long.class);
    SECKILL_SCRIPT = script;
}
代码实现
-- Seckill pre-check script.
-- ARGV[1]: voucher id
-- ARGV[2]: user id
-- Returns 0 when the purchase is accepted (stock decremented, user recorded),
--         1 when there is no stock left,
--         2 when the user is already in the voucher's order set.
local voucherId = ARGV[1]
local userId = ARGV[2]

local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId

-- GET yields false for a missing key, and tonumber(false) is nil; comparing
-- nil with a number raises a script error. Treat a missing or non-numeric
-- stock value as "no stock" instead of failing the whole script.
local stock = tonumber(redis.call('get', stockKey))
if (stock == nil or stock <= 0) then
    return 1
end

-- One order per user: reject if the user id is already in the order set.
if (redis.call('sismember', orderKey, userId) == 1) then
    return 2
end

-- Reserve one unit of stock and remember the buyer.
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
return 0
代码步骤
生产者的逻辑 :
请求打到 voucherOrderService.seckillById 这个方法上,并携带着优惠券的 id
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private IVoucherOrderService proxy; @Override public Result seckillById (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long flag = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = flag.intValue(); if (r!=0 ){ return Result.fail(r==1 ?"库存不足" :"不能重复购买" ); } long orderId = redisIdWorker.nextId("order" ); VoucherOrder voucherOrder=new VoucherOrder (); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); orderTasks.add(voucherOrder); this .proxy= (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); }
这里先调用前面的 lua 脚本扣减库存,并根据返回的数字判断是否扣减成功。扣减成功后,创建订单并放入阻塞队列中。
消费者的逻辑 :
1,创建阻塞队列
1 private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue <VoucherOrder>(1024 *1024 );
2,在服务一启动就要不断从阻塞队列中接收内容,所以使用 @PostConstruct 这个注解来提交线程任务
初始化
1 2 3 4 5 6 7 private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle ()); }
线程任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private class VoucherOrderHandle implements Runnable { @Override public void run () { while (true ){ try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); }catch (Exception e){ log.error("处理订单异常" ,e); } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock){ log.error("不能重复下单!" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } }
创建订单方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); int count = this .query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count>0 ){ log.error("用户已经购买过一次" ); return ; } boolean flag = seckillVoucherService.update() .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .setSql("stock = stock - 1" ) .update(); if (!flag){ throw new RuntimeException ("秒杀券库存不足" ); } save(voucherOrder); }
JMeter 压测结果