import warnings
import weakref
import xml.etree.ElementTree
from itertools import product
from operator import add
import numpy as np
import pandas as pd
import pytest
from pandas.io.formats import format as pandas_format
import dask
import dask.array as da
import dask.dataframe as dd
import dask.dataframe.groupby
from dask.base import compute_as_if_collection
from dask.blockwise import fuse_roots
from dask.dataframe import _compat, methods
from dask.dataframe._compat import (
PANDAS_GT_110,
PANDAS_GT_120,
PANDAS_GT_140,
PANDAS_GT_150,
tm,
)
from dask.dataframe.core import (
Scalar,
_concat,
_map_freq_to_period_start,
aca,
has_parallel_type,
is_broadcastable,
repartition_divisions,
total_mem_usage,
)
from dask.dataframe.utils import assert_eq, assert_max_deps, make_meta
from dask.datasets import timeseries
from dask.utils import M, is_dataframe_like, is_series_like, put_lines
from dask.utils_test import _check_warning, hlg_layer
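# Shared fixtures for the tests below: ``dsk`` is a low-level task graph whose
# keys ("x", i) each hold one pandas partition, ``meta`` is an empty pandas
# DataFrame describing the dtypes, and the divisions [0, 5, 9, 9] give the
# index boundaries of the three partitions (n + 1 values for n partitions;
# only the final boundary is inclusive).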
dsk = {
("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
}
meta = make_meta(
{"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
)
d = dd.DataFrame(dsk, "x", meta, [0, 5, 9, 9])
full = d.compute()
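# pandas >= 1.1 compares the ``freq`` of datetime-like indexes in its testing
# asserts and accepts a ``check_freq`` argument; many dask operations do not
# preserve ``freq``, so tests pass ``check_freq=False`` via this kwargs dict
# where needed.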
CHECK_FREQ = {}
if dd._compat.PANDAS_GT_110:
CHECK_FREQ["check_freq"] = False
def test_dataframe_doc():
doc = d.add.__doc__
disclaimer = "Some inconsistencies with the Dask version may exist."
assert disclaimer in doc
def test_dataframe_doc_from_non_pandas():
class Foo:
def foo(self):
"""This is a new docstring that I just made up
Parameters:
----------
None
"""
d._bind_operator_method("foo", Foo.foo, original=Foo)
try:
doc = d.foo.__doc__
disclaimer = "Some inconsistencies with the Dask version may exist."
assert disclaimer in doc
assert "new docstring that I just made up" in doc
finally:
# make sure to clean up this alteration of the dd.DataFrame class
del dd.DataFrame.foo
def test_Dataframe():
expected = pd.Series(
[2, 3, 4, 5, 6, 7, 8, 9, 10], index=[0, 1, 3, 5, 6, 8, 9, 9, 9], name="a"
)
assert_eq(d["a"] + 1, expected)
tm.assert_index_equal(d.columns, pd.Index(["a", "b"]))
assert_eq(d[d["b"] > 2], full[full["b"] > 2])
assert_eq(d[["a", "b"]], full[["a", "b"]])
assert_eq(d.a, full.a)
assert d.b.mean().compute() == full.b.mean()
assert np.allclose(d.b.var().compute(), full.b.var())
assert np.allclose(d.b.std().compute(), full.b.std())
assert d.index._name == d.index._name # this is deterministic
assert repr(d)
def test_head_tail():
assert_eq(d.head(2), full.head(2))
assert_eq(d.head(3), full.head(3))
assert_eq(d.head(2), dsk[("x", 0)].head(2))
assert_eq(d["a"].head(2), full["a"].head(2))
assert_eq(d["a"].head(3), full["a"].head(3))
assert_eq(d["a"].head(2), dsk[("x", 0)]["a"].head(2))
assert sorted(d.head(2, compute=False).dask) == sorted(
d.head(2, compute=False).dask
)
assert sorted(d.head(2, compute=False).dask) != sorted(
d.head(3, compute=False).dask
)
assert_eq(d.tail(2), full.tail(2))
assert_eq(d.tail(3), full.tail(3))
assert_eq(d.tail(2), dsk[("x", 2)].tail(2))
assert_eq(d["a"].tail(2), full["a"].tail(2))
assert_eq(d["a"].tail(3), full["a"].tail(3))
assert_eq(d["a"].tail(2), dsk[("x", 2)]["a"].tail(2))
assert sorted(d.tail(2, compute=False).dask) == sorted(
d.tail(2, compute=False).dask
)
assert sorted(d.tail(2, compute=False).dask) != sorted(
d.tail(3, compute=False).dask
)
def test_head_npartitions():
assert_eq(d.head(5, npartitions=2), full.head(5))
assert_eq(d.head(5, npartitions=2, compute=False), full.head(5))
assert_eq(d.head(5, npartitions=-1), full.head(5))
assert_eq(d.head(7, npartitions=-1), full.head(7))
assert_eq(d.head(2, npartitions=-1), full.head(2))
with pytest.raises(ValueError):
d.head(2, npartitions=5)
def test_head_npartitions_warn():
match = "5 elements requested, only 3 elements"
with pytest.warns(UserWarning, match=match):
d.head(5)
match = "Insufficient elements"
with pytest.warns(UserWarning, match=match):
d.head(100)
with pytest.warns(UserWarning, match=match):
d.head(7)
with pytest.warns(UserWarning, match=match):
d.head(7, npartitions=2)
# No warn if all partitions are inspected
for n in [3, -1]:
with pytest.warns(None) as rec:
d.head(10, npartitions=n)
assert not rec
# With default args, this means that a 1 partition dataframe won't warn
d2 = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=1)
with pytest.warns(None) as rec:
d2.head()
assert not rec
def test_index_head():
assert_eq(d.index.head(2), full.index[:2])
assert_eq(d.index.head(3), full.index[:3])
def test_Series():
assert isinstance(d.a, dd.Series)
assert isinstance(d.a + 1, dd.Series)
assert_eq((d + 1), full + 1)
def test_Index():
for case in [
pd.DataFrame(np.random.randn(10, 5), index=list("abcdefghij")),
pd.DataFrame(
np.random.randn(10, 5),
index=pd.date_range("2011-01-01", freq="D", periods=10),
),
]:
ddf = dd.from_pandas(case, 3)
assert_eq(ddf.index, case.index)
pytest.raises(AttributeError, lambda: ddf.index.index)
def test_axes():
pdf = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
df = dd.from_pandas(pdf, npartitions=2)
assert len(df.axes) == len(pdf.axes)
assert all(assert_eq(d, p) for d, p in zip(df.axes, pdf.axes))
def test_series_axes():
ps = pd.Series(["abcde"])
ds = dd.from_pandas(ps, npartitions=2)
assert len(ds.axes) == len(ps.axes)
assert all(assert_eq(d, p) for d, p in zip(ds.axes, ps.axes))
def test_Scalar():
val = np.int64(1)
s = Scalar({("a", 0): val}, "a", "i8")
assert hasattr(s, "dtype")
assert "dtype" in dir(s)
assert_eq(s, val)
    assert repr(s) == "dd.Scalar<a, dtype=int64>"
val = pd.Timestamp("2001-01-01")
s = Scalar({("a", 0): val}, "a", val)
assert not hasattr(s, "dtype")
assert "dtype" not in dir(s)
assert_eq(s, val)
    assert repr(s) == "dd.Scalar<a, type=Timestamp>"
def test_scalar_raises():
val = np.int64(1)
s = Scalar({("a", 0): val}, "a", "i8")
msg = "cannot be converted to a boolean value"
with pytest.raises(TypeError, match=msg):
bool(s)
def test_attributes():
assert "a" in dir(d)
assert "foo" not in dir(d)
pytest.raises(AttributeError, lambda: d.foo)
df = dd.from_pandas(pd.DataFrame({"a b c": [1, 2, 3]}), npartitions=2)
assert "a b c" not in dir(df)
df = dd.from_pandas(pd.DataFrame({"a": [1, 2], 5: [1, 2]}), npartitions=2)
assert "a" in dir(df)
assert 5 not in dir(df)
df = dd.from_pandas(_compat.makeTimeDataFrame(), npartitions=3)
pytest.raises(AttributeError, lambda: df.foo)
def test_column_names():
tm.assert_index_equal(d.columns, pd.Index(["a", "b"]))
tm.assert_index_equal(d[["b", "a"]].columns, pd.Index(["b", "a"]))
assert d["a"].name == "a"
assert (d["a"] + 1).name == "a"
assert (d["a"] + d["b"]).name is None
def test_columns_named_divisions_and_meta():
# https://github.com/dask/dask/issues/7599
df = pd.DataFrame(
{"_meta": [1, 2, 3, 4], "divisions": ["a", "b", "c", "d"]},
index=[0, 1, 3, 5],
)
ddf = dd.from_pandas(df, 2)
assert ddf.divisions == (0, 3, 5)
assert_eq(ddf["divisions"], df.divisions)
assert all(ddf._meta.columns == ["_meta", "divisions"])
assert_eq(ddf["_meta"], df._meta)
def test_index_names():
assert d.index.name is None
idx = pd.Index([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], name="x")
df = pd.DataFrame(np.random.randn(10, 5), idx)
ddf = dd.from_pandas(df, 3)
assert ddf.index.name == "x"
assert ddf.index.compute().name == "x"
@pytest.mark.skipif(dd._compat.PANDAS_GT_130, reason="Freq no longer included in ts")
@pytest.mark.parametrize(
"npartitions",
[
1,
pytest.param(
2,
marks=pytest.mark.xfail(
not dd._compat.PANDAS_GT_110, reason="Fixed upstream."
),
),
],
)
def test_timezone_freq(npartitions):
s_naive = pd.Series(pd.date_range("20130101", periods=10))
s_aware = pd.Series(pd.date_range("20130101", periods=10, tz="US/Eastern"))
pdf = pd.DataFrame({"tz": s_aware, "notz": s_naive})
ddf = dd.from_pandas(pdf, npartitions=npartitions)
assert pdf.tz[0].freq == ddf.compute().tz[0].freq == ddf.tz.compute()[0].freq
def test_rename_columns():
# GH 819
df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]})
ddf = dd.from_pandas(df, 2)
ddf.columns = ["x", "y"]
df.columns = ["x", "y"]
tm.assert_index_equal(ddf.columns, pd.Index(["x", "y"]))
tm.assert_index_equal(ddf._meta.columns, pd.Index(["x", "y"]))
assert_eq(ddf, df)
msg = r"Length mismatch: Expected axis has 2 elements, new values have 4 elements"
with pytest.raises(ValueError) as err:
ddf.columns = [1, 2, 3, 4]
assert msg in str(err.value)
# Multi-index columns
df = pd.DataFrame({("A", "0"): [1, 2, 2, 3], ("B", 1): [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)
df.columns = ["x", "y"]
ddf.columns = ["x", "y"]
tm.assert_index_equal(ddf.columns, pd.Index(["x", "y"]))
tm.assert_index_equal(ddf._meta.columns, pd.Index(["x", "y"]))
assert_eq(ddf, df)
def test_rename_series():
# GH 819
s = pd.Series([1, 2, 3, 4, 5, 6, 7], name="x")
ds = dd.from_pandas(s, 2)
s.name = "renamed"
ds.name = "renamed"
assert s.name == "renamed"
assert_eq(ds, s)
ind = s.index
dind = ds.index
ind.name = "renamed"
dind.name = "renamed"
assert ind.name == "renamed"
assert_eq(dind, ind)
def test_rename_series_method():
# Series name
s = pd.Series([1, 2, 3, 4, 5, 6, 7], name="x")
ds = dd.from_pandas(s, 2)
assert_eq(ds.rename("y"), s.rename("y"))
assert ds.name == "x" # no mutation
assert_eq(ds.rename(), s.rename())
assert_eq(ds, s)
def test_rename_series_method_2():
# Series index
s = pd.Series(["a", "b", "c", "d", "e", "f", "g"], name="x")
ds = dd.from_pandas(s, 2)
for is_sorted in [True, False]:
res = ds.rename(lambda x: x**2, sorted_index=is_sorted)
assert_eq(res, s.rename(lambda x: x**2))
assert res.known_divisions == is_sorted
res = ds.rename(s, sorted_index=is_sorted)
assert_eq(res, s.rename(s))
assert res.known_divisions == is_sorted
with pytest.raises(ValueError):
ds.rename(lambda x: -x, sorted_index=True)
assert_eq(ds.rename(lambda x: -x), s.rename(lambda x: -x))
res = ds.rename(ds)
assert_eq(res, s.rename(s))
assert not res.known_divisions
ds2 = ds.clear_divisions()
res = ds2.rename(lambda x: x**2, sorted_index=True)
assert_eq(res, s.rename(lambda x: x**2))
assert not res.known_divisions
res = ds.rename(lambda x: x**2, inplace=True, sorted_index=True)
assert res is ds
s.rename(lambda x: x**2, inplace=True)
assert_eq(ds, s)
@pytest.mark.parametrize(
"method,test_values", [("tdigest", (6, 10)), ("dask", (4, 20))]
)
def test_describe_numeric(method, test_values):
if method == "tdigest":
pytest.importorskip("crick")
    # prepare a test case whose approximate quantiles equal the exact quantiles
s = pd.Series(list(range(test_values[1])) * test_values[0])
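    # Each distinct value repeats the same number of times, so the requested
    # quantiles land exactly on data points and the approximate methods can
    # recover them exactly.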
df = pd.DataFrame(
{
"a": list(range(test_values[1])) * test_values[0],
"b": list(range(test_values[0])) * test_values[1],
}
)
ds = dd.from_pandas(s, test_values[0])
ddf = dd.from_pandas(df, test_values[0])
test_quantiles = [0.25, 0.75]
assert_eq(df.describe(), ddf.describe(percentiles_method=method))
assert_eq(s.describe(), ds.describe(percentiles_method=method))
assert_eq(
df.describe(percentiles=test_quantiles),
ddf.describe(percentiles=test_quantiles, percentiles_method=method),
)
assert_eq(s.describe(), ds.describe(split_every=2, percentiles_method=method))
assert_eq(df.describe(), ddf.describe(split_every=2, percentiles_method=method))
# remove string columns
df = pd.DataFrame(
{
"a": list(range(test_values[1])) * test_values[0],
"b": list(range(test_values[0])) * test_values[1],
"c": list("abcdef"[: test_values[0]]) * test_values[1],
}
)
ddf = dd.from_pandas(df, test_values[0])
assert_eq(df.describe(), ddf.describe(percentiles_method=method))
assert_eq(df.describe(), ddf.describe(split_every=2, percentiles_method=method))
@pytest.mark.parametrize(
"include,exclude,percentiles,subset",
[
(None, None, None, ["c", "d"]), # numeric
(None, None, None, ["c", "d", "f"]), # numeric + timedelta
(None, None, None, ["c", "d", "g"]), # numeric + bool
(None, None, None, ["c", "d", "f", "g"]), # numeric + bool + timedelta
(None, None, None, ["f", "g"]), # bool + timedelta
("all", None, None, None),
(["number"], None, [0.25, 0.5], None),
([np.timedelta64], None, None, None),
(["number", "object"], None, [0.25, 0.75], None),
(None, ["number", "object"], None, None),
(["object", "datetime", "bool"], None, None, None),
],
)
def test_describe(include, exclude, percentiles, subset):
data = {
"a": ["aaa", "bbb", "bbb", None, None, "zzz"] * 2,
"c": [None, 0, 1, 2, 3, 4] * 2,
"d": [None, 0, 1] * 4,
"e": [
pd.Timestamp("2017-05-09 00:00:00.006000"),
pd.Timestamp("2017-05-09 00:00:00.006000"),
pd.Timestamp("2017-05-09 07:56:23.858694"),
pd.Timestamp("2017-05-09 05:59:58.938999"),
None,
None,
]
* 2,
"f": [
np.timedelta64(3, "D"),
np.timedelta64(1, "D"),
None,
None,
np.timedelta64(3, "D"),
np.timedelta64(1, "D"),
]
* 2,
"g": [True, False, True] * 4,
}
# Arrange
df = pd.DataFrame(data)
if subset is not None:
df = df.loc[:, subset]
ddf = dd.from_pandas(df, 2)
if PANDAS_GT_110:
datetime_is_numeric_kwarg = {"datetime_is_numeric": True}
else:
datetime_is_numeric_kwarg = {}
# Act
actual = ddf.describe(
include=include,
exclude=exclude,
percentiles=percentiles,
**datetime_is_numeric_kwarg,
)
expected = df.describe(
include=include,
exclude=exclude,
percentiles=percentiles,
**datetime_is_numeric_kwarg,
)
if "e" in expected and datetime_is_numeric_kwarg:
expected.at["mean", "e"] = np.nan
expected.dropna(how="all", inplace=True)
assert_eq(actual, expected)
# Check series
if subset is None:
for col in ["a", "c", "e", "g"]:
expected = df[col].describe(
include=include, exclude=exclude, **datetime_is_numeric_kwarg
)
if col == "e" and datetime_is_numeric_kwarg:
expected.drop("mean", inplace=True)
actual = ddf[col].describe(
include=include, exclude=exclude, **datetime_is_numeric_kwarg
)
assert_eq(expected, actual)
def test_describe_without_datetime_is_numeric():
data = {
"a": ["aaa", "bbb", "bbb", None, None, "zzz"] * 2,
"c": [None, 0, 1, 2, 3, 4] * 2,
"d": [None, 0, 1] * 4,
"e": [
pd.Timestamp("2017-05-09 00:00:00.006000"),
pd.Timestamp("2017-05-09 00:00:00.006000"),
pd.Timestamp("2017-05-09 07:56:23.858694"),
pd.Timestamp("2017-05-09 05:59:58.938999"),
None,
None,
]
* 2,
}
# Arrange
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, 2)
# Assert
assert_eq(ddf.describe(), df.describe())
# Check series
for col in ["a", "c"]:
assert_eq(df[col].describe(), ddf[col].describe())
if PANDAS_GT_110:
with pytest.warns(
FutureWarning,
match=(
"Treating datetime data as categorical rather than numeric in `.describe` is deprecated"
),
):
ddf.e.describe()
else:
assert_eq(df.e.describe(), ddf.e.describe())
with pytest.raises(
NotImplementedError,
match="datetime_is_numeric=True is only supported for pandas >= 1.1.0",
):
ddf.e.describe(datetime_is_numeric=True)
def test_describe_empty():
df_none = pd.DataFrame({"A": [None, None]})
ddf_none = dd.from_pandas(df_none, 2)
df_len0 = pd.DataFrame({"A": [], "B": []})
ddf_len0 = dd.from_pandas(df_len0, 2)
ddf_nocols = dd.from_pandas(pd.DataFrame({}), 2)
    # Pandas produces different dtypes for the resulting describe dataframe when
    # there are only None values; pre-compute the dask df to bypass the _meta check
assert_eq(
df_none.describe(), ddf_none.describe(percentiles_method="dask").compute()
)
with pytest.raises((ValueError, RuntimeWarning)):
ddf_len0.describe(percentiles_method="dask").compute()
with pytest.raises(ValueError):
ddf_nocols.describe(percentiles_method="dask").compute()
def test_describe_empty_tdigest():
pytest.importorskip("crick")
df_none = pd.DataFrame({"A": [None, None]})
ddf_none = dd.from_pandas(df_none, 2)
df_len0 = pd.DataFrame({"A": []})
ddf_len0 = dd.from_pandas(df_len0, 2)
ddf_nocols = dd.from_pandas(pd.DataFrame({}), 2)
    # Pandas produces different dtypes for the resulting describe dataframe when
    # there are only None values; pre-compute the dask df to bypass the _meta check
assert_eq(
df_none.describe(), ddf_none.describe(percentiles_method="tdigest").compute()
)
with warnings.catch_warnings():
# dask.dataframe should probably filter this, to match pandas, but
# it seems quite difficult.
warnings.simplefilter("ignore", RuntimeWarning)
assert_eq(df_len0.describe(), ddf_len0.describe(percentiles_method="tdigest"))
with pytest.raises(ValueError):
ddf_nocols.describe(percentiles_method="tdigest").compute()
def test_describe_for_possibly_unsorted_q():
"""make sure describe is sorting percentiles parameter, q, properly and can
handle lists, tuples and ndarrays.
See https://github.com/dask/dask/issues/4642.
"""
# prepare test case where quantiles should equal values
A = da.arange(0, 101)
ds = dd.from_dask_array(A)
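    # A holds 0..100, so with linear interpolation the q-quantile is exactly
    # 100 * q (e.g. the 25th percentile is 25.0).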
for q in [None, [0.25, 0.50, 0.75], [0.25, 0.50, 0.75, 0.99], [0.75, 0.5, 0.25]]:
for f_convert in [list, tuple, np.array]:
if q is None:
r = ds.describe(percentiles=q).compute()
else:
r = ds.describe(percentiles=f_convert(q)).compute()
assert_eq(r["25%"], 25.0)
assert_eq(r["50%"], 50.0)
assert_eq(r["75%"], 75.0)
def test_cumulative():
index = [f"row{i:03d}" for i in range(100)]
df = pd.DataFrame(np.random.randn(100, 5), columns=list("abcde"), index=index)
df_out = pd.DataFrame(np.random.randn(100, 5), columns=list("abcde"), index=index)
ddf = dd.from_pandas(df, 5)
ddf_out = dd.from_pandas(df_out, 5)
assert_eq(ddf.cumsum(), df.cumsum())
assert_eq(ddf.cumprod(), df.cumprod())
assert_eq(ddf.cummin(), df.cummin())
assert_eq(ddf.cummax(), df.cummax())
assert_eq(ddf.cumsum(axis=1), df.cumsum(axis=1))
assert_eq(ddf.cumprod(axis=1), df.cumprod(axis=1))
assert_eq(ddf.cummin(axis=1), df.cummin(axis=1))
assert_eq(ddf.cummax(axis=1), df.cummax(axis=1))
np.cumsum(ddf, out=ddf_out)
assert_eq(ddf_out, df.cumsum())
np.cumprod(ddf, out=ddf_out)
assert_eq(ddf_out, df.cumprod())
ddf.cummin(out=ddf_out)
assert_eq(ddf_out, df.cummin())
ddf.cummax(out=ddf_out)
assert_eq(ddf_out, df.cummax())
np.cumsum(ddf, out=ddf_out, axis=1)
assert_eq(ddf_out, df.cumsum(axis=1))
np.cumprod(ddf, out=ddf_out, axis=1)
assert_eq(ddf_out, df.cumprod(axis=1))
ddf.cummin(out=ddf_out, axis=1)
assert_eq(ddf_out, df.cummin(axis=1))
ddf.cummax(out=ddf_out, axis=1)
assert_eq(ddf_out, df.cummax(axis=1))
assert_eq(ddf.a.cumsum(), df.a.cumsum())
assert_eq(ddf.a.cumprod(), df.a.cumprod())
assert_eq(ddf.a.cummin(), df.a.cummin())
assert_eq(ddf.a.cummax(), df.a.cummax())
# With NaNs
df = pd.DataFrame(
{
"a": [1, 2, np.nan, 4, 5, 6, 7, 8],
"b": [1, 2, np.nan, np.nan, np.nan, 5, np.nan, np.nan],
"c": [np.nan] * 8,
}
)
ddf = dd.from_pandas(df, 3)
assert_eq(df.cumsum(), ddf.cumsum())
assert_eq(df.cummin(), ddf.cummin())
assert_eq(df.cummax(), ddf.cummax())
assert_eq(df.cumprod(), ddf.cumprod())
assert_eq(df.cumsum(skipna=False), ddf.cumsum(skipna=False))
assert_eq(df.cummin(skipna=False), ddf.cummin(skipna=False))
assert_eq(df.cummax(skipna=False), ddf.cummax(skipna=False))
assert_eq(df.cumprod(skipna=False), ddf.cumprod(skipna=False))
assert_eq(df.cumsum(axis=1), ddf.cumsum(axis=1))
assert_eq(df.cummin(axis=1), ddf.cummin(axis=1))
assert_eq(df.cummax(axis=1), ddf.cummax(axis=1))
assert_eq(df.cumprod(axis=1), ddf.cumprod(axis=1))
assert_eq(df.cumsum(axis=1, skipna=False), ddf.cumsum(axis=1, skipna=False))
assert_eq(df.cummin(axis=1, skipna=False), ddf.cummin(axis=1, skipna=False))
assert_eq(df.cummax(axis=1, skipna=False), ddf.cummax(axis=1, skipna=False))
assert_eq(df.cumprod(axis=1, skipna=False), ddf.cumprod(axis=1, skipna=False))
@pytest.mark.parametrize(
"func",
[
M.cumsum,
M.cumprod,
pytest.param(
M.cummin,
marks=[
pytest.mark.xfail(
reason="ValueError: Can only compare identically-labeled Series objects"
)
],
),
pytest.param(
M.cummax,
marks=[
pytest.mark.xfail(
reason="ValueError: Can only compare identically-labeled Series objects"
)
],
),
],
)
def test_cumulative_empty_partitions(func):
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=4)
assert_eq(func(df[df.x < 5]), func(ddf[ddf.x < 5]))
df = pd.DataFrame({"x": [1, 2, 3, 4, None, 5, 6, None, 7, 8]})
ddf = dd.from_pandas(df, npartitions=5)
assert_eq(func(df[df.x < 5]), func(ddf[ddf.x < 5]))
def test_dropna():
df = pd.DataFrame(
{
"x": [np.nan, 2, 3, 4, np.nan, 6],
"y": [1, 2, np.nan, 4, np.nan, np.nan],
"z": [1, 2, 3, 4, np.nan, 6],
},
index=[10, 20, 30, 40, 50, 60],
)
ddf = dd.from_pandas(df, 3)
assert_eq(ddf.x.dropna(), df.x.dropna())
assert_eq(ddf.y.dropna(), df.y.dropna())
assert_eq(ddf.z.dropna(), df.z.dropna())
assert_eq(ddf.dropna(), df.dropna())
assert_eq(ddf.dropna(how="all"), df.dropna(how="all"))
assert_eq(ddf.dropna(subset=["x"]), df.dropna(subset=["x"]))
assert_eq(ddf.dropna(subset=["y", "z"]), df.dropna(subset=["y", "z"]))
assert_eq(
ddf.dropna(subset=["y", "z"], how="all"),
df.dropna(subset=["y", "z"], how="all"),
)
# threshold
assert_eq(df.dropna(thresh=None), df.loc[[20, 40]])
assert_eq(ddf.dropna(thresh=None), df.dropna(thresh=None))
assert_eq(df.dropna(thresh=0), df.loc[:])
assert_eq(ddf.dropna(thresh=0), df.dropna(thresh=0))
assert_eq(df.dropna(thresh=1), df.loc[[10, 20, 30, 40, 60]])
assert_eq(ddf.dropna(thresh=1), df.dropna(thresh=1))
assert_eq(df.dropna(thresh=2), df.loc[[10, 20, 30, 40, 60]])
assert_eq(ddf.dropna(thresh=2), df.dropna(thresh=2))
assert_eq(df.dropna(thresh=3), df.loc[[20, 40]])
assert_eq(ddf.dropna(thresh=3), df.dropna(thresh=3))
# Regression test for https://github.com/dask/dask/issues/6540
df = pd.DataFrame({"_0": [0, 0, np.nan], "_1": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.dropna(subset=["_0"]), df.dropna(subset=["_0"]))
@pytest.mark.parametrize("lower, upper", [(2, 5), (2.5, 3.5)])
def test_clip(lower, upper):
df = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [3, 5, 2, 5, 7, 2, 4, 2, 4]}
)
ddf = dd.from_pandas(df, 3)
s = pd.Series([1, 2, 3, 4, 5, 6, 7, 8, 9])
ds = dd.from_pandas(s, 3)
assert_eq(ddf.clip(lower=lower, upper=upper), df.clip(lower=lower, upper=upper))
assert_eq(ddf.clip(lower=lower), df.clip(lower=lower))
assert_eq(ddf.clip(upper=upper), df.clip(upper=upper))
assert_eq(ds.clip(lower=lower, upper=upper), s.clip(lower=lower, upper=upper))
assert_eq(ds.clip(lower=lower), s.clip(lower=lower))
assert_eq(ds.clip(upper=upper), s.clip(upper=upper))
def test_squeeze():
df = pd.DataFrame({"x": [1, 3, 6]})
df2 = pd.DataFrame({"x": [0]})
s = pd.Series({"test": 0, "b": 100})
ddf = dd.from_pandas(df, 3)
ddf2 = dd.from_pandas(df2, 3)
ds = dd.from_pandas(s, 2)
assert_eq(df.squeeze(), ddf.squeeze())
assert_eq(pd.Series([0], name="x"), ddf2.squeeze())
assert_eq(ds.squeeze(), s.squeeze())
with pytest.raises(NotImplementedError) as info:
ddf.squeeze(axis=0)
msg = f"{type(ddf)} does not support squeeze along axis 0"
assert msg in str(info.value)
with pytest.raises(ValueError) as info:
ddf.squeeze(axis=2)
msg = f"No axis {2} for object type {type(ddf)}"
assert msg in str(info.value)
with pytest.raises(ValueError) as info:
ddf.squeeze(axis="test")
msg = f"No axis test for object type {type(ddf)}"
assert msg in str(info.value)
def test_where_mask():
pdf1 = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [3, 5, 2, 5, 7, 2, 4, 2, 4]}
)
ddf1 = dd.from_pandas(pdf1, 2)
pdf2 = pd.DataFrame({"a": [True, False, True] * 3, "b": [False, False, True] * 3})
ddf2 = dd.from_pandas(pdf2, 2)
# different index
pdf3 = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [3, 5, 2, 5, 7, 2, 4, 2, 4]},
index=[0, 1, 2, 3, 4, 5, 6, 7, 8],
)
ddf3 = dd.from_pandas(pdf3, 2)
pdf4 = pd.DataFrame(
{"a": [True, False, True] * 3, "b": [False, False, True] * 3},
index=[5, 6, 7, 8, 9, 10, 11, 12, 13],
)
ddf4 = dd.from_pandas(pdf4, 2)
# different columns
pdf5 = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"b": [9, 4, 2, 6, 2, 3, 1, 6, 2],
"c": [5, 6, 7, 8, 9, 10, 11, 12, 13],
},
index=[0, 1, 2, 3, 4, 5, 6, 7, 8],
)
ddf5 = dd.from_pandas(pdf5, 2)
pdf6 = pd.DataFrame(
{
"a": [True, False, True] * 3,
"b": [False, False, True] * 3,
"c": [False] * 9,
"d": [True] * 9,
},
index=[5, 6, 7, 8, 9, 10, 11, 12, 13],
)
ddf6 = dd.from_pandas(pdf6, 2)
cases = [
(ddf1, ddf2, pdf1, pdf2),
(ddf1.repartition([0, 3, 6, 8]), ddf2, pdf1, pdf2),
(ddf1, ddf4, pdf3, pdf4),
(ddf3.repartition([0, 4, 6, 8]), ddf4.repartition([5, 9, 10, 13]), pdf3, pdf4),
(ddf5, ddf6, pdf5, pdf6),
(ddf5.repartition([0, 4, 7, 8]), ddf6, pdf5, pdf6),
# use pd.DataFrame as cond
(ddf1, pdf2, pdf1, pdf2),
(ddf1, pdf4, pdf3, pdf4),
(ddf5, pdf6, pdf5, pdf6),
]
for ddf, ddcond, pdf, pdcond in cases:
assert isinstance(ddf, dd.DataFrame)
assert isinstance(ddcond, (dd.DataFrame, pd.DataFrame))
assert isinstance(pdf, pd.DataFrame)
assert isinstance(pdcond, pd.DataFrame)
assert_eq(ddf.where(ddcond), pdf.where(pdcond))
assert_eq(ddf.mask(ddcond), pdf.mask(pdcond))
assert_eq(ddf.where(ddcond, -ddf), pdf.where(pdcond, -pdf))
assert_eq(ddf.mask(ddcond, -ddf), pdf.mask(pdcond, -pdf))
assert_eq(ddf.where(ddcond.a, -ddf), pdf.where(pdcond.a, -pdf))
assert_eq(ddf.mask(ddcond.a, -ddf), pdf.mask(pdcond.a, -pdf))
assert_eq(ddf.a.where(ddcond.a), pdf.a.where(pdcond.a))
assert_eq(ddf.a.mask(ddcond.a), pdf.a.mask(pdcond.a))
assert_eq(ddf.a.where(ddcond.a, -ddf.a), pdf.a.where(pdcond.a, -pdf.a))
assert_eq(ddf.a.mask(ddcond.a, -ddf.a), pdf.a.mask(pdcond.a, -pdf.a))
def test_map_partitions_multi_argument():
assert_eq(dd.map_partitions(lambda a, b: a + b, d.a, d.b), full.a + full.b)
assert_eq(
dd.map_partitions(lambda a, b, c: a + b + c, d.a, d.b, 1), full.a + full.b + 1
)
def test_map_partitions():
assert_eq(d.map_partitions(lambda df: df, meta=d), full)
assert_eq(d.map_partitions(lambda df: df), full)
result = d.map_partitions(lambda df: df.sum(axis=1))
layer = hlg_layer(result.dask, "lambda-")
assert not layer.is_materialized(), layer
assert_eq(result, full.sum(axis=1))
assert_eq(
d.map_partitions(lambda df: 1),
pd.Series([1, 1, 1], dtype=np.int64),
check_divisions=False,
)
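    # Build a Scalar directly from a one-task graph: the key ("x", 0) holds the
    # value, "x" is the collection name, and the third argument is the meta.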
x = Scalar({("x", 0): 1}, "x", int)
result = dd.map_partitions(lambda x: 2, x)
assert result.dtype in (np.int32, np.int64) and result.compute() == 2
result = dd.map_partitions(lambda x: 4.0, x)
assert result.dtype == np.float64 and result.compute() == 4.0
def test_map_partitions_type():
result = d.map_partitions(type).compute(scheduler="single-threaded")
assert isinstance(result, pd.Series)
assert all(x == pd.DataFrame for x in result)
def test_map_partitions_partition_info():
def f(df, partition_info=None):
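        # ``partition_info`` supplies the partition's ordinal position under
        # "number" and its left division boundary under "division".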
assert partition_info is not None
assert "number" in partition_info
assert "division" in partition_info
assert dsk[("x", partition_info["number"])].equals(df)
assert dsk[("x", d.divisions.index(partition_info["division"]))].equals(df)
return df
df = d.map_partitions(f, meta=d)
layer = hlg_layer(df.dask, "f-")
assert not layer.is_materialized()
df.dask.validate()
result = df.compute(scheduler="single-threaded")
assert type(result) == pd.DataFrame
def test_map_partitions_names():
func = lambda x: x
assert sorted(dd.map_partitions(func, d, meta=d).dask) == sorted(
dd.map_partitions(func, d, meta=d).dask
)
assert sorted(dd.map_partitions(lambda x: x, d, meta=d, token=1).dask) == sorted(
dd.map_partitions(lambda x: x, d, meta=d, token=1).dask
)
func = lambda x, y: x
assert sorted(dd.map_partitions(func, d, d, meta=d).dask) == sorted(
dd.map_partitions(func, d, d, meta=d).dask
)
def test_map_partitions_column_info():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
a = dd.from_pandas(df, npartitions=2)
b = dd.map_partitions(lambda x: x, a, meta=a)
tm.assert_index_equal(b.columns, a.columns)
assert_eq(df, b)
b = dd.map_partitions(lambda x: x, a.x, meta=a.x)
assert b.name == a.x.name
assert_eq(df.x, b)
b = dd.map_partitions(lambda x: x, a.x, meta=a.x)
assert b.name == a.x.name
assert_eq(df.x, b)
b = dd.map_partitions(lambda df: df.x + df.y, a)
assert isinstance(b, dd.Series)
assert b.dtype == "i8"
b = dd.map_partitions(lambda df: df.x + 1, a, meta=("x", "i8"))
assert isinstance(b, dd.Series)
assert b.name == "x"
assert b.dtype == "i8"
def test_map_partitions_method_names():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
a = dd.from_pandas(df, npartitions=2)
b = a.map_partitions(lambda x: x)
assert isinstance(b, dd.DataFrame)
tm.assert_index_equal(b.columns, a.columns)
b = a.map_partitions(lambda df: df.x + 1)
assert isinstance(b, dd.Series)
assert b.dtype == "i8"
b = a.map_partitions(lambda df: df.x + 1, meta=("x", "i8"))
assert isinstance(b, dd.Series)
assert b.name == "x"
assert b.dtype == "i8"
def test_map_partitions_propagates_index_metadata():
index = pd.Series(list("abcde"), name="myindex")
df = pd.DataFrame(
{"A": np.arange(5, dtype=np.int32), "B": np.arange(10, 15, dtype=np.int32)},
index=index,
)
ddf = dd.from_pandas(df, npartitions=2)
res = ddf.map_partitions(
lambda df: df.assign(C=df.A + df.B),
meta=[("A", "i4"), ("B", "i4"), ("C", "i4")],
)
sol = df.assign(C=df.A + df.B)
assert_eq(res, sol)
res = ddf.map_partitions(lambda df: df.rename_axis("newindex"))
sol = df.rename_axis("newindex")
assert_eq(res, sol)
@pytest.mark.xfail(reason="now we use SubgraphCallables")
def test_map_partitions_keeps_kwargs_readable():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
a = dd.from_pandas(df, npartitions=2)
def f(s, x=1):
return s + x
b = a.x.map_partitions(f, x=5)
# NOTE: we'd like to ensure that we keep the keyword arguments readable
# in the dask graph
assert "['x', 5]" in str(dict(b.dask)) or "{'x': 5}" in str(dict(b.dask))
assert_eq(df.x + 5, b)
assert a.x.map_partitions(f, x=5)._name != a.x.map_partitions(f, x=6)._name
def test_map_partitions_with_delayed_collection():
# https://github.com/dask/dask/issues/5854
df = pd.DataFrame(columns=list("abcdefghijk"))
ddf = dd.from_pandas(df, 2)
ddf.dropna(subset=list("abcdefghijk")).compute()
# no error!
def test_metadata_inference_single_partition_aligned_args():
# https://github.com/dask/dask/issues/3034
# Previously broadcastable series functionality broke this
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=1)
def check(df, df_x):
assert len(df) == len(df_x)
assert len(df) > 0
return df
res = dd.map_partitions(check, ddf, ddf.x)
assert_eq(res, ddf)
def test_align_dataframes():
df1 = pd.DataFrame({"A": [1, 2, 3, 3, 2, 3], "B": [1, 2, 3, 4, 5, 6]})
df2 = pd.DataFrame({"A": [3, 1, 2], "C": [1, 2, 3]})
def merge(a, b):
res = pd.merge(a, b, left_on="A", right_on="A", how="left")
return res
expected = merge(df1, df2)
ddf1 = dd.from_pandas(df1, npartitions=2)
actual = ddf1.map_partitions(merge, df2, align_dataframes=False)
assert_eq(actual, expected, check_index=False, check_divisions=False)
def test_drop_duplicates():
res = d.drop_duplicates()
res2 = d.drop_duplicates(split_every=2)
sol = full.drop_duplicates()
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
res = d.a.drop_duplicates()
res2 = d.a.drop_duplicates(split_every=2)
sol = full.a.drop_duplicates()
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
res = d.index.drop_duplicates()
res2 = d.index.drop_duplicates(split_every=2)
sol = full.index.drop_duplicates()
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
with pytest.raises(NotImplementedError):
d.drop_duplicates(keep=False)
def test_drop_duplicates_subset():
df = pd.DataFrame({"x": [1, 2, 3, 1, 2, 3], "y": ["a", "a", "b", "b", "c", "c"]})
ddf = dd.from_pandas(df, npartitions=2)
for kwarg in [{"keep": "first"}, {"keep": "last"}]:
assert_eq(df.x.drop_duplicates(**kwarg), ddf.x.drop_duplicates(**kwarg))
for ss in [["x"], "y", ["x", "y"]]:
assert_eq(
df.drop_duplicates(subset=ss, **kwarg),
ddf.drop_duplicates(subset=ss, **kwarg),
)
assert_eq(df.drop_duplicates(ss, **kwarg), ddf.drop_duplicates(ss, **kwarg))
def test_get_partition():
pdf = pd.DataFrame(np.random.randn(10, 5), columns=list("abcde"))
ddf = dd.from_pandas(pdf, 3)
assert ddf.divisions == (0, 4, 8, 9)
# DataFrame
div1 = ddf.get_partition(0)
assert isinstance(div1, dd.DataFrame)
assert_eq(div1, pdf.loc[0:3])
div2 = ddf.get_partition(1)
assert_eq(div2, pdf.loc[4:7])
div3 = ddf.get_partition(2)
assert_eq(div3, pdf.loc[8:9])
assert len(div1) + len(div2) + len(div3) == len(pdf)
# Series
div1 = ddf.a.get_partition(0)
assert isinstance(div1, dd.Series)
assert_eq(div1, pdf.a.loc[0:3])
div2 = ddf.a.get_partition(1)
assert_eq(div2, pdf.a.loc[4:7])
div3 = ddf.a.get_partition(2)
assert_eq(div3, pdf.a.loc[8:9])
assert len(div1) + len(div2) + len(div3) == len(pdf.a)
with pytest.raises(ValueError):
ddf.get_partition(-1)
with pytest.raises(ValueError):
ddf.get_partition(3)
def test_ndim():
assert d.ndim == 2
assert d.a.ndim == 1
assert d.index.ndim == 1
def test_dtype():
assert (d.dtypes == full.dtypes).all()
def test_value_counts():
df = pd.DataFrame({"x": [1, 2, 1, 3, 3, 1, 4]})
ddf = dd.from_pandas(df, npartitions=3)
result = ddf.x.value_counts()
expected = df.x.value_counts()
assert_eq(result, expected)
result2 = ddf.x.value_counts(split_every=2)
assert_eq(result2, expected)
assert result._name != result2._name
def test_value_counts_not_sorted():
df = pd.DataFrame({"x": [1, 2, 1, 3, 3, 1, 4]})
ddf = dd.from_pandas(df, npartitions=3)
result = ddf.x.value_counts(sort=False)
expected = df.x.value_counts(sort=False)
assert_eq(result, expected)
result2 = ddf.x.value_counts(split_every=2)
assert_eq(result2, expected)
assert result._name != result2._name
def test_value_counts_with_dropna():
df = pd.DataFrame({"x": [1, 2, 1, 3, np.nan, 1, 4]})
ddf = dd.from_pandas(df, npartitions=3)
if not PANDAS_GT_110:
with pytest.raises(NotImplementedError, match="dropna is not a valid argument"):
ddf.x.value_counts(dropna=False)
return
result = ddf.x.value_counts(dropna=False)
expected = df.x.value_counts(dropna=False)
assert_eq(result, expected)
result2 = ddf.x.value_counts(split_every=2, dropna=False)
assert_eq(result2, expected)
assert result._name != result2._name
def test_value_counts_with_normalize():
df = pd.DataFrame({"x": [1, 2, 1, 3, 3, 1, 4]})
ddf = dd.from_pandas(df, npartitions=3)
result = ddf.x.value_counts(normalize=True)
expected = df.x.value_counts(normalize=True)
assert_eq(result, expected)
result2 = ddf.x.value_counts(split_every=2, normalize=True)
assert_eq(result2, expected)
assert result._name != result2._name
result3 = ddf.x.value_counts(split_out=2, normalize=True)
assert_eq(result3, expected)
assert result._name != result3._name
@pytest.mark.skipif(not PANDAS_GT_110, reason="dropna implemented in pandas 1.1.0")
def test_value_counts_with_normalize_and_dropna():
df = pd.DataFrame({"x": [1, 2, 1, 3, np.nan, 1, 4]})
ddf = dd.from_pandas(df, npartitions=3)
result = ddf.x.value_counts(dropna=False, normalize=True)
expected = df.x.value_counts(dropna=False, normalize=True)
assert_eq(result, expected)
result2 = ddf.x.value_counts(split_every=2, dropna=False, normalize=True)
assert_eq(result2, expected)
assert result._name != result2._name
result3 = ddf.x.value_counts(split_out=2, dropna=False, normalize=True)
assert_eq(result3, expected)
assert result._name != result3._name
result4 = ddf.x.value_counts(dropna=True, normalize=True, split_out=2)
expected4 = df.x.value_counts(dropna=True, normalize=True)
assert_eq(result4, expected4)
def test_unique():
pdf = pd.DataFrame(
{
"x": [1, 2, 1, 3, 3, 1, 4, 2, 3, 1],
"y": ["a", "c", "b", np.nan, "c", "b", "a", "d", np.nan, "a"],
}
)
ddf = dd.from_pandas(pdf, npartitions=3)
assert_eq(ddf.x.unique(), pd.Series(pdf.x.unique(), name="x"))
assert_eq(ddf.y.unique(), pd.Series(pdf.y.unique(), name="y"))
assert_eq(ddf.x.unique(split_every=2), pd.Series(pdf.x.unique(), name="x"))
assert_eq(ddf.y.unique(split_every=2), pd.Series(pdf.y.unique(), name="y"))
assert_eq(ddf.index.unique(), pdf.index.unique())
assert ddf.x.unique(split_every=2)._name != ddf.x.unique()._name
def test_isin():
f_list = [1, 2, 3]
f_series = pd.Series(f_list)
f_dict = {"a": [0, 3], "b": [1, 2]}
# Series
assert_eq(d.a.isin(f_list), full.a.isin(f_list))
assert_eq(d.a.isin(f_series), full.a.isin(f_series))
with pytest.raises(NotImplementedError):
d.a.isin(d.a)
# Index
da.utils.assert_eq(d.index.isin(f_list), full.index.isin(f_list))
da.utils.assert_eq(d.index.isin(f_series), full.index.isin(f_series))
with pytest.raises(NotImplementedError):
d.a.isin(d.a)
# DataFrame test
assert_eq(d.isin(f_list), full.isin(f_list))
assert_eq(d.isin(f_dict), full.isin(f_dict))
for obj in [d, f_series, full]:
with pytest.raises(NotImplementedError):
d.isin(obj)
def test_contains_frame():
df = dd.from_pandas(pd.DataFrame({"A": [1, 2], 0: [3, 4]}), 1)
assert "A" in df
assert 0 in df
assert "B" not in df
assert 1 not in df
def test_len():
assert len(d) == len(full)
assert len(d.a) == len(full.a)
assert len(dd.from_pandas(pd.DataFrame(), npartitions=1)) == 0
assert len(dd.from_pandas(pd.DataFrame(columns=[1, 2]), npartitions=1)) == 0
# Regression test for https://github.com/dask/dask/issues/6110
assert len(dd.from_pandas(pd.DataFrame(columns=["foo", "foo"]), npartitions=1)) == 0
def test_size():
assert_eq(d.size, full.size)
assert_eq(d.a.size, full.a.size)
assert_eq(d.index.size, full.index.size)
def test_shape():
result = d.shape
assert_eq((result[0].compute(), result[1]), (len(full), len(full.columns)))
assert_eq(dd.compute(result)[0], (len(full), len(full.columns)))
result = d.a.shape
assert_eq(result[0].compute(), len(full.a))
assert_eq(dd.compute(result)[0], (len(full.a),))
sh = dd.from_pandas(pd.DataFrame(index=[1, 2, 3]), npartitions=2).shape
assert (sh[0].compute(), sh[1]) == (3, 0)
sh = dd.from_pandas(pd.DataFrame({"a": [], "b": []}, index=[]), npartitions=1).shape
assert (sh[0].compute(), sh[1]) == (0, 2)
def test_nbytes():
assert_eq(d.a.nbytes, full.a.nbytes)
assert_eq(d.index.nbytes, full.index.nbytes)
@pytest.mark.parametrize(
"method,expected",
[("tdigest", (0.35, 3.80, 2.5, 6.5, 2.0)), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0))],
)
def test_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
# series / multiple
result = d.b.quantile([0.3, 0.7], method=method)
    exp = full.b.quantile([0.3, 0.7])  # result may differ
assert len(result) == 2
assert result.divisions == (0.3, 0.7)
assert_eq(result.index, exp.index)
assert isinstance(result, dd.Series)
result = result.compute()
assert isinstance(result, pd.Series)
assert result.iloc[0] == pytest.approx(expected[0])
assert result.iloc[1] == pytest.approx(expected[1])
# index
s = pd.Series(np.arange(10), index=np.arange(10))
ds = dd.from_pandas(s, 2)
result = ds.index.quantile([0.3, 0.7], method=method)
exp = s.quantile([0.3, 0.7])
assert len(result) == 2
assert result.divisions == (0.3, 0.7)
assert_eq(result.index, exp.index)
assert isinstance(result, dd.Series)
result = result.compute()
assert isinstance(result, pd.Series)
assert result.iloc[0] == pytest.approx(expected[2])
assert result.iloc[1] == pytest.approx(expected[3])
# series / single
result = d.b.quantile(0.5, method=method)
assert isinstance(result, dd.core.Scalar)
result = result.compute()
assert result == expected[4]
@pytest.mark.parametrize("method", ["tdigest", "dask"])
def test_quantile_missing(method):
if method == "tdigest":
pytest.importorskip("crick")
df = pd.DataFrame({"A": [0, np.nan, 2]})
ddf = dd.from_pandas(df, 2)
expected = df.quantile()
result = ddf.quantile(method=method)
assert_eq(result, expected)
expected = df.A.quantile()
result = ddf.A.quantile(method=method)
assert_eq(result, expected)
@pytest.mark.parametrize("method", ["tdigest", "dask"])
def test_empty_quantile(method):
if method == "tdigest":
pytest.importorskip("crick")
result = d.b.quantile([], method=method)
exp = full.b.quantile([])
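    # An empty list of quantiles yields a single empty partition with unknown
    # divisions.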
assert result.divisions == (None, None)
assert result.name == "b"
assert result.compute().name == "b"
assert_eq(result, exp)
@pytest.mark.parametrize(
"method,expected",
[
(
"tdigest",
(
pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]),
pd.DataFrame(
[[4.5, 24.5, 14.5], [14.5, 34.5, 24.5]],
index=[0.25, 0.75],
columns=["A", "X", "B"],
),
),
),
(
"dask",
(
pd.Series([7.0, 27.0, 17.0], index=["A", "X", "B"]),
pd.DataFrame(
[[1.50, 21.50, 11.50], [14.0, 34.0, 24.0]],
index=[0.25, 0.75],
columns=["A", "X", "B"],
),
),
),
],
)
def test_dataframe_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
    # column X is included to test column order and the result's divisions
df = pd.DataFrame(
{
"A": np.arange(20),
"X": np.arange(20, 40),
"B": np.arange(10, 30),
"C": ["a", "b", "c", "d"] * 5,
},
columns=["A", "X", "B", "C"],
)
ddf = dd.from_pandas(df, 3)
result = ddf.quantile(method=method)
assert result.npartitions == 1
assert result.divisions == ("A", "X")
result = result.compute()
assert isinstance(result, pd.Series)
assert result.name == 0.5
tm.assert_index_equal(result.index, pd.Index(["A", "X", "B"]))
assert (result == expected[0]).all()
result = ddf.quantile([0.25, 0.75], method=method)
assert result.npartitions == 1
assert result.divisions == (0.25, 0.75)
result = result.compute()
assert isinstance(result, pd.DataFrame)
tm.assert_index_equal(result.index, pd.Index([0.25, 0.75]))
tm.assert_index_equal(result.columns, pd.Index(["A", "X", "B"]))
assert (result == expected[1]).all().all()
assert_eq(ddf.quantile(axis=1, method=method), df.quantile(axis=1))
pytest.raises(ValueError, lambda: ddf.quantile([0.25, 0.75], axis=1, method=method))
def test_quantile_for_possibly_unsorted_q():
"""check that quantile is giving correct answers even when quantile parameter, q, may be unsorted.
See https://github.com/dask/dask/issues/4642.
"""
# prepare test case where percentiles should equal values
A = da.arange(0, 101)
ds = dd.from_dask_array(A)
for q in [
[0.25, 0.50, 0.75],
[0.25, 0.50, 0.75, 0.99],
[0.75, 0.5, 0.25],
[0.25, 0.99, 0.75, 0.50],
]:
r = ds.quantile(q).compute()
assert_eq(r.loc[0.25], 25.0)
assert_eq(r.loc[0.50], 50.0)
assert_eq(r.loc[0.75], 75.0)
r = ds.quantile([0.25]).compute()
assert_eq(r.loc[0.25], 25.0)
r = ds.quantile(0.25).compute()
assert_eq(r, 25.0)
def test_quantile_tiny_partitions():
"""See https://github.com/dask/dask/issues/6551"""
df = pd.DataFrame({"a": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=3)
r = ddf["a"].quantile(0.5).compute()
assert r == 2
def test_quantile_trivial_partitions():
"""See https://github.com/dask/dask/issues/2792"""
df = pd.DataFrame({"A": []})
ddf = dd.from_pandas(df, npartitions=2)
expected = df.quantile(0.5)
assert_eq(ddf.quantile(0.5), expected)
df = pd.DataFrame({"A": [np.nan, np.nan, np.nan, np.nan]})
ddf = dd.from_pandas(df, npartitions=2)
expected = df.quantile(0.5)
assert_eq(ddf.quantile(0.5), expected)
def test_index():
assert_eq(d.index, full.index)
def test_assign():
df = pd.DataFrame(
{"a": range(8), "b": [float(i) for i in range(10, 18)]},
index=pd.Index(list("abcdefgh")),
)
ddf = dd.from_pandas(df, npartitions=3)
ddf_unknown = dd.from_pandas(df, npartitions=3, sort=False)
assert not ddf_unknown.known_divisions
res = ddf.assign(
c=1,
d="string",
e=ddf.a.sum(),
f=ddf.a + ddf.b,
g=lambda x: x.a + x.c,
dt=pd.Timestamp(2018, 2, 13),
)
res_unknown = ddf_unknown.assign(
c=1,
d="string",
e=ddf_unknown.a.sum(),
f=ddf_unknown.a + ddf_unknown.b,
g=lambda x: x.a + x.c,
dt=pd.Timestamp(2018, 2, 13),
)
sol = df.assign(
c=1,
d="string",
e=df.a.sum(),
f=df.a + df.b,
g=lambda x: x.a + x.c,
dt=pd.Timestamp(2018, 2, 13),
)
assert_eq(res, sol)
assert_eq(res_unknown, sol)
res = ddf.assign(c=df.a + 1)
assert_eq(res, df.assign(c=df.a + 1))
res = ddf.assign(c=ddf.index)
assert_eq(res, df.assign(c=df.index))
    # assigning a pandas Series won't work when divisions are unknown
with pytest.raises(ValueError):
ddf_unknown.assign(c=df.a + 1)
# unsupported type
with pytest.raises(TypeError):
ddf.assign(c=list(range(9)))
# Fails when assigning known divisions to unknown divisions
with pytest.raises(ValueError):
ddf_unknown.assign(foo=ddf.a)
# Fails when assigning unknown divisions to known divisions
with pytest.raises(ValueError):
ddf.assign(foo=ddf_unknown.a)
df = pd.DataFrame({"A": [1, 2]})
df.assign(B=lambda df: df["A"], C=lambda df: df.A + df.B)
ddf = dd.from_pandas(pd.DataFrame({"A": [1, 2]}), npartitions=2)
ddf.assign(B=lambda df: df["A"], C=lambda df: df.A + df.B)
assert_eq(df, ddf)
def test_assign_callable():
df = dd.from_pandas(pd.DataFrame({"A": range(10)}), npartitions=2)
a = df.assign(B=df.A.shift())
b = df.assign(B=lambda x: x.A.shift())
assert_eq(a, b)
def test_assign_dtypes():
ddf = dd.from_pandas(
pd.DataFrame(
data={"col1": ["a", "b"], "col2": [1, 2]}, columns=["col1", "col2"]
),
npartitions=2,
)
new_col = {"col3": pd.Series(["0", "1"])}
res = ddf.assign(**new_col)
assert_eq(
res.dtypes,
pd.Series(data=["object", "int64", "object"], index=["col1", "col2", "col3"]),
)
def test_map():
df = pd.DataFrame(
{"a": range(9), "b": [4, 5, 6, 1, 2, 3, 0, 0, 0]},
index=pd.Index([0, 1, 3, 5, 6, 8, 9, 9, 9], name="myindex"),
)
ddf = dd.from_pandas(df, npartitions=3)
assert_eq(ddf.a.map(lambda x: x + 1), df.a.map(lambda x: x + 1))
lk = {v: v + 1 for v in df.a.values}
assert_eq(ddf.a.map(lk), df.a.map(lk))
assert_eq(ddf.b.map(lk), df.b.map(lk))
lk = pd.Series(lk)
assert_eq(ddf.a.map(lk), df.a.map(lk))
assert_eq(ddf.b.map(lk), df.b.map(lk))
assert_eq(ddf.b.map(lk, meta=ddf.b), df.b.map(lk))
assert_eq(ddf.b.map(lk, meta=("b", "i8")), df.b.map(lk))
def test_concat():
x = _concat([pd.DataFrame(columns=["a", "b"]), pd.DataFrame(columns=["a", "b"])])
assert list(x.columns) == ["a", "b"]
assert len(x) == 0
def test_args():
e = d.assign(c=d.a + 1)
f = type(e)(*e._args)
assert_eq(e, f)
assert_eq(d.a, type(d.a)(*d.a._args))
assert_eq(d.a.sum(), type(d.a.sum())(*d.a.sum()._args))
def test_known_divisions():
assert d.known_divisions
df = dd.DataFrame(dsk, "x", meta, divisions=[None, None, None])
assert not df.known_divisions
def test_unknown_divisions():
dsk = {
("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}),
("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}),
}
meta = make_meta({"a": "i8", "b": "i8"}, parent_meta=pd.DataFrame())
d = dd.DataFrame(dsk, "x", meta, [None, None, None, None])
full = d.compute(scheduler="sync")
assert_eq(d.a.sum(), full.a.sum())
assert_eq(d.a + d.b + 1, full.a + full.b + 1)
def test_with_min_count():
dfs = [
pd.DataFrame([[None, 2, 3], [None, 5, 6], [5, 4, 9]]),
pd.DataFrame([[2, None, None], [None, 5, 6], [5, 4, 9]]),
]
ddfs = [dd.from_pandas(df, npartitions=4) for df in dfs]
axes = [0, 1]
for df, ddf in zip(dfs, ddfs):
for axis in axes:
for min_count in [0, 1, 2, 3]:
assert_eq(
df.sum(min_count=min_count, axis=axis),
ddf.sum(min_count=min_count, axis=axis),
)
assert_eq(
df.prod(min_count=min_count, axis=axis),
ddf.prod(min_count=min_count, axis=axis),
)
@pytest.mark.parametrize("join", ["inner", "outer", "left", "right"])
def test_align(join):
df1a = pd.DataFrame(
{"A": np.random.randn(10), "B": np.random.randn(10)},
index=[1, 12, 5, 6, 3, 9, 10, 4, 13, 11],
)
df1b = pd.DataFrame(
{"A": np.random.randn(10), "B": np.random.randn(10)},
index=[0, 3, 2, 10, 5, 6, 7, 8, 12, 13],
)
ddf1a = dd.from_pandas(df1a, 3)
ddf1b = dd.from_pandas(df1b, 3)
# DataFrame
res1, res2 = ddf1a.align(ddf1b, join=join)
exp1, exp2 = df1a.align(df1b, join=join)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
# Series
res1, res2 = ddf1a["A"].align(ddf1b["B"], join=join)
exp1, exp2 = df1a["A"].align(df1b["B"], join=join)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
# DataFrame with fill_value
res1, res2 = ddf1a.align(ddf1b, join=join, fill_value=1)
exp1, exp2 = df1a.align(df1b, join=join, fill_value=1)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
# Series
res1, res2 = ddf1a["A"].align(ddf1b["B"], join=join, fill_value=1)
exp1, exp2 = df1a["A"].align(df1b["B"], join=join, fill_value=1)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
@pytest.mark.parametrize("join", ["inner", "outer", "left", "right"])
def test_align_axis(join):
df1a = pd.DataFrame(
{"A": np.random.randn(10), "B": np.random.randn(10), "C": np.random.randn(10)},
index=[1, 12, 5, 6, 3, 9, 10, 4, 13, 11],
)
df1b = pd.DataFrame(
{"B": np.random.randn(10), "C": np.random.randn(10), "D": np.random.randn(10)},
index=[0, 3, 2, 10, 5, 6, 7, 8, 12, 13],
)
ddf1a = dd.from_pandas(df1a, 3)
ddf1b = dd.from_pandas(df1b, 3)
res1, res2 = ddf1a.align(ddf1b, join=join, axis=0)
exp1, exp2 = df1a.align(df1b, join=join, axis=0)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
res1, res2 = ddf1a.align(ddf1b, join=join, axis=1)
exp1, exp2 = df1a.align(df1b, join=join, axis=1)
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
res1, res2 = ddf1a.align(ddf1b, join=join, axis="index")
exp1, exp2 = df1a.align(df1b, join=join, axis="index")
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
res1, res2 = ddf1a.align(ddf1b, join=join, axis="columns")
exp1, exp2 = df1a.align(df1b, join=join, axis="columns")
assert assert_eq(res1, exp1)
assert assert_eq(res2, exp2)
# invalid
with pytest.raises(ValueError):
ddf1a.align(ddf1b, join=join, axis="XXX")
with pytest.raises(ValueError):
ddf1a["A"].align(ddf1b["B"], join=join, axis=1)
def test_combine():
df1 = pd.DataFrame(
{
"A": np.random.choice([1, 2, np.nan], 100),
"B": np.random.choice(["a", "b", "nan"], 100),
}
)
df2 = pd.DataFrame(
{
"A": np.random.choice([1, 2, 3], 100),
"B": np.random.choice(["a", "b", "c"], 100),
}
)
ddf1 = dd.from_pandas(df1, 4)
ddf2 = dd.from_pandas(df2, 5)
first = lambda a, b: a
    # You can add Series with strings and NaNs, but you can't add the scalars 'a' + np.nan
str_add = lambda a, b: a + b if a is not np.nan else a
# DataFrame
for dda, ddb, a, b, runs in [
(ddf1, ddf2, df1, df2, [(add, None), (first, None)]),
(ddf1.A, ddf2.A, df1.A, df2.A, [(add, None), (add, 100), (first, None)]),
(
ddf1.B,
ddf2.B,
df1.B,
df2.B,
[(str_add, None), (str_add, "d"), (first, None)],
),
]:
for func, fill_value in runs:
sol = a.combine(b, func, fill_value=fill_value)
assert_eq(dda.combine(ddb, func, fill_value=fill_value), sol)
assert_eq(dda.combine(b, func, fill_value=fill_value), sol)
assert_eq(
ddf1.combine(ddf2, add, overwrite=False), df1.combine(df2, add, overwrite=False)
)
assert dda.combine(ddb, add)._name == dda.combine(ddb, add)._name
def test_combine_first():
df1 = pd.DataFrame(
{
"A": np.random.choice([1, 2, np.nan], 100),
"B": np.random.choice(["a", "b", "nan"], 100),
}
)
df2 = pd.DataFrame(
{
"A": np.random.choice([1, 2, 3], 100),
"B": np.random.choice(["a", "b", "c"], 100),
}
)
ddf1 = dd.from_pandas(df1, 4)
ddf2 = dd.from_pandas(df2, 5)
# DataFrame
assert_eq(ddf1.combine_first(ddf2), df1.combine_first(df2))
assert_eq(ddf1.combine_first(df2), df1.combine_first(df2))
# Series
assert_eq(ddf1.A.combine_first(ddf2.A), df1.A.combine_first(df2.A))
assert_eq(ddf1.A.combine_first(df2.A), df1.A.combine_first(df2.A))
assert_eq(ddf1.B.combine_first(ddf2.B), df1.B.combine_first(df2.B))
assert_eq(ddf1.B.combine_first(df2.B), df1.B.combine_first(df2.B))
def test_dataframe_picklable():
from pickle import dumps, loads
from cloudpickle import dumps as cp_dumps
from cloudpickle import loads as cp_loads
d = _compat.makeTimeDataFrame()
df = dd.from_pandas(d, npartitions=3)
df = df + 2
# dataframe
df2 = loads(dumps(df))
assert_eq(df, df2)
df2 = cp_loads(cp_dumps(df))
assert_eq(df, df2)
# series
a2 = loads(dumps(df.A))
assert_eq(df.A, a2)
a2 = cp_loads(cp_dumps(df.A))
assert_eq(df.A, a2)
# index
i2 = loads(dumps(df.index))
assert_eq(df.index, i2)
i2 = cp_loads(cp_dumps(df.index))
assert_eq(df.index, i2)
# scalar
# lambdas are present, so only test cloudpickle
s = df.A.sum()
s2 = cp_loads(cp_dumps(s))
assert_eq(s, s2)
def test_random_partitions():
a, b = d.random_split([0.5, 0.5], 42)
assert isinstance(a, dd.DataFrame)
assert isinstance(b, dd.DataFrame)
assert a._name != b._name
np.testing.assert_array_equal(a.index, sorted(a.index))
assert len(a.compute()) + len(b.compute()) == len(full)
a2, b2 = d.random_split([0.5, 0.5], 42)
assert a2._name == a._name
assert b2._name == b._name
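    # shuffle=True (the third positional argument) randomizes rows within each
    # partition; the split is still reproducible for a fixed seed, but the
    # index is no longer sorted.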
a, b = d.random_split([0.5, 0.5], 42, True)
a2, b2 = d.random_split([0.5, 0.5], 42, True)
assert_eq(a, a2)
assert_eq(b, b2)
with pytest.raises(AssertionError):
np.testing.assert_array_equal(a.index, sorted(a.index))
parts = d.random_split([0.4, 0.5, 0.1], 42)
names = {p._name for p in parts}
names.update([a._name, b._name])
assert len(names) == 5
with pytest.raises(ValueError):
d.random_split([0.4, 0.5], 42)
def test_series_round():
ps = pd.Series([1.123, 2.123, 3.123, 1.234, 2.234, 3.234], name="a")
s = dd.from_pandas(ps, npartitions=3)
assert_eq(s.round(), ps.round())
@pytest.mark.slow
def test_repartition():
def _check_split_data(orig, d):
"""Check data is split properly"""
keys = [k for k in d.dask if k[0].startswith("repartition-split")]
keys = sorted(keys)
sp = pd.concat(
[compute_as_if_collection(dd.DataFrame, d.dask, k) for k in keys]
)
assert_eq(orig, sp)
assert_eq(orig, d)
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60]
)
a = dd.from_pandas(df, 2)
b = a.repartition(divisions=[10, 20, 50, 60])
assert b.divisions == (10, 20, 50, 60)
assert_eq(a, b)
assert_eq(compute_as_if_collection(dd.DataFrame, b.dask, (b._name, 0)), df.iloc[:1])
for div in [
[20, 60],
[10, 50],
[1], # first / last element mismatch
[0, 60],
[10, 70], # do not allow to expand divisions by default
        [10, 50, 20, 60],  # not sorted
        [10, 10, 20, 60],  # not unique (only the last division value may repeat)
    ]:
pytest.raises(ValueError, lambda: a.repartition(divisions=div))
pdf = pd.DataFrame(np.random.randn(7, 5), columns=list("abxyz"))
for p in range(1, 7):
ddf = dd.from_pandas(pdf, p)
assert_eq(ddf, pdf)
for div in [
[0, 6],
[0, 6, 6],
[0, 5, 6],
[0, 4, 6, 6],
[0, 2, 6],
[0, 2, 6, 6],
[0, 2, 3, 6, 6],
[0, 1, 2, 3, 4, 5, 6, 6],
]:
rddf = ddf.repartition(divisions=div)
_check_split_data(ddf, rddf)
assert rddf.divisions == tuple(div)
assert_eq(pdf, rddf)
rds = ddf.x.repartition(divisions=div)
_check_split_data(ddf.x, rds)
assert rds.divisions == tuple(div)
assert_eq(pdf.x, rds)
# expand divisions
for div in [[-5, 10], [-2, 3, 5, 6], [0, 4, 5, 9, 10]]:
rddf = ddf.repartition(divisions=div, force=True)
_check_split_data(ddf, rddf)
assert rddf.divisions == tuple(div)
assert_eq(pdf, rddf)
rds = ddf.x.repartition(divisions=div, force=True)
_check_split_data(ddf.x, rds)
assert rds.divisions == tuple(div)
assert_eq(pdf.x, rds)
pdf = pd.DataFrame(
{"x": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], "y": [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]},
index=list("abcdefghij"),
)
for p in range(1, 7):
ddf = dd.from_pandas(pdf, p)
assert_eq(ddf, pdf)
for div in [
list("aj"),
list("ajj"),
list("adj"),
list("abfj"),
list("ahjj"),
list("acdj"),
list("adfij"),
list("abdefgij"),
list("abcdefghij"),
]:
rddf = ddf.repartition(divisions=div)
_check_split_data(ddf, rddf)
assert rddf.divisions == tuple(div)
assert_eq(pdf, rddf)
rds = ddf.x.repartition(divisions=div)
_check_split_data(ddf.x, rds)
assert rds.divisions == tuple(div)
assert_eq(pdf.x, rds)
# expand divisions
for div in [list("Yadijm"), list("acmrxz"), list("Yajz")]:
rddf = ddf.repartition(divisions=div, force=True)
_check_split_data(ddf, rddf)
assert rddf.divisions == tuple(div)
assert_eq(pdf, rddf)
rds = ddf.x.repartition(divisions=div, force=True)
_check_split_data(ddf.x, rds)
assert rds.divisions == tuple(div)
assert_eq(pdf.x, rds)
def test_repartition_divisions():
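    # repartition_divisions(a, b, name, out1, out2) emits boundary_slice tasks
    # under ``out1`` and alias/concat tasks under ``out2``; only the final
    # slice includes its right boundary (the trailing True).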
result = repartition_divisions([0, 6], [0, 6, 6], "a", "b", "c")
assert result == {
("b", 0): (methods.boundary_slice, ("a", 0), 0, 6, False),
("b", 1): (methods.boundary_slice, ("a", 0), 6, 6, True),
("c", 0): ("b", 0),
("c", 1): ("b", 1),
}
result = repartition_divisions([1, 3, 7], [1, 4, 6, 7], "a", "b", "c")
assert result == {
("b", 0): (methods.boundary_slice, ("a", 0), 1, 3, False),
("b", 1): (methods.boundary_slice, ("a", 1), 3, 4, False),
("b", 2): (methods.boundary_slice, ("a", 1), 4, 6, False),
("b", 3): (methods.boundary_slice, ("a", 1), 6, 7, True),
("c", 0): (methods.concat, [("b", 0), ("b", 1)]),
("c", 1): ("b", 2),
("c", 2): ("b", 3),
}
def test_repartition_on_pandas_dataframe():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60]
)
ddf = dd.repartition(df, divisions=[10, 20, 50, 60])
assert isinstance(ddf, dd.DataFrame)
assert ddf.divisions == (10, 20, 50, 60)
assert_eq(ddf, df)
ddf = dd.repartition(df.y, divisions=[10, 20, 50, 60])
assert isinstance(ddf, dd.Series)
assert ddf.divisions == (10, 20, 50, 60)
assert_eq(ddf, df.y)
@pytest.mark.parametrize("use_index", [True, False])
@pytest.mark.parametrize("n", [1, 2, 4, 5])
@pytest.mark.parametrize("k", [1, 2, 4, 5])
@pytest.mark.parametrize("dtype", [float, "M8[ns]"])
@pytest.mark.parametrize("transform", [lambda df: df, lambda df: df.x])
def test_repartition_npartitions(use_index, n, k, dtype, transform):
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5, 6] * 10, "y": list("abdabd") * 10},
index=pd.Series([1, 2, 3, 4, 5, 6] * 10, dtype=dtype),
)
df = transform(df)
a = dd.from_pandas(df, npartitions=n, sort=use_index)
b = a.repartition(k)
assert_eq(a, b)
assert b.npartitions == k
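    # Materialize every output partition straight from the graph and check
    # that repartitioning produced no empty partitions.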
parts = dask.get(b.dask, b.__dask_keys__())
assert all(map(len, parts))
@pytest.mark.parametrize("use_index", [True, False])
@pytest.mark.parametrize("n", [2, 5])
@pytest.mark.parametrize("partition_size", ["1kiB", 379])
@pytest.mark.parametrize("transform", [lambda df: df, lambda df: df.x])
def test_repartition_partition_size(use_index, n, partition_size, transform):
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5, 6] * 10, "y": list("abdabd") * 10},
index=pd.Series([10, 20, 30, 40, 50, 60] * 10),
)
df = transform(df)
a = dd.from_pandas(df, npartitions=n, sort=use_index)
b = a.repartition(partition_size=partition_size)
assert_eq(a, b, check_divisions=False)
    assert np.all(b.map_partitions(total_mem_usage, deep=True).compute() <= 1024)
parts = dask.get(b.dask, b.__dask_keys__())
assert all(map(len, parts))
def test_repartition_partition_size_arg():
df = pd.DataFrame({"x": range(10)})
a = dd.from_pandas(df, npartitions=2)
b = a.repartition("1 MiB")
assert b.npartitions == 1
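# How the size arguments above are interpreted (assuming dask.utils.parse_bytes,
# the parser that the ``partition_size=`` documentation points to): strings use
# binary suffixes and bare integers are taken as bytes. Illustrative helper,
# not collected by pytest.
def _example_parse_partition_size():
    from dask.utils import parse_bytes
    assert parse_bytes("1kiB") == 1024
    assert parse_bytes("1 MiB") == 2**20
    assert parse_bytes(379) == 379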
def test_repartition_npartitions_same_limits():
df = pd.DataFrame(
{"x": [1, 2, 3]},
index=[
pd.Timestamp("2017-05-09 00:00:00.006000"),
pd.Timestamp("2017-05-09 02:45:00.017999"),
pd.Timestamp("2017-05-09 05:59:58.938999"),
],
)
ddf = dd.from_pandas(df, npartitions=2)
ddf.repartition(npartitions=10)
def test_repartition_npartitions_numeric_edge_case():
"""
Test that we cover numeric edge cases when
    ``int(ddf.npartitions / npartitions) * npartitions != ddf.npartitions``
"""
df = pd.DataFrame({"x": range(100)})
a = dd.from_pandas(df, npartitions=15)
assert a.npartitions == 15
b = a.repartition(npartitions=11)
assert_eq(a, b)
def test_repartition_object_index():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6] * 10}, index=list("abdabd") * 10)
a = dd.from_pandas(df, npartitions=5)
b = a.repartition(npartitions=2)
assert b.npartitions == 2
assert_eq(b, df)
b = a.repartition(npartitions=10)
assert b.npartitions == 10
assert_eq(b, df)
assert not b.known_divisions
@pytest.mark.slow
@pytest.mark.parametrize("npartitions", [1, 20, 243])
@pytest.mark.parametrize("freq", ["1D", "7D", "28h", "1h"])
@pytest.mark.parametrize(
"end", ["2000-04-15", "2000-04-15 12:37:01", "2000-01-01 12:37:00"]
)
@pytest.mark.parametrize(
"start", ["2000-01-01", "2000-01-01 12:30:00", "2000-01-01 12:30:00"]
)
def test_repartition_freq(npartitions, freq, start, end):
start = pd.Timestamp(start)
end = pd.Timestamp(end)
ind = pd.date_range(start=start, end=end, freq="60s")
df = pd.DataFrame({"x": np.arange(len(ind))}, index=ind)
ddf = dd.from_pandas(df, npartitions=npartitions, name="x")
ddf2 = ddf.repartition(freq=freq)
assert_eq(ddf2, df)
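# Roughly how freq-based divisions are derived (a sketch of the idea, not the
# exact internal code path): tick a date_range over the index bounds at the
# requested frequency, then close the final partition with the true maximum.
def _example_freq_divisions():
    ind = pd.date_range("2000-01-01", "2000-01-10 12:00", freq="1h")
    divs = list(pd.date_range(ind.min(), ind.max(), freq="2D")) + [ind.max()]
    assert divs[0] == ind.min() and divs[-1] == ind.max()
    assert all(a < b for a, b in zip(divs, divs[1:]))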
def test_repartition_freq_divisions():
df = pd.DataFrame(
{"x": np.random.random(10)},
index=pd.DatetimeIndex(np.random.random(10) * 100e9),
)
ddf = dd.from_pandas(df, npartitions=3)
ddf2 = ddf.repartition(freq="15s")
for div in ddf2.divisions[1:-1]:
assert div == div.round("15s")
assert ddf2.divisions[0] == df.index.min()
assert ddf2.divisions[-1] == df.index.max()
assert_eq(ddf2, df)
def test_repartition_freq_errors():
df = pd.DataFrame({"x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1)
with pytest.raises(TypeError) as info:
ddf.repartition(freq="1s")
assert "only" in str(info.value)
assert "timeseries" in str(info.value)
def test_repartition_freq_month():
ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
df = pd.DataFrame(
np.random.randint(0, 100, size=(len(ts), 4)), columns=list("ABCD"), index=ts
)
ddf = dd.from_pandas(df, npartitions=1).repartition(freq="MS")
assert_eq(df, ddf)
assert ddf.divisions == (
pd.Timestamp("2015-1-1 00:00:00"),
pd.Timestamp("2015-2-1 00:00:00"),
pd.Timestamp("2015-3-1 00:00:00"),
pd.Timestamp("2015-4-1 00:00:00"),
pd.Timestamp("2015-5-1 00:00:00"),
pd.Timestamp("2015-5-1 23:50:00"),
)
assert ddf.npartitions == 5
def test_repartition_freq_day():
index = [
pd.Timestamp("2020-1-1"),
pd.Timestamp("2020-1-1"),
pd.Timestamp("2020-1-2"),
pd.Timestamp("2020-1-2"),
]
pdf = pd.DataFrame(index=index, data={"foo": "foo"})
ddf = dd.from_pandas(pdf, npartitions=1).repartition(freq="D")
assert_eq(ddf, pdf)
assert ddf.npartitions == 2
assert ddf.divisions == (
pd.Timestamp("2020-1-1"),
pd.Timestamp("2020-1-2"),
pd.Timestamp("2020-1-2"),
)
@pytest.mark.parametrize(
"freq, expected_freq",
[
("M", "MS"),
("MS", "MS"),
("2M", "2MS"),
("Q", "QS"),
("Q-FEB", "QS-FEB"),
("2Q", "2QS"),
("2Q-FEB", "2QS-FEB"),
("2QS-FEB", "2QS-FEB"),
("BQ", "BQS"),
("2BQ", "2BQS"),
("SM", "SMS"),
("A", "AS"),
("A-JUN", "AS-JUN"),
("BA", "BAS"),
("2BA", "2BAS"),
("BY", "BAS"),
("Y", "AS"),
(pd.Timedelta(seconds=1), pd.Timedelta(seconds=1)),
],
)
def test_map_freq_to_period_start(freq, expected_freq):
new_freq = _map_freq_to_period_start(freq)
assert new_freq == expected_freq
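# Why end-anchored aliases are mapped to start-anchored counterparts above
# (plain pandas semantics, nothing dask-specific): "M" ticks at month *ends*,
# so it cannot mark where a period begins, while the mapped "MS" alias ticks
# at month starts and is usable as a division boundary.
def _example_period_start_alias():
    start, end = pd.Timestamp("2000-01-15"), pd.Timestamp("2000-03-15")
    assert pd.date_range(start, end, freq="M")[0] == pd.Timestamp("2000-01-31")
    assert pd.date_range(start, end, freq="MS")[0] == pd.Timestamp("2000-02-01")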
def test_repartition_input_errors():
df = pd.DataFrame({"x": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1)
with pytest.raises(ValueError):
ddf.repartition(npartitions=5, divisions=[None, None])
with pytest.raises(ValueError):
ddf.repartition(npartitions=5, partition_size="5MiB")
def test_embarrassingly_parallel_operations():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, None, 6], "y": list("abdabd")},
index=[10, 20, 30, 40, 50, 60],
)
a = dd.from_pandas(df, 2)
assert_eq(a.x.astype("float32"), df.x.astype("float32"))
assert a.x.astype("float32").compute().dtype == "float32"
assert_eq(a.x.dropna(), df.x.dropna())
assert_eq(a.x.between(2, 4), df.x.between(2, 4))
assert_eq(a.x.clip(2, 4), df.x.clip(2, 4))
assert_eq(a.x.notnull(), df.x.notnull())
assert_eq(a.x.isnull(), df.x.isnull())
assert_eq(a.notnull(), df.notnull())
assert_eq(a.isnull(), df.isnull())
assert len(a.sample(frac=0.5).compute()) < len(df)
def test_fillna():
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=5, sort=False)
assert_eq(ddf.fillna(100), df.fillna(100))
assert_eq(ddf.A.fillna(100), df.A.fillna(100))
assert_eq(ddf.A.fillna(ddf["A"].mean()), df.A.fillna(df["A"].mean()))
assert_eq(ddf.fillna(method="pad"), df.fillna(method="pad"))
assert_eq(ddf.A.fillna(method="pad"), df.A.fillna(method="pad"))
assert_eq(ddf.fillna(method="bfill"), df.fillna(method="bfill"))
assert_eq(ddf.A.fillna(method="bfill"), df.A.fillna(method="bfill"))
assert_eq(ddf.fillna(method="pad", limit=2), df.fillna(method="pad", limit=2))
assert_eq(ddf.A.fillna(method="pad", limit=2), df.A.fillna(method="pad", limit=2))
assert_eq(ddf.fillna(method="bfill", limit=2), df.fillna(method="bfill", limit=2))
assert_eq(
ddf.A.fillna(method="bfill", limit=2), df.A.fillna(method="bfill", limit=2)
)
assert_eq(ddf.fillna(100, axis=1), df.fillna(100, axis=1))
assert_eq(ddf.fillna(method="pad", axis=1), df.fillna(method="pad", axis=1))
assert_eq(
ddf.fillna(method="pad", limit=2, axis=1),
df.fillna(method="pad", limit=2, axis=1),
)
pytest.raises(ValueError, lambda: ddf.A.fillna(0, axis=1))
pytest.raises(NotImplementedError, lambda: ddf.fillna(0, limit=10))
pytest.raises(NotImplementedError, lambda: ddf.fillna(0, limit=10, axis=1))
df = _compat.makeMissingDataframe()
df.iloc[:15, 0] = np.nan # all NaN partition
ddf = dd.from_pandas(df, npartitions=5, sort=False)
pytest.raises(ValueError, lambda: ddf.fillna(method="pad").compute())
assert_eq(df.fillna(method="pad", limit=3), ddf.fillna(method="pad", limit=3))
@pytest.mark.parametrize("optimize", [True, False])
def test_delayed_roundtrip(optimize: bool):
df1 = d + 1 + 1
delayed = df1.to_delayed(optimize_graph=optimize)
for x in delayed:
assert x.__dask_layers__() == (
"delayed-" + df1._name if optimize else df1._name,
)
x.dask.validate()
assert len(delayed) == df1.npartitions
assert len(delayed[0].dask.layers) == (1 if optimize else 3)
dm = d.a.mean().to_delayed(optimize_graph=optimize)
delayed2 = [x * 2 - dm for x in delayed]
for x in delayed2:
x.dask.validate()
df3 = dd.from_delayed(delayed2, meta=df1, divisions=df1.divisions)
df4 = df3 - 1 - 1
df4.dask.validate()
assert_eq(df4, (full + 2) * 2 - full.a.mean() - 2)
def test_from_delayed_lazy_if_meta_provided():
"""Ensure that the graph is 100% lazily evaluted if meta is provided"""
@dask.delayed
def raise_exception():
raise RuntimeError()
tasks = [raise_exception()]
ddf = dd.from_delayed(tasks, meta=dict(a=float))
with pytest.raises(RuntimeError):
ddf.compute()
def test_from_delayed_empty_meta_provided():
ddf = dd.from_delayed([], meta=dict(a=float))
expected = pd.DataFrame({"a": [0.1]}).iloc[:0]
assert_eq(ddf, expected)
def test_fillna_duplicate_index():
@dask.delayed
def f():
return pd.DataFrame(dict(a=[1.0], b=[np.NaN]))
ddf = dd.from_delayed([f(), f()], meta=dict(a=float, b=float))
ddf.b = ddf.b.fillna(ddf.a)
ddf.compute()
def test_fillna_multi_dataframe():
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=5, sort=False)
assert_eq(ddf.A.fillna(ddf.B), df.A.fillna(df.B))
assert_eq(ddf.B.fillna(ddf.A), df.B.fillna(df.A))
def test_ffill_bfill():
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=5, sort=False)
assert_eq(ddf.ffill(), df.ffill())
assert_eq(ddf.bfill(), df.bfill())
assert_eq(ddf.ffill(axis=1), df.ffill(axis=1))
assert_eq(ddf.bfill(axis=1), df.bfill(axis=1))
def test_fillna_series_types():
# https://github.com/dask/dask/issues/2809
df = pd.DataFrame({"A": [1, np.nan, 3], "B": [1, np.nan, 3]})
ddf = dd.from_pandas(df, npartitions=2)
fill_value = pd.Series([1, 10], index=["A", "C"])
assert_eq(ddf.fillna(fill_value), df.fillna(fill_value))
def test_sample():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, None, 6], "y": list("abdabd")},
index=[10, 20, 30, 40, 50, 60],
)
a = dd.from_pandas(df, 2)
b = a.sample(frac=0.5)
assert_eq(b, b)
c = a.sample(frac=0.5, random_state=1234)
d = a.sample(frac=0.5, random_state=1234)
assert_eq(c, d)
assert a.sample(frac=0.5)._name != a.sample(frac=0.5)._name
def test_sample_without_replacement():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, None, 6], "y": list("abdabd")},
index=[10, 20, 30, 40, 50, 60],
)
a = dd.from_pandas(df, 2)
b = a.sample(frac=0.7, replace=False)
bb = b.index.compute()
assert len(bb) == len(set(bb))
def test_sample_raises():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, None, 6], "y": list("abdabd")},
index=[10, 20, 30, 40, 50, 60],
)
a = dd.from_pandas(df, 2)
    # Make sure n is reinterpreted as frac when 0 <= n <= 1
    # so that existing code (e.g. ddf.sample(0.5)) won't break
with pytest.warns(UserWarning):
b = a.sample(0.5, random_state=1234)
c = a.sample(frac=0.5, random_state=1234)
assert_eq(b, c)
with pytest.raises(ValueError):
a.sample(n=10)
# Make sure frac is provided
with pytest.raises(ValueError):
a.sample(frac=None)
def test_empty_max():
meta = make_meta({"x": "i8"}, parent_meta=pd.DataFrame())
a = dd.DataFrame(
{("x", 0): pd.DataFrame({"x": [1]}), ("x", 1): pd.DataFrame({"x": []})},
"x",
meta,
[None, None, None],
)
assert_eq(a.x.max(), 1)
def test_query():
pytest.importorskip("numexpr")
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.query("x**2 > y"), df.query("x**2 > y"))
assert_eq(
ddf.query("x**2 > @value", local_dict={"value": 4}),
df.query("x**2 > @value", local_dict={"value": 4}),
)
def test_eval():
pytest.importorskip("numexpr")
p = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
d = dd.from_pandas(p, npartitions=2)
assert_eq(p.eval("x + y"), d.eval("x + y"))
assert_eq(p.eval("z = x + y", inplace=False), d.eval("z = x + y", inplace=False))
with pytest.raises(NotImplementedError):
d.eval("z = x + y", inplace=True)
@pytest.mark.parametrize(
"include, exclude",
[
([int], None),
(None, [int]),
([np.number, object], [float]),
(["datetime"], None),
],
)
def test_select_dtypes(include, exclude):
n = 10
df = pd.DataFrame(
{
"cint": [1] * n,
"cstr": ["a"] * n,
"clfoat": [1.0] * n,
"cdt": pd.date_range("2016-01-01", periods=n),
}
)
a = dd.from_pandas(df, npartitions=2)
result = a.select_dtypes(include=include, exclude=exclude)
expected = df.select_dtypes(include=include, exclude=exclude)
assert_eq(result, expected)
# count dtypes
tm.assert_series_equal(a.dtypes.value_counts(), df.dtypes.value_counts())
tm.assert_series_equal(result.dtypes.value_counts(), expected.dtypes.value_counts())
def test_deterministic_apply_concat_apply_names():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
a = dd.from_pandas(df, npartitions=2)
assert sorted(a.x.nlargest(2).dask) == sorted(a.x.nlargest(2).dask)
assert sorted(a.x.nlargest(2).dask) != sorted(a.x.nlargest(3).dask)
assert sorted(a.x.drop_duplicates().dask) == sorted(a.x.drop_duplicates().dask)
assert sorted(a.groupby("x").y.mean().dask) == sorted(a.groupby("x").y.mean().dask)
# Test aca without passing in token string
f = lambda a: a.nlargest(5)
f2 = lambda a: a.nlargest(3)
assert sorted(aca(a.x, f, f, a.x._meta).dask) != sorted(
aca(a.x, f2, f2, a.x._meta).dask
)
assert sorted(aca(a.x, f, f, a.x._meta).dask) == sorted(
aca(a.x, f, f, a.x._meta).dask
)
# Test aca with keywords
def chunk(x, c_key=0, both_key=0):
return x.sum() + c_key + both_key
def agg(x, a_key=0, both_key=0):
return pd.Series(x).sum() + a_key + both_key
c_key = 2
a_key = 3
both_key = 4
res = aca(
a.x,
chunk=chunk,
aggregate=agg,
chunk_kwargs={"c_key": c_key},
aggregate_kwargs={"a_key": a_key},
both_key=both_key,
)
assert sorted(res.dask) == sorted(
aca(
a.x,
chunk=chunk,
aggregate=agg,
chunk_kwargs={"c_key": c_key},
aggregate_kwargs={"a_key": a_key},
both_key=both_key,
).dask
)
assert sorted(res.dask) != sorted(
aca(
a.x,
chunk=chunk,
aggregate=agg,
chunk_kwargs={"c_key": c_key},
aggregate_kwargs={"a_key": a_key},
both_key=0,
).dask
)
assert_eq(res, df.x.sum() + 2 * (c_key + both_key) + a_key + both_key)
def test_aca_meta_infer():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=2)
def chunk(x, y, constant=1.0):
return (x + y + constant).head()
def agg(x):
return x.head()
res = aca([ddf, 2.0], chunk=chunk, aggregate=agg, chunk_kwargs=dict(constant=2.0))
sol = (df + 2.0 + 2.0).head()
assert_eq(res, sol)
# Should infer as a scalar
res = aca(
[ddf.x], chunk=lambda x: pd.Series([x.sum()]), aggregate=lambda x: x.sum()
)
assert isinstance(res, Scalar)
assert res.compute() == df.x.sum()
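# The apply-concat-apply contract exercised above, restated with plain pandas
# (a sketch: `chunk` runs once per partition, the results are concatenated,
# then `aggregate` runs once on the concatenation).
def _example_aca_by_hand():
    parts = [pd.Series([1, 2]), pd.Series([3, 4])]
    chunked = [p.sum() for p in parts]  # chunk: one result per partition
    combined = pd.Series(chunked)       # concat step
    assert combined.sum() == 10         # aggregate over the concatenation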
def test_aca_split_every():
df = pd.DataFrame({"x": [1] * 60})
ddf = dd.from_pandas(df, npartitions=15)
def chunk(x, y, constant=0):
return x.sum() + y + constant
def combine(x, constant=0):
return x.sum() + constant + 1
def agg(x, constant=0):
return x.sum() + constant + 2
f = lambda n: aca(
[ddf, 2.0],
chunk=chunk,
aggregate=agg,
combine=combine,
chunk_kwargs=dict(constant=1.0),
combine_kwargs=dict(constant=2.0),
aggregate_kwargs=dict(constant=3.0),
split_every=n,
)
assert_max_deps(f(3), 3)
assert_max_deps(f(4), 4, False)
assert_max_deps(f(5), 5)
assert f(15).dask.keys() == f(ddf.npartitions).dask.keys()
r3 = f(3)
r4 = f(4)
assert r3._name != r4._name
# Only intersect on reading operations
assert len(r3.dask.keys() & r4.dask.keys()) == len(ddf.dask)
# Keywords are different for each step
assert f(3).compute() == 60 + 15 * (2 + 1) + 7 * (2 + 1) + (3 + 2)
# Keywords are same for each step
res = aca(
[ddf, 2.0],
chunk=chunk,
aggregate=agg,
combine=combine,
constant=3.0,
split_every=3,
)
assert res.compute() == 60 + 15 * (2 + 3) + 7 * (3 + 1) + (3 + 2)
# No combine provided, combine is agg
res = aca([ddf, 2.0], chunk=chunk, aggregate=agg, constant=3, split_every=3)
assert res.compute() == 60 + 15 * (2 + 3) + 8 * (3 + 2)
# split_every must be >= 2
with pytest.raises(ValueError):
f(1)
# combine_kwargs with no combine provided
with pytest.raises(ValueError):
aca(
[ddf, 2.0],
chunk=chunk,
aggregate=agg,
split_every=3,
chunk_kwargs=dict(constant=1.0),
combine_kwargs=dict(constant=2.0),
aggregate_kwargs=dict(constant=3.0),
)
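# Where the expected value 60 + 15 * (2 + 1) + 7 * (2 + 1) + (3 + 2) above
# comes from, worked out once: 15 chunk calls each add y=2 plus constant=1 on
# top of a per-partition sum of 4; reducing 15 results three at a time takes
# ceil(15 / 3) + ceil(5 / 3) = 5 + 2 = 7 combine calls, each adding 2 + 1; the
# final aggregate adds 3 + 2. A small sketch of the combine count:
def _example_split_every_combine_count():
    import math
    n, split_every, combines = 15, 3, 0
    while n > split_every:  # intermediate levels are handled by `combine`
        n = math.ceil(n / split_every)
        combines += n
    # the remaining n <= split_every results go straight to `aggregate`
    assert combines == 7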
def test_reduction_method():
df = pd.DataFrame({"x": range(50), "y": range(50, 100)})
ddf = dd.from_pandas(df, npartitions=4)
chunk = lambda x, val=0: (x >= val).sum()
agg = lambda x: x.sum()
# Output of chunk is a scalar
res = ddf.x.reduction(chunk, aggregate=agg)
assert_eq(res, df.x.count())
# Output of chunk is a series
res = ddf.reduction(chunk, aggregate=agg)
assert res._name == ddf.reduction(chunk, aggregate=agg)._name
assert_eq(res, df.count())
# Test with keywords
res2 = ddf.reduction(chunk, aggregate=agg, chunk_kwargs={"val": 25})
    assert (
        res2._name
        == ddf.reduction(chunk, aggregate=agg, chunk_kwargs={"val": 25})._name
    )
assert res2._name != res._name
assert_eq(res2, (df >= 25).sum())
# Output of chunk is a dataframe
def sum_and_count(x):
return pd.DataFrame({"sum": x.sum(), "count": x.count()})
res = ddf.reduction(sum_and_count, aggregate=lambda x: x.groupby(level=0).sum())
assert_eq(res, pd.DataFrame({"sum": df.sum(), "count": df.count()}))
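# Why the DataFrame-returning reduction above aggregates with
# groupby(level=0).sum() (plain pandas sketch): concatenating the per-chunk
# frames stacks duplicate index labels, one row per column per partition, and
# grouping on the index folds them back together.
def _example_reduction_groupby_level0():
    part = pd.DataFrame({"x": [1, 2], "y": [3, 4]})
    chunks = [pd.DataFrame({"sum": p.sum(), "count": p.count()}) for p in (part, part)]
    stacked = pd.concat(chunks)  # index is x, y, x, y
    folded = stacked.groupby(level=0).sum()
    assert folded.loc["x", "sum"] == 6 and folded.loc["x", "count"] == 4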
def test_reduction_method_split_every():
df = pd.Series([1] * 60)
ddf = dd.from_pandas(df, npartitions=15)
def chunk(x, constant=0):
return x.sum() + constant
def combine(x, constant=0):
return x.sum() + constant + 1
def agg(x, constant=0):
return x.sum() + constant + 2
f = lambda n: ddf.reduction(
chunk,
aggregate=agg,
combine=combine,
chunk_kwargs=dict(constant=1.0),
combine_kwargs=dict(constant=2.0),
aggregate_kwargs=dict(constant=3.0),
split_every=n,
)
assert_max_deps(f(3), 3)
assert_max_deps(f(4), 4, False)
assert_max_deps(f(5), 5)
assert f(15).dask.keys() == f(ddf.npartitions).dask.keys()
r3 = f(3)
r4 = f(4)
assert r3._name != r4._name
# Only intersect on reading operations
assert len(r3.dask.keys() & r4.dask.keys()) == len(ddf.dask)
# Keywords are different for each step
assert f(3).compute() == 60 + 15 + 7 * (2 + 1) + (3 + 2)
# Keywords are same for each step
res = ddf.reduction(
chunk, aggregate=agg, combine=combine, constant=3.0, split_every=3
)
assert res.compute() == 60 + 15 * 3 + 7 * (3 + 1) + (3 + 2)
# No combine provided, combine is agg
res = ddf.reduction(chunk, aggregate=agg, constant=3.0, split_every=3)
assert res.compute() == 60 + 15 * 3 + 8 * (3 + 2)
# split_every must be >= 2
with pytest.raises(ValueError):
f(1)
# combine_kwargs with no combine provided
with pytest.raises(ValueError):
ddf.reduction(
chunk,
aggregate=agg,
split_every=3,
chunk_kwargs=dict(constant=1.0),
combine_kwargs=dict(constant=2.0),
aggregate_kwargs=dict(constant=3.0),
)
def test_pipe():
df = pd.DataFrame({"x": range(50), "y": range(50, 100)})
ddf = dd.from_pandas(df, npartitions=4)
def f(x, y, z=0):
return x + y + z
assert_eq(ddf.pipe(f, 1, z=2), f(ddf, 1, z=2))
assert_eq(ddf.x.pipe(f, 1, z=2), f(ddf.x, 1, z=2))
def test_gh_517():
arr = np.random.randn(100, 2)
df = pd.DataFrame(arr, columns=["a", "b"])
ddf = dd.from_pandas(df, 2)
assert ddf.index.nunique().compute() == 100
ddf2 = dd.from_pandas(pd.concat([df, df]), 5)
assert ddf2.index.nunique().compute() == 100
def test_drop_axis_1():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8], "z": [9, 10, 11, 12]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.drop("y", axis=1), df.drop("y", axis=1))
assert_eq(ddf.drop(["y", "z"], axis=1), df.drop(["y", "z"], axis=1))
with pytest.raises(ValueError):
ddf.drop(["a", "x"], axis=1)
assert_eq(
ddf.drop(["a", "x"], axis=1, errors="ignore"),
df.drop(["a", "x"], axis=1, errors="ignore"),
)
assert_eq(ddf.drop(columns=["y", "z"]), df.drop(columns=["y", "z"]))
@pytest.mark.parametrize("columns", [["b"], []])
def test_drop_columns(columns):
# Check both populated and empty list argument
# https://github.com/dask/dask/issues/6870
df = pd.DataFrame(
{
"a": [2, 4, 6, 8],
"b": ["1a", "2b", "3c", "4d"],
}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf2 = ddf.drop(columns=columns)
ddf["new"] = ddf["a"] + 1 # Check that ddf2 is not modified
assert_eq(df.drop(columns=columns), ddf2)
def test_gh580():
df = pd.DataFrame({"x": np.arange(10, dtype=float)})
ddf = dd.from_pandas(df, 2)
assert_eq(np.cos(df["x"]), np.cos(ddf["x"]))
assert_eq(np.cos(df["x"]), np.cos(ddf["x"]))
def test_gh6305():
df = pd.DataFrame({"x": np.arange(3, dtype=float)})
ddf = dd.from_pandas(df, 1)
ddf_index_only = ddf.set_index("x")
ds = ddf["x"]
is_broadcastable([ddf_index_only], ds)
def test_rename_dict():
renamer = {"a": "A", "b": "B"}
assert_eq(d.rename(columns=renamer), full.rename(columns=renamer))
def test_rename_function():
renamer = lambda x: x.upper()
assert_eq(d.rename(columns=renamer), full.rename(columns=renamer))
def test_rename_index():
renamer = {0: 1}
pytest.raises(ValueError, lambda: d.rename(index=renamer))
def test_to_timestamp():
index = pd.period_range(freq="A", start="1/1/2001", end="12/1/2004")
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]}, index=index)
ddf = dd.from_pandas(df, npartitions=3)
assert_eq(ddf.to_timestamp(), df.to_timestamp(), **CHECK_FREQ)
assert_eq(
ddf.to_timestamp(freq="M", how="s").compute(),
df.to_timestamp(freq="M", how="s"),
**CHECK_FREQ,
)
assert_eq(ddf.x.to_timestamp(), df.x.to_timestamp())
assert_eq(
ddf.x.to_timestamp(freq="M", how="s").compute(),
df.x.to_timestamp(freq="M", how="s"),
**CHECK_FREQ,
)
def test_to_frame():
s = pd.Series([1, 2, 3], name="foo")
a = dd.from_pandas(s, npartitions=2)
assert_eq(s.to_frame(), a.to_frame())
assert_eq(s.to_frame("bar"), a.to_frame("bar"))
@pytest.mark.parametrize("as_frame", [False, False])
def test_to_dask_array_raises(as_frame):
s = pd.Series([1, 2, 3, 4, 5, 6], name="foo")
a = dd.from_pandas(s, npartitions=2)
if as_frame:
a = a.to_frame()
with pytest.raises(ValueError, match="4 != 2"):
a.to_dask_array((1, 2, 3, 4))
with pytest.raises(ValueError, match="Unexpected value"):
a.to_dask_array(5)
@pytest.mark.parametrize("as_frame", [False, True])
def test_to_dask_array_unknown(as_frame):
s = pd.Series([1, 2, 3, 4, 5], name="foo")
a = dd.from_pandas(s, chunksize=2)
if as_frame:
a = a.to_frame()
result = a.to_dask_array()
assert isinstance(result, da.Array)
result = result.chunks
if as_frame:
assert len(result) == 2
assert result[1] == (1,)
else:
assert len(result) == 1
result = result[0]
assert len(result) == 2
assert all(np.isnan(x) for x in result)
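# The NaN chunks above can be resolved by hand with only public API (roughly
# what ``lengths=True`` arranges internally): compute per-partition lengths
# and pass them back through the ``lengths=`` argument.
def _example_resolve_unknown_chunks():
    s = pd.Series([1, 2, 3, 4, 5], name="foo")
    a = dd.from_pandas(s, chunksize=2)
    lengths = tuple(a.map_partitions(len).compute())
    assert a.to_dask_array(lengths=lengths).chunks == ((2, 2, 1),)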
@pytest.mark.parametrize(
"lengths,as_frame,meta",
[
([2, 3], False, None),
(True, False, None),
(True, False, np.array([], dtype="f4")),
],
)
def test_to_dask_array(meta, as_frame, lengths):
s = pd.Series([1, 2, 3, 4, 5], name="foo", dtype="i4")
a = dd.from_pandas(s, chunksize=2)
if as_frame:
a = a.to_frame()
result = a.to_dask_array(lengths=lengths, meta=meta)
assert isinstance(result, da.Array)
expected_chunks = ((2, 3),)
if as_frame:
expected_chunks = expected_chunks + ((1,),)
assert result.chunks == expected_chunks
def test_apply():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
func = lambda row: row["x"] + row["y"]
assert_eq(
ddf.x.apply(lambda x: x + 1, meta=("x", int)), df.x.apply(lambda x: x + 1)
)
# specify meta
assert_eq(
ddf.apply(lambda xy: xy[0] + xy[1], axis=1, meta=(None, int)),
df.apply(lambda xy: xy[0] + xy[1], axis=1),
)
assert_eq(
ddf.apply(lambda xy: xy[0] + xy[1], axis="columns", meta=(None, int)),
df.apply(lambda xy: xy[0] + xy[1], axis="columns"),
)
# inference
with pytest.warns(None):
assert_eq(
ddf.apply(lambda xy: xy[0] + xy[1], axis=1),
df.apply(lambda xy: xy[0] + xy[1], axis=1),
)
with pytest.warns(None):
assert_eq(ddf.apply(lambda xy: xy, axis=1), df.apply(lambda xy: xy, axis=1))
# specify meta
func = lambda x: pd.Series([x, x])
assert_eq(ddf.x.apply(func, meta=[(0, int), (1, int)]), df.x.apply(func))
# inference
with pytest.warns(None):
assert_eq(ddf.x.apply(func), df.x.apply(func))
# axis=0
with pytest.raises(NotImplementedError):
ddf.apply(lambda xy: xy, axis=0)
with pytest.raises(NotImplementedError):
ddf.apply(lambda xy: xy, axis="index")
def test_apply_warns():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
func = lambda row: row["x"] + row["y"]
with pytest.warns(UserWarning) as w:
ddf.apply(func, axis=1)
assert len(w) == 1
with pytest.warns(None) as w:
ddf.apply(func, axis=1, meta=(None, int))
assert len(w) == 0
with pytest.warns(UserWarning) as w:
ddf.apply(lambda x: x, axis=1)
assert len(w) == 1
assert "'x'" in str(w[0].message)
assert "int64" in str(w[0].message)
def test_apply_warns_with_invalid_meta():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
func = lambda row: row["x"] + row["y"]
with pytest.warns(FutureWarning, match="Meta is not valid"):
ddf.apply(func, axis=1, meta=int)
def test_applymap():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.applymap(lambda x: x + 1), df.applymap(lambda x: x + 1))
assert_eq(ddf.applymap(lambda x: (x, x)), df.applymap(lambda x: (x, x)))
def test_add_prefix():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.add_prefix("abc"), df.add_prefix("abc"))
assert_eq(ddf.x.add_prefix("abc"), df.x.add_prefix("abc"))
def test_add_suffix():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.add_suffix("abc"), df.add_suffix("abc"))
assert_eq(ddf.x.add_suffix("abc"), df.x.add_suffix("abc"))
def test_abs():
df = pd.DataFrame(
{
"A": [1, -2, 3, -4, 5],
"B": [-6.0, -7, -8, -9, 10],
"C": ["a", "b", "c", "d", "e"],
}
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.A.abs(), df.A.abs())
assert_eq(ddf[["A", "B"]].abs(), df[["A", "B"]].abs())
pytest.raises(ValueError, lambda: ddf.C.abs())
pytest.raises(TypeError, lambda: ddf.abs())
def test_round():
df = pd.DataFrame({"col1": [1.123, 2.123, 3.123], "col2": [1.234, 2.234, 3.234]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.round(), df.round())
assert_eq(ddf.round(2), df.round(2))
def test_cov():
# DataFrame
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=6)
res = ddf.cov()
res2 = ddf.cov(split_every=2)
res3 = ddf.cov(10)
res4 = ddf.cov(10, split_every=2)
sol = df.cov()
sol2 = df.cov(10)
assert_eq(res, sol)
assert_eq(res2, sol)
assert_eq(res3, sol2)
assert_eq(res4, sol2)
assert res._name == ddf.cov()._name
assert res._name != res2._name
assert res3._name != res4._name
assert res._name != res3._name
# Series
a = df.A
b = df.B
da = dd.from_pandas(a, npartitions=6)
db = dd.from_pandas(b, npartitions=7)
res = da.cov(db)
res2 = da.cov(db, split_every=2)
res3 = da.cov(db, 10)
res4 = da.cov(db, 10, split_every=2)
sol = a.cov(b)
sol2 = a.cov(b, 10)
assert_eq(res, sol)
assert_eq(res2, sol)
assert_eq(res3, sol2)
assert_eq(res4, sol2)
assert res._name == da.cov(db)._name
assert res._name != res2._name
assert res3._name != res4._name
assert res._name != res3._name
def test_corr():
# DataFrame
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=6)
res = ddf.corr()
res2 = ddf.corr(split_every=2)
res3 = ddf.corr(min_periods=10)
res4 = ddf.corr(min_periods=10, split_every=2)
sol = df.corr()
sol2 = df.corr(min_periods=10)
assert_eq(res, sol)
assert_eq(res2, sol)
assert_eq(res3, sol2)
assert_eq(res4, sol2)
assert res._name == ddf.corr()._name
assert res._name != res2._name
assert res3._name != res4._name
assert res._name != res3._name
pytest.raises(NotImplementedError, lambda: ddf.corr(method="spearman"))
# Series
a = df.A
b = df.B
da = dd.from_pandas(a, npartitions=6)
db = dd.from_pandas(b, npartitions=7)
res = da.corr(db)
res2 = da.corr(db, split_every=2)
res3 = da.corr(db, min_periods=10)
res4 = da.corr(db, min_periods=10, split_every=2)
    sol = a.corr(b)
    sol2 = a.corr(b, min_periods=10)
assert_eq(res, sol)
assert_eq(res2, sol)
assert_eq(res3, sol2)
assert_eq(res4, sol2)
assert res._name == da.corr(db)._name
assert res._name != res2._name
assert res3._name != res4._name
assert res._name != res3._name
pytest.raises(NotImplementedError, lambda: da.corr(db, method="spearman"))
pytest.raises(TypeError, lambda: da.corr(ddf))
def test_corr_same_name():
# Series with same names (see https://github.com/dask/dask/issues/4906)
df = _compat.makeMissingDataframe()
ddf = dd.from_pandas(df, npartitions=6)
result = ddf.A.corr(ddf.B.rename("A"))
expected = ddf.A.corr(ddf.B)
assert_eq(result, expected)
# test with split_every
result2 = ddf.A.corr(ddf.B.rename("A"), split_every=2)
assert_eq(result2, expected)
def test_cov_corr_meta():
df = pd.DataFrame(
{
"a": np.array([1, 2, 3]),
"b": np.array([1.0, 2.0, 3.0], dtype="f4"),
"c": np.array([1.0, 2.0, 3.0]),
},
index=pd.Index([1, 2, 3], name="myindex"),
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.corr(), df.corr())
assert_eq(ddf.cov(), df.cov())
assert ddf.a.cov(ddf.b)._meta.dtype == "f8"
assert ddf.a.corr(ddf.b)._meta.dtype == "f8"
@pytest.mark.slow
def test_cov_corr_stable():
df = pd.DataFrame(np.random.uniform(-1, 1, (20000000, 2)), columns=["a", "b"])
ddf = dd.from_pandas(df, npartitions=50)
assert_eq(ddf.cov(split_every=8), df.cov())
assert_eq(ddf.corr(split_every=8), df.corr())
def test_cov_corr_mixed():
size = 1000
d = {
"dates": pd.date_range("2015-01-01", periods=size, freq="1T"),
"unique_id": np.arange(0, size),
"ints": np.random.randint(0, size, size=size),
"floats": np.random.randn(size),
"bools": np.random.choice([0, 1], size=size),
"int_nans": np.random.choice([0, 1, np.nan], size=size),
"float_nans": np.random.choice([0.0, 1.0, np.nan], size=size),
"constant": 1,
"int_categorical": np.random.choice([10, 20, 30, 40, 50], size=size),
"categorical_binary": np.random.choice(["a", "b"], size=size),
"categorical_nans": np.random.choice(["a", "b", "c"], size=size),
}
df = pd.DataFrame(d)
df["hardbools"] = df["bools"] == 1
df["categorical_nans"] = df["categorical_nans"].replace("c", np.nan)
df["categorical_binary"] = df["categorical_binary"].astype("category")
df["unique_id"] = df["unique_id"].astype(str)
ddf = dd.from_pandas(df, npartitions=20)
assert_eq(ddf.corr(split_every=4), df.corr(), check_divisions=False)
assert_eq(ddf.cov(split_every=4), df.cov(), check_divisions=False)
def test_autocorr():
x = pd.Series(np.random.random(100))
dx = dd.from_pandas(x, npartitions=10)
assert_eq(dx.autocorr(2), x.autocorr(2))
assert_eq(dx.autocorr(0), x.autocorr(0))
assert_eq(dx.autocorr(-2), x.autocorr(-2))
assert_eq(dx.autocorr(2, split_every=3), x.autocorr(2))
pytest.raises(TypeError, lambda: dx.autocorr(1.5))
def test_apply_infer_columns():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
def return_df(x):
        # returns a Series indexed by ['sum', 'mean'], so apply yields a
        # DataFrame with those columns
return pd.Series([x.sum(), x.mean()], index=["sum", "mean"])
# DataFrame to completely different DataFrame
with pytest.warns(None):
result = ddf.apply(return_df, axis=1)
assert isinstance(result, dd.DataFrame)
tm.assert_index_equal(result.columns, pd.Index(["sum", "mean"]))
assert_eq(result, df.apply(return_df, axis=1))
# DataFrame to Series
with pytest.warns(None):
result = ddf.apply(lambda x: 1, axis=1)
assert isinstance(result, dd.Series)
assert result.name is None
assert_eq(result, df.apply(lambda x: 1, axis=1))
def return_df2(x):
return pd.Series([x * 2, x * 3], index=["x2", "x3"])
# Series to completely different DataFrame
with pytest.warns(None):
result = ddf.x.apply(return_df2)
assert isinstance(result, dd.DataFrame)
tm.assert_index_equal(result.columns, pd.Index(["x2", "x3"]))
assert_eq(result, df.x.apply(return_df2))
# Series to Series
with pytest.warns(None):
result = ddf.x.apply(lambda x: 1)
assert isinstance(result, dd.Series)
assert result.name == "x"
assert_eq(result, df.x.apply(lambda x: 1))
def test_index_time_properties():
i = _compat.makeTimeSeries()
a = dd.from_pandas(i, npartitions=3)
assert "day" in dir(a.index)
    # returns a numpy array in pandas, but an Index in dask
assert_eq(a.index.day, pd.Index(i.index.day))
assert_eq(a.index.month, pd.Index(i.index.month))
def test_nlargest_nsmallest():
from string import ascii_lowercase
df = pd.DataFrame(
{
"a": np.random.permutation(20),
"b": list(ascii_lowercase[:20]),
"c": np.random.permutation(20).astype("float64"),
}
)
ddf = dd.from_pandas(df, npartitions=3)
for m in ["nlargest", "nsmallest"]:
f = lambda df, *args, **kwargs: getattr(df, m)(*args, **kwargs)
res = f(ddf, 5, "a")
res2 = f(ddf, 5, "a", split_every=2)
sol = f(df, 5, "a")
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
res = f(ddf, 5, ["a", "c"])
res2 = f(ddf, 5, ["a", "c"], split_every=2)
sol = f(df, 5, ["a", "c"])
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
res = f(ddf.a, 5)
res2 = f(ddf.a, 5, split_every=2)
sol = f(df.a, 5)
assert_eq(res, sol)
assert_eq(res2, sol)
assert res._name != res2._name
def test_reset_index():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
sol = df.reset_index()
res = ddf.reset_index()
assert all(d is None for d in res.divisions)
assert_eq(res, sol, check_index=False)
sol = df.reset_index(drop=True)
res = ddf.reset_index(drop=True)
assert all(d is None for d in res.divisions)
assert_eq(res, sol, check_index=False)
sol = df.x.reset_index()
res = ddf.x.reset_index()
assert all(d is None for d in res.divisions)
assert_eq(res, sol, check_index=False)
sol = df.x.reset_index(drop=True)
res = ddf.x.reset_index(drop=True)
assert all(d is None for d in res.divisions)
assert_eq(res, sol, check_index=False)
def test_dataframe_compute_forward_kwargs():
x = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2).a.sum()
x.compute(bogus_keyword=10)
def test_contains_series_raises_deprecated_warning_preserves_behavior():
s = pd.Series(["a", "b", "c", "d"])
ds = dd.from_pandas(s, npartitions=2)
with pytest.warns(
FutureWarning,
match="Using the ``in`` operator to test for membership in Series is deprecated",
):
output = "a" in ds
assert output
with pytest.warns(
FutureWarning,
match="Using the ``in`` operator to test for membership in Series is deprecated",
):
output = 0 in ds
assert not output
def test_series_iteritems():
df = pd.DataFrame({"x": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)
# `iteritems` was deprecated starting in `pandas=1.5.0`
with _check_warning(
PANDAS_GT_150, FutureWarning, message="iteritems is deprecated"
):
pd_items = df["x"].iteritems()
with _check_warning(
PANDAS_GT_150, FutureWarning, message="iteritems is deprecated"
):
dd_items = ddf["x"].iteritems()
for (a, b) in zip(pd_items, dd_items):
assert a == b
def test_series_iter():
s = pd.DataFrame({"x": [1, 2, 3, 4]})
ds = dd.from_pandas(s, npartitions=2)
for (a, b) in zip(s["x"], ds["x"]):
assert a == b
def test_dataframe_iterrows():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
for (a, b) in zip(df.iterrows(), ddf.iterrows()):
tm.assert_series_equal(a[1], b[1])
def test_dataframe_itertuples():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
for (a, b) in zip(df.itertuples(), ddf.itertuples()):
assert a == b
@pytest.mark.parametrize(
"columns",
[
("x", "y"),
("x", "x"),
pd.MultiIndex.from_tuples([("x", 1), ("x", 2)], names=("letter", "number")),
],
)
def test_dataframe_items(columns):
df = pd.DataFrame([[1, 10], [2, 20], [3, 30], [4, 40]], columns=columns)
ddf = dd.from_pandas(df, npartitions=2)
for (a, b) in zip(df.items(), ddf.items()):
assert a[0] == b[0] # column name
assert_eq(a[1], b[1].compute()) # column values
def test_dataframe_itertuples_with_index_false():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
for (a, b) in zip(df.itertuples(index=False), ddf.itertuples(index=False)):
assert a == b
def test_dataframe_itertuples_with_name_none():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)
for (a, b) in zip(df.itertuples(name=None), ddf.itertuples(name=None)):
assert a == b
assert type(a) is type(b)
def test_astype():
df = pd.DataFrame(
{"x": [1, 2, 3, None], "y": [10, 20, 30, 40]}, index=[10, 20, 30, 40]
)
a = dd.from_pandas(df, 2)
assert_eq(a.astype(float), df.astype(float))
assert_eq(a.x.astype(float), df.x.astype(float))
def test_astype_categoricals():
df = pd.DataFrame(
{
"x": ["a", "b", "c", "b", "c"],
"y": ["x", "y", "z", "x", "y"],
"z": [1, 2, 3, 4, 5],
}
)
df = df.astype({"y": "category"})
ddf = dd.from_pandas(df, 2)
assert ddf.y.cat.known
ddf2 = ddf.astype({"x": "category"})
assert not ddf2.x.cat.known
assert ddf2.y.cat.known
assert ddf2.x.dtype == "category"
assert ddf2.compute().x.dtype == "category"
dx = ddf.x.astype("category")
assert not dx.cat.known
assert dx.dtype == "category"
assert dx.compute().dtype == "category"
def test_astype_categoricals_known():
df = pd.DataFrame(
{
"x": ["a", "b", "c", "b", "c"],
"y": ["x", "y", "z", "y", "z"],
"z": ["b", "b", "b", "c", "b"],
"other": [1, 2, 3, 4, 5],
}
)
ddf = dd.from_pandas(df, 2)
abc = pd.api.types.CategoricalDtype(["a", "b", "c"], ordered=False)
category = pd.api.types.CategoricalDtype(ordered=False)
# DataFrame
ddf2 = ddf.astype({"x": abc, "y": category, "z": "category", "other": "f8"})
for col, known in [("x", True), ("y", False), ("z", False)]:
x = getattr(ddf2, col)
assert pd.api.types.is_categorical_dtype(x.dtype)
assert x.cat.known == known
# Series
for dtype, known in [("category", False), (category, False), (abc, True)]:
dx2 = ddf.x.astype(dtype)
assert pd.api.types.is_categorical_dtype(dx2.dtype)
assert dx2.cat.known == known
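# What "known" means in the assertions above (plain pandas semantics): a bare
# "category" dtype carries categories=None until the data is scanned, while
# an explicit CategoricalDtype lists them up front, which is what lets dask
# mark a column as known without computing.
def _example_known_categories():
    bare = pd.api.types.CategoricalDtype()
    explicit = pd.api.types.CategoricalDtype(["a", "b", "c"])
    assert bare.categories is None
    assert list(explicit.categories) == ["a", "b", "c"]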
def test_groupby_callable():
a = pd.DataFrame({"x": [1, 2, 3, None], "y": [10, 20, 30, 40]}, index=[1, 2, 3, 4])
b = dd.from_pandas(a, 2)
def iseven(x):
return x % 2 == 0
assert_eq(a.groupby(iseven).y.sum(), b.groupby(iseven).y.sum())
assert_eq(a.y.groupby(iseven).sum(), b.y.groupby(iseven).sum())
def test_methods_tokenize_differently():
df = pd.DataFrame({"x": [1, 2, 3, 4]})
df = dd.from_pandas(df, npartitions=1)
assert (
df.x.map_partitions(lambda x: pd.Series(x.min()))._name
!= df.x.map_partitions(lambda x: pd.Series(x.max()))._name
)
def _assert_info(df, ddf, memory_usage=True):
from io import StringIO
assert isinstance(df, pd.DataFrame)
assert isinstance(ddf, dd.DataFrame)
buf_pd, buf_da = StringIO(), StringIO()
df.info(buf=buf_pd, memory_usage=memory_usage)
ddf.info(buf=buf_da, verbose=True, memory_usage=memory_usage)
stdout_pd = buf_pd.getvalue()
stdout_da = buf_da.getvalue()
stdout_da = stdout_da.replace(str(type(ddf)), str(type(df)))
    # with the class name normalized above, the summaries should match exactly
assert stdout_pd == stdout_da
def test_info():
from io import StringIO
pandas_format._put_lines = put_lines
test_frames = [
pd.DataFrame({"x": [1, 2, 3, 4], "y": [1, 0, 1, 0]}, index=[0, 1, 2, 3]),
pd.DataFrame(),
]
for df in test_frames:
ddf = dd.from_pandas(df, npartitions=4)
_assert_info(df, ddf)
buf = StringIO()
ddf = dd.from_pandas(
pd.DataFrame({"x": [1, 2, 3, 4], "y": [1, 0, 1, 0]}, index=range(4)),
npartitions=4,
)
# Verbose=False
ddf.info(buf=buf, verbose=False)
assert buf.getvalue() == (
"\n"
"Columns: 2 entries, x to y\n"
"dtypes: int64(2)"
)
# buf=None
assert ddf.info(buf=None) is None
def test_groupby_multilevel_info():
# GH 1844
from io import StringIO
pandas_format._put_lines = put_lines
df = pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)
g = ddf.groupby(["A", "B"]).sum()
# slight difference between memory repr (single additional space)
_assert_info(g.compute(), g, memory_usage=True)
buf = StringIO()
g.info(buf, verbose=False)
assert buf.getvalue() == (
"\n"
"Columns: 1 entries, C to C\n"
"dtypes: int64(1)"
)
# multilevel
g = ddf.groupby(["A", "B"]).agg(["count", "sum"])
_assert_info(g.compute(), g, memory_usage=True)
buf = StringIO()
g.info(buf, verbose=False)
expected = (
"\n"
"Columns: 2 entries, ('C', 'count') to ('C', 'sum')\n"
"dtypes: int64(2)"
)
assert buf.getvalue() == expected
@pytest.mark.skipif(not PANDAS_GT_120, reason="need newer version of Pandas")
def test_categorize_info():
# assert that we can call info after categorize
# workaround for: https://github.com/pydata/pandas/issues/14368
from io import StringIO
pandas_format._put_lines = put_lines
df = pd.DataFrame(
{"x": [1, 2, 3, 4], "y": pd.Series(list("aabc")), "z": pd.Series(list("aabc"))},
index=[0, 1, 2, 3],
)
ddf = dd.from_pandas(df, npartitions=4).categorize(["y"])
    # verbose=True
buf = StringIO()
ddf.info(buf=buf, verbose=True)
expected = (
"\n"
"Int64Index: 4 entries, 0 to 3\n"
"Data columns (total 3 columns):\n"
" # Column Non-Null Count Dtype\n"
"--- ------ -------------- -----\n"
" 0 x 4 non-null int64\n"
" 1 y 4 non-null category\n"
" 2 z 4 non-null object\n"
"dtypes: category(1), object(1), int64(1)\n"
"memory usage: 496.0 bytes\n"
)
assert buf.getvalue() == expected
def test_gh_1301():
df = pd.DataFrame([["1", "2"], ["3", "4"]])
ddf = dd.from_pandas(df, npartitions=2)
ddf2 = ddf.assign(y=ddf[1].astype(int))
assert_eq(ddf2, df.assign(y=df[1].astype(int)))
assert ddf2.dtypes["y"] == np.dtype(int)
def test_timeseries_sorted():
df = _compat.makeTimeDataFrame()
ddf = dd.from_pandas(df.reset_index(), npartitions=2)
df.index.name = "index"
assert_eq(ddf.set_index("index", sorted=True, drop=True), df)
def test_column_assignment():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [1, 0, 1, 0]})
ddf = dd.from_pandas(df, npartitions=2)
orig = ddf.copy()
ddf["z"] = ddf.x + ddf.y
df["z"] = df.x + df.y
assert_eq(df, ddf)
assert "z" not in orig.columns
def test_array_assignment():
df = pd.DataFrame({"x": np.random.normal(size=50), "y": np.random.normal(size=50)})
ddf = dd.from_pandas(df, npartitions=2)
orig = ddf.copy()
arr = np.array(np.random.normal(size=50))
darr = da.from_array(arr, chunks=25)
df["z"] = arr
ddf["z"] = darr
assert_eq(df, ddf)
assert "z" not in orig.columns
arr = np.array(np.random.normal(size=(50, 50)))
darr = da.from_array(arr, chunks=25)
msg = "Array assignment only supports 1-D arrays"
with pytest.raises(ValueError, match=msg):
ddf["z"] = darr
arr = np.array(np.random.normal(size=50))
darr = da.from_array(arr, chunks=10)
msg = "Number of partitions do not match"
with pytest.raises(ValueError, match=msg):
ddf["z"] = darr
def test_columns_assignment():
df = pd.DataFrame({"x": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)
df2 = df.assign(y=df.x + 1, z=df.x - 1)
df[["a", "b"]] = df2[["y", "z"]]
ddf2 = ddf.assign(y=ddf.x + 1, z=ddf.x - 1)
ddf[["a", "b"]] = ddf2[["y", "z"]]
assert_eq(df, ddf)
def test_attribute_assignment():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.0, 2.0, 3.0, 4.0, 5.0]})
ddf = dd.from_pandas(df, npartitions=2)
ddf.y = ddf.x + ddf.y
assert_eq(ddf, df.assign(y=df.x + df.y))
def test_setitem_triggering_realign():
a = dd.from_pandas(pd.DataFrame({"A": range(12)}), npartitions=3)
b = dd.from_pandas(pd.Series(range(12), name="B"), npartitions=4)
a["C"] = b
assert len(a) == 12
def test_inplace_operators():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.0, 2.0, 3.0, 4.0, 5.0]})
ddf = dd.from_pandas(df, npartitions=2)
ddf.y **= 0.5
assert_eq(ddf.y, df.y**0.5)
assert_eq(ddf, df.assign(y=df.y**0.5))
@pytest.mark.parametrize("skipna", [True, False])
@pytest.mark.parametrize(
"idx",
[
np.arange(100),
sorted(np.random.random(size=100)),
pd.date_range("20150101", periods=100),
],
)
def test_idxmaxmin(idx, skipna):
df = pd.DataFrame(np.random.randn(100, 5), columns=list("abcde"), index=idx)
df.b.iloc[31] = np.nan
df.d.iloc[78] = np.nan
ddf = dd.from_pandas(df, npartitions=3)
# https://github.com/pandas-dev/pandas/issues/43587
check_dtype = not all(
(_compat.PANDAS_GT_133, skipna is False, isinstance(idx, pd.DatetimeIndex))
)
with warnings.catch_warnings(record=True):
assert_eq(df.idxmax(axis=1, skipna=skipna), ddf.idxmax(axis=1, skipna=skipna))
assert_eq(df.idxmin(axis=1, skipna=skipna), ddf.idxmin(axis=1, skipna=skipna))
assert_eq(
df.idxmax(skipna=skipna), ddf.idxmax(skipna=skipna), check_dtype=check_dtype
)
assert_eq(
df.idxmax(skipna=skipna),
ddf.idxmax(skipna=skipna, split_every=2),
check_dtype=check_dtype,
)
assert (
ddf.idxmax(skipna=skipna)._name
!= ddf.idxmax(skipna=skipna, split_every=2)._name
)
assert_eq(
df.idxmin(skipna=skipna), ddf.idxmin(skipna=skipna), check_dtype=check_dtype
)
assert_eq(
df.idxmin(skipna=skipna),
ddf.idxmin(skipna=skipna, split_every=2),
check_dtype=check_dtype,
)
assert (
ddf.idxmin(skipna=skipna)._name
!= ddf.idxmin(skipna=skipna, split_every=2)._name
)
assert_eq(df.a.idxmax(skipna=skipna), ddf.a.idxmax(skipna=skipna))
assert_eq(
df.a.idxmax(skipna=skipna), ddf.a.idxmax(skipna=skipna, split_every=2)
)
assert (
ddf.a.idxmax(skipna=skipna)._name
!= ddf.a.idxmax(skipna=skipna, split_every=2)._name
)
assert_eq(df.a.idxmin(skipna=skipna), ddf.a.idxmin(skipna=skipna))
assert_eq(
df.a.idxmin(skipna=skipna), ddf.a.idxmin(skipna=skipna, split_every=2)
)
assert (
ddf.a.idxmin(skipna=skipna)._name
!= ddf.a.idxmin(skipna=skipna, split_every=2)._name
)
def test_idxmaxmin_empty_partitions():
df = pd.DataFrame(
{"a": [1, 2, 3], "b": [1.5, 2, 3], "c": [np.NaN] * 3, "d": [1, 2, np.NaN]}
)
empty = df.iloc[:0]
ddf = dd.concat(
[dd.from_pandas(df, npartitions=1)]
+ [dd.from_pandas(empty, npartitions=1)] * 10
)
for skipna in [True, False]:
assert_eq(ddf.idxmin(skipna=skipna, split_every=3), df.idxmin(skipna=skipna))
assert_eq(
ddf[["a", "b", "d"]].idxmin(skipna=skipna, split_every=3),
df[["a", "b", "d"]].idxmin(skipna=skipna),
)
assert_eq(ddf.b.idxmax(split_every=3), df.b.idxmax())
# Completely empty raises
ddf = dd.concat([dd.from_pandas(empty, npartitions=1)] * 10)
with pytest.raises(ValueError):
ddf.idxmax().compute()
with pytest.raises(ValueError):
ddf.b.idxmax().compute()
def test_getitem_meta():
data = {"col1": ["a", "a", "b"], "col2": [0, 1, 0]}
df = pd.DataFrame(data=data, columns=["col1", "col2"])
ddf = dd.from_pandas(df, npartitions=1)
assert_eq(df.col2[df.col1 == "a"], ddf.col2[ddf.col1 == "a"])
def test_getitem_multilevel():
pdf = pd.DataFrame({("A", "0"): [1, 2, 2], ("B", "1"): [1, 2, 3]})
ddf = dd.from_pandas(pdf, npartitions=3)
assert_eq(pdf["A", "0"], ddf["A", "0"])
assert_eq(pdf[[("A", "0"), ("B", "1")]], ddf[[("A", "0"), ("B", "1")]])
def test_getitem_string_subclass():
df = pd.DataFrame({"column_1": list(range(10))})
ddf = dd.from_pandas(df, npartitions=3)
class string_subclass(str):
pass
column_1 = string_subclass("column_1")
assert_eq(df[column_1], ddf[column_1])
@pytest.mark.parametrize("col_type", [list, np.array, pd.Series, pd.Index])
def test_getitem_column_types(col_type):
df = pd.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})
ddf = dd.from_pandas(df, 2)
cols = col_type(["C", "A", "B"])
assert_eq(df[cols], ddf[cols])
def test_getitem_with_bool_dataframe_as_key():
df = pd.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})
ddf = dd.from_pandas(df, 2)
assert_eq(df[df > 3], ddf[ddf > 3])
def test_getitem_with_non_series():
s = pd.Series(list(range(10)), index=list("abcdefghij"))
ds = dd.from_pandas(s, npartitions=3)
assert_eq(s[["a", "b"]], ds[["a", "b"]])
def test_ipython_completion():
df = pd.DataFrame({"a": [1], "b": [2]})
ddf = dd.from_pandas(df, npartitions=1)
completions = ddf._ipython_key_completions_()
assert "a" in completions
assert "b" in completions
assert "c" not in completions
def test_diff():
df = pd.DataFrame(np.random.randn(100, 5), columns=list("abcde"))
ddf = dd.from_pandas(df, 5)
assert_eq(ddf.diff(), df.diff())
assert_eq(ddf.diff(0), df.diff(0))
assert_eq(ddf.diff(2), df.diff(2))
assert_eq(ddf.diff(-2), df.diff(-2))
assert_eq(ddf.diff(2, axis=1), df.diff(2, axis=1))
assert_eq(ddf.a.diff(), df.a.diff())
assert_eq(ddf.a.diff(0), df.a.diff(0))
assert_eq(ddf.a.diff(2), df.a.diff(2))
assert_eq(ddf.a.diff(-2), df.a.diff(-2))
assert ddf.diff(2)._name == ddf.diff(2)._name
assert ddf.diff(2)._name != ddf.diff(3)._name
pytest.raises(TypeError, lambda: ddf.diff(1.5))
def test_shift():
df = _compat.makeTimeDataFrame()
ddf = dd.from_pandas(df, npartitions=4)
# DataFrame
assert_eq(ddf.shift(), df.shift())
assert_eq(ddf.shift(0), df.shift(0))
assert_eq(ddf.shift(2), df.shift(2))
assert_eq(ddf.shift(-2), df.shift(-2))
assert_eq(ddf.shift(2, axis=1), df.shift(2, axis=1))
# Series
assert_eq(ddf.A.shift(), df.A.shift())
assert_eq(ddf.A.shift(0), df.A.shift(0))
assert_eq(ddf.A.shift(2), df.A.shift(2))
assert_eq(ddf.A.shift(-2), df.A.shift(-2))
with pytest.raises(TypeError):
ddf.shift(1.5)
@pytest.mark.parametrize("data_freq,divs1", [("B", False), ("D", True), ("H", True)])
def test_shift_with_freq_DatetimeIndex(data_freq, divs1):
df = _compat.makeTimeDataFrame()
df = df.set_index(_compat.makeDateIndex(30, freq=data_freq))
ddf = dd.from_pandas(df, npartitions=4)
for freq, divs2 in [("S", True), ("W", False), (pd.Timedelta(10, unit="h"), True)]:
for d, p in [(ddf, df), (ddf.A, df.A), (ddf.index, df.index)]:
res = d.shift(2, freq=freq)
assert_eq(res, p.shift(2, freq=freq))
assert res.known_divisions == divs2
# Index shifts also work with freq=None
res = ddf.index.shift(2)
assert_eq(res, df.index.shift(2))
assert res.known_divisions == divs1
@pytest.mark.parametrize("data_freq,divs", [("B", False), ("D", True), ("H", True)])
def test_shift_with_freq_PeriodIndex(data_freq, divs):
df = _compat.makeTimeDataFrame()
# PeriodIndex
df = df.set_index(pd.period_range("2000-01-01", periods=30, freq=data_freq))
ddf = dd.from_pandas(df, npartitions=4)
for d, p in [(ddf, df), (ddf.A, df.A)]:
res = d.shift(2, freq=data_freq)
assert_eq(res, p.shift(2, freq=data_freq))
assert res.known_divisions == divs
# PeriodIndex.shift doesn't have `freq` parameter
res = ddf.index.shift(2)
assert_eq(res, df.index.shift(2))
assert res.known_divisions == divs
df = _compat.makeTimeDataFrame()
with pytest.raises(ValueError):
ddf.index.shift(2, freq="D") # freq keyword not supported
def test_shift_with_freq_TimedeltaIndex():
df = _compat.makeTimeDataFrame()
# TimedeltaIndex
for data_freq in ["T", "D", "H"]:
df = df.set_index(_compat.makeTimedeltaIndex(30, freq=data_freq))
ddf = dd.from_pandas(df, npartitions=4)
for freq in ["S", pd.Timedelta(10, unit="h")]:
for d, p in [(ddf, df), (ddf.A, df.A), (ddf.index, df.index)]:
res = d.shift(2, freq=freq)
assert_eq(res, p.shift(2, freq=freq))
assert res.known_divisions
# Index shifts also work with freq=None
res = ddf.index.shift(2)
assert_eq(res, df.index.shift(2))
assert res.known_divisions
def test_shift_with_freq_errors():
# Other index types error
df = _compat.makeDataFrame()
ddf = dd.from_pandas(df, npartitions=4)
pytest.raises(NotImplementedError, lambda: ddf.shift(2, freq="S"))
pytest.raises(NotImplementedError, lambda: ddf.A.shift(2, freq="S"))
pytest.raises(NotImplementedError, lambda: ddf.index.shift(2))
@pytest.mark.parametrize("method", ["first", "last"])
def test_first_and_last(method):
f = lambda x, offset: getattr(x, method)(offset)
freqs = ["12h", "D"]
offsets = ["0d", "100h", "20d", "20B", "3W", "3M", "400d", "13M"]
for freq in freqs:
index = pd.date_range("1/1/2000", "1/1/2001", freq=freq)[::4]
df = pd.DataFrame(
np.random.random((len(index), 4)), index=index, columns=["A", "B", "C", "D"]
)
ddf = dd.from_pandas(df, npartitions=10)
for offset in offsets:
assert_eq(f(ddf, offset), f(df, offset))
assert_eq(f(ddf.A, offset), f(df.A, offset))
@pytest.mark.parametrize("npartitions", [1, 4, 20])
@pytest.mark.parametrize("split_every", [2, 5])
@pytest.mark.parametrize("split_out", [None, 1, 5, 20])
def test_hash_split_unique(npartitions, split_every, split_out):
from string import ascii_lowercase
s = pd.Series(np.random.choice(list(ascii_lowercase), 1000, replace=True))
ds = dd.from_pandas(s, npartitions=npartitions)
dropped = ds.unique(split_every=split_every, split_out=split_out)
dsk = dropped.__dask_optimize__(dropped.dask, dropped.__dask_keys__())
from dask.core import get_deps
dependencies, dependents = get_deps(dsk)
assert len([k for k, v in dependencies.items() if not v]) == npartitions
assert dropped.npartitions == (split_out or 1)
assert sorted(dropped.compute(scheduler="sync")) == sorted(s.unique())
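# The property that split_out relies on, sketched with plain pandas (the
# bucket layout here is illustrative, not dask's exact routing): hashing
# sends every occurrence of a value to the same output bucket, so per-bucket
# uniques are disjoint and together cover all distinct values.
def _example_hash_partitioning():
    values = pd.Series(list("abcabc"))
    buckets = pd.util.hash_pandas_object(values, index=False) % 3
    routed = values.groupby(buckets).unique()
    assert sorted(np.concatenate(routed.values)) == ["a", "b", "c"]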
@pytest.mark.parametrize("split_every", [None, 2])
def test_split_out_drop_duplicates(split_every):
x = np.concatenate([np.arange(10)] * 100)[:, None]
y = x.copy()
z = np.concatenate([np.arange(20)] * 50)[:, None]
rs = np.random.RandomState(1)
rs.shuffle(x)
rs.shuffle(y)
rs.shuffle(z)
df = pd.DataFrame(np.concatenate([x, y, z], axis=1), columns=["x", "y", "z"])
ddf = dd.from_pandas(df, npartitions=20)
for subset, keep in product([None, ["x", "z"]], ["first", "last"]):
sol = df.drop_duplicates(subset=subset, keep=keep)
res = ddf.drop_duplicates(
subset=subset, keep=keep, split_every=split_every, split_out=10
)
assert res.npartitions == 10
assert_eq(sol, res)
@pytest.mark.parametrize("split_every", [None, 2])
def test_split_out_value_counts(split_every):
df = pd.DataFrame({"x": [1, 2, 3] * 100})
ddf = dd.from_pandas(df, npartitions=5)
assert ddf.x.value_counts(split_out=10, split_every=split_every).npartitions == 10
assert_eq(
ddf.x.value_counts(split_out=10, split_every=split_every), df.x.value_counts()
)
def test_values():
from dask.array.utils import assert_eq
df = pd.DataFrame(
{"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
)
ddf = dd.from_pandas(df, 2)
assert_eq(df.values, ddf.values)
assert_eq(df.x.values, ddf.x.values)
assert_eq(df.y.values, ddf.y.values)
assert_eq(df.index.values, ddf.index.values)
def test_copy():
df = pd.DataFrame({"x": [1, 2, 3]})
a = dd.from_pandas(df, npartitions=2)
b = a.copy()
c = a.copy(deep=False)
a["y"] = a.x * 2
assert_eq(b, df)
assert_eq(c, df)
deep_err = (
"The `deep` value must be False. This is strictly a shallow copy "
"of the underlying computational graph."
)
for deep in [True, None, ""]:
with pytest.raises(ValueError, match=deep_err):
a.copy(deep=deep)
def test_del():
df = pd.DataFrame(
{"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
)
a = dd.from_pandas(df, 2)
b = a.copy()
del a["x"]
assert_eq(b, df)
del df["x"]
assert_eq(a, df)
@pytest.mark.parametrize("index", [True, False])
@pytest.mark.parametrize("deep", [True, False])
def test_memory_usage(index, deep):
df = pd.DataFrame({"x": [1, 2, 3], "y": [1.0, 2.0, 3.0], "z": ["a", "b", "c"]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(
df.memory_usage(index=index, deep=deep),
ddf.memory_usage(index=index, deep=deep),
)
assert (
df.x.memory_usage(index=index, deep=deep)
== ddf.x.memory_usage(index=index, deep=deep).compute()
)
@pytest.mark.parametrize("index", [True, False])
@pytest.mark.parametrize("deep", [True, False])
def test_memory_usage_per_partition(index, deep):
df = pd.DataFrame(
{
"x": [1, 2, 3, 4, 5],
"y": [1.0, 2.0, 3.0, 4.0, 5.0],
"z": ["a", "b", "c", "d", "e"],
}
)
ddf = dd.from_pandas(df, npartitions=2)
# DataFrame.memory_usage_per_partition
expected = pd.Series(
part.compute().memory_usage(index=index, deep=deep).sum()
for part in ddf.partitions
)
result = ddf.memory_usage_per_partition(index=index, deep=deep)
assert_eq(expected, result)
# Series.memory_usage_per_partition
expected = pd.Series(
part.x.compute().memory_usage(index=index, deep=deep) for part in ddf.partitions
)
result = ddf.x.memory_usage_per_partition(index=index, deep=deep)
assert_eq(expected, result)
@pytest.mark.parametrize(
"reduction",
[
"sum",
"mean",
"std",
"var",
"count",
"min",
"max",
"idxmin",
"idxmax",
"prod",
"all",
"sem",
],
)
def test_dataframe_reductions_arithmetic(reduction):
df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]})
ddf = dd.from_pandas(df, npartitions=3)
assert_eq(
ddf - (getattr(ddf, reduction)() + 1), df - (getattr(df, reduction)() + 1)
)
def test_dataframe_mode():
data = [["Tom", 10, 7], ["Farahn", 14, 7], ["Julie", 14, 5], ["Nick", 10, 10]]
df = pd.DataFrame(data, columns=["Name", "Num", "Num"])
ddf = dd.from_pandas(df, npartitions=3)
assert_eq(ddf.mode(), df.mode())
# name is not preserved in older pandas
assert_eq(ddf.Name.mode(), df.Name.mode(), check_names=PANDAS_GT_140)
# test empty
df = pd.DataFrame(columns=["a", "b"])
ddf = dd.from_pandas(df, npartitions=1)
# check_index=False should be removed once https://github.com/pandas-dev/pandas/issues/33321 is resolved.
assert_eq(ddf.mode(), df.mode(), check_index=False)
def test_datetime_loc_open_slicing():
dtRange = pd.date_range("01.01.2015", "05.05.2015")
df = pd.DataFrame(np.random.random((len(dtRange), 2)), index=dtRange)
ddf = dd.from_pandas(df, npartitions=5)
assert_eq(df.loc[:"02.02.2015"], ddf.loc[:"02.02.2015"])
assert_eq(df.loc["02.02.2015":], ddf.loc["02.02.2015":])
assert_eq(df[0].loc[:"02.02.2015"], ddf[0].loc[:"02.02.2015"])
assert_eq(df[0].loc["02.02.2015":], ddf[0].loc["02.02.2015":])
def test_to_datetime():
df = pd.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]})
df.index.name = "ix"
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(pd.to_datetime(df), dd.to_datetime(ddf))
s = pd.Series(["3/11/2000", "3/12/2000", "3/13/2000"] * 100)
s.index = s.values
ds = dd.from_pandas(s, npartitions=10, sort=False)
assert_eq(
pd.to_datetime(s, infer_datetime_format=True),
dd.to_datetime(ds, infer_datetime_format=True),
)
assert_eq(
pd.to_datetime(s.index, infer_datetime_format=True),
dd.to_datetime(ds.index, infer_datetime_format=True),
check_divisions=False,
)
assert_eq(
pd.to_datetime(s, utc=True),
dd.to_datetime(ds, utc=True),
)
for arg in ("2021-08-03", 2021):
with pytest.raises(NotImplementedError, match="non-index-able arguments"):
dd.to_datetime(arg)
def test_to_timedelta():
s = pd.Series(range(10))
ds = dd.from_pandas(s, npartitions=2)
assert_eq(pd.to_timedelta(s), dd.to_timedelta(ds))
assert_eq(pd.to_timedelta(s, unit="h"), dd.to_timedelta(ds, unit="h"))
s = pd.Series([1, 2, "this will error"])
ds = dd.from_pandas(s, npartitions=2)
assert_eq(pd.to_timedelta(s, errors="coerce"), dd.to_timedelta(ds, errors="coerce"))
@pytest.mark.parametrize("values", [[np.NaN, 0], [1, 1]])
def test_isna(values):
s = pd.Series(values)
ds = dd.from_pandas(s, npartitions=2)
assert_eq(pd.isna(s), dd.isna(ds))
@pytest.mark.parametrize("drop", [0, 9])
def test_slice_on_filtered_boundary(drop):
# https://github.com/dask/dask/issues/2211
x = np.arange(10)
x[[5, 6]] -= 2
df = pd.DataFrame({"A": x, "B": np.arange(len(x))})
pdf = df.set_index("A").query(f"B != {drop}")
ddf = dd.from_pandas(df, 1).set_index("A").query(f"B != {drop}")
result = dd.concat([ddf, ddf.rename(columns={"B": "C"})], axis=1)
expected = pd.concat([pdf, pdf.rename(columns={"B": "C"})], axis=1)
assert_eq(result, expected)
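# The tests below exercise ``methods.boundary_slice``. A minimal sketch
# of its semantics, inferred from the assertions that follow: rows are
# kept when their index label lies within [start, stop] (both boundaries
# inclusive by default), even when the index is not monotonic:
#
#     df = pd.DataFrame({"B": range(5)}, index=[-1, -2, 2, 4, 3])
#     methods.boundary_slice(df, 0, 4)   # keeps labels 2, 4 and 3
#     methods.boundary_slice(df, -2, 3)  # drops only label 4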
def test_boundary_slice_nonmonotonic():
x = np.array([-1, -2, 2, 4, 3])
df = pd.DataFrame({"B": range(len(x))}, index=x)
result = methods.boundary_slice(df, 0, 4)
expected = df.iloc[2:]
tm.assert_frame_equal(result, expected)
result = methods.boundary_slice(df, -1, 4)
expected = df.drop(-2)
tm.assert_frame_equal(result, expected)
result = methods.boundary_slice(df, -2, 3)
expected = df.drop(4)
tm.assert_frame_equal(result, expected)
result = methods.boundary_slice(df, -2, 3.5)
expected = df.drop(4)
tm.assert_frame_equal(result, expected)
result = methods.boundary_slice(df, -2, 4)
expected = df
tm.assert_frame_equal(result, expected)
def test_boundary_slice_empty():
df = pd.DataFrame()
result = methods.boundary_slice(df, 1, 4)
expected = pd.DataFrame()
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
"start, stop, right_boundary, left_boundary, drop",
[
(-1, None, False, False, [-1, -2]),
(-1, None, False, True, [-2]),
(None, 3, False, False, [3, 4]),
(None, 3, True, False, [4]),
# Missing keys
(-0.5, None, False, False, [-1, -2]),
(-0.5, None, False, True, [-1, -2]),
(-1.5, None, False, True, [-2]),
(None, 3.5, False, False, [4]),
(None, 3.5, True, False, [4]),
(None, 2.5, False, False, [3, 4]),
],
)
def test_with_boundary(start, stop, right_boundary, left_boundary, drop):
x = np.array([-1, -2, 2, 4, 3])
df = pd.DataFrame({"B": range(len(x))}, index=x)
result = methods.boundary_slice(df, start, stop, right_boundary, left_boundary)
expected = df.drop(drop)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
"index, left, right",
[
(range(10), 0, 9),
(range(10), -1, None),
(range(10), None, 10),
([-1, 0, 2, 1], None, None),
([-1, 0, 2, 1], -1, None),
([-1, 0, 2, 1], None, 2),
([-1, 0, 2, 1], -2, 3),
(pd.date_range("2017", periods=10), None, None),
(pd.date_range("2017", periods=10), pd.Timestamp("2017"), None),
(pd.date_range("2017", periods=10), None, pd.Timestamp("2017-01-10")),
(pd.date_range("2017", periods=10), pd.Timestamp("2016"), None),
(pd.date_range("2017", periods=10), None, pd.Timestamp("2018")),
],
)
def test_boundary_slice_same(index, left, right):
df = pd.DataFrame({"A": range(len(index))}, index=index)
result = methods.boundary_slice(df, left, right)
tm.assert_frame_equal(result, df)
def test_better_errors_object_reductions():
# GH2452
s = pd.Series(["a", "b", "c", "d"])
ds = dd.from_pandas(s, npartitions=2)
with pytest.raises(ValueError) as err:
ds.mean()
assert str(err.value) == "`mean` not supported with object series"
def test_sample_empty_partitions():
@dask.delayed
def make_df(n):
return pd.DataFrame(np.zeros((n, 4)), columns=list("abcd"))
ddf = dd.from_delayed([make_df(0), make_df(100), make_df(0)])
ddf2 = ddf.sample(frac=0.2)
# smoke test sample on empty partitions
res = ddf2.compute()
assert res.dtypes.equals(ddf2.dtypes)
def test_coerce():
df = pd.DataFrame(np.arange(100).reshape((10, 10)))
ddf = dd.from_pandas(df, npartitions=2)
funcs = (int, float, complex)
for d, t in product(funcs, (ddf, ddf[0])):
pytest.raises(TypeError, lambda: t(d))
def test_bool():
df = pd.DataFrame(np.arange(100).reshape((10, 10)))
ddf = dd.from_pandas(df, npartitions=2)
conditions = [ddf, ddf[0], ddf == ddf, ddf[0] == ddf[0]]
for cond in conditions:
with pytest.raises(ValueError):
bool(cond)
def test_cumulative_multiple_columns():
# GH 3037
df = pd.DataFrame(np.random.randn(100, 5), columns=list("abcde"))
ddf = dd.from_pandas(df, 5)
for frame in [ddf, df]:  # avoid shadowing the module-level `d` fixture
for c in df.columns:
frame[c + "cs"] = frame[c].cumsum()
frame[c + "cmin"] = frame[c].cummin()
frame[c + "cmax"] = frame[c].cummax()
frame[c + "cp"] = frame[c].cumprod()
assert_eq(ddf, df)
@pytest.mark.parametrize("func", [np.asarray, M.to_records])
def test_map_partition_array(func):
from dask.array.utils import assert_eq
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5], "y": [6.0, 7.0, 8.0, 9.0, 10.0]},
index=["a", "b", "c", "d", "e"],
)
ddf = dd.from_pandas(df, npartitions=2)
for pre in [lambda a: a, lambda a: a.x, lambda a: a.y, lambda a: a.index]:
try:
expected = func(pre(df))
except Exception:
continue
x = pre(ddf).map_partitions(func)
assert_eq(x, expected, check_type=False) # TODO: make check_type pass
assert isinstance(x, da.Array)
assert x.chunks[0] == (np.nan, np.nan)
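# A hedged sketch of the behaviour checked above: when the mapped
# function returns something array-like, ``map_partitions`` produces a
# ``da.Array`` whose first dimension has one unknown-sized chunk
# (``np.nan``) per partition, since Dask does not track partition
# lengths:
#
#     arr = ddf.map_partitions(np.asarray)  # -> dask array
#     arr.chunks[0]                         # (nan, nan)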
def test_map_partition_sparse():
sparse = pytest.importorskip("sparse")
# older numba versions cause a searchsorted failure inside sparse
pytest.importorskip("numba", minversion="0.40.0")
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5], "y": [6.0, 7.0, 8.0, 9.0, 10.0]},
index=["a", "b", "c", "d", "e"],
)
ddf = dd.from_pandas(df, npartitions=2)
def f(d):
return sparse.COO(np.array(d))
for pre in [lambda a: a, lambda a: a.x]:
expected = f(pre(df))
result = pre(ddf).map_partitions(f)
assert isinstance(result, da.Array)
computed = result.compute()
assert (computed.data == expected.data).all()
assert (computed.coords == expected.coords).all()
def test_mixed_dask_array_operations():
df = pd.DataFrame({"x": [1, 2, 3]}, index=[4, 5, 6])
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(df.x + df.x.values, ddf.x + ddf.x.values)
assert_eq(df.x.values + df.x, ddf.x.values + ddf.x)
assert_eq(df.x + df.index.values, ddf.x + ddf.index.values)
assert_eq(df.index.values + df.x, ddf.index.values + ddf.x)
assert_eq(df.x + df.x.values.sum(), ddf.x + ddf.x.values.sum())
def test_mixed_dask_array_operations_errors():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]}, index=[4, 5, 6, 7, 8])
ddf = dd.from_pandas(df, npartitions=2)
x = da.arange(5, chunks=((1, 4),))
x._chunks = ((np.nan, np.nan),)
with pytest.raises(ValueError):
(ddf.x + x).compute()
x = da.arange(5, chunks=((2, 2, 1),))
with pytest.raises(ValueError) as info:
ddf.x + x
assert "add" in str(info.value)
def test_mixed_dask_array_multi_dimensional():
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5], "y": [5.0, 6.0, 7.0, 8.0, 9.0]}, columns=["x", "y"]
)
ddf = dd.from_pandas(df, npartitions=2)
x = (df.values + 1).astype(float)
dx = (ddf.values + 1).astype(float)
assert_eq(ddf + dx + 1, df + x + 1)
assert_eq(ddf + dx.rechunk((None, 1)) + 1, df + x + 1)
assert_eq(ddf[["y", "x"]] + dx + 1, df[["y", "x"]] + x + 1)
def test_meta_raises():
# Raise when we use a user defined function
s = pd.Series(["abcd", "abcd"])
ds = dd.from_pandas(s, npartitions=2)
with pytest.raises(ValueError, match="meta="):
ds.map(lambda x: x[3])
# But not otherwise
df = pd.DataFrame({"a": ["x", "y", "y"], "b": ["x", "y", "z"], "c": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1)
with pytest.raises(Exception) as info:
ddf.a + ddf.c
assert "meta=" not in str(info.value)
def test_meta_nonempty_uses_meta_value_if_provided():
# https://github.com/dask/dask/issues/6958
base = pd.Series([1, 2, 3], dtype="datetime64[ns]")
offsets = pd.Series([pd.offsets.DateOffset(years=o) for o in range(3)])
dask_base = dd.from_pandas(base, npartitions=1)
dask_offsets = dd.from_pandas(offsets, npartitions=1)
dask_offsets._meta = offsets.head()
with pytest.warns(None):  # swallow pandas' non-vectorized PerformanceWarning
expected = base + offsets
actual = dask_base + dask_offsets
assert_eq(expected, actual)
def test_dask_dataframe_holds_scipy_sparse_containers():
sparse = pytest.importorskip("scipy.sparse")
da = pytest.importorskip("dask.array")
x = da.random.random((1000, 10), chunks=(100, 10))
x[x < 0.9] = 0
df = dd.from_dask_array(x)
y = df.map_partitions(sparse.csr_matrix)
assert isinstance(y, da.Array)
vs = y.to_delayed().flatten().tolist()
values = dask.compute(*vs, scheduler="single-threaded")
assert all(isinstance(v, sparse.csr_matrix) for v in values)
def test_map_partitions_delays_large_inputs():
df = pd.DataFrame({"x": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)
big = np.ones(1000000)
b = ddf.map_partitions(lambda x, y: x, y=big)
assert any(big is v for v in b.dask.values())
a = ddf.map_partitions(lambda x, y: x, big)
assert any(big is v for v in a.dask.values())
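# A hedged note: rather than baking a large argument into every
# partition task, ``map_partitions`` stores the object once in the graph
# under its own key, which is why the very same ``big`` array shows up
# among the graph values for both the keyword and positional forms.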
def test_partitions_indexer():
df = pd.DataFrame({"x": range(10)})
ddf = dd.from_pandas(df, npartitions=5)
assert_eq(ddf.partitions[0], ddf.get_partition(0))
assert_eq(ddf.partitions[3], ddf.get_partition(3))
assert_eq(ddf.partitions[-1], ddf.get_partition(4))
assert ddf.partitions[:3].npartitions == 3
assert ddf.x.partitions[:3].npartitions == 3
assert ddf.x.partitions[::2].compute().tolist() == [0, 1, 4, 5, 8, 9]
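# A minimal usage sketch of the ``.partitions`` accessor, mirroring the
# assertions above (10 rows in 5 partitions of 2 rows each):
#
#     ddf.partitions[-1]   # same as ddf.get_partition(4)
#     ddf.partitions[::2]  # partitions 0, 2, 4 -> rows 0-1, 4-5, 8-9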
def test_mod_eq():
df = pd.DataFrame({"a": [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1)
assert_eq(df, ddf)
assert_eq(df.a, ddf.a)
assert_eq(df.a + 2, ddf.a + 2)
assert_eq(df.a + 2 == 0, ddf.a + 2 == 0)
def test_setitem():
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
ddf = dd.from_pandas(df.copy(), 2)
df[df.columns] = 1
ddf[ddf.columns] = 1
assert_eq(df, ddf)
def test_setitem_with_bool_dataframe_as_key():
df = pd.DataFrame({"A": [1, 4], "B": [3, 2]})
ddf = dd.from_pandas(df.copy(), 2)
df[df > 2] = 5
ddf[ddf > 2] = 5
assert_eq(df, ddf)
def test_setitem_with_bool_series_as_key():
df = pd.DataFrame({"A": [1, 4], "B": [3, 2]})
ddf = dd.from_pandas(df.copy(), 2)
df[df["A"] > 2] = 5
ddf[ddf["A"] > 2] = 5
assert_eq(df, ddf)
def test_setitem_with_numeric_column_name_raises_not_implemented():
df = pd.DataFrame({0: [1, 4], 1: [3, 2]})
ddf = dd.from_pandas(df.copy(), 2)
# works for pandas
df[0] = 5
# raises error for dask
with pytest.raises(NotImplementedError, match="not supported"):
ddf[0] = 5
def test_broadcast():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf - (ddf.sum() + 1), df - (df.sum() + 1))
def test_scalar_with_array():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
da.utils.assert_eq(df.x.values + df.x.mean(), ddf.x.values + ddf.x.mean())
def test_has_parallel_type():
assert has_parallel_type(pd.DataFrame())
assert has_parallel_type(pd.Series(dtype=float))
assert not has_parallel_type(123)
def test_meta_error_message():
with pytest.raises(TypeError) as info:
dd.DataFrame({("x", 1): 123}, "x", pd.Series(dtype=float), [None, None])
assert "Series" in str(info.value)
assert "DataFrame" in str(info.value)
assert "pandas" in str(info.value)
def test_map_index():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
assert ddf.known_divisions is True
cleared = ddf.index.map(lambda x: x * 10)
assert cleared.known_divisions is False
applied = ddf.index.map(lambda x: x * 10, is_monotonic=True)
assert applied.known_divisions is True
assert applied.divisions == tuple(x * 10 for x in ddf.divisions)
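# A hedged note on ``Index.map``: divisions are dropped by default
# because Dask cannot know that an arbitrary function preserves order;
# passing ``is_monotonic=True`` is the caller's promise that it does, in
# which case the old divisions are mapped through the function instead.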
def test_assign_index():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
ddf_copy = ddf.copy()
ddf.index = ddf.index * 10
expected = df.copy()
expected.index = expected.index * 10
assert_eq(ddf, expected)
assert_eq(ddf_copy, df)
def test_index_divisions():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.index + 1, df.index + 1)
assert_eq(10 * ddf.index, 10 * df.index)
assert_eq(-ddf.index, -df.index)
def test_replace():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(df.replace(1, 10), ddf.replace(1, 10))
assert_eq(df.replace({1: 10, 2: 20}), ddf.replace({1: 10, 2: 20}))
assert_eq(df.x.replace(1, 10), ddf.x.replace(1, 10))
assert_eq(df.x.replace({1: 10, 2: 20}), ddf.x.replace({1: 10, 2: 20}))
def test_map_partitions_delays_lists():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
L = list(range(100))
out = ddf.map_partitions(lambda x, y: x + sum(y), y=L)
assert any(str(L) == str(v) for v in out.__dask_graph__().values())
out = ddf.map_partitions(lambda x, y: x + sum(y), L)
assert any(str(L) == str(v) for v in out.__dask_graph__().values())
def test_dtype_cast():
df = pd.DataFrame(
{
"A": np.arange(10, dtype=np.int32),
"B": np.arange(10, dtype=np.int64),
"C": np.arange(10, dtype=np.float32),
}
)
ddf = dd.from_pandas(df, npartitions=2)
assert ddf.A.dtype == np.int32
assert ddf.B.dtype == np.int64
assert ddf.C.dtype == np.float32
col = pd.Series(np.arange(10, dtype=np.float32)) / 2
assert col.dtype == np.float32
ddf = ddf.assign(D=col)
assert ddf.D.dtype == np.float32
assert ddf.C.dtype == np.float32
# the assign must not upcast the unrelated integer columns
assert ddf.B.dtype == np.int64
assert ddf.A.dtype == np.int32
@pytest.mark.parametrize("base_npart", [1, 4])
@pytest.mark.parametrize("map_npart", [1, 3])
@pytest.mark.parametrize("sorted_index", [False, True])
@pytest.mark.parametrize("sorted_map_index", [False, True])
def test_series_map(base_npart, map_npart, sorted_index, sorted_map_index):
base = pd.Series(
["".join(np.random.choice(["a", "b", "c"], size=3)) for x in range(100)]
)
if not sorted_index:
index = np.arange(100)
np.random.shuffle(index)
base.index = index
map_index = ["".join(x) for x in product("abc", repeat=3)]
mapper = pd.Series(np.random.randint(50, size=len(map_index)), index=map_index)
if not sorted_map_index:
map_index = np.array(map_index)
np.random.shuffle(map_index)
mapper.index = map_index
expected = base.map(mapper)
dask_base = dd.from_pandas(base, npartitions=base_npart, sort=False)
dask_map = dd.from_pandas(mapper, npartitions=map_npart, sort=False)
result = dask_base.map(dask_map)
dd.utils.assert_eq(expected, result)
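# A hedged note: ``Series.map`` also accepts another Dask Series as the
# mapper, and, as parametrized above, neither the partition counts nor
# the index orderings of base and mapper need to match.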
def test_dataframe_explode():
df = pd.DataFrame({"A": [[1, 2, 3], "foo", [3, 4]], "B": 1})
exploded_df = df.explode("A")
ddf = dd.from_pandas(df, npartitions=2)
exploded_ddf = ddf.explode("A")
assert ddf.divisions == exploded_ddf.divisions
assert_eq(exploded_ddf.compute(), exploded_df)
def test_series_explode():
s = pd.Series([[1, 2, 3], "foo", [3, 4]])
exploded_s = s.explode()
ds = dd.from_pandas(s, npartitions=2)
exploded_ds = ds.explode()
assert_eq(exploded_ds, exploded_s)
assert ds.divisions == exploded_ds.divisions
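# A hedged note on both explode tests: ``explode`` operates partition by
# partition, so the divisions of the input survive unchanged on the
# output for DataFrames and Series alike.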
def test_pop():
df = pd.DataFrame({"x": range(10), "y": range(10)})
ddf = dd.from_pandas(df, npartitions=2)
s = ddf.pop("y")
assert s.name == "y"
assert ddf.columns == ["x"]
assert_eq(ddf, df[["x"]])
@pytest.mark.parametrize("dropna", [True, False])
@pytest.mark.parametrize("axis", [0, 1])
def test_nunique(dropna, axis):
df = pd.DataFrame(
{"x": ["a", "a", "c"], "y": [None, 1, 2], "c": np.arange(0, 1, 0.4)}
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf["y"].nunique(dropna=dropna), df["y"].nunique(dropna=dropna))
assert_eq(
ddf.nunique(dropna=dropna, axis=axis), df.nunique(dropna=dropna, axis=axis)
)
def test_view():
data = {
"x": pd.Series(range(5), dtype="int8"),
"y": pd.Series(
[
"2021-11-27 00:05:02.175274",
"2021-11-27 00:05:05.205596",
"2021-11-27 00:05:29.212572",
"2021-11-27 00:05:25.708343",
"2021-11-27 00:05:47.714958",
],
dtype="datetime64[ns]",
),
}
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf["x"].view("uint8"), df["x"].view("uint8"))
assert_eq(ddf["y"].view("int64"), df["y"].view("int64"))
def test_simple_map_partitions():
data = {"col_0": [9, -3, 0, -1, 5], "col_1": [-2, -7, 6, 8, -5]}
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.clip(-4, 6)
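# a hedged peek into the graph: clip() compiles to a single Blockwise
# layer whose SubgraphCallable holds exactly one task, and the bound
# method M.clip sits either first in that task or right after ``apply``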
task = ddf.__dask_graph__()[ddf.__dask_keys__()[0]]
[v] = task[0].dsk.values()
assert v[0] == M.clip or v[1] == M.clip
def test_iter():
df = pd.DataFrame({"A": [1, 2, 3, 4], "B": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, 2)
assert list(df) == list(ddf)
for col, expected in zip(ddf, ["A", "B"]):
assert col == expected
def test_dataframe_groupby_cumsum_agg_empty_partitions():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=4)
assert_eq(ddf[ddf.x < 5].x.cumsum(), df[df.x < 5].x.cumsum())
assert_eq(ddf[ddf.x > 5].x.cumsum(), df[df.x > 5].x.cumsum())
def test_dataframe_groupby_cumprod_agg_empty_partitions():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=4)
assert_eq(ddf[ddf.x < 5].x.cumprod(), df[df.x < 5].x.cumprod())
assert_eq(ddf[ddf.x > 5].x.cumprod(), df[df.x > 5].x.cumprod())
def test_fuse_roots():
pdf1 = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [3, 5, 2, 5, 7, 2, 4, 2, 4]}
)
ddf1 = dd.from_pandas(pdf1, 2)
pdf2 = pd.DataFrame({"a": [True, False, True] * 3, "b": [False, False, True] * 3})
ddf2 = dd.from_pandas(pdf2, 2)
res = ddf1.where(ddf2)
hlg = fuse_roots(res.__dask_graph__(), keys=res.__dask_keys__())
hlg.validate()
def test_attrs_dataframe():
df = pd.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})
df.attrs = {"date": "2020-10-16"}
ddf = dd.from_pandas(df, 2)
assert df.attrs == ddf.attrs
assert df.abs().attrs == ddf.abs().attrs
def test_attrs_series():
s = pd.Series([1, 2], name="A")
s.attrs["unit"] = "kg"
ds = dd.from_pandas(s, 2)
assert s.attrs == ds.attrs
assert s.fillna(1).attrs == ds.fillna(1).attrs
@pytest.mark.xfail(reason="df.iloc[:0] does not keep the series attrs")
def test_attrs_series_in_dataframes():
df = pd.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})
df.A.attrs["unit"] = "kg"
ddf = dd.from_pandas(df, 2)
# Fails because the pandas iloc method doesn't currently persist
# the attrs dict for series in a dataframe. Dask uses df.iloc[:0]
# when creating the _meta dataframe in make_meta_pandas(x, index=None).
# Should start xpassing when df.iloc works. Remove the xfail then.
assert df.A.attrs == ddf.A.attrs
def test_join_series():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=1)
expected_df = dd.from_pandas(df.join(df["x"], lsuffix="_"), npartitions=1)
actual_df = ddf.join(ddf["x"], lsuffix="_")
assert_eq(actual_df, expected_df)
def test_dask_layers():
df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6, 7, 8]})
ddf = dd.from_pandas(df, npartitions=2)
assert ddf.dask.layers.keys() == {ddf._name}
assert ddf.dask.dependencies == {ddf._name: set()}
assert ddf.__dask_layers__() == (ddf._name,)
dds = ddf["x"]
assert dds.dask.layers.keys() == {ddf._name, dds._name}
assert dds.dask.dependencies == {ddf._name: set(), dds._name: {ddf._name}}
assert dds.__dask_layers__() == (dds._name,)
ddi = dds.min()
assert ddi.key[1:] == (0,)
# Note that the `min` operation will use two layers
# now that ACA uses HLG
assert {ddf._name, dds._name, ddi.key[0]}.issubset(ddi.dask.layers.keys())
assert len(ddi.dask.layers) == 4
assert ddi.dask.dependencies[ddf._name] == set()
assert ddi.dask.dependencies[dds._name] == {ddf._name}
assert len(ddi.dask.dependencies) == 4
assert ddi.__dask_layers__() == (ddi.key[0],)
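# A hedged tally of the layer counts asserted above: one layer for the
# frame, one for the column selection, and two for the ACA-based ``min``
# (chunk plus aggregate), giving four layers in total.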
def test_repr_html_dataframe_highlevelgraph():
pytest.importorskip("jinja2")
x = timeseries().shuffle("id", shuffle="tasks").head(compute=False)
hg = x.dask
assert xml.etree.ElementTree.fromstring(hg._repr_html_()) is not None
for layer in hg.layers.values():
assert xml.etree.ElementTree.fromstring(layer._repr_html_()) is not None
@pytest.mark.skipif(
not dd._compat.PANDAS_GT_120, reason="Float64 was introduced in pandas>=1.2"
)
def test_assign_na_float_columns():
# See https://github.com/dask/dask/issues/7156
df_pandas = pd.DataFrame({"a": [1.1]}, dtype="Float64")
df = dd.from_pandas(df_pandas, npartitions=1)
df = df.assign(new_col=df["a"])
result = df.compute()
assert result["a"].dtype == "Float64"
assert result["new_col"].dtype == "Float64"
def test_dot():
s1 = pd.Series([1, 2, 3, 4])
s2 = pd.Series([4, 5, 6, 6])
df = pd.DataFrame({"one": s1, "two": s2})
dask_s1 = dd.from_pandas(s1, npartitions=1)
dask_df = dd.from_pandas(df, npartitions=1)
dask_s2 = dd.from_pandas(s2, npartitions=1)
assert_eq(s1.dot(s2), dask_s1.dot(dask_s2))
assert_eq(s1.dot(df), dask_s1.dot(dask_df))
# With partitions
partitioned_s1 = dd.from_pandas(s1, npartitions=2)
partitioned_df = dd.from_pandas(df, npartitions=2)
partitioned_s2 = dd.from_pandas(s2, npartitions=2)
assert_eq(s1.dot(s2), partitioned_s1.dot(partitioned_s2))
assert_eq(s1.dot(df), partitioned_s1.dot(partitioned_df))
# Test passing meta kwarg
res = dask_s1.dot(dask_df, meta=pd.Series([1], name="test_series")).compute()
assert res.name == "test_series"
# Test validation of second operand
with pytest.raises(TypeError):
dask_s1.dot(da.array([1, 2, 3, 4]))
def test_dot_nan():
# Test that nan inputs match pandas' behavior
s1 = pd.Series([1, 2, 3, 4])
dask_s1 = dd.from_pandas(s1, npartitions=1)
s2 = pd.Series([np.nan, np.nan, np.nan, np.nan])
dask_s2 = dd.from_pandas(s2, npartitions=1)
df = pd.DataFrame({"one": s1, "two": s2})
dask_df = dd.from_pandas(df, npartitions=1)
assert_eq(s1.dot(s2), dask_s1.dot(dask_s2))
assert_eq(s2.dot(df), dask_s2.dot(dask_df))
def test_use_of_weakref_proxy():
"""Testing wrapping frames in proxy wrappers"""
df = pd.DataFrame({"data": [1, 2, 3]})
df_pxy = weakref.proxy(df)
ser = pd.Series({"data": [1, 2, 3]})
ser_pxy = weakref.proxy(ser)
assert is_dataframe_like(df_pxy)
assert is_series_like(ser_pxy)
assert dask.dataframe.groupby._cov_chunk(df_pxy, "data")
assert isinstance(
dask.dataframe.groupby._groupby_apply_funcs(df_pxy, "data", funcs=[]),
pd.DataFrame,
)
# Test wrapping each Dask dataframe chunk in a proxy
kept = []
def f(x):
kept.append(x)  # keep `x` alive so the weakref proxy stays valid
return weakref.proxy(x)
d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
a = dd.from_pandas(d, npartitions=1)
a = a.map_partitions(f, meta=a._meta)
pxy = weakref.proxy(a)
res = pxy["b"].groupby(pxy["g"]).sum()
assert isinstance(res.compute(), pd.Series)
def test_is_monotonic_numeric():
s = pd.Series(range(20))
ds = dd.from_pandas(s, npartitions=5)
assert_eq(s.is_monotonic_increasing, ds.is_monotonic_increasing)
# `is_monotonic` was deprecated starting in `pandas=1.5.0`
with _check_warning(
PANDAS_GT_150, FutureWarning, message="is_monotonic is deprecated"
):
expected = s.is_monotonic
with _check_warning(
PANDAS_GT_150, FutureWarning, message="is_monotonic is deprecated"
):
result = ds.is_monotonic
assert_eq(expected, result)
s_2 = pd.Series(range(20, 0, -1))
ds_2 = dd.from_pandas(s_2, npartitions=5)
assert_eq(s_2.is_monotonic_decreasing, ds_2.is_monotonic_decreasing)
s_3 = pd.Series(list(range(0, 5)) + list(range(0, 20)))
ds_3 = dd.from_pandas(s_3, npartitions=5)
assert_eq(s_3.is_monotonic_increasing, ds_3.is_monotonic_increasing)
assert_eq(s_3.is_monotonic_decreasing, ds_3.is_monotonic_decreasing)
def test_is_monotonic_dt64():
s = pd.Series(pd.date_range("20130101", periods=10))
ds = dd.from_pandas(s, npartitions=5)
assert_eq(s.is_monotonic_increasing, ds.is_monotonic_increasing)
s_2 = pd.Series(list(reversed(s)))
ds_2 = dd.from_pandas(s_2, npartitions=5)
assert_eq(s_2.is_monotonic_decreasing, ds_2.is_monotonic_decreasing)
def test_index_is_monotonic_numeric():
s = pd.Series(1, index=range(20))
ds = dd.from_pandas(s, npartitions=5, sort=False)
assert_eq(s.index.is_monotonic_increasing, ds.index.is_monotonic_increasing)
# `is_monotonic` was deprecated starting in `pandas=1.5.0`
with _check_warning(
PANDAS_GT_150, FutureWarning, message="is_monotonic is deprecated"
):
expected = s.index.is_monotonic
with _check_warning(
PANDAS_GT_150, FutureWarning, message="is_monotonic is deprecated"
):
result = ds.index.is_monotonic
assert_eq(expected, result)
s_2 = pd.Series(1, index=range(20, 0, -1))
ds_2 = dd.from_pandas(s_2, npartitions=5, sort=False)
assert_eq(s_2.index.is_monotonic_decreasing, ds_2.index.is_monotonic_decreasing)
s_3 = pd.Series(1, index=list(range(0, 5)) + list(range(0, 20)))
ds_3 = dd.from_pandas(s_3, npartitions=5, sort=False)
assert_eq(s_3.index.is_monotonic_increasing, ds_3.index.is_monotonic_increasing)
assert_eq(s_3.index.is_monotonic_decreasing, ds_3.index.is_monotonic_decreasing)
def test_index_is_monotonic_dt64():
s = pd.Series(1, index=pd.date_range("20130101", periods=10))
ds = dd.from_pandas(s, npartitions=5, sort=False)
assert_eq(s.index.is_monotonic_increasing, ds.index.is_monotonic_increasing)
s_2 = pd.Series(1, index=list(reversed(s)))
ds_2 = dd.from_pandas(s_2, npartitions=5, sort=False)
assert_eq(s_2.index.is_monotonic_decreasing, ds_2.index.is_monotonic_decreasing)
def test_custom_map_reduce():
# Make sure custom map-reduce workflows can use
# the universal ACA code path with metadata
# that is not DataFrame-like.
# See: https://github.com/dask/dask/issues/8636
df = pd.DataFrame(columns=["a"], data=[[2], [4], [8]], index=[0, 1, 2])
ddf = dd.from_pandas(df, npartitions=2)
def map_fn(x):
return {"x": x, "y": x}
def reduce_fn(series):
merged = None
for mapped in series:
if merged is None:
merged = mapped.copy()
else:
merged["x"] += mapped["x"]
merged["y"] *= mapped["y"]
return merged
result = (
ddf["a"]
.map(map_fn, meta=("data", "object"))
.reduction(reduce_fn, aggregate=reduce_fn, meta=("data", "object"))
.compute()[0]
)
assert result == {"x": 14, "y": 64}
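# A hedged reading of the workflow above: ``Series.reduction`` applies
# the chunk function (here ``reduce_fn``) to each partition and then the
# aggregate function to the concatenated chunk results, so any
# associative combine works, even with metadata that is not frame-like,
# such as the ``("data", "object")`` tuple used here.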