import warnings import numpy as np import pandas as pd import pytest import dask.dataframe as dd from dask.base import compute_as_if_collection from dask.dataframe._compat import PANDAS_GT_140, tm from dask.dataframe.core import _Frame from dask.dataframe.methods import concat from dask.dataframe.multi import ( _maybe_align_partitions, align_partitions, concat_indexed_dataframes, hash_join, merge_indexed_dataframes, ) from dask.dataframe.utils import ( assert_divisions, assert_eq, clear_known_categories, has_known_categories, make_meta, ) from dask.utils_test import hlg_layer_topological def test_align_partitions(): A = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60] ) a = dd.repartition(A, [10, 40, 60]) B = pd.DataFrame({"x": [1, 2, 3, 4], "y": list("abda")}, index=[30, 70, 80, 100]) b = dd.repartition(B, [30, 80, 100]) s = dd.core.Scalar({("s", 0): 10}, "s", "i8") (aa, bb), divisions, L = align_partitions(a, b) def _check(a, b, aa, bb): assert isinstance(a, dd.DataFrame) assert isinstance(b, dd.DataFrame) assert isinstance(aa, dd.DataFrame) assert isinstance(bb, dd.DataFrame) assert_eq(a, aa) assert_eq(b, bb) assert divisions == (10, 30, 40, 60, 80, 100) assert isinstance(L, list) assert len(divisions) == 1 + len(L) _check(a, b, aa, bb) assert L == [ [(aa._name, 0), (bb._name, 0)], [(aa._name, 1), (bb._name, 1)], [(aa._name, 2), (bb._name, 2)], [(aa._name, 3), (bb._name, 3)], [(aa._name, 4), (bb._name, 4)], ] (aa, ss, bb), divisions, L = align_partitions(a, s, b) _check(a, b, aa, bb) assert L == [ [(aa._name, 0), None, (bb._name, 0)], [(aa._name, 1), None, (bb._name, 1)], [(aa._name, 2), None, (bb._name, 2)], [(aa._name, 3), None, (bb._name, 3)], [(aa._name, 4), None, (bb._name, 4)], ] assert_eq(ss, 10) ldf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}) rdf = pd.DataFrame({"c": [1, 2, 3, 4, 5, 6, 7], "d": [7, 6, 5, 4, 3, 2, 1]}) for lhs, rhs in [ (dd.from_pandas(ldf, 1), dd.from_pandas(rdf, 1)), (dd.from_pandas(ldf, 2), dd.from_pandas(rdf, 2)), (dd.from_pandas(ldf, 2), dd.from_pandas(rdf, 3)), (dd.from_pandas(ldf, 3), dd.from_pandas(rdf, 2)), ]: (lresult, rresult), div, parts = align_partitions(lhs, rhs) assert_eq(lresult, ldf) assert_eq(rresult, rdf) # different index ldf = pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) rdf = pd.DataFrame( {"c": [1, 2, 3, 4, 5, 6, 7], "d": [7, 6, 5, 4, 3, 2, 1]}, index=list("fghijkl") ) for lhs, rhs in [ (dd.from_pandas(ldf, 1), dd.from_pandas(rdf, 1)), (dd.from_pandas(ldf, 2), dd.from_pandas(rdf, 2)), (dd.from_pandas(ldf, 2), dd.from_pandas(rdf, 3)), (dd.from_pandas(ldf, 3), dd.from_pandas(rdf, 2)), ]: (lresult, rresult), div, parts = align_partitions(lhs, rhs) assert_eq(lresult, ldf) assert_eq(rresult, rdf) def test_align_partitions_unknown_divisions(): df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}) # One known, one unknown ddf = dd.from_pandas(df, npartitions=2) ddf2 = dd.from_pandas(df, npartitions=2, sort=False) assert not ddf2.known_divisions with pytest.raises(ValueError): align_partitions(ddf, ddf2) # Both unknown ddf = dd.from_pandas(df + 1, npartitions=2, sort=False) ddf2 = dd.from_pandas(df, npartitions=2, sort=False) assert not ddf.known_divisions assert not ddf2.known_divisions with pytest.raises(ValueError): align_partitions(ddf, ddf2) def test__maybe_align_partitions(): df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}) # Both known, same divisions ddf = dd.from_pandas(df + 1, npartitions=2) ddf2 = dd.from_pandas(df, npartitions=2) a, b = _maybe_align_partitions([ddf, ddf2]) assert a is ddf assert b is ddf2 # Both unknown, same divisions ddf = dd.from_pandas(df + 1, npartitions=2, sort=False) ddf2 = dd.from_pandas(df, npartitions=2, sort=False) assert not ddf.known_divisions assert not ddf2.known_divisions a, b = _maybe_align_partitions([ddf, ddf2]) assert a is ddf assert b is ddf2 # Both known, different divisions ddf = dd.from_pandas(df + 1, npartitions=2) ddf2 = dd.from_pandas(df, npartitions=3) a, b = _maybe_align_partitions([ddf, ddf2]) assert a.divisions == b.divisions # Both unknown, different divisions ddf = dd.from_pandas(df + 1, npartitions=2, sort=False) ddf2 = dd.from_pandas(df, npartitions=3, sort=False) assert not ddf.known_divisions assert not ddf2.known_divisions with pytest.raises(ValueError): _maybe_align_partitions([ddf, ddf2]) # One known, one unknown ddf = dd.from_pandas(df, npartitions=2) ddf2 = dd.from_pandas(df, npartitions=2, sort=False) assert not ddf2.known_divisions with pytest.raises(ValueError): _maybe_align_partitions([ddf, ddf2]) def test_merge_indexed_dataframe_to_indexed_dataframe(): A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6]}, index=[1, 2, 3, 4, 6, 7]) a = dd.repartition(A, [1, 4, 7]) B = pd.DataFrame({"y": list("abcdef")}, index=[1, 2, 4, 5, 6, 8]) b = dd.repartition(B, [1, 2, 5, 8]) c = merge_indexed_dataframes(a, b, how="left") assert c.divisions[0] == a.divisions[0] assert c.divisions[-1] == max(a.divisions + b.divisions) assert_eq(c, A.join(B)) c = merge_indexed_dataframes(a, b, how="right") assert c.divisions[0] == b.divisions[0] assert c.divisions[-1] == b.divisions[-1] assert_eq(c, A.join(B, how="right")) c = merge_indexed_dataframes(a, b, how="inner") assert c.divisions[0] == 1 assert c.divisions[-1] == max(a.divisions + b.divisions) assert_eq(c.compute(), A.join(B, how="inner")) c = merge_indexed_dataframes(a, b, how="outer") assert c.divisions[0] == 1 assert c.divisions[-1] == 8 assert_eq(c.compute(), A.join(B, how="outer")) assert sorted(merge_indexed_dataframes(a, b, how="inner").dask) == sorted( merge_indexed_dataframes(a, b, how="inner").dask ) assert sorted(merge_indexed_dataframes(a, b, how="inner").dask) != sorted( merge_indexed_dataframes(a, b, how="outer").dask ) def list_eq(aa, bb): if isinstance(aa, dd.DataFrame): a = aa.compute(scheduler="sync") else: a = aa if isinstance(bb, dd.DataFrame): b = bb.compute(scheduler="sync") else: b = bb tm.assert_index_equal(a.columns, b.columns) if isinstance(a, pd.DataFrame): av = a.sort_values(list(a.columns)).values bv = b.sort_values(list(b.columns)).values else: av = a.sort_values().values bv = b.sort_values().values dd._compat.assert_numpy_array_equal(av, bv) @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) def test_hash_join(how, shuffle_method): A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]}) a = dd.repartition(A, [0, 4, 5]) B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) b = dd.repartition(B, [0, 2, 5]) c = hash_join(a, "y", b, "y", how) result = c.compute() expected = pd.merge(A, B, how, "y") list_eq(result, expected) # Different columns and npartitions c = hash_join(a, "x", b, "z", "outer", npartitions=3, shuffle=shuffle_method) assert c.npartitions == 3 result = c.compute(scheduler="single-threaded") expected = pd.merge(A, B, "outer", None, "x", "z") list_eq(result, expected) assert ( hash_join(a, "y", b, "y", "inner", shuffle=shuffle_method)._name == hash_join(a, "y", b, "y", "inner", shuffle=shuffle_method)._name ) assert ( hash_join(a, "y", b, "y", "inner", shuffle=shuffle_method)._name != hash_join(a, "y", b, "y", "outer", shuffle=shuffle_method)._name ) def test_sequential_joins(): # Pandas version of multiple inner joins df1 = pd.DataFrame( {"key": list(range(6)), "A": ["A0", "A1", "A2", "A3", "A4", "A5"]} ) df2 = pd.DataFrame({"key": list(range(4)), "B": ["B0", "B1", "B2", "B3"]}) df3 = pd.DataFrame({"key": list(range(1, 5)), "C": ["C0", "C1", "C2", "C3"]}) join_pd = df1.join(df2, how="inner", lsuffix="_l", rsuffix="_r") multi_join_pd = join_pd.join(df3, how="inner", lsuffix="_l", rsuffix="_r") # Dask version of multiple inner joins ddf1 = dd.from_pandas(df1, npartitions=3) ddf2 = dd.from_pandas(df2, npartitions=2) ddf3 = dd.from_pandas(df3, npartitions=2) join_dd = ddf1.join(ddf2, how="inner", lsuffix="_l", rsuffix="_r") multi_join_dd = join_dd.join(ddf3, how="inner", lsuffix="_l", rsuffix="_r") assert_eq(multi_join_pd, multi_join_dd) def test_merge_asof_indexed(): A = pd.DataFrame( {"left_val": list("abcd" * 3)}, index=[1, 3, 7, 9, 10, 13, 14, 17, 20, 24, 25, 28], ) a = dd.from_pandas(A, npartitions=4) B = pd.DataFrame( {"right_val": list("xyz" * 4)}, index=[1, 2, 3, 6, 7, 10, 12, 14, 16, 19, 23, 26], ) b = dd.from_pandas(B, npartitions=3) C = pd.merge_asof(A, B, left_index=True, right_index=True) c = dd.merge_asof(a, b, left_index=True, right_index=True) assert_eq(c, C) def test_merge_asof_on_basic(): A = pd.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]}) a = dd.from_pandas(A, npartitions=2) B = pd.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]}) b = dd.from_pandas(B, npartitions=2) C = pd.merge_asof(A, B, on="a") c = dd.merge_asof(a, b, on="a") assert_eq(c, C) @pytest.mark.parametrize("allow_exact_matches", [True, False]) @pytest.mark.parametrize("direction", ["backward", "forward", "nearest"]) def test_merge_asof_on(allow_exact_matches, direction): A = pd.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]}) a = dd.from_pandas(A, npartitions=2) B = pd.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]}) b = dd.from_pandas(B, npartitions=2) C = pd.merge_asof( A, B, on="a", allow_exact_matches=allow_exact_matches, direction=direction ) c = dd.merge_asof( a, b, on="a", allow_exact_matches=allow_exact_matches, direction=direction ) assert_eq(c, C) @pytest.mark.parametrize("allow_exact_matches", [True, False]) @pytest.mark.parametrize("direction", ["backward", "forward", "nearest"]) @pytest.mark.parametrize("unknown_divisions", [False, True]) def test_merge_asof_left_on_right_index( allow_exact_matches, direction, unknown_divisions ): A = pd.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]}, index=[10, 20, 30]) a = dd.from_pandas(A, npartitions=2) B = pd.DataFrame({"right_val": [2, 3, 6, 7]}, index=[2, 3, 6, 7]) b = dd.from_pandas(B, npartitions=2) if unknown_divisions: a.divisions = (None,) * len(a.divisions) C = pd.merge_asof( A, B, left_on="a", right_index=True, allow_exact_matches=allow_exact_matches, direction=direction, ) c = dd.merge_asof( a, b, left_on="a", right_index=True, allow_exact_matches=allow_exact_matches, direction=direction, ) assert_eq(c, C) for nparts in [1, 2, 3]: for a1, idx2 in ( ([5, 10, 15, 20], [1, 2, 3, 4]), ([1, 2, 3, 4], [5, 10, 15, 20]), ([5, 5, 10, 10, 15, 15], [4, 5, 6, 9, 10, 11, 14, 15, 16]), ([5, 10, 15], [4, 4, 5, 5, 6, 6, 9, 9, 10, 10, 11, 11]), ): A = pd.DataFrame({"a": a1}, index=[x * 10 for x in a1]) a = dd.from_pandas(A, npartitions=nparts) B = pd.DataFrame({"b": idx2}, index=idx2) b = dd.from_pandas(B, npartitions=nparts) if unknown_divisions: a.divisions = (None,) * len(a.divisions) C = pd.merge_asof( A, B, left_on="a", right_index=True, allow_exact_matches=allow_exact_matches, direction=direction, ) c = dd.merge_asof( a, b, left_on="a", right_index=True, allow_exact_matches=allow_exact_matches, direction=direction, ) assert_eq(c, C) def test_merge_asof_indexed_two_partitions(): A = pd.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10]) a = dd.from_pandas(A, npartitions=2) B = pd.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7]) b = dd.from_pandas(B, npartitions=2) C = pd.merge_asof(A, B, left_index=True, right_index=True) c = dd.merge_asof(a, b, left_index=True, right_index=True) assert_eq(c, C) def test_merge_asof_on_by(): times_A = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.030", "2016-05-25 13:30:00.041", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.049", "2016-05-25 13:30:00.072", "2016-05-25 13:30:00.075", ] ] tickers_A = ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"] bids_A = [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01] asks_A = [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03] times_B = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.038", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", ] ] tickers_B = ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"] prices_B = [51.95, 51.95, 720.77, 720.92, 98.00] quantities_B = [75, 155, 100, 100, 100] A = pd.DataFrame( {"time": times_A, "ticker": tickers_A, "bid": bids_A, "ask": asks_A}, columns=["time", "ticker", "bid", "ask"], ) a = dd.from_pandas(A, npartitions=4) B = pd.DataFrame( { "time": times_B, "ticker": tickers_B, "price": prices_B, "quantity": quantities_B, }, columns=["time", "ticker", "price", "quantity"], ) b = dd.from_pandas(B, npartitions=3) C = pd.merge_asof(B, A, on="time", by="ticker") c = dd.merge_asof(b, a, on="time", by="ticker") assert_eq(c, C, check_index=False) def test_merge_asof_on_by_tolerance(): times_A = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.030", "2016-05-25 13:30:00.041", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.049", "2016-05-25 13:30:00.072", "2016-05-25 13:30:00.075", ] ] tickers_A = ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"] bids_A = [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01] asks_A = [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03] times_B = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.038", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", ] ] tickers_B = ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"] prices_B = [51.95, 51.95, 720.77, 720.92, 98.00] quantities_B = [75, 155, 100, 100, 100] A = pd.DataFrame( {"time": times_A, "ticker": tickers_A, "bid": bids_A, "ask": asks_A}, columns=["time", "ticker", "bid", "ask"], ) a = dd.from_pandas(A, npartitions=4) B = pd.DataFrame( { "time": times_B, "ticker": tickers_B, "price": prices_B, "quantity": quantities_B, }, columns=["time", "ticker", "price", "quantity"], ) b = dd.from_pandas(B, npartitions=3) C = pd.merge_asof(B, A, on="time", by="ticker", tolerance=pd.Timedelta("2ms")) c = dd.merge_asof(b, a, on="time", by="ticker", tolerance=pd.Timedelta("2ms")) assert_eq(c, C, check_index=False) def test_merge_asof_on_by_tolerance_no_exact_matches(): times_A = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.030", "2016-05-25 13:30:00.041", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.049", "2016-05-25 13:30:00.072", "2016-05-25 13:30:00.075", ] ] tickers_A = ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"] bids_A = [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01] asks_A = [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03] times_B = [ pd.to_datetime(d) for d in [ "2016-05-25 13:30:00.023", "2016-05-25 13:30:00.038", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", "2016-05-25 13:30:00.048", ] ] tickers_B = ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"] prices_B = [51.95, 51.95, 720.77, 720.92, 98.00] quantities_B = [75, 155, 100, 100, 100] A = pd.DataFrame( {"time": times_A, "ticker": tickers_A, "bid": bids_A, "ask": asks_A}, columns=["time", "ticker", "bid", "ask"], ) a = dd.from_pandas(A, npartitions=4) B = pd.DataFrame( { "time": times_B, "ticker": tickers_B, "price": prices_B, "quantity": quantities_B, }, columns=["time", "ticker", "price", "quantity"], ) b = dd.from_pandas(B, npartitions=3) C = pd.merge_asof( B, A, on="time", by="ticker", tolerance=pd.Timedelta("10ms"), allow_exact_matches=False, ) c = dd.merge_asof( b, a, on="time", by="ticker", tolerance=pd.Timedelta("10ms"), allow_exact_matches=False, ) assert_eq(c, C, check_index=False) def test_merge_asof_unsorted_raises(): A = pd.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]}) a = dd.from_pandas(A, npartitions=2) B = pd.DataFrame({"a": [2, 1, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]}) b = dd.from_pandas(B, npartitions=2) result = dd.merge_asof(a, b, on="a") # raise at runtime with pytest.raises(ValueError, match="right keys"): result.compute() def test_merge_asof_with_empty(): good_df = pd.DataFrame({"index_col": list(range(10)), "good_val": list(range(10))}) good_dd = dd.from_pandas(good_df, npartitions=2) empty_df = ( good_df[good_df.index_col < 0].copy().rename(columns={"good_val": "empty_val"}) ) empty_dd = dd.from_pandas(empty_df, npartitions=2) # left good, right empty result_dd = dd.merge_asof( good_dd.set_index("index_col"), empty_dd.set_index("index_col"), left_index=True, right_index=True, ) result_df = pd.merge_asof( good_df.set_index("index_col"), empty_df.set_index("index_col"), left_index=True, right_index=True, ) assert_eq(result_dd, result_df, check_index=False) # left empty, right good result_dd = dd.merge_asof( empty_dd.set_index("index_col"), good_dd.set_index("index_col"), left_index=True, right_index=True, ) result_df = pd.merge_asof( empty_df.set_index("index_col"), good_df.set_index("index_col"), left_index=True, right_index=True, ) assert_eq(result_dd, result_df, check_index=False) # left/right both empty result_dd = dd.merge_asof( empty_dd.set_index("index_col"), empty_dd.set_index("index_col"), left_index=True, right_index=True, ) result_df = pd.merge_asof( empty_df.set_index("index_col"), empty_df.set_index("index_col"), left_index=True, right_index=True, ) assert_eq(result_dd, result_df, check_index=False) @pytest.mark.parametrize("join", ["inner", "outer"]) def test_indexed_concat(join): A = pd.DataFrame( {"x": [1, 2, 3, 4, 6, 7], "y": list("abcdef")}, index=[1, 2, 3, 4, 6, 7] ) a = dd.repartition(A, [1, 4, 7]) B = pd.DataFrame({"x": [10, 20, 40, 50, 60, 80]}, index=[1, 2, 4, 5, 6, 8]) b = dd.repartition(B, [1, 2, 5, 8]) expected = pd.concat([A, B], axis=0, join=join, sort=False) with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) result = concat_indexed_dataframes([a, b], join=join) assert_eq(result, expected) assert sorted(concat_indexed_dataframes([a, b], join=join).dask) == sorted( concat_indexed_dataframes([a, b], join=join).dask ) assert sorted(concat_indexed_dataframes([a, b], join="inner").dask) != sorted( concat_indexed_dataframes([a, b], join="outer").dask ) @pytest.mark.parametrize("join", ["inner", "outer"]) def test_concat(join): pdf1 = pd.DataFrame( {"x": [1, 2, 3, 4, 6, 7], "y": list("abcdef")}, index=[1, 2, 3, 4, 6, 7] ) ddf1 = dd.from_pandas(pdf1, 2) pdf2 = pd.DataFrame( {"x": [1, 2, 3, 4, 6, 7], "y": list("abcdef")}, index=[8, 9, 10, 11, 12, 13] ) ddf2 = dd.from_pandas(pdf2, 2) # different columns pdf3 = pd.DataFrame( {"x": [1, 2, 3, 4, 6, 7], "z": list("abcdef")}, index=[8, 9, 10, 11, 12, 13] ) ddf3 = dd.from_pandas(pdf3, 2) kwargs = {"sort": False} for (dd1, dd2, pd1, pd2) in [ (ddf1, ddf2, pdf1, pdf2), (ddf1, ddf3, pdf1, pdf3), ]: expected = pd.concat([pd1, pd2], join=join, **kwargs) result = dd.concat([dd1, dd2], join=join, **kwargs) assert_eq(result, expected) # test outer only, inner has a problem on pandas side for (dd1, dd2, pd1, pd2) in [ (ddf1, ddf2, pdf1, pdf2), (ddf1, ddf3, pdf1, pdf3), (ddf1.x, ddf2.x, pdf1.x, pdf2.x), (ddf1.x, ddf3.z, pdf1.x, pdf3.z), (ddf1.x, ddf2.x, pdf1.x, pdf2.x), (ddf1.x, ddf3.z, pdf1.x, pdf3.z), ]: expected = pd.concat([pd1, pd2], **kwargs) result = dd.concat([dd1, dd2], **kwargs) assert_eq(result, expected) @pytest.mark.parametrize( "value_1, value_2", [ (1.0, 1), (1.0, "one"), (1.0, pd.to_datetime("1970-01-01")), (1, "one"), (1, pd.to_datetime("1970-01-01")), ("one", pd.to_datetime("1970-01-01")), ], ) def test_concat_different_dtypes(value_1, value_2): # check that the resulting dataframe has coherent dtypes # refer to https://github.com/dask/dask/issues/4685 and # https://github.com/dask/dask/issues/5968 df_1 = pd.DataFrame({"x": [value_1]}) df_2 = pd.DataFrame({"x": [value_2]}) df = pd.concat([df_1, df_2], axis=0) pandas_dtype = df["x"].dtype ddf_1 = dd.from_pandas(df_1, npartitions=1) ddf_2 = dd.from_pandas(df_2, npartitions=1) ddf = dd.concat([ddf_1, ddf_2], axis=0) dask_dtypes = list(ddf.map_partitions(lambda x: x.dtypes).compute()) assert dask_dtypes == [pandas_dtype, pandas_dtype] @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"]) @pytest.mark.parametrize("on_index", [True, False]) def test_merge_columns_dtypes(how, on_index): # tests results of merges with merge columns having different dtypes; # asserts that either the merge was successful or the corresponding warning is raised # addresses issue #4574 df1 = pd.DataFrame( {"A": list(np.arange(5).astype(float)) * 2, "B": list(np.arange(5)) * 2} ) df2 = pd.DataFrame({"A": np.arange(5), "B": np.arange(5)}) a = dd.from_pandas(df1, 2) # merge column "A" is float b = dd.from_pandas(df2, 2) # merge column "A" is int on = ["A"] left_index = right_index = on_index if on_index: a = a.set_index("A") b = b.set_index("A") on = None with pytest.warns(None) as record: result = dd.merge( a, b, on=on, how=how, left_index=left_index, right_index=right_index ) warned = any("merge column data type mismatches" in str(r) for r in record) # result type depends on merge operation -> convert to pandas result = result if isinstance(result, pd.DataFrame) else result.compute() has_nans = result.isna().values.any() assert (has_nans and warned) or not has_nans @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"]) def test_merge(how, shuffle_method): A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]}) a = dd.repartition(A, [0, 4, 5]) B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) b = dd.repartition(B, [0, 2, 5]) assert_eq( dd.merge( a, b, left_index=True, right_index=True, how=how, shuffle=shuffle_method ), pd.merge(A, B, left_index=True, right_index=True, how=how), ) result = dd.merge(a, b, on="y", how=how) list_eq(result, pd.merge(A, B, on="y", how=how)) assert all(d is None for d in result.divisions) list_eq( dd.merge(a, b, left_on="x", right_on="z", how=how, shuffle=shuffle_method), pd.merge(A, B, left_on="x", right_on="z", how=how), ) list_eq( dd.merge( a, b, left_on="x", right_on="z", how=how, suffixes=("1", "2"), shuffle=shuffle_method, ), pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")), ) list_eq(dd.merge(a, b, how=how, shuffle=shuffle_method), pd.merge(A, B, how=how)) list_eq(dd.merge(a, B, how=how, shuffle=shuffle_method), pd.merge(A, B, how=how)) list_eq(dd.merge(A, b, how=how, shuffle=shuffle_method), pd.merge(A, B, how=how)) list_eq(dd.merge(A, B, how=how, shuffle=shuffle_method), pd.merge(A, B, how=how)) list_eq( dd.merge( a, b, left_index=True, right_index=True, how=how, shuffle=shuffle_method ), pd.merge(A, B, left_index=True, right_index=True, how=how), ) list_eq( dd.merge( a, b, left_index=True, right_index=True, how=how, suffixes=("1", "2"), shuffle=shuffle_method, ), pd.merge(A, B, left_index=True, right_index=True, how=how, suffixes=("1", "2")), ) list_eq( dd.merge(a, b, left_on="x", right_index=True, how=how, shuffle=shuffle_method), pd.merge(A, B, left_on="x", right_index=True, how=how), ) list_eq( dd.merge( a, b, left_on="x", right_index=True, how=how, suffixes=("1", "2"), shuffle=shuffle_method, ), pd.merge(A, B, left_on="x", right_index=True, how=how, suffixes=("1", "2")), ) # pandas result looks buggy # list_eq(dd.merge(a, B, left_index=True, right_on='y'), # pd.merge(A, B, left_index=True, right_on='y')) @pytest.mark.parametrize("parts", [(3, 3), (3, 1), (1, 3)]) @pytest.mark.parametrize("how", ["leftsemi", "leftanti"]) @pytest.mark.parametrize( "engine", [ pytest.param( "pandas", marks=pytest.mark.xfail( reason="Pandas does not support leftsemi or leftanti" ), ), pytest.param("cudf", marks=pytest.mark.gpu), ], ) def test_merge_tasks_semi_anti_cudf(engine, how, parts): if engine == "cudf": # NOTE: engine == "cudf" requires cudf/dask_cudf, # will be skipped by non-GPU CI. cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") emp = pd.DataFrame( { "emp_id": np.arange(101, stop=106), "name": ["John", "Tom", "Harry", "Rahul", "Sakil"], "city": ["Cal", "Mum", "Del", "Ban", "Del"], "salary": [50000, 40000, 80000, 60000, 90000], } ) skills = pd.DataFrame( { "skill_id": [404, 405, 406, 407, 408], "emp_id": [103, 101, 105, 102, 101], "skill_name": ["Dask", "Spark", "C", "Python", "R"], } ) if engine == "cudf": emp = cudf.from_pandas(emp) skills = cudf.from_pandas(skills) dd_emp = dask_cudf.from_cudf(emp, npartitions=parts[0]) dd_skills = dask_cudf.from_cudf(skills, npartitions=parts[1]) else: dd_emp = dd.from_pandas(emp, npartitions=parts[0]) dd_skills = dd.from_pandas(skills, npartitions=parts[1]) expect = emp.merge(skills, on="emp_id", how=how).sort_values(["emp_id"]) result = dd_emp.merge(dd_skills, on="emp_id", how=how).sort_values(["emp_id"]) assert_eq(result, expect, check_index=False) def test_merge_tasks_passes_through(): a = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}) b = pd.DataFrame({"c": [1, 2, 3, 4, 5, 6, 7], "d": [7, 6, 5, 4, 3, 2, 1]}) aa = dd.from_pandas(a, npartitions=3) bb = dd.from_pandas(b, npartitions=2) cc = aa.merge(bb, left_on="a", right_on="d", shuffle="tasks") assert not any("partd" in k[0] for k in cc.dask) @pytest.mark.slow @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"]) def test_merge_by_index_patterns(how, shuffle_method): pdf1l = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}) pdf1r = pd.DataFrame({"c": [1, 2, 3, 4, 5, 6, 7], "d": [7, 6, 5, 4, 3, 2, 1]}) pdf2l = pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf2r = pd.DataFrame( {"c": [7, 6, 5, 4, 3, 2, 1], "d": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf3l = pdf2l pdf3r = pd.DataFrame({"c": [6, 7, 8, 9], "d": [5, 4, 3, 2]}, index=list("abdg")) pdf4l = pdf2l pdf4r = pd.DataFrame({"c": [9, 10, 11, 12], "d": [5, 4, 3, 2]}, index=list("abdg")) # completely different index pdf5l = pd.DataFrame( {"a": [1, 1, 2, 2, 3, 3, 4], "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("lmnopqr") ) pdf5r = pd.DataFrame({"c": [1, 1, 1, 1], "d": [5, 4, 3, 2]}, index=list("abcd")) pdf6l = pd.DataFrame( {"a": [1, 1, 2, 2, 3, 3, 4], "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("cdefghi") ) pdf6r = pd.DataFrame({"c": [1, 2, 1, 2], "d": [5, 4, 3, 2]}, index=list("abcd")) pdf7l = pd.DataFrame( {"a": [1, 1, 2, 2, 3, 3, 4], "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf7r = pd.DataFrame({"c": [5, 6, 7, 8], "d": [5, 4, 3, 2]}, index=list("fghi")) def pd_merge(left, right, **kwargs): # Workaround pandas bug where output dtype of empty index will be int64 # even if input was object. out = pd.merge(left, right, **kwargs) if len(out) == 0: return out.set_index(out.index.astype(left.index.dtype)) return out for pdl, pdr in [ (pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r), (pdf4l, pdf4r), (pdf5l, pdf5r), (pdf6l, pdf6r), (pdf7l, pdf7r), ]: for lpart, rpart in [ (2, 2), # same partition (3, 2), # left npartition > right npartition (2, 3), ]: # left npartition < right npartition ddl = dd.from_pandas(pdl, lpart) ddr = dd.from_pandas(pdr, rpart) assert_eq( dd.merge( ddl, ddr, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pd_merge(pdl, pdr, how=how, left_index=True, right_index=True), ) assert_eq( dd.merge( ddr, ddl, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pd_merge(pdr, pdl, how=how, left_index=True, right_index=True), ) assert_eq( dd.merge( ddl, ddr, how=how, left_index=True, right_index=True, shuffle=shuffle_method, indicator=True, ), pd_merge( pdl, pdr, how=how, left_index=True, right_index=True, indicator=True ), ) assert_eq( dd.merge( ddr, ddl, how=how, left_index=True, right_index=True, shuffle=shuffle_method, indicator=True, ), pd_merge( pdr, pdl, how=how, left_index=True, right_index=True, indicator=True ), ) assert_eq( ddr.merge( ddl, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pdr.merge(pdl, how=how, left_index=True, right_index=True), ) assert_eq( ddl.merge( ddr, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pdl.merge(pdr, how=how, left_index=True, right_index=True), ) # hash join list_eq( dd.merge( ddl, ddr, how=how, left_on="a", right_on="c", shuffle=shuffle_method ), pd.merge(pdl, pdr, how=how, left_on="a", right_on="c"), ) list_eq( dd.merge( ddl, ddr, how=how, left_on="b", right_on="d", shuffle=shuffle_method ), pd.merge(pdl, pdr, how=how, left_on="b", right_on="d"), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="c", right_on="a", shuffle=shuffle_method, indicator=True, ), pd.merge(pdr, pdl, how=how, left_on="c", right_on="a", indicator=True), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="d", right_on="b", shuffle=shuffle_method, indicator=True, ), pd.merge(pdr, pdl, how=how, left_on="d", right_on="b", indicator=True), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="c", right_on="a", shuffle=shuffle_method ), pd.merge(pdr, pdl, how=how, left_on="c", right_on="a"), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="d", right_on="b", shuffle=shuffle_method ), pd.merge(pdr, pdl, how=how, left_on="d", right_on="b"), ) list_eq( ddl.merge( ddr, how=how, left_on="a", right_on="c", shuffle=shuffle_method ), pdl.merge(pdr, how=how, left_on="a", right_on="c"), ) list_eq( ddl.merge( ddr, how=how, left_on="b", right_on="d", shuffle=shuffle_method ), pdl.merge(pdr, how=how, left_on="b", right_on="d"), ) list_eq( ddr.merge( ddl, how=how, left_on="c", right_on="a", shuffle=shuffle_method ), pdr.merge(pdl, how=how, left_on="c", right_on="a"), ) list_eq( ddr.merge( ddl, how=how, left_on="d", right_on="b", shuffle=shuffle_method ), pdr.merge(pdl, how=how, left_on="d", right_on="b"), ) @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"]) def test_join_by_index_patterns(how, shuffle_method): # Similar test cases as test_merge_by_index_patterns, # but columns / index for join have same dtype pdf1l = pd.DataFrame( {"a": list("abcdefg"), "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf1r = pd.DataFrame( {"c": list("abcdefg"), "d": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf2l = pdf1l pdf2r = pd.DataFrame( {"c": list("gfedcba"), "d": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf3l = pdf1l pdf3r = pd.DataFrame({"c": list("abdg"), "d": [5, 4, 3, 2]}, index=list("abdg")) pdf4l = pd.DataFrame( {"a": list("abcabce"), "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf4r = pd.DataFrame({"c": list("abda"), "d": [5, 4, 3, 2]}, index=list("abdg")) # completely different index pdf5l = pd.DataFrame( {"a": list("lmnopqr"), "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("lmnopqr") ) pdf5r = pd.DataFrame({"c": list("abcd"), "d": [5, 4, 3, 2]}, index=list("abcd")) pdf6l = pd.DataFrame( {"a": list("cdefghi"), "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("cdefghi") ) pdf6r = pd.DataFrame({"c": list("abab"), "d": [5, 4, 3, 2]}, index=list("abcd")) pdf7l = pd.DataFrame( {"a": list("aabbccd"), "b": [7, 6, 5, 4, 3, 2, 1]}, index=list("abcdefg") ) pdf7r = pd.DataFrame({"c": list("aabb"), "d": [5, 4, 3, 2]}, index=list("fghi")) for pdl, pdr in [ (pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r), (pdf4l, pdf4r), (pdf5l, pdf5r), (pdf6l, pdf6r), (pdf7l, pdf7r), ]: for lpart, rpart in [(2, 2), (3, 2), (2, 3)]: ddl = dd.from_pandas(pdl, lpart) ddr = dd.from_pandas(pdr, rpart) assert_eq( ddl.join(ddr, how=how, shuffle=shuffle_method), pdl.join(pdr, how=how) ) assert_eq( ddr.join(ddl, how=how, shuffle=shuffle_method), pdr.join(pdl, how=how) ) assert_eq( ddl.join( ddr, how=how, lsuffix="l", rsuffix="r", shuffle=shuffle_method ), pdl.join(pdr, how=how, lsuffix="l", rsuffix="r"), ) assert_eq( ddr.join( ddl, how=how, lsuffix="l", rsuffix="r", shuffle=shuffle_method ), pdr.join(pdl, how=how, lsuffix="l", rsuffix="r"), ) """ # temporary disabled bacause pandas may incorrectly raise # IndexError for empty DataFrame # https://github.com/pydata/pandas/pull/10826 list_assert_eq(ddl.join(ddr, how=how, on='a', lsuffix='l', rsuffix='r'), pdl.join(pdr, how=how, on='a', lsuffix='l', rsuffix='r')) list_eq(ddr.join(ddl, how=how, on='c', lsuffix='l', rsuffix='r'), pdr.join(pdl, how=how, on='c', lsuffix='l', rsuffix='r')) # merge with index and columns list_eq(ddl.merge(ddr, how=how, left_on='a', right_index=True), pdl.merge(pdr, how=how, left_on='a', right_index=True)) list_eq(ddr.merge(ddl, how=how, left_on='c', right_index=True), pdr.merge(pdl, how=how, left_on='c', right_index=True)) list_eq(ddl.merge(ddr, how=how, left_index=True, right_on='c'), pdl.merge(pdr, how=how, left_index=True, right_on='c')) list_eq(ddr.merge(ddl, how=how, left_index=True, right_on='a'), pdr.merge(pdl, how=how, left_index=True, right_on='a')) """ def test_join_gives_proper_divisions(): # https://github.com/dask/dask/issues/8113 df = pd.DataFrame({"a": ["a", "b", "c"]}, index=[0, 1, 2]) ddf = dd.from_pandas(df, npartitions=1) right_df = pd.DataFrame({"b": [1.0, 2.0, 3.0]}, index=["a", "b", "c"]) expected = df.join(right_df, how="inner", on="a") actual = ddf.join(right_df, how="inner", on="a") assert actual.divisions == ddf.divisions assert_eq(expected, actual) @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"]) def test_merge_by_multiple_columns(how, shuffle_method): # warnings here from pandas pdf1l = pd.DataFrame( { "a": list("abcdefghij"), "b": list("abcdefghij"), "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], }, index=list("abcdefghij"), ) pdf1r = pd.DataFrame( { "d": list("abcdefghij"), "e": list("abcdefghij"), "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1], }, index=list("abcdefghij"), ) pdf2l = pd.DataFrame( { "a": list("abcdeabcde"), "b": list("abcabcabca"), "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], }, index=list("abcdefghij"), ) pdf2r = pd.DataFrame( { "d": list("edcbaedcba"), "e": list("aaabbbcccd"), "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1], }, index=list("fghijklmno"), ) pdf3l = pd.DataFrame( { "a": list("aaaaaaaaaa"), "b": list("aaaaaaaaaa"), "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], }, index=list("abcdefghij"), ) pdf3r = pd.DataFrame( { "d": list("aaabbbccaa"), "e": list("abbbbbbbbb"), "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1], }, index=list("ABCDEFGHIJ"), ) for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]: for lpart, rpart in [(2, 2), (3, 2), (2, 3)]: ddl = dd.from_pandas(pdl, lpart) ddr = dd.from_pandas(pdr, rpart) assert_eq( ddl.join(ddr, how=how, shuffle=shuffle_method), pdl.join(pdr, how=how) ) assert_eq( ddr.join(ddl, how=how, shuffle=shuffle_method), pdr.join(pdl, how=how) ) assert_eq( dd.merge( ddl, ddr, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pd.merge(pdl, pdr, how=how, left_index=True, right_index=True), ) assert_eq( dd.merge( ddr, ddl, how=how, left_index=True, right_index=True, shuffle=shuffle_method, ), pd.merge(pdr, pdl, how=how, left_index=True, right_index=True), ) # hash join list_eq( dd.merge( ddl, ddr, how=how, left_on="a", right_on="d", shuffle=shuffle_method ), pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"), ) list_eq( dd.merge( ddl, ddr, how=how, left_on="b", right_on="e", shuffle=shuffle_method ), pd.merge(pdl, pdr, how=how, left_on="b", right_on="e"), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="d", right_on="a", shuffle=shuffle_method ), pd.merge(pdr, pdl, how=how, left_on="d", right_on="a"), ) list_eq( dd.merge( ddr, ddl, how=how, left_on="e", right_on="b", shuffle=shuffle_method ), pd.merge(pdr, pdl, how=how, left_on="e", right_on="b"), ) list_eq( dd.merge( ddl, ddr, how=how, left_on=["a", "b"], right_on=["d", "e"], shuffle=shuffle_method, ), pd.merge(pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]), ) def test_melt(): pdf = pd.DataFrame( {"A": list("abcd") * 5, "B": list("XY") * 10, "C": np.random.randn(20)} ) ddf = dd.from_pandas(pdf, 4) list_eq(dd.melt(ddf), pd.melt(pdf)) list_eq(dd.melt(ddf, id_vars="C"), pd.melt(pdf, id_vars="C")) list_eq(dd.melt(ddf, value_vars="C"), pd.melt(pdf, value_vars="C")) list_eq( dd.melt(ddf, value_vars=["A", "C"], var_name="myvar"), pd.melt(pdf, value_vars=["A", "C"], var_name="myvar"), ) list_eq( dd.melt(ddf, id_vars="B", value_vars=["A", "C"], value_name="myval"), pd.melt(pdf, id_vars="B", value_vars=["A", "C"], value_name="myval"), ) # test again as DataFrame method list_eq(ddf.melt(), pdf.melt()) list_eq(ddf.melt(id_vars="C"), pdf.melt(id_vars="C")) list_eq(ddf.melt(value_vars="C"), pdf.melt(value_vars="C")) list_eq( ddf.melt(value_vars=["A", "C"], var_name="myvar"), pdf.melt(value_vars=["A", "C"], var_name="myvar"), ) list_eq( ddf.melt(id_vars="B", value_vars=["A", "C"], value_name="myval"), pdf.melt(id_vars="B", value_vars=["A", "C"], value_name="myval"), ) def test_cheap_inner_merge_with_pandas_object(): a = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60] ) da = dd.from_pandas(a, npartitions=3) b = pd.DataFrame({"x": [1, 2, 3, 4], "z": list("abda")}) dc = da.merge(b, on="x", how="inner") assert not hlg_layer_topological(dc.dask, -1).is_materialized() assert all("shuffle" not in k[0] for k in dc.dask) list_eq(da.merge(b, on="x", how="inner"), a.merge(b, on="x", how="inner")) @pytest.mark.parametrize("flip", [False, True]) def test_cheap_single_partition_merge(flip): a = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60] ) aa = dd.from_pandas(a, npartitions=3) b = pd.DataFrame({"x": [1, 2, 3, 4], "z": list("abda")}) bb = dd.from_pandas(b, npartitions=1, sort=False) pd_inputs = (b, a) if flip else (a, b) inputs = (bb, aa) if flip else (aa, bb) cc = dd.merge(*inputs, on="x", how="inner") assert not hlg_layer_topological(cc.dask, -1).is_materialized() assert all("shuffle" not in k[0] for k in cc.dask) assert len(cc.dask) == len(aa.dask) * 2 + len(bb.dask) list_eq(cc, pd.merge(*pd_inputs, on="x", how="inner")) def test_cheap_single_partition_merge_divisions(): a = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60] ) aa = dd.from_pandas(a, npartitions=3) b = pd.DataFrame({"x": [1, 2, 3, 4], "z": list("abda")}) bb = dd.from_pandas(b, npartitions=1, sort=False) actual = aa.merge(bb, on="x", how="inner") assert not hlg_layer_topological(actual.dask, -1).is_materialized() assert not actual.known_divisions assert_divisions(actual) actual = bb.merge(aa, on="x", how="inner") assert not actual.known_divisions assert_divisions(actual) @pytest.mark.parametrize("how", ["left", "right"]) @pytest.mark.parametrize("flip", [False, True]) def test_cheap_single_parition_merge_left_right(how, flip): a = pd.DataFrame({"x": range(8), "z": list("ababbdda")}, index=range(8)) aa = dd.from_pandas(a, npartitions=1) b = pd.DataFrame({"x": [1, 2, 3, 4], "z": list("abda")}, index=range(4)) bb = dd.from_pandas(b, npartitions=1) pd_inputs = (b, a) if flip else (a, b) inputs = (bb, aa) if flip else (aa, bb) actual = dd.merge(*inputs, left_index=True, right_on="x", how=how) expected = pd.merge(*pd_inputs, left_index=True, right_on="x", how=how) assert not hlg_layer_topological(actual.dask, -1).is_materialized() assert_eq(actual, expected) actual = dd.merge(*inputs, left_on="x", right_index=True, how=how) expected = pd.merge(*pd_inputs, left_on="x", right_index=True, how=how) assert not hlg_layer_topological(actual.dask, -1).is_materialized() assert_eq(actual, expected) def test_cheap_single_partition_merge_on_index(): a = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")}, index=[10, 20, 30, 40, 50, 60] ) aa = dd.from_pandas(a, npartitions=3) b = pd.DataFrame({"x": [1, 2, 3, 4], "z": list("abda")}) bb = dd.from_pandas(b, npartitions=1, sort=False) actual = aa.merge(bb, left_index=True, right_on="x", how="inner") expected = a.merge(b, left_index=True, right_on="x", how="inner") # Workaround https://github.com/pandas-dev/pandas/issues/26925 # actual has the correct dtype for the index (Int64). Pandas as object-dtype # for empty joins. expected.index = expected.index.astype("int64") assert not hlg_layer_topological(actual.dask, -1).is_materialized() assert not actual.known_divisions assert_eq(actual, expected) actual = bb.merge(aa, right_index=True, left_on="x", how="inner") expected = b.merge(a, right_index=True, left_on="x", how="inner") expected.index = expected.index.astype("int64") assert not hlg_layer_topological(actual.dask, -1).is_materialized() assert not actual.known_divisions assert_eq(actual, expected) def test_merge_maintains_columns(): lhs = pd.DataFrame( {"A": [1, 2, 3], "B": list("abc"), "C": "foo", "D": 1.0}, columns=list("DCBA") ) rhs = pd.DataFrame( {"G": [4, 5], "H": 6.0, "I": "bar", "B": list("ab")}, columns=list("GHIB") ) ddf = dd.from_pandas(lhs, npartitions=1) merged = dd.merge(ddf, rhs, on="B").compute() assert tuple(merged.columns) == ("D", "C", "B", "A", "G", "H", "I") def test_merge_index_without_divisions(shuffle_method): a = pd.DataFrame({"x": [1, 2, 3, 4, 5]}, index=[1, 2, 3, 4, 5]) b = pd.DataFrame({"y": [1, 2, 3, 4, 5]}, index=[5, 4, 3, 2, 1]) aa = dd.from_pandas(a, npartitions=3, sort=False) bb = dd.from_pandas(b, npartitions=2) result = aa.join(bb, how="inner", shuffle=shuffle_method) expected = a.join(b, how="inner") assert_eq(result, expected) def test_half_indexed_dataframe_avoids_shuffle(): a = pd.DataFrame({"x": np.random.randint(100, size=1000)}) b = pd.DataFrame( {"y": np.random.randint(100, size=100)}, index=np.random.randint(100, size=100) ) aa = dd.from_pandas(a, npartitions=100) bb = dd.from_pandas(b, npartitions=2) c = pd.merge(a, b, left_index=True, right_on="y") cc = dd.merge(aa, bb, left_index=True, right_on="y", shuffle="tasks") list_eq(c, cc) assert len(cc.dask) < 500 def test_errors_for_merge_on_frame_columns(): a = pd.DataFrame({"x": [1, 2, 3, 4, 5]}, index=[1, 2, 3, 4, 5]) b = pd.DataFrame({"y": [1, 2, 3, 4, 5]}, index=[5, 4, 3, 2, 1]) aa = dd.from_pandas(a, npartitions=3, sort=False) bb = dd.from_pandas(b, npartitions=2) with pytest.raises(NotImplementedError): dd.merge(aa, bb, left_on="x", right_on=bb.y) with pytest.raises(NotImplementedError): dd.merge(aa, bb, left_on=aa.x, right_on=bb.y) def test_concat_one_series(): a = pd.Series([1, 2, 3, 4]) aa = dd.from_pandas(a, npartitions=2, sort=False) c = dd.concat([aa], axis=0) assert isinstance(c, dd.Series) c = dd.concat([aa], axis=1) assert isinstance(c, dd.DataFrame) def test_concat_unknown_divisions(): a = pd.Series([1, 2, 3, 4]) b = pd.Series([4, 3, 2, 1]) aa = dd.from_pandas(a, npartitions=2, sort=False) bb = dd.from_pandas(b, npartitions=2, sort=False) assert not aa.known_divisions with pytest.warns(UserWarning): assert_eq(pd.concat([a, b], axis=1), dd.concat([aa, bb], axis=1)) cc = dd.from_pandas(b, npartitions=1, sort=False) with pytest.raises(ValueError): dd.concat([aa, cc], axis=1) with pytest.warns(None) as record: dd.concat([aa, bb], axis=1, ignore_unknown_divisions=True) assert len(record) == 0 def test_concat_unknown_divisions_errors(): a = pd.Series([1, 2, 3, 4, 5, 6]) b = pd.Series([4, 3, 2, 1]) aa = dd.from_pandas(a, npartitions=2, sort=False) bb = dd.from_pandas(b, npartitions=2, sort=False) with pytest.raises(ValueError): with pytest.warns(UserWarning): # Concat with unknown divisions dd.concat([aa, bb], axis=1).compute() def test_concat2(): dsk = { ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}), ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}), ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}), } meta = make_meta({"a": "i8", "b": "i8"}, parent_meta=pd.DataFrame()) a = dd.DataFrame(dsk, "x", meta, [None, None]) dsk = { ("y", 0): pd.DataFrame({"a": [10, 20, 30], "b": [40, 50, 60]}), ("y", 1): pd.DataFrame({"a": [40, 50, 60], "b": [30, 20, 10]}), ("y", 2): pd.DataFrame({"a": [70, 80, 90], "b": [0, 0, 0]}), } b = dd.DataFrame(dsk, "y", meta, [None, None]) dsk = { ("y", 0): pd.DataFrame({"b": [10, 20, 30], "c": [40, 50, 60]}), ("y", 1): pd.DataFrame({"b": [40, 50, 60], "c": [30, 20, 10]}), } meta = make_meta({"b": "i8", "c": "i8"}, parent_meta=pd.DataFrame()) c = dd.DataFrame(dsk, "y", meta, [None, None]) dsk = { ("y", 0): pd.DataFrame( {"b": [10, 20, 30], "c": [40, 50, 60], "d": [70, 80, 90]} ), ("y", 1): pd.DataFrame( {"b": [40, 50, 60], "c": [30, 20, 10], "d": [90, 80, 70]}, index=[3, 4, 5] ), } meta = make_meta( {"b": "i8", "c": "i8", "d": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame(), ) d = dd.DataFrame(dsk, "y", meta, [0, 3, 5]) cases = [[a, b], [a, c], [a, d]] assert dd.concat([a]) is a for case in cases: pdcase = [_c.compute() for _c in case] with warnings.catch_warnings(record=True) as w: expected = pd.concat(pdcase, sort=False) ctx = FutureWarning if w else None with pytest.warns(ctx): result = dd.concat(case) assert result.npartitions == case[0].npartitions + case[1].npartitions assert result.divisions == (None,) * (result.npartitions + 1) assert_eq(expected, result) with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) assert set(result.dask) == set(dd.concat(case).dask) with warnings.catch_warnings(record=True) as w: expected = pd.concat(pdcase, join="inner", sort=False) ctx = FutureWarning if w else None with pytest.warns(ctx): result = dd.concat(case, join="inner") assert result.npartitions == case[0].npartitions + case[1].npartitions assert result.divisions == (None,) * (result.npartitions + 1) assert_eq(result, result) with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) assert set(result.dask) == set(dd.concat(case, join="inner").dask) def test_concat3(): pdf1 = pd.DataFrame( np.random.randn(6, 5), columns=list("ABCDE"), index=list("abcdef") ) pdf2 = pd.DataFrame( np.random.randn(6, 5), columns=list("ABCFG"), index=list("ghijkl") ) pdf3 = pd.DataFrame( np.random.randn(6, 5), columns=list("ABCHI"), index=list("mnopqr") ) ddf1 = dd.from_pandas(pdf1, 2) ddf2 = dd.from_pandas(pdf2, 3) ddf3 = dd.from_pandas(pdf3, 2) with warnings.catch_warnings(record=True) as w: expected = pd.concat([pdf1, pdf2], sort=False) ctx = FutureWarning if w else None with pytest.warns(ctx): result = dd.concat([ddf1, ddf2]) assert result.divisions == ddf1.divisions[:-1] + ddf2.divisions assert result.npartitions == ddf1.npartitions + ddf2.npartitions assert_eq(result, expected) with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) assert_eq( dd.concat([ddf1, ddf2], interleave_partitions=True), pd.concat([pdf1, pdf2]) ) with warnings.catch_warnings(record=True) as w: expected = pd.concat([pdf1, pdf2, pdf3], sort=False) ctx = FutureWarning if w else None with pytest.warns(ctx): result = dd.concat([ddf1, ddf2, ddf3]) assert result.divisions == ( ddf1.divisions[:-1] + ddf2.divisions[:-1] + ddf3.divisions ) assert result.npartitions == ( ddf1.npartitions + ddf2.npartitions + ddf3.npartitions ) assert_eq(result, expected) with warnings.catch_warnings(): warnings.simplefilter("ignore", FutureWarning) assert_eq( dd.concat([ddf1, ddf2, ddf3], interleave_partitions=True), pd.concat([pdf1, pdf2, pdf3]), ) def test_concat4_interleave_partitions(): pdf1 = pd.DataFrame( np.random.randn(10, 5), columns=list("ABCDE"), index=list("abcdefghij") ) pdf2 = pd.DataFrame( np.random.randn(13, 5), columns=list("ABCDE"), index=list("fghijklmnopqr") ) pdf3 = pd.DataFrame( np.random.randn(13, 6), columns=list("CDEXYZ"), index=list("fghijklmnopqr") ) ddf1 = dd.from_pandas(pdf1, 2) ddf2 = dd.from_pandas(pdf2, 3) ddf3 = dd.from_pandas(pdf3, 2) msg = ( "All inputs have known divisions which cannot be " "concatenated in order. Specify " "interleave_partitions=True to ignore order" ) cases = [ [ddf1, ddf1], [ddf1, ddf2], [ddf1, ddf3], [ddf2, ddf1], [ddf2, ddf3], [ddf3, ddf1], [ddf3, ddf2], ] for case in cases: pdcase = [c.compute() for c in case] assert_eq( dd.concat(case, interleave_partitions=True), pd.concat(pdcase, sort=False) ) assert_eq( dd.concat(case, join="inner", interleave_partitions=True), pd.concat(pdcase, join="inner", sort=False), ) msg = "'join' must be 'inner' or 'outer'" with pytest.raises(ValueError) as err: dd.concat([ddf1, ddf1], join="invalid", interleave_partitions=True) assert msg in str(err.value) def test_concat5(): pdf1 = pd.DataFrame( np.random.randn(7, 5), columns=list("ABCDE"), index=list("abcdefg") ) pdf2 = pd.DataFrame( np.random.randn(7, 6), columns=list("FGHIJK"), index=list("abcdefg") ) pdf3 = pd.DataFrame( np.random.randn(7, 6), columns=list("FGHIJK"), index=list("cdefghi") ) pdf4 = pd.DataFrame( np.random.randn(7, 5), columns=list("FGHAB"), index=list("cdefghi") ) pdf5 = pd.DataFrame( np.random.randn(7, 5), columns=list("FGHAB"), index=list("fklmnop") ) ddf1 = dd.from_pandas(pdf1, 2) ddf2 = dd.from_pandas(pdf2, 3) ddf3 = dd.from_pandas(pdf3, 2) ddf4 = dd.from_pandas(pdf4, 2) ddf5 = dd.from_pandas(pdf5, 3) cases = [ [ddf1, ddf2], [ddf1, ddf3], [ddf1, ddf4], [ddf1, ddf5], [ddf3, ddf4], [ddf3, ddf5], [ddf5, ddf1, ddf4], [ddf5, ddf3], [ddf1.A, ddf4.A], [ddf2.F, ddf3.F], [ddf4.A, ddf5.A], [ddf1.A, ddf4.F], [ddf2.F, ddf3.H], [ddf4.A, ddf5.B], [ddf1, ddf4.A], [ddf3.F, ddf2], [ddf5, ddf1.A, ddf2], ] for case in cases: pdcase = [c.compute() for c in case] with pytest.warns(None): # some cases will raise warning directly from pandas assert_eq( dd.concat(case, interleave_partitions=True), pd.concat(pdcase, sort=False), ) assert_eq( dd.concat(case, join="inner", interleave_partitions=True), pd.concat(pdcase, join="inner"), ) assert_eq(dd.concat(case, axis=1), pd.concat(pdcase, axis=1)) assert_eq( dd.concat(case, axis=1, join="inner"), pd.concat(pdcase, axis=1, join="inner"), ) # Dask + pandas cases = [ [ddf1, pdf2], [ddf1, pdf3], [pdf1, ddf4], [pdf1.A, ddf4.A], [ddf2.F, pdf3.F], [ddf1, pdf4.A], [ddf3.F, pdf2], [ddf2, pdf1, ddf3.F], ] for case in cases: pdcase = [c.compute() if isinstance(c, _Frame) else c for c in case] assert_eq(dd.concat(case, interleave_partitions=True), pd.concat(pdcase)) assert_eq( dd.concat(case, join="inner", interleave_partitions=True), pd.concat(pdcase, join="inner"), ) assert_eq(dd.concat(case, axis=1), pd.concat(pdcase, axis=1)) assert_eq( dd.concat(case, axis=1, join="inner"), pd.concat(pdcase, axis=1, join="inner"), ) @pytest.mark.parametrize( "known, cat_index, divisions", [ (True, True, False), (True, False, True), (True, False, False), (False, True, False), (False, False, True), (False, False, False), ], ) def test_concat_categorical(known, cat_index, divisions): frames = [ pd.DataFrame( { "w": list("xxxxx"), "x": np.arange(5), "y": list("abcbc"), "z": np.arange(5, dtype="f8"), } ), pd.DataFrame( { "w": list("yyyyy"), "x": np.arange(5, 10), "y": list("abbba"), "z": np.arange(5, 10, dtype="f8"), } ), pd.DataFrame( { "w": list("zzzzz"), "x": np.arange(10, 15), "y": list("bcbcc"), "z": np.arange(10, 15, dtype="f8"), } ), ] for df in frames: df.w = df.w.astype("category") df.y = df.y.astype("category") if cat_index: frames = [df.set_index(df.y) for df in frames] dframes = [dd.from_pandas(p, npartitions=2, sort=divisions) for p in frames] if not known: dframes[0]._meta = clear_known_categories(dframes[0]._meta, ["y"], index=True) def check_and_return(ddfs, dfs, join): sol = concat(dfs, join=join) res = dd.concat(ddfs, join=join, interleave_partitions=divisions) assert_eq(res, sol) if known: parts = compute_as_if_collection( dd.DataFrame, res.dask, res.__dask_keys__() ) for p in [i.iloc[:0] for i in parts]: res._meta == p # will error if schemas don't align assert not cat_index or has_known_categories(res.index) == known return res for join in ["inner", "outer"]: # Frame res = check_and_return(dframes, frames, join) assert has_known_categories(res.w) assert has_known_categories(res.y) == known # Series res = check_and_return([i.y for i in dframes], [i.y for i in frames], join) assert has_known_categories(res) == known # Non-cat series with cat index if cat_index: res = check_and_return([i.x for i in dframes], [i.x for i in frames], join) # Partition missing columns res = check_and_return( [dframes[0][["x", "y"]]] + dframes[1:], [frames[0][["x", "y"]]] + frames[1:], join, ) assert not hasattr(res, "w") or has_known_categories(res.w) assert has_known_categories(res.y) == known def test_concat_categorical_mixed_simple(): a = pd.Series(["a", "b", "c"], dtype="category") b = pd.Series(["a", "b"], dtype="category") da = dd.from_pandas(a, 2).cat.as_unknown().to_frame("A") db = dd.from_pandas(b, 2).to_frame("A") expected = concat([a.to_frame("A"), b.to_frame("A")]) result = dd.concat([da, db]) assert_eq(result, expected) def test_concat_datetimeindex(): # https://github.com/dask/dask/issues/2932 b2 = pd.DataFrame( {"x": ["a"]}, index=pd.DatetimeIndex(["2015-03-24 00:00:16"], dtype="datetime64[ns]"), ) b3 = pd.DataFrame( {"x": ["c"]}, index=pd.DatetimeIndex(["2015-03-29 00:00:44"], dtype="datetime64[ns]"), ) b2["x"] = b2.x.astype("category").cat.set_categories(["a", "c"]) b3["x"] = b3.x.astype("category").cat.set_categories(["a", "c"]) db2 = dd.from_pandas(b2, 1) db3 = dd.from_pandas(b3, 1) result = concat([b2.iloc[:0], b3.iloc[:0]]) assert result.index.dtype == "M8[ns]" result = dd.concat([db2, db3]) expected = pd.concat([b2, b3]) assert_eq(result, expected) def check_append_with_warning(dask_obj, dask_append, pandas_obj, pandas_append): if PANDAS_GT_140: with pytest.warns() as record: expected = pandas_obj.append(pandas_append) result = dask_obj.append(dask_append) assert_eq(result, expected) for w in record: assert w.category == FutureWarning assert "append method is deprecated" in str(w.message) else: with warnings.catch_warnings(): warnings.simplefilter("error") expected = pandas_obj.append(pandas_append) result = dask_obj.append(dask_append) assert_eq(result, expected) return result def test_append(): df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6], "b": [1, 2, 3, 4, 5, 6]}) df2 = pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6], "b": [1, 2, 3, 4, 5, 6]}, index=[6, 7, 8, 9, 10, 11] ) df3 = pd.DataFrame( {"b": [1, 2, 3, 4, 5, 6], "c": [1, 2, 3, 4, 5, 6]}, index=[6, 7, 8, 9, 10, 11] ) ddf = dd.from_pandas(df, 2) ddf2 = dd.from_pandas(df2, 2) ddf3 = dd.from_pandas(df3, 2) s = pd.Series([7, 8], name=6, index=["a", "b"]) check_append_with_warning(ddf, s, df, s) check_append_with_warning(ddf, ddf2, df, df2) check_append_with_warning(ddf.a, ddf2.a, df.a, df2.a) # different columns check_append_with_warning(ddf, ddf3, df, df3) check_append_with_warning(ddf.a, ddf3.b, df.a, df3.b) # dask + pandas check_append_with_warning(ddf, df2, df, df2) check_append_with_warning(ddf.a, df2.a, df.a, df2.a) check_append_with_warning(ddf, df3, df, df3) check_append_with_warning(ddf.a, df3.b, df.a, df3.b) def test_append2(): dsk = { ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}), ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}), ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}), } meta = make_meta({"a": "i8", "b": "i8"}, parent_meta=pd.DataFrame()) ddf1 = dd.DataFrame(dsk, "x", meta, [None, None]) df1 = ddf1.compute() dsk = { ("y", 0): pd.DataFrame({"a": [10, 20, 30], "b": [40, 50, 60]}), ("y", 1): pd.DataFrame({"a": [40, 50, 60], "b": [30, 20, 10]}), ("y", 2): pd.DataFrame({"a": [70, 80, 90], "b": [0, 0, 0]}), } ddf2 = dd.DataFrame(dsk, "y", meta, [None, None]) df2 = ddf2.compute() dsk = { ("y", 0): pd.DataFrame({"b": [10, 20, 30], "c": [40, 50, 60]}), ("y", 1): pd.DataFrame({"b": [40, 50, 60], "c": [30, 20, 10]}), } meta = make_meta({"b": "i8", "c": "i8"}, parent_meta=pd.DataFrame()) ddf3 = dd.DataFrame(dsk, "y", meta, [None, None]) df3 = ddf3.compute() check_append_with_warning(ddf1, ddf2, df1, df2) check_append_with_warning(ddf2, ddf1, df2, df1) # different columns check_append_with_warning(ddf1, ddf3, df1, df3) check_append_with_warning(ddf3, ddf1, df3, df1) # Dask + pandas check_append_with_warning(ddf1, df2, df1, df2) check_append_with_warning(ddf2, df1, df2, df1) # different columns check_append_with_warning(ddf1, df3, df1, df3) check_append_with_warning(ddf3, df1, df3, df1) def test_append_categorical(): frames = [ pd.DataFrame( { "x": np.arange(5, 10), "y": list("abbba"), "z": np.arange(5, 10, dtype="f8"), } ), pd.DataFrame( { "x": np.arange(10, 15), "y": list("bcbcc"), "z": np.arange(10, 15, dtype="f8"), } ), ] frames2 = [] for df in frames: df.y = df.y.astype("category") df2 = df.copy() df2.y = df2.y.cat.set_categories(list("abc")) df.index = df.y frames2.append(df2.set_index(df2.y)) df1, df2 = frames2 for known in [True, False]: dframes = [dd.from_pandas(p, npartitions=2, sort=False) for p in frames] if not known: dframes[0]._meta = clear_known_categories( dframes[0]._meta, ["y"], index=True ) ddf1, ddf2 = dframes res = check_append_with_warning(ddf1, ddf2, df1, df2) assert has_known_categories(res.index) == known assert has_known_categories(res.y) == known res = check_append_with_warning(ddf1.y, ddf2.y, df1.y, df2.y) assert has_known_categories(res.index) == known assert has_known_categories(res) == known res = check_append_with_warning(ddf1.index, ddf2.index, df1.index, df2.index) assert has_known_categories(res) == known def test_append_lose_divisions(): df = pd.DataFrame({"x": [1, 2, 3, 4]}, index=[1, 2, 3, 4]) ddf = dd.from_pandas(df, npartitions=2) res = check_append_with_warning(ddf, ddf, df, df) assert res.known_divisions is False def test_singleton_divisions(): df = pd.DataFrame({"x": [1, 1, 1]}, index=[1, 2, 3]) ddf = dd.from_pandas(df, npartitions=2) ddf2 = ddf.set_index("x") joined = ddf2.join(ddf2, rsuffix="r") assert joined.divisions == (1, 1) joined.compute() def test_repartition_repeated_divisions(): df = pd.DataFrame({"x": [0, 0, 0, 0]}) ddf = dd.from_pandas(df, npartitions=2).set_index("x") ddf2 = ddf.repartition(divisions=(0, 0), force=True) assert_eq(ddf2, df.set_index("x")) def test_multi_duplicate_divisions(): df1 = pd.DataFrame({"x": [0, 0, 0, 0]}) df2 = pd.DataFrame({"x": [0]}) ddf1 = dd.from_pandas(df1, npartitions=2).set_index("x") ddf2 = dd.from_pandas(df2, npartitions=1).set_index("x") assert ddf1.npartitions == 2 assert len(ddf1) == len(df1) r1 = ddf1.merge(ddf2, how="left", left_index=True, right_index=True) sf1 = df1.set_index("x") sf2 = df2.set_index("x") r2 = sf1.merge(sf2, how="left", left_index=True, right_index=True) assert_eq(r1, r2) def test_merge_outer_empty(): # Issue #5470 bug reproducer k_clusters = 3 df = pd.DataFrame( {"user": ["A", "B", "C", "D", "E", "F"], "cluster": [1, 1, 2, 2, 3, 3]} ) df = dd.from_pandas(df, npartitions=10) empty_df = dd.from_pandas(pd.DataFrame(), npartitions=10) for x in range(0, k_clusters + 1): assert_eq( dd.merge(empty_df, df[df.cluster == x], how="outer"), df[df.cluster == x], check_index=False, check_divisions=False, ) def test_dtype_equality_warning(): # https://github.com/dask/dask/issues/5437 df1 = pd.DataFrame({"a": np.array([1, 2], dtype=np.dtype(np.int64))}) df2 = pd.DataFrame({"a": np.array([1, 2], dtype=np.dtype(np.longlong))}) with pytest.warns(None) as r: dd.multi.warn_dtype_mismatch(df1, df2, "a", "a") assert len(r) == 0 @pytest.mark.parametrize( "engine", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)] ) def test_groupby_concat_cudf(engine): # NOTE: Issue #5643 Reproducer size = 6 npartitions = 3 d1 = pd.DataFrame( { "a": np.random.permutation(np.arange(size)), "b": np.random.randint(100, size=size), } ) d2 = pd.DataFrame( { "c": np.random.permutation(np.arange(size)), "d": np.random.randint(100, size=size), } ) if engine == "cudf": # NOTE: engine == "cudf" requires cudf/dask_cudf, # will be skipped by non-GPU CI. cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") d1 = cudf.from_pandas(d1) d2 = cudf.from_pandas(d2) dd1 = dask_cudf.from_cudf(d1, npartitions) dd2 = dask_cudf.from_cudf(d2, npartitions) else: dd1 = dd.from_pandas(d1, npartitions) dd2 = dd.from_pandas(d2, npartitions) grouped_d1 = d1.groupby(["a"]).sum() grouped_d2 = d2.groupby(["c"]).sum() res = concat([grouped_d1, grouped_d2], axis=1) grouped_dd1 = dd1.groupby(["a"]).sum() grouped_dd2 = dd2.groupby(["c"]).sum() with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) res_dd = dd.concat([grouped_dd1, grouped_dd2], axis=1) assert_eq(res_dd.compute().sort_index(), res.sort_index()) @pytest.mark.parametrize("ordered", [True, False]) def test_concat_ignore_order(ordered): pdf1 = pd.DataFrame( { "x": pd.Categorical( ["a", "b", "c", "a"], categories=["a", "b", "c"], ordered=ordered ) } ) ddf1 = dd.from_pandas(pdf1, 2) pdf2 = pd.DataFrame( { "x": pd.Categorical( ["c", "b", "a"], categories=["c", "b", "a"], ordered=ordered ) } ) ddf2 = dd.from_pandas(pdf2, 2) expected = pd.concat([pdf1, pdf2]) expected["x"] = expected["x"].astype("category") result = dd.concat([ddf1, ddf2], ignore_order=True) assert_eq(result, expected) def test_categorical_join(): # https://github.com/dask/dask/issues/6134 df = pd.DataFrame( { "join_col": ["a", "a", "b", "b"], "a": [0, 0, 10, 10], } ) df2 = pd.DataFrame({"b": [1, 2, 1, 2]}, index=["a", "a", "b", "b"]) ddf = dd.from_pandas(df, npartitions=2) ddf2 = dd.from_pandas(df2, npartitions=1) ddf["join_col"] = ddf["join_col"].astype("category") ddf2.index = ddf2.index.astype("category") expected = ddf.compute().join(ddf2.compute(), on="join_col", how="left") actual_dask = ddf.join(ddf2, on="join_col", how="left") assert actual_dask.join_col.dtype == "category" actual = actual_dask.compute() assert actual.join_col.dtype == "category" assert assert_eq(expected, actual) def test_categorical_merge_with_columns_missing_from_left(): df1 = pd.DataFrame({"A": [0, 1], "B": pd.Categorical(["a", "b"])}) df2 = pd.DataFrame({"C": pd.Categorical(["a", "b"])}) expected = pd.merge(df2, df1, left_index=True, right_on="A") ddf1 = dd.from_pandas(df1, npartitions=2) ddf2 = dd.from_pandas(df2, npartitions=2) actual = dd.merge(ddf2, ddf1, left_index=True, right_on="A").compute() assert actual.C.dtype == "category" assert actual.B.dtype == "category" assert actual.A.dtype == "int64" assert actual.index.dtype == "int64" assert assert_eq(expected, actual) def test_categorical_merge_with_merge_column_cat_in_one_and_not_other_upcasts(): df1 = pd.DataFrame({"A": pd.Categorical([0, 1]), "B": pd.Categorical(["a", "b"])}) df2 = pd.DataFrame({"C": pd.Categorical(["a", "b"])}) expected = pd.merge(df2, df1, left_index=True, right_on="A") ddf1 = dd.from_pandas(df1, npartitions=2) ddf2 = dd.from_pandas(df2, npartitions=2) actual = dd.merge(ddf2, ddf1, left_index=True, right_on="A").compute() assert actual.C.dtype == "category" assert actual.B.dtype == "category" assert actual.A.dtype == "int64" assert actual.index.dtype == "int64" assert assert_eq(expected, actual) def test_categorical_merge_retains_category_dtype(): # https://github.com/dask/dask/issues/6142 a = pd.DataFrame({"A": [0, 1, 2, 3], "B": [4, 5, 6, 7]}) b = pd.DataFrame({"A": [0, 1, 2, 4], "C": [4, 5, 7, 7]}) df1 = dd.from_pandas(a, 2) df1["A"] = df1.A.astype("category") df2 = dd.from_pandas(b, 2) df2["A"] = df2.A.astype("category") actual_dask = df1.merge(df2, on="A") assert actual_dask.A.dtype == "category" actual = actual_dask.compute() assert actual.A.dtype == "category" def test_categorical_merge_does_not_raise_setting_with_copy_warning(): # https://github.com/dask/dask/issues/7087 df1 = pd.DataFrame(data={"A": ["a", "b", "c"]}, index=["s", "v", "w"]) df2 = pd.DataFrame(data={"B": ["t", "d", "i"]}, index=["v", "w", "r"]) ddf1 = dd.from_pandas(df1, npartitions=1) df2 = df2.astype({"B": "category"}) assert_eq(df1.join(df2), ddf1.join(df2)) @pytest.mark.parametrize("how", ["inner", "left", "right"]) @pytest.mark.parametrize("npartitions", [28, 32]) @pytest.mark.parametrize("base", ["lg", "sm"]) def test_merge_tasks_large_to_small(how, npartitions, base): size_lg = 3000 size_sm = 300 npartitions_lg = 30 npartitions_sm = 3 broadcast_bias = 1.0 # Prioritize broadcast lg = pd.DataFrame( { "x": np.random.choice(np.arange(100), size_lg), "y": np.arange(size_lg), } ) ddf_lg = dd.from_pandas(lg, npartitions=npartitions_lg) sm = pd.DataFrame( { "x": np.random.choice(np.arange(100), size_sm), "y": np.arange(size_sm), } ) ddf_sm = dd.from_pandas(sm, npartitions=npartitions_sm) if base == "lg": left = lg ddf_left = ddf_lg right = sm ddf_right = ddf_sm else: left = sm ddf_left = ddf_sm right = lg ddf_right = ddf_lg dd_result = dd.merge( ddf_left, ddf_right, on="y", how=how, npartitions=npartitions, broadcast=broadcast_bias, shuffle="tasks", ) pd_result = pd.merge(left, right, on="y", how=how) # Make sure `on` dtypes match dd_result["y"] = dd_result["y"].astype(np.int32) pd_result["y"] = pd_result["y"].astype(np.int32) if npartitions: assert dd_result.npartitions == npartitions assert_eq( dd_result.compute().sort_values("y"), pd_result.sort_values("y"), check_index=False, ) @pytest.mark.parametrize("how", ["right", "inner"]) def test_pairwise_rejects_unsupported_join_types(how): base_df = dd.from_pandas( pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]), 3 ) dfs = [ dd.from_pandas( pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]), 3 ), dd.from_pandas( pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]), 3 ), ] with pytest.raises(ValueError) as e: base_df.join(dfs, how=how) e.match("merge_multi only supports left or outer joins") @pytest.mark.parametrize("how", ["left", "outer"]) @pytest.mark.parametrize("npartitions_base", [1, 2, 3]) @pytest.mark.parametrize("npartitions_other", [1, 2, 3]) def test_pairwise_merge_results_in_identical_output_df( how, npartitions_base, npartitions_other ): dfs_to_merge = [] for i in range(10): df = pd.DataFrame( { f"{i}A": [5, 6, 7, 8], f"{i}B": [4, 3, 2, 1], }, index=[0, 1, 2, 3], ) ddf = dd.from_pandas(df, npartitions_other) dfs_to_merge.append(ddf) ddf_loop = dd.from_pandas(pd.DataFrame(index=[0, 1, 3]), npartitions_base) for ddf in dfs_to_merge: ddf_loop = ddf_loop.join(ddf, how=how) ddf_pairwise = dd.from_pandas(pd.DataFrame(index=[0, 1, 3]), npartitions_base) ddf_pairwise = ddf_pairwise.join(dfs_to_merge, how=how) assert_eq(ddf_pairwise, ddf_loop)