Lua脚本在Redis事务中的应用实践
使用过Redis事务的应该清楚,Redis事务实现是通过打包多条命令,单独的隔离操作,事务中的所有命令都会按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。事务中的命令要么全部被执行,要么全部都不执行(原子操作)。但其中有命令因业务原因执行失败并不会阻断后续命令的执行,且也无法回滚已经执行过的命令。如果想要实现和MySQL一样的事务处理可以使用Lua脚本来实现,Lua脚本中可实现简单的逻辑判断,执行中止等操作。
1 初始Lua脚本
Lua是一个小巧的脚本语言,Redis 脚本使用 Lua 解释器来执行脚本。 Reids 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。编写Lua脚本就和编写shell脚本一样的简单。Lua语言详细教程参见
示例:
--[[
version:1.0
检测key是否存在,如果存在并设置过期时间
入参列表:
参数个数量:1
KEYS[1]:goodsKey 商品Key
返回列表code:
+0:不存在
+1:存在
--]]
local usableKey = KEYS[1]
--[ 判断usableKey在Redis中是否存在 存在将过期时间延长1分钟 并返回是否存在结果--]
local usableExists = redis.call('EXISTS', usableKey)
if (1 == usableExists) then
redis.call('PEXPIRE', usableKey, 60000)
end
return { usableExists }
- 示例代码中redis.call(), 是Redis内置方法,用与执行redis命令
- if () then end 是Lua语言基本分支语法
- KEYS 为Redis环境执行Lua脚本时Redis Key 参数,如果使用变量入参使用ARGV接收
- “—”代表单行注释 “—[[ 多行注释 —]]”
2 实践应用
2.1 需求分析
经典案例需求:库存量扣减并检测库存量是否充足。
基础需求分析:商品当前库存量>=扣减数量时,执行扣减。商品当前库存量<扣减数量时,返回库存不足
实现方案分析:
1)MySQL事务实现:
- 利用DB行级锁,锁定要扣减商品库存量数据,再判断库存量是否充足,充足执行扣减,否则返回库存不足。
- 执行库存扣减,再判断扣减后结果是否小于0,小于0说明库存不足,事务回滚,否则提交事务。
2)方案优缺点分析:
- 优点:MySQL天然支持事务,实现难度低。
- 缺点:不考虑热点商品场景,当业务量达到一定量级时会达到MySQL性能瓶颈,单库无法支持业务时扩展问题成为难点,分表、分库等方案对功能开发、业务运维、数据运维都须要有针对于分表、分库方案所配套的系统或方案。对于系统改造实现难度较高。
Redis Lua脚本事务实现:将库存扣减判断库存量小原子操作逻辑编写为Lua脚本。
- 从DB中初始化商品库存数量,利用Redis WATCH命令。
- 判断商品库存量是否充足,充足执行扣减,否则返回库存不足。
- 执行库存扣减,再判断扣减后结果是否小于0,小于0说明库存不足,反向操作增加减少库存量,返回操作结果
方案优缺点分析:
- 优点:Redis命令执行单线程特性,无须考虑并发锁竟争所带来的实现复杂度。Redis天然支持Lua脚本,Lua语言学习难度低,实现与MySQL方案难度相当。Redis同一时间单位支持的并发量比MySQL大,执行耗时更小。对于业务量的增长可以扩容Redis集群分片。
- 缺点:暂无
2.2 Redis Lua脚本事务方案实现
初始化商品库存量:
//利用Watch 命令乐观乐特性,减少锁竞争所损耗的性能
public boolean init(InitStockCallback initStockCallback, InitOperationData initOperationData) {
//SessionCallback 会话级Rdis事务回调接口 针对于operations所有操作将在同一个Redis tcp连接上完成
List<Object> result = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) {
Assert.notNull(operations, "operations must not be null");
//Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断
//当出前并发初始化同一个商品库存量时,只有一个能成功
operations.watch(initOperationData.getWatchKeys());
int initQuantity;
try {
//查询DB商品库存量
initQuantity = initStockCallback.getInitQuantity(initOperationData);
} catch (Exception e) {
//异常后释放watch
operations.unwatch();
throw e;
}
//开启Reids事务
operations.multi();
//setNx设置商品库存量
operations.opsForValue().setIfAbsent(initOperationData.getGoodsKey(), String.valueOf(initQuantity));
//设置商品库存量 key 过期时间
operations.expire(initOperationData.getGoodsKey(), Duration.ofMinutes(60000L));
///执行事事务
return operations.exec();
}
});
//判断事务执行结果
if (!CollectionUtils.isEmpty(result) && result.get() instanceof Boolean) {
return (Boolean) result.get();
}
return false;
}
库存扣减逻辑
--[[
version:1.0
减可用库存
入参列表:
参数个数量:
KEYS[1]:usableKey 商品可用量Key
KEYS[3]:usableSubtractKey 减量记录key
KEYS[4]:operateKey 操作防重Key
KEYS[5]:hSetRecord 记录操作单号信息
ARGV[1]:quantity操作数量
ARGV[2]:version 操作版本号
ARGV[5]:serialNumber 单据流水编码
ARGV[6]:record 是否记录过程量
返回列表:
+1:操作成功
0: 操作失败
-1: KEY不存在
-2:重复操作
-3: 库存不足
-4:过期操作
-5:缺量库存不足
-6:可用负库存
--]]
local usableKey = KEYS[1];
local usableSubtractKey = KEYS[3]
local operateKey = KEYS[4]
local hSetRecord = KEYS[5]
local quantity = tonumber(ARGV[1])
local version = ARGV[2]
local serialNumber = ARGV[5]
--[ 判断商品库存key是否存在 不存在返回-1 --]
local usableExists = redis.call('EXISTS', usableKey);
if ( == usableExists) then
return { -1, version, , };
end
--[ 设置防重key 设置失败说明操作重复返回-2 --]
local isNotRepeat = redis.call('SETNX', operateKey, version);
if ( == isNotRepeat) then
redis.call('SET', operateKey, version);
return { -2, version, quantity, };
end
--[ 商品库存量扣减后小0 说明库存不足 回滚扣减数量 并清除防重key立即过期 返回-3 --]
local usableResult = redis.call('DECRBY', usableKey, quantity);
if ( usableResult < ) then
redis.call('INCRBY', usableKey, quantity);
redis.call('PEXPIRE', operateKey, );
return { -3, version, , usableResult };
end
--[ 记录扣减量并设置防重key 30天后过期 返回 1--]
-- [ 需要记录过程量与过程单据信息 --]
local usableSubtractResult = redis.call('INCRBY', usableSubtractKey, quantity);
redis.call('HSET', hSetRecord, serialNumber, quantity)
redis.call('PEXPIRE', hSetRecord, 3600000)
redis.call('PEXPIRE', operateKey, 2592000000)
redis.call('PEXPIRE', usableKey, 3600000)
return { 1, version, quantity, , usableResult ,usableSubtractResult}
初始化Lua脚本到Redis服务器
//读取Lua脚本文件
private String readLua(File file) {
StringBuilder sbf = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String temp;
while (Objects.nonNull(temp = reader.readLine())) {
sbf.append(temp);
sbf.append('\n');
}
return sbf.toString();
} catch (FileNotFoundException e) {
LOGGER.error("[{}]文件不存在", file.getPath());
} catch (IOException e) {
LOGGER.error("[{}]文件读取异常", file.getPath());
}
return null;
}
//初始化Lua脚本到Redis服务器 成功后会返回脚本对应的sha1码,系统缓存脚本sha1码,
//通过sha1码可以在Redis服务器执行对应的脚本
public String scriptLoad(File file) {
String script = readLua(file)
return stringRedisTemplate.execute((RedisCallback<String>) connection -> connection.scriptLoad(script.getBytes()));
}
脚本执行
public OperationResult evalSha(String redisScriptSha1,OperationData operationData) {
List<String> keys = operationData.getKeys();
String[] args = operationData.getArgs();
//执行Lua脚本 keys 为Lua脚本中使用到的KEYS args为Lua脚本中使用到的ARGV参数
//如果是在Redis集群模式下,同一个脚本中的多个key,要满足多个key在同一个分片
//服务器开启hash tag功能,多个key 使用{}将相同部分包裹
//例:usableKey:{EMG123} operateKey:operate:{EMG123}
Object result = stringRedisTemplate.execute(redisScriptSha1, keys, args);
//解析执行结果
return parseResult(operationData, result);
}
3 总结
Redis在小数据操作并发可达到10W,针对与业务中对资源强校验且高并发场景下使用Redis配合Lua脚本完成简单逻辑处理抗并发量是个不错的选择。
注:Lua脚本逻辑尽量简单,Lua脚本实用于耗时短且原子操作。耗时长影响Redis服务器性能,非原子操作或逻辑复杂会增加于脚本调试与维度难度。理想状态是将业务用Lua脚本包装成一个如Redis命令一样的操作。
相关文章