# Copyright 2014 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Create / interact with Google Cloud Storage buckets.""" import base64 import copy import datetime import json from urllib.parse import urlsplit import warnings from google.api_core import datetime_helpers from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _rfc3339_nanos_to_datetime from google.cloud.exceptions import NotFound from google.api_core.iam import Policy from google.cloud.storage import _signing from google.cloud.storage._helpers import _add_etag_match_headers from google.cloud.storage._helpers import _add_generation_match_parameters from google.cloud.storage._helpers import _NOW from google.cloud.storage._helpers import _PropertyMixin from google.cloud.storage._helpers import _UTC from google.cloud.storage._helpers import _scalar_property from google.cloud.storage._helpers import _validate_name from google.cloud.storage._signing import generate_signed_url_v2 from google.cloud.storage._signing import generate_signed_url_v4 from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _virtual_hosted_style_base_url from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS from google.cloud.storage.constants import COLDLINE_STORAGE_CLASS from google.cloud.storage.constants import DUAL_REGION_LOCATION_TYPE from google.cloud.storage.constants import ( DURABLE_REDUCED_AVAILABILITY_LEGACY_STORAGE_CLASS, ) from google.cloud.storage.constants import MULTI_REGIONAL_LEGACY_STORAGE_CLASS from google.cloud.storage.constants import MULTI_REGION_LOCATION_TYPE from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS from google.cloud.storage.constants import PUBLIC_ACCESS_PREVENTION_INHERITED from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS from google.cloud.storage.constants import REGION_LOCATION_TYPE from google.cloud.storage.constants import STANDARD_STORAGE_CLASS from google.cloud.storage.notification import BucketNotification from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED _UBLA_BPO_ENABLED_MESSAGE = ( "Pass only one of 'uniform_bucket_level_access_enabled' / " "'bucket_policy_only_enabled' to 'IAMConfiguration'." ) _BPO_ENABLED_MESSAGE = ( "'IAMConfiguration.bucket_policy_only_enabled' is deprecated. " "Instead, use 'IAMConfiguration.uniform_bucket_level_access_enabled'." ) _UBLA_BPO_LOCK_TIME_MESSAGE = ( "Pass only one of 'uniform_bucket_level_access_lock_time' / " "'bucket_policy_only_lock_time' to 'IAMConfiguration'." ) _BPO_LOCK_TIME_MESSAGE = ( "'IAMConfiguration.bucket_policy_only_lock_time' is deprecated. " "Instead, use 'IAMConfiguration.uniform_bucket_level_access_lock_time'." ) _LOCATION_SETTER_MESSAGE = ( "Assignment to 'Bucket.location' is deprecated, as it is only " "valid before the bucket is created. Instead, pass the location " "to `Bucket.create`." ) def _blobs_page_start(iterator, page, response): """Grab prefixes after a :class:`~google.cloud.iterator.Page` started. :type iterator: :class:`~google.api_core.page_iterator.Iterator` :param iterator: The iterator that is currently in use. :type page: :class:`~google.cloud.api.core.page_iterator.Page` :param page: The page that was just created. :type response: dict :param response: The JSON API response for a page of blobs. """ page.prefixes = tuple(response.get("prefixes", ())) iterator.prefixes.update(page.prefixes) def _item_to_blob(iterator, item): """Convert a JSON blob to the native object. .. note:: This assumes that the ``bucket`` attribute has been added to the iterator after being created. :type iterator: :class:`~google.api_core.page_iterator.Iterator` :param iterator: The iterator that has retrieved the item. :type item: dict :param item: An item to be converted to a blob. :rtype: :class:`.Blob` :returns: The next blob in the page. """ name = item.get("name") blob = Blob(name, bucket=iterator.bucket) blob._set_properties(item) return blob def _item_to_notification(iterator, item): """Convert a JSON blob to the native object. .. note:: This assumes that the ``bucket`` attribute has been added to the iterator after being created. :type iterator: :class:`~google.api_core.page_iterator.Iterator` :param iterator: The iterator that has retrieved the item. :type item: dict :param item: An item to be converted to a blob. :rtype: :class:`.BucketNotification` :returns: The next notification being iterated. """ return BucketNotification.from_api_repr(item, bucket=iterator.bucket) class LifecycleRuleConditions(dict): """Map a single lifecycle rule for a bucket. See: https://cloud.google.com/storage/docs/lifecycle :type age: int :param age: (Optional) Apply rule action to items whose age, in days, exceeds this value. :type created_before: datetime.date :param created_before: (Optional) Apply rule action to items created before this date. :type is_live: bool :param is_live: (Optional) If true, apply rule action to non-versioned items, or to items with no newer versions. If false, apply rule action to versioned items with at least one newer version. :type matches_prefix: list(str) :param matches_prefix: (Optional) Apply rule action to items which any prefix matches the beginning of the item name. :type matches_storage_class: list(str), one or more of :attr:`Bucket.STORAGE_CLASSES`. :param matches_storage_class: (Optional) Apply rule action to items whose storage class matches this value. :type matches_suffix: list(str) :param matches_suffix: (Optional) Apply rule action to items which any suffix matches the end of the item name. :type number_of_newer_versions: int :param number_of_newer_versions: (Optional) Apply rule action to versioned items having N newer versions. :type days_since_custom_time: int :param days_since_custom_time: (Optional) Apply rule action to items whose number of days elapsed since the custom timestamp. This condition is relevant only for versioned objects. The value of the field must be a non negative integer. If it's zero, the object version will become eligible for lifecycle action as soon as it becomes custom. :type custom_time_before: :class:`datetime.date` :param custom_time_before: (Optional) Date object parsed from RFC3339 valid date, apply rule action to items whose custom time is before this date. This condition is relevant only for versioned objects, e.g., 2019-03-16. :type days_since_noncurrent_time: int :param days_since_noncurrent_time: (Optional) Apply rule action to items whose number of days elapsed since the non current timestamp. This condition is relevant only for versioned objects. The value of the field must be a non negative integer. If it's zero, the object version will become eligible for lifecycle action as soon as it becomes non current. :type noncurrent_time_before: :class:`datetime.date` :param noncurrent_time_before: (Optional) Date object parsed from RFC3339 valid date, apply rule action to items whose non current time is before this date. This condition is relevant only for versioned objects, e.g, 2019-03-16. :raises ValueError: if no arguments are passed. """ def __init__( self, age=None, created_before=None, is_live=None, matches_storage_class=None, number_of_newer_versions=None, days_since_custom_time=None, custom_time_before=None, days_since_noncurrent_time=None, noncurrent_time_before=None, matches_prefix=None, matches_suffix=None, _factory=False, ): conditions = {} if age is not None: conditions["age"] = age if created_before is not None: conditions["createdBefore"] = created_before.isoformat() if is_live is not None: conditions["isLive"] = is_live if matches_storage_class is not None: conditions["matchesStorageClass"] = matches_storage_class if number_of_newer_versions is not None: conditions["numNewerVersions"] = number_of_newer_versions if days_since_custom_time is not None: conditions["daysSinceCustomTime"] = days_since_custom_time if custom_time_before is not None: conditions["customTimeBefore"] = custom_time_before.isoformat() if days_since_noncurrent_time is not None: conditions["daysSinceNoncurrentTime"] = days_since_noncurrent_time if noncurrent_time_before is not None: conditions["noncurrentTimeBefore"] = noncurrent_time_before.isoformat() if matches_prefix is not None: conditions["matchesPrefix"] = matches_prefix if matches_suffix is not None: conditions["matchesSuffix"] = matches_suffix if not _factory and not conditions: raise ValueError("Supply at least one condition") super(LifecycleRuleConditions, self).__init__(conditions) @classmethod def from_api_repr(cls, resource): """Factory: construct instance from resource. :type resource: dict :param resource: mapping as returned from API call. :rtype: :class:`LifecycleRuleConditions` :returns: Instance created from resource. """ instance = cls(_factory=True) instance.update(resource) return instance @property def age(self): """Conditon's age value.""" return self.get("age") @property def created_before(self): """Conditon's created_before value.""" before = self.get("createdBefore") if before is not None: return datetime_helpers.from_iso8601_date(before) @property def is_live(self): """Conditon's 'is_live' value.""" return self.get("isLive") @property def matches_prefix(self): """Conditon's 'matches_prefix' value.""" return self.get("matchesPrefix") @property def matches_storage_class(self): """Conditon's 'matches_storage_class' value.""" return self.get("matchesStorageClass") @property def matches_suffix(self): """Conditon's 'matches_suffix' value.""" return self.get("matchesSuffix") @property def number_of_newer_versions(self): """Conditon's 'number_of_newer_versions' value.""" return self.get("numNewerVersions") @property def days_since_custom_time(self): """Conditon's 'days_since_custom_time' value.""" return self.get("daysSinceCustomTime") @property def custom_time_before(self): """Conditon's 'custom_time_before' value.""" before = self.get("customTimeBefore") if before is not None: return datetime_helpers.from_iso8601_date(before) @property def days_since_noncurrent_time(self): """Conditon's 'days_since_noncurrent_time' value.""" return self.get("daysSinceNoncurrentTime") @property def noncurrent_time_before(self): """Conditon's 'noncurrent_time_before' value.""" before = self.get("noncurrentTimeBefore") if before is not None: return datetime_helpers.from_iso8601_date(before) class LifecycleRuleDelete(dict): """Map a lifecycle rule deleting matching items. :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ def __init__(self, **kw): conditions = LifecycleRuleConditions(**kw) rule = {"action": {"type": "Delete"}, "condition": dict(conditions)} super().__init__(rule) @classmethod def from_api_repr(cls, resource): """Factory: construct instance from resource. :type resource: dict :param resource: mapping as returned from API call. :rtype: :class:`LifecycleRuleDelete` :returns: Instance created from resource. """ instance = cls(_factory=True) instance.update(resource) return instance class LifecycleRuleSetStorageClass(dict): """Map a lifecycle rule updating storage class of matching items. :type storage_class: str, one of :attr:`Bucket.STORAGE_CLASSES`. :param storage_class: new storage class to assign to matching items. :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ def __init__(self, storage_class, **kw): conditions = LifecycleRuleConditions(**kw) rule = { "action": {"type": "SetStorageClass", "storageClass": storage_class}, "condition": dict(conditions), } super().__init__(rule) @classmethod def from_api_repr(cls, resource): """Factory: construct instance from resource. :type resource: dict :param resource: mapping as returned from API call. :rtype: :class:`LifecycleRuleSetStorageClass` :returns: Instance created from resource. """ action = resource["action"] instance = cls(action["storageClass"], _factory=True) instance.update(resource) return instance class LifecycleRuleAbortIncompleteMultipartUpload(dict): """Map a rule aborting incomplete multipart uploads of matching items. The "age" lifecycle condition is the only supported condition for this rule. :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ def __init__(self, **kw): conditions = LifecycleRuleConditions(**kw) rule = { "action": {"type": "AbortIncompleteMultipartUpload"}, "condition": dict(conditions), } super().__init__(rule) @classmethod def from_api_repr(cls, resource): """Factory: construct instance from resource. :type resource: dict :param resource: mapping as returned from API call. :rtype: :class:`LifecycleRuleAbortIncompleteMultipartUpload` :returns: Instance created from resource. """ instance = cls(_factory=True) instance.update(resource) return instance _default = object() class IAMConfiguration(dict): """Map a bucket's IAM configuration. :type bucket: :class:`Bucket` :params bucket: Bucket for which this instance is the policy. :type public_access_prevention: str :params public_access_prevention: (Optional) Whether the public access prevention policy is 'inherited' (default) or 'enforced' See: https://cloud.google.com/storage/docs/public-access-prevention :type uniform_bucket_level_access_enabled: bool :params bucket_policy_only_enabled: (Optional) Whether the IAM-only policy is enabled for the bucket. :type uniform_bucket_level_access_locked_time: :class:`datetime.datetime` :params uniform_bucket_level_locked_time: (Optional) When the bucket's IAM-only policy was enabled. This value should normally only be set by the back-end API. :type bucket_policy_only_enabled: bool :params bucket_policy_only_enabled: Deprecated alias for :data:`uniform_bucket_level_access_enabled`. :type bucket_policy_only_locked_time: :class:`datetime.datetime` :params bucket_policy_only_locked_time: Deprecated alias for :data:`uniform_bucket_level_access_locked_time`. """ def __init__( self, bucket, public_access_prevention=_default, uniform_bucket_level_access_enabled=_default, uniform_bucket_level_access_locked_time=_default, bucket_policy_only_enabled=_default, bucket_policy_only_locked_time=_default, ): if bucket_policy_only_enabled is not _default: if uniform_bucket_level_access_enabled is not _default: raise ValueError(_UBLA_BPO_ENABLED_MESSAGE) warnings.warn(_BPO_ENABLED_MESSAGE, DeprecationWarning, stacklevel=2) uniform_bucket_level_access_enabled = bucket_policy_only_enabled if bucket_policy_only_locked_time is not _default: if uniform_bucket_level_access_locked_time is not _default: raise ValueError(_UBLA_BPO_LOCK_TIME_MESSAGE) warnings.warn(_BPO_LOCK_TIME_MESSAGE, DeprecationWarning, stacklevel=2) uniform_bucket_level_access_locked_time = bucket_policy_only_locked_time if uniform_bucket_level_access_enabled is _default: uniform_bucket_level_access_enabled = False if public_access_prevention is _default: public_access_prevention = PUBLIC_ACCESS_PREVENTION_INHERITED data = { "uniformBucketLevelAccess": { "enabled": uniform_bucket_level_access_enabled }, "publicAccessPrevention": public_access_prevention, } if uniform_bucket_level_access_locked_time is not _default: data["uniformBucketLevelAccess"]["lockedTime"] = _datetime_to_rfc3339( uniform_bucket_level_access_locked_time ) super(IAMConfiguration, self).__init__(data) self._bucket = bucket @classmethod def from_api_repr(cls, resource, bucket): """Factory: construct instance from resource. :type bucket: :class:`Bucket` :params bucket: Bucket for which this instance is the policy. :type resource: dict :param resource: mapping as returned from API call. :rtype: :class:`IAMConfiguration` :returns: Instance created from resource. """ instance = cls(bucket) instance.update(resource) return instance @property def bucket(self): """Bucket for which this instance is the policy. :rtype: :class:`Bucket` :returns: the instance's bucket. """ return self._bucket @property def public_access_prevention(self): """Setting for public access prevention policy. Options are 'inherited' (default) or 'enforced'. See: https://cloud.google.com/storage/docs/public-access-prevention :rtype: string :returns: the public access prevention status, either 'enforced' or 'inherited'. """ return self["publicAccessPrevention"] @public_access_prevention.setter def public_access_prevention(self, value): self["publicAccessPrevention"] = value self.bucket._patch_property("iamConfiguration", self) @property def uniform_bucket_level_access_enabled(self): """If set, access checks only use bucket-level IAM policies or above. :rtype: bool :returns: whether the bucket is configured to allow only IAM. """ ubla = self.get("uniformBucketLevelAccess", {}) return ubla.get("enabled", False) @uniform_bucket_level_access_enabled.setter def uniform_bucket_level_access_enabled(self, value): ubla = self.setdefault("uniformBucketLevelAccess", {}) ubla["enabled"] = bool(value) self.bucket._patch_property("iamConfiguration", self) @property def uniform_bucket_level_access_locked_time(self): """Deadline for changing :attr:`uniform_bucket_level_access_enabled` from true to false. If the bucket's :attr:`uniform_bucket_level_access_enabled` is true, this property is time time after which that setting becomes immutable. If the bucket's :attr:`uniform_bucket_level_access_enabled` is false, this property is ``None``. :rtype: Union[:class:`datetime.datetime`, None] :returns: (readonly) Time after which :attr:`uniform_bucket_level_access_enabled` will be frozen as true. """ ubla = self.get("uniformBucketLevelAccess", {}) stamp = ubla.get("lockedTime") if stamp is not None: stamp = _rfc3339_nanos_to_datetime(stamp) return stamp @property def bucket_policy_only_enabled(self): """Deprecated alias for :attr:`uniform_bucket_level_access_enabled`. :rtype: bool :returns: whether the bucket is configured to allow only IAM. """ return self.uniform_bucket_level_access_enabled @bucket_policy_only_enabled.setter def bucket_policy_only_enabled(self, value): warnings.warn(_BPO_ENABLED_MESSAGE, DeprecationWarning, stacklevel=2) self.uniform_bucket_level_access_enabled = value @property def bucket_policy_only_locked_time(self): """Deprecated alias for :attr:`uniform_bucket_level_access_locked_time`. :rtype: Union[:class:`datetime.datetime`, None] :returns: (readonly) Time after which :attr:`bucket_policy_only_enabled` will be frozen as true. """ return self.uniform_bucket_level_access_locked_time class Bucket(_PropertyMixin): """A class representing a Bucket on Cloud Storage. :type client: :class:`google.cloud.storage.client.Client` :param client: A client which holds credentials and project configuration for the bucket (which requires a project). :type name: str :param name: The name of the bucket. Bucket names must start and end with a number or letter. :type user_project: str :param user_project: (Optional) the project ID to be billed for API requests made via this instance. """ _MAX_OBJECTS_FOR_ITERATION = 256 """Maximum number of existing objects allowed in iteration. This is used in Bucket.delete() and Bucket.make_public(). """ STORAGE_CLASSES = ( STANDARD_STORAGE_CLASS, NEARLINE_STORAGE_CLASS, COLDLINE_STORAGE_CLASS, ARCHIVE_STORAGE_CLASS, MULTI_REGIONAL_LEGACY_STORAGE_CLASS, # legacy REGIONAL_LEGACY_STORAGE_CLASS, # legacy DURABLE_REDUCED_AVAILABILITY_LEGACY_STORAGE_CLASS, # legacy ) """Allowed values for :attr:`storage_class`. Default value is :attr:`STANDARD_STORAGE_CLASS`. See https://cloud.google.com/storage/docs/json_api/v1/buckets#storageClass https://cloud.google.com/storage/docs/storage-classes """ _LOCATION_TYPES = ( MULTI_REGION_LOCATION_TYPE, REGION_LOCATION_TYPE, DUAL_REGION_LOCATION_TYPE, ) """Allowed values for :attr:`location_type`.""" def __init__(self, client, name=None, user_project=None): """ property :attr:`name` Get the bucket's name. """ name = _validate_name(name) super(Bucket, self).__init__(name=name) self._client = client self._acl = BucketACL(self) self._default_object_acl = DefaultObjectACL(self) self._label_removals = set() self._user_project = user_project def __repr__(self): return f"" @property def client(self): """The client bound to this bucket.""" return self._client def _set_properties(self, value): """Set the properties for the current object. :type value: dict or :class:`google.cloud.storage.batch._FutureDict` :param value: The properties to be set. """ self._label_removals.clear() return super(Bucket, self)._set_properties(value) @property def rpo(self): """Get the RPO (Recovery Point Objective) of this bucket See: https://cloud.google.com/storage/docs/managing-turbo-replication "ASYNC_TURBO" or "DEFAULT" :rtype: str """ return self._properties.get("rpo") @rpo.setter def rpo(self, value): """ Set the RPO (Recovery Point Objective) of this bucket. See: https://cloud.google.com/storage/docs/managing-turbo-replication :type value: str :param value: "ASYNC_TURBO" or "DEFAULT" """ self._patch_property("rpo", value) @property def user_project(self): """Project ID to be billed for API requests made via this bucket. If unset, API requests are billed to the bucket owner. A user project is required for all operations on Requester Pays buckets. See https://cloud.google.com/storage/docs/requester-pays#requirements for details. :rtype: str """ return self._user_project @classmethod def from_string(cls, uri, client=None): """Get a constructor for bucket object by URI. .. code-block:: python from google.cloud import storage from google.cloud.storage.bucket import Bucket client = storage.Client() bucket = Bucket.from_string("gs://bucket", client=client) :type uri: str :param uri: The bucket uri pass to get bucket object. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. Application code should *always* pass ``client``. :rtype: :class:`google.cloud.storage.bucket.Bucket` :returns: The bucket object created. """ scheme, netloc, path, query, frag = urlsplit(uri) if scheme != "gs": raise ValueError("URI scheme must be gs") return cls(client, name=netloc) def blob( self, blob_name, chunk_size=None, encryption_key=None, kms_key_name=None, generation=None, ): """Factory constructor for blob object. .. note:: This will not make an HTTP request; it simply instantiates a blob object owned by this bucket. :type blob_name: str :param blob_name: The name of the blob to be instantiated. :type chunk_size: int :param chunk_size: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. :type encryption_key: bytes :param encryption_key: (Optional) 32 byte encryption key for customer-supplied encryption. :type kms_key_name: str :param kms_key_name: (Optional) Resource name of KMS key used to encrypt blob's content. :type generation: long :param generation: (Optional) If present, selects a specific revision of this object. :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The blob object created. """ return Blob( name=blob_name, bucket=self, chunk_size=chunk_size, encryption_key=encryption_key, kms_key_name=kms_key_name, generation=generation, ) def notification( self, topic_name=None, topic_project=None, custom_attributes=None, event_types=None, blob_name_prefix=None, payload_format=NONE_PAYLOAD_FORMAT, notification_id=None, ): """Factory: create a notification resource for the bucket. See: :class:`.BucketNotification` for parameters. :rtype: :class:`.BucketNotification` """ return BucketNotification( self, topic_name=topic_name, topic_project=topic_project, custom_attributes=custom_attributes, event_types=event_types, blob_name_prefix=blob_name_prefix, payload_format=payload_format, notification_id=notification_id, ) def exists( self, client=None, timeout=_DEFAULT_TIMEOUT, if_etag_match=None, if_etag_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY, ): """Determines whether or not this bucket exists. If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_etag_match: Union[str, Set[str]] :param if_etag_match: (Optional) Make the operation conditional on whether the bucket's current ETag matches the given value. :type if_etag_not_match: Union[str, Set[str]]) :param if_etag_not_match: (Optional) Make the operation conditional on whether the bucket's current ETag does not match the given value. :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the bucket's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the bucket's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: bool :returns: True if the bucket exists in Cloud Storage. """ client = self._require_client(client) # We only need the status code (200 or not) so we seek to # minimize the returned payload. query_params = {"fields": "name"} if self.user_project is not None: query_params["userProject"] = self.user_project _add_generation_match_parameters( query_params, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, ) headers = {} _add_etag_match_headers( headers, if_etag_match=if_etag_match, if_etag_not_match=if_etag_not_match ) try: # We intentionally pass `_target_object=None` since fields=name # would limit the local properties. client._get_resource( self.path, query_params=query_params, headers=headers, timeout=timeout, retry=retry, _target_object=None, ) except NotFound: # NOTE: This will not fail immediately in a batch. However, when # Batch.finish() is called, the resulting `NotFound` will be # raised. return False return True def create( self, client=None, project=None, location=None, predefined_acl=None, predefined_default_object_acl=None, enable_object_retention=False, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Creates current bucket. If the bucket already exists, will raise :class:`google.cloud.exceptions.Conflict`. This implements "storage.buckets.insert". If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type project: str :param project: (Optional) The project under which the bucket is to be created. If not passed, uses the project set on the client. :raises ValueError: if ``project`` is None and client's :attr:`project` is also None. :type location: str :param location: (Optional) The location of the bucket. If not passed, the default location, US, will be used. See https://cloud.google.com/storage/docs/bucket-locations :type predefined_acl: str :param predefined_acl: (Optional) Name of predefined ACL to apply to bucket. See: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl :type predefined_default_object_acl: str :param predefined_default_object_acl: (Optional) Name of predefined ACL to apply to bucket's objects. See: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl :type enable_object_retention: bool :param enable_object_retention: (Optional) Whether object retention should be enabled on this bucket. See: https://cloud.google.com/storage/docs/object-lock :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ client = self._require_client(client) client.create_bucket( bucket_or_name=self, project=project, user_project=self.user_project, location=location, predefined_acl=predefined_acl, predefined_default_object_acl=predefined_default_object_acl, enable_object_retention=enable_object_retention, timeout=timeout, retry=retry, ) def update( self, client=None, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Sends all properties in a PUT request. Updates the ``_properties`` with the response from the backend. If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current object. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ super(Bucket, self).update( client=client, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) def reload( self, client=None, projection="noAcl", timeout=_DEFAULT_TIMEOUT, if_etag_match=None, if_etag_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY, ): """Reload properties from Cloud Storage. If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current object. :type projection: str :param projection: (Optional) If used, must be 'full' or 'noAcl'. Defaults to ``'noAcl'``. Specifies the set of properties to return. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_etag_match: Union[str, Set[str]] :param if_etag_match: (Optional) Make the operation conditional on whether the bucket's current ETag matches the given value. :type if_etag_not_match: Union[str, Set[str]]) :param if_etag_not_match: (Optional) Make the operation conditional on whether the bucket's current ETag does not match the given value. :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the bucket's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the bucket's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ super(Bucket, self).reload( client=client, projection=projection, timeout=timeout, if_etag_match=if_etag_match, if_etag_not_match=if_etag_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) def patch( self, client=None, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Sends all changed properties in a PATCH request. Updates the ``_properties`` with the response from the backend. If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current object. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ # Special case: For buckets, it is possible that labels are being # removed; this requires special handling. if self._label_removals: self._changes.add("labels") self._properties.setdefault("labels", {}) for removed_label in self._label_removals: self._properties["labels"][removed_label] = None # Call the superclass method. super(Bucket, self).patch( client=client, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, retry=retry, ) @property def acl(self): """Create our ACL on demand.""" return self._acl @property def default_object_acl(self): """Create our defaultObjectACL on demand.""" return self._default_object_acl @staticmethod def path_helper(bucket_name): """Relative URL path for a bucket. :type bucket_name: str :param bucket_name: The bucket name in the path. :rtype: str :returns: The relative URL path for ``bucket_name``. """ return "/b/" + bucket_name @property def path(self): """The URL path to this bucket.""" if not self.name: raise ValueError("Cannot determine path without bucket name.") return self.path_helper(self.name) def get_blob( self, blob_name, client=None, encryption_key=None, generation=None, if_etag_match=None, if_etag_not_match=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, soft_deleted=None, **kwargs, ): """Get a blob object by name. See a [code sample](https://cloud.google.com/storage/docs/samples/storage-get-metadata#storage_get_metadata-python) on how to retrieve metadata of an object. If :attr:`user_project` is set, bills the API request to that project. :type blob_name: str :param blob_name: The name of the blob to retrieve. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type encryption_key: bytes :param encryption_key: (Optional) 32 byte encryption key for customer-supplied encryption. See https://cloud.google.com/storage/docs/encryption#customer-supplied. :type generation: long :param generation: (Optional) If present, selects a specific revision of this object. :type if_etag_match: Union[str, Set[str]] :param if_etag_match: (Optional) See :ref:`using-if-etag-match` :type if_etag_not_match: Union[str, Set[str]] :param if_etag_not_match: (Optional) See :ref:`using-if-etag-not-match` :type if_generation_match: long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` :type if_generation_not_match: long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` :type if_metageneration_match: long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :type soft_deleted: bool :param soft_deleted: (Optional) If True, looks for a soft-deleted object. Will only return the object metadata if the object exists and is in a soft-deleted state. Object ``generation`` is required if ``soft_deleted`` is set to True. See: https://cloud.google.com/storage/docs/soft-delete :param kwargs: Keyword arguments to pass to the :class:`~google.cloud.storage.blob.Blob` constructor. :rtype: :class:`google.cloud.storage.blob.Blob` or None :returns: The blob object if it exists, otherwise None. """ blob = Blob( bucket=self, name=blob_name, encryption_key=encryption_key, generation=generation, **kwargs, ) try: # NOTE: This will not fail immediately in a batch. However, when # Batch.finish() is called, the resulting `NotFound` will be # raised. blob.reload( client=client, timeout=timeout, if_etag_match=if_etag_match, if_etag_not_match=if_etag_not_match, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, soft_deleted=soft_deleted, ) except NotFound: return None else: return blob def list_blobs( self, max_results=None, page_token=None, prefix=None, delimiter=None, start_offset=None, end_offset=None, include_trailing_delimiter=None, versions=None, projection="noAcl", fields=None, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, match_glob=None, include_folders_as_prefixes=None, soft_deleted=None, page_size=None, ): """Return an iterator used to find blobs in the bucket. If :attr:`user_project` is set, bills the API request to that project. :type max_results: int :param max_results: (Optional) The maximum number of blobs to return. :type page_token: str :param page_token: (Optional) If present, return the next batch of blobs, using the value, which must correspond to the ``nextPageToken`` value returned in the previous response. Deprecated: use the ``pages`` property of the returned iterator instead of manually passing the token. :type prefix: str :param prefix: (Optional) Prefix used to filter blobs. :type delimiter: str :param delimiter: (Optional) Delimiter, used with ``prefix`` to emulate hierarchy. :type start_offset: str :param start_offset: (Optional) Filter results to objects whose names are lexicographically equal to or after ``startOffset``. If ``endOffset`` is also set, the objects listed will have names between ``startOffset`` (inclusive) and ``endOffset`` (exclusive). :type end_offset: str :param end_offset: (Optional) Filter results to objects whose names are lexicographically before ``endOffset``. If ``startOffset`` is also set, the objects listed will have names between ``startOffset`` (inclusive) and ``endOffset`` (exclusive). :type include_trailing_delimiter: boolean :param include_trailing_delimiter: (Optional) If true, objects that end in exactly one instance of ``delimiter`` will have their metadata included in ``items`` in addition to ``prefixes``. :type versions: bool :param versions: (Optional) Whether object versions should be returned as separate blobs. :type projection: str :param projection: (Optional) If used, must be 'full' or 'noAcl'. Defaults to ``'noAcl'``. Specifies the set of properties to return. :type fields: str :param fields: (Optional) Selector specifying which fields to include in a partial response. Must be a list of fields. For example to get a partial response with just the next page token and the name and language of each blob returned: ``'items(name,contentLanguage),nextPageToken'``. See: https://cloud.google.com/storage/docs/json_api/v1/parameters#fields :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :type match_glob: str :param match_glob: (Optional) A glob pattern used to filter results (for example, foo*bar). The string value must be UTF-8 encoded. See: https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob :type include_folders_as_prefixes: bool (Optional) If true, includes Folders and Managed Folders in the set of ``prefixes`` returned by the query. Only applicable if ``delimiter`` is set to /. See: https://cloud.google.com/storage/docs/managed-folders :type soft_deleted: bool :param soft_deleted: (Optional) If true, only soft-deleted objects will be listed as distinct results in order of increasing generation number. This parameter can only be used successfully if the bucket has a soft delete policy. Note ``soft_deleted`` and ``versions`` cannot be set to True simultaneously. See: https://cloud.google.com/storage/docs/soft-delete :type page_size: int :param page_size: (Optional) Maximum number of blobs to return in each page. Defaults to a value set by the API. :rtype: :class:`~google.api_core.page_iterator.Iterator` :returns: Iterator of all :class:`~google.cloud.storage.blob.Blob` in this bucket matching the arguments. """ client = self._require_client(client) return client.list_blobs( self, max_results=max_results, page_token=page_token, prefix=prefix, delimiter=delimiter, start_offset=start_offset, end_offset=end_offset, include_trailing_delimiter=include_trailing_delimiter, versions=versions, projection=projection, fields=fields, page_size=page_size, timeout=timeout, retry=retry, match_glob=match_glob, include_folders_as_prefixes=include_folders_as_prefixes, soft_deleted=soft_deleted, ) def list_notifications( self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): """List Pub / Sub notifications for this bucket. See: https://cloud.google.com/storage/docs/json_api/v1/notifications/list If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: list of :class:`.BucketNotification` :returns: notification instances """ client = self._require_client(client) path = self.path + "/notificationConfigs" iterator = client._list_resource( path, _item_to_notification, timeout=timeout, retry=retry, ) iterator.bucket = self return iterator def get_notification( self, notification_id, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Get Pub / Sub notification for this bucket. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/notifications/get) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-print-pubsub-bucket-notification#storage_print_pubsub_bucket_notification-python). If :attr:`user_project` is set, bills the API request to that project. :type notification_id: str :param notification_id: The notification id to retrieve the notification configuration. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: :class:`.BucketNotification` :returns: notification instance. """ notification = self.notification(notification_id=notification_id) notification.reload(client=client, timeout=timeout, retry=retry) return notification def delete( self, force=False, client=None, if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Delete this bucket. The bucket **must** be empty in order to submit a delete request. If ``force=True`` is passed, this will first attempt to delete all the objects / blobs in the bucket (i.e. try to empty the bucket). If the bucket doesn't exist, this will raise :class:`google.cloud.exceptions.NotFound`. If the bucket is not empty (and ``force=False``), will raise :class:`google.cloud.exceptions.Conflict`. If ``force=True`` and the bucket contains more than 256 objects / blobs this will cowardly refuse to delete the objects (or the bucket). This is to prevent accidental bucket deletion and to prevent extremely long runtime of this method. Also note that ``force=True`` is not supported in a ``Batch`` context. If :attr:`user_project` is set, bills the API request to that project. :type force: bool :param force: If True, empties the bucket's objects then deletes it. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :raises: :class:`ValueError` if ``force`` is ``True`` and the bucket contains more than 256 objects / blobs. """ client = self._require_client(client) query_params = {} if self.user_project is not None: query_params["userProject"] = self.user_project _add_generation_match_parameters( query_params, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, ) if force: blobs = list( self.list_blobs( max_results=self._MAX_OBJECTS_FOR_ITERATION + 1, client=client, timeout=timeout, retry=retry, versions=True, ) ) if len(blobs) > self._MAX_OBJECTS_FOR_ITERATION: message = ( "Refusing to delete bucket with more than " "%d objects. If you actually want to delete " "this bucket, please delete the objects " "yourself before calling Bucket.delete()." ) % (self._MAX_OBJECTS_FOR_ITERATION,) raise ValueError(message) # Ignore 404 errors on delete. self.delete_blobs( blobs, on_error=lambda blob: None, client=client, timeout=timeout, retry=retry, preserve_generation=True, ) # We intentionally pass `_target_object=None` since a DELETE # request has no response value (whether in a standard request or # in a batch request). client._delete_resource( self.path, query_params=query_params, timeout=timeout, retry=retry, _target_object=None, ) def delete_blob( self, blob_name, client=None, generation=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, ): """Deletes a blob from the current bucket. If :attr:`user_project` is set, bills the API request to that project. :type blob_name: str :param blob_name: A blob name to delete. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type generation: long :param generation: (Optional) If present, permanently deletes a specific revision of this object. :type if_generation_match: long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` :type if_generation_not_match: long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` :type if_metageneration_match: long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. The default value is ``DEFAULT_RETRY_IF_GENERATION_SPECIFIED``, a conditional retry policy which will only enable retries if ``if_generation_match`` or ``generation`` is set, in order to ensure requests are idempotent before retrying them. Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). :raises: :class:`google.cloud.exceptions.NotFound` Raises a NotFound if the blob isn't found. To suppress the exception, use :meth:`delete_blobs` by passing a no-op ``on_error`` callback. """ client = self._require_client(client) blob = Blob(blob_name, bucket=self, generation=generation) query_params = copy.deepcopy(blob._query_params) _add_generation_match_parameters( query_params, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, ) # We intentionally pass `_target_object=None` since a DELETE # request has no response value (whether in a standard request or # in a batch request). client._delete_resource( blob.path, query_params=query_params, timeout=timeout, retry=retry, _target_object=None, ) def delete_blobs( self, blobs, on_error=None, client=None, preserve_generation=False, timeout=_DEFAULT_TIMEOUT, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, ): """Deletes a list of blobs from the current bucket. Uses :meth:`delete_blob` to delete each individual blob. By default, any generation information in the list of blobs is ignored, and the live versions of all blobs are deleted. Set `preserve_generation` to True if blob generation should instead be propagated from the list of blobs. If :attr:`user_project` is set, bills the API request to that project. :type blobs: list :param blobs: A list of :class:`~google.cloud.storage.blob.Blob`-s or blob names to delete. :type on_error: callable :param on_error: (Optional) Takes single argument: ``blob``. Called once for each blob raising :class:`~google.cloud.exceptions.NotFound`; otherwise, the exception is propagated. Note that ``on_error`` is not supported in a ``Batch`` context. :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type preserve_generation: bool :param preserve_generation: (Optional) Deletes only the generation specified on the blob object, instead of the live version, if set to True. Only :class:~google.cloud.storage.blob.Blob objects can have their generation set in this way. Default: False. :type if_generation_match: list of long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` Note that the length of the list must match the length of The list must match ``blobs`` item-to-item. :type if_generation_not_match: list of long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` The list must match ``blobs`` item-to-item. :type if_metageneration_match: list of long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` The list must match ``blobs`` item-to-item. :type if_metageneration_not_match: list of long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` The list must match ``blobs`` item-to-item. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. The default value is ``DEFAULT_RETRY_IF_GENERATION_SPECIFIED``, a conditional retry policy which will only enable retries if ``if_generation_match`` or ``generation`` is set, in order to ensure requests are idempotent before retrying them. Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). :raises: :class:`~google.cloud.exceptions.NotFound` (if `on_error` is not passed). """ _raise_if_len_differs( len(blobs), if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, ) if_generation_match = iter(if_generation_match or []) if_generation_not_match = iter(if_generation_not_match or []) if_metageneration_match = iter(if_metageneration_match or []) if_metageneration_not_match = iter(if_metageneration_not_match or []) for blob in blobs: try: blob_name = blob generation = None if not isinstance(blob_name, str): blob_name = blob.name generation = blob.generation if preserve_generation else None self.delete_blob( blob_name, client=client, generation=generation, if_generation_match=next(if_generation_match, None), if_generation_not_match=next(if_generation_not_match, None), if_metageneration_match=next(if_metageneration_match, None), if_metageneration_not_match=next(if_metageneration_not_match, None), timeout=timeout, retry=retry, ) except NotFound: if on_error is not None: on_error(blob) else: raise def copy_blob( self, blob, destination_bucket, new_name=None, client=None, preserve_acl=True, source_generation=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, if_source_generation_match=None, if_source_generation_not_match=None, if_source_metageneration_match=None, if_source_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, ): """Copy the given blob to the given bucket, optionally with a new name. If :attr:`user_project` is set, bills the API request to that project. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/objects/copy) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-copy-file#storage_copy_file-python). :type blob: :class:`google.cloud.storage.blob.Blob` :param blob: The blob to be copied. :type destination_bucket: :class:`google.cloud.storage.bucket.Bucket` :param destination_bucket: The bucket into which the blob should be copied. :type new_name: str :param new_name: (Optional) The new name for the copied file. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type preserve_acl: bool :param preserve_acl: DEPRECATED. This argument is not functional! (Optional) Copies ACL from old blob to new blob. Default: True. Note that ``preserve_acl`` is not supported in a ``Batch`` context. :type source_generation: long :param source_generation: (Optional) The generation of the blob to be copied. :type if_generation_match: long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` Note that the generation to be matched is that of the ``destination`` blob. :type if_generation_not_match: long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` Note that the generation to be matched is that of the ``destination`` blob. :type if_metageneration_match: long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` Note that the metageneration to be matched is that of the ``destination`` blob. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` Note that the metageneration to be matched is that of the ``destination`` blob. :type if_source_generation_match: long :param if_source_generation_match: (Optional) Makes the operation conditional on whether the source object's generation matches the given value. :type if_source_generation_not_match: long :param if_source_generation_not_match: (Optional) Makes the operation conditional on whether the source object's generation does not match the given value. :type if_source_metageneration_match: long :param if_source_metageneration_match: (Optional) Makes the operation conditional on whether the source object's current metageneration matches the given value. :type if_source_metageneration_not_match: long :param if_source_metageneration_not_match: (Optional) Makes the operation conditional on whether the source object's current metageneration does not match the given value. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. The default value is ``DEFAULT_RETRY_IF_GENERATION_SPECIFIED``, a conditional retry policy which will only enable retries if ``if_generation_match`` or ``generation`` is set, in order to ensure requests are idempotent before retrying them. Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The new Blob. """ client = self._require_client(client) query_params = {} if self.user_project is not None: query_params["userProject"] = self.user_project if source_generation is not None: query_params["sourceGeneration"] = source_generation _add_generation_match_parameters( query_params, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, if_source_generation_match=if_source_generation_match, if_source_generation_not_match=if_source_generation_not_match, if_source_metageneration_match=if_source_metageneration_match, if_source_metageneration_not_match=if_source_metageneration_not_match, ) if new_name is None: new_name = blob.name new_blob = Blob(bucket=destination_bucket, name=new_name) api_path = blob.path + "/copyTo" + new_blob.path copy_result = client._post_resource( api_path, None, query_params=query_params, timeout=timeout, retry=retry, _target_object=new_blob, ) if not preserve_acl: new_blob.acl.save(acl={}, client=client, timeout=timeout) new_blob._set_properties(copy_result) return new_blob def rename_blob( self, blob, new_name, client=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, if_source_generation_match=None, if_source_generation_not_match=None, if_source_metageneration_match=None, if_source_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, ): """Rename the given blob using copy and delete operations. If :attr:`user_project` is set, bills the API request to that project. Effectively, copies blob to the same bucket with a new name, then deletes the blob. .. warning:: This method will first duplicate the data and then delete the old blob. This means that with very large objects renaming could be a very (temporarily) costly or a very slow operation. If you need more control over the copy and deletion, instead use ``google.cloud.storage.blob.Blob.copy_to`` and ``google.cloud.storage.blob.Blob.delete`` directly. Also note that this method is not fully supported in a ``Batch`` context. :type blob: :class:`google.cloud.storage.blob.Blob` :param blob: The blob to be renamed. :type new_name: str :param new_name: The new name for this blob. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type if_generation_match: long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` Note that the generation to be matched is that of the ``destination`` blob. :type if_generation_not_match: long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` Note that the generation to be matched is that of the ``destination`` blob. :type if_metageneration_match: long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` Note that the metageneration to be matched is that of the ``destination`` blob. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` Note that the metageneration to be matched is that of the ``destination`` blob. :type if_source_generation_match: long :param if_source_generation_match: (Optional) Makes the operation conditional on whether the source object's generation matches the given value. Also used in the (implied) delete request. :type if_source_generation_not_match: long :param if_source_generation_not_match: (Optional) Makes the operation conditional on whether the source object's generation does not match the given value. Also used in the (implied) delete request. :type if_source_metageneration_match: long :param if_source_metageneration_match: (Optional) Makes the operation conditional on whether the source object's current metageneration matches the given value. Also used in the (implied) delete request. :type if_source_metageneration_not_match: long :param if_source_metageneration_not_match: (Optional) Makes the operation conditional on whether the source object's current metageneration does not match the given value. Also used in the (implied) delete request. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. The default value is ``DEFAULT_RETRY_IF_GENERATION_SPECIFIED``, a conditional retry policy which will only enable retries if ``if_generation_match`` or ``generation`` is set, in order to ensure requests are idempotent before retrying them. Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). :rtype: :class:`Blob` :returns: The newly-renamed blob. """ same_name = blob.name == new_name new_blob = self.copy_blob( blob, self, new_name, client=client, timeout=timeout, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, if_source_generation_match=if_source_generation_match, if_source_generation_not_match=if_source_generation_not_match, if_source_metageneration_match=if_source_metageneration_match, if_source_metageneration_not_match=if_source_metageneration_not_match, retry=retry, ) if not same_name: blob.delete( client=client, timeout=timeout, if_generation_match=if_source_generation_match, if_generation_not_match=if_source_generation_not_match, if_metageneration_match=if_source_metageneration_match, if_metageneration_not_match=if_source_metageneration_not_match, retry=retry, ) return new_blob def restore_blob( self, blob_name, client=None, generation=None, copy_source_acl=None, projection=None, if_generation_match=None, if_generation_not_match=None, if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, ): """Restores a soft-deleted object. If :attr:`user_project` is set on the bucket, bills the API request to that project. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/objects/restore) :type blob_name: str :param blob_name: The name of the blob to be restored. :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type generation: long :param generation: (Optional) If present, selects a specific revision of this object. :type copy_source_acl: bool :param copy_source_acl: (Optional) If true, copy the soft-deleted object's access controls. :type projection: str :param projection: (Optional) Specifies the set of properties to return. If used, must be 'full' or 'noAcl'. :type if_generation_match: long :param if_generation_match: (Optional) See :ref:`using-if-generation-match` :type if_generation_not_match: long :param if_generation_not_match: (Optional) See :ref:`using-if-generation-not-match` :type if_metageneration_match: long :param if_metageneration_match: (Optional) See :ref:`using-if-metageneration-match` :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) See :ref:`using-if-metageneration-not-match` :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. The default value is ``DEFAULT_RETRY_IF_GENERATION_SPECIFIED``, which only restore operations with ``if_generation_match`` or ``generation`` set will be retried. Users can configure non-default retry behavior. A ``None`` value will disable retries. A ``DEFAULT_RETRY`` value will enable retries even if restore operations are not guaranteed to be idempotent. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The restored Blob. """ client = self._require_client(client) query_params = {} if self.user_project is not None: query_params["userProject"] = self.user_project if generation is not None: query_params["generation"] = generation if copy_source_acl is not None: query_params["copySourceAcl"] = copy_source_acl if projection is not None: query_params["projection"] = projection _add_generation_match_parameters( query_params, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, ) blob = Blob(bucket=self, name=blob_name) api_response = client._post_resource( f"{blob.path}/restore", None, query_params=query_params, timeout=timeout, retry=retry, ) blob._set_properties(api_response) return blob @property def cors(self): """Retrieve or set CORS policies configured for this bucket. See http://www.w3.org/TR/cors/ and https://cloud.google.com/storage/docs/json_api/v1/buckets .. note:: The getter for this property returns a list which contains *copies* of the bucket's CORS policy mappings. Mutating the list or one of its dicts has no effect unless you then re-assign the dict via the setter. E.g.: >>> policies = bucket.cors >>> policies.append({'origin': '/foo', ...}) >>> policies[1]['maxAgeSeconds'] = 3600 >>> del policies[0] >>> bucket.cors = policies >>> bucket.update() :setter: Set CORS policies for this bucket. :getter: Gets the CORS policies for this bucket. :rtype: list of dictionaries :returns: A sequence of mappings describing each CORS policy. """ return [copy.deepcopy(policy) for policy in self._properties.get("cors", ())] @cors.setter def cors(self, entries): """Set CORS policies configured for this bucket. See http://www.w3.org/TR/cors/ and https://cloud.google.com/storage/docs/json_api/v1/buckets :type entries: list of dictionaries :param entries: A sequence of mappings describing each CORS policy. """ self._patch_property("cors", entries) default_event_based_hold = _scalar_property("defaultEventBasedHold") """Are uploaded objects automatically placed under an even-based hold? If True, uploaded objects will be placed under an event-based hold to be released at a future time. When released an object will then begin the retention period determined by the policy retention period for the object bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets If the property is not set locally, returns ``None``. :rtype: bool or ``NoneType`` """ @property def default_kms_key_name(self): """Retrieve / set default KMS encryption key for objects in the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets :setter: Set default KMS encryption key for items in this bucket. :getter: Get default KMS encryption key for items in this bucket. :rtype: str :returns: Default KMS encryption key, or ``None`` if not set. """ encryption_config = self._properties.get("encryption", {}) return encryption_config.get("defaultKmsKeyName") @default_kms_key_name.setter def default_kms_key_name(self, value): """Set default KMS encryption key for objects in the bucket. :type value: str or None :param value: new KMS key name (None to clear any existing key). """ encryption_config = self._properties.get("encryption", {}) encryption_config["defaultKmsKeyName"] = value self._patch_property("encryption", encryption_config) @property def labels(self): """Retrieve or set labels assigned to this bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets#labels .. note:: The getter for this property returns a dict which is a *copy* of the bucket's labels. Mutating that dict has no effect unless you then re-assign the dict via the setter. E.g.: >>> labels = bucket.labels >>> labels['new_key'] = 'some-label' >>> del labels['old_key'] >>> bucket.labels = labels >>> bucket.update() :setter: Set labels for this bucket. :getter: Gets the labels for this bucket. :rtype: :class:`dict` :returns: Name-value pairs (string->string) labelling the bucket. """ labels = self._properties.get("labels") if labels is None: return {} return copy.deepcopy(labels) @labels.setter def labels(self, mapping): """Set labels assigned to this bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets#labels :type mapping: :class:`dict` :param mapping: Name-value pairs (string->string) labelling the bucket. """ # If any labels have been expressly removed, we need to track this # so that a future .patch() call can do the correct thing. existing = set([k for k in self.labels.keys()]) incoming = set([k for k in mapping.keys()]) self._label_removals = self._label_removals.union(existing.difference(incoming)) mapping = {k: str(v) for k, v in mapping.items()} # Actually update the labels on the object. self._patch_property("labels", copy.deepcopy(mapping)) @property def etag(self): """Retrieve the ETag for the bucket. See https://tools.ietf.org/html/rfc2616#section-3.11 and https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: str or ``NoneType`` :returns: The bucket etag or ``None`` if the bucket's resource has not been loaded from the server. """ return self._properties.get("etag") @property def id(self): """Retrieve the ID for the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: str or ``NoneType`` :returns: The ID of the bucket or ``None`` if the bucket's resource has not been loaded from the server. """ return self._properties.get("id") @property def iam_configuration(self): """Retrieve IAM configuration for this bucket. :rtype: :class:`IAMConfiguration` :returns: an instance for managing the bucket's IAM configuration. """ info = self._properties.get("iamConfiguration", {}) return IAMConfiguration.from_api_repr(info, self) @property def soft_delete_policy(self): """Retrieve the soft delete policy for this bucket. See https://cloud.google.com/storage/docs/soft-delete :rtype: :class:`SoftDeletePolicy` :returns: an instance for managing the bucket's soft delete policy. """ policy = self._properties.get("softDeletePolicy", {}) return SoftDeletePolicy.from_api_repr(policy, self) @property def lifecycle_rules(self): """Retrieve or set lifecycle rules configured for this bucket. See https://cloud.google.com/storage/docs/lifecycle and https://cloud.google.com/storage/docs/json_api/v1/buckets .. note:: The getter for this property returns a generator which yields *copies* of the bucket's lifecycle rules mappings. Mutating the output dicts has no effect unless you then re-assign the dict via the setter. E.g.: >>> rules = list(bucket.lifecycle_rules) >>> rules.append({'origin': '/foo', ...}) >>> rules[1]['rule']['action']['type'] = 'Delete' >>> del rules[0] >>> bucket.lifecycle_rules = rules >>> bucket.update() :setter: Set lifecycle rules for this bucket. :getter: Gets the lifecycle rules for this bucket. :rtype: generator(dict) :returns: A sequence of mappings describing each lifecycle rule. """ info = self._properties.get("lifecycle", {}) for rule in info.get("rule", ()): action_type = rule["action"]["type"] if action_type == "Delete": yield LifecycleRuleDelete.from_api_repr(rule) elif action_type == "SetStorageClass": yield LifecycleRuleSetStorageClass.from_api_repr(rule) elif action_type == "AbortIncompleteMultipartUpload": yield LifecycleRuleAbortIncompleteMultipartUpload.from_api_repr(rule) else: warnings.warn( "Unknown lifecycle rule type received: {}. Please upgrade to the latest version of google-cloud-storage.".format( rule ), UserWarning, stacklevel=1, ) @lifecycle_rules.setter def lifecycle_rules(self, rules): """Set lifecycle rules configured for this bucket. See https://cloud.google.com/storage/docs/lifecycle and https://cloud.google.com/storage/docs/json_api/v1/buckets :type rules: list of dictionaries :param rules: A sequence of mappings describing each lifecycle rule. """ rules = [dict(rule) for rule in rules] # Convert helpers if needed self._patch_property("lifecycle", {"rule": rules}) def clear_lifecycle_rules(self): """Clear lifecycle rules configured for this bucket. See https://cloud.google.com/storage/docs/lifecycle and https://cloud.google.com/storage/docs/json_api/v1/buckets """ self.lifecycle_rules = [] def clear_lifecyle_rules(self): """Deprecated alias for clear_lifecycle_rules.""" return self.clear_lifecycle_rules() def add_lifecycle_delete_rule(self, **kw): """Add a "delete" rule to lifecycle rules configured for this bucket. This defines a [lifecycle configuration](https://cloud.google.com/storage/docs/lifecycle), which is set on the bucket. For the general format of a lifecycle configuration, see the [bucket resource representation for JSON](https://cloud.google.com/storage/docs/json_api/v1/buckets). See also a [code sample](https://cloud.google.com/storage/docs/samples/storage-enable-bucket-lifecycle-management#storage_enable_bucket_lifecycle_management-python). :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ rules = list(self.lifecycle_rules) rules.append(LifecycleRuleDelete(**kw)) self.lifecycle_rules = rules def add_lifecycle_set_storage_class_rule(self, storage_class, **kw): """Add a "set storage class" rule to lifecycle rules. This defines a [lifecycle configuration](https://cloud.google.com/storage/docs/lifecycle), which is set on the bucket. For the general format of a lifecycle configuration, see the [bucket resource representation for JSON](https://cloud.google.com/storage/docs/json_api/v1/buckets). :type storage_class: str, one of :attr:`STORAGE_CLASSES`. :param storage_class: new storage class to assign to matching items. :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ rules = list(self.lifecycle_rules) rules.append(LifecycleRuleSetStorageClass(storage_class, **kw)) self.lifecycle_rules = rules def add_lifecycle_abort_incomplete_multipart_upload_rule(self, **kw): """Add a "abort incomplete multipart upload" rule to lifecycle rules. .. note:: The "age" lifecycle condition is the only supported condition for this rule. This defines a [lifecycle configuration](https://cloud.google.com/storage/docs/lifecycle), which is set on the bucket. For the general format of a lifecycle configuration, see the [bucket resource representation for JSON](https://cloud.google.com/storage/docs/json_api/v1/buckets). :type kw: dict :params kw: arguments passed to :class:`LifecycleRuleConditions`. """ rules = list(self.lifecycle_rules) rules.append(LifecycleRuleAbortIncompleteMultipartUpload(**kw)) self.lifecycle_rules = rules _location = _scalar_property("location") @property def location(self): """Retrieve location configured for this bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets and https://cloud.google.com/storage/docs/locations Returns ``None`` if the property has not been set before creation, or if the bucket's resource has not been loaded from the server. :rtype: str or ``NoneType`` """ return self._location @location.setter def location(self, value): """(Deprecated) Set `Bucket.location` This can only be set at bucket **creation** time. See https://cloud.google.com/storage/docs/json_api/v1/buckets and https://cloud.google.com/storage/docs/bucket-locations .. warning:: Assignment to 'Bucket.location' is deprecated, as it is only valid before the bucket is created. Instead, pass the location to `Bucket.create`. """ warnings.warn(_LOCATION_SETTER_MESSAGE, DeprecationWarning, stacklevel=2) self._location = value @property def data_locations(self): """Retrieve the list of regional locations for custom dual-region buckets. See https://cloud.google.com/storage/docs/json_api/v1/buckets and https://cloud.google.com/storage/docs/locations Returns ``None`` if the property has not been set before creation, if the bucket's resource has not been loaded from the server, or if the bucket is not a dual-regions bucket. :rtype: list of str or ``NoneType`` """ custom_placement_config = self._properties.get("customPlacementConfig", {}) return custom_placement_config.get("dataLocations") @property def location_type(self): """Retrieve the location type for the bucket. See https://cloud.google.com/storage/docs/storage-classes :getter: Gets the the location type for this bucket. :rtype: str or ``NoneType`` :returns: If set, one of :attr:`~google.cloud.storage.constants.MULTI_REGION_LOCATION_TYPE`, :attr:`~google.cloud.storage.constants.REGION_LOCATION_TYPE`, or :attr:`~google.cloud.storage.constants.DUAL_REGION_LOCATION_TYPE`, else ``None``. """ return self._properties.get("locationType") def get_logging(self): """Return info about access logging for this bucket. See https://cloud.google.com/storage/docs/access-logs#status :rtype: dict or None :returns: a dict w/ keys, ``logBucket`` and ``logObjectPrefix`` (if logging is enabled), or None (if not). """ info = self._properties.get("logging") return copy.deepcopy(info) def enable_logging(self, bucket_name, object_prefix=""): """Enable access logging for this bucket. See https://cloud.google.com/storage/docs/access-logs :type bucket_name: str :param bucket_name: name of bucket in which to store access logs :type object_prefix: str :param object_prefix: prefix for access log filenames """ info = {"logBucket": bucket_name, "logObjectPrefix": object_prefix} self._patch_property("logging", info) def disable_logging(self): """Disable access logging for this bucket. See https://cloud.google.com/storage/docs/access-logs#disabling """ self._patch_property("logging", None) @property def metageneration(self): """Retrieve the metageneration for the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: int or ``NoneType`` :returns: The metageneration of the bucket or ``None`` if the bucket's resource has not been loaded from the server. """ metageneration = self._properties.get("metageneration") if metageneration is not None: return int(metageneration) @property def owner(self): """Retrieve info about the owner of the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: dict or ``NoneType`` :returns: Mapping of owner's role/ID. Returns ``None`` if the bucket's resource has not been loaded from the server. """ return copy.deepcopy(self._properties.get("owner")) @property def project_number(self): """Retrieve the number of the project to which the bucket is assigned. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: int or ``NoneType`` :returns: The project number that owns the bucket or ``None`` if the bucket's resource has not been loaded from the server. """ project_number = self._properties.get("projectNumber") if project_number is not None: return int(project_number) @property def retention_policy_effective_time(self): """Retrieve the effective time of the bucket's retention policy. :rtype: datetime.datetime or ``NoneType`` :returns: point-in time at which the bucket's retention policy is effective, or ``None`` if the property is not set locally. """ policy = self._properties.get("retentionPolicy") if policy is not None: timestamp = policy.get("effectiveTime") if timestamp is not None: return _rfc3339_nanos_to_datetime(timestamp) @property def retention_policy_locked(self): """Retrieve whthere the bucket's retention policy is locked. :rtype: bool :returns: True if the bucket's policy is locked, or else False if the policy is not locked, or the property is not set locally. """ policy = self._properties.get("retentionPolicy") if policy is not None: return policy.get("isLocked") @property def retention_period(self): """Retrieve or set the retention period for items in the bucket. :rtype: int or ``NoneType`` :returns: number of seconds to retain items after upload or release from event-based lock, or ``None`` if the property is not set locally. """ policy = self._properties.get("retentionPolicy") if policy is not None: period = policy.get("retentionPeriod") if period is not None: return int(period) @retention_period.setter def retention_period(self, value): """Set the retention period for items in the bucket. :type value: int :param value: number of seconds to retain items after upload or release from event-based lock. :raises ValueError: if the bucket's retention policy is locked. """ policy = self._properties.setdefault("retentionPolicy", {}) if value is not None: policy["retentionPeriod"] = str(value) else: policy = None self._patch_property("retentionPolicy", policy) @property def self_link(self): """Retrieve the URI for the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: str or ``NoneType`` :returns: The self link for the bucket or ``None`` if the bucket's resource has not been loaded from the server. """ return self._properties.get("selfLink") @property def storage_class(self): """Retrieve or set the storage class for the bucket. See https://cloud.google.com/storage/docs/storage-classes :setter: Set the storage class for this bucket. :getter: Gets the the storage class for this bucket. :rtype: str or ``NoneType`` :returns: If set, one of :attr:`~google.cloud.storage.constants.NEARLINE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.COLDLINE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.ARCHIVE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.STANDARD_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.MULTI_REGIONAL_LEGACY_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.REGIONAL_LEGACY_STORAGE_CLASS`, or :attr:`~google.cloud.storage.constants.DURABLE_REDUCED_AVAILABILITY_LEGACY_STORAGE_CLASS`, else ``None``. """ return self._properties.get("storageClass") @storage_class.setter def storage_class(self, value): """Set the storage class for the bucket. See https://cloud.google.com/storage/docs/storage-classes :type value: str :param value: One of :attr:`~google.cloud.storage.constants.NEARLINE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.COLDLINE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.ARCHIVE_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.STANDARD_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.MULTI_REGIONAL_LEGACY_STORAGE_CLASS`, :attr:`~google.cloud.storage.constants.REGIONAL_LEGACY_STORAGE_CLASS`, or :attr:`~google.cloud.storage.constants.DURABLE_REDUCED_AVAILABILITY_LEGACY_STORAGE_CLASS`, """ self._patch_property("storageClass", value) @property def time_created(self): """Retrieve the timestamp at which the bucket was created. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: :class:`datetime.datetime` or ``NoneType`` :returns: Datetime object parsed from RFC3339 valid timestamp, or ``None`` if the bucket's resource has not been loaded from the server. """ value = self._properties.get("timeCreated") if value is not None: return _rfc3339_nanos_to_datetime(value) @property def updated(self): """Retrieve the timestamp at which the bucket was last updated. See https://cloud.google.com/storage/docs/json_api/v1/buckets :rtype: :class:`datetime.datetime` or ``NoneType`` :returns: Datetime object parsed from RFC3339 valid timestamp, or ``None`` if the bucket's resource has not been loaded from the server. """ value = self._properties.get("updated") if value is not None: return _rfc3339_nanos_to_datetime(value) @property def versioning_enabled(self): """Is versioning enabled for this bucket? See https://cloud.google.com/storage/docs/object-versioning for details. :setter: Update whether versioning is enabled for this bucket. :getter: Query whether versioning is enabled for this bucket. :rtype: bool :returns: True if enabled, else False. """ versioning = self._properties.get("versioning", {}) return versioning.get("enabled", False) @versioning_enabled.setter def versioning_enabled(self, value): """Enable versioning for this bucket. See https://cloud.google.com/storage/docs/object-versioning for details. :type value: convertible to boolean :param value: should versioning be enabled for the bucket? """ self._patch_property("versioning", {"enabled": bool(value)}) @property def requester_pays(self): """Does the requester pay for API requests for this bucket? See https://cloud.google.com/storage/docs/requester-pays for details. :setter: Update whether requester pays for this bucket. :getter: Query whether requester pays for this bucket. :rtype: bool :returns: True if requester pays for API requests for the bucket, else False. """ versioning = self._properties.get("billing", {}) return versioning.get("requesterPays", False) @requester_pays.setter def requester_pays(self, value): """Update whether requester pays for API requests for this bucket. See https://cloud.google.com/storage/docs/using-requester-pays for details. :type value: convertible to boolean :param value: should requester pay for API requests for the bucket? """ self._patch_property("billing", {"requesterPays": bool(value)}) @property def autoclass_enabled(self): """Whether Autoclass is enabled for this bucket. See https://cloud.google.com/storage/docs/using-autoclass for details. :setter: Update whether autoclass is enabled for this bucket. :getter: Query whether autoclass is enabled for this bucket. :rtype: bool :returns: True if enabled, else False. """ autoclass = self._properties.get("autoclass", {}) return autoclass.get("enabled", False) @autoclass_enabled.setter def autoclass_enabled(self, value): """Enable or disable Autoclass at the bucket-level. See https://cloud.google.com/storage/docs/using-autoclass for details. :type value: convertible to boolean :param value: If true, enable Autoclass for this bucket. If false, disable Autoclass for this bucket. """ autoclass = self._properties.get("autoclass", {}) autoclass["enabled"] = bool(value) self._patch_property("autoclass", autoclass) @property def autoclass_toggle_time(self): """Retrieve the toggle time when Autoclaass was last enabled or disabled for the bucket. :rtype: datetime.datetime or ``NoneType`` :returns: point-in time at which the bucket's autoclass is toggled, or ``None`` if the property is not set locally. """ autoclass = self._properties.get("autoclass") if autoclass is not None: timestamp = autoclass.get("toggleTime") if timestamp is not None: return _rfc3339_nanos_to_datetime(timestamp) @property def autoclass_terminal_storage_class(self): """The storage class that objects in an Autoclass bucket eventually transition to if they are not read for a certain length of time. Valid values are NEARLINE and ARCHIVE. See https://cloud.google.com/storage/docs/using-autoclass for details. :setter: Set the terminal storage class for Autoclass configuration. :getter: Get the terminal storage class for Autoclass configuration. :rtype: str :returns: The terminal storage class if Autoclass is enabled, else ``None``. """ autoclass = self._properties.get("autoclass", {}) return autoclass.get("terminalStorageClass", None) @autoclass_terminal_storage_class.setter def autoclass_terminal_storage_class(self, value): """The storage class that objects in an Autoclass bucket eventually transition to if they are not read for a certain length of time. Valid values are NEARLINE and ARCHIVE. See https://cloud.google.com/storage/docs/using-autoclass for details. :type value: str :param value: The only valid values are `"NEARLINE"` and `"ARCHIVE"`. """ autoclass = self._properties.get("autoclass", {}) autoclass["terminalStorageClass"] = value self._patch_property("autoclass", autoclass) @property def autoclass_terminal_storage_class_update_time(self): """The time at which the Autoclass terminal_storage_class field was last updated for this bucket :rtype: datetime.datetime or ``NoneType`` :returns: point-in time at which the bucket's terminal_storage_class is last updated, or ``None`` if the property is not set locally. """ autoclass = self._properties.get("autoclass") if autoclass is not None: timestamp = autoclass.get("terminalStorageClassUpdateTime") if timestamp is not None: return _rfc3339_nanos_to_datetime(timestamp) @property def object_retention_mode(self): """Retrieve the object retention mode set on the bucket. :rtype: str :returns: When set to Enabled, retention configurations can be set on objects in the bucket. """ object_retention = self._properties.get("objectRetention") if object_retention is not None: return object_retention.get("mode") @property def hierarchical_namespace_enabled(self): """Whether hierarchical namespace is enabled for this bucket. :setter: Update whether hierarchical namespace is enabled for this bucket. :getter: Query whether hierarchical namespace is enabled for this bucket. :rtype: bool :returns: True if enabled, else False. """ hns = self._properties.get("hierarchicalNamespace", {}) return hns.get("enabled") @hierarchical_namespace_enabled.setter def hierarchical_namespace_enabled(self, value): """Enable or disable hierarchical namespace at the bucket-level. :type value: convertible to boolean :param value: If true, enable hierarchical namespace for this bucket. If false, disable hierarchical namespace for this bucket. .. note:: To enable hierarchical namespace, you must set it at bucket creation time. Currently, hierarchical namespace configuration cannot be changed after bucket creation. """ hns = self._properties.get("hierarchicalNamespace", {}) hns["enabled"] = bool(value) self._patch_property("hierarchicalNamespace", hns) def configure_website(self, main_page_suffix=None, not_found_page=None): """Configure website-related properties. See https://cloud.google.com/storage/docs/static-website .. note:: This configures the bucket's website-related properties,controlling how the service behaves when accessing bucket contents as a web site. See [tutorials](https://cloud.google.com/storage/docs/hosting-static-website) and [code samples](https://cloud.google.com/storage/docs/samples/storage-define-bucket-website-configuration#storage_define_bucket_website_configuration-python) for more information. :type main_page_suffix: str :param main_page_suffix: The page to use as the main page of a directory. Typically something like index.html. :type not_found_page: str :param not_found_page: The file to use when a page isn't found. """ data = {"mainPageSuffix": main_page_suffix, "notFoundPage": not_found_page} self._patch_property("website", data) def disable_website(self): """Disable the website configuration for this bucket. This is really just a shortcut for setting the website-related attributes to ``None``. """ return self.configure_website(None, None) def get_iam_policy( self, client=None, requested_policy_version=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY, ): """Retrieve the IAM policy for the bucket. See [API reference docs](https://cloud.google.com/storage/docs/json_api/v1/buckets/getIamPolicy) and a [code sample](https://cloud.google.com/storage/docs/samples/storage-view-bucket-iam-members#storage_view_bucket_iam_members-python). If :attr:`user_project` is set, bills the API request to that project. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type requested_policy_version: int or ``NoneType`` :param requested_policy_version: (Optional) The version of IAM policies to request. If a policy with a condition is requested without setting this, the server will return an error. This must be set to a value of 3 to retrieve IAM policies containing conditions. This is to prevent client code that isn't aware of IAM conditions from interpreting and modifying policies incorrectly. The service might return a policy with version lower than the one that was requested, based on the feature syntax in the policy fetched. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: :class:`google.api_core.iam.Policy` :returns: the policy instance, based on the resource returned from the ``getIamPolicy`` API request. """ client = self._require_client(client) query_params = {} if self.user_project is not None: query_params["userProject"] = self.user_project if requested_policy_version is not None: query_params["optionsRequestedPolicyVersion"] = requested_policy_version info = client._get_resource( f"{self.path}/iam", query_params=query_params, timeout=timeout, retry=retry, _target_object=None, ) return Policy.from_api_repr(info) def set_iam_policy( self, policy, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY_IF_ETAG_IN_JSON, ): """Update the IAM policy for the bucket. See https://cloud.google.com/storage/docs/json_api/v1/buckets/setIamPolicy If :attr:`user_project` is set, bills the API request to that project. :type policy: :class:`google.api_core.iam.Policy` :param policy: policy instance used to update bucket's IAM policy. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: :class:`google.api_core.iam.Policy` :returns: the policy instance, based on the resource returned from the ``setIamPolicy`` API request. """ client = self._require_client(client) query_params = {} if self.user_project is not None: query_params["userProject"] = self.user_project path = f"{self.path}/iam" resource = policy.to_api_repr() resource["resourceId"] = self.path info = client._put_resource( path, resource, query_params=query_params, timeout=timeout, retry=retry, _target_object=None, ) return Policy.from_api_repr(info) def test_iam_permissions( self, permissions, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): """API call: test permissions See https://cloud.google.com/storage/docs/json_api/v1/buckets/testIamPermissions If :attr:`user_project` is set, bills the API request to that project. :type permissions: list of string :param permissions: the permissions to check :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :rtype: list of string :returns: the permissions returned by the ``testIamPermissions`` API request. """ client = self._require_client(client) query_params = {"permissions": permissions} if self.user_project is not None: query_params["userProject"] = self.user_project path = f"{self.path}/iam/testPermissions" resp = client._get_resource( path, query_params=query_params, timeout=timeout, retry=retry, _target_object=None, ) return resp.get("permissions", []) def make_public( self, recursive=False, future=False, client=None, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Update bucket's ACL, granting read access to anonymous users. :type recursive: bool :param recursive: If True, this will make all blobs inside the bucket public as well. :type future: bool :param future: If True, this will make all objects created in the future public as well. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :raises ValueError: If ``recursive`` is True, and the bucket contains more than 256 blobs. This is to prevent extremely long runtime of this method. For such buckets, iterate over the blobs returned by :meth:`list_blobs` and call :meth:`~google.cloud.storage.blob.Blob.make_public` for each blob. """ self.acl.all().grant_read() self.acl.save( client=client, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) if future: doa = self.default_object_acl if not doa.loaded: doa.reload(client=client, timeout=timeout) doa.all().grant_read() doa.save( client=client, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) if recursive: blobs = list( self.list_blobs( projection="full", max_results=self._MAX_OBJECTS_FOR_ITERATION + 1, client=client, timeout=timeout, ) ) if len(blobs) > self._MAX_OBJECTS_FOR_ITERATION: message = ( "Refusing to make public recursively with more than " "%d objects. If you actually want to make every object " "in this bucket public, iterate through the blobs " "returned by 'Bucket.list_blobs()' and call " "'make_public' on each one." ) % (self._MAX_OBJECTS_FOR_ITERATION,) raise ValueError(message) for blob in blobs: blob.acl.all().grant_read() blob.acl.save( client=client, timeout=timeout, ) def make_private( self, recursive=False, future=False, client=None, timeout=_DEFAULT_TIMEOUT, if_metageneration_match=None, if_metageneration_not_match=None, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, ): """Update bucket's ACL, revoking read access for anonymous users. :type recursive: bool :param recursive: If True, this will make all blobs inside the bucket private as well. :type future: bool :param future: If True, this will make all objects created in the future private as well. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type if_metageneration_match: long :param if_metageneration_match: (Optional) Make the operation conditional on whether the blob's current metageneration matches the given value. :type if_metageneration_not_match: long :param if_metageneration_not_match: (Optional) Make the operation conditional on whether the blob's current metageneration does not match the given value. :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :raises ValueError: If ``recursive`` is True, and the bucket contains more than 256 blobs. This is to prevent extremely long runtime of this method. For such buckets, iterate over the blobs returned by :meth:`list_blobs` and call :meth:`~google.cloud.storage.blob.Blob.make_private` for each blob. """ self.acl.all().revoke_read() self.acl.save( client=client, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) if future: doa = self.default_object_acl if not doa.loaded: doa.reload(client=client, timeout=timeout) doa.all().revoke_read() doa.save( client=client, timeout=timeout, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, retry=retry, ) if recursive: blobs = list( self.list_blobs( projection="full", max_results=self._MAX_OBJECTS_FOR_ITERATION + 1, client=client, timeout=timeout, ) ) if len(blobs) > self._MAX_OBJECTS_FOR_ITERATION: message = ( "Refusing to make private recursively with more than " "%d objects. If you actually want to make every object " "in this bucket private, iterate through the blobs " "returned by 'Bucket.list_blobs()' and call " "'make_private' on each one." ) % (self._MAX_OBJECTS_FOR_ITERATION,) raise ValueError(message) for blob in blobs: blob.acl.all().revoke_read() blob.acl.save(client=client, timeout=timeout) def generate_upload_policy(self, conditions, expiration=None, client=None): """Create a signed upload policy for uploading objects. This method generates and signs a policy document. You can use [`policy documents`](https://cloud.google.com/storage/docs/xml-api/post-object-forms) to allow visitors to a website to upload files to Google Cloud Storage without giving them direct write access. See a [code sample](https://cloud.google.com/storage/docs/xml-api/post-object-forms#python). :type expiration: datetime :param expiration: (Optional) Expiration in UTC. If not specified, the policy will expire in 1 hour. :type conditions: list :param conditions: A list of conditions as described in the `policy documents` documentation. :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the current bucket. :rtype: dict :returns: A dictionary of (form field name, form field value) of form fields that should be added to your HTML upload form in order to attach the signature. """ client = self._require_client(client) credentials = client._credentials _signing.ensure_signed_credentials(credentials) if expiration is None: expiration = _NOW(_UTC).replace(tzinfo=None) + datetime.timedelta(hours=1) conditions = conditions + [{"bucket": self.name}] policy_document = { "expiration": _datetime_to_rfc3339(expiration), "conditions": conditions, } encoded_policy_document = base64.b64encode( json.dumps(policy_document).encode("utf-8") ) signature = base64.b64encode(credentials.sign_bytes(encoded_policy_document)) fields = { "bucket": self.name, "GoogleAccessId": credentials.signer_email, "policy": encoded_policy_document.decode("utf-8"), "signature": signature.decode("utf-8"), } return fields def lock_retention_policy( self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY ): """Lock the bucket's retention policy. :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. :type timeout: float or tuple :param timeout: (Optional) The amount of time, in seconds, to wait for the server response. See: :ref:`configuring_timeouts` :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` :raises ValueError: if the bucket has no metageneration (i.e., new or never reloaded); if the bucket has no retention policy assigned; if the bucket's retention policy is already locked. """ if "metageneration" not in self._properties: raise ValueError("Bucket has no retention policy assigned: try 'reload'?") policy = self._properties.get("retentionPolicy") if policy is None: raise ValueError("Bucket has no retention policy assigned: try 'reload'?") if policy.get("isLocked"): raise ValueError("Bucket's retention policy is already locked.") client = self._require_client(client) query_params = {"ifMetagenerationMatch": self.metageneration} if self.user_project is not None: query_params["userProject"] = self.user_project path = f"/b/{self.name}/lockRetentionPolicy" api_response = client._post_resource( path, None, query_params=query_params, timeout=timeout, retry=retry, _target_object=self, ) self._set_properties(api_response) def generate_signed_url( self, expiration=None, api_access_endpoint=None, method="GET", headers=None, query_parameters=None, client=None, credentials=None, version=None, virtual_hosted_style=False, bucket_bound_hostname=None, scheme="http", ): """Generates a signed URL for this bucket. .. note:: If you are on Google Compute Engine, you can't generate a signed URL using GCE service account. If you'd like to be able to generate a signed URL from GCE, you can use a standard service account from a JSON file rather than a GCE service account. If you have a bucket that you want to allow access to for a set amount of time, you can use this method to generate a URL that is only valid within a certain time period. If ``bucket_bound_hostname`` is set as an argument of :attr:`api_access_endpoint`, ``https`` works only if using a ``CDN``. :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] :param expiration: Point in time when the signed URL should expire. If a ``datetime`` instance is passed without an explicit ``tzinfo`` set, it will be assumed to be ``UTC``. :type api_access_endpoint: str :param api_access_endpoint: (Optional) URI base, for instance "https://storage.googleapis.com". If not specified, the client's api_endpoint will be used. Incompatible with bucket_bound_hostname. :type method: str :param method: The HTTP verb that will be used when requesting the URL. :type headers: dict :param headers: (Optional) Additional HTTP headers to be included as part of the signed URLs. See: https://cloud.google.com/storage/docs/xml-api/reference-headers Requests using the signed URL *must* pass the specified header (name and value) with each request for the URL. :type query_parameters: dict :param query_parameters: (Optional) Additional query parameters to be included as part of the signed URLs. See: https://cloud.google.com/storage/docs/xml-api/reference-headers#query :type client: :class:`~google.cloud.storage.client.Client` or ``NoneType`` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. :type credentials: :class:`google.auth.credentials.Credentials` or :class:`NoneType` :param credentials: The authorization credentials to attach to requests. These credentials identify this application to the service. If none are specified, the client will attempt to ascertain the credentials from the environment. :type version: str :param version: (Optional) The version of signed credential to create. Must be one of 'v2' | 'v4'. :type virtual_hosted_style: bool :param virtual_hosted_style: (Optional) If true, then construct the URL relative the bucket's virtual hostname, e.g., '.storage.googleapis.com'. Incompatible with bucket_bound_hostname. :type bucket_bound_hostname: str :param bucket_bound_hostname: (Optional) If passed, then construct the URL relative to the bucket-bound hostname. Value can be a bare or with scheme, e.g., 'example.com' or 'http://example.com'. Incompatible with api_access_endpoint and virtual_hosted_style. See: https://cloud.google.com/storage/docs/request-endpoints#cname :type scheme: str :param scheme: (Optional) If ``bucket_bound_hostname`` is passed as a bare hostname, use this value as the scheme. ``https`` will work only when using a CDN. Defaults to ``"http"``. :raises: :exc:`ValueError` when version is invalid or mutually exclusive arguments are used. :raises: :exc:`TypeError` when expiration is not a valid type. :raises: :exc:`AttributeError` if credentials is not an instance of :class:`google.auth.credentials.Signing`. :rtype: str :returns: A signed URL you can use to access the resource until expiration. """ if version is None: version = "v2" elif version not in ("v2", "v4"): raise ValueError("'version' must be either 'v2' or 'v4'") if ( api_access_endpoint is not None or virtual_hosted_style ) and bucket_bound_hostname: raise ValueError( "The bucket_bound_hostname argument is not compatible with " "either api_access_endpoint or virtual_hosted_style." ) if api_access_endpoint is None: client = self._require_client(client) api_access_endpoint = client.api_endpoint # If you are on Google Compute Engine, you can't generate a signed URL # using GCE service account. # See https://github.com/googleapis/google-auth-library-python/issues/50 if virtual_hosted_style: api_access_endpoint = _virtual_hosted_style_base_url( api_access_endpoint, self.name ) resource = "/" elif bucket_bound_hostname: api_access_endpoint = _bucket_bound_hostname_url( bucket_bound_hostname, scheme ) resource = "/" else: resource = f"/{self.name}" if credentials is None: client = self._require_client(client) # May be redundant, but that's ok. credentials = client._credentials if version == "v2": helper = generate_signed_url_v2 else: helper = generate_signed_url_v4 return helper( credentials, resource=resource, expiration=expiration, api_access_endpoint=api_access_endpoint, method=method.upper(), headers=headers, query_parameters=query_parameters, ) class SoftDeletePolicy(dict): """Map a bucket's soft delete policy. See https://cloud.google.com/storage/docs/soft-delete :type bucket: :class:`Bucket` :param bucket: Bucket for which this instance is the policy. :type retention_duration_seconds: int :param retention_duration_seconds: (Optional) The period of time in seconds that soft-deleted objects in the bucket will be retained and cannot be permanently deleted. :type effective_time: :class:`datetime.datetime` :param effective_time: (Optional) When the bucket's soft delete policy is effective. This value should normally only be set by the back-end API. """ def __init__(self, bucket, **kw): data = {} retention_duration_seconds = kw.get("retention_duration_seconds") data["retentionDurationSeconds"] = retention_duration_seconds effective_time = kw.get("effective_time") if effective_time is not None: effective_time = _datetime_to_rfc3339(effective_time) data["effectiveTime"] = effective_time super().__init__(data) self._bucket = bucket @classmethod def from_api_repr(cls, resource, bucket): """Factory: construct instance from resource. :type resource: dict :param resource: mapping as returned from API call. :type bucket: :class:`Bucket` :params bucket: Bucket for which this instance is the policy. :rtype: :class:`SoftDeletePolicy` :returns: Instance created from resource. """ instance = cls(bucket) instance.update(resource) return instance @property def bucket(self): """Bucket for which this instance is the policy. :rtype: :class:`Bucket` :returns: the instance's bucket. """ return self._bucket @property def retention_duration_seconds(self): """Get the retention duration of the bucket's soft delete policy. :rtype: int or ``NoneType`` :returns: The period of time in seconds that soft-deleted objects in the bucket will be retained and cannot be permanently deleted; Or ``None`` if the property is not set. """ duration = self.get("retentionDurationSeconds") if duration is not None: return int(duration) @retention_duration_seconds.setter def retention_duration_seconds(self, value): """Set the retention duration of the bucket's soft delete policy. :type value: int :param value: The period of time in seconds that soft-deleted objects in the bucket will be retained and cannot be permanently deleted. """ self["retentionDurationSeconds"] = value self.bucket._patch_property("softDeletePolicy", self) @property def effective_time(self): """Get the effective time of the bucket's soft delete policy. :rtype: datetime.datetime or ``NoneType`` :returns: point-in time at which the bucket's soft delte policy is effective, or ``None`` if the property is not set. """ timestamp = self.get("effectiveTime") if timestamp is not None: return _rfc3339_nanos_to_datetime(timestamp) def _raise_if_len_differs(expected_len, **generation_match_args): """ Raise an error if any generation match argument is set and its len differs from the given value. :type expected_len: int :param expected_len: Expected argument length in case it's set. :type generation_match_args: dict :param generation_match_args: Lists, which length must be checked. :raises: :exc:`ValueError` if any argument set, but has an unexpected length. """ for name, value in generation_match_args.items(): if value is not None and len(value) != expected_len: raise ValueError(f"'{name}' length must be the same as 'blobs' length")