Bug#1109865: marked as done (unblock: patroni/4.0.6-1) (2/4)
From
Debian Bug Tracking System@21:1/5 to
All on Sat Jul 26 15:20:01 2025
[continued from previous message]
+ @staticmethod
+ def _isotime() -> str:
+ return datetime.datetime.now(tzutc).isoformat()
+
def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
kind = self._kinds.get(self.leader_path)
@@ -1241,7 +1244,7 @@
if kind and kind_annotations.get(self._LEADER) != self._name:
return False
- now = datetime.datetime.now(tzutc).isoformat()
+ now = self._isotime()
leader_observed_record = kind_annotations or self._leader_observed_record
annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now,
'acquireTime': leader_observed_record.get('acquireTime') or now,
@@ -1259,7 +1262,7 @@
return self._update_leader_with_retry(annotations, resource_version, self.__ips)
def attempt_to_acquire_leader(self) -> bool:
- now = datetime.datetime.now(tzutc).isoformat()
+ now = self._isotime()
annotations = {self._LEADER: self._name, 'ttl': str(self._ttl),
'renewTime': now, 'acquireTime': now, 'transitions': '0'}
if self._leader_observed_record:
@@ -1274,18 +1277,55 @@
annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now
annotations['transitions'] = str(transitions)
+ resource_version = self._leader_resource_version
+ if resource_version:
+ kind = self._kinds.get(self.leader_path)
+ # If leader object in cache was updated we should better use fresh resource_version
+ if kind and kind.metadata.resource_version != resource_version:
+ kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
+ # But, only in case if leader annotations didn't change
+ if all(kind_annotations.get(k) == self._leader_observed_record.get(k) for k in annotations.keys()):
+ resource_version = kind.metadata.resource_version
+
+ retry = self._retry.copy()
+
+ def _retry(*args: Any, **kwargs: Any) -> Any:
+ kwargs['_retry'] = retry
+ return retry(*args, **kwargs)
+
+ handle_conflict = False
try:
ret = bool(self._patch_or_create(self.leader_path, annotations,
- self._leader_resource_version, retry=self.retry, ips=self.__ips))
+ resource_version, retry=_retry, ips=self.__ips))
except k8s_client.rest.ApiException as e:
- if e.status == 409 and self._leader_resource_version: # Conflict in resource_version
+ if e.status == 409 and resource_version: # Conflict in resource_version
# Terminate watchers, it could be a sign that K8s API is in a failed state
self._kinds.kill_stream()
self._pods.kill_stream()
+ handle_conflict = True
ret = False
except (RetryFailedError, K8sException) as e:
raise KubernetesError(e)
+ if handle_conflict and retry.ensure_deadline(1):
+ # if we are here, that means update failed with 409
+ # Try to get the latest version directly from K8s API instead of relying on async cache
+ try:
+ kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
+ except (RetryFailedError, K8sException) as e:
+ raise KubernetesError(e)
+ except Exception as e:
+ logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
+ else:
+ kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
+ kind_resource_version = kind and kind.metadata.resource_version
+
+ # We can get 409 because we do at least one retry, and the first update might have succeeded,
+ # therefore we will check if annotations on the read object match expectations.
+ if kind and kind_resource_version != resource_version and\
+ all(kind_annotations.get(k) == v for k, v in annotations.items()):
+ ret = True
+
if not ret:
logger.info('Could not take out TTL lock')
return ret
diff -Nru patroni-4.0.5/patroni/ha.py patroni-4.0.6/patroni/ha.py
--- patroni-4.0.5/patroni/ha.py 2025-02-20 16:40:20.000000000 +0100
+++ patroni-4.0.6/patroni/ha.py 2025-06-06 19:27:48.000000000 +0200
@@ -263,9 +263,6 @@
# used only in backoff after failing a pre_promote script
self._released_leader_key_timestamp = 0
- # Initialize global config
- global_config.update(None, self.patroni.config.dynamic_configuration)
-
def primary_stop_timeout(self) -> Union[int, None]:
""":returns: "primary_stop_timeout" from the global configuration or `None` when not in synchronous mode."""
ret = global_config.primary_stop_timeout
@@ -853,7 +850,7 @@
voters=sync.voters,
numsync=sync_state.numsync,
sync=sync_state.sync,
- numsync_confirmed=s