import pytest pytest.importorskip("numpy") import numpy as np from numpy.testing import assert_array_almost_equal, assert_array_equal import dask.array as da from dask.array.overlap import ( boundaries, constant, ensure_minimum_chunksize, nearest, overlap, overlap_internal, periodic, reflect, trim_internal, ) from dask.array.utils import assert_eq, same_keys from ..lib.stride_tricks import sliding_window_view def test_overlap_internal(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) g = overlap_internal(d, {0: 2, 1: 1}) result = g.compute(scheduler="sync") assert g.chunks == ((6, 6), (5, 5)) expected = np.array( [ [0, 1, 2, 3, 4, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 11, 12, 13, 14, 15], [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], [48, 49, 50, 51, 52, 51, 52, 53, 54, 55], [56, 57, 58, 59, 60, 59, 60, 61, 62, 63], ] ) assert_eq(result, expected) assert same_keys(overlap_internal(d, {0: 2, 1: 1}), g) def test_overlap_internal_asymmetric(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) result = overlap_internal(d, {0: (2, 0), 1: (1, 0)}) assert result.chunks == ((4, 6), (4, 5)) expected = np.array( [ [0, 1, 2, 3, 3, 4, 5, 6, 7], [8, 9, 10, 11, 11, 12, 13, 14, 15], [16, 17, 18, 19, 19, 20, 21, 22, 23], [24, 25, 26, 27, 27, 28, 29, 30, 31], [16, 17, 18, 19, 19, 20, 21, 22, 23], [24, 25, 26, 27, 27, 28, 29, 30, 31], [32, 33, 34, 35, 35, 36, 37, 38, 39], [40, 41, 42, 43, 43, 44, 45, 46, 47], [48, 49, 50, 51, 51, 52, 53, 54, 55], [56, 57, 58, 59, 59, 60, 61, 62, 63], ] ) assert_eq(result, expected) assert same_keys(overlap_internal(d, {0: (2, 0), 1: (1, 0)}), result) def test_overlap_internal_asymmetric_small(): x = np.arange(32).reshape((2, 16)) d = da.from_array(x, chunks=(2, 4)) result = overlap_internal(d, {0: (0, 0), 1: (1, 1)}) assert result.chunks == ((2,), (5, 6, 6, 5)) expected = np.array( [ [0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 8, 7, 8, 9, 10, 11, 12, 11, 12, 13, 14, 15], [ 16, 17, 18, 19, 20, 19, 20, 21, 22, 23, 24, 23, 24, 25, 26, 27, 28, 27, 28, 29, 30, 31, ], ] ) assert_eq(result, expected) assert same_keys(overlap_internal(d, {0: (0, 0), 1: (1, 1)}), result) def test_trim_internal(): d = da.ones((40, 60), chunks=(10, 10)) e = trim_internal(d, axes={0: 1, 1: 2}) assert e.chunks == ((8, 8, 8, 8), (6, 6, 6, 6, 6, 6)) def test_periodic(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) e = periodic(d, axis=0, depth=2) assert e.shape[0] == d.shape[0] + 4 assert e.shape[1] == d.shape[1] assert_eq(e[1, :], d[-1, :]) assert_eq(e[0, :], d[-2, :]) def test_reflect(): x = np.arange(10) d = da.from_array(x, chunks=(5, 5)) e = reflect(d, axis=0, depth=2) expected = np.array([1, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 8]) assert_eq(e, expected) e = reflect(d, axis=0, depth=1) expected = np.array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]) assert_eq(e, expected) def test_nearest(): x = np.arange(10) d = da.from_array(x, chunks=(5, 5)) e = nearest(d, axis=0, depth=2) expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9]) assert_eq(e, expected) e = nearest(d, axis=0, depth=1) expected = np.array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]) assert_eq(e, expected) def test_constant(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) e = constant(d, axis=0, depth=2, value=10) assert e.shape[0] == d.shape[0] + 4 assert e.shape[1] == d.shape[1] assert_eq(e[1, :], np.ones(8, dtype=x.dtype) * 10) assert_eq(e[-1, :], np.ones(8, dtype=x.dtype) * 10) def test_boundaries(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) e = boundaries(d, {0: 2, 1: 1}, {0: 0, 1: "periodic"}) expected = np.array( [ [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [7, 0, 1, 2, 3, 4, 5, 6, 7, 0], [15, 8, 9, 10, 11, 12, 13, 14, 15, 8], [23, 16, 17, 18, 19, 20, 21, 22, 23, 16], [31, 24, 25, 26, 27, 28, 29, 30, 31, 24], [39, 32, 33, 34, 35, 36, 37, 38, 39, 32], [47, 40, 41, 42, 43, 44, 45, 46, 47, 40], [55, 48, 49, 50, 51, 52, 53, 54, 55, 48], [63, 56, 57, 58, 59, 60, 61, 62, 63, 56], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ] ) assert_eq(e, expected) def test_overlap(): x = np.arange(64).reshape((8, 8)) d = da.from_array(x, chunks=(4, 4)) g = overlap(d, depth={0: 2, 1: 1}, boundary={0: 100, 1: "reflect"}) assert g.chunks == ((8, 8), (6, 6)) expected = np.array( [ [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [0, 0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 7], [8, 8, 9, 10, 11, 12, 11, 12, 13, 14, 15, 15], [16, 16, 17, 18, 19, 20, 19, 20, 21, 22, 23, 23], [24, 24, 25, 26, 27, 28, 27, 28, 29, 30, 31, 31], [32, 32, 33, 34, 35, 36, 35, 36, 37, 38, 39, 39], [40, 40, 41, 42, 43, 44, 43, 44, 45, 46, 47, 47], [16, 16, 17, 18, 19, 20, 19, 20, 21, 22, 23, 23], [24, 24, 25, 26, 27, 28, 27, 28, 29, 30, 31, 31], [32, 32, 33, 34, 35, 36, 35, 36, 37, 38, 39, 39], [40, 40, 41, 42, 43, 44, 43, 44, 45, 46, 47, 47], [48, 48, 49, 50, 51, 52, 51, 52, 53, 54, 55, 55], [56, 56, 57, 58, 59, 60, 59, 60, 61, 62, 63, 63], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100], ] ) assert_eq(g, expected) assert same_keys(g, overlap(d, depth={0: 2, 1: 1}, boundary={0: 100, 1: "reflect"})) u_depth = np.uint16([2, 1]) u_depth = {k: v for k, v in enumerate(u_depth)} g = overlap(d, depth=u_depth, boundary={0: 100, 1: "reflect"}) assert g.chunks == ((8, 8), (6, 6)) assert_eq(g, expected) assert same_keys(g, overlap(d, depth={0: 2, 1: 1}, boundary={0: 100, 1: "reflect"})) g = overlap(d, depth={0: 2, 1: 1}, boundary={0: 100, 1: "none"}) expected = np.array( [ [100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [0, 1, 2, 3, 4, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 11, 12, 13, 14, 15], [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], [48, 49, 50, 51, 52, 51, 52, 53, 54, 55], [56, 57, 58, 59, 60, 59, 60, 61, 62, 63], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100], [100, 100, 100, 100, 100, 100, 100, 100, 100, 100], ] ) assert_eq(g, expected) assert g.chunks == ((8, 8), (5, 5)) u_depth = np.uint16([2, 1]) u_depth = {k: v for k, v in enumerate(u_depth)} g = overlap(d, depth=u_depth, boundary={0: 100, 1: "none"}) assert_eq(g, expected) assert g.chunks == ((8, 8), (5, 5)) def test_asymmetric_overlap_boundary_exception(): x = da.arange(10, chunks=5) with pytest.raises(NotImplementedError): x.map_overlap( lambda x: x + len(x), depth={0: (0, 2)}, boundary="reflect", dtype=x.dtype ) def test_map_overlap(): x = da.arange(10, chunks=5) y = x.map_overlap(lambda x: x + len(x), depth=2, dtype=x.dtype, boundary="reflect") assert_eq(y, np.arange(10) + 5 + 2 + 2) x = da.arange(10, chunks=5) y = x.map_overlap( lambda x: x + len(x), depth=np.int64(2), dtype=x.dtype, boundary="reflect" ) assert all([(type(s) is int) for s in y.shape]) assert_eq(y, np.arange(10) + 5 + 2 + 2) x = da.ones((10, 10), chunks=(3, 4)) z = x.map_overlap(lambda x: x, depth={1: 5}, boundary="reflect") assert z.chunks[0] == x.chunks[0] # don't rechunk the first dimension x = np.arange(16).reshape((4, 4)) d = da.from_array(x, chunks=(2, 2)) exp1 = d.map_overlap( lambda x: x + x.size, depth=1, dtype=d.dtype, boundary="reflect" ) exp2 = d.map_overlap( lambda x: x + x.size, depth={0: 1, 1: 1}, boundary={0: "reflect", 1: "none"}, dtype=d.dtype, ) exp3 = d.map_overlap( lambda x: x + x.size, depth={1: 1}, boundary={1: "reflect"}, dtype=d.dtype ) exp4 = d.map_overlap( lambda x: x + x.size, depth={1: (1, 0)}, boundary={0: "none", 1: "none"}, dtype=d.dtype, ) assert_eq(exp1, x + 16) assert_eq(exp2, x + 12) assert_eq(exp3, x + 8) assert_eq( exp4, np.block( [[x[0:2, 0:2] + 4, x[0:2, 2:4] + 6], [x[2:4, 0:2] + 4, x[2:4, 2:4] + 6]] ), ) def test_map_overlap_escapes_to_map_blocks_when_depth_is_zero(): x = da.arange(10, chunks=5) y = x.map_overlap(lambda x: x + 1, depth=0, boundary="none") assert len(y.dask) == 2 * x.numblocks[0] # depth=0 --> map_blocks assert_eq(y, np.arange(10) + 1) @pytest.mark.parametrize( "boundary", [None, "reflect", "periodic", "nearest", "none", 0] ) def test_map_overlap_no_depth(boundary): x = da.arange(10, chunks=5) if boundary is None: # Can be removed after boundary default argument is changed to "none" # See https://github.com/dask/dask/issues/8391 with pytest.raises(FutureWarning): y = x.map_overlap(lambda i: i, depth=0, boundary=boundary, dtype=x.dtype) assert_eq(y, x) else: y = x.map_overlap(lambda i: i, depth=0, boundary=boundary, dtype=x.dtype) assert_eq(y, x) def test_map_overlap_multiarray(): # Same ndim, same numblocks, same chunks x = da.arange(10, chunks=5) y = da.arange(10, chunks=5) z = da.map_overlap(lambda x, y: x + y, x, y, depth=1, boundary="none") assert_eq(z, 2 * np.arange(10)) # Same ndim, same numblocks, different chunks x = da.arange(10, chunks=(2, 3, 5)) y = da.arange(10, chunks=(5, 3, 2)) z = da.map_overlap(lambda x, y: x + y, x, y, depth=1, boundary="none") assert z.chunks == ((2, 3, 3, 2),) assert_eq(z, 2 * np.arange(10)) # Same ndim, different numblocks, different chunks x = da.arange(10, chunks=(10,)) y = da.arange(10, chunks=(4, 4, 2)) z = da.map_overlap(lambda x, y: x + y, x, y, depth=1, boundary="none") assert z.chunks == ((4, 4, 2),) assert_eq(z, 2 * np.arange(10)) # Different ndim, different numblocks, different chunks x = da.arange(10, chunks=(10,)) y = da.arange(10).reshape(1, 10).rechunk((1, (4, 4, 2))) z = da.map_overlap(lambda x, y: x + y, x, y, depth=1, boundary="none") assert z.chunks == ((1,), (4, 4, 2)) assert z.shape == (1, 10) assert_eq(z, 2 * np.arange(10)[np.newaxis]) # Note: checks on arange equality in all of the above help ensure that # trimming is applied appropriately to result chunks (i.e. results # are not somehow shifted) def test_map_overlap_multiarray_defaults(): # Check that by default, chunk alignment and arrays of varying dimensionality # are supported by with no effect on result shape # (i.e. defaults are pass-through to map_blocks) x = da.ones((10,), chunks=10) y = da.ones((1, 10), chunks=5) z = da.map_overlap(lambda x, y: x + y, x, y, boundary="none") # func should be called twice and get (5,) and (1, 5) arrays of ones each time assert_eq(z.shape, (1, 10)) assert_eq(z.sum(), 20.0) def test_map_overlap_multiarray_different_depths(): x = da.ones(5, dtype="int") y = da.ones(5, dtype="int") def run(depth): return da.map_overlap( lambda x, y: x.sum() + y.sum(), x, y, depth=depth, chunks=(0,), trim=False, boundary="reflect", ).compute() # Check that the number of elements added # to arrays in overlap works as expected # when depths differ for each array assert run([0, 0]) == 10 assert run([0, 1]) == 12 assert run([1, 1]) == 14 assert run([1, 2]) == 16 assert run([0, 5]) == 20 assert run([5, 5]) == 30 # Ensure that depth > chunk size results in error with pytest.raises(ValueError): run([0, 6]) def test_map_overlap_multiarray_uneven_numblocks_exception(): x = da.arange(10, chunks=(10,)) y = da.arange(10, chunks=(5, 5)) with pytest.raises(ValueError): # Fail with chunk alignment explicitly disabled da.map_overlap( lambda x, y: x + y, x, y, align_arrays=False, boundary="none" ).compute() def test_map_overlap_multiarray_block_broadcast(): def func(x, y): # Return result with expected padding z = x.size + y.size return np.ones((3, 3)) * z # Chunks in trailing dimension will be unified to two chunks of size 6 # and block broadcast will allow chunks from x to repeat x = da.ones((12,), chunks=12) # numblocks = (1,) -> (2, 2) after broadcast y = da.ones((16, 12), chunks=(8, 6)) # numblocks = (2, 2) z = da.map_overlap( func, x, y, chunks=(3, 3), depth=1, trim=True, boundary="reflect" ) assert_eq(z, z) assert z.shape == (2, 2) # func call will receive (8,) and (10, 8) arrays for each of 4 blocks assert_eq(z.sum(), 4.0 * (10 * 8 + 8)) def test_map_overlap_multiarray_variadic(): # Test overlapping row slices from 3D arrays xs = [ # Dim 0 will unify to chunks of size 4 for all: da.ones((12, 1, 1), chunks=((12,), 1, 1)), da.ones((12, 8, 1), chunks=((8, 4), 8, 1)), da.ones((12, 8, 4), chunks=((4, 8), 8, 4)), ] def func(*args): return np.array([sum(x.size for x in args)]) x = da.map_overlap( func, *xs, chunks=(1,), depth=1, trim=False, drop_axis=[1, 2], boundary="reflect", ) # Each func call should get 4 rows from each array padded by 1 in each dimension size_per_slice = sum(np.pad(x[:4], 1, mode="constant").size for x in xs) assert x.shape == (3,) assert all(x.compute() == size_per_slice) @pytest.mark.parametrize( "drop_axis", ( (0,), (1,), (2,), (0, 1), (1, 2), (2, 0), 1, (-3,), (-2,), (-1,), (-3, -2), (-2, -1), (-1, -3), -2, ), ) def test_map_overlap_trim_using_drop_axis_and_different_depths(drop_axis): x = da.random.standard_normal((5, 10, 8), chunks=(2, 5, 4)) def _mean(x): return x.mean(axis=drop_axis) expected = _mean(x) # unique boundary and depth value per axis boundary = (0, "reflect", "nearest") depth = (1, 3, 2) # to match expected result, dropped axes must have depth 0 _drop_axis = (drop_axis,) if np.isscalar(drop_axis) else drop_axis _drop_axis = [d % x.ndim for d in _drop_axis] depth = tuple(0 if i in _drop_axis else d for i, d in enumerate(depth)) y = da.map_overlap( _mean, x, depth=depth, boundary=boundary, drop_axis=drop_axis, dtype=float ).compute() assert_array_almost_equal(expected, y) def test_map_overlap_assumes_shape_matches_first_array_if_trim_is_false(): # https://github.com/dask/dask/issues/6681 x1 = da.ones((10,), chunks=(5, 5)) x2 = x1.rechunk(10) def oversum(x): return x[2:-2] z1 = da.map_overlap(oversum, x1, depth=2, trim=False, boundary="none") assert z1.shape == (10,) z2 = da.map_overlap(oversum, x2, depth=2, trim=False, boundary="none") assert z2.shape == (10,) def test_map_overlap_deprecated_signature(): def func(x): return np.array(x.sum()) x = da.ones(3) # Old positional signature: func, depth, boundary, trim with pytest.warns(FutureWarning): y = da.map_overlap(x, func, 0, "reflect", True) assert y.compute() == 3 assert y.shape == (3,) with pytest.warns(FutureWarning): y = da.map_overlap(x, func, 1, "reflect", True) assert y.compute() == 5 assert y.shape == (3,) with pytest.warns(FutureWarning): y = da.map_overlap(x, func, 1, "reflect", False) assert y.compute() == 5 assert y.shape == (3,) def test_nearest_overlap(): a = np.arange(144).reshape(12, 12).astype(float) darr = da.from_array(a, chunks=(6, 6)) garr = overlap(darr, depth={0: 5, 1: 5}, boundary={0: "nearest", 1: "nearest"}) tarr = trim_internal(garr, {0: 5, 1: 5}) assert_array_almost_equal(tarr, a) def test_0_depth(): expected = np.arange(100).reshape(10, 10) darr = da.from_array(expected, chunks=(5, 2)) depth = {0: 0, 1: 0} reflected = overlap(darr, depth=depth, boundary="reflect") nearest = overlap(darr, depth=depth, boundary="nearest") periodic = overlap(darr, depth=depth, boundary="periodic") constant = overlap(darr, depth=depth, boundary=42) result = trim_internal(reflected, depth) assert_array_equal(result, expected) result = trim_internal(nearest, depth) assert_array_equal(result, expected) result = trim_internal(periodic, depth) assert_array_equal(result, expected) result = trim_internal(constant, depth) assert_array_equal(result, expected) def test_some_0_depth(): expected = np.arange(100).reshape(10, 10) darr = da.from_array(expected, chunks=(5, 5)) depth = {0: 4, 1: 0} reflected = overlap(darr, depth=depth, boundary="reflect") nearest = overlap(darr, depth=depth, boundary="nearest") periodic = overlap(darr, depth=depth, boundary="periodic") constant = overlap(darr, depth=depth, boundary=42) result = trim_internal(reflected, depth) assert_array_equal(result, expected) result = trim_internal(nearest, depth) assert_array_equal(result, expected) result = trim_internal(periodic, depth) assert_array_equal(result, expected) result = trim_internal(constant, depth) assert_array_equal(result, expected) def test_one_chunk_along_axis(): a = np.arange(2 * 9).reshape(2, 9) darr = da.from_array(a, chunks=((2,), (2, 2, 2, 3))) g = overlap(darr, depth=0, boundary=0) assert a.shape == g.shape def test_constant_boundaries(): a = np.arange(1 * 9).reshape(1, 9) darr = da.from_array(a, chunks=((1,), (2, 2, 2, 3))) b = boundaries(darr, {0: 0, 1: 0}, {0: 0, 1: 0}) assert b.chunks == darr.chunks def test_depth_equals_boundary_length(): expected = np.arange(100).reshape(10, 10) darr = da.from_array(expected, chunks=(5, 5)) depth = {0: 5, 1: 5} reflected = overlap(darr, depth=depth, boundary="reflect") nearest = overlap(darr, depth=depth, boundary="nearest") periodic = overlap(darr, depth=depth, boundary="periodic") constant = overlap(darr, depth=depth, boundary=42) result = trim_internal(reflected, depth) assert_array_equal(result, expected) result = trim_internal(nearest, depth) assert_array_equal(result, expected) result = trim_internal(periodic, depth) assert_array_equal(result, expected) result = trim_internal(constant, depth) assert_array_equal(result, expected) def test_depth_greater_than_boundary_length(): expected = np.arange(100).reshape(10, 10) darr = da.from_array(expected, chunks=(5, 5)) depth = {0: 8, 1: 7} reflected = overlap(darr, depth=depth, boundary="reflect") nearest = overlap(darr, depth=depth, boundary="nearest") periodic = overlap(darr, depth=depth, boundary="periodic") constant = overlap(darr, depth=depth, boundary=42) result = trim_internal(reflected, depth) assert_array_equal(result, expected) result = trim_internal(nearest, depth) assert_array_equal(result, expected) result = trim_internal(periodic, depth) assert_array_equal(result, expected) result = trim_internal(constant, depth) assert_array_equal(result, expected) @pytest.mark.parametrize( "chunks", [ ((5, 5, 2), (5, 5, 2)), ((3, 3, 3, 3), (11, 1)), ], ) def test_depth_greater_than_smallest_chunk_combines_chunks(chunks): a = np.arange(144).reshape(12, 12) darr = da.from_array(a, chunks=chunks) depth = {0: 4, 1: 2} output = overlap(darr, depth=depth, boundary=1) assert all(c >= depth[0] * 2 for c in output.chunks[0]) assert all(c >= depth[1] * 2 for c in output.chunks[1]) def test_depth_greater_than_dim(): a = np.arange(144).reshape(12, 12) darr = da.from_array(a, chunks=(3, 5)) depth = {0: 13, 1: 4} with pytest.raises(ValueError, match="The overlapping depth"): overlap(darr, depth=depth, boundary=1) def test_none_boundaries(): x = da.from_array(np.arange(16).reshape(4, 4), chunks=(2, 2)) exp = boundaries(x, 2, {0: "none", 1: 33}) res = np.array( [ [33, 33, 0, 1, 2, 3, 33, 33], [33, 33, 4, 5, 6, 7, 33, 33], [33, 33, 8, 9, 10, 11, 33, 33], [33, 33, 12, 13, 14, 15, 33, 33], ] ) assert_eq(exp, res) def test_overlap_small(): x = da.ones((10, 10), chunks=(5, 5)) y = x.map_overlap(lambda x: x, depth=1, boundary="none") assert len(y.dask) < 200 y = x.map_overlap(lambda x: x, depth=1, boundary="none") assert len(y.dask) < 100 def test_no_shared_keys_with_different_depths(): da.random.seed(0) a = da.random.random((9, 9), chunks=(3, 3)) def check(x): assert x.shape == (3, 3) return x r = [ a.map_overlap( lambda a: a + 1, dtype=a.dtype, depth={j: int(i == j) for j in range(a.ndim)}, boundary="none", ).map_blocks(check, dtype=a.dtype) for i in range(a.ndim) ] assert set(r[0].dask) & set(r[1].dask) == set(a.dask) da.compute(*r, scheduler="single-threaded") def test_overlap_few_dimensions_small(): x = da.ones((20, 20), chunks=(10, 10)) a = x.map_overlap(lambda x: x, depth={0: 1}, boundary="none") assert_eq(x, a) assert any(isinstance(k[1], float) for k in a.dask) assert all(isinstance(k[2], int) for k in a.dask) b = x.map_overlap(lambda x: x, depth={1: 1}, boundary="none") assert_eq(x, b) assert all(isinstance(k[1], int) for k in b.dask) assert any(isinstance(k[2], float) for k in b.dask) c = x.map_overlap(lambda x: x, depth={0: 1, 1: 1}, boundary="none") assert_eq(x, c) assert any(isinstance(k[1], float) for k in c.dask) assert any(isinstance(k[2], float) for k in c.dask) def test_overlap_few_dimensions(): x = da.ones((100, 100), chunks=(10, 10)) a = x.map_overlap(lambda x: x, depth={0: 1}, boundary="none") b = x.map_overlap(lambda x: x, depth={1: 1}, boundary="none") c = x.map_overlap(lambda x: x, depth={0: 1, 1: 1}, boundary="none") assert len(a.dask) == len(b.dask) assert len(a.dask) < len(c.dask) assert len(c.dask) < 10 * len(a.dask) @pytest.mark.parametrize("boundary", ["reflect", "periodic", "nearest", "none"]) def test_trim_boundry(boundary): x = da.from_array(np.arange(24).reshape(4, 6), chunks=(2, 3)) x_overlaped = da.overlap.overlap(x, 2, boundary={0: "reflect", 1: boundary}) x_trimmed = da.overlap.trim_overlap( x_overlaped, 2, boundary={0: "reflect", 1: boundary} ) assert np.all(x == x_trimmed) x_overlaped = da.overlap.overlap(x, 2, boundary={1: boundary}) x_trimmed = da.overlap.trim_overlap(x_overlaped, 2, boundary={1: boundary}) assert np.all(x == x_trimmed) x_overlaped = da.overlap.overlap(x, 2, boundary=boundary) x_trimmed = da.overlap.trim_overlap(x_overlaped, 2, boundary=boundary) assert np.all(x == x_trimmed) def test_map_overlap_rechunks_array_if_needed(): # https://github.com/dask/dask/issues/6597 expected = np.arange(11) x = da.from_array(expected, chunks=5) y = x.map_overlap(lambda x: x, depth=2, boundary=0) assert all(c >= 2 for c in y.chunks[0]) assert_eq(y, expected) def test_map_overlap_rechunks_array_along_multiple_dims_if_needed(): # https://github.com/dask/dask/issues/6688 rand = da.random.random((860, 1024, 1024), chunks=(1, 1024, 1024)) filtered = rand.map_overlap( lambda arr: arr, depth=(2, 2, 2), boundary="reflect", ) assert all(all(c >= 2 for c in chunks) for chunks in filtered.chunks) @pytest.mark.parametrize( "chunks,expected", [ [(10,), (10,)], [(10, 10), (10, 10)], [ (10, 10, 1), (10, 11), ], [(20, 20, 20, 1), (20, 20, 11, 10)], [(20, 20, 10, 1), (20, 20, 11)], [(2, 20, 2, 20), (14, 10, 20)], [(1, 1, 1, 1, 7), (11,)], [(20, 20, 2, 20, 20, 2), (20, 12, 10, 20, 12, 10)], ], ) def test_ensure_minimum_chunksize(chunks, expected): actual = ensure_minimum_chunksize(10, chunks) assert actual == expected def test_ensure_minimum_chunksize_raises_error(): chunks = (5, 2, 1, 1) with pytest.raises(ValueError, match="overlapping depth 10 is larger than"): ensure_minimum_chunksize(10, chunks) @pytest.mark.parametrize( "shape, chunks, window_shape, axis", [ ((6, 7, 8), (6, (2, 2, 2, 1), 4), (3, 2), (1, 2)), # chunks vary along axis ((40, 30, 2), 5, (3,), (0,)), # window < chunk ((21,), 3, (7,), (0,)), # window > chunk ((9,), 3, 3, 0), # window == chunk, axis is integer ((9,), 3, 3, -1), # axis=-1 ((9,), 3, 3, None), # axis=None ((9, 8), 3, (2, 4), None), # axis=None ((9,), 3, (3, 3, 3), (0, 0, 0)), # axis is repeated ((9,), 3, (3, 3), (0, -1)), # axis is repeated, with -1 ((9,), 3, [3, 3], [0, -1]), # list instead of tuple ], ) def test_sliding_window_view(shape, chunks, window_shape, axis): from ..numpy_compat import sliding_window_view as np_sliding_window_view arr = da.from_array(np.arange(np.prod(shape)).reshape(shape), chunks=chunks) actual = sliding_window_view(arr, window_shape, axis) expected = np_sliding_window_view(arr.compute(), window_shape, axis) assert_eq(expected, actual) @pytest.mark.parametrize( "window_shape, axis", [ ((10,), 0), # window > axis shape ((2,), 3), # axis > ndim (-1, 0), # window shape is negative (2, (0, 1)), # len(window shape) < len(axis) (2, None), # len(window shape) < len(axis) (0, None), # window_shape = 0 ], ) def test_sliding_window_errors(window_shape, axis): arr = da.zeros((4, 3)) with pytest.raises(ValueError): sliding_window_view(arr, window_shape, axis)