基于阻塞队列实现的异步秒杀

大体思路

image-20250313203220089

img

lua脚本

img

初始化

1
2
3
4
5
6
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local voucherId=ARGV[1]

local userId=ARGV[2]

local stockKey='seckill:stock:' .. voucherId

local orderKey='seckill:order:' .. voucherId

if(tonumber(redis.call('get',stockKey))<=0)then
return 1
end
if(redis.call('sismember',orderKey,userId)==1) then
return 2
end
redis.call('incrby',stockKey,-1)

redis.call('sadd',orderKey,userId)

return 0

代码步骤

生产者的逻辑

请求打到voucherOrderService.seckillById这个服务上,并携带者优惠券的id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private IVoucherOrderService proxy;
/**
* 通过id抢购秒杀券
* @param voucherId
* @return
*/
@Override
public Result seckillById(Long voucherId) {

Long userId = UserHolder.getUser().getId();

Long flag = stringRedisTemplate.execute(
//执行lua脚本抢单逻辑
SECKILL_SCRIPT,
//key为空
Collections.emptyList(),
//优惠券id
voucherId.toString(),
//用户id
userId.toString()
);

//拆箱防止空指针
int r = flag.intValue();

if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复购买");
}

//生成订单号
long orderId = redisIdWorker.nextId("order");

//创建订单,并添加到阻塞队列中俄
VoucherOrder voucherOrder=new VoucherOrder();

voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);

//添加阻塞队列
orderTasks.add(voucherOrder);

//获取当前代理对象,防止Spring事务失效
//由于是子线程所以无法从ThreadLocal中取出代理对象

this.proxy= (IVoucherOrderService) AopContext.currentProxy();

return Result.ok(orderId);

}

这里是先调用前面的lua脚本,扣减库存,并根据返回数字判断是否扣减成功。成功后则,创建订单,放入阻塞队列中

消费者的逻辑

1,创建阻塞队列

1
private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<VoucherOrder>(1024*1024);

2,在服务一开始就要不断从阻塞队列中接受内容,所以使用@PostConstruct这个注解,执行线程任务

初始化

1
2
3
4
5
6
7
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());
}

线程任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private class VoucherOrderHandle implements Runnable{

@Override
public void run() {

while(true){

try{
//1,获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2,创建订单
handleVoucherOrder(voucherOrder);

}catch (Exception e){
log.error("处理订单异常",e);
}
}

}


//由此函数调用创建订单的函数
private void handleVoucherOrder(VoucherOrder voucherOrder) {

Long userId = voucherOrder.getUserId();

RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);

boolean isLock = lock.tryLock();

if(!isLock){
log.error("不能重复下单!");
return;
}
try{
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
}

创建订单方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 创建订单
*
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//一人一单

Long userId = voucherOrder.getUserId();

Long voucherId = voucherOrder.getVoucherId();

int count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();

//用户如果购买,则count会大于零,没有购买资格
if(count>0){
log.error("用户已经购买过一次");
return ;
}

//count 等于零说明有购买资格,秒杀券的库存减一
boolean flag = seckillVoucherService.update()
.eq("voucher_id", voucherId)
.gt("stock", 0)
.setSql("stock = stock - 1")
.update();

if(!flag){
throw new RuntimeException("秒杀券库存不足");
}

//6,创建订单
save(voucherOrder);

}

jemeter压测结果

image-20250313205420956