# Copyright 2017 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helpers for wrapping low-level gRPC methods with common functionality. This is used by gapic clients to provide common error mapping, retry, timeout, pagination, and long-running operations to gRPC methods. """ from google.api_core import general_helpers from google.api_core import grpc_helpers from google.api_core import timeout from google.api_core.gapic_v1 import client_info USE_DEFAULT_METADATA = object() DEFAULT = object() """Sentinel value indicating that a retry or timeout argument was unspecified, so the default should be used.""" def _is_not_none_or_false(value): return value is not None and value is not False def _apply_decorators(func, decorators): """Apply a list of decorators to a given function. ``decorators`` may contain items that are ``None`` or ``False`` which will be ignored. """ decorators = filter(_is_not_none_or_false, reversed(decorators)) for decorator in decorators: func = decorator(func) return func def _determine_timeout(default_timeout, specified_timeout, retry): """Determines how timeout should be applied to a wrapped method. Args: default_timeout (Optional[Timeout]): The default timeout specified at method creation time. specified_timeout (Optional[Timeout]): The timeout specified at invocation time. If :attr:`DEFAULT`, this will be set to the ``default_timeout``. retry (Optional[Retry]): The retry specified at invocation time. Returns: Optional[Timeout]: The timeout to apply to the method or ``None``. """ # If timeout is specified as a number instead of a Timeout instance, # convert it to a ConstantTimeout. if isinstance(specified_timeout, (int, float)): specified_timeout = timeout.ConstantTimeout(specified_timeout) if isinstance(default_timeout, (int, float)): default_timeout = timeout.ConstantTimeout(default_timeout) if specified_timeout is DEFAULT: specified_timeout = default_timeout if specified_timeout is default_timeout: # If timeout is the default and the default timeout is exponential and # a non-default retry is specified, make sure the timeout's deadline # matches the retry's. This handles the case where the user leaves # the timeout default but specifies a lower deadline via the retry. if ( retry and retry is not DEFAULT and isinstance(default_timeout, timeout.ExponentialTimeout) ): return default_timeout.with_deadline(retry._deadline) else: return default_timeout return specified_timeout class _GapicCallable(object): """Callable that applies retry, timeout, and metadata logic. Args: target (Callable): The low-level RPC method. retry (google.api_core.retry.Retry): The default retry for the callable. If ``None``, this callable will not retry by default timeout (google.api_core.timeout.Timeout): The default timeout for the callable. If ``None``, this callable will not specify a timeout argument to the low-level RPC method by default. metadata (Sequence[Tuple[str, str]]): Additional metadata that is provided to the RPC method on every invocation. This is merged with any metadata specified during invocation. If ``None``, no additional metadata will be passed to the RPC method. """ def __init__(self, target, retry, timeout, metadata=None): self._target = target self._retry = retry self._timeout = timeout self._metadata = metadata def __call__(self, *args, **kwargs): """Invoke the low-level RPC with retry, timeout, and metadata.""" # Note: Due to Python 2 lacking keyword-only arguments we use kwargs to # extract the retry and timeout params. timeout_ = _determine_timeout( self._timeout, kwargs.pop("timeout", self._timeout), # Use only the invocation-specified retry only for this, as we only # want to adjust the timeout deadline if the *user* specified # a different retry. kwargs.get("retry", None), ) retry = kwargs.pop("retry", self._retry) if retry is DEFAULT: retry = self._retry # Apply all applicable decorators. wrapped_func = _apply_decorators(self._target, [retry, timeout_]) # Add the user agent metadata to the call. if self._metadata is not None: metadata = kwargs.get("metadata", []) # Due to the nature of invocation, None should be treated the same # as not specified. if metadata is None: metadata = [] metadata = list(metadata) metadata.extend(self._metadata) kwargs["metadata"] = metadata return wrapped_func(*args, **kwargs) def wrap_method( func, default_retry=None, default_timeout=None, client_info=client_info.DEFAULT_CLIENT_INFO, ): """Wrap an RPC method with common behavior. This applies common error wrapping, retry, and timeout behavior a function. The wrapped function will take optional ``retry`` and ``timeout`` arguments. For example:: import google.api_core.gapic_v1.method from google.api_core import retry from google.api_core import timeout # The original RPC method. def get_topic(name, timeout=None): request = publisher_v2.GetTopicRequest(name=name) return publisher_stub.GetTopic(request, timeout=timeout) default_retry = retry.Retry(deadline=60) default_timeout = timeout.Timeout(deadline=60) wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method( get_topic, default_retry) # Execute get_topic with default retry and timeout: response = wrapped_get_topic() # Execute get_topic without doing any retying but with the default # timeout: response = wrapped_get_topic(retry=None) # Execute get_topic but only retry on 5xx errors: my_retry = retry.Retry(retry.if_exception_type( exceptions.InternalServerError)) response = wrapped_get_topic(retry=my_retry) The way this works is by late-wrapping the given function with the retry and timeout decorators. Essentially, when ``wrapped_get_topic()`` is called: * ``get_topic()`` is first wrapped with the ``timeout`` into ``get_topic_with_timeout``. * ``get_topic_with_timeout`` is wrapped with the ``retry`` into ``get_topic_with_timeout_and_retry()``. * The final ``get_topic_with_timeout_and_retry`` is called passing through the ``args`` and ``kwargs``. The callstack is therefore:: method.__call__() -> Retry.__call__() -> Timeout.__call__() -> wrap_errors() -> get_topic() Note that if ``timeout`` or ``retry`` is ``None``, then they are not applied to the function. For example, ``wrapped_get_topic(timeout=None, retry=None)`` is more or less equivalent to just calling ``get_topic`` but with error re-mapping. Args: func (Callable[Any]): The function to wrap. It should accept an optional ``timeout`` argument. If ``metadata`` is not ``None``, it should accept a ``metadata`` argument. default_retry (Optional[google.api_core.Retry]): The default retry strategy. If ``None``, the method will not retry by default. default_timeout (Optional[google.api_core.Timeout]): The default timeout strategy. Can also be specified as an int or float. If ``None``, the method will not have timeout specified by default. client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]): Client information used to create a user-agent string that's passed as gRPC metadata to the method. If unspecified, then a sane default will be used. If ``None``, then no user agent metadata will be provided to the RPC method. Returns: Callable: A new callable that takes optional ``retry`` and ``timeout`` arguments and applies the common error mapping, retry, timeout, and metadata behavior to the low-level RPC method. """ func = grpc_helpers.wrap_errors(func) if client_info is not None: user_agent_metadata = [client_info.to_grpc_metadata()] else: user_agent_metadata = None return general_helpers.wraps(func)( _GapicCallable( func, default_retry, default_timeout, metadata=user_agent_metadata ) )