使用Redis的Stream结构作为消息队列,实现的异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\

修改了lua表达式

新增以下内容

1
2
-- 订单id
local orderId=ARGV[3]
1
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)

在扣减库存成功后,向消息队列中发送订单消息,更改了之前,在查询订单后封装订单信息,放入阻塞队列中的操作

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 通过id抢购秒杀券
* @param voucherId
* @return
*/
@Override
public Result seckillById(Long voucherId) {

Long userId = UserHolder.getUser().getId();

long orderId = redisIdWorker.nextId("order");

Long flag = stringRedisTemplate.execute(
//执行lua脚本抢单逻辑
SECKILL_SCRIPT,
//key为空
Collections.emptyList(),
//优惠券id
voucherId.toString(),
//用户id
userId.toString(),
//订单id
String.valueOf(orderId)
);

//拆箱防止空指针
int r = flag.intValue();

if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复购买");
}

//获取当前代理对象,防止Spring事务失效

this.proxy= (IVoucherOrderService) AopContext.currentProxy();

return Result.ok(orderId);

}

在这里将订单id作为参数,传到lua脚本中,在lua脚本中将完整的订单信息,传到stream.orders消息队列中。消费者消费时只需要从消息队列中取出。

信息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());
}

private IVoucherOrderService proxy;

private class VoucherOrderHandle implements Runnable{

String queueName="stream.orders";

@Override
public void run() {



while(true){

try{
//1,获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//消费者组名称与消费者名称
Consumer.from("g1", "c1"),
//读取一条信息阻塞2秒
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
//指出要监听的消息队列,要开始读消息的地方,>就是从下一个未消费的信息开始读
//0 就是从pending-list 也就是读已消费但未确认的消息
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2,判断订单消息是否为空,为空则跳过这次循环,再次从消息队列中取出消息
if(list==null||list.isEmpty()){
continue;
}
//3,不为空解析信息并创建订单
//我们每次只读取一条信息
MapRecord<String, Object, Object> record = list.get(0);

//获取值,值就是vouncherId,userId,id
Map<Object, Object> value = record.getValue();

//将map对象转为我们需要的VoucherOrder对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);

//4,创建订单
handleVoucherOrder(voucherOrder);

//5,ack确认 队列名称,消费者组名称,消息id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());

}catch (Exception e){
log.error("处理订单异常",e);
//调用处理异常订单的函数
handlePendingList();
}
}

}

//处理异常订单
private void handlePendingList() {

while(true){

try{
//1,获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//消费者组名称与消费者名称
Consumer.from("g1", "c1"),
//读取一条信息阻塞2秒
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
//指出要监听的消息队列,要开始读消息的地方,>就是从下一个未消费的信息开始读
//0 就是从pending-list 也就是读已消费但未确认的消息
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2,判断订单消息是否为空,为空直接结束循环,因为没有异常订单信息了
if(list==null||list.isEmpty()){
break;
}
//3,不为空解析信息并创建订单
//我们每次只读取一条信息
MapRecord<String, Object, Object> record = list.get(0);

//获取值,值就是vouncherId,userId,id
Map<Object, Object> value = record.getValue();

//将map对象转为我们需要的VoucherOrder对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);

//4,创建订单
handleVoucherOrder(voucherOrder);

//5,ack确认 队列名称,消费者组名称,消息id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());

}catch (Exception e){
log.error("处理Pending订单异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}

//由此函数调用创建订单的函数
private void handleVoucherOrder(VoucherOrder voucherOrder) {

Long userId = voucherOrder.getUserId();

RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);

boolean isLock = lock.tryLock();

if(!isLock){
log.error("不能重复下单!");
return;
}
try{
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}


}

还是创建一个线程池,类初始化之后执行,不断监听消息队列,从中取出信息,进行消费。

详细解释都在注释中,

创建订单的逻辑没有变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 创建订单
*
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//一人一单

Long userId = voucherOrder.getUserId();

Long voucherId = voucherOrder.getVoucherId();

int count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();

//用户如果购买,则count会大于零,没有购买资格
if(count>0){
log.error("用户已经购买过一次");
return ;
}

//count 等于零说明有购买资格,秒杀券的库存减一
boolean flag = seckillVoucherService.update()
.eq("voucher_id", voucherId)
.gt("stock", 0)
.setSql("stock = stock - 1")
.update();

// boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
// .eq(SeckillVoucher::getVoucherId, voucherId)
// .gt(SeckillVoucher::getStock, 0)
// .setSql("stock = stock - 1")
// );

if(!flag){
throw new RuntimeException("秒杀券库存不足");
}

//6,创建订单
save(voucherOrder);

}

特别注意的是:在启动服务前,一定要先在redis客户端中创建好消息队列,否则会出现循环报错:监听不到stream.orders的消息队列

1000个线程jemeter压测结果;

由于本地redis版本过低,所以更改为远程服务器上的Redis,本次服务用的是远程服务器上的redis

image-20250314152345296