# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Support for resumable uploads.

Also supported here are simple (media) uploads, multipart uploads that
contain both metadata and a small file as payload, and XML multipart
uploads (MPUs) whose parts are uploaded separately.
"""

from google.resumable_media import _upload
from google.resumable_media.requests import _request_helpers


class SimpleUpload(_request_helpers.RequestsMixin, _upload.SimpleUpload):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the resource to be uploaded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_request(data, content_type)

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )


class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c" and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        metadata,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the resource to be uploaded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_request(
            data, metadata, content_type
        )

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )


class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    When constructing a resumable upload, only the resumable upload URL and
    the chunk size are required:

    .. testsetup:: resumable-constructor

       bucket = 'bucket-foo'

    .. doctest:: resumable-constructor

       >>> from google.resumable_media.requests import ResumableUpload
       >>>
       >>> url_template = (
       ...     'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
       ...     'uploadType=resumable')
       >>> upload_url = url_template.format(bucket=bucket)
       >>>
       >>> chunk_size = 3 * 1024 * 1024  # 3MB
       >>> upload = ResumableUpload(upload_url, chunk_size)

    When initiating an upload (via :meth:`initiate`), the caller is expected
    to pass the resource being uploaded as a file-like ``stream``. If the size
    of the resource is explicitly known, it can be passed in directly:

    .. testsetup:: resumable-explicit-size

       import os
       import tempfile

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       file_desc, filename = tempfile.mkstemp()
       os.close(file_desc)

       data = b'some bytes!'
       with open(filename, 'wb') as file_obj:
           file_obj.write(data)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

    .. doctest:: resumable-explicit-size

       >>> import os
       >>>
       >>> upload.total_bytes is None
       True
       >>>
       >>> stream = open(filename, 'rb')
       >>> total_bytes = os.path.getsize(filename)
       >>> metadata = {'name': filename}
       >>> response = upload.initiate(
       ...     transport, stream, metadata, 'text/plain',
       ...     total_bytes=total_bytes)
       >>> response
       <Response [200]>
       >>>
       >>> upload.total_bytes == total_bytes
       True

    .. testcleanup:: resumable-explicit-size

       os.remove(filename)

    If the stream is in a "final" state (i.e. it won't have any more bytes
    written to it), the total number of bytes can be determined implicitly
    from the ``stream`` itself:

    .. testsetup:: resumable-implicit-size

       import io

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       data = b'some MOAR bytes!'
       metadata = {'name': 'some-file.jpg'}
       content_type = 'image/jpeg'

    .. doctest:: resumable-implicit-size

       >>> stream = io.BytesIO(data)
       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type)
       >>>
       >>> upload.total_bytes == len(data)
       True

    If the size of the resource is **unknown** when the upload is initiated,
    the ``stream_final`` argument can be used. This might occur if the
    resource is being dynamically created on the client (e.g. application
    logs). To use this argument:

    .. testsetup:: resumable-unknown-size

       import io

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       metadata = {'name': 'some-file.jpg'}
       content_type = 'application/octet-stream'

       stream = io.BytesIO(b'data')

    .. doctest:: resumable-unknown-size

       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type,
       ...     stream_final=False)
       >>>
       >>> upload.total_bytes is None
       True

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These **will not** be sent with
            :meth:`transmit_next_chunk` or :meth:`recover` requests.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_initiate_request(
            stream,
            metadata,
            content_type,
            total_bytes=total_bytes,
            stream_final=stream_final,
        )

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_initiate_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def transmit_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        In the case of failure, an exception is thrown that preserves the
        failed response:

        .. testsetup:: bad-response

           import io

           import mock
           import requests
           import http.client

           from google import resumable_media
           import google.resumable_media.requests.upload as upload_mod

           transport = mock.Mock(spec=['request'])
           fake_response = requests.Response()
           fake_response.status_code = int(http.client.BAD_REQUEST)
           transport.request.return_value = fake_response

           upload_url = 'http://test.invalid'
           upload = upload_mod.ResumableUpload(
               upload_url, resumable_media.UPLOAD_CHUNK_SIZE)
           # Fake that the upload has been initiate()-d
           data = b'data is here'
           upload._stream = io.BytesIO(data)
           upload._total_bytes = len(data)
           upload._resumable_url = 'http://test.invalid?upload_id=nope'

        .. doctest:: bad-response
           :options: +NORMALIZE_WHITESPACE

           >>> error = None
           >>> try:
           ...     upload.transmit_next_chunk(transport)
           ... except resumable_media.InvalidResponse as caught_exc:
           ...     error = caught_exc
           ...
           >>> error
           InvalidResponse('Request failed with status code', 400,
                           'Expected one of', <HTTPStatus.OK: 200>,
                           <HTTPStatus.PERMANENT_REDIRECT: 308>)
           >>> error.response
           <Response [400]>

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or http.client.PERMANENT_REDIRECT.
            ~google.resumable_media.common.DataCorruption: If this is the final
                chunk, a checksum validation was requested, and the checksum
                does not match or is not available.
        """
        method, url, payload, headers = self._prepare_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            # The size of the chunk just sent is handed to the response
            # processing along with the response itself.
            self._process_resumable_response(result, len(payload))

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def recover(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Recover from a failure and check the status of the current upload.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again. See
        https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
        for more information.

        This method can be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time. Defaults to
                the same (connect, read) timeouts used by the other request
                methods in this module.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_recover_request()
        # NOTE: We assume "payload is None" but pass it along anyway.

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_recover_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )


class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update
    the container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These headers will be propagated to individual
            XMLMPUPart objects spawned from this container as well.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional[int]): The ID of the upload from the initialization
            response.
    """

    def initiate(
        self,
        transport,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Initiate an MPU and record the upload ID.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_initiate_request(
            content_type,
        )

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_initiate_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def finalize(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Finalize an MPU request with all the parts.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_finalize_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_finalize_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def cancel(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Cancel an MPU request and permanently delete any uploaded parts.

        This cannot be undone.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_cancel_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_cancel_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )


class XMLMPUPart(_request_helpers.RequestsMixin, _upload.XMLMPUPart):
    """Upload a single numbered part of an XML MPU.

    Parts are handled separately from :class:`XMLMPUContainer` so that they
    can be uploaded concurrently (possibly from other processes); uploading a
    part does not update the container automatically.
    """

    def upload(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Upload the part.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_upload_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_upload_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )