#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
from . import base, import_name


class TextFilesSource(base.DataSource):
    """Read text files as a sequence of lines

    Prototype of sources reading sequential data.

    Takes a set of files and returns an iterator over the text in each
    of them. The files can be local or remote. Extra parameters for
    encoding, etc., go into ``storage_options``.
    """
    name = 'textfiles'
    version = '0.0.1'
    container = 'python'
    partition_access = True

    def __init__(self, urlpath, text_mode=True, text_encoding='utf8',
                 compression=None, decoder=None, read=True,
                 metadata=None, storage_options=None):
        """
        Parameters
        ----------
        urlpath : str or list(str)
            Target files. Can be a glob-path (with "*") and include a
            protocol specifier (e.g., "s3://"). Can also be a list of
            absolute paths.
        text_mode : bool
            Whether to open the file in text mode, recoding binary
            characters on the fly
        text_encoding : str
            If text_mode is True, apply this encoding. UTF* is by far the
            most common
        compression : str or None
            If given, decompress the file with the given codec on load. Can
            be something like "gzip" or "bz2", or "infer" to guess the codec
            from the filename
        decoder : function, str or None
            Use this to decode the contents of files. If None, you will get
            a list of lines of text/bytes. If a function, it must operate on
            an open file-like object or a bytes/str instance, and return a
            list. If a string, it is treated as a dotted module path and
            resolved to a function with ``import_name``.
        read : bool
            If decoder is not None, this flag controls whether bytes/str get
            passed to the function indicated (True) or the open file-like
            object (False)
        storage_options : dict
            Options to pass to the file reader backend, including
            text-specific encoding arguments, and parameters specific to the
            remote file-system driver, if one is in use.
        """
        self._urlpath = urlpath
        self._storage_options = storage_options or {}
        self._files = None
        if isinstance(decoder, str):
            # resolve a dotted path like "json.loads" to the function itself
            decoder = import_name(decoder)
        self.decoder = decoder
        self.compression = compression
        self.mode = 'rt' if text_mode else 'rb'
        self.encoding = text_encoding
        self._read = read
        super(TextFilesSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        from fsspec import open_files
        if self._files is None:
            # expand any glob and caching in the URL, then open one file
            # per match; each file becomes one partition
            urlpath = self._get_cache(self._urlpath)[0]
            self._files = open_files(
                urlpath, mode=self.mode, encoding=self.encoding,
                compression=self.compression,
                **self._storage_options)
            self.npartitions = len(self._files)
        return base.Schema(dtype=None,
                           shape=(None, ),
                           npartitions=self.npartitions,
                           extra_metadata=self.metadata)

    def _get_partition(self, i):
        return get_file(self._files[i], self.decoder, self._read)

    def read(self):
        self._get_schema()
        return self.to_dask().compute()

    def to_spark(self):
        from intake_spark.base import SparkHolder
        h = SparkHolder(False, [
            ('textFile', (self._urlpath, ))
        ], {})
        return h.setup()

    def to_dask(self):
        import dask.bag as db
        from dask import delayed
        self._get_schema()
        # one delayed task per file, so each partition loads lazily
        dfile = delayed(get_file)
        return db.from_delayed([dfile(f, self.decoder, self._read)
                                for f in self._files])


def get_file(f, decoder, read):
    """Serializable function to take an OpenFile object and read lines"""
    with f as fobj:
        if decoder is None:
            # no decoder: return the file's lines as a list
            return list(fobj)
        # pass either the file's contents or the open file object to the
        # decoder, depending on the ``read`` flag
        d = fobj.read() if read else fobj
        out = decoder(d)
        # always return a list, so partitions concatenate cleanly
        if isinstance(out, (tuple, list)):
            return out
        return [out]
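
# ---------------------------------------------------------------------------
# For reference, a catalog entry using this driver might look like the
# following (a sketch, assuming standard Intake catalog YAML; the bucket
# path is hypothetical):
#
#   sources:
#     server_logs:
#       driver: textfiles
#       args:
#         urlpath: 's3://bucket/logs/*.log'
#         compression: 'gzip'
# ---------------------------------------------------------------------------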
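
# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the driver): a minimal example
# of driving this source through Intake's auto-generated entry point,
# ``intake.open_textfiles``, which the registration under ``name =
# 'textfiles'`` above provides. The glob paths below are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import json

    import intake

    # default behaviour: one partition per file, each read as a list of
    # text lines; read() concatenates all partitions into a single list
    src = intake.open_textfiles('./data/*.txt')
    print(src.read())

    # with a decoder and read=True (the default), each file's entire
    # contents are read and passed to the decoder as a single str object
    src = intake.open_textfiles('./data/*.json', decoder=json.loads)
    print(src.read())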