python-redis-lock简介
python-redis-lock是一个python的第三方库,基于Redis,封装了分布式锁的逻辑,提供了更高级的API来简化锁的获取、保持和释放过程。包括自动续期、锁超时、重入锁等功能。
相比于直接使用redis的setnx,避免了写额外代码来实现锁的复杂逻辑。
锁的续期
在使用分布式锁的情况下,如果某个服务器A的业务还未执行完(可能因为网络拥塞、数据量突然变大等各种原因),但是锁过期了,可能引发额外的问题。例如另一台服务器B发现锁释放了,于是加锁并开始执行业务,从而导致出现问题。
更有可能导致服务器A业务在执行完后去释放锁时,意外地释放了服务器B加的锁,导致服务器C进入,从而引发问题(在未使用唯一性ID表示加锁者的情况下)。
所以对锁的过期时间来续期是很有必要的,它确保了锁在业务执行完后才释放。
python-redis-lock实现自动续期的源码分析
首先在配置python-redis-lock实例时,有两个参数:expire int 和 auto_renewal boolean。
expire:锁过期时间,单位为秒。
auto_renewal:是否开启自动续期锁。
python-redis-lock实例在初始化时,针对这两个参数的源码:
1
2
3
4
5
6
7
8
9
|
class Lock(object):
def __init__(self, redis_client, name, expire=None, auto_renewal=False, ...):
...
if expire:
expire = int(expire)
...
self._expire = expire
self._lock_renewal_interval = float(expire) * 2/3 if auto_renewal else None
...
|
假设我们设置了一个过期时间30秒,开启自动续期的Lock实例,则self._expire=30秒,self._lock_renewal_interval=20秒。
当在代码层调用Lock.acquire方法时,判断self._lock_renewal_interval是否为空,如果不为空,则开启锁的自动续期(这里省略了一些内容,因为只关注续期的代码实现逻辑):
1
2
3
4
5
|
class Lock(object):
def acquire(self, blocking=True, timeout=None):
...
if self._lock_renewal_interval is not None:
self._start_lock_renewer()
|
在self._start_lock_renewer方法中,基于python的threading模块,开启了一个锁自动更新线程self._lock_renewal_thread。这个线程执行self._lock_renewer方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
def _start_lock_renewer(self):
...
self._lock_renewal_stop = threading.Event() # threading.Event()用于线程间的协调
self._lock_renewal_thread = threading.Thread(
group=None,
target=self._lock_renewer,
kwargs={
'name': self._name,
'lockref': weakref.ref(self), # 对Lock实例的弱引用
'interval': self._lock_renewal_interval,
'stop': self._lock_renewal_stop,
},
)
self._lock_renewal_thread.demon = True
self._lock_renewal_thread.start()
|
weakref.ref(self)创建当前类实例(self)的弱引用,并将其存储在变量 lockref 中。使用 weakref.ref 创建的弱引用不会阻止对象被垃圾回收。
在self._lock_renewer方法中,首先用了while循环来每次等待20秒(interval=20)再执行循环体:如果弱引用对象(即Lock实例本身)没有被垃圾回来,则执行续期的方法(lock.extend):
1
2
3
4
5
6
7
|
def _lock_renewer(name, lockref, interval, stop):
while not stop.wait(timeout=interval):
...
lock: "Lock" = lockref()
if lock is None:
break
lock.extend(expire=lock._expire)
|
在self.extend方法中,执行了一个Lua脚本来更新锁的过期时间。
在满足self._name和self._signal的缓存键值存在且锁未过期的情况下,将当前锁的过期时间重置为expire(30秒)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def extend(self. expire=None):
if expire:
expire = int(expire)
error = self.extend_script(
client=self._client, # 对redis的连接
keys=(self._name, self._signal),
args=(self._id, expire))
...
# self.extend_script的执行逻辑:
EXTEND_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
return 1
elseif redis.call("ttl", KEYS[1]) < 0 then
return 2
else
redis.call("expire", KEYS[1], ARGV[2])
return 0
end
"""
cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
|
以上就是python-redis-lock实现锁自动续期的源码逻辑。
其中用到了多线程threading、弱引用weakref和Lua脚本等相关知识。
|