import os
from collections.abc import Mapping
from io import BytesIO
from warnings import catch_warnings, simplefilter, warn

from ...highlevelgraph import HighLevelGraph
from ...layers import DataFrameIOLayer

try:
    import psutil
except ImportError:
    psutil = None

import fsspec.implementations.local
import numpy as np
import pandas as pd
from fsspec.compression import compr
from fsspec.core import get_fs_token_paths
from fsspec.core import open as open_file
from fsspec.core import open_files
from fsspec.utils import infer_compression
from pandas.api.types import (
    CategoricalDtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
    is_object_dtype,
)

from ...base import tokenize
from ...bytes import read_bytes
from ...core import flatten
from ...delayed import delayed
from ...utils import asciitable, parse_bytes
from ..core import new_dd_object
from ..utils import clear_known_categories


class CSVFunctionWrapper:
    """
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a key).
    """

    def __init__(
        self,
        full_columns,
        columns,
        colname,
        head,
        header,
        reader,
        dtypes,
        enforce,
        kwargs,
    ):
        self.full_columns = full_columns
        self.columns = columns
        self.colname = colname
        self.head = head
        self.header = header
        self.reader = reader
        self.dtypes = dtypes
        self.enforce = enforce
        self.kwargs = kwargs

    def project_columns(self, columns):
        """Return a new CSVFunctionWrapper object with
        a sub-column projection.
        """
        # Make sure columns is ordered correctly
        columns = [c for c in self.head.columns if c in columns]
        if columns == self.columns:
            return self
        return CSVFunctionWrapper(
            self.full_columns,
            columns,
            self.colname,
            self.head[columns],
            self.header,
            self.reader,
            {c: self.dtypes[c] for c in columns},
            self.enforce,
            self.kwargs,
        )

    def __call__(self, part):

        # Part will be a 4-element tuple
        block, path, is_first, is_last = part

        # Construct `path_info`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None

        # Deal with arguments that are special
        # for the first block of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self.columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid
                # changing `usecols` here. Instead, we can just
                # select columns after the read
                project_after_read = True
            else:
                columns = self.columns
                rest_kwargs["usecols"] = columns

        # Call `pandas_read_text`
        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df


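# Illustrative sketch (hypothetical values, not executed here) of how a single
# partition is produced: each graph task hands the wrapper a
# ``(block, path, is_first, is_last)`` tuple, and the wrapper prepends the
# header for non-first blocks before parsing::
#
#     fn = CSVFunctionWrapper(
#         ["a", "b"], None, None, head, b"a,b\n", pd.read_csv, dtypes, False, {}
#     )
#     part_df = fn((b"1,2\n3,4\n", None, False, True))  # second (and last) block

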
def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls back
                # to object dtype. Since `object_array.astype(datetime)` will
                # silently overflow, error here and report.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n  {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n"
            ) % ex
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int->float, also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )
        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n       ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )

        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix, it's recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)


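# The error raised above points users at two fixes. A minimal sketch of both,
# assuming a hypothetical file ``data.csv`` whose ``amount`` column is inferred
# as integer from the sample but contains missing values later on::
#
#     import dask.dataframe as dd
#     df = dd.read_csv("data.csv", dtype={"amount": "float64"})
#     # or, to treat every unspecified integer column as possibly missing:
#     df = dd.read_csv("data.csv", assume_missing=True)

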
def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of values of bytes, where each inner list
    corresponds to one file and its bytes values concatenate, in order, to
    comprise the entire file.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing the path column name and the path_converter,
        if provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # The categorical columns in `dtypes` are all CategoricalDtype instances,
    # which causes issues in coerce_dtypes for non-uniform categories across
    # partitions. We modify the inferred `dtypes` so that they
    # 1. contain CategoricalDtype instances for user-provided types, and
    # 2. contain 'category' for inferred categorical columns.
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fixup the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)
    blocks = tuple(flatten(block_lists))
    # Create mask of first blocks from nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b[1].path for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(head), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])

    # Create Blockwise layer
    label = "read-csv-"
    name = label + tokenize(reader, urlpath, columns, enforce, head, blocksize)
    layer = DataFrameIOLayer(
        name,
        columns,
        parts,
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        label=label,
        produces_tasks=True,
    )
    graph = HighLevelGraph({name: layer}, {name: set()})
    return new_dd_object(graph, name, head, (None,) * (len(blocks) + 1))


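# Illustrative sketch (hypothetical values) of how the pieces above line up for
# two files split into ``block_lists = [[b0, b1], [b2]]`` with a path column
# enabled::
#
#     blocks   = (b0, b1, b2)
#     is_first = (True, False, True)
#     is_last  = (False, True, True)
#     parts    = [[b0, "a.csv", True, False],
#                 [b1, "a.csv", False, True],
#                 [b2, "b.csv", True, True]]

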
def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess the blocksize if psutil is installed, or use an acceptable default if not
AUTO_BLOCKSIZE = _infer_block_size()


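# For example, on a machine reporting 16 GB of memory and 8 cores,
# ``auto_blocksize`` yields ``min(int(16e9 // 8 / 10), int(64e6))``, i.e. the
# 64 MB cap; smaller machines get proportionally smaller default blocksizes.

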
def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or "index_col" in kwargs:
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported. "
            f"Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by "
            "`dd.{0}`. To achieve the same behavior, it's "
            "recommended to use `dd.{0}(...)."
            "head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        skiprows = lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        skiprows = lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) to
        # be included in the sample. This means that [0,2] will work well,
        # but [0, 440] might not work.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # find the firstrow that is not skipped, for use as header
        firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows))
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    # If compression is "infer", inspect the (first) path suffix and
    # set the proper compression option if the suffix is recognized.
    if compression == "infer":
        # Translate the input urlpath to a simple path list
        paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[
            2
        ]

        # Infer compression from first path
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None`` to remove this message\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_lineterminator = lineterminator.encode()
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first row of the first values
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get header row, and check that sample is long enough. If the file
    # contains a header row, we need at least 2 nonempty rows + the number of
    # rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2

    if kwargs.get("comment"):
        # if comment is provided, step through lines of b_sample and strip out comments
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # if line starts with comment, don't include that line in parts.
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(b_lineterminator, lastskiprow + need)

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    if isinstance(header, int):
        firstrow += header
    header = b"" if header is None else parts[firstrow] + b_lineterminator

    # Use sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header.\n"
                "Pass argument `sample_rows` and make sure the value of `sample` "
                "is large enough to accommodate that many rows of data"
            ) from e
        raise

    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)

    values = [[list(dsk.dask.values()) for dsk in block] for block in values]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is
    computed based on available physical memory and the number of cores,
    up to a maximum of 64MB. Can be a number like ``64000000`` or a string
    like ``"64MB"``. If ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues.
For example, if all the rows in the sample had integer dtypes, but later on
there was a ``NaN``, then this would error at compute time. To fix this, you
have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = make_reader(pd.read_csv, "read_csv", "CSV")
read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")


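# Usage sketch (assumes files matching the glob exist; not executed here): the
# path of each source file can be carried along as a categorical column::
#
#     import dask.dataframe as dd
#     df = dd.read_csv("data/2021-*.csv", include_path_column="source_file")
#     counts = df.groupby("source_file").size().compute()

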
def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths) # doctest: +SKIP

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string
        Path glob indicating the naming scheme for the output files
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file. Note that not all filesystems support the
        append mode and thus the single file mode, especially on cloud
        storage systems such as S3 or GCS. A warning will be issued when
        writing to a file that is not backed by a local filesystem.
    encoding : string, optional
        A string representing the encoding to use in the output file,
        defaults to 'utf-8'.
    mode : str
        Python write mode, default 'wt'
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file;
        allowed values are 'gzip', 'bz2', and 'xz'. Only used when the
        first argument is a filename.
    compute : bool
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : boolean, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must not be False under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to ``pd.DataFrame.to_csv()``

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        if not isinstance(first_file.fs, fsspec.implementations.local.LocalFileSystem):
            warn("Appending data to a network storage system may not work.")
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode.replace("w", "") + "a"
        append_file = open_file(filename, mode=append_mode, **file_options)
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated and "
                "will be removed in a future version. "
                "Please use the `compute_kwargs` argument instead. "
                f"For example, df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})",
                FutureWarning,
            )

        if (
            scheduler is not None
            and compute_kwargs.get("scheduler") is not None
            and compute_kwargs.get("scheduler") != scheduler
        ):
            raise ValueError(
                f"Differing values for 'scheduler' have been passed in.\n"
                f"scheduler argument: {scheduler}\n"
                f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
            )

        if scheduler is not None and compute_kwargs.get("scheduler") is None:
            compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from ..core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__