# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helpers for retrying coroutine functions with exponential back-off. The :class:`AsyncRetry` decorator shares most functionality and behavior with :class:`Retry`, but supports coroutine functions. Please refer to description of :class:`Retry` for more details. By default, this decorator will retry transient API errors (see :func:`if_transient_error`). For example: .. code-block:: python @retry_async.AsyncRetry() async def call_flaky_rpc(): return await client.flaky_rpc() # Will retry flaky_rpc() if it raises transient API errors. result = await call_flaky_rpc() You can pass a custom predicate to retry on different exceptions, such as waiting for an eventually consistent item to be available: .. code-block:: python @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) async def check_if_exists(): return await client.does_thing_exist() is_available = await check_if_exists() Some client library methods apply retry automatically. These methods can accept a ``retry`` parameter that allows you to configure the behavior: .. code-block:: python my_retry = retry_async.AsyncRetry(timeout=60) result = await client.some_method(retry=my_retry) """ from __future__ import annotations import asyncio import time import functools from typing import ( Awaitable, Any, Callable, Iterable, TypeVar, TYPE_CHECKING, ) from google.api_core.retry.retry_base import _BaseRetry from google.api_core.retry.retry_base import _retry_error_helper from google.api_core.retry.retry_base import exponential_sleep_generator from google.api_core.retry.retry_base import build_retry_error from google.api_core.retry.retry_base import RetryFailureReason # for backwards compatibility, expose helpers in this module from google.api_core.retry.retry_base import if_exception_type # noqa from google.api_core.retry.retry_base import if_transient_error # noqa if TYPE_CHECKING: import sys if sys.version_info >= (3, 10): from typing import ParamSpec else: from typing_extensions import ParamSpec _P = ParamSpec("_P") # target function call parameters _R = TypeVar("_R") # target function returned value _DEFAULT_INITIAL_DELAY = 1.0 # seconds _DEFAULT_MAXIMUM_DELAY = 60.0 # seconds _DEFAULT_DELAY_MULTIPLIER = 2.0 _DEFAULT_DEADLINE = 60.0 * 2.0 # seconds _DEFAULT_TIMEOUT = 60.0 * 2.0 # seconds async def retry_target( target: Callable[_P, Awaitable[_R]], predicate: Callable[[Exception], bool], sleep_generator: Iterable[float], timeout: float | None = None, on_error: Callable[[Exception], None] | None = None, exception_factory: Callable[ [list[Exception], RetryFailureReason, float | None], tuple[Exception, Exception | None], ] = build_retry_error, **kwargs, ): """Await a coroutine and retry if it fails. This is the lowest-level retry helper. Generally, you'll use the higher-level retry helper :class:`Retry`. Args: target(Callable[[], Any]): The function to call and retry. This must be a nullary function - apply arguments with `functools.partial`. predicate (Callable[Exception]): A callable used to determine if an exception raised by the target should be considered retryable. It should return True to retry or False otherwise. sleep_generator (Iterable[float]): An infinite iterator that determines how long to sleep between retries. timeout (Optional[float]): How long to keep retrying the target, in seconds. Note: timeout is only checked before initiating a retry, so the target may run past the timeout value as long as it is healthy. on_error (Optional[Callable[Exception]]): If given, the on_error callback will be called with each retryable exception raised by the target. Any error raised by this function will *not* be caught. exception_factory: A function that is called when the retryable reaches a terminal failure state, used to construct an exception to be raised. It takes a list of all exceptions encountered, a retry.RetryFailureReason enum indicating the failure cause, and the original timeout value as arguments. It should return a tuple of the exception to be raised, along with the cause exception if any. The default implementation will raise a RetryError on timeout, or the last exception encountered otherwise. deadline (float): DEPRECATED use ``timeout`` instead. For backward compatibility, if set it will override the ``timeout`` parameter. Returns: Any: the return value of the target function. Raises: ValueError: If the sleep generator stops yielding values. Exception: a custom exception specified by the exception_factory if provided. If no exception_factory is provided: google.api_core.RetryError: If the timeout is exceeded while retrying. Exception: If the target raises an error that isn't retryable. """ timeout = kwargs.get("deadline", timeout) deadline = time.monotonic() + timeout if timeout is not None else None error_list: list[Exception] = [] for sleep in sleep_generator: try: return await target() # pylint: disable=broad-except # This function explicitly must deal with broad exceptions. except Exception as exc: # defer to shared logic for handling errors _retry_error_helper( exc, deadline, sleep, error_list, predicate, on_error, exception_factory, timeout, ) # if exception not raised, sleep before next attempt await asyncio.sleep(sleep) raise ValueError("Sleep generator stopped yielding sleep values.") class AsyncRetry(_BaseRetry): """Exponential retry decorator for async coroutines. This class is a decorator used to add exponential back-off retry behavior to an RPC call. Although the default behavior is to retry transient API errors, a different predicate can be provided to retry other exceptions. Args: predicate (Callable[Exception]): A callable that should return ``True`` if the given exception is retryable. initial (float): The minimum amount of time to delay in seconds. This must be greater than 0. maximum (float): The maximum amount of time to delay in seconds. multiplier (float): The multiplier applied to the delay. timeout (Optional[float]): How long to keep retrying in seconds. Note: timeout is only checked before initiating a retry, so the target may run past the timeout value as long as it is healthy. on_error (Optional[Callable[Exception]]): A function to call while processing a retryable exception. Any error raised by this function will *not* be caught. deadline (float): DEPRECATED use ``timeout`` instead. If set it will override ``timeout`` parameter. """ def __call__( self, func: Callable[..., Awaitable[_R]], on_error: Callable[[Exception], Any] | None = None, ) -> Callable[_P, Awaitable[_R]]: """Wrap a callable with retry behavior. Args: func (Callable): The callable or stream to add retry behavior to. on_error (Optional[Callable[Exception]]): If given, the on_error callback will be called with each retryable exception raised by the wrapped function. Any error raised by this function will *not* be caught. If on_error was specified in the constructor, this value will be ignored. Returns: Callable: A callable that will invoke ``func`` with retry behavior. """ if self._on_error is not None: on_error = self._on_error @functools.wraps(func) async def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R: """A wrapper that calls target function with retry.""" sleep_generator = exponential_sleep_generator( self._initial, self._maximum, multiplier=self._multiplier ) return await retry_target( functools.partial(func, *args, **kwargs), predicate=self._predicate, sleep_generator=sleep_generator, timeout=self._timeout, on_error=on_error, ) return retry_wrapped_func