# Copyright 2015 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Client for interacting with the Google Cloud Storage API.""" import base64 import binascii import collections import datetime import functools import json import warnings import google.api_core.client_options from google.auth.credentials import AnonymousCredentials from google.api_core import page_iterator from google.cloud._helpers import _LocalStack from google.cloud.client import ClientWithProject from google.cloud.exceptions import NotFound from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _get_api_endpoint_override from google.cloud.storage._helpers import _get_environ_project from google.cloud.storage._helpers import _get_storage_emulator_override from google.cloud.storage._helpers import _use_client_cert from google.cloud.storage._helpers import _virtual_hosted_style_base_url from google.cloud.storage._helpers import _DEFAULT_UNIVERSE_DOMAIN from google.cloud.storage._helpers import _DEFAULT_SCHEME from google.cloud.storage._helpers import _STORAGE_HOST_TEMPLATE from google.cloud.storage._helpers import _NOW from google.cloud.storage._helpers import _UTC from google.cloud.storage._http import Connection from google.cloud.storage._signing import ( get_expiration_seconds_v4, get_v4_now_dtstamps, ensure_signed_credentials, _sign_message, ) from google.cloud.storage.batch import Batch from google.cloud.storage.bucket import Bucket, _item_to_blob, _blobs_page_start from google.cloud.storage.blob import Blob from google.cloud.storage.hmac_key import HMACKeyMetadata from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY _marker = object() class Client(ClientWithProject): """Client to bundle configuration needed for API requests. :type project: str or None :param project: the project which the client acts on behalf of. Will be passed when creating a topic. If not passed, falls back to the default inferred from the environment. :type credentials: :class:`~google.auth.credentials.Credentials` :param credentials: (Optional) The OAuth2 Credentials to use for this client. If not passed (and if no ``_http`` object is passed), falls back to the default inferred from the environment. :type _http: :class:`~requests.Session` :param _http: (Optional) HTTP object to make requests. Can be any object that defines ``request()`` with the same interface as :meth:`requests.Session.request`. If not passed, an ``_http`` object is created that is bound to the ``credentials`` for the current object. This parameter should be considered private, and could change in the future. :type client_info: :class:`~google.api_core.client_info.ClientInfo` :param client_info: The client info used to send a user-agent string along with API requests. If ``None``, then default info will be used. Generally, you only need to set this if you're developing your own library or partner tool. :type client_options: :class:`~google.api_core.client_options.ClientOptions` or :class:`dict` :param client_options: (Optional) Client options used to set user options on the client. A non-default universe domain or api endpoint should be set through client_options. :type use_auth_w_custom_endpoint: bool :param use_auth_w_custom_endpoint: (Optional) Whether authentication is required under custom endpoints. If false, uses AnonymousCredentials and bypasses authentication. Defaults to True. Note this is only used when a custom endpoint is set in conjunction. :type extra_headers: dict :param extra_headers: (Optional) Custom headers to be sent with the requests attached to the client. For example, you can add custom audit logging headers. """ SCOPE = ( "https://www.googleapis.com/auth/devstorage.full_control", "https://www.googleapis.com/auth/devstorage.read_only", "https://www.googleapis.com/auth/devstorage.read_write", ) """The scopes required for authenticating as a Cloud Storage consumer.""" def __init__( self, project=_marker, credentials=None, _http=None, client_info=None, client_options=None, use_auth_w_custom_endpoint=True, extra_headers={}, ): self._base_connection = None if project is None: no_project = True project = "" else: no_project = False if project is _marker: project = None # Save the initial value of constructor arguments before they # are passed along, for use in __reduce__ defined elsewhere. self._initial_client_info = client_info self._initial_client_options = client_options self._extra_headers = extra_headers connection_kw_args = {"client_info": client_info} if client_options: if isinstance(client_options, dict): client_options = google.api_core.client_options.from_dict( client_options ) if client_options and client_options.universe_domain: self._universe_domain = client_options.universe_domain else: self._universe_domain = None storage_emulator_override = _get_storage_emulator_override() api_endpoint_override = _get_api_endpoint_override() # Determine the api endpoint. The rules are as follows: # 1. If the `api_endpoint` is set in `client_options`, use that as the # endpoint. if client_options and client_options.api_endpoint: api_endpoint = client_options.api_endpoint # 2. Elif the "STORAGE_EMULATOR_HOST" env var is set, then use that as the # endpoint. elif storage_emulator_override: api_endpoint = storage_emulator_override # 3. Elif the "API_ENDPOINT_OVERRIDE" env var is set, then use that as the # endpoint. elif api_endpoint_override: api_endpoint = api_endpoint_override # 4. Elif the `universe_domain` is set in `client_options`, # create the endpoint using that as the default. # # Mutual TLS is not compatible with a non-default universe domain # at this time. If such settings are enabled along with the # "GOOGLE_API_USE_CLIENT_CERTIFICATE" env variable, a ValueError will # be raised. elif self._universe_domain: # The final decision of whether to use mTLS takes place in # google-auth-library-python. We peek at the environment variable # here only to issue an exception in case of a conflict. if _use_client_cert(): raise ValueError( 'The "GOOGLE_API_USE_CLIENT_CERTIFICATE" env variable is ' 'set to "true" and a non-default universe domain is ' "configured. mTLS is not supported in any universe other than" "googleapis.com." ) api_endpoint = _DEFAULT_SCHEME + _STORAGE_HOST_TEMPLATE.format( universe_domain=self._universe_domain ) # 5. Else, use the default, which is to use the default # universe domain of "googleapis.com" and create the endpoint # "storage.googleapis.com" from that. else: api_endpoint = None connection_kw_args["api_endpoint"] = api_endpoint self._is_emulator_set = True if storage_emulator_override else False # If a custom endpoint is set, the client checks for credentials # or finds the default credentials based on the current environment. # Authentication may be bypassed under certain conditions: # (1) STORAGE_EMULATOR_HOST is set (for backwards compatibility), OR # (2) use_auth_w_custom_endpoint is set to False. if connection_kw_args["api_endpoint"] is not None: if self._is_emulator_set or not use_auth_w_custom_endpoint: if credentials is None: credentials = AnonymousCredentials() if project is None: project = _get_environ_project() if project is None: no_project = True project = "" super(Client, self).__init__( project=project, credentials=credentials, client_options=client_options, _http=_http, ) # Validate that the universe domain of the credentials matches the # universe domain of the client. if self._credentials.universe_domain != self.universe_domain: raise ValueError( "The configured universe domain ({client_ud}) does not match " "the universe domain found in the credentials ({cred_ud}). If " "you haven't configured the universe domain explicitly, " "`googleapis.com` is the default.".format( client_ud=self.universe_domain, cred_ud=self._credentials.universe_domain, ) ) if no_project: self.project = None # Pass extra_headers to Connection connection = Connection(self, **connection_kw_args) connection.extra_headers = extra_headers self._connection = connection self._batch_stack = _LocalStack() @classmethod def create_anonymous_client(cls): """Factory: return client with anonymous credentials. .. note:: Such a client has only limited access to "public" buckets: listing their contents and downloading their blobs. :rtype: :class:`google.cloud.storage.client.Client` :returns: Instance w/ anonymous credentials and no project. """ client = cls(project="", credentials=AnonymousCredentials()) client.project = None return client @property def universe_domain(self): return self._universe_domain or _DEFAULT_UNIVERSE_DOMAIN @property def api_endpoint(self): return self._connection.API_BASE_URL @property def _connection(self): """Get connection or batch on the client. :rtype: :class:`google.cloud.storage._http.Connection` :returns: The connection set on the client, or the batch if one is set. """ if self.current_batch is not None: return self.current_batch else: return self._base_connection @_connection.setter def _connection(self, value): """Set connection on the client. Intended to be used by constructor (since the base class calls) self._connection = connection Will raise if the connection is set more than once. :type value: :class:`google.cloud.storage._http.Connection` :param value: The connection set on the client. :raises: :class:`ValueError` if connection has already been set. """ if self._base_connection is not None: raise ValueError("Connection already set on client") self._base_connection = value def _push_batch(self, batch): """Push a batch onto our stack. "Protected", intended for use by batch context mgrs. :type batch: :class:`google.cloud.storage.batch.Batch` :param batch: newly-active batch """ self._batch_stack.push(batch) def _pop_batch(self): """Pop a batch from our stack. "Protected", intended for use by batch context mgrs. :raises: IndexError if the stack is empty. :rtype: :class:`google.cloud.storage.batch.Batch` :returns: the top-most batch/transaction, after removing it. """ return self._batch_stack.pop() @property def current_batch(self): """Currently-active batch. :rtype: :class:`google.cloud.storage.batch.Batch` or ``NoneType`` (if no batch is active). :returns: The batch at the top of the batch stack. """ return self._batch_stack.top def get_service_account_email( self, project=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): """Get the email address of the project's GCS service account :type project: str :param project: (Optional) Project ID to use for retreiving GCS service account email address. Defaults to the client's project. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: str :returns: service account email address """ if project is None: project = self.project path = f"/projects/{project}/serviceAccount" api_response = self._get_resource(path, timeout=timeout, retry=retry) return api_response["email_address"] def bucket(self, bucket_name, user_project=None): """Factory constructor for bucket object. .. note:: This will not make an HTTP request; it simply instantiates a bucket object owned by this client. :type bucket_name: str :param bucket_name: The name of the bucket to be instantiated. :type user_project: str :param user_project: (Optional) The project ID to be billed for API requests made via the bucket. :rtype: :class:`google.cloud.storage.bucket.Bucket` :returns: The bucket object created. """ return Bucket(client=self, name=bucket_name, user_project=user_project) def batch(self, raise_exception=True): """Factory constructor for batch object. .. note:: This will not make an HTTP request; it simply instantiates a batch object owned by this client. :type raise_exception: bool :param raise_exception: (Optional) Defaults to True. If True, instead of adding exceptions to the list of return responses, the final exception will be raised. Note that exceptions are unwrapped after all operations are complete in success or failure, and only the last exception is raised. :rtype: :class:`google.cloud.storage.batch.Batch` :returns: The batch object created. """ return Batch(client=self, raise_exception=raise_exception) def _get_resource( self, path, query_params=None, headers=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, _target_object=None, ): """Helper for bucket / blob methods making API 'GET' calls. Args: path str: The path of the resource to fetch. query_params Optional[dict]: HTTP query parameters to be passed headers Optional[dict]: HTTP headers to be passed timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. _target_object (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ :class:`~google.cloud.storage.bucket.blob`, \ ]): Object to which future data is to be applied -- only relevant in the context of a batch. Returns: dict The JSON resource fetched Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ return self._connection.api_request( method="GET", path=path, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=_target_object, ) def _list_resource( self, path, item_to_value, page_token=None, max_results=None, extra_params=None, page_start=page_iterator._do_nothing_page_start, page_size=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): api_request = functools.partial( self._connection.api_request, timeout=timeout, retry=retry ) return page_iterator.HTTPIterator( client=self, api_request=api_request, path=path, item_to_value=item_to_value, page_token=page_token, max_results=max_results, extra_params=extra_params, page_start=page_start, page_size=page_size, ) def _patch_resource( self, path, data, query_params=None, headers=None, timeout=_DEFAULT_TIMEOUT, retry=None, _target_object=None, ): """Helper for bucket / blob methods making API 'PATCH' calls. Args: path str: The path of the resource to fetch. data dict: The data to be patched. query_params Optional[dict]: HTTP query parameters to be passed headers Optional[dict]: HTTP headers to be passed timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. _target_object (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ :class:`~google.cloud.storage.bucket.blob`, \ ]): Object to which future data is to be applied -- only relevant in the context of a batch. Returns: dict The JSON resource fetched Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ return self._connection.api_request( method="PATCH", path=path, data=data, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=_target_object, ) def _put_resource( self, path, data, query_params=None, headers=None, timeout=_DEFAULT_TIMEOUT, retry=None, _target_object=None, ): """Helper for bucket / blob methods making API 'PUT' calls. Args: path str: The path of the resource to fetch. data dict: The data to be patched. query_params Optional[dict]: HTTP query parameters to be passed headers Optional[dict]: HTTP headers to be passed timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. _target_object (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ :class:`~google.cloud.storage.bucket.blob`, \ ]): Object to which future data is to be applied -- only relevant in the context of a batch. Returns: dict The JSON resource fetched Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ return self._connection.api_request( method="PUT", path=path, data=data, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=_target_object, ) def _post_resource( self, path, data, query_params=None, headers=None, timeout=_DEFAULT_TIMEOUT, retry=None, _target_object=None, ): """Helper for bucket / blob methods making API 'POST' calls. Args: path str: The path of the resource to which to post. data dict: The data to be posted. query_params Optional[dict]: HTTP query parameters to be passed headers Optional[dict]: HTTP headers to be passed timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. _target_object (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ :class:`~google.cloud.storage.bucket.blob`, \ ]): Object to which future data is to be applied -- only relevant in the context of a batch. Returns: dict The JSON resource returned from the post. Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ return self._connection.api_request( method="POST", path=path, data=data, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=_target_object, ) def _delete_resource( self, path, query_params=None, headers=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, _target_object=None, ): """Helper for bucket / blob methods making API 'DELETE' calls. Args: path str: The path of the resource to delete. query_params Optional[dict]: HTTP query parameters to be passed headers Optional[dict]: HTTP headers to be passed timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. _target_object (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ :class:`~google.cloud.storage.bucket.blob`, \ ]): Object to which future data is to be applied -- only relevant in the context of a batch. Returns: dict The JSON resource fetched Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ return self._connection.api_request( method="DELETE", path=path, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=_target_object, ) def _bucket_arg_to_bucket(self, bucket_or_name): """Helper to return given bucket or create new by name. Args: bucket_or_name (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ str, \ ]): The bucket resource to pass or name to create. Returns: google.cloud.storage.bucket.Bucket The newly created bucket or the given one. """ if isinstance(bucket_or_name, Bucket): bucket = bucket_or_name if bucket.client is None: bucket._client = self else: bucket = Bucket(self, name=bucket_or_name) return bucket def get_bucket( self, bucket_or_name, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY, ): """Retrieve a bucket via a GET request. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/buckets/get) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-get-bucket-metadata#storage_get_bucket_metadata-python). Args: bucket_or_name (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ str, \ ]): The bucket resource to pass or name to create. timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. if_metageneration_match (Optional[long]): Make the operation conditional on whether the blob's current metageneration matches the given value. if_metageneration_not_match (Optional[long]): Make the operation conditional on whether the blob's current metageneration does not match the given value. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. Returns: google.cloud.storage.bucket.Bucket The bucket matching the name provided. Raises: google.cloud.exceptions.NotFound If the bucket is not found. """ bucket = self._bucket_arg_to_bucket(bucket_or_name) bucket.reload( client=self, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) return bucket def lookup_bucket( self, bucket_name, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY, ): """Get a bucket by name, returning None if not found. You can use this if you would rather check for a None value than catching a NotFound exception. :type bucket_name: str :param bucket_name: The name of the bucket to get. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: :class:`google.cloud.storage.bucket.Bucket` or ``NoneType`` :returns: The bucket matching the name provided or None if not found. """ try: return self.get_bucket( bucket_name, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) except NotFound: return None def create_bucket( self, bucket_or_name, requester_pays=None, project=None, user_project=None, location=None, data_locations=None, predefined_acl=None, predefined_default_object_acl=None, enable_object_retention=False, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Create a new bucket via a POST request. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/buckets/insert) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-create-bucket#storage_create_bucket-python). Args: bucket_or_name (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ str, \ ]): The bucket resource to pass or name to create. requester_pays (bool): DEPRECATED. Use Bucket().requester_pays instead. (Optional) Whether requester pays for API requests for this bucket and its blobs. project (str): (Optional) The project under which the bucket is to be created. If not passed, uses the project set on the client. user_project (str): (Optional) The project ID to be billed for API requests made via created bucket. location (str): (Optional) The location of the bucket. If not passed, the default location, US, will be used. If specifying a dual-region, `data_locations` should be set in conjunction. See: https://cloud.google.com/storage/docs/locations data_locations (list of str): (Optional) The list of regional locations of a custom dual-region bucket. Dual-regions require exactly 2 regional locations. See: https://cloud.google.com/storage/docs/locations predefined_acl (str): (Optional) Name of predefined ACL to apply to bucket. See: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl predefined_default_object_acl (str): (Optional) Name of predefined ACL to apply to bucket's objects. See: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl enable_object_retention (bool): (Optional) Whether object retention should be enabled on this bucket. See: https://cloud.google.com/storage/docs/object-lock timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. Returns: google.cloud.storage.bucket.Bucket The newly created bucket. Raises: google.cloud.exceptions.Conflict If the bucket already exists. """ bucket = self._bucket_arg_to_bucket(bucket_or_name) query_params = {} if project is None: project = self.project # Use no project if STORAGE_EMULATOR_HOST is set if self._is_emulator_set: if project is None: project = _get_environ_project() if project is None: project = "" # Only include the project parameter if a project is set. # If a project is not set, falls back to API validation (BadRequest). if project is not None: query_params = {"project": project} if requester_pays is not None: warnings.warn( "requester_pays arg is deprecated. Use Bucket().requester_pays instead.", PendingDeprecationWarning, stacklevel=1, ) bucket.requester_pays = requester_pays if predefined_acl is not None: predefined_acl = BucketACL.validate_predefined(predefined_acl) query_params["predefinedAcl"] = predefined_acl if predefined_default_object_acl is not None: predefined_default_object_acl = DefaultObjectACL.validate_predefined( predefined_default_object_acl ) query_params["predefinedDefaultObjectAcl"] = predefined_default_object_acl if user_project is not None: query_params["userProject"] = user_project if enable_object_retention: query_params["enableObjectRetention"] = enable_object_retention properties = {key: bucket._properties[key] for key in bucket._changes} properties["name"] = bucket.name if location is not None: properties["location"] = location if data_locations is not None: properties["customPlacementConfig"] = {"dataLocations": data_locations} api_response = self._post_resource( "/b", properties, query_params=query_params, timeout=timeout, retry=retry, _target_object=bucket, ) bucket._set_properties(api_response) return bucket def download_blob_to_file( self, blob_or_uri, file_obj, start=None, end=None, raw_download=False, if_etag_match=None, if_etag_not_match=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", retry=DEFAULT_RETRY, ): """Download the contents of a blob object or blob URI into a file-like object. See https://cloud.google.com/storage/docs/downloading-objects Args: blob_or_uri (Union[ \ :class:`~google.cloud.storage.blob.Blob`, \ str, \ ]): The blob resource to pass or URI to download. file_obj (file): A file handle to which to write the blob's data. start (int): (Optional) The first byte in a range to be downloaded. end (int): (Optional) The last byte in a range to be downloaded. raw_download (bool): (Optional) If true, download the object without any expansion. if_etag_match (Union[str, Set[str]]): (Optional) See :ref:`using-if-etag-match` if_etag_not_match (Union[str, Set[str]]): (Optional) See :ref:`using-if-etag-not-match` if_generation_match (long): (Optional) See :ref:`using-if-generation-match` if_generation_not_match (long): (Optional) See :ref:`using-if-generation-not-match` if_metageneration_match (long): (Optional) See :ref:`using-if-metageneration-match` if_metageneration_not_match (long): (Optional) See :ref:`using-if-metageneration-not-match` timeout ([Union[float, Tuple[float, float]]]): (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` checksum (str): (Optional) The type of checksum to compute to verify the integrity of the object. The response headers must contain a checksum of the requested type. If the headers lack an appropriate checksum (for instance in the case of transcoded or ranged downloads where the remote service does not know the correct checksum, including downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy) (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. Media operations (downloads and uploads) do not support non-default predicates in a Retry object. The default will always be used. Other configuration changes for Retry objects such as delays and deadlines are respected. """ if not isinstance(blob_or_uri, Blob): blob_or_uri = Blob.from_string(blob_or_uri) blob_or_uri._prep_and_do_download( file_obj, client=self, start=start, end=end, raw_download=raw_download, if_etag_match=if_etag_match, if_etag_not_match=if_etag_not_match, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, retry=retry, ) def list_blobs( self, bucket_or_name, max_results=None, page_token=None, prefix=None, delimiter=None, start_offset=None, end_offset=None, include_trailing_delimiter=None, versions=None, projection="noAcl", fields=None, page_size=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, match_glob=None, include_folders_as_prefixes=None, soft_deleted=None, ): """Return an iterator used to find blobs in the bucket. If :attr:`user_project` is set, bills the API request to that project. .. note:: List prefixes (directories) in a bucket using a prefix and delimiter. See a [code sample](https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix#storage_list_files_with_prefix-python) listing objects using a prefix filter. Args: bucket_or_name (Union[ \ :class:`~google.cloud.storage.bucket.Bucket`, \ str, \ ]): The bucket resource to pass or name to create. max_results (int): (Optional) The maximum number of blobs to return. page_token (str): (Optional) If present, return the next batch of blobs, using the value, which must correspond to the ``nextPageToken`` value returned in the previous response. Deprecated: use the ``pages`` property of the returned iterator instead of manually passing the token. prefix (str): (Optional) Prefix used to filter blobs. delimiter (str): (Optional) Delimiter, used with ``prefix`` to emulate hierarchy. start_offset (str): (Optional) Filter results to objects whose names are lexicographically equal to or after ``startOffset``. If ``endOffset`` is also set, the objects listed will have names between ``startOffset`` (inclusive) and ``endOffset`` (exclusive). end_offset (str): (Optional) Filter results to objects whose names are lexicographically before ``endOffset``. If ``startOffset`` is also set, the objects listed will have names between ``startOffset`` (inclusive) and ``endOffset`` (exclusive). include_trailing_delimiter (boolean): (Optional) If true, objects that end in exactly one instance of ``delimiter`` will have their metadata included in ``items`` in addition to ``prefixes``. versions (bool): (Optional) Whether object versions should be returned as separate blobs. projection (str): (Optional) If used, must be 'full' or 'noAcl'. Defaults to ``'noAcl'``. Specifies the set of properties to return. fields (str): (Optional) Selector specifying which fields to include in a partial response. Must be a list of fields. For example to get a partial response with just the next page token and the name and language of each blob returned: ``'items(name,contentLanguage),nextPageToken'``. See: https://cloud.google.com/storage/docs/json_api/v1/parameters#fields page_size (int): (Optional) Maximum number of blobs to return in each page. Defaults to a value set by the API. timeout (Optional[Union[float, Tuple[float, float]]]): The amount of time, in seconds, to wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. retry (Optional[Union[google.api_core.retry.Retry, google.cloud.storage.retry.ConditionalRetryPolicy]]): How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. match_glob (str): (Optional) A glob pattern used to filter results (for example, foo*bar). The string value must be UTF-8 encoded. See: https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob include_folders_as_prefixes (bool): (Optional) If true, includes Folders and Managed Folders in the set of ``prefixes`` returned by the query. Only applicable if ``delimiter`` is set to /. See: https://cloud.google.com/storage/docs/managed-folders soft_deleted (bool): (Optional) If true, only soft-deleted objects will be listed as distinct results in order of increasing generation number. This parameter can only be used successfully if the bucket has a soft delete policy. Note ``soft_deleted`` and ``versions`` cannot be set to True simultaneously. See: https://cloud.google.com/storage/docs/soft-delete Returns: Iterator of all :class:`~google.cloud.storage.blob.Blob` in this bucket matching the arguments. The RPC call returns a response when the iterator is consumed. As part of the response, you'll also get back an iterator.prefixes entity that lists object names up to and including the requested delimiter. Duplicate entries are omitted from this list. """ bucket = self._bucket_arg_to_bucket(bucket_or_name) extra_params = {"projection": projection} if prefix is not None: extra_params["prefix"] = prefix if delimiter is not None: extra_params["delimiter"] = delimiter if match_glob is not None: extra_params["matchGlob"] = match_glob if start_offset is not None: extra_params["startOffset"] = start_offset if end_offset is not None: extra_params["endOffset"] = end_offset if include_trailing_delimiter is not None: extra_params["includeTrailingDelimiter"] = include_trailing_delimiter if versions is not None: extra_params["versions"] = versions if fields is not None: extra_params["fields"] = fields if include_folders_as_prefixes is not None: extra_params["includeFoldersAsPrefixes"] = include_folders_as_prefixes if soft_deleted is not None: extra_params["softDeleted"] = soft_deleted if bucket.user_project is not None: extra_params["userProject"] = bucket.user_project path = bucket.path + "/o" iterator = self._list_resource( path, _item_to_blob, page_token=page_token, max_results=max_results, extra_params=extra_params, page_start=_blobs_page_start, page_size=page_size, timeout=timeout, retry=retry, ) iterator.bucket = bucket iterator.prefixes = set() return iterator def list_buckets( self, max_results=None, page_token=None, prefix=None, projection="noAcl", fields=None, project=None, page_size=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Get all buckets in the project associated to the client. This will not populate the list of blobs available in each bucket. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/buckets/list) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-list-buckets#storage_list_buckets-python). :type max_results: int :param max_results: (Optional) The maximum number of buckets to return. :type page_token: str :param page_token: (Optional) If present, return the next batch of buckets, using the value, which must correspond to the ``nextPageToken`` value returned in the previous response. Deprecated: use the ``pages`` property of the returned iterator instead of manually passing the token. :type prefix: str :param prefix: (Optional) Filter results to buckets whose names begin with this prefix. :type projection: str :param projection: (Optional) Specifies the set of properties to return. If used, must be 'full' or 'noAcl'. Defaults to 'noAcl'. :type fields: str :param fields: (Optional) Selector specifying which fields to include in a partial response. Must be a list of fields. For example to get a partial response with just the next page token and the language of each bucket returned: 'items/id,nextPageToken' :type project: str :param project: (Optional) The project whose buckets are to be listed. If not passed, uses the project set on the client. :type page_size: int :param page_size: (Optional) Maximum number of buckets to return in each page. Defaults to a value set by the API. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: :class:`~google.api_core.page_iterator.Iterator` :raises ValueError: if both ``project`` is ``None`` and the client's project is also ``None``. :returns: Iterator of all :class:`~google.cloud.storage.bucket.Bucket` belonging to this project. """ extra_params = {} if project is None: project = self.project # Use no project if STORAGE_EMULATOR_HOST is set if self._is_emulator_set: if project is None: project = _get_environ_project() if project is None: project = "" # Only include the project parameter if a project is set. # If a project is not set, falls back to API validation (BadRequest). if project is not None: extra_params = {"project": project} if prefix is not None: extra_params["prefix"] = prefix extra_params["projection"] = projection if fields is not None: extra_params["fields"] = fields return self._list_resource( "/b", _item_to_bucket, page_token=page_token, max_results=max_results, extra_params=extra_params, page_size=page_size, timeout=timeout, retry=retry, ) def create_hmac_key( self, service_account_email, project_id=None, user_project=None, timeout=_DEFAULT_TIMEOUT, retry=None, ): """Create an HMAC key for a service account. :type service_account_email: str :param service_account_email: e-mail address of the service account :type project_id: str :param project_id: (Optional) Explicit project ID for the key. Defaults to the client's project. :type user_project: str :param user_project: (Optional) This parameter is currently ignored. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, and the object will define retriable response codes and errors and configure backoff and timeout options. A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a Retry object and activates it only if certain conditions are met. This class exists to provide safe defaults for RPC calls that are not technically safe to retry normally (due to potential data duplication or other side-effects) but become safe to retry if a condition such as if_metageneration_match is set. See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. :rtype: Tuple[:class:`~google.cloud.storage.hmac_key.HMACKeyMetadata`, str] :returns: metadata for the created key, plus the bytes of the key's secret, which is an 40-character base64-encoded string. """ if project_id is None: project_id = self.project path = f"/projects/{project_id}/hmacKeys" qs_params = {"serviceAccountEmail": service_account_email} if user_project is not None: qs_params["userProject"] = user_project api_response = self._post_resource( path, None, query_params=qs_params, timeout=timeout, retry=retry, ) metadata = HMACKeyMetadata(self) metadata._properties = api_response["metadata"] secret = api_response["secret"] return metadata, secret def list_hmac_keys( self, max_results=None, service_account_email=None, show_deleted_keys=None, project_id=None, user_project=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """List HMAC keys for a project. :type max_results: int :param max_results: (Optional) Max number of keys to return in a given page. :type service_account_email: str :param service_account_email: (Optional) Limit keys to those created by the given service account. :type show_deleted_keys: bool :param show_deleted_keys: (Optional) Included deleted keys in the list. Default is to exclude them. :type project_id: str :param project_id: (Optional) Explicit project ID for the key. Defaults to the client's project. :type user_project: str :param user_project: (Optional) This parameter is currently ignored. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: Tuple[:class:`~google.cloud.storage.hmac_key.HMACKeyMetadata`, str] :returns: metadata for the created key, plus the bytes of the key's secret, which is an 40-character base64-encoded string. """ if project_id is None: project_id = self.project path = f"/projects/{project_id}/hmacKeys" extra_params = {} if service_account_email is not None: extra_params["serviceAccountEmail"] = service_account_email if show_deleted_keys is not None: extra_params["showDeletedKeys"] = show_deleted_keys if user_project is not None: extra_params["userProject"] = user_project return self._list_resource( path, _item_to_hmac_key_metadata, max_results=max_results, extra_params=extra_params, timeout=timeout, retry=retry, ) def get_hmac_key_metadata( self, access_id, project_id=None, user_project=None, timeout=_DEFAULT_TIMEOUT ): """Return a metadata instance for the given HMAC key. :type access_id: str :param access_id: Unique ID of an existing key. :type project_id: str :param project_id: (Optional) Project ID of an existing key. Defaults to client's project. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type user_project: str :param user_project: (Optional) This parameter is currently ignored. """ metadata = HMACKeyMetadata(self, access_id, project_id, user_project) metadata.reload(timeout=timeout) # raises NotFound for missing key return metadata def generate_signed_post_policy_v4( self, bucket_name, blob_name, expiration, conditions=None, fields=None, credentials=None, virtual_hosted_style=False, bucket_bound_hostname=None, scheme="http", service_account_email=None, access_token=None, ): """Generate a V4 signed policy object. Generated policy object allows user to upload objects with a POST request. .. note:: Assumes ``credentials`` implements the :class:`google.auth.credentials.Signing` interface. Also assumes ``credentials`` has a ``service_account_email`` property which identifies the credentials. See a [code sample](https://github.com/googleapis/python-storage/blob/main/samples/snippets/storage_generate_signed_post_policy_v4.py). :type bucket_name: str :param bucket_name: Bucket name. :type blob_name: str :param blob_name: Object name. :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] :param expiration: Policy expiration time. If a ``datetime`` instance is passed without an explicit ``tzinfo`` set, it will be assumed to be ``UTC``. :type conditions: list :param conditions: (Optional) List of POST policy conditions, which are used to restrict what is allowed in the request. :type fields: dict :param fields: (Optional) Additional elements to include into request. :type credentials: :class:`google.auth.credentials.Signing` :param credentials: (Optional) Credentials object with an associated private key to sign text. :type virtual_hosted_style: bool :param virtual_hosted_style: (Optional) If True, construct the URL relative to the bucket virtual hostname, e.g., '.storage.googleapis.com'. Incompatible with bucket_bound_hostname. :type bucket_bound_hostname: str :param bucket_bound_hostname: (Optional) If passed, construct the URL relative to the bucket-bound hostname. Value can be bare or with a scheme, e.g., 'example.com' or 'http://example.com'. Incompatible with virtual_hosted_style. See: https://cloud.google.com/storage/docs/request-endpoints#cname :type scheme: str :param scheme: (Optional) If ``bucket_bound_hostname`` is passed as a bare hostname, use this value as a scheme. ``https`` will work only when using a CDN. Defaults to ``"http"``. :type service_account_email: str :param service_account_email: (Optional) E-mail address of the service account. :type access_token: str :param access_token: (Optional) Access token for a service account. :raises: :exc:`ValueError` when mutually exclusive arguments are used. :rtype: dict :returns: Signed POST policy. """ if virtual_hosted_style and bucket_bound_hostname: raise ValueError( "Only one of virtual_hosted_style and bucket_bound_hostname " "can be specified." ) credentials = self._credentials if credentials is None else credentials ensure_signed_credentials(credentials) # prepare policy conditions and fields timestamp, datestamp = get_v4_now_dtstamps() x_goog_credential = "{email}/{datestamp}/auto/storage/goog4_request".format( email=credentials.signer_email, datestamp=datestamp ) required_conditions = [ {"bucket": bucket_name}, {"key": blob_name}, {"x-goog-date": timestamp}, {"x-goog-credential": x_goog_credential}, {"x-goog-algorithm": "GOOG4-RSA-SHA256"}, ] conditions = conditions or [] policy_fields = {} for key, value in sorted((fields or {}).items()): if not key.startswith("x-ignore-"): policy_fields[key] = value conditions.append({key: value}) conditions += required_conditions # calculate policy expiration time now = _NOW(_UTC).replace(tzinfo=None) if expiration is None: expiration = now + datetime.timedelta(hours=1) policy_expires = now + datetime.timedelta( seconds=get_expiration_seconds_v4(expiration) ) # encode policy for signing policy = json.dumps( collections.OrderedDict( sorted( { "conditions": conditions, "expiration": policy_expires.isoformat() + "Z", }.items() ) ), separators=(",", ":"), ) str_to_sign = base64.b64encode(policy.encode("utf-8")) # sign the policy and get its cryptographic signature if access_token and service_account_email: signature = _sign_message(str_to_sign, access_token, service_account_email) signature_bytes = base64.b64decode(signature) else: signature_bytes = credentials.sign_bytes(str_to_sign) # get hexadecimal representation of the signature signature = binascii.hexlify(signature_bytes).decode("utf-8") policy_fields.update( { "key": blob_name, "x-goog-algorithm": "GOOG4-RSA-SHA256", "x-goog-credential": x_goog_credential, "x-goog-date": timestamp, "x-goog-signature": signature, "policy": str_to_sign.decode("utf-8"), } ) # designate URL if virtual_hosted_style: url = _virtual_hosted_style_base_url( self.api_endpoint, bucket_name, trailing_slash=True ) elif bucket_bound_hostname: url = f"{_bucket_bound_hostname_url(bucket_bound_hostname, scheme)}/" else: url = f"{self.api_endpoint}/{bucket_name}/" return {"url": url, "fields": policy_fields} def _item_to_bucket(iterator, item): """Convert a JSON bucket to the native object. :type iterator: :class:`~google.api_core.page_iterator.Iterator` :param iterator: The iterator that has retrieved the item. :type item: dict :param item: An item to be converted to a bucket. :rtype: :class:`.Bucket` :returns: The next bucket in the page. """ name = item.get("name") bucket = Bucket(iterator.client, name) bucket._set_properties(item) return bucket def _item_to_hmac_key_metadata(iterator, item): """Convert a JSON key metadata resource to the native object. :type iterator: :class:`~google.api_core.page_iterator.Iterator` :param iterator: The iterator that has retrieved the item. :type item: dict :param item: An item to be converted to a key metadata instance. :rtype: :class:`~google.cloud.storage.hmac_key.HMACKeyMetadata` :returns: The next key metadata instance in the page. """ metadata = HMACKeyMetadata(iterator.client) metadata._properties = item return metadata