import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import pytest
from pandas.api.types import is_scalar

import dask.dataframe as dd
from dask.dataframe._compat import PANDAS_GT_120, PANDAS_VERSION
from dask.dataframe.utils import assert_dask_graph, assert_eq, make_meta

try:
    import scipy
except ImportError:
    scipy = None


@pytest.mark.slow
def test_arithmetics():
    dsk = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()
    pdf2 = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]})
    pdf3 = pd.DataFrame({"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]})
    ddf2 = dd.from_pandas(pdf2, 3)
    ddf3 = dd.from_pandas(pdf3, 2)

    dsk4 = {
        ("y", 0): pd.DataFrame({"a": [3, 2, 1], "b": [7, 8, 9]}, index=[0, 1, 3]),
        ("y", 1): pd.DataFrame({"a": [5, 2, 8], "b": [4, 2, 3]}, index=[5, 6, 8]),
        ("y", 2): pd.DataFrame({"a": [1, 4, 10], "b": [1, 0, 5]}, index=[9, 9, 9]),
    }
    ddf4 = dd.DataFrame(dsk4, "y", meta, [0, 4, 9, 9])
    pdf4 = ddf4.compute()

    # Arithmetics
    cases = [
        (ddf1, ddf1, pdf1, pdf1),
        (ddf1, ddf1.repartition([0, 1, 3, 6, 9]), pdf1, pdf1),
        (ddf2, ddf3, pdf2, pdf3),
        (ddf2.repartition([0, 3, 6, 7]), ddf3.repartition([0, 7]), pdf2, pdf3),
        (ddf2.repartition([0, 7]), ddf3.repartition([0, 2, 4, 5, 7]), pdf2, pdf3),
        (ddf1, ddf4, pdf1, pdf4),
        (ddf1, ddf4.repartition([0, 9]), pdf1, pdf4),
        (ddf1.repartition([0, 3, 9]), ddf4.repartition([0, 5, 9]), pdf1, pdf4),
        # dask + pandas
        (ddf1, pdf4, pdf1, pdf4),
        (ddf2, pdf3, pdf2, pdf3),
    ]

    for (l, r, el, er) in cases:
        check_series_arithmetics(l.a, r.b, el.a, er.b)
        check_frame_arithmetics(l, r, el, er)

    # different index, pandas raises ValueError in comparison ops
    pdf5 = pd.DataFrame(
        {"a": [3, 2, 1, 5, 2, 8, 1, 4, 10], "b": [7, 8, 9, 4, 2, 3, 1, 0, 5]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf5 = dd.from_pandas(pdf5, 2)
    pdf6 = pd.DataFrame(
        {"a": [3, 2, 1, 5, 2, 8, 1, 4, 10], "b": [7, 8, 9, 5, 7, 8, 4, 2, 5]},
        index=[0, 1, 2, 3, 4, 5, 6, 7, 9],
    )
    ddf6 = dd.from_pandas(pdf6, 4)

    pdf7 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]},
        index=list("aaabcdeh"),
    )
    pdf8 = pd.DataFrame(
        {"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]},
        index=list("abcdefgh"),
    )
    ddf7 = dd.from_pandas(pdf7, 3)
    ddf8 = dd.from_pandas(pdf8, 4)

    pdf9 = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8],
            "b": [5, 6, 7, 8, 1, 2, 3, 4],
            "c": [5, 6, 7, 8, 1, 2, 3, 4],
        },
        index=list("aaabcdeh"),
    )
    pdf10 = pd.DataFrame(
        {
            "b": [5, 6, 7, 8, 4, 3, 2, 1],
            "c": [2, 4, 5, 3, 4, 2, 1, 0],
            "d": [2, 4, 5, 3, 4, 2, 1, 0],
        },
        index=list("abcdefgh"),
    )
    ddf9 = dd.from_pandas(pdf9, 3)
    ddf10 = dd.from_pandas(pdf10, 4)

    # Arithmetics with different index
    cases = [
        (ddf5, ddf6, pdf5, pdf6),
        (ddf5.repartition([0, 9]), ddf6, pdf5, pdf6),
        (ddf5.repartition([0, 5, 9]), ddf6.repartition([0, 7, 9]), pdf5, pdf6),
        (ddf7, ddf8, pdf7, pdf8),
        (ddf7.repartition(["a", "c", "h"]), ddf8.repartition(["a", "h"]), pdf7, pdf8),
        (
            ddf7.repartition(["a", "b", "e", "h"]),
            ddf8.repartition(["a", "e", "h"]),
            pdf7,
            pdf8,
        ),
        (ddf9, ddf10, pdf9, pdf10),
        (ddf9.repartition(["a", "c", "h"]), ddf10.repartition(["a", "h"]), pdf9, pdf10),
        # dask + pandas
        (ddf5, pdf6, pdf5, pdf6),
        (ddf7, pdf8, pdf7, pdf8),
        (ddf9, pdf10, pdf9, pdf10),
    ]
    for (l, r, el, er) in cases:
        check_series_arithmetics(l.a, r.b, el.a, er.b, allow_comparison_ops=False)
        check_frame_arithmetics(l, r, el, er, allow_comparison_ops=False)


def test_deterministic_arithmetic_names():
    df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
    a = dd.from_pandas(df, npartitions=2)

    assert sorted((a.x + a.y**2).dask) == sorted((a.x + a.y**2).dask)
    assert sorted((a.x + a.y**2).dask) != sorted((a.x + a.y**3).dask)
    assert sorted((a.x + a.y**2).dask) != sorted((a.x - a.y**2).dask)


@pytest.mark.slow
def test_arithmetics_different_index():
    # indexes are different, but overlap
    pdf1 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5], "b": [3, 5, 2, 5, 7]}, index=[1, 2, 3, 4, 5]
    )
    ddf1 = dd.from_pandas(pdf1, 2)
    pdf2 = pd.DataFrame(
        {"a": [3, 2, 6, 7, 8], "b": [9, 4, 2, 6, 2]}, index=[3, 4, 5, 6, 7]
    )
    ddf2 = dd.from_pandas(pdf2, 2)

    # indexes do not overlap
    pdf3 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5], "b": [3, 5, 2, 5, 7]}, index=[1, 2, 3, 4, 5]
    )
    ddf3 = dd.from_pandas(pdf3, 2)
    pdf4 = pd.DataFrame(
        {"a": [3, 2, 6, 7, 8], "b": [9, 4, 2, 6, 2]}, index=[10, 11, 12, 13, 14]
    )
    ddf4 = dd.from_pandas(pdf4, 2)

    # one index is contained in the other
    pdf5 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5], "b": [3, 5, 2, 5, 7]}, index=[1, 3, 5, 7, 9]
    )
    ddf5 = dd.from_pandas(pdf5, 2)
    pdf6 = pd.DataFrame(
        {"a": [3, 2, 6, 7, 8], "b": [9, 4, 2, 6, 2]}, index=[2, 3, 4, 5, 6]
    )
    ddf6 = dd.from_pandas(pdf6, 2)

    cases = [
        (ddf1, ddf2, pdf1, pdf2),
        (ddf2, ddf1, pdf2, pdf1),
        (ddf1.repartition([1, 3, 5]), ddf2.repartition([3, 4, 7]), pdf1, pdf2),
        (ddf2.repartition([3, 4, 5, 7]), ddf1.repartition([1, 2, 4, 5]), pdf2, pdf1),
        (ddf3, ddf4, pdf3, pdf4),
        (ddf4, ddf3, pdf4, pdf3),
        (
            ddf3.repartition([1, 2, 3, 4, 5]),
            ddf4.repartition([10, 11, 12, 13, 14]),
            pdf3,
            pdf4,
        ),
        (ddf4.repartition([10, 14]), ddf3.repartition([1, 3, 4, 5]), pdf4, pdf3),
        (ddf5, ddf6, pdf5, pdf6),
        (ddf6, ddf5, pdf6, pdf5),
        (ddf5.repartition([1, 7, 8, 9]), ddf6.repartition([2, 3, 4, 6]), pdf5, pdf6),
        (ddf6.repartition([2, 6]), ddf5.repartition([1, 3, 7, 9]), pdf6, pdf5),
        # dask + pandas
        (ddf1, pdf2, pdf1, pdf2),
        (ddf2, pdf1, pdf2, pdf1),
        (ddf3, pdf4, pdf3, pdf4),
        (ddf4, pdf3, pdf4, pdf3),
        (ddf5, pdf6, pdf5, pdf6),
        (ddf6, pdf5, pdf6, pdf5),
    ]

    for (l, r, el, er) in cases:
        check_series_arithmetics(l.a, r.b, el.a, er.b, allow_comparison_ops=False)
        check_frame_arithmetics(l, r, el, er, allow_comparison_ops=False)

    pdf7 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]},
        index=[0, 2, 4, 8, 9, 10, 11, 13],
    )
    pdf8 = pd.DataFrame(
        {"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]},
        index=[1, 3, 4, 8, 9, 11, 12, 13],
    )
    ddf7 = dd.from_pandas(pdf7, 3)
    ddf8 = dd.from_pandas(pdf8, 2)

    pdf9 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]},
        index=[0, 2, 4, 8, 9, 10, 11, 13],
    )
    pdf10 = pd.DataFrame(
        {"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]},
        index=[0, 3, 4, 8, 9, 11, 12, 13],
    )
    ddf9 = dd.from_pandas(pdf9, 3)
    ddf10 = dd.from_pandas(pdf10, 2)

    cases = [
        (ddf7, ddf8, pdf7, pdf8),
        (ddf8, ddf7, pdf8, pdf7),
        # (ddf7.repartition([0, 13]),
        #  ddf8.repartition([0, 4, 11, 14], force=True),
        #  pdf7, pdf8),
        (
            ddf8.repartition([-5, 10, 15], force=True),
            ddf7.repartition([-1, 4, 11, 14], force=True),
            pdf8,
            pdf7,
        ),
        (
            ddf7.repartition([0, 8, 12, 13]),
            ddf8.repartition([0, 2, 8, 12, 13], force=True),
            pdf7,
            pdf8,
        ),
        (
            ddf8.repartition([-5, 0, 10, 20], force=True),
            ddf7.repartition([-1, 4, 11, 13], force=True),
            pdf8,
            pdf7,
        ),
        (ddf9, ddf10, pdf9, pdf10),
        (ddf10, ddf9, pdf10, pdf9),
        # dask + pandas
        (ddf7, pdf8, pdf7, pdf8),
        (ddf8, pdf7, pdf8, pdf7),
        (ddf9, pdf10, pdf9, pdf10),
        (ddf10, pdf9, pdf10, pdf9),
    ]

    for (l, r, el, er) in cases:
        check_series_arithmetics(l.a, r.b, el.a, er.b, allow_comparison_ops=False)
        check_frame_arithmetics(l, r, el, er, allow_comparison_ops=False)

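# The two helpers below exercise every binary operator against the pandas
# result; ``allow_comparison_ops=False`` skips operand-vs-operand comparisons,
# since pandas raises ValueError when comparing objects with unaligned indexes.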
def check_series_arithmetics(l, r, el, er, allow_comparison_ops=True):
    assert isinstance(l, dd.Series)
    assert isinstance(r, (dd.Series, pd.Series))
    assert isinstance(el, pd.Series)
    assert isinstance(er, pd.Series)

    # l, r may be repartitioned, test whether repartition keeps original data
    assert_eq(l, el)
    assert_eq(r, er)

    assert_eq(l + r, el + er)
    assert_eq(l * r, el * er)
    assert_eq(l - r, el - er)
    assert_eq(l / r, el / er)
    assert_eq(l // r, el // er)
    assert_eq(l**r, el**er)
    assert_eq(l % r, el % er)

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(l & r, el & er)
        assert_eq(l | r, el | er)
        assert_eq(l ^ r, el ^ er)
        assert_eq(l > r, el > er)
        assert_eq(l < r, el < er)
        assert_eq(l >= r, el >= er)
        assert_eq(l <= r, el <= er)
        assert_eq(l == r, el == er)
        assert_eq(l != r, el != er)
        assert_eq(l.lt(r), el.lt(er))
        assert_eq(l.gt(r), el.gt(er))
        assert_eq(l.le(r), el.le(er))
        assert_eq(l.ge(r), el.ge(er))
        assert_eq(l.ne(r), el.ne(er))
        assert_eq(l.eq(r), el.eq(er))

    assert_eq(l + 2, el + 2)
    assert_eq(l * 2, el * 2)
    assert_eq(l - 2, el - 2)
    assert_eq(l / 2, el / 2)
    assert_eq(l & True, el & True)
    assert_eq(l | True, el | True)
    assert_eq(l ^ True, el ^ True)
    assert_eq(l // 2, el // 2)
    assert_eq(l**2, el**2)
    assert_eq(l % 2, el % 2)
    assert_eq(l > 2, el > 2)
    assert_eq(l < 2, el < 2)
    assert_eq(l >= 2, el >= 2)
    assert_eq(l <= 2, el <= 2)
    assert_eq(l == 2, el == 2)
    assert_eq(l != 2, el != 2)

    assert_eq(2 + r, 2 + er)
    assert_eq(2 * r, 2 * er)
    assert_eq(2 - r, 2 - er)
    assert_eq(2 / r, 2 / er)
    assert_eq(True & r, True & er)
    assert_eq(True | r, True | er)
    assert_eq(True ^ r, True ^ er)
    assert_eq(2 // r, 2 // er)
    assert_eq(2**r, 2**er)
    assert_eq(2 % r, 2 % er)
    assert_eq(2 > r, 2 > er)
    assert_eq(2 < r, 2 < er)
    assert_eq(2 >= r, 2 >= er)
    assert_eq(2 <= r, 2 <= er)
    assert_eq(2 == r, 2 == er)
    assert_eq(2 != r, 2 != er)

    assert_eq(l.lt(2), el.lt(2))
    assert_eq(l.gt(2), el.gt(2))
    assert_eq(l.le(2), el.le(2))
    assert_eq(l.ge(2), el.ge(2))
    assert_eq(l.ne(2), el.ne(2))
    assert_eq(l.eq(2), el.eq(2))

    assert_eq(-l, -el)
    assert_eq(abs(l), abs(el))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(~(l == r), ~(el == er))


def check_frame_arithmetics(l, r, el, er, allow_comparison_ops=True):
    assert isinstance(l, dd.DataFrame)
    assert isinstance(r, (dd.DataFrame, pd.DataFrame))
    assert isinstance(el, pd.DataFrame)
    assert isinstance(er, pd.DataFrame)

    # l, r may be repartitioned, test whether repartition keeps original data
    assert_eq(l, el)
    assert_eq(r, er)

    assert_eq(l + r, el + er)
    assert_eq(l * r, el * er)
    assert_eq(l - r, el - er)
    assert_eq(l / r, el / er)
    assert_eq(l // r, el // er)
    assert_eq(l**r, el**er)
    assert_eq(l % r, el % er)

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(l & r, el & er)
        assert_eq(l | r, el | er)
        assert_eq(l ^ r, el ^ er)
        assert_eq(l > r, el > er)
        assert_eq(l < r, el < er)
        assert_eq(l >= r, el >= er)
        assert_eq(l <= r, el <= er)
        assert_eq(l == r, el == er)
        assert_eq(l != r, el != er)
        assert_eq(l.lt(r), el.lt(er))
        assert_eq(l.gt(r), el.gt(er))
        assert_eq(l.le(r), el.le(er))
        assert_eq(l.ge(r), el.ge(er))
        assert_eq(l.ne(r), el.ne(er))
        assert_eq(l.eq(r), el.eq(er))

    assert_eq(l + 2, el + 2)
    assert_eq(l * 2, el * 2)
    assert_eq(l - 2, el - 2)
    assert_eq(l / 2, el / 2)
    assert_eq(l & True, el & True)
    assert_eq(l | True, el | True)
    assert_eq(l ^ True, el ^ True)
    assert_eq(l // 2, el // 2)
    assert_eq(l**2, el**2)
    assert_eq(l % 2, el % 2)
    assert_eq(l > 2, el > 2)
    assert_eq(l < 2, el < 2)
    assert_eq(l >= 2, el >= 2)
    assert_eq(l <= 2, el <= 2)
    assert_eq(l == 2, el == 2)
    assert_eq(l != 2, el != 2)

    assert_eq(2 + l, 2 + el)
    assert_eq(2 * l, 2 * el)
    assert_eq(2 - l, 2 - el)
    assert_eq(2 / l, 2 / el)
    assert_eq(True & l, True & el)
    assert_eq(True | l, True | el)
    assert_eq(True ^ l, True ^ el)
    assert_eq(2 // l, 2 // el)
    assert_eq(2**l, 2**el)
    assert_eq(2 % l, 2 % el)
    assert_eq(2 > l, 2 > el)
    assert_eq(2 < l, 2 < el)
    assert_eq(2 >= l, 2 >= el)
    assert_eq(2 <= l, 2 <= el)
    assert_eq(2 == l, 2 == el)
    assert_eq(2 != l, 2 != el)

    assert_eq(l.lt(2), el.lt(2))
    assert_eq(l.gt(2), el.gt(2))
    assert_eq(l.le(2), el.le(2))
    assert_eq(l.ge(2), el.ge(2))
    assert_eq(l.ne(2), el.ne(2))
    assert_eq(l.eq(2), el.eq(2))

    assert_eq(-l, -el)
    assert_eq(abs(l), abs(el))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(~(l == r), ~(el == er))


def test_scalar_arithmetics():
    el = np.int64(10)
    er = np.int64(4)
    l = dd.core.Scalar({("l", 0): el}, "l", "i8")
    r = dd.core.Scalar({("r", 0): er}, "r", "i8")

    assert isinstance(l, dd.core.Scalar)
    assert isinstance(r, dd.core.Scalar)

    assert_eq(l, el)
    assert_eq(r, er)

    assert_eq(l + r, el + er)
    assert_eq(l * r, el * er)
    assert_eq(l - r, el - er)
    assert_eq(l / r, el / er)
    assert_eq(l // r, el // er)
    assert_eq(l**r, el**er)
    assert_eq(l % r, el % er)

    assert_eq(l & r, el & er)
    assert_eq(l | r, el | er)
    assert_eq(l ^ r, el ^ er)
    assert_eq(l > r, el > er)
    assert_eq(l < r, el < er)
    assert_eq(l >= r, el >= er)
    assert_eq(l <= r, el <= er)
    assert_eq(l == r, el == er)
    assert_eq(l != r, el != er)

    assert_eq(l + 2, el + 2)
    assert_eq(l * 2, el * 2)
    assert_eq(l - 2, el - 2)
    assert_eq(l / 2, el / 2)
    assert_eq(l & True, el & True)
    assert_eq(l | True, el | True)
    assert_eq(l ^ True, el ^ True)
    assert_eq(l // 2, el // 2)
    assert_eq(l**2, el**2)
    assert_eq(l % 2, el % 2)
    assert_eq(l > 2, el > 2)
    assert_eq(l < 2, el < 2)
    assert_eq(l >= 2, el >= 2)
    assert_eq(l <= 2, el <= 2)
    assert_eq(l == 2, el == 2)
    assert_eq(l != 2, el != 2)

    assert_eq(2 + r, 2 + er)
    assert_eq(2 * r, 2 * er)
    assert_eq(2 - r, 2 - er)
    assert_eq(2 / r, 2 / er)
    assert_eq(True & r, True & er)
    assert_eq(True | r, True | er)
    assert_eq(True ^ r, True ^ er)
    assert_eq(2 // r, 2 // er)
    assert_eq(2**r, 2**er)
    assert_eq(2 % r, 2 % er)
    assert_eq(2 > r, 2 > er)
    assert_eq(2 < r, 2 < er)
    assert_eq(2 >= r, 2 >= er)
    assert_eq(2 <= r, 2 <= er)
    assert_eq(2 == r, 2 == er)
    assert_eq(2 != r, 2 != er)

    assert_eq(-l, -el)
    assert_eq(abs(l), abs(el))

    assert_eq(~(l == r), ~(el == er))


def test_scalar_arithmetics_with_dask_instances():
    s = dd.core.Scalar({("s", 0): 10}, "s", "i8")
    e = 10

    pds = pd.Series([1, 2, 3, 4, 5, 6, 7])
    dds = dd.from_pandas(pds, 2)

    pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]})
    ddf = dd.from_pandas(pdf, 2)

    # pandas Series
    result = pds + s  # this results in a pd.Series (computed automatically)
    assert isinstance(result, pd.Series)
    assert_eq(result, pds + e)

    result = s + pds  # this results in a dd.Series
    assert isinstance(result, dd.Series)
    assert_eq(result, pds + e)

    # dask Series
    result = dds + s  # this results in a dd.Series
    assert isinstance(result, dd.Series)
    assert_eq(result, pds + e)

    result = s + dds  # this results in a dd.Series
    assert isinstance(result, dd.Series)
    assert_eq(result, pds + e)

    # pandas DataFrame
    result = pdf + s  # this results in a pd.DataFrame (computed automatically)
    assert isinstance(result, pd.DataFrame)
    assert_eq(result, pdf + e)

    result = s + pdf  # this results in a dd.DataFrame
    assert isinstance(result, dd.DataFrame)
    assert_eq(result, pdf + e)

    # dask DataFrame
    result = ddf + s  # this results in a dd.DataFrame
    assert isinstance(result, dd.DataFrame)
    assert_eq(result, pdf + e)

    result = s + ddf  # this results in a dd.DataFrame
    assert isinstance(result, dd.DataFrame)
    assert_eq(result, pdf + e)


@pytest.mark.xfail(
    PANDAS_VERSION == "1.0.2",
    reason="https://github.com/pandas-dev/pandas/issues/32685",
)
def test_frame_series_arithmetic_methods():
    pdf1 = pd.DataFrame(
        {
            "A": np.arange(10),
            "B": [np.nan, 1, 2, 3, 4] * 2,
            "C": [np.nan] * 10,
            "D": np.arange(10),
        },
        index=list("abcdefghij"),
        columns=list("ABCD"),
    )
    pdf2 = pd.DataFrame(
        np.random.randn(10, 4), index=list("abcdefghjk"), columns=list("ABCX")
    )
    ps1 = pdf1.A
    ps2 = pdf2.A
    ps3 = pd.Series(np.random.randn(10), index=list("ABCDXabcde"))

    ddf1 = dd.from_pandas(pdf1, 2)
    ddf2 = dd.from_pandas(pdf2, 2)
    ds1 = ddf1.A
    ds2 = ddf2.A

    s = dd.core.Scalar({("s", 0): 4}, "s", "i8")

    for l, r, el, er in [
        (ddf1, ddf2, pdf1, pdf2),
        (ds1, ds2, ps1, ps2),
        (ddf1.repartition(["a", "f", "j"]), ddf2, pdf1, pdf2),
        (ds1.repartition(["a", "b", "f", "j"]), ds2, ps1, ps2),
        (ddf1, ddf2.repartition(["a", "k"]), pdf1, pdf2),
        (ds1, ds2.repartition(["a", "b", "d", "h", "k"]), ps1, ps2),
        (ddf1, 3, pdf1, 3),
        (ds1, 3, ps1, 3),
        (ddf1, s, pdf1, 4),
        (ds1, s, ps1, 4),
    ]:
        # l, r may be repartitioned, test whether repartition keeps original data
        assert_eq(l, el)
        assert_eq(r, er)

        assert_eq(l.add(r, fill_value=0), el.add(er, fill_value=0))
        assert_eq(l.sub(r, fill_value=0), el.sub(er, fill_value=0))
        assert_eq(l.mul(r, fill_value=0), el.mul(er, fill_value=0))
        with warnings.catch_warnings():
            # pandas-26793
            warnings.simplefilter("ignore", RuntimeWarning)
            assert_eq(l.div(r, fill_value=0), el.div(er, fill_value=0))
            assert_eq(l.divide(r, fill_value=0), el.divide(er, fill_value=0))
            assert_eq(l.truediv(r, fill_value=0), el.truediv(er, fill_value=0))
        assert_eq(l.floordiv(r, fill_value=1), el.floordiv(er, fill_value=1))
        assert_eq(l.pow(r, fill_value=0), el.pow(er, fill_value=0))
        assert_eq(l.mod(r, fill_value=0), el.mod(er, fill_value=0))
        assert_eq(l.radd(r, fill_value=0), el.radd(er, fill_value=0))
        assert_eq(l.rsub(r, fill_value=0), el.rsub(er, fill_value=0))
        assert_eq(l.rmul(r, fill_value=0), el.rmul(er, fill_value=0))
        with warnings.catch_warnings():
            # pandas-26793
            warnings.simplefilter("ignore", RuntimeWarning)
            assert_eq(l.rdiv(r, fill_value=0), el.rdiv(er, fill_value=0))
            assert_eq(l.rtruediv(r, fill_value=0), el.rtruediv(er, fill_value=0))
        assert_eq(l.rpow(r, fill_value=0), el.rpow(er, fill_value=0))
        assert_eq(l.rmod(r, fill_value=0), el.rmod(er, fill_value=0))

    for l, r, el, er in [(ddf1, ds2, pdf1, ps2), (ddf1, ddf2.X, pdf1, pdf2.X)]:
        assert_eq(l, el)
        assert_eq(r, er)

        # must specify axis=0 to add Series to each column
        # axis=1 is not supported (add to each row)
        assert_eq(l.add(r, axis=0), el.add(er, axis=0))
        assert_eq(l.sub(r, axis=0), el.sub(er, axis=0))
        assert_eq(l.mul(r, axis=0), el.mul(er, axis=0))
        assert_eq(l.div(r, axis=0), el.div(er, axis=0))
        assert_eq(l.divide(r, axis=0), el.divide(er, axis=0))
        assert_eq(l.truediv(r, axis=0), el.truediv(er, axis=0))
        assert_eq(l.floordiv(r, axis=0), el.floordiv(er, axis=0))
        assert_eq(l.mod(r, axis=0), el.mod(er, axis=0))
        assert_eq(l.pow(r, axis=0), el.pow(er, axis=0))
        assert_eq(l.radd(r, axis=0), el.radd(er, axis=0))
        assert_eq(l.rsub(r, axis=0), el.rsub(er, axis=0))
        assert_eq(l.rmul(r, axis=0), el.rmul(er, axis=0))
        assert_eq(l.rdiv(r, axis=0), el.rdiv(er, axis=0))
        assert_eq(l.rtruediv(r, axis=0), el.rtruediv(er, axis=0))
        assert_eq(l.rmod(r, axis=0), el.rmod(er, axis=0))
        assert_eq(l.rpow(r, axis=0), el.rpow(er, axis=0))

        pytest.raises(ValueError, lambda: l.add(r, axis=1))

    for l, r, el, er in [(ddf1, pdf2, pdf1, pdf2), (ddf1, ps3, pdf1, ps3)]:
        assert_eq(l, el)
        assert_eq(r, er)

        for axis in [0, 1, "index", "columns"]:
            assert_eq(l.add(r, axis=axis), el.add(er, axis=axis))
            assert_eq(l.sub(r, axis=axis), el.sub(er, axis=axis))
            assert_eq(l.mul(r, axis=axis), el.mul(er, axis=axis))
            assert_eq(l.div(r, axis=axis), el.div(er, axis=axis))
            assert_eq(l.divide(r, axis=axis), el.divide(er, axis=axis))
            assert_eq(l.truediv(r, axis=axis), el.truediv(er, axis=axis))
            with warnings.catch_warnings():
                # https://github.com/pandas-dev/pandas/issues/26793
                warnings.simplefilter("ignore", RuntimeWarning)
                assert_eq(l.floordiv(r, axis=axis), el.floordiv(er, axis=axis))
                assert_eq(l.mod(r, axis=axis), el.mod(er, axis=axis))
                assert_eq(l.pow(r, axis=axis), el.pow(er, axis=axis))
                assert_eq(l.rdiv(r, axis=axis), el.rdiv(er, axis=axis))
                assert_eq(l.rtruediv(r, axis=axis), el.rtruediv(er, axis=axis))
                assert_eq(l.rpow(r, axis=axis), el.rpow(er, axis=axis))
                assert_eq(l.rmod(r, axis=axis), el.rmod(er, axis=axis))
            assert_eq(l.radd(r, axis=axis), el.radd(er, axis=axis))
            assert_eq(l.rsub(r, axis=axis), el.rsub(er, axis=axis))
            assert_eq(l.rmul(r, axis=axis), el.rmul(er, axis=axis))


@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions(split_every):
    dsk = {
        ("x", 0): pd.DataFrame(
            {"a": [1, 2, 3], "b": [4, 5, 6], "c": [True, True, False]}, index=[0, 1, 3]
        ),
        ("x", 1): pd.DataFrame(
            {"a": [4, 5, 6], "b": [3, 2, 1], "c": [False, False, False]},
            index=[5, 6, 8],
        ),
        ("x", 2): pd.DataFrame(
            {
                "a": [13094304034, 3489385935, 100006774],
                "b": [0, 0, 0],
                "c": [True, True, True],
            },
            index=[9, 9, 9],
        ),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8", "c": "bool"},
        index=pd.Index([], "i8"),
        parent_meta=pd.DataFrame(),
    )
    ddf1 = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    nans1 = pd.Series([1] + [np.nan] * 4 + [2] + [np.nan] * 3)
    nands1 = dd.from_pandas(nans1, 2)
    nans2 = pd.Series([1] + [np.nan] * 8)
    nands2 = dd.from_pandas(nans2, 2)
    nans3 = pd.Series([np.nan] * 9)
    nands3 = dd.from_pandas(nans3, 2)

    bools = pd.Series([True, False, True, False, True], dtype=bool)
    boolds = dd.from_pandas(bools, 2)

    for dds, pds in [
        (ddf1.a, pdf1.a),
        (ddf1.b, pdf1.b),
        (ddf1.c, pdf1.c),
        (ddf1["a"], pdf1["a"]),
        (ddf1["b"], pdf1["b"]),
        (nands1, nans1),
        (nands2, nans2),
        (nands3, nans3),
        (boolds, bools),
    ]:
        assert isinstance(dds, dd.Series)
        assert isinstance(pds, pd.Series)

        assert_eq(dds.sum(split_every=split_every), pds.sum())
        assert_eq(dds.prod(split_every=split_every), pds.prod())
        assert_eq(dds.product(split_every=split_every), pds.product())
        assert_eq(dds.min(split_every=split_every), pds.min())
        assert_eq(dds.max(split_every=split_every), pds.max())
        assert_eq(dds.count(split_every=split_every), pds.count())
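        # dask's skew/kurtosis follow scipy's biased (population) estimators,
        # while pandas reports the sample-adjusted statistics, so the pandas
        # values are rescaled before comparing.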
        if scipy:
            # pandas uses unbiased skew, need to correct for that
            n = pds.shape[0]
            bias_factor = (n * (n - 1)) ** 0.5 / (n - 2)
            assert_eq(dds.skew(), pds.skew() / bias_factor)

        if scipy:
            # pandas uses a bias factor for kurtosis, need to correct for that
            n = pds.shape[0]
            factor = ((n - 1) * (n + 1)) / ((n - 2) * (n - 3))
            offset = (6 * (n - 1)) / ((n - 2) * (n - 3))
            assert_eq(factor * dds.kurtosis() + offset, pds.kurtosis())

        with pytest.warns(None):
            # runtime warnings; https://github.com/dask/dask/issues/2381
            assert_eq(dds.std(split_every=split_every), pds.std())

        with pytest.warns(None):
            # runtime warnings; https://github.com/dask/dask/issues/2381
            assert_eq(dds.var(split_every=split_every), pds.var())

        with pytest.warns(None):
            # runtime warnings; https://github.com/dask/dask/issues/2381
            assert_eq(dds.sem(split_every=split_every), pds.sem())

        with warnings.catch_warnings():
            # dask.dataframe should probably filter this, to match pandas, but
            # it seems quite difficult.
            warnings.simplefilter("ignore", RuntimeWarning)
            assert_eq(dds.std(ddof=0, split_every=split_every), pds.std(ddof=0))
            assert_eq(dds.var(ddof=0, split_every=split_every), pds.var(ddof=0))
            assert_eq(dds.sem(ddof=0, split_every=split_every), pds.sem(ddof=0))

        assert_eq(dds.mean(split_every=split_every), pds.mean())
        assert_eq(dds.nunique(split_every=split_every), pds.nunique())

        assert_eq(dds.sum(skipna=False, split_every=split_every), pds.sum(skipna=False))
        assert_eq(
            dds.prod(skipna=False, split_every=split_every), pds.prod(skipna=False)
        )
        assert_eq(
            dds.product(skipna=False, split_every=split_every),
            pds.product(skipna=False),
        )
        assert_eq(dds.min(skipna=False, split_every=split_every), pds.min(skipna=False))
        assert_eq(dds.max(skipna=False, split_every=split_every), pds.max(skipna=False))
        assert_eq(dds.std(skipna=False, split_every=split_every), pds.std(skipna=False))
        assert_eq(dds.var(skipna=False, split_every=split_every), pds.var(skipna=False))
        assert_eq(dds.sem(skipna=False, split_every=split_every), pds.sem(skipna=False))
        assert_eq(
            dds.std(skipna=False, ddof=0, split_every=split_every),
            pds.std(skipna=False, ddof=0),
        )
        assert_eq(
            dds.var(skipna=False, ddof=0, split_every=split_every),
            pds.var(skipna=False, ddof=0),
        )
        assert_eq(
            dds.sem(skipna=False, ddof=0, split_every=split_every),
            pds.sem(skipna=False, ddof=0),
        )
        assert_eq(
            dds.mean(skipna=False, split_every=split_every), pds.mean(skipna=False)
        )

    assert_dask_graph(ddf1.b.sum(split_every=split_every), "series-sum")
    assert_dask_graph(ddf1.b.prod(split_every=split_every), "series-prod")
    assert_dask_graph(ddf1.b.min(split_every=split_every), "series-min")
    assert_dask_graph(ddf1.b.max(split_every=split_every), "series-max")
    assert_dask_graph(ddf1.b.count(split_every=split_every), "series-count")
    assert_dask_graph(ddf1.b.std(split_every=split_every), "series-std")
    assert_dask_graph(ddf1.b.var(split_every=split_every), "series-var")
    assert_dask_graph(ddf1.b.sem(split_every=split_every), "series-sem")
    assert_dask_graph(ddf1.b.std(ddof=0, split_every=split_every), "series-std")
    assert_dask_graph(ddf1.b.var(ddof=0, split_every=split_every), "series-var")
    assert_dask_graph(ddf1.b.sem(ddof=0, split_every=split_every), "series-sem")
    assert_dask_graph(ddf1.b.mean(split_every=split_every), "series-mean")
    # nunique is performed using drop-duplicates
    assert_dask_graph(ddf1.b.nunique(split_every=split_every), "drop-duplicates")

    # testing index
    assert_eq(ddf1.index.min(split_every=split_every), pdf1.index.min())
    assert_eq(ddf1.index.max(split_every=split_every), pdf1.index.max())
    assert_eq(ddf1.index.count(split_every=split_every), pd.notnull(pdf1.index).sum())


@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_timedelta(split_every):
    ds = pd.Series(pd.to_timedelta([2, 3, 4, np.nan, 5]))
    dds = dd.from_pandas(ds, 2)

    assert_eq(dds.sum(split_every=split_every), ds.sum())
    assert_eq(dds.min(split_every=split_every), ds.min())
    assert_eq(dds.max(split_every=split_every), ds.max())
    assert_eq(dds.count(split_every=split_every), ds.count())

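# NumPy reductions like ``np.sum(ddf, out=...)`` dispatch to the matching dask
# method; with ``out=`` dask writes the result into the supplied dask object
# (replacing its task graph), which is what the asserts below rely on.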
@pytest.mark.parametrize(
    "frame,axis,out",
    [
        (
            pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
            0,
            pd.Series([], dtype="float64"),
        ),
        (
            pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
            1,
            pd.Series([], dtype="float64"),
        ),
        (pd.Series([1, 2.5, 6]), None, None),
    ],
)
@pytest.mark.parametrize(
    "redfunc", ["sum", "prod", "product", "min", "max", "mean", "var", "std"]
)
def test_reductions_out(frame, axis, out, redfunc):
    dsk_in = dd.from_pandas(frame, 3)
    dsk_out = dd.from_pandas(pd.Series([0]), 1).sum()

    if out is not None:
        dsk_out = dd.from_pandas(out, 3)

    np_redfunc = getattr(np, redfunc)
    pd_redfunc = getattr(frame.__class__, redfunc)
    dsk_redfunc = getattr(dsk_in.__class__, redfunc)

    if redfunc in ["var", "std"]:
        # numpy has default ddof value 0 while
        # dask and pandas have 1, so ddof should be passed
        # explicitly when calling np.var(dask)
        np_redfunc(dsk_in, axis=axis, ddof=1, out=dsk_out)
    else:
        np_redfunc(dsk_in, axis=axis, out=dsk_out)

    assert_eq(dsk_out, pd_redfunc(frame, axis=axis))

    dsk_redfunc(dsk_in, axis=axis, split_every=False, out=dsk_out)
    assert_eq(dsk_out, pd_redfunc(frame, axis=axis))

    dsk_redfunc(dsk_in, axis=axis, split_every=2, out=dsk_out)
    assert_eq(dsk_out, pd_redfunc(frame, axis=axis))


@pytest.mark.parametrize("split_every", [False, 2])
def test_allany(split_every):
    df = pd.DataFrame(
        np.random.choice([True, False], size=(100, 4)), columns=["A", "B", "C", "D"]
    )
    df["E"] = list("abcde") * 20
    ddf = dd.from_pandas(df, 10)

    assert_eq(ddf.all(split_every=split_every), df.all())
    assert_eq(ddf.all(axis=1, split_every=split_every), df.all(axis=1))
    assert_eq(ddf.all(axis=0, split_every=split_every), df.all(axis=0))

    assert_eq(ddf.any(split_every=split_every), df.any())
    assert_eq(ddf.any(axis=1, split_every=split_every), df.any(axis=1))
    assert_eq(ddf.any(axis=0, split_every=split_every), df.any(axis=0))

    assert_eq(ddf.A.all(split_every=split_every), df.A.all())
    assert_eq(ddf.A.any(split_every=split_every), df.A.any())

    # testing numpy functions with out param
    ddf_out_axis_default = dd.from_pandas(
        pd.Series([False, False, False, False, False], index=["A", "B", "C", "D", "E"]),
        10,
    )
    ddf_out_axis1 = dd.from_pandas(
        pd.Series(np.random.choice([True, False], size=(100,))), 10
    )

    # all
    ddf.all(split_every=split_every, out=ddf_out_axis_default)
    assert_eq(ddf_out_axis_default, df.all())

    ddf.all(axis=1, split_every=split_every, out=ddf_out_axis1)
    assert_eq(ddf_out_axis1, df.all(axis=1))

    ddf.all(split_every=split_every, axis=0, out=ddf_out_axis_default)
    assert_eq(ddf_out_axis_default, df.all(axis=0))

    # any
    ddf.any(split_every=split_every, out=ddf_out_axis_default)
    assert_eq(ddf_out_axis_default, df.any())

    ddf.any(axis=1, split_every=split_every, out=ddf_out_axis1)
    assert_eq(ddf_out_axis1, df.any(axis=1))

    ddf.any(split_every=split_every, axis=0, out=ddf_out_axis_default)
    assert_eq(ddf_out_axis_default, df.any(axis=0))

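# Dask task keys embed a token hashed from the operation and its inputs, so
# repeating an identical reduction must reproduce an identical ``_name``.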
@pytest.mark.parametrize("split_every", [False, 2])
def test_deterministic_reduction_names(split_every):
    df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
    ddf = dd.from_pandas(df, npartitions=2)

    for x in [ddf, ddf.x]:
        assert (
            x.sum(split_every=split_every)._name == x.sum(split_every=split_every)._name
        )
        assert (
            x.prod(split_every=split_every)._name
            == x.prod(split_every=split_every)._name
        )
        assert (
            x.product(split_every=split_every)._name
            == x.product(split_every=split_every)._name
        )
        assert (
            x.min(split_every=split_every)._name == x.min(split_every=split_every)._name
        )
        assert (
            x.max(split_every=split_every)._name == x.max(split_every=split_every)._name
        )
        assert (
            x.count(split_every=split_every)._name
            == x.count(split_every=split_every)._name
        )
        assert (
            x.std(split_every=split_every)._name == x.std(split_every=split_every)._name
        )
        assert (
            x.var(split_every=split_every)._name == x.var(split_every=split_every)._name
        )
        assert (
            x.sem(split_every=split_every)._name == x.sem(split_every=split_every)._name
        )
        assert (
            x.mean(split_every=split_every)._name
            == x.mean(split_every=split_every)._name
        )

    assert (
        ddf.x.nunique(split_every=split_every)._name
        == ddf.x.nunique(split_every=split_every)._name
    )


def test_reduction_series_invalid_axis():
    dsk = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    for axis in [1, "columns"]:
        for s in [ddf1.a, pdf1.a]:  # both must behave the same
            pytest.raises(ValueError, lambda: s.sum(axis=axis))
            pytest.raises(ValueError, lambda: s.prod(axis=axis))
            pytest.raises(ValueError, lambda: s.product(axis=axis))
            pytest.raises(ValueError, lambda: s.min(axis=axis))
            pytest.raises(ValueError, lambda: s.max(axis=axis))
            # only count doesn't have axis keyword
            pytest.raises(TypeError, lambda: s.count(axis=axis))
            pytest.raises(ValueError, lambda: s.std(axis=axis))
            pytest.raises(ValueError, lambda: s.var(axis=axis))
            pytest.raises(ValueError, lambda: s.sem(axis=axis))
            pytest.raises(ValueError, lambda: s.mean(axis=axis))


def test_reductions_non_numeric_dtypes():
    # test non-numeric blocks
    def check_raises(d, p, func):
        pytest.raises((TypeError, ValueError), lambda: getattr(d, func)().compute())
        pytest.raises((TypeError, ValueError), lambda: getattr(p, func)())

    pds = pd.Series(["a", "b", "c", "d", "e"])
    dds = dd.from_pandas(pds, 2)
    assert_eq(dds.sum(), pds.sum())
    check_raises(dds, pds, "prod")
    check_raises(dds, pds, "product")
    assert_eq(dds.min(), pds.min())
    assert_eq(dds.max(), pds.max())
    assert_eq(dds.count(), pds.count())
    check_raises(dds, pds, "std")
    check_raises(dds, pds, "var")
    check_raises(dds, pds, "sem")
    check_raises(dds, pds, "skew")
    check_raises(dds, pds, "kurtosis")
    assert_eq(dds.nunique(), pds.nunique())

    for pds in [
        pd.Series(pd.Categorical([1, 2, 3, 4, 5], ordered=True)),
        pd.Series(pd.Categorical(list("abcde"), ordered=True)),
        pd.Series(pd.date_range("2011-01-01", freq="D", periods=5)),
    ]:
        dds = dd.from_pandas(pds, 2)

        check_raises(dds, pds, "sum")
        check_raises(dds, pds, "prod")
        check_raises(dds, pds, "product")
        assert_eq(dds.min(), pds.min())
        assert_eq(dds.max(), pds.max())
        assert_eq(dds.count(), pds.count())
        if PANDAS_GT_120 and pds.dtype == "datetime64[ns]":
            # std is implemented for datetimes in pandas 1.2.0, but the dask
            # implementation depends on var, which isn't
            pass
        else:
            check_raises(dds, pds, "std")
        check_raises(dds, pds, "var")
        check_raises(dds, pds, "sem")
        check_raises(dds, pds, "skew")
        check_raises(dds, pds, "kurtosis")
        assert_eq(dds.nunique(), pds.nunique())

    pds = pd.Series(pd.timedelta_range("1 days", freq="D", periods=5))
    dds = dd.from_pandas(pds, 2)
    assert_eq(dds.sum(), pds.sum())
    assert_eq(dds.min(), pds.min())
    assert_eq(dds.max(), pds.max())
    assert_eq(dds.count(), pds.count())

    # neither pandas nor dask supports skew/kurtosis for timedelta
    check_raises(dds, pds, "skew")
    check_raises(dds, pds, "kurtosis")
    # ToDo: pandas supports timedelta std, dask returns float64
    # assert_eq(dds.std(), pds.std())

    # ToDo: pandas supports timedelta mean, but dask raises:
    # TypeError: unsupported operand type(s) for *: 'float' and 'Timedelta'
    # assert_eq(dds.mean(), pds.mean())

    assert_eq(dds.nunique(), pds.nunique())


@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_frame(split_every):
    dsk = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    assert_eq(ddf1.sum(split_every=split_every), pdf1.sum())
    assert_eq(ddf1.prod(split_every=split_every), pdf1.prod())
    assert_eq(ddf1.product(split_every=split_every), pdf1.product())
    assert_eq(ddf1.min(split_every=split_every), pdf1.min())
    assert_eq(ddf1.max(split_every=split_every), pdf1.max())
    assert_eq(ddf1.count(split_every=split_every), pdf1.count())
    assert_eq(ddf1.std(split_every=split_every), pdf1.std())
    assert_eq(ddf1.var(split_every=split_every), pdf1.var())
    assert_eq(ddf1.sem(split_every=split_every), pdf1.sem())
    assert_eq(ddf1.std(ddof=0, split_every=split_every), pdf1.std(ddof=0))
    assert_eq(ddf1.var(ddof=0, split_every=split_every), pdf1.var(ddof=0))
    assert_eq(ddf1.sem(ddof=0, split_every=split_every), pdf1.sem(ddof=0))
    assert_eq(ddf1.mean(split_every=split_every), pdf1.mean())

    for axis in [0, 1, "index", "columns"]:
        assert_eq(ddf1.sum(axis=axis, split_every=split_every), pdf1.sum(axis=axis))
        assert_eq(ddf1.prod(axis=axis, split_every=split_every), pdf1.prod(axis=axis))
        assert_eq(
            ddf1.product(axis=axis, split_every=split_every), pdf1.product(axis=axis)
        )
        assert_eq(ddf1.min(axis=axis, split_every=split_every), pdf1.min(axis=axis))
        assert_eq(ddf1.max(axis=axis, split_every=split_every), pdf1.max(axis=axis))
        assert_eq(ddf1.count(axis=axis, split_every=split_every), pdf1.count(axis=axis))
        assert_eq(ddf1.std(axis=axis, split_every=split_every), pdf1.std(axis=axis))
        assert_eq(ddf1.var(axis=axis, split_every=split_every), pdf1.var(axis=axis))
        assert_eq(ddf1.sem(axis=axis, split_every=split_every), pdf1.sem(axis=axis))
        assert_eq(
            ddf1.std(axis=axis, ddof=0, split_every=split_every),
            pdf1.std(axis=axis, ddof=0),
        )
        assert_eq(
            ddf1.var(axis=axis, ddof=0, split_every=split_every),
            pdf1.var(axis=axis, ddof=0),
        )
        assert_eq(
            ddf1.sem(axis=axis, ddof=0, split_every=split_every),
            pdf1.sem(axis=axis, ddof=0),
        )
        assert_eq(ddf1.mean(axis=axis, split_every=split_every), pdf1.mean(axis=axis))

    pytest.raises(ValueError, lambda: ddf1.sum(axis="incorrect").compute())

    # axis=0
    assert_dask_graph(ddf1.sum(split_every=split_every), "dataframe-sum")
    assert_dask_graph(ddf1.prod(split_every=split_every), "dataframe-prod")
    assert_dask_graph(ddf1.min(split_every=split_every), "dataframe-min")
    assert_dask_graph(ddf1.max(split_every=split_every), "dataframe-max")
    assert_dask_graph(ddf1.count(split_every=split_every), "dataframe-count")

    # std, var, sem, and mean consist of moment_* operations
    assert_dask_graph(ddf1.std(split_every=split_every), "dataframe-var")
    assert_dask_graph(ddf1.std(split_every=split_every), "moment_chunk")
    assert_dask_graph(ddf1.std(split_every=split_every), "moment_agg")
    assert_dask_graph(ddf1.std(split_every=split_every), "values")

    assert_dask_graph(ddf1.var(split_every=split_every), "moment_chunk")
    assert_dask_graph(ddf1.var(split_every=split_every), "moment_agg")
    assert_dask_graph(ddf1.var(split_every=split_every), "values")

    assert_dask_graph(ddf1.sem(split_every=split_every), "dataframe-var")
    assert_dask_graph(ddf1.sem(split_every=split_every), "moment_chunk")
    assert_dask_graph(ddf1.sem(split_every=split_every), "moment_agg")
    assert_dask_graph(ddf1.sem(split_every=split_every), "values")

    assert_dask_graph(ddf1.mean(split_every=split_every), "dataframe-sum")
    assert_dask_graph(ddf1.mean(split_every=split_every), "dataframe-count")

    # axis=1
    assert_dask_graph(ddf1.sum(axis=1, split_every=split_every), "dataframe-sum")
    assert_dask_graph(ddf1.prod(axis=1, split_every=split_every), "dataframe-prod")
    assert_dask_graph(ddf1.min(axis=1, split_every=split_every), "dataframe-min")
    assert_dask_graph(ddf1.max(axis=1, split_every=split_every), "dataframe-max")
    assert_dask_graph(ddf1.count(axis=1, split_every=split_every), "dataframe-count")
    assert_dask_graph(ddf1.std(axis=1, split_every=split_every), "dataframe-std")
    assert_dask_graph(ddf1.var(axis=1, split_every=split_every), "dataframe-var")
    assert_dask_graph(ddf1.sem(axis=1, split_every=split_every), "dataframe-sem")
    assert_dask_graph(ddf1.mean(axis=1, split_every=split_every), "dataframe-mean")


@pytest.mark.filterwarnings(
    "ignore:Dropping of nuisance columns:FutureWarning"
)  # https://github.com/dask/dask/issues/7714
def test_reductions_frame_dtypes():
    df = pd.DataFrame(
        {
            "int": [1, 2, 3, 4, 5, 6, 7, 8],
            "float": [1.0, 2.0, 3.0, 4.0, np.nan, 6.0, 7.0, 8.0],
            "dt": [pd.NaT] + [datetime(2011, i, 1) for i in range(1, 8)],
            "str": list("abcdefgh"),
            "timedelta": pd.to_timedelta([1, 2, 3, 4, 5, 6, 7, np.nan]),
            "bool": [True, False] * 4,
        }
    )
    ddf = dd.from_pandas(df, 3)

    # TODO: std and mean do not support timedelta dtype
    df_no_timedelta = df.drop("timedelta", axis=1, inplace=False)
    ddf_no_timedelta = dd.from_pandas(df_no_timedelta, 3)

    assert_eq(df.drop(columns="dt").sum(), ddf.drop(columns="dt").sum())
    assert_eq(
        df_no_timedelta.drop(columns="dt").mean(),
        ddf_no_timedelta.drop(columns="dt").mean(),
    )

    assert_eq(df.prod(), ddf.prod())
    assert_eq(df.product(), ddf.product())
    assert_eq(df.min(), ddf.min())
    assert_eq(df.max(), ddf.max())
    assert_eq(df.count(), ddf.count())
    assert_eq(df.sem(), ddf.sem())
    assert_eq(df.sem(ddof=0), ddf.sem(ddof=0))
    assert_eq(df_no_timedelta.std(), ddf_no_timedelta.std())
    assert_eq(df_no_timedelta.std(skipna=False), ddf_no_timedelta.std(skipna=False))
    assert_eq(df_no_timedelta.std(ddof=0), ddf_no_timedelta.std(ddof=0))
    assert_eq(df_no_timedelta.var(), ddf_no_timedelta.var())
    assert_eq(df_no_timedelta.var(skipna=False), ddf_no_timedelta.var(skipna=False))
    assert_eq(df_no_timedelta.var(ddof=0), ddf_no_timedelta.var(ddof=0))
    assert_eq(
        df_no_timedelta.var(ddof=0, skipna=False),
        ddf_no_timedelta.var(ddof=0, skipna=False),
    )

    assert_eq(df._get_numeric_data(), ddf._get_numeric_data())

    numerics = ddf[["int", "float"]]
    assert numerics._get_numeric_data().dask == numerics.dask

    # test var corner cases

    # only timedelta
    df_td = df[["timedelta"]]
    ddf_td = dd.from_pandas(df_td, 3)
    assert_eq(df_td.var(ddof=0), ddf_td.var(ddof=0))
    assert_eq(df_td.var(), ddf_td.var())

    # only numerics
    df_numerics = df[["int", "float", "bool"]]
    ddf_numerics = dd.from_pandas(df_numerics, 3)
    assert_eq(df_numerics.var(), ddf_numerics.var())

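# ``numeric_only=True`` should restrict each reduction to the numeric columns,
# mirroring the pandas keyword; ``numeric_only=False`` is expected to raise
# NotImplementedError in dask.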
def test_reductions_frame_dtypes_numeric_only():
    df = pd.DataFrame(
        {
            "int": [1, 2, 3, 4, 5, 6, 7, 8],
            "float": [1.0, 2.0, 3.0, 4.0, np.nan, 6.0, 7.0, 8.0],
            "dt": [pd.NaT] + [datetime(2011, i, 1) for i in range(1, 8)],
            "str": list("abcdefgh"),
            "timedelta": pd.to_timedelta([1, 2, 3, 4, 5, 6, 7, np.nan]),
            "bool": [True, False] * 4,
        }
    )
    ddf = dd.from_pandas(df, 3)
    kwargs = {"numeric_only": True}
    funcs = [
        "sum",
        "prod",
        "product",
        "min",
        "max",
        "mean",
        "var",
        "std",
        "count",
        "sem",
    ]

    for func in funcs:
        assert_eq(
            getattr(df, func)(**kwargs),
            getattr(ddf, func)(**kwargs),
            check_dtype=func in ["mean", "max"] and PANDAS_GT_120,
        )

        with pytest.raises(NotImplementedError, match="'numeric_only=False"):
            getattr(ddf, func)(numeric_only=False)

    assert_eq(df.sem(ddof=0, **kwargs), ddf.sem(ddof=0, **kwargs))
    assert_eq(df.std(ddof=0, **kwargs), ddf.std(ddof=0, **kwargs))
    assert_eq(df.var(ddof=0, **kwargs), ddf.var(ddof=0, **kwargs))
    assert_eq(df.var(skipna=False, **kwargs), ddf.var(skipna=False, **kwargs))
    assert_eq(
        df.var(skipna=False, ddof=0, **kwargs), ddf.var(skipna=False, ddof=0, **kwargs)
    )

    # ------ only include numeric columns ------ #
    assert_eq(df._get_numeric_data(), ddf._get_numeric_data())

    df_numerics = df[["int", "float", "bool"]]
    ddf_numerics = ddf[["int", "float", "bool"]]

    assert_eq(df_numerics, ddf._get_numeric_data())
    assert ddf_numerics._get_numeric_data().dask == ddf_numerics.dask

    for func in funcs:
        assert_eq(
            getattr(df_numerics, func)(),
            getattr(ddf_numerics, func)(),
            check_dtype=func in ["mean", "max"] and PANDAS_GT_120,
        )
@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_frame_nan(split_every):
    df = pd.DataFrame(
        {
            "a": [1, 2, np.nan, 4, 5, 6, 7, 8],
            "b": [1, 2, np.nan, np.nan, np.nan, 5, np.nan, np.nan],
            "c": [np.nan] * 8,
        }
    )
    ddf = dd.from_pandas(df, 3)
    assert_eq(df.sum(), ddf.sum(split_every=split_every))
    assert_eq(df.prod(), ddf.prod(split_every=split_every))
    assert_eq(df.product(), ddf.product(split_every=split_every))
    assert_eq(df.min(), ddf.min(split_every=split_every))
    assert_eq(df.max(), ddf.max(split_every=split_every))
    assert_eq(df.count(), ddf.count(split_every=split_every))
    with warnings.catch_warnings():
        # dask.dataframe should probably filter this, to match pandas, but
        # it seems quite difficult.
        warnings.simplefilter("ignore", RuntimeWarning)
        assert_eq(df.std(), ddf.std(split_every=split_every))
        assert_eq(df.var(), ddf.var(split_every=split_every))
        assert_eq(df.sem(), ddf.sem(split_every=split_every))
        assert_eq(df.std(ddof=0), ddf.std(ddof=0, split_every=split_every))
        assert_eq(df.var(ddof=0), ddf.var(ddof=0, split_every=split_every))
        assert_eq(df.sem(ddof=0), ddf.sem(ddof=0, split_every=split_every))
        assert_eq(df.mean(), ddf.mean(split_every=split_every))

    with warnings.catch_warnings(record=True):
        assert_eq(df.sum(skipna=False), ddf.sum(skipna=False, split_every=split_every))
        assert_eq(
            df.prod(skipna=False), ddf.prod(skipna=False, split_every=split_every)
        )
        assert_eq(
            df.product(skipna=False), ddf.product(skipna=False, split_every=split_every)
        )
        assert_eq(df.min(skipna=False), ddf.min(skipna=False, split_every=split_every))
        assert_eq(df.max(skipna=False), ddf.max(skipna=False, split_every=split_every))
        assert_eq(df.std(skipna=False), ddf.std(skipna=False, split_every=split_every))
        assert_eq(df.var(skipna=False), ddf.var(skipna=False, split_every=split_every))
        assert_eq(df.sem(skipna=False), ddf.sem(skipna=False, split_every=split_every))
        assert_eq(
            df.std(skipna=False, ddof=0),
            ddf.std(skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.var(skipna=False, ddof=0),
            ddf.var(skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.sem(skipna=False, ddof=0),
            ddf.sem(skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.mean(skipna=False), ddf.mean(skipna=False, split_every=split_every)
        )
        assert_eq(
            df.sum(axis=1, skipna=False),
            ddf.sum(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.prod(axis=1, skipna=False),
            ddf.prod(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.product(axis=1, skipna=False),
            ddf.product(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.min(axis=1, skipna=False),
            ddf.min(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.max(axis=1, skipna=False),
            ddf.max(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.std(axis=1, skipna=False),
            ddf.std(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.var(axis=1, skipna=False),
            ddf.var(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.sem(axis=1, skipna=False),
            ddf.sem(axis=1, skipna=False, split_every=split_every),
        )
        assert_eq(
            df.std(axis=1, skipna=False, ddof=0),
            ddf.std(axis=1, skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.var(axis=1, skipna=False, ddof=0),
            ddf.var(axis=1, skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.sem(axis=1, skipna=False, ddof=0),
            ddf.sem(axis=1, skipna=False, ddof=0, split_every=split_every),
        )
        assert_eq(
            df.mean(axis=1, skipna=False),
            ddf.mean(axis=1, skipna=False, split_every=split_every),
        )


@pytest.mark.parametrize("comparison", ["lt", "gt", "le", "ge", "ne", "eq"])
def test_series_comparison_nan(comparison):
    s = pd.Series([1, 2, 3, 4, 5, 6, 7])
    s_nan = pd.Series([1, -1, 8, np.nan, 5, 6, 2.4])
    ds = dd.from_pandas(s, 3)
    ds_nan = dd.from_pandas(s_nan, 3)

    fill_value = 7
    comparison_pd = getattr(s, comparison)
    comparison_dd = getattr(ds, comparison)
    assert_eq(
        comparison_dd(ds_nan, fill_value=fill_value),
        comparison_pd(s_nan, fill_value=fill_value),
    )


def test_sum_intna():
    a = pd.Series([1, None, 2], dtype=pd.Int32Dtype())
    b = dd.from_pandas(a, 2)
    assert_eq(a.sum(), b.sum())

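# ``divmod(a, b)`` returns the elementwise ``(a // b, a % b)`` pair, for both
# scalar and Series right-hand sides.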
def test_divmod():
    df1 = pd.Series(np.random.rand(10))
    df2 = pd.Series(np.random.rand(10))

    ddf1 = dd.from_pandas(df1, npartitions=3)
    ddf2 = dd.from_pandas(df2, npartitions=3)

    result = divmod(ddf1, 2.0)
    expected = divmod(df1, 2.0)
    assert_eq(result[0], expected[0])
    assert_eq(result[1], expected[1])

    result = divmod(ddf1, ddf2)
    expected = divmod(df1, df2)
    assert_eq(result[0], expected[0])
    assert_eq(result[1], expected[1])


@pytest.mark.skipif("not scipy")
def test_moment():
    from dask.array import stats
    from dask.array.utils import assert_eq

    df = pd.Series(list(range(10)))
    ddf = dd.from_pandas(df, npartitions=2)

    assert_eq(stats.moment(ddf, 2, 0), scipy.stats.moment(df, 2, 0))


@pytest.mark.parametrize("func", ["sum", "count", "mean", "var", "sem"])
def test_empty_df_reductions(func):
    pdf = pd.DataFrame()
    ddf = dd.from_pandas(pdf, npartitions=1)

    dsk_func = getattr(ddf.__class__, func)
    pd_func = getattr(pdf.__class__, func)

    assert_eq(dsk_func(ddf), pd_func(pdf))

    idx = pd.date_range("2000", periods=4)
    pdf = pd.DataFrame(index=idx)
    ddf = dd.from_pandas(pdf, npartitions=1)

    assert_eq(dsk_func(ddf), pd_func(pdf))


@pytest.mark.parametrize("method", ["sum", "prod", "product"])
@pytest.mark.parametrize("min_count", [0, 9])
def test_series_agg_with_min_count(method, min_count):
    df = pd.DataFrame([[1]], columns=["a"])
    ddf = dd.from_pandas(df, npartitions=1)
    func = getattr(ddf["a"], method)
    result = func(min_count=min_count).compute()
    if min_count == 0:
        assert result == 1
    else:
        assert result is np.nan


# Default absolute tolerance of 2000 nanoseconds
def assert_near_timedeltas(t1, t2, atol=2000):
    if is_scalar(t1):
        t1 = pd.Series([t1])
    if is_scalar(t2):
        t2 = pd.Series([t2])

    assert t1.dtype == t2.dtype
    assert_eq(pd.to_numeric(t1), pd.to_numeric(t2), atol=atol)


@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.parametrize("axis", [0, 1])
def test_datetime_std_creates_copy_cols(axis):
    pdf = pd.DataFrame(
        {
            "dt1": [
                datetime.fromtimestamp(1636426700 + (i * 250000)) for i in range(10)
            ],
            "dt2": [
                datetime.fromtimestamp(1636426700 + (i * 300000)) for i in range(10)
            ],
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    # Series test (same line twice to make sure data structure wasn't mutated)
    assert_eq(ddf["dt1"].std(), pdf["dt1"].std())
    assert_eq(ddf["dt1"].std(), pdf["dt1"].std())

    # DataFrame test (same line twice to make sure data structure wasn't mutated)
    assert_near_timedeltas(ddf.std(axis=axis).compute(), pdf.std(axis=axis))
    assert_near_timedeltas(ddf.std(axis=axis).compute(), pdf.std(axis=axis))


@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("skipna", [False, True])
def test_datetime_std_with_larger_dataset(axis, skipna):
    num_rows = 250

    dt1 = pd.concat(
        [
            pd.Series([pd.NaT] * 15, index=range(15)),
            pd.to_datetime(
                pd.Series(
                    [
                        datetime.fromtimestamp(1636426704 + (i * 250000))
                        for i in range(num_rows - 15)
                    ],
                    index=range(15, 250),
                )
            ),
        ],
        ignore_index=False,
    )

    base_numbers = [
        (1638290040706793300 + (i * 69527182702409)) for i in range(num_rows)
    ]

    pdf = pd.DataFrame(
        {"dt1": dt1, "dt2": pd.to_datetime(pd.Series(base_numbers))}, index=range(250)
    )

    for i in range(3, 8):
        pdf[f"dt{i}"] = pd.to_datetime(
            pd.Series([int(x + (0.12 * i)) for x in base_numbers])
        )

    ddf = dd.from_pandas(pdf, 8)

    assert_near_timedeltas(
        ddf[["dt1"]].std(axis=axis, skipna=skipna).compute(),
        pdf[["dt1"]].std(axis=axis, skipna=skipna),
    )
    # Same thing, but as a Series; no axis here, since axis=1 raises an error
    assert_near_timedeltas(
        ddf["dt1"].std(skipna=skipna).compute(), pdf["dt1"].std(skipna=skipna)
    )

    # Computation on the full dataset
    assert_near_timedeltas(
        ddf.std(axis=axis, skipna=skipna).compute(), pdf.std(axis=axis, skipna=skipna)
    )


@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.filterwarnings(
    "ignore:Dropping of nuisance columns:FutureWarning"
)  # https://github.com/dask/dask/issues/7714
@pytest.mark.parametrize("skipna", [False, True])
def test_datetime_std_across_axis1_null_results(skipna):
    pdf = pd.DataFrame(
        {
            "dt1": [
                datetime.fromtimestamp(1636426704 + (i * 250000)) for i in range(10)
            ],
            "dt2": [
                datetime.fromtimestamp(1636426704 + (i * 217790)) for i in range(10)
            ],
            "nums": [i for i in range(10)],
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    # A single datetime column always results in NaT
    assert_eq(
        ddf[["dt1"]].std(axis=1, skipna=skipna), pdf[["dt1"]].std(axis=1, skipna=skipna)
    )

    # A mix of datetimes with other numeric types produces NaNs
    assert_eq(ddf.std(axis=1, skipna=skipna), pdf.std(axis=1, skipna=skipna))

    # Test with a mix of NaT and valid datetimes
    pdf2 = pd.DataFrame(
        {
            "dt1": [pd.NaT]
            + [datetime.fromtimestamp(1636426704 + (i * 250000)) for i in range(10)]
            + [pd.NaT],
            "dt2": [
                datetime.fromtimestamp(1636426704 + (i * 250000)) for i in range(12)
            ],
            "dt3": [
                datetime.fromtimestamp(1636426704 + (i * 282616)) for i in range(12)
            ],
        }
    )
    ddf2 = dd.from_pandas(pdf2, 3)

    assert_eq(ddf2.std(axis=1, skipna=skipna), pdf2.std(axis=1, skipna=skipna))


def test_std_raises_on_index():
    with pytest.raises(
        NotImplementedError,
        match="`std` is only supported with objects that are Dataframes or Series",
    ):
        dd.from_pandas(pd.DataFrame({"test": [1, 2]}), npartitions=2).index.std()