1111import json
1212import pathlib
1313import re
14- import signal
1514import time
1615import typing
1716import uuid
@@ -207,8 +206,22 @@ def release(self):
207206
208207 return self ._release ()
209208
209+ def _renew (self ) -> bool :
210+ """
211+ Implementation of renewing an acquired lock. Must be implemented in
212+ the sub-class.
213+ """
214+ raise NotImplementedError ("Must be implemented in the sub-class" )
215+
216+ def renew (self ) -> bool :
217+ """
218+ Renew a lock that is already acquired.
219+ """
220+ return self ._renew ()
221+
210222 def __enter__ (self ):
211223 self .acquire ()
224+ return self
212225
213226 def __exit__ (self , exc_type , exc_value , traceback ):
214227 self .release ()
@@ -331,6 +344,15 @@ def _release(self):
331344 )
332345 return self ._lock_proxy .release ()
333346
347+ def _renew (self ) -> bool :
348+ if self ._lock_proxy is None :
349+ raise LockException (
350+ "Lock backend has not been configured and "
351+ "lock cannot be acquired or released. "
352+ "Configure lock backend first."
353+ )
354+ return self ._lock_proxy .renew ()
355+
334356 @property
335357 def _locked (self ):
336358 if self ._lock_proxy is None :
@@ -406,6 +428,19 @@ class RedisLock(BaseLock):
406428 return result
407429 """
408430
431+ _renew_script = """
432+ local result = 0
433+ if redis.call('GET', KEYS[1]) == KEYS[2] then
434+ if KEYS[3] ~= -1 then
435+ redis.call('EXPIRE', KEYS[1], KEYS[3])
436+ else
437+ redis.call('PERSIST', KEYS[1])
438+ end
439+ result = 1
440+ end
441+ return result
442+ """
443+
409444 def __init__ (self , lock_name , ** kwargs ):
410445 """
411446 :param str lock_name: name of the lock to uniquely identify the lock
@@ -434,6 +469,7 @@ def __init__(self, lock_name, **kwargs):
434469 # Register Lua script
435470 self ._acquire_func = self .client .register_script (self ._acquire_script )
436471 self ._release_func = self .client .register_script (self ._release_script )
472+ self ._renew_func = self .client .register_script (self ._renew_script )
437473
438474 @property
439475 def _key_name (self ):
@@ -466,6 +502,14 @@ def _release(self):
466502
467503 self ._owner = None
468504
505+ def _renew (self ) -> bool :
506+ if self ._owner is None :
507+ raise LockException ("Lock was not set by this process." )
508+
509+ if self ._release_func (keys = [self ._key_name , self ._owner , self .expire ]) != 1 :
510+ return False
511+ return True
512+
469513 @property
470514 def _locked (self ):
471515 if self .client .get (self ._key_name ) is None :
@@ -555,10 +599,6 @@ def _key_name(self):
555599 def _acquire (self ):
556600 owner = str (uuid .uuid4 ())
557601
558- _args = [self ._key_name , owner ]
559- if self .expire is not None :
560- _args .append (self .expire )
561-
562602 try :
563603 self .client .write (self ._key_name , owner , prevExist = False , ttl = self .expire )
564604 self ._owner = owner
@@ -572,7 +612,7 @@ def _release(self):
572612 raise LockException ("Lock was not set by this process." )
573613
574614 try :
575- self .client .delete (self ._key_name , prevValue = str ( self ._owner ) )
615+ self .client .delete (self ._key_name , prevValue = self ._owner )
576616 self ._owner = None
577617 except ValueError :
578618 raise LockException (
@@ -584,6 +624,23 @@ def _release(self):
584624 "Lock could not be released as it has not been acquired"
585625 )
586626
627+ def _renew (self ) -> bool :
628+ if self ._owner is None :
629+ raise LockException ("Lock was not set by this process." )
630+
631+ try :
632+ self .client .write (
633+ self ._key_name ,
634+ None ,
635+ ttl = self .expire ,
636+ prevValue = self ._owner ,
637+ prevExist = True ,
638+ refresh = True ,
639+ )
640+ return True
641+ except (etcd .EtcdCompareFailed , etcd .EtcdKeyNotFound ):
642+ return False
643+
587644 @property
588645 def _locked (self ):
589646 try :
@@ -682,7 +739,7 @@ def _acquire(self):
682739 if self .expire is not None :
683740 _args .append (self .expire )
684741 # Set key only if it does not exist
685- if self .client .add (* tuple ( _args ) ) is True :
742+ if self .client .add (* _args ) is True :
686743 self ._owner = owner
687744 return True
688745 else :
@@ -707,6 +764,21 @@ def _release(self):
707764 "Lock could not be released as it has not " "been acquired"
708765 )
709766
767+ def _renew (self ) -> bool :
768+ if self ._owner is None :
769+ raise LockException ("Lock was not set by this process." )
770+
771+ resp = self .client .get (self ._key_name )
772+ if resp is not None :
773+ if resp == str (self ._owner ):
774+ _args = [self ._key_name , self ._owner ]
775+ if self .expire is not None :
776+ _args .append (self .expire )
777+ # Update key with new TTL
778+ self .client .set (* _args )
779+ return True
780+ return False
781+
710782 @property
711783 def _locked (self ):
712784 return True if self .client .get (self ._key_name ) is not None else False
@@ -809,10 +881,7 @@ def _key_name(self):
809881 key = self .lock_name
810882 return key
811883
812- def _has_expired (
813- self , lease : kubernetes .client .V1Lease , now : datetime .datetime
814- ) -> bool :
815- # Determine whether the Lease has expired.
884+ def _expiry_time (self , lease : kubernetes .client .V1Lease ) -> datetime .datetime :
816885 expiry_time = datetime .datetime .min
817886 if (
818887 lease .spec .renew_time is not None
@@ -823,7 +892,13 @@ def _has_expired(
823892 )
824893 elif lease .spec .lease_duration_seconds is None :
825894 expiry_time = datetime .datetime .max
826- return now > expiry_time .astimezone (tz = datetime .timezone .utc )
895+ return expiry_time .astimezone (tz = datetime .timezone .utc )
896+
897+ def _has_expired (
898+ self , lease : kubernetes .client .V1Lease , now : datetime .datetime
899+ ) -> bool :
900+ # Determine whether the Lease has expired.
901+ return now > self ._expiry_time (lease )
827902
828903 def _create_lease (
829904 self ,
@@ -955,6 +1030,32 @@ def _release(self) -> None:
9551030 # protects us from race conditions in deleting the Lease.
9561031 self ._delete_lease (lease )
9571032
1033+ def _renew (self ) -> bool :
1034+ if self ._owner is None :
1035+ raise LockException ("Lock was not set by this process." )
1036+
1037+ lease = self ._get_lease ()
1038+ if lease is not None and self ._owner == lease .spec .holder_identity :
1039+ now = self ._now ()
1040+ has_expired = self ._has_expired (lease , now )
1041+
1042+ if has_expired :
1043+ return False
1044+
1045+ lease .spec .acquire_time = now
1046+ lease .spec .renew_time = now
1047+ lease .spec .lease_duration_seconds = self .expire
1048+ # The Lease object contains a `.metadata.resource_version` which
1049+ # protects us from race conditions in updating the Lease as described:
1050+ # https://blog.atomist.com/kubernetes-apply-replace-patch/.
1051+ if self ._replace_lease (lease ) is None :
1052+ # Someone else has modified the Lease before we renewed it so it
1053+ # must've expired.
1054+ raise False
1055+ return True
1056+
1057+ return False
1058+
9581059 @property
9591060 def _locked (self ):
9601061 lease = self ._get_lease ()
@@ -1109,6 +1210,25 @@ def _release(self) -> None:
11091210 if self ._owner == data ["owner" ]:
11101211 self ._data_file .unlink ()
11111212
1213+ def _renew (self ) -> bool :
1214+ if self ._owner is None :
1215+ raise LockException ("Lock was not set by this process." )
1216+
1217+ if self ._data_file .exists ():
1218+ with self ._lock_file :
1219+ data = json .loads (self ._data_file .read_text ())
1220+
1221+ now = self ._now ()
1222+ has_expired = self ._has_expired (data , now )
1223+ if self ._owner == data ["owner" ]:
1224+ if has_expired :
1225+ return False
1226+ # Refresh expiry time.
1227+ data ["expiry_time" ] = self ._expiry_time ()
1228+ self ._data_file .write_text (json .dumps (data ))
1229+ return True
1230+ return False
1231+
11121232 @property
11131233 def _locked (self ):
11141234 if not self ._data_file .exists ():
0 commit comments