# Copyright 2016-2018 Julien Danjou # Copyright 2017 Elisey Zanko # Copyright 2016 Étienne Bersac # Copyright 2016 Joshua Harlow # Copyright 2013-2014 Ray Holder # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import functools import sys import threading import time import typing as t import warnings from abc import ABC, abstractmethod from concurrent import futures from inspect import iscoroutinefunction # Import all built-in retry strategies for easier usage. from .retry import retry_base # noqa from .retry import retry_all # noqa from .retry import retry_always # noqa from .retry import retry_any # noqa from .retry import retry_if_exception # noqa from .retry import retry_if_exception_type # noqa from .retry import retry_if_not_exception_type # noqa from .retry import retry_if_not_result # noqa from .retry import retry_if_result # noqa from .retry import retry_never # noqa from .retry import retry_unless_exception_type # noqa from .retry import retry_if_exception_message # noqa from .retry import retry_if_not_exception_message # noqa # Import all nap strategies for easier usage. from .nap import sleep # noqa from .nap import sleep_using_event # noqa # Import all built-in stop strategies for easier usage. from .stop import stop_after_attempt # noqa from .stop import stop_after_delay # noqa from .stop import stop_all # noqa from .stop import stop_any # noqa from .stop import stop_never # noqa from .stop import stop_when_event_set # noqa # Import all built-in wait strategies for easier usage. from .wait import wait_chain # noqa from .wait import wait_combine # noqa from .wait import wait_exponential # noqa from .wait import wait_fixed # noqa from .wait import wait_incrementing # noqa from .wait import wait_none # noqa from .wait import wait_random # noqa from .wait import wait_random_exponential # noqa from .wait import wait_random_exponential as wait_full_jitter # noqa # Import all built-in before strategies for easier usage. from .before import before_log # noqa from .before import before_nothing # noqa # Import all built-in after strategies for easier usage. from .after import after_log # noqa from .after import after_nothing # noqa # Import all built-in after strategies for easier usage. from .before_sleep import before_sleep_log # noqa from .before_sleep import before_sleep_nothing # noqa try: import tornado # type: ignore except ImportError: tornado = None # type: ignore if t.TYPE_CHECKING: import types from .wait import wait_base from .stop import stop_base WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable) _RetValT = t.TypeVar("_RetValT") @t.overload def retry(fn: WrappedFn) -> WrappedFn: pass @t.overload def retry(*dargs: t.Any, **dkw: t.Any) -> t.Callable[[WrappedFn], WrappedFn]: # noqa pass def retry(*dargs: t.Any, **dkw: t.Any) -> t.Union[WrappedFn, t.Callable[[WrappedFn], WrappedFn]]: # noqa """Wrap a function with a new `Retrying` object. :param dargs: positional arguments passed to Retrying object :param dkw: keyword arguments passed to the Retrying object """ # support both @retry and @retry() as valid syntax if len(dargs) == 1 and callable(dargs[0]): return retry()(dargs[0]) else: def wrap(f: WrappedFn) -> WrappedFn: if isinstance(f, retry_base): warnings.warn( f"Got retry_base instance ({f.__class__.__name__}) as callable argument, " f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)" ) if iscoroutinefunction(f): r: "BaseRetrying" = AsyncRetrying(*dargs, **dkw) elif tornado and hasattr(tornado.gen, "is_coroutine_function") and tornado.gen.is_coroutine_function(f): r = TornadoRetrying(*dargs, **dkw) else: r = Retrying(*dargs, **dkw) return r.wraps(f) return wrap class TryAgain(Exception): """Always retry the executed function when raised.""" NO_RESULT = object() class DoAttempt: pass class DoSleep(float): pass class BaseAction: """Base class for representing actions to take by retry object. Concrete implementations must define: - __init__: to initialize all necessary fields - REPR_FIELDS: class variable specifying attributes to include in repr(self) - NAME: for identification in retry object methods and callbacks """ REPR_FIELDS: t.Sequence[str] = () NAME: t.Optional[str] = None def __repr__(self) -> str: state_str = ", ".join(f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS) return f"{self.__class__.__name__}({state_str})" def __str__(self) -> str: return repr(self) class RetryAction(BaseAction): REPR_FIELDS = ("sleep",) NAME = "retry" def __init__(self, sleep: t.SupportsFloat) -> None: self.sleep = float(sleep) _unset = object() def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any: return second if first is _unset else first class RetryError(Exception): """Encapsulates the last attempt instance right before giving up.""" def __init__(self, last_attempt: "Future") -> None: self.last_attempt = last_attempt super().__init__(last_attempt) def reraise(self) -> t.NoReturn: if self.last_attempt.failed: raise self.last_attempt.result() raise self def __str__(self) -> str: return f"{self.__class__.__name__}[{self.last_attempt}]" class AttemptManager: """Manage attempt context.""" def __init__(self, retry_state: "RetryCallState"): self.retry_state = retry_state def __enter__(self) -> None: pass def __exit__( self, exc_type: t.Optional[t.Type[BaseException]], exc_value: t.Optional[BaseException], traceback: t.Optional["types.TracebackType"], ) -> t.Optional[bool]: if isinstance(exc_value, BaseException): self.retry_state.set_exception((exc_type, exc_value, traceback)) return True # Swallow exception. else: # We don't have the result, actually. self.retry_state.set_result(None) return None class BaseRetrying(ABC): def __init__( self, sleep: t.Callable[[t.Union[int, float]], None] = sleep, stop: "stop_base" = stop_never, wait: "wait_base" = wait_none(), retry: retry_base = retry_if_exception_type(), before: t.Callable[["RetryCallState"], None] = before_nothing, after: t.Callable[["RetryCallState"], None] = after_nothing, before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None, reraise: bool = False, retry_error_cls: t.Type[RetryError] = RetryError, retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None, ): self.sleep = sleep self.stop = stop self.wait = wait self.retry = retry self.before = before self.after = after self.before_sleep = before_sleep self.reraise = reraise self._local = threading.local() self.retry_error_cls = retry_error_cls self.retry_error_callback = retry_error_callback def copy( self, sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset, stop: t.Union["stop_base", object] = _unset, wait: t.Union["wait_base", object] = _unset, retry: t.Union[retry_base, object] = _unset, before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset, after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset, before_sleep: t.Union[t.Optional[t.Callable[["RetryCallState"], None]], object] = _unset, reraise: t.Union[bool, object] = _unset, retry_error_cls: t.Union[t.Type[RetryError], object] = _unset, retry_error_callback: t.Union[t.Optional[t.Callable[["RetryCallState"], t.Any]], object] = _unset, ) -> "BaseRetrying": """Copy this object with some parameters changed if needed.""" return self.__class__( sleep=_first_set(sleep, self.sleep), stop=_first_set(stop, self.stop), wait=_first_set(wait, self.wait), retry=_first_set(retry, self.retry), before=_first_set(before, self.before), after=_first_set(after, self.after), before_sleep=_first_set(before_sleep, self.before_sleep), reraise=_first_set(reraise, self.reraise), retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls), retry_error_callback=_first_set(retry_error_callback, self.retry_error_callback), ) def __repr__(self) -> str: return ( f"<{self.__class__.__name__} object at 0x{id(self):x} (" f"stop={self.stop}, " f"wait={self.wait}, " f"sleep={self.sleep}, " f"retry={self.retry}, " f"before={self.before}, " f"after={self.after})>" ) @property def statistics(self) -> t.Dict[str, t.Any]: """Return a dictionary of runtime statistics. This dictionary will be empty when the controller has never been ran. When it is running or has ran previously it should have (but may not) have useful and/or informational keys and values when running is underway and/or completed. .. warning:: The keys in this dictionary **should** be some what stable (not changing), but there existence **may** change between major releases as new statistics are gathered or removed so before accessing keys ensure that they actually exist and handle when they do not. .. note:: The values in this dictionary are local to the thread running call (so if multiple threads share the same retrying object - either directly or indirectly) they will each have there own view of statistics they have collected (in the future we may provide a way to aggregate the various statistics from each thread). """ try: return self._local.statistics except AttributeError: self._local.statistics = {} return self._local.statistics def wraps(self, f: WrappedFn) -> WrappedFn: """Wrap a function for retrying. :param f: A function to wraps for retrying. """ @functools.wraps(f) def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any: return self(f, *args, **kw) def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn: return self.copy(*args, **kwargs).wraps(f) wrapped_f.retry = self wrapped_f.retry_with = retry_with return wrapped_f def begin(self) -> None: self.statistics.clear() self.statistics["start_time"] = time.monotonic() self.statistics["attempt_number"] = 1 self.statistics["idle_for"] = 0 def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]: # noqa fut = retry_state.outcome if fut is None: if self.before is not None: self.before(retry_state) return DoAttempt() is_explicit_retry = retry_state.outcome.failed and isinstance(retry_state.outcome.exception(), TryAgain) if not (is_explicit_retry or self.retry(retry_state=retry_state)): return fut.result() if self.after is not None: self.after(retry_state) self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start if self.stop(retry_state=retry_state): if self.retry_error_callback: return self.retry_error_callback(retry_state) retry_exc = self.retry_error_cls(fut) if self.reraise: raise retry_exc.reraise() raise retry_exc from fut.exception() if self.wait: sleep = self.wait(retry_state=retry_state) else: sleep = 0.0 retry_state.next_action = RetryAction(sleep) retry_state.idle_for += sleep self.statistics["idle_for"] += sleep self.statistics["attempt_number"] += 1 if self.before_sleep is not None: self.before_sleep(retry_state) return DoSleep(sleep) def __iter__(self) -> t.Generator[AttemptManager, None, None]: self.begin() retry_state = RetryCallState(self, fn=None, args=(), kwargs={}) while True: do = self.iter(retry_state=retry_state) if isinstance(do, DoAttempt): yield AttemptManager(retry_state=retry_state) elif isinstance(do, DoSleep): retry_state.prepare_for_next_attempt() self.sleep(do) else: break @abstractmethod def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT: pass class Retrying(BaseRetrying): """Retrying controller.""" def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT: self.begin() retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs) while True: do = self.iter(retry_state=retry_state) if isinstance(do, DoAttempt): try: result = fn(*args, **kwargs) except BaseException: # noqa: B902 retry_state.set_exception(sys.exc_info()) else: retry_state.set_result(result) elif isinstance(do, DoSleep): retry_state.prepare_for_next_attempt() self.sleep(do) else: return do class Future(futures.Future): """Encapsulates a (future or past) attempted call to a target function.""" def __init__(self, attempt_number: int) -> None: super().__init__() self.attempt_number = attempt_number @property def failed(self) -> bool: """Return whether a exception is being held in this future.""" return self.exception() is not None @classmethod def construct(cls, attempt_number: int, value: t.Any, has_exception: bool) -> "Future": """Construct a new Future object.""" fut = cls(attempt_number) if has_exception: fut.set_exception(value) else: fut.set_result(value) return fut class RetryCallState: """State related to a single call wrapped with Retrying.""" def __init__( self, retry_object: BaseRetrying, fn: t.Optional[WrappedFn], args: t.Any, kwargs: t.Any, ) -> None: #: Retry call start timestamp self.start_time = time.monotonic() #: Retry manager object self.retry_object = retry_object #: Function wrapped by this retry call self.fn = fn #: Arguments of the function wrapped by this retry call self.args = args #: Keyword arguments of the function wrapped by this retry call self.kwargs = kwargs #: The number of the current attempt self.attempt_number: int = 1 #: Last outcome (result or exception) produced by the function self.outcome: t.Optional[Future] = None #: Timestamp of the last outcome self.outcome_timestamp: t.Optional[float] = None #: Time spent sleeping in retries self.idle_for: float = 0.0 #: Next action as decided by the retry manager self.next_action: t.Optional[RetryAction] = None @property def seconds_since_start(self) -> t.Optional[float]: if self.outcome_timestamp is None: return None return self.outcome_timestamp - self.start_time def prepare_for_next_attempt(self) -> None: self.outcome = None self.outcome_timestamp = None self.attempt_number += 1 self.next_action = None def set_result(self, val: t.Any) -> None: ts = time.monotonic() fut = Future(self.attempt_number) fut.set_result(val) self.outcome, self.outcome_timestamp = fut, ts def set_exception(self, exc_info: t.Tuple[t.Type[BaseException], BaseException, "types.TracebackType"]) -> None: ts = time.monotonic() fut = Future(self.attempt_number) fut.set_exception(exc_info[1]) self.outcome, self.outcome_timestamp = fut, ts def __repr__(self): if self.outcome is None: result = "none yet" elif self.outcome.failed: exception = self.outcome.exception() result = f"failed ({exception.__class__.__name__} {exception})" else: result = f"returned {self.outcome.result()}" slept = float(round(self.idle_for, 2)) clsname = self.__class__.__name__ return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>" from tenacity._asyncio import AsyncRetrying # noqa:E402,I100 if tornado: from tenacity.tornadoweb import TornadoRetrying