import warnings

import numpy as np
import pandas as pd

import dask

from ...base import tokenize
from ...delayed import delayed
from .. import methods
from .io import from_delayed, from_pandas


def read_sql_query(
    sql,
    con,
    index_col,
    divisions=None,
    npartitions=None,
    limits=None,
    bytes_per_chunk="256 MiB",
    head_rows=5,
    meta=None,
    engine_kwargs=None,
    **kwargs,
):
    """
    Read SQL query into a DataFrame.

    If neither ``divisions`` nor ``npartitions`` is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.

    Parameters
    ----------
    sql : SQLAlchemy Selectable
        SQL query to be executed. TextClause is not supported.
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be an indexed column in the SQL server, of any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override ``npartitions`` and ``bytes_per_chunk``. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexicographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if ``divisions`` is not given. Will split the values
        of the index column linearly between ``limits``, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with ``npartitions``;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    bytes_per_chunk : str or int
        If both ``divisions`` and ``npartitions`` are None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, and memory per row
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
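
    Examples
    --------
    A minimal sketch, assuming a table ``accounts`` with an indexed,
    orderable column ``id`` (the table, column, and database names here
    are illustrative):

    >>> import sqlalchemy as sa
    >>> engine = sa.create_engine('sqlite:///path/to/bank.db')
    >>> accounts = sa.Table('accounts', sa.MetaData(), autoload_with=engine)
    >>> query = sa.sql.select([accounts.c.id, accounts.c.balance])
    >>> df = dd.read_sql_query(query, 'sqlite:///path/to/bank.db',
    ...                        index_col='id', npartitions=4)  # doctest: +SKIP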
    """
    import sqlalchemy as sa

    if not isinstance(con, str):
        raise TypeError(
            "'con' must be of type str, not "
            + str(type(con))
            + ". Note: Dask does not support SQLAlchemy connectables here"
        )
    if index_col is None:
        raise ValueError("Must specify index column to partition on")
    if not isinstance(index_col, (str, sa.Column, sa.sql.elements.ColumnClause)):
        raise ValueError(
            "'index_col' must be of type str or sa.Column, not " + str(type(index_col))
        )
    if head_rows == 0:
        if meta is None:
            raise ValueError("Must provide 'meta' if 'head_rows' is 0")
        if divisions is None and npartitions is None:
            raise ValueError(
                "Must provide 'divisions' or 'npartitions' if 'head_rows' is 0"
            )
    if divisions and npartitions:
        raise TypeError("Must supply either 'divisions' or 'npartitions', not both")

    engine_kwargs = {} if engine_kwargs is None else engine_kwargs
    engine = sa.create_engine(con, **engine_kwargs)

    index = (
        sa.Column(index_col)
        if isinstance(index_col, str)
        else sa.Column(index_col.name, index_col.type)
    )
    kwargs["index_col"] = index.name

    if head_rows > 0:
        # Fetch a small head to infer dtypes and estimate bytes per row
        q = sql.limit(head_rows)
        head = pd.read_sql(q, engine, **kwargs)

        if len(head) == 0:
            # No results at all
            return from_pandas(head, npartitions=1)

        bytes_per_row = head.memory_usage(deep=True, index=True).sum() / head_rows
        if meta is None:
            meta = head.iloc[:0]

    if divisions is None:
        if limits is None:
            # Calculate max and min for the given index column
            q = sa.sql.select(
                [sa.sql.func.max(index), sa.sql.func.min(index)]
            ).select_from(sql.subquery())
            minmax = pd.read_sql(q, engine)
            maxi, mini = minmax.iloc[0]
            dtype = minmax.dtypes["max_1"]
        else:
            mini, maxi = limits
            dtype = pd.Series(limits).dtype

        if npartitions is None:
            # Pick a partition count that targets bytes_per_chunk per partition
            q = sa.sql.select([sa.sql.func.count(index)]).select_from(sql.subquery())
            count = pd.read_sql(q, engine)["count_1"][0]
            npartitions = (
                int(
                    round(
                        count * bytes_per_row / dask.utils.parse_bytes(bytes_per_chunk)
                    )
                )
                or 1
            )
        if dtype.kind == "M":
            divisions = methods.tolist(
                pd.date_range(
                    start=mini,
                    end=maxi,
                    freq="%iS" % ((maxi - mini).total_seconds() / npartitions),
                )
            )
            divisions[0] = mini
            divisions[-1] = maxi
        elif dtype.kind in ["i", "u", "f"]:
            divisions = np.linspace(mini, maxi, npartitions + 1).tolist()
        else:
            raise TypeError(
                'Provided index column is of type "{}".  If divisions is not provided '
                "the index column type must be numeric or datetime.".format(dtype)
            )

    parts = []
    lowers, uppers = divisions[:-1], divisions[1:]
    for i, (lower, upper) in enumerate(zip(lowers, uppers)):
        # The last partition includes its upper bound; all others exclude it
        cond = index <= upper if i == len(lowers) - 1 else index < upper
        q = sql.where(sa.sql.and_(index >= lower, cond))
        parts.append(
            delayed(_read_sql_chunk)(q, con, meta, engine_kwargs=engine_kwargs, **kwargs)
        )

    engine.dispose()

    return from_delayed(parts, meta, divisions=divisions)


def read_sql_table(
    table_name,
    con,
    index_col,
    divisions=None,
    npartitions=None,
    limits=None,
    columns=None,
    bytes_per_chunk="256 MiB",
    head_rows=5,
    schema=None,
    meta=None,
    engine_kwargs=None,
    **kwargs,
):
    """
    Read SQL database table into a DataFrame.

    If neither ``divisions`` nor ``npartitions`` is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be an indexed column in the SQL server, of any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.
    columns : sequence of str or SqlAlchemy column or None
        Which columns to select; if None, gets all. Note that this can be a
        mix of str and SQLAlchemy columns.
    schema : str or None
        Pass this to sqlalchemy to select which DB schema to use within the
        URI connection
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override ``npartitions`` and ``bytes_per_chunk``. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexicographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if ``divisions`` is not given. Will split the values
        of the index column linearly between ``limits``, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with ``npartitions``;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    bytes_per_chunk : str or int
        If both ``divisions`` and ``npartitions`` are None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, and memory per row
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    >>> df = dd.read_sql_table('accounts', 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP
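
    Explicit ``divisions`` (boundary values of the index column) may be given
    instead of ``npartitions``; the boundary values below are illustrative:

    >>> df = dd.read_sql_table('accounts', 'sqlite:///path/to/bank.db',
    ...                  divisions=[0, 1000, 2000, 3000],
    ...                  index_col='id')  # doctest: +SKIP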
    """
    import sqlalchemy as sa

    if "table" in kwargs:
        warnings.warn(
            "The `table` keyword has been replaced by `table_name`. "
            "Please use `table_name` instead.",
            DeprecationWarning,
        )
        table_name = kwargs.pop("table")
    if "uri" in kwargs:
        warnings.warn(
            "The `uri` keyword has been replaced by `con`. Please use `con` instead.",
            DeprecationWarning,
        )
        con = kwargs.pop("uri")

    deprecated_args = False
    if not isinstance(table_name, str):
        warnings.warn(
            "`read_sql_table` will no longer support {}; please use a "
            "`table_name` of type str instead or use `read_sql_query`, "
            "if you are using a SQLAlchemy query".format(type(table_name)),
            DeprecationWarning,
        )
        deprecated_args = True
    if columns is not None:
        for col in columns:
            if not isinstance(col, (sa.Column, str)):
                warnings.warn(
                    "`columns` will no longer support SQLAlchemy selectables; "
                    "please use `read_sql_query` instead",
                    DeprecationWarning,
                )
                deprecated_args = True

    if not _gt14():
        warnings.warn(
            "Dask will soon require SQLAlchemy 1.4 or newer. "
            "Please update your SQLAlchemy version. "
            "Friendly note: upgrading to SQLAlchemy 1.4 may break code. "
            "Do it with caution.",
            category=DeprecationWarning,
        )

    if deprecated_args or not _gt14():
        # Fall back to the legacy implementation for old SQLAlchemy versions
        # or deprecated call styles
        return _old_read_sql_table(
            table=table_name,
            uri=con,
            index_col=index_col,
            divisions=divisions,
            npartitions=npartitions,
            limits=limits,
            columns=columns,
            bytes_per_chunk=bytes_per_chunk,
            head_rows=head_rows,
            schema=schema,
            meta=meta,
            engine_kwargs=engine_kwargs,
            **kwargs,
        )

    if not isinstance(con, str):
        raise TypeError(
            "`con` must be of type str, not "
            + str(type(con))
            + ". Note: Dask does not support SQLAlchemy connectables here"
        )

    engine_kwargs = {} if engine_kwargs is None else engine_kwargs
    engine = sa.create_engine(con, **engine_kwargs)
    m = sa.MetaData()
    if isinstance(table_name, str):
        table = sa.Table(
            table_name, m, autoload=True, autoload_with=engine, schema=schema
        )
    else:
        raise TypeError(
            "`table_name` must be of type str, not " + str(type(table_name))
        )
    engine.dispose()

    columns = (
        [
            sa.Column(c, table.columns[c].type)
            if isinstance(c, str)
            else sa.Column(c.name, c.type)
            for c in columns
        ]
        if columns
        else [sa.Column(c.name, c.type) for c in table.columns]
    )
    index = (
        sa.Column(index_col, table.columns[index_col].type)
        if isinstance(index_col, str)
        else sa.Column(index_col.name, index_col.type)
    )

    if index.name not in [c.name for c in columns]:
        columns.append(index)

    query = sa.sql.select(columns).select_from(table)

    return read_sql_query(
        sql=query,
        con=con,
        index_col=index,
        divisions=divisions,
        npartitions=npartitions,
        limits=limits,
        bytes_per_chunk=bytes_per_chunk,
        head_rows=head_rows,
        meta=meta,
        engine_kwargs=engine_kwargs,
        **kwargs,
    )


def read_sql(sql, con, index_col, **kwargs):
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query``. It will delegate to the specific function depending
    on the provided input. A SQL query will be routed to ``read_sql_query``,
    while a database table name will be routed to ``read_sql_table``.
    Note that the delegated function might have more specific notes about
    their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable
        Name of SQL table in database or SQL query to be executed. TextClause is not supported.
    con : str
        Full sqlalchemy URI for the database connection
    index_col : str
        Column which becomes the index, and defines the partitioning. Should
        be an indexed column in the SQL server, of any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        ``npartitions`` or ``bytes_per_chunk``; otherwise must supply explicit
        ``divisions``.

    Returns
    -------
    dask.dataframe

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.
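
    Examples
    --------
    A short sketch of both dispatch routes (the table, column, and database
    names are illustrative):

    >>> df = dd.read_sql('accounts', 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP
    >>> import sqlalchemy as sa
    >>> query = sa.select([sa.column('id'), sa.column('balance')]
    ...                   ).select_from(sa.table('accounts'))
    >>> df = dd.read_sql(query, 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP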
    """
    if isinstance(sql, str):
        return read_sql_table(sql, con, index_col, **kwargs)
    else:
        return read_sql_query(sql, con, index_col, **kwargs)


def _read_sql_chunk(q, uri, meta, engine_kwargs=None, **kwargs):
    import sqlalchemy as sa

    engine_kwargs = engine_kwargs or {}
    engine = sa.create_engine(uri, **engine_kwargs)
    df = pd.read_sql(q, engine, **kwargs)
    engine.dispose()
    if len(df) == 0:
        return meta
    elif len(df.dtypes.to_dict()) == 0:
        # only index column is loaded; required only for pandas < 1.0.0
        return meta
    else:
        return df.astype(meta.dtypes.to_dict(), copy=False)


def _to_sql_chunk(d, uri, engine_kwargs=None, **kwargs):
    import sqlalchemy as sa

    engine_kwargs = engine_kwargs or {}
    engine = sa.create_engine(uri, **engine_kwargs)
    q = d.to_sql(con=engine, **kwargs)
    engine.dispose()
    return q


def _gt14() -> bool:
    """
    Check if sqlalchemy.__version__ is at least 1.4.0, when several
    deprecations were made.
    """
    import sqlalchemy

    if (
        sqlalchemy.__version__.startswith("0.")
        or sqlalchemy.__version__.startswith("1.0")
        or sqlalchemy.__version__.startswith("1.1")
        or sqlalchemy.__version__.startswith("1.2")
        or sqlalchemy.__version__.startswith("1.3")
    ):
        return False
    else:
        return True


def _old_read_sql_table(
    table,
    uri,
    index_col,
    divisions=None,
    npartitions=None,
    limits=None,
    columns=None,
    bytes_per_chunk="256 MiB",
    head_rows=5,
    schema=None,
    meta=None,
    engine_kwargs=None,
    **kwargs,
):
    """
    Create dataframe from an SQL table.
    If neither divisions nor npartitions is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.
    Parameters
    ----------
    table : string or sqlalchemy expression
        Select columns from here.
    uri : string
        Full sqlalchemy URI for the database connection
    index_col : string
        Column which becomes the index, and defines the partitioning. Should
        be an indexed column in the SQL server, of any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        npartitions or bytes_per_chunk; otherwise must supply explicit
        ``divisions=``.
        ``index_col`` could be a function to return a value, e.g.,
        ``sql.func.abs(sql.column('value')).label('abs(value)')``.
        ``index_col=sql.func.abs(sql.column("value")).label("abs(value)")``, or
        ``index_col=cast(sql.column("id"),types.BigInteger).label("id")`` to convert
        the textfield ``id`` to ``BigInteger``.
        Note that ``sql``, ``cast``, and ``types`` come from the ``sqlalchemy`` module.
        Labeling columns created by functions or arithmetic operations is
        required.
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override npartitions and bytes_per_chunk. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexicographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if divisions is not given. Will split the values
        of the index column linearly between limits, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with npartitions;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    columns : list of strings or None
        Which columns to select; if None, gets all; can include sqlalchemy
        functions, e.g.,
        ``sql.func.abs(sql.column('value')).label('abs(value)')``.
        Labeling columns created by functions or arithmetic operations is
        recommended.
    bytes_per_chunk : str, int
        If both divisions and npartitions are None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, unless passing meta
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    schema : str or None
        If using a table name, pass this to sqlalchemy to select which DB
        schema to use within the URI connection
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`
    Returns
    -------
    dask.dataframe
    Examples
    --------
    >>> df = dd.read_sql_table('accounts', 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP
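
    A labeled SQLAlchemy expression can also serve as the index, as described
    above (the table and column names are illustrative):

    >>> from sqlalchemy import sql
    >>> idx = sql.func.abs(sql.column('value')).label('abs(value)')
    >>> df = dd.read_sql_table('records', 'sqlite:///path/to/bank.db',
    ...                  npartitions=4, index_col=idx)  # doctest: +SKIP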
    """
    import sqlalchemy as sa
    from sqlalchemy import sql
    from sqlalchemy.sql import elements

    warnings.warn(
        "You are using a compatibility version of `read_sql_table` that will be "
        "removed in a future version of dask. This function exists to support "
        "old versions of SQLAlchemy (< 1.4). This compatibility function is less "
        "stable than the new version. We recommend you update your code.",
        DeprecationWarning,
    )

    if index_col is None:
        raise ValueError("Must specify index column to partition on")

    engine_kwargs = {} if engine_kwargs is None else engine_kwargs
    engine = sa.create_engine(uri, **engine_kwargs)
    m = sa.MetaData()
    if isinstance(table, str):
        table = sa.Table(table, m, autoload=True, autoload_with=engine, schema=schema)

    index = table.columns[index_col] if isinstance(index_col, str) else index_col
    if not isinstance(index_col, (str, elements.Label)):
        raise ValueError(
            "Use label when passing an SQLAlchemy instance as the index (%s)" % index
        )
    if divisions and npartitions:
        raise TypeError("Must supply either divisions or npartitions, not both")

    columns = (
        [(table.columns[c] if isinstance(c, str) else c) for c in columns]
        if columns
        else list(table.columns)
    )
    if index not in columns:
        columns.append(index)

    if isinstance(index_col, str):
        kwargs["index_col"] = index_col
    else:
        # function names get pandas auto-named
        kwargs["index_col"] = index_col.name

    if head_rows > 0:
        # Derive metadata from the first few rows
        q = sql.select(columns).limit(head_rows).select_from(table)
        head = pd.read_sql(q, engine, **kwargs)

        if len(head) == 0:
            # No results at all
            name = table.name
            schema = table.schema
            head = pd.read_sql_table(name, uri, schema=schema, index_col=index_col)
            return from_pandas(head, npartitions=1)

        bytes_per_row = head.memory_usage(deep=True, index=True).sum() / head_rows
        if meta is None:
            meta = head.iloc[:0]
    elif meta is None:
        raise ValueError("Must provide meta if head_rows is 0")
    else:
        if divisions is None and npartitions is None:
            raise ValueError(
                "Must provide divisions or npartitions when using explicit meta."
            )

    if divisions is None:
        if limits is None:
            # Calculate max and min for the given index column
            q = sql.select([sql.func.max(index), sql.func.min(index)]).select_from(
                table
            )
            minmax = pd.read_sql(q, engine)
            maxi, mini = minmax.iloc[0]
            dtype = minmax.dtypes["max_1"]
        else:
            mini, maxi = limits
            dtype = pd.Series(limits).dtype

        if npartitions is None:
            q = sql.select([sql.func.count(index)]).select_from(table)
            count = pd.read_sql(q, engine)["count_1"][0]
            npartitions = (
                int(
                    round(
                        count * bytes_per_row / dask.utils.parse_bytes(bytes_per_chunk)
                    )
                )
                or 1
            )
        if dtype.kind == "M":
            divisions = methods.tolist(
                pd.date_range(
                    start=mini,
                    end=maxi,
                    freq="%iS" % ((maxi - mini).total_seconds() / npartitions),
                )
            )
            divisions[0] = mini
            divisions[-1] = maxi
        elif dtype.kind in ["i", "u", "f"]:
            divisions = np.linspace(mini, maxi, npartitions + 1).tolist()
        else:
            raise TypeError(
                'Provided index column is of type "{}".  If divisions is not provided '
                "the index column type must be numeric or datetime.".format(dtype)
            )

    parts = []
    lowers, uppers = divisions[:-1], divisions[1:]
    for i, (lower, upper) in enumerate(zip(lowers, uppers)):
        cond = index <= upper if i == len(lowers) - 1 else index < upper
        q = sql.select(columns).where(sql.and_(index >= lower, cond)).select_from(table)
        parts.append(
            delayed(_read_sql_chunk)(q, uri, meta, engine_kwargs=engine_kwargs, **kwargs)
        )

    engine.dispose()

    return from_delayed(parts, meta, divisions=divisions)


def to_sql(
    df,
    name: str,
    uri: str,
    schema=None,
    if_exists: str = "fail",
    index: bool = True,
    index_label=None,
    chunksize=None,
    dtype=None,
    method=None,
    compute=True,
    parallel=False,
    engine_kwargs=None,
):
    """Store Dask DataFrame to a SQL table

    An empty table is created based on the "meta" DataFrame (and conforming to the caller's "if_exists" preference), and
    then each block calls pd.DataFrame.to_sql (with `if_exists="append"`).

    Any database supported by SQLAlchemy [1]_ can be used. Tables can be
    newly created, appended to, or overwritten.

    Parameters
    ----------
    name : str
        Name of SQL table.
    uri : string
        Full sqlalchemy URI for the database connection
    schema : str, optional
        Specify the schema (if database flavor supports this). If None, use
        default schema.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        How to behave if the table already exists.

        * fail: Raise a ValueError.
        * replace: Drop the table before inserting new values.
        * append: Insert new values to the existing table.

    index : bool, default True
        Write DataFrame index as a column. Uses `index_label` as the column
        name in the table.
    index_label : str or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 legacy mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        * None : Uses standard SQL ``INSERT`` clause (one per row).
        * 'multi': Pass multiple values in a single ``INSERT`` clause.
        * callable with signature ``(pd_table, conn, keys, data_iter)``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    compute : bool, default True
        When true, call dask.compute and perform the load into SQL; otherwise, return a Dask object (or array of
        per-block objects when parallel=True).
    parallel : bool, default False
        When true, have each block append itself to the DB table concurrently. This can result in DB rows being in a
        different order than the source DataFrame's corresponding rows. When false, load each block into the SQL DB in
        sequence.
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy

    Raises
    ------
    ValueError
        When the table already exists and `if_exists` is 'fail' (the
        default).

    See Also
    --------
    read_sql : Read a DataFrame from a table.

    Notes
    -----
    Timezone aware datetime columns will be written as
    ``Timestamp with timezone`` type with SQLAlchemy if supported by the
    database. Otherwise, the datetimes will be stored as timezone unaware
    timestamps local to the original timezone.

    .. versionadded:: 0.24.0

    References
    ----------
    .. [1] https://docs.sqlalchemy.org
    .. [2] https://www.python.org/dev/peps/pep-0249/

    Examples
    --------
    Create a table from scratch with 4 rows.

    >>> import pandas as pd
    >>> df = pd.DataFrame([ {'i':i, 's':str(i)*2 } for i in range(4) ])
    >>> from dask.dataframe import from_pandas
    >>> ddf = from_pandas(df, npartitions=2)
    >>> ddf  # doctest: +SKIP
    Dask DataFrame Structure:
                       i       s
    npartitions=2
    0              int64  object
    2                ...     ...
    3                ...     ...
    Dask Name: from_pandas, 2 tasks

    >>> from dask.utils import tmpfile
    >>> from sqlalchemy import create_engine
    >>> with tmpfile() as f:
    ...     db = 'sqlite:///%s' %f
    ...     ddf.to_sql('test', db)
    ...     engine = create_engine(db, echo=False)
    ...     result = engine.execute("SELECT * FROM test").fetchall()
    >>> result
    [(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]
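
    The insertion strategy can be tuned with ``parallel`` and ``method``; a
    sketch, reusing ``ddf`` and ``db`` from the example above:

    >>> ddf.to_sql('test2', db, parallel=True, method='multi')  # doctest: +SKIP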
    """
    if not isinstance(uri, str):
        raise ValueError(f"Expected URI to be a string, got {type(uri)}.")

    # `engine_kwargs` is the only argument we add on top of what pandas'
    # to_sql supports
    kwargs = dict(
        name=name,
        uri=uri,
        engine_kwargs=engine_kwargs,
        schema=schema,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        chunksize=chunksize,
        dtype=dtype,
        method=method,
    )

    # Create the empty table from `meta` first, honouring `if_exists`
    meta_task = delayed(_to_sql_chunk)(df._meta, **kwargs)

    # Partitions should always append to the table created from `meta`
    worker_kwargs = dict(kwargs, if_exists="append")

    if parallel:
        # Perform the meta insert, then insert all blocks concurrently
        result = [
            _extra_deps(
                _to_sql_chunk,
                d,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_sql-%s" % tokenize(d, **worker_kwargs),
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the inserts: meta first, then each block in order
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    _to_sql_chunk,
                    d,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_sql-%s" % tokenize(d, **worker_kwargs),
                )
            )
            last = result[-1]
    result = delayed(result)

    if compute:
        dask.compute(result)
    else:
        return result


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    # `extras` exists only to inject extra dependencies into the task graph
    return func(*args, **kwargs)