   @   s  d dl Z d dlmZ d dlmZ d dlmZmZmZ ddl	m
Z
 ddlmZ zd dlZW n eyn   dZY n0 d dlZd dlZd dlZd dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZm Z m!Z!m"Z"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) ddl*m*Z* ddl+m,Z,m-Z- ddl(m.Z. ddl+m/Z/ G dd dZ0d?ddZ1dd Z2d@ddZ3d d! Z4d"d# Z5d$d% Z6d&d' Z7e7 Z8dAd,d-Z9d.Z:d/d0 Z;e;ej<d1d2Z<e;ej=d3d4Z=e;ej>d5d6Z>dd7d8d9Z?dBd<d=Z@dd>l(mAZA e@jBeAj@_BdS )C    N)Mapping)BytesIO)catch_warningssimplefilterwarn   )HighLevelGraph)DataFrameIOLayer)compr)get_fs_token_paths)open)
open_files)infer_compression)CategoricalDtypeis_datetime64_any_dtypeis_float_dtypeis_integer_dtypeis_object_dtype)tokenize)
read_bytes)flatten)delayed)
asciitableparse_bytes   )new_dd_object)clear_known_categoriesc                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	CSVFunctionWrapperzg
    """
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a key).
    """

    def __init__(
        self,
        full_columns,
        columns,
        colname,
        head,
        header,
        reader,
        dtypes,
        enforce,
        kwargs,
    ):
        self.full_columns = full_columns
        self.columns = columns
        self.colname = colname
        self.head = head
        self.header = header
        self.reader = reader
        self.dtypes = dtypes
        self.enforce = enforce
        self.kwargs = kwargs
      s\    fddj jD   jkr$S tj jj   jjfdd D jj	S )zUReturn a new CSVFunctionWrapper object with
        a sub-column projection.
        c                    s   g | ]}| v r|qS r)   r)   .0c)r    r)   r*   
<listcomp>H       z6CSVFunctionWrapper.project_columns.<locals>.<listcomp>c                    s   i | ]}| j | qS r)   )r%   r,   )r(   r)   r*   
<dictcomp>R   r0   z6CSVFunctionWrapper.project_columns.<locals>.<dictcomp>)	r"   r    r   r   r!   r#   r$   r&   r'   )r(   r    r)   )r    r(   r*   project_columnsC   s    
z"CSVFunctionWrapper.project_columnsc              
    def __call__(self, part):
        block, path, is_first, is_last = part

        # Construct the path information passed down to `pandas_read_text`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None

        # Deal with arguments that are special for the first
        # and last blocks of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self.columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid
                # changing `usecols` here. Instead, select the
                # columns after the read
                project_after_read = True
            else:
                columns = self.columns
                rest_kwargs["usecols"] = columns

        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df
def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls back
                # to object dtype. Since `object_array.astype(datetime)` will
                # silently overflow, error here and report.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n  {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n" % ex
            )
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int -> float; also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )

        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n       ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )

        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix it's recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)


def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of values of bytes where each list corresponds
    to one file, and the value of bytes concatenate to comprise the entire
    file, in order.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing column name for path and the path_converter if provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # `dtypes` contains only instances of CategoricalDtype, which causes issues
    # in coerce_dtypes for non-uniform categories across partitions.
    # We will modify the inferred dtypes to
    # 1. contain instances of CategoricalDtype for user-provided types
    # 2. contain 'category' for data-inferred types
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fix up the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)

    blocks = tuple(flatten(block_lists))
    # Create masks of first/last blocks from the nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b[1].args[0] for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(paths), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts: (block, path, is_first, is_last)
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])

    # Construct the output collection with a Blockwise IO layer
    label = "read-csv-"
    name = label + tokenize(reader, columns, enforce, head, blocksize, urlpath)
    layer = DataFrameIOLayer(
        name,
        columns,
        parts,
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        label=label,
        produces_tasks=True,
    )
    graph = HighLevelGraph({name: layer}, {name: set()})
    return new_dd_object(graph, name, head, (None,) * (len(blocks) + 1))


def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess blocksize if psutil is installed, or use an acceptable default if not
AUTO_BLOCKSIZE = _infer_block_size()


def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or "index_col" in kwargs:
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported. "
            f"Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by `dd.{0}`. To achieve "
            "the same behavior, it's recommended to use "
            "`dd.{0}(...).head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        skiprows = lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        skiprows = lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) rows to be
        # included in the sample. This means that [0, 2] will work well,
        # but [0, 440] may not.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # Find the first row that is not skipped, for use as the header
        firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows))
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    if compression == "infer":
        # Infer compression from the first path
        paths = get_fs_token_paths(
            urlpath, mode="rb", storage_options=storage_options
        )[2]
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None`` to remove this message\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_lineterminator = lineterminator.encode()
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first row of the first values
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get the header row, and check that the sample is long enough. If the
    # file contains a header row, we need at least 2 nonempty rows plus the
    # number of rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2
    if kwargs.get("comment"):
        # If comment is provided, step through lines of b_sample and strip
        # out the commented portions
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # If the line starts with a comment, don't include it at all
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(b_lineterminator, lastskiprow + need)

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    if isinstance(header, int):
        firstrow += header
    header = b"" if header is None else parts[firstrow] + b_lineterminator

    # Use the sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header. \n"
                "Pass argument `sample_rows` and make sure the value of "
                "`sample` is large enough to accommodate that many rows of data"
            ) from e
        raise
    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the path "
            "column cannot use this name. Please set `include_path_column` "
            "to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)

    values = [[list(dsk.dask.values()) for dsk in block] for block in values]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP
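
- It can forward settings for a particular storage connection through
  ``storage_options`` (an illustrative sketch: ``anon=True`` is an s3fs
  option requesting anonymous access, so it is only meaningful for S3 URLs):

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv',
    ...                  storage_options={{'anon': True}})  # doctest: +SKIP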

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is computed
    based on available physical memory and the number of cores, up to a maximum
    of 64MB. Can be a number like ``64000000`` or a string like ``"64MB"``. If
    ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.
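
For example, combining the first two remedies above (the column name
``'a'`` here is purely illustrative):

    >>> df = dd.{reader}('myfiles.*.csv', dtype={{'a': 'float64'}},
    ...                  assume_missing=True)  # doctest: +SKIP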

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = make_reader(pd.read_csv, "read_csv", "CSV")
read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")


@delayed
def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths) # doctest: +SKIP
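
    Use ``single_file=True`` to have every partition appended to one output
    file (see the ``single_file`` parameter below)::

    >>> df.to_csv('/path/to/data/export.csv', single_file=True)  # doctest: +SKIP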

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string
        Path glob indicating the naming scheme for the output files
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file. Note that not all filesystems support the
        append mode and thus the single file mode, especially on cloud
        storage systems such as S3 or GCS. A warning will be issued when
        writing to a file that is not backed by a local filesystem.
    encoding : string, optional
        A string representing the encoding to use in the output file,
        defaults to 'utf-8'.
    mode : str
        Python write mode, default 'wt'
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when `single_file` is `True`.
    compression : string, optional
        a string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename
    compute : bool
        If true, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : boolean, default None
        If set to `True`, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (`single_file` is `False`) and written
        only once under the single file mode (`single_file` is `True`).
        It must not be `False` under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to `pd.DataFrame.to_csv()`

    Returns
    -------
    The names of the file written if they were computed right away
    If not, the delayed tasks associated to the writing of the files

    Raises
    ------
    ValueError
        If `header_first_partition_only` is set to `False` or
        `name_function` is specified when `single_file` is `True`.
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        if not isinstance(first_file.fs, fsspec.implementations.local.LocalFileSystem):
            warn("Appending data to a network storage system may not work.")
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode.replace("w", "") + "a"
        append_file = open_file(filename, mode=append_mode, **file_options)
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated and "
                "will be removed in a future version. "
                "Please use the `compute_kwargs` argument instead. "
                f"For example, df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})",
                FutureWarning,
            )

        if (
            scheduler is not None
            and compute_kwargs.get("scheduler") is not None
            and compute_kwargs.get("scheduler") != scheduler
        ):
            raise ValueError(
                f"Differing values for 'scheduler' have been passed in.\n"
                f"scheduler argument: {scheduler}\n"
                f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
            )

        if scheduler is not None and compute_kwargs.get("scheduler") is None:
            compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from ..core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__