from io import SEEK_SET, UnsupportedOperation from os import PathLike from pathlib import Path from typing import Any, BinaryIO, Callable, Dict, Mapping, Union, cast from .. import ( BrokenResourceError, ClosedResourceError, EndOfStream, TypedAttributeSet, to_thread, typed_attribute) from ..abc import ByteReceiveStream, ByteSendStream class FileStreamAttribute(TypedAttributeSet): #: the open file descriptor file: BinaryIO = typed_attribute() #: the path of the file on the file system, if available (file must be a real file) path: Path = typed_attribute() #: the file number, if available (file must be a real file or a TTY) fileno: int = typed_attribute() class _BaseFileStream: def __init__(self, file: BinaryIO): self._file = file async def aclose(self) -> None: await to_thread.run_sync(self._file.close) @property def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: attributes: Dict[Any, Callable[[], Any]] = { FileStreamAttribute.file: lambda: self._file, } if hasattr(self._file, 'name'): attributes[FileStreamAttribute.path] = lambda: Path(self._file.name) try: self._file.fileno() except UnsupportedOperation: pass else: attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno() return attributes class FileReadStream(_BaseFileStream, ByteReceiveStream): """ A byte stream that reads from a file in the file system. :param file: a file that has been opened for reading in binary mode .. versionadded:: 3.0 """ @classmethod async def from_path(cls, path: Union[str, 'PathLike[str]']) -> 'FileReadStream': """ Create a file read stream by opening the given file. :param path: path of the file to read from """ file = await to_thread.run_sync(Path(path).open, 'rb') return cls(cast(BinaryIO, file)) async def receive(self, max_bytes: int = 65536) -> bytes: try: data = await to_thread.run_sync(self._file.read, max_bytes) except ValueError: raise ClosedResourceError from None except OSError as exc: raise BrokenResourceError from exc if data: return data else: raise EndOfStream async def seek(self, position: int, whence: int = SEEK_SET) -> int: """ Seek the file to the given position. .. seealso:: :meth:`io.IOBase.seek` .. note:: Not all file descriptors are seekable. :param position: position to seek the file to :param whence: controls how ``position`` is interpreted :return: the new absolute position :raises OSError: if the file is not seekable """ return await to_thread.run_sync(self._file.seek, position, whence) async def tell(self) -> int: """ Return the current stream position. .. note:: Not all file descriptors are seekable. :return: the current absolute position :raises OSError: if the file is not seekable """ return await to_thread.run_sync(self._file.tell) class FileWriteStream(_BaseFileStream, ByteSendStream): """ A byte stream that writes to a file in the file system. :param file: a file that has been opened for writing in binary mode .. versionadded:: 3.0 """ @classmethod async def from_path(cls, path: Union[str, 'PathLike[str]'], append: bool = False) -> 'FileWriteStream': """ Create a file write stream by opening the given file for writing. :param path: path of the file to write to :param append: if ``True``, open the file for appending; if ``False``, any existing file at the given path will be truncated """ mode = 'ab' if append else 'wb' file = await to_thread.run_sync(Path(path).open, mode) return cls(cast(BinaryIO, file)) async def send(self, item: bytes) -> None: try: await to_thread.run_sync(self._file.write, item) except ValueError: raise ClosedResourceError from None except OSError as exc: raise BrokenResourceError from exc