import io import re import warnings from itertools import product import numpy as np import pytest from scipy import sparse from scipy.stats import kstest from sklearn import tree from sklearn.datasets import load_diabetes from sklearn.dummy import DummyRegressor from sklearn.exceptions import ConvergenceWarning # make IterativeImputer available from sklearn.experimental import enable_iterative_imputer # noqa from sklearn.impute import IterativeImputer, KNNImputer, MissingIndicator, SimpleImputer from sklearn.impute._base import _most_frequent from sklearn.linear_model import ARDRegression, BayesianRidge, RidgeCV from sklearn.model_selection import GridSearchCV from sklearn.pipeline import Pipeline, make_union from sklearn.random_projection import _sparse_random_matrix from sklearn.utils._testing import ( _convert_container, assert_allclose, assert_allclose_dense_sparse, assert_array_almost_equal, assert_array_equal, ) from sklearn.utils.fixes import ( BSR_CONTAINERS, COO_CONTAINERS, CSC_CONTAINERS, CSR_CONTAINERS, LIL_CONTAINERS, ) def _assert_array_equal_and_same_dtype(x, y): assert_array_equal(x, y) assert x.dtype == y.dtype def _assert_allclose_and_same_dtype(x, y): assert_allclose(x, y) assert x.dtype == y.dtype def _check_statistics( X, X_true, strategy, statistics, missing_values, sparse_container ): """Utility function for testing imputation for a given strategy. Test with dense and sparse arrays Check that: - the statistics (mean, median, mode) are correct - the missing values are imputed correctly""" err_msg = "Parameters: strategy = %s, missing_values = %s, sparse = {0}" % ( strategy, missing_values, ) assert_ae = assert_array_equal if X.dtype.kind == "f" or X_true.dtype.kind == "f": assert_ae = assert_array_almost_equal # Normal matrix imputer = SimpleImputer(missing_values=missing_values, strategy=strategy) X_trans = imputer.fit(X).transform(X.copy()) assert_ae(imputer.statistics_, statistics, err_msg=err_msg.format(False)) assert_ae(X_trans, X_true, err_msg=err_msg.format(False)) # Sparse matrix imputer = SimpleImputer(missing_values=missing_values, strategy=strategy) imputer.fit(sparse_container(X)) X_trans = imputer.transform(sparse_container(X.copy())) if sparse.issparse(X_trans): X_trans = X_trans.toarray() assert_ae(imputer.statistics_, statistics, err_msg=err_msg.format(True)) assert_ae(X_trans, X_true, err_msg=err_msg.format(True)) @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent", "constant"]) @pytest.mark.parametrize("csr_container", CSR_CONTAINERS) def test_imputation_shape(strategy, csr_container): # Verify the shapes of the imputed matrix for different strategies. X = np.random.randn(10, 2) X[::2] = np.nan imputer = SimpleImputer(strategy=strategy) X_imputed = imputer.fit_transform(csr_container(X)) assert X_imputed.shape == (10, 2) X_imputed = imputer.fit_transform(X) assert X_imputed.shape == (10, 2) iterative_imputer = IterativeImputer(initial_strategy=strategy) X_imputed = iterative_imputer.fit_transform(X) assert X_imputed.shape == (10, 2) @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent"]) def test_imputation_deletion_warning(strategy): X = np.ones((3, 5)) X[:, 0] = np.nan imputer = SimpleImputer(strategy=strategy).fit(X) with pytest.warns(UserWarning, match="Skipping"): imputer.transform(X) @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent"]) def test_imputation_deletion_warning_feature_names(strategy): pd = pytest.importorskip("pandas") missing_values = np.nan feature_names = np.array(["a", "b", "c", "d"], dtype=object) X = pd.DataFrame( [ [missing_values, missing_values, 1, missing_values], [4, missing_values, 2, 10], ], columns=feature_names, ) imputer = SimpleImputer(strategy=strategy).fit(X) # check SimpleImputer returning feature name attribute correctly assert_array_equal(imputer.feature_names_in_, feature_names) # ensure that skipped feature warning includes feature name with pytest.warns( UserWarning, match=r"Skipping features without any observed values: \['b'\]" ): imputer.transform(X) @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent", "constant"]) @pytest.mark.parametrize("csc_container", CSC_CONTAINERS) def test_imputation_error_sparse_0(strategy, csc_container): # check that error are raised when missing_values = 0 and input is sparse X = np.ones((3, 5)) X[0] = 0 X = csc_container(X) imputer = SimpleImputer(strategy=strategy, missing_values=0) with pytest.raises(ValueError, match="Provide a dense array"): imputer.fit(X) imputer.fit(X.toarray()) with pytest.raises(ValueError, match="Provide a dense array"): imputer.transform(X) def safe_median(arr, *args, **kwargs): # np.median([]) raises a TypeError for numpy >= 1.10.1 length = arr.size if hasattr(arr, "size") else len(arr) return np.nan if length == 0 else np.median(arr, *args, **kwargs) def safe_mean(arr, *args, **kwargs): # np.mean([]) raises a RuntimeWarning for numpy >= 1.10.1 length = arr.size if hasattr(arr, "size") else len(arr) return np.nan if length == 0 else np.mean(arr, *args, **kwargs) @pytest.mark.parametrize("csc_container", CSC_CONTAINERS) def test_imputation_mean_median(csc_container): # Test imputation using the mean and median strategies, when # missing_values != 0. rng = np.random.RandomState(0) dim = 10 dec = 10 shape = (dim * dim, dim + dec) zeros = np.zeros(shape[0]) values = np.arange(1, shape[0] + 1) values[4::2] = -values[4::2] tests = [ ("mean", np.nan, lambda z, v, p: safe_mean(np.hstack((z, v)))), ("median", np.nan, lambda z, v, p: safe_median(np.hstack((z, v)))), ] for strategy, test_missing_values, true_value_fun in tests: X = np.empty(shape) X_true = np.empty(shape) true_statistics = np.empty(shape[1]) # Create a matrix X with columns # - with only zeros, # - with only missing values # - with zeros, missing values and values # And a matrix X_true containing all true values for j in range(shape[1]): nb_zeros = (j - dec + 1 > 0) * (j - dec + 1) * (j - dec + 1) nb_missing_values = max(shape[0] + dec * dec - (j + dec) * (j + dec), 0) nb_values = shape[0] - nb_zeros - nb_missing_values z = zeros[:nb_zeros] p = np.repeat(test_missing_values, nb_missing_values) v = values[rng.permutation(len(values))[:nb_values]] true_statistics[j] = true_value_fun(z, v, p) # Create the columns X[:, j] = np.hstack((v, z, p)) if 0 == test_missing_values: # XXX unreached code as of v0.22 X_true[:, j] = np.hstack( (v, np.repeat(true_statistics[j], nb_missing_values + nb_zeros)) ) else: X_true[:, j] = np.hstack( (v, z, np.repeat(true_statistics[j], nb_missing_values)) ) # Shuffle them the same way np.random.RandomState(j).shuffle(X[:, j]) np.random.RandomState(j).shuffle(X_true[:, j]) # Mean doesn't support columns containing NaNs, median does if strategy == "median": cols_to_keep = ~np.isnan(X_true).any(axis=0) else: cols_to_keep = ~np.isnan(X_true).all(axis=0) X_true = X_true[:, cols_to_keep] _check_statistics( X, X_true, strategy, true_statistics, test_missing_values, csc_container ) @pytest.mark.parametrize("csc_container", CSC_CONTAINERS) def test_imputation_median_special_cases(csc_container): # Test median imputation with sparse boundary cases X = np.array( [ [0, np.nan, np.nan], # odd: implicit zero [5, np.nan, np.nan], # odd: explicit nonzero [0, 0, np.nan], # even: average two zeros [-5, 0, np.nan], # even: avg zero and neg [0, 5, np.nan], # even: avg zero and pos [4, 5, np.nan], # even: avg nonzeros [-4, -5, np.nan], # even: avg negatives [-1, 2, np.nan], # even: crossing neg and pos ] ).transpose() X_imputed_median = np.array( [ [0, 0, 0], [5, 5, 5], [0, 0, 0], [-5, 0, -2.5], [0, 5, 2.5], [4, 5, 4.5], [-4, -5, -4.5], [-1, 2, 0.5], ] ).transpose() statistics_median = [0, 5, 0, -2.5, 2.5, 4.5, -4.5, 0.5] _check_statistics( X, X_imputed_median, "median", statistics_median, np.nan, csc_container ) @pytest.mark.parametrize("strategy", ["mean", "median"]) @pytest.mark.parametrize("dtype", [None, object, str]) def test_imputation_mean_median_error_invalid_type(strategy, dtype): X = np.array([["a", "b", 3], [4, "e", 6], ["g", "h", 9]], dtype=dtype) msg = "non-numeric data:\ncould not convert string to float:" with pytest.raises(ValueError, match=msg): imputer = SimpleImputer(strategy=strategy) imputer.fit_transform(X) @pytest.mark.parametrize("strategy", ["mean", "median"]) @pytest.mark.parametrize("type", ["list", "dataframe"]) def test_imputation_mean_median_error_invalid_type_list_pandas(strategy, type): X = [["a", "b", 3], [4, "e", 6], ["g", "h", 9]] if type == "dataframe": pd = pytest.importorskip("pandas") X = pd.DataFrame(X) msg = "non-numeric data:\ncould not convert string to float:" with pytest.raises(ValueError, match=msg): imputer = SimpleImputer(strategy=strategy) imputer.fit_transform(X) @pytest.mark.parametrize("strategy", ["constant", "most_frequent"]) @pytest.mark.parametrize("dtype", [str, np.dtype("U"), np.dtype("S")]) def test_imputation_const_mostf_error_invalid_types(strategy, dtype): # Test imputation on non-numeric data using "most_frequent" and "constant" # strategy X = np.array( [ [np.nan, np.nan, "a", "f"], [np.nan, "c", np.nan, "d"], [np.nan, "b", "d", np.nan], [np.nan, "c", "d", "h"], ], dtype=dtype, ) err_msg = "SimpleImputer does not support data" with pytest.raises(ValueError, match=err_msg): imputer = SimpleImputer(strategy=strategy) imputer.fit(X).transform(X) @pytest.mark.parametrize("csc_container", CSC_CONTAINERS) def test_imputation_most_frequent(csc_container): # Test imputation using the most-frequent strategy. X = np.array( [ [-1, -1, 0, 5], [-1, 2, -1, 3], [-1, 1, 3, -1], [-1, 2, 3, 7], ] ) X_true = np.array( [ [2, 0, 5], [2, 3, 3], [1, 3, 3], [2, 3, 7], ] ) # scipy.stats.mode, used in SimpleImputer, doesn't return the first most # frequent as promised in the doc but the lowest most frequent. When this # test will fail after an update of scipy, SimpleImputer will need to be # updated to be consistent with the new (correct) behaviour _check_statistics(X, X_true, "most_frequent", [np.nan, 2, 3, 3], -1, csc_container) @pytest.mark.parametrize("marker", [None, np.nan, "NAN", "", 0]) def test_imputation_most_frequent_objects(marker): # Test imputation using the most-frequent strategy. X = np.array( [ [marker, marker, "a", "f"], [marker, "c", marker, "d"], [marker, "b", "d", marker], [marker, "c", "d", "h"], ], dtype=object, ) X_true = np.array( [ ["c", "a", "f"], ["c", "d", "d"], ["b", "d", "d"], ["c", "d", "h"], ], dtype=object, ) imputer = SimpleImputer(missing_values=marker, strategy="most_frequent") X_trans = imputer.fit(X).transform(X) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize("dtype", [object, "category"]) def test_imputation_most_frequent_pandas(dtype): # Test imputation using the most frequent strategy on pandas df pd = pytest.importorskip("pandas") f = io.StringIO("Cat1,Cat2,Cat3,Cat4\n,i,x,\na,,y,\na,j,,\nb,j,x,") df = pd.read_csv(f, dtype=dtype) X_true = np.array( [["a", "i", "x"], ["a", "j", "y"], ["a", "j", "x"], ["b", "j", "x"]], dtype=object, ) imputer = SimpleImputer(strategy="most_frequent") X_trans = imputer.fit_transform(df) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize("X_data, missing_value", [(1, 0), (1.0, np.nan)]) def test_imputation_constant_error_invalid_type(X_data, missing_value): # Verify that exceptions are raised on invalid fill_value type X = np.full((3, 5), X_data, dtype=float) X[0, 0] = missing_value fill_value = "x" err_msg = f"fill_value={fill_value!r} (of type {type(fill_value)!r}) cannot be cast" with pytest.raises(ValueError, match=re.escape(err_msg)): imputer = SimpleImputer( missing_values=missing_value, strategy="constant", fill_value=fill_value ) imputer.fit_transform(X) def test_imputation_constant_integer(): # Test imputation using the constant strategy on integers X = np.array([[-1, 2, 3, -1], [4, -1, 5, -1], [6, 7, -1, -1], [8, 9, 0, -1]]) X_true = np.array([[0, 2, 3, 0], [4, 0, 5, 0], [6, 7, 0, 0], [8, 9, 0, 0]]) imputer = SimpleImputer(missing_values=-1, strategy="constant", fill_value=0) X_trans = imputer.fit_transform(X) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize("array_constructor", CSR_CONTAINERS + [np.asarray]) def test_imputation_constant_float(array_constructor): # Test imputation using the constant strategy on floats X = np.array( [ [np.nan, 1.1, 0, np.nan], [1.2, np.nan, 1.3, np.nan], [0, 0, np.nan, np.nan], [1.4, 1.5, 0, np.nan], ] ) X_true = np.array( [[-1, 1.1, 0, -1], [1.2, -1, 1.3, -1], [0, 0, -1, -1], [1.4, 1.5, 0, -1]] ) X = array_constructor(X) X_true = array_constructor(X_true) imputer = SimpleImputer(strategy="constant", fill_value=-1) X_trans = imputer.fit_transform(X) assert_allclose_dense_sparse(X_trans, X_true) @pytest.mark.parametrize("marker", [None, np.nan, "NAN", "", 0]) def test_imputation_constant_object(marker): # Test imputation using the constant strategy on objects X = np.array( [ [marker, "a", "b", marker], ["c", marker, "d", marker], ["e", "f", marker, marker], ["g", "h", "i", marker], ], dtype=object, ) X_true = np.array( [ ["missing", "a", "b", "missing"], ["c", "missing", "d", "missing"], ["e", "f", "missing", "missing"], ["g", "h", "i", "missing"], ], dtype=object, ) imputer = SimpleImputer( missing_values=marker, strategy="constant", fill_value="missing" ) X_trans = imputer.fit_transform(X) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize("dtype", [object, "category"]) def test_imputation_constant_pandas(dtype): # Test imputation using the constant strategy on pandas df pd = pytest.importorskip("pandas") f = io.StringIO("Cat1,Cat2,Cat3,Cat4\n,i,x,\na,,y,\na,j,,\nb,j,x,") df = pd.read_csv(f, dtype=dtype) X_true = np.array( [ ["missing_value", "i", "x", "missing_value"], ["a", "missing_value", "y", "missing_value"], ["a", "j", "missing_value", "missing_value"], ["b", "j", "x", "missing_value"], ], dtype=object, ) imputer = SimpleImputer(strategy="constant") X_trans = imputer.fit_transform(df) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize("X", [[[1], [2]], [[1], [np.nan]]]) def test_iterative_imputer_one_feature(X): # check we exit early when there is a single feature imputer = IterativeImputer().fit(X) assert imputer.n_iter_ == 0 imputer = IterativeImputer() imputer.fit([[1], [2]]) assert imputer.n_iter_ == 0 imputer.fit([[1], [np.nan]]) assert imputer.n_iter_ == 0 def test_imputation_pipeline_grid_search(): # Test imputation within a pipeline + gridsearch. X = _sparse_random_matrix(100, 100, density=0.10) missing_values = X.data[0] pipeline = Pipeline( [ ("imputer", SimpleImputer(missing_values=missing_values)), ("tree", tree.DecisionTreeRegressor(random_state=0)), ] ) parameters = {"imputer__strategy": ["mean", "median", "most_frequent"]} Y = _sparse_random_matrix(100, 1, density=0.10).toarray() gs = GridSearchCV(pipeline, parameters) gs.fit(X, Y) def test_imputation_copy(): # Test imputation with copy X_orig = _sparse_random_matrix(5, 5, density=0.75, random_state=0) # copy=True, dense => copy X = X_orig.copy().toarray() imputer = SimpleImputer(missing_values=0, strategy="mean", copy=True) Xt = imputer.fit(X).transform(X) Xt[0, 0] = -1 assert not np.all(X == Xt) # copy=True, sparse csr => copy X = X_orig.copy() imputer = SimpleImputer(missing_values=X.data[0], strategy="mean", copy=True) Xt = imputer.fit(X).transform(X) Xt.data[0] = -1 assert not np.all(X.data == Xt.data) # copy=False, dense => no copy X = X_orig.copy().toarray() imputer = SimpleImputer(missing_values=0, strategy="mean", copy=False) Xt = imputer.fit(X).transform(X) Xt[0, 0] = -1 assert_array_almost_equal(X, Xt) # copy=False, sparse csc => no copy X = X_orig.copy().tocsc() imputer = SimpleImputer(missing_values=X.data[0], strategy="mean", copy=False) Xt = imputer.fit(X).transform(X) Xt.data[0] = -1 assert_array_almost_equal(X.data, Xt.data) # copy=False, sparse csr => copy X = X_orig.copy() imputer = SimpleImputer(missing_values=X.data[0], strategy="mean", copy=False) Xt = imputer.fit(X).transform(X) Xt.data[0] = -1 assert not np.all(X.data == Xt.data) # Note: If X is sparse and if missing_values=0, then a (dense) copy of X is # made, even if copy=False. def test_iterative_imputer_zero_iters(): rng = np.random.RandomState(0) n = 100 d = 10 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() missing_flag = X == 0 X[missing_flag] = np.nan imputer = IterativeImputer(max_iter=0) X_imputed = imputer.fit_transform(X) # with max_iter=0, only initial imputation is performed assert_allclose(X_imputed, imputer.initial_imputer_.transform(X)) # repeat but force n_iter_ to 0 imputer = IterativeImputer(max_iter=5).fit(X) # transformed should not be equal to initial imputation assert not np.all(imputer.transform(X) == imputer.initial_imputer_.transform(X)) imputer.n_iter_ = 0 # now they should be equal as only initial imputation is done assert_allclose(imputer.transform(X), imputer.initial_imputer_.transform(X)) def test_iterative_imputer_verbose(): rng = np.random.RandomState(0) n = 100 d = 3 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() imputer = IterativeImputer(missing_values=0, max_iter=1, verbose=1) imputer.fit(X) imputer.transform(X) imputer = IterativeImputer(missing_values=0, max_iter=1, verbose=2) imputer.fit(X) imputer.transform(X) def test_iterative_imputer_all_missing(): n = 100 d = 3 X = np.zeros((n, d)) imputer = IterativeImputer(missing_values=0, max_iter=1) X_imputed = imputer.fit_transform(X) assert_allclose(X_imputed, imputer.initial_imputer_.transform(X)) @pytest.mark.parametrize( "imputation_order", ["random", "roman", "ascending", "descending", "arabic"] ) def test_iterative_imputer_imputation_order(imputation_order): rng = np.random.RandomState(0) n = 100 d = 10 max_iter = 2 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() X[:, 0] = 1 # this column should not be discarded by IterativeImputer imputer = IterativeImputer( missing_values=0, max_iter=max_iter, n_nearest_features=5, sample_posterior=False, skip_complete=True, min_value=0, max_value=1, verbose=1, imputation_order=imputation_order, random_state=rng, ) imputer.fit_transform(X) ordered_idx = [i.feat_idx for i in imputer.imputation_sequence_] assert len(ordered_idx) // imputer.n_iter_ == imputer.n_features_with_missing_ if imputation_order == "roman": assert np.all(ordered_idx[: d - 1] == np.arange(1, d)) elif imputation_order == "arabic": assert np.all(ordered_idx[: d - 1] == np.arange(d - 1, 0, -1)) elif imputation_order == "random": ordered_idx_round_1 = ordered_idx[: d - 1] ordered_idx_round_2 = ordered_idx[d - 1 :] assert ordered_idx_round_1 != ordered_idx_round_2 elif "ending" in imputation_order: assert len(ordered_idx) == max_iter * (d - 1) @pytest.mark.parametrize( "estimator", [None, DummyRegressor(), BayesianRidge(), ARDRegression(), RidgeCV()] ) def test_iterative_imputer_estimators(estimator): rng = np.random.RandomState(0) n = 100 d = 10 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() imputer = IterativeImputer( missing_values=0, max_iter=1, estimator=estimator, random_state=rng ) imputer.fit_transform(X) # check that types are correct for estimators hashes = [] for triplet in imputer.imputation_sequence_: expected_type = ( type(estimator) if estimator is not None else type(BayesianRidge()) ) assert isinstance(triplet.estimator, expected_type) hashes.append(id(triplet.estimator)) # check that each estimator is unique assert len(set(hashes)) == len(hashes) def test_iterative_imputer_clip(): rng = np.random.RandomState(0) n = 100 d = 10 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() imputer = IterativeImputer( missing_values=0, max_iter=1, min_value=0.1, max_value=0.2, random_state=rng ) Xt = imputer.fit_transform(X) assert_allclose(np.min(Xt[X == 0]), 0.1) assert_allclose(np.max(Xt[X == 0]), 0.2) assert_allclose(Xt[X != 0], X[X != 0]) def test_iterative_imputer_clip_truncnorm(): rng = np.random.RandomState(0) n = 100 d = 10 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng).toarray() X[:, 0] = 1 imputer = IterativeImputer( missing_values=0, max_iter=2, n_nearest_features=5, sample_posterior=True, min_value=0.1, max_value=0.2, verbose=1, imputation_order="random", random_state=rng, ) Xt = imputer.fit_transform(X) assert_allclose(np.min(Xt[X == 0]), 0.1) assert_allclose(np.max(Xt[X == 0]), 0.2) assert_allclose(Xt[X != 0], X[X != 0]) def test_iterative_imputer_truncated_normal_posterior(): # test that the values that are imputed using `sample_posterior=True` # with boundaries (`min_value` and `max_value` are not None) are drawn # from a distribution that looks gaussian via the Kolmogorov Smirnov test. # note that starting from the wrong random seed will make this test fail # because random sampling doesn't occur at all when the imputation # is outside of the (min_value, max_value) range rng = np.random.RandomState(42) X = rng.normal(size=(5, 5)) X[0][0] = np.nan imputer = IterativeImputer( min_value=0, max_value=0.5, sample_posterior=True, random_state=rng ) imputer.fit_transform(X) # generate multiple imputations for the single missing value imputations = np.array([imputer.transform(X)[0][0] for _ in range(100)]) assert all(imputations >= 0) assert all(imputations <= 0.5) mu, sigma = imputations.mean(), imputations.std() ks_statistic, p_value = kstest((imputations - mu) / sigma, "norm") if sigma == 0: sigma += 1e-12 ks_statistic, p_value = kstest((imputations - mu) / sigma, "norm") # we want to fail to reject null hypothesis # null hypothesis: distributions are the same assert ks_statistic < 0.2 or p_value > 0.1, "The posterior does appear to be normal" @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent"]) def test_iterative_imputer_missing_at_transform(strategy): rng = np.random.RandomState(0) n = 100 d = 10 X_train = rng.randint(low=0, high=3, size=(n, d)) X_test = rng.randint(low=0, high=3, size=(n, d)) X_train[:, 0] = 1 # definitely no missing values in 0th column X_test[0, 0] = 0 # definitely missing value in 0th column imputer = IterativeImputer( missing_values=0, max_iter=1, initial_strategy=strategy, random_state=rng ).fit(X_train) initial_imputer = SimpleImputer(missing_values=0, strategy=strategy).fit(X_train) # if there were no missing values at time of fit, then imputer will # only use the initial imputer for that feature at transform assert_allclose( imputer.transform(X_test)[:, 0], initial_imputer.transform(X_test)[:, 0] ) def test_iterative_imputer_transform_stochasticity(): rng1 = np.random.RandomState(0) rng2 = np.random.RandomState(1) n = 100 d = 10 X = _sparse_random_matrix(n, d, density=0.10, random_state=rng1).toarray() # when sample_posterior=True, two transforms shouldn't be equal imputer = IterativeImputer( missing_values=0, max_iter=1, sample_posterior=True, random_state=rng1 ) imputer.fit(X) X_fitted_1 = imputer.transform(X) X_fitted_2 = imputer.transform(X) # sufficient to assert that the means are not the same assert np.mean(X_fitted_1) != pytest.approx(np.mean(X_fitted_2)) # when sample_posterior=False, and n_nearest_features=None # and imputation_order is not random # the two transforms should be identical even if rng are different imputer1 = IterativeImputer( missing_values=0, max_iter=1, sample_posterior=False, n_nearest_features=None, imputation_order="ascending", random_state=rng1, ) imputer2 = IterativeImputer( missing_values=0, max_iter=1, sample_posterior=False, n_nearest_features=None, imputation_order="ascending", random_state=rng2, ) imputer1.fit(X) imputer2.fit(X) X_fitted_1a = imputer1.transform(X) X_fitted_1b = imputer1.transform(X) X_fitted_2 = imputer2.transform(X) assert_allclose(X_fitted_1a, X_fitted_1b) assert_allclose(X_fitted_1a, X_fitted_2) def test_iterative_imputer_no_missing(): rng = np.random.RandomState(0) X = rng.rand(100, 100) X[:, 0] = np.nan m1 = IterativeImputer(max_iter=10, random_state=rng) m2 = IterativeImputer(max_iter=10, random_state=rng) pred1 = m1.fit(X).transform(X) pred2 = m2.fit_transform(X) # should exclude the first column entirely assert_allclose(X[:, 1:], pred1) # fit and fit_transform should both be identical assert_allclose(pred1, pred2) def test_iterative_imputer_rank_one(): rng = np.random.RandomState(0) d = 50 A = rng.rand(d, 1) B = rng.rand(1, d) X = np.dot(A, B) nan_mask = rng.rand(d, d) < 0.5 X_missing = X.copy() X_missing[nan_mask] = np.nan imputer = IterativeImputer(max_iter=5, verbose=1, random_state=rng) X_filled = imputer.fit_transform(X_missing) assert_allclose(X_filled, X, atol=0.02) @pytest.mark.parametrize("rank", [3, 5]) def test_iterative_imputer_transform_recovery(rank): rng = np.random.RandomState(0) n = 70 d = 70 A = rng.rand(n, rank) B = rng.rand(rank, d) X_filled = np.dot(A, B) nan_mask = rng.rand(n, d) < 0.5 X_missing = X_filled.copy() X_missing[nan_mask] = np.nan # split up data in half n = n // 2 X_train = X_missing[:n] X_test_filled = X_filled[n:] X_test = X_missing[n:] imputer = IterativeImputer( max_iter=5, imputation_order="descending", verbose=1, random_state=rng ).fit(X_train) X_test_est = imputer.transform(X_test) assert_allclose(X_test_filled, X_test_est, atol=0.1) def test_iterative_imputer_additive_matrix(): rng = np.random.RandomState(0) n = 100 d = 10 A = rng.randn(n, d) B = rng.randn(n, d) X_filled = np.zeros(A.shape) for i in range(d): for j in range(d): X_filled[:, (i + j) % d] += (A[:, i] + B[:, j]) / 2 # a quarter is randomly missing nan_mask = rng.rand(n, d) < 0.25 X_missing = X_filled.copy() X_missing[nan_mask] = np.nan # split up data n = n // 2 X_train = X_missing[:n] X_test_filled = X_filled[n:] X_test = X_missing[n:] imputer = IterativeImputer(max_iter=10, verbose=1, random_state=rng).fit(X_train) X_test_est = imputer.transform(X_test) assert_allclose(X_test_filled, X_test_est, rtol=1e-3, atol=0.01) def test_iterative_imputer_early_stopping(): rng = np.random.RandomState(0) n = 50 d = 5 A = rng.rand(n, 1) B = rng.rand(1, d) X = np.dot(A, B) nan_mask = rng.rand(n, d) < 0.5 X_missing = X.copy() X_missing[nan_mask] = np.nan imputer = IterativeImputer( max_iter=100, tol=1e-2, sample_posterior=False, verbose=1, random_state=rng ) X_filled_100 = imputer.fit_transform(X_missing) assert len(imputer.imputation_sequence_) == d * imputer.n_iter_ imputer = IterativeImputer( max_iter=imputer.n_iter_, sample_posterior=False, verbose=1, random_state=rng ) X_filled_early = imputer.fit_transform(X_missing) assert_allclose(X_filled_100, X_filled_early, atol=1e-7) imputer = IterativeImputer( max_iter=100, tol=0, sample_posterior=False, verbose=1, random_state=rng ) imputer.fit(X_missing) assert imputer.n_iter_ == imputer.max_iter def test_iterative_imputer_catch_warning(): # check that we catch a RuntimeWarning due to a division by zero when a # feature is constant in the dataset X, y = load_diabetes(return_X_y=True) n_samples, n_features = X.shape # simulate that a feature only contain one category during fit X[:, 3] = 1 # add some missing values rng = np.random.RandomState(0) missing_rate = 0.15 for feat in range(n_features): sample_idx = rng.choice( np.arange(n_samples), size=int(n_samples * missing_rate), replace=False ) X[sample_idx, feat] = np.nan imputer = IterativeImputer(n_nearest_features=5, sample_posterior=True) with warnings.catch_warnings(): warnings.simplefilter("error", RuntimeWarning) X_fill = imputer.fit_transform(X, y) assert not np.any(np.isnan(X_fill)) @pytest.mark.parametrize( "min_value, max_value, correct_output", [ (0, 100, np.array([[0] * 3, [100] * 3])), (None, None, np.array([[-np.inf] * 3, [np.inf] * 3])), (-np.inf, np.inf, np.array([[-np.inf] * 3, [np.inf] * 3])), ([-5, 5, 10], [100, 200, 300], np.array([[-5, 5, 10], [100, 200, 300]])), ( [-5, -np.inf, 10], [100, 200, np.inf], np.array([[-5, -np.inf, 10], [100, 200, np.inf]]), ), ], ids=["scalars", "None-default", "inf", "lists", "lists-with-inf"], ) def test_iterative_imputer_min_max_array_like(min_value, max_value, correct_output): # check that passing scalar or array-like # for min_value and max_value in IterativeImputer works X = np.random.RandomState(0).randn(10, 3) imputer = IterativeImputer(min_value=min_value, max_value=max_value) imputer.fit(X) assert isinstance(imputer._min_value, np.ndarray) and isinstance( imputer._max_value, np.ndarray ) assert (imputer._min_value.shape[0] == X.shape[1]) and ( imputer._max_value.shape[0] == X.shape[1] ) assert_allclose(correct_output[0, :], imputer._min_value) assert_allclose(correct_output[1, :], imputer._max_value) @pytest.mark.parametrize( "min_value, max_value, err_msg", [ (100, 0, "min_value >= max_value."), (np.inf, -np.inf, "min_value >= max_value."), ([-5, 5], [100, 200, 0], "_value' should be of shape"), ], ) def test_iterative_imputer_catch_min_max_error(min_value, max_value, err_msg): # check that passing scalar or array-like # for min_value and max_value in IterativeImputer works X = np.random.random((10, 3)) imputer = IterativeImputer(min_value=min_value, max_value=max_value) with pytest.raises(ValueError, match=err_msg): imputer.fit(X) @pytest.mark.parametrize( "min_max_1, min_max_2", [([None, None], [-np.inf, np.inf]), ([-10, 10], [[-10] * 4, [10] * 4])], ids=["None-vs-inf", "Scalar-vs-vector"], ) def test_iterative_imputer_min_max_array_like_imputation(min_max_1, min_max_2): # Test that None/inf and scalar/vector give the same imputation X_train = np.array( [ [np.nan, 2, 2, 1], [10, np.nan, np.nan, 7], [3, 1, np.nan, 1], [np.nan, 4, 2, np.nan], ] ) X_test = np.array( [[np.nan, 2, np.nan, 5], [2, 4, np.nan, np.nan], [np.nan, 1, 10, 1]] ) imputer1 = IterativeImputer( min_value=min_max_1[0], max_value=min_max_1[1], random_state=0 ) imputer2 = IterativeImputer( min_value=min_max_2[0], max_value=min_max_2[1], random_state=0 ) X_test_imputed1 = imputer1.fit(X_train).transform(X_test) X_test_imputed2 = imputer2.fit(X_train).transform(X_test) assert_allclose(X_test_imputed1[:, 0], X_test_imputed2[:, 0]) @pytest.mark.parametrize("skip_complete", [True, False]) def test_iterative_imputer_skip_non_missing(skip_complete): # check the imputing strategy when missing data are present in the # testing set only. # taken from: https://github.com/scikit-learn/scikit-learn/issues/14383 rng = np.random.RandomState(0) X_train = np.array([[5, 2, 2, 1], [10, 1, 2, 7], [3, 1, 1, 1], [8, 4, 2, 2]]) X_test = np.array([[np.nan, 2, 4, 5], [np.nan, 4, 1, 2], [np.nan, 1, 10, 1]]) imputer = IterativeImputer( initial_strategy="mean", skip_complete=skip_complete, random_state=rng ) X_test_est = imputer.fit(X_train).transform(X_test) if skip_complete: # impute with the initial strategy: 'mean' assert_allclose(X_test_est[:, 0], np.mean(X_train[:, 0])) else: assert_allclose(X_test_est[:, 0], [11, 7, 12], rtol=1e-4) @pytest.mark.parametrize("rs_imputer", [None, 1, np.random.RandomState(seed=1)]) @pytest.mark.parametrize("rs_estimator", [None, 1, np.random.RandomState(seed=1)]) def test_iterative_imputer_dont_set_random_state(rs_imputer, rs_estimator): class ZeroEstimator: def __init__(self, random_state): self.random_state = random_state def fit(self, *args, **kgards): return self def predict(self, X): return np.zeros(X.shape[0]) estimator = ZeroEstimator(random_state=rs_estimator) imputer = IterativeImputer(random_state=rs_imputer) X_train = np.zeros((10, 3)) imputer.fit(X_train) assert estimator.random_state == rs_estimator @pytest.mark.parametrize( "X_fit, X_trans, params, msg_err", [ ( np.array([[-1, 1], [1, 2]]), np.array([[-1, 1], [1, -1]]), {"features": "missing-only", "sparse": "auto"}, "have missing values in transform but have no missing values in fit", ), ( np.array([["a", "b"], ["c", "a"]], dtype=str), np.array([["a", "b"], ["c", "a"]], dtype=str), {}, "MissingIndicator does not support data with dtype", ), ], ) def test_missing_indicator_error(X_fit, X_trans, params, msg_err): indicator = MissingIndicator(missing_values=-1) indicator.set_params(**params) with pytest.raises(ValueError, match=msg_err): indicator.fit(X_fit).transform(X_trans) def _generate_missing_indicator_cases(): missing_values_dtypes = [(0, np.int32), (np.nan, np.float64), (-1, np.int32)] arr_types = ( [np.array] + CSC_CONTAINERS + CSR_CONTAINERS + COO_CONTAINERS + LIL_CONTAINERS + BSR_CONTAINERS ) return [ (arr_type, missing_values, dtype) for arr_type, (missing_values, dtype) in product( arr_types, missing_values_dtypes ) if not (missing_values == 0 and arr_type is not np.array) ] @pytest.mark.parametrize( "arr_type, missing_values, dtype", _generate_missing_indicator_cases() ) @pytest.mark.parametrize( "param_features, n_features, features_indices", [("missing-only", 3, np.array([0, 1, 2])), ("all", 3, np.array([0, 1, 2]))], ) def test_missing_indicator_new( missing_values, arr_type, dtype, param_features, n_features, features_indices ): X_fit = np.array([[missing_values, missing_values, 1], [4, 2, missing_values]]) X_trans = np.array([[missing_values, missing_values, 1], [4, 12, 10]]) X_fit_expected = np.array([[1, 1, 0], [0, 0, 1]]) X_trans_expected = np.array([[1, 1, 0], [0, 0, 0]]) # convert the input to the right array format and right dtype X_fit = arr_type(X_fit).astype(dtype) X_trans = arr_type(X_trans).astype(dtype) X_fit_expected = X_fit_expected.astype(dtype) X_trans_expected = X_trans_expected.astype(dtype) indicator = MissingIndicator( missing_values=missing_values, features=param_features, sparse=False ) X_fit_mask = indicator.fit_transform(X_fit) X_trans_mask = indicator.transform(X_trans) assert X_fit_mask.shape[1] == n_features assert X_trans_mask.shape[1] == n_features assert_array_equal(indicator.features_, features_indices) assert_allclose(X_fit_mask, X_fit_expected[:, features_indices]) assert_allclose(X_trans_mask, X_trans_expected[:, features_indices]) assert X_fit_mask.dtype == bool assert X_trans_mask.dtype == bool assert isinstance(X_fit_mask, np.ndarray) assert isinstance(X_trans_mask, np.ndarray) indicator.set_params(sparse=True) X_fit_mask_sparse = indicator.fit_transform(X_fit) X_trans_mask_sparse = indicator.transform(X_trans) assert X_fit_mask_sparse.dtype == bool assert X_trans_mask_sparse.dtype == bool assert X_fit_mask_sparse.format == "csc" assert X_trans_mask_sparse.format == "csc" assert_allclose(X_fit_mask_sparse.toarray(), X_fit_mask) assert_allclose(X_trans_mask_sparse.toarray(), X_trans_mask) @pytest.mark.parametrize( "arr_type", CSC_CONTAINERS + CSR_CONTAINERS + COO_CONTAINERS + LIL_CONTAINERS + BSR_CONTAINERS, ) def test_missing_indicator_raise_on_sparse_with_missing_0(arr_type): # test for sparse input and missing_value == 0 missing_values = 0 X_fit = np.array([[missing_values, missing_values, 1], [4, missing_values, 2]]) X_trans = np.array([[missing_values, missing_values, 1], [4, 12, 10]]) # convert the input to the right array format X_fit_sparse = arr_type(X_fit) X_trans_sparse = arr_type(X_trans) indicator = MissingIndicator(missing_values=missing_values) with pytest.raises(ValueError, match="Sparse input with missing_values=0"): indicator.fit_transform(X_fit_sparse) indicator.fit_transform(X_fit) with pytest.raises(ValueError, match="Sparse input with missing_values=0"): indicator.transform(X_trans_sparse) @pytest.mark.parametrize("param_sparse", [True, False, "auto"]) @pytest.mark.parametrize( "arr_type, missing_values", [(np.array, 0)] + list( product( CSC_CONTAINERS + CSR_CONTAINERS + COO_CONTAINERS + LIL_CONTAINERS + BSR_CONTAINERS, [np.nan], ) ), ) def test_missing_indicator_sparse_param(arr_type, missing_values, param_sparse): # check the format of the output with different sparse parameter X_fit = np.array([[missing_values, missing_values, 1], [4, missing_values, 2]]) X_trans = np.array([[missing_values, missing_values, 1], [4, 12, 10]]) X_fit = arr_type(X_fit).astype(np.float64) X_trans = arr_type(X_trans).astype(np.float64) indicator = MissingIndicator(missing_values=missing_values, sparse=param_sparse) X_fit_mask = indicator.fit_transform(X_fit) X_trans_mask = indicator.transform(X_trans) if param_sparse is True: assert X_fit_mask.format == "csc" assert X_trans_mask.format == "csc" elif param_sparse == "auto" and missing_values == 0: assert isinstance(X_fit_mask, np.ndarray) assert isinstance(X_trans_mask, np.ndarray) elif param_sparse is False: assert isinstance(X_fit_mask, np.ndarray) assert isinstance(X_trans_mask, np.ndarray) else: if sparse.issparse(X_fit): assert X_fit_mask.format == "csc" assert X_trans_mask.format == "csc" else: assert isinstance(X_fit_mask, np.ndarray) assert isinstance(X_trans_mask, np.ndarray) def test_missing_indicator_string(): X = np.array([["a", "b", "c"], ["b", "c", "a"]], dtype=object) indicator = MissingIndicator(missing_values="a", features="all") X_trans = indicator.fit_transform(X) assert_array_equal(X_trans, np.array([[True, False, False], [False, False, True]])) @pytest.mark.parametrize( "X, missing_values, X_trans_exp", [ ( np.array([["a", "b"], ["b", "a"]], dtype=object), "a", np.array([["b", "b", True, False], ["b", "b", False, True]], dtype=object), ), ( np.array([[np.nan, 1.0], [1.0, np.nan]]), np.nan, np.array([[1.0, 1.0, True, False], [1.0, 1.0, False, True]]), ), ( np.array([[np.nan, "b"], ["b", np.nan]], dtype=object), np.nan, np.array([["b", "b", True, False], ["b", "b", False, True]], dtype=object), ), ( np.array([[None, "b"], ["b", None]], dtype=object), None, np.array([["b", "b", True, False], ["b", "b", False, True]], dtype=object), ), ], ) def test_missing_indicator_with_imputer(X, missing_values, X_trans_exp): trans = make_union( SimpleImputer(missing_values=missing_values, strategy="most_frequent"), MissingIndicator(missing_values=missing_values), ) X_trans = trans.fit_transform(X) assert_array_equal(X_trans, X_trans_exp) @pytest.mark.parametrize("imputer_constructor", [SimpleImputer, IterativeImputer]) @pytest.mark.parametrize( "imputer_missing_values, missing_value, err_msg", [ ("NaN", np.nan, "Input X contains NaN"), ("-1", -1, "types are expected to be both numerical."), ], ) def test_inconsistent_dtype_X_missing_values( imputer_constructor, imputer_missing_values, missing_value, err_msg ): # regression test for issue #11390. Comparison between incoherent dtype # for X and missing_values was not raising a proper error. rng = np.random.RandomState(42) X = rng.randn(10, 10) X[0, 0] = missing_value imputer = imputer_constructor(missing_values=imputer_missing_values) with pytest.raises(ValueError, match=err_msg): imputer.fit_transform(X) def test_missing_indicator_no_missing(): # check that all features are dropped if there are no missing values when # features='missing-only' (#13491) X = np.array([[1, 1], [1, 1]]) mi = MissingIndicator(features="missing-only", missing_values=-1) Xt = mi.fit_transform(X) assert Xt.shape[1] == 0 @pytest.mark.parametrize("csr_container", CSR_CONTAINERS) def test_missing_indicator_sparse_no_explicit_zeros(csr_container): # Check that non missing values don't become explicit zeros in the mask # generated by missing indicator when X is sparse. (#13491) X = csr_container([[0, 1, 2], [1, 2, 0], [2, 0, 1]]) mi = MissingIndicator(features="all", missing_values=1) Xt = mi.fit_transform(X) assert Xt.getnnz() == Xt.sum() @pytest.mark.parametrize("imputer_constructor", [SimpleImputer, IterativeImputer]) def test_imputer_without_indicator(imputer_constructor): X = np.array([[1, 1], [1, 1]]) imputer = imputer_constructor() imputer.fit(X) assert imputer.indicator_ is None @pytest.mark.parametrize( "arr_type", CSC_CONTAINERS + CSR_CONTAINERS + COO_CONTAINERS + LIL_CONTAINERS + BSR_CONTAINERS, ) def test_simple_imputation_add_indicator_sparse_matrix(arr_type): X_sparse = arr_type([[np.nan, 1, 5], [2, np.nan, 1], [6, 3, np.nan], [1, 2, 9]]) X_true = np.array( [ [3.0, 1.0, 5.0, 1.0, 0.0, 0.0], [2.0, 2.0, 1.0, 0.0, 1.0, 0.0], [6.0, 3.0, 5.0, 0.0, 0.0, 1.0], [1.0, 2.0, 9.0, 0.0, 0.0, 0.0], ] ) imputer = SimpleImputer(missing_values=np.nan, add_indicator=True) X_trans = imputer.fit_transform(X_sparse) assert sparse.issparse(X_trans) assert X_trans.shape == X_true.shape assert_allclose(X_trans.toarray(), X_true) @pytest.mark.parametrize( "strategy, expected", [("most_frequent", "b"), ("constant", "missing_value")] ) def test_simple_imputation_string_list(strategy, expected): X = [["a", "b"], ["c", np.nan]] X_true = np.array([["a", "b"], ["c", expected]], dtype=object) imputer = SimpleImputer(strategy=strategy) X_trans = imputer.fit_transform(X) assert_array_equal(X_trans, X_true) @pytest.mark.parametrize( "order, idx_order", [("ascending", [3, 4, 2, 0, 1]), ("descending", [1, 0, 2, 4, 3])], ) def test_imputation_order(order, idx_order): # regression test for #15393 rng = np.random.RandomState(42) X = rng.rand(100, 5) X[:50, 1] = np.nan X[:30, 0] = np.nan X[:20, 2] = np.nan X[:10, 4] = np.nan with pytest.warns(ConvergenceWarning): trs = IterativeImputer(max_iter=1, imputation_order=order, random_state=0).fit( X ) idx = [x.feat_idx for x in trs.imputation_sequence_] assert idx == idx_order @pytest.mark.parametrize("missing_value", [-1, np.nan]) def test_simple_imputation_inverse_transform(missing_value): # Test inverse_transform feature for np.nan X_1 = np.array( [ [9, missing_value, 3, -1], [4, -1, 5, 4], [6, 7, missing_value, -1], [8, 9, 0, missing_value], ] ) X_2 = np.array( [ [5, 4, 2, 1], [2, 1, missing_value, 3], [9, missing_value, 7, 1], [6, 4, 2, missing_value], ] ) X_3 = np.array( [ [1, missing_value, 5, 9], [missing_value, 4, missing_value, missing_value], [2, missing_value, 7, missing_value], [missing_value, 3, missing_value, 8], ] ) X_4 = np.array( [ [1, 1, 1, 3], [missing_value, 2, missing_value, 1], [2, 3, 3, 4], [missing_value, 4, missing_value, 2], ] ) imputer = SimpleImputer( missing_values=missing_value, strategy="mean", add_indicator=True ) X_1_trans = imputer.fit_transform(X_1) X_1_inv_trans = imputer.inverse_transform(X_1_trans) X_2_trans = imputer.transform(X_2) # test on new data X_2_inv_trans = imputer.inverse_transform(X_2_trans) assert_array_equal(X_1_inv_trans, X_1) assert_array_equal(X_2_inv_trans, X_2) for X in [X_3, X_4]: X_trans = imputer.fit_transform(X) X_inv_trans = imputer.inverse_transform(X_trans) assert_array_equal(X_inv_trans, X) @pytest.mark.parametrize("missing_value", [-1, np.nan]) def test_simple_imputation_inverse_transform_exceptions(missing_value): X_1 = np.array( [ [9, missing_value, 3, -1], [4, -1, 5, 4], [6, 7, missing_value, -1], [8, 9, 0, missing_value], ] ) imputer = SimpleImputer(missing_values=missing_value, strategy="mean") X_1_trans = imputer.fit_transform(X_1) with pytest.raises( ValueError, match=f"Got 'add_indicator={imputer.add_indicator}'" ): imputer.inverse_transform(X_1_trans) @pytest.mark.parametrize( "expected,array,dtype,extra_value,n_repeat", [ # array of object dtype ("extra_value", ["a", "b", "c"], object, "extra_value", 2), ( "most_frequent_value", ["most_frequent_value", "most_frequent_value", "value"], object, "extra_value", 1, ), ("a", ["min_value", "min_valuevalue"], object, "a", 2), ("min_value", ["min_value", "min_value", "value"], object, "z", 2), # array of numeric dtype (10, [1, 2, 3], int, 10, 2), (1, [1, 1, 2], int, 10, 1), (10, [20, 20, 1], int, 10, 2), (1, [1, 1, 20], int, 10, 2), ], ) def test_most_frequent(expected, array, dtype, extra_value, n_repeat): assert expected == _most_frequent( np.array(array, dtype=dtype), extra_value, n_repeat ) @pytest.mark.parametrize( "initial_strategy", ["mean", "median", "most_frequent", "constant"] ) def test_iterative_imputer_keep_empty_features(initial_strategy): """Check the behaviour of the iterative imputer with different initial strategy and keeping empty features (i.e. features containing only missing values). """ X = np.array([[1, np.nan, 2], [3, np.nan, np.nan]]) imputer = IterativeImputer( initial_strategy=initial_strategy, keep_empty_features=True ) X_imputed = imputer.fit_transform(X) assert_allclose(X_imputed[:, 1], 0) X_imputed = imputer.transform(X) assert_allclose(X_imputed[:, 1], 0) def test_iterative_imputer_constant_fill_value(): """Check that we propagate properly the parameter `fill_value`.""" X = np.array([[-1, 2, 3, -1], [4, -1, 5, -1], [6, 7, -1, -1], [8, 9, 0, -1]]) fill_value = 100 imputer = IterativeImputer( missing_values=-1, initial_strategy="constant", fill_value=fill_value, max_iter=0, ) imputer.fit_transform(X) assert_array_equal(imputer.initial_imputer_.statistics_, fill_value) @pytest.mark.parametrize("keep_empty_features", [True, False]) def test_knn_imputer_keep_empty_features(keep_empty_features): """Check the behaviour of `keep_empty_features` for `KNNImputer`.""" X = np.array([[1, np.nan, 2], [3, np.nan, np.nan]]) imputer = KNNImputer(keep_empty_features=keep_empty_features) for method in ["fit_transform", "transform"]: X_imputed = getattr(imputer, method)(X) if keep_empty_features: assert X_imputed.shape == X.shape assert_array_equal(X_imputed[:, 1], 0) else: assert X_imputed.shape == (X.shape[0], X.shape[1] - 1) def test_simple_impute_pd_na(): pd = pytest.importorskip("pandas") # Impute pandas array of string types. df = pd.DataFrame({"feature": pd.Series(["abc", None, "de"], dtype="string")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="constant", fill_value="na") _assert_array_equal_and_same_dtype( imputer.fit_transform(df), np.array([["abc"], ["na"], ["de"]], dtype=object) ) # Impute pandas array of string types without any missing values. df = pd.DataFrame({"feature": pd.Series(["abc", "de", "fgh"], dtype="string")}) imputer = SimpleImputer(fill_value="ok", strategy="constant") _assert_array_equal_and_same_dtype( imputer.fit_transform(df), np.array([["abc"], ["de"], ["fgh"]], dtype=object) ) # Impute pandas array of integer types. df = pd.DataFrame({"feature": pd.Series([1, None, 3], dtype="Int64")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="constant", fill_value=-1) _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1], [-1], [3]], dtype="float64") ) # Use `np.nan` also works. imputer = SimpleImputer(missing_values=np.nan, strategy="constant", fill_value=-1) _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1], [-1], [3]], dtype="float64") ) # Impute pandas array of integer types with 'median' strategy. df = pd.DataFrame({"feature": pd.Series([1, None, 2, 3], dtype="Int64")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="median") _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1], [2], [2], [3]], dtype="float64") ) # Impute pandas array of integer types with 'mean' strategy. df = pd.DataFrame({"feature": pd.Series([1, None, 2], dtype="Int64")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="mean") _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1], [1.5], [2]], dtype="float64") ) # Impute pandas array of float types. df = pd.DataFrame({"feature": pd.Series([1.0, None, 3.0], dtype="float64")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="constant", fill_value=-2.0) _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1.0], [-2.0], [3.0]], dtype="float64") ) # Impute pandas array of float types with 'median' strategy. df = pd.DataFrame({"feature": pd.Series([1.0, None, 2.0, 3.0], dtype="float64")}) imputer = SimpleImputer(missing_values=pd.NA, strategy="median") _assert_allclose_and_same_dtype( imputer.fit_transform(df), np.array([[1.0], [2.0], [2.0], [3.0]], dtype="float64"), ) def test_missing_indicator_feature_names_out(): """Check that missing indicator return the feature names with a prefix.""" pd = pytest.importorskip("pandas") missing_values = np.nan X = pd.DataFrame( [ [missing_values, missing_values, 1, missing_values], [4, missing_values, 2, 10], ], columns=["a", "b", "c", "d"], ) indicator = MissingIndicator(missing_values=missing_values).fit(X) feature_names = indicator.get_feature_names_out() expected_names = ["missingindicator_a", "missingindicator_b", "missingindicator_d"] assert_array_equal(expected_names, feature_names) def test_imputer_lists_fit_transform(): """Check transform uses object dtype when fitted on an object dtype. Non-regression test for #19572. """ X = [["a", "b"], ["c", "b"], ["a", "a"]] imp_frequent = SimpleImputer(strategy="most_frequent").fit(X) X_trans = imp_frequent.transform([[np.nan, np.nan]]) assert X_trans.dtype == object assert_array_equal(X_trans, [["a", "b"]]) @pytest.mark.parametrize("dtype_test", [np.float32, np.float64]) def test_imputer_transform_preserves_numeric_dtype(dtype_test): """Check transform preserves numeric dtype independent of fit dtype.""" X = np.asarray( [[1.2, 3.4, np.nan], [np.nan, 1.2, 1.3], [4.2, 2, 1]], dtype=np.float64 ) imp = SimpleImputer().fit(X) X_test = np.asarray([[np.nan, np.nan, np.nan]], dtype=dtype_test) X_trans = imp.transform(X_test) assert X_trans.dtype == dtype_test @pytest.mark.parametrize("array_type", ["array", "sparse"]) @pytest.mark.parametrize("keep_empty_features", [True, False]) def test_simple_imputer_constant_keep_empty_features(array_type, keep_empty_features): """Check the behaviour of `keep_empty_features` with `strategy='constant'. For backward compatibility, a column full of missing values will always be fill and never dropped. """ X = np.array([[np.nan, 2], [np.nan, 3], [np.nan, 6]]) X = _convert_container(X, array_type) fill_value = 10 imputer = SimpleImputer( strategy="constant", fill_value=fill_value, keep_empty_features=keep_empty_features, ) for method in ["fit_transform", "transform"]: X_imputed = getattr(imputer, method)(X) assert X_imputed.shape == X.shape constant_feature = ( X_imputed[:, 0].toarray() if array_type == "sparse" else X_imputed[:, 0] ) assert_array_equal(constant_feature, fill_value) @pytest.mark.parametrize("array_type", ["array", "sparse"]) @pytest.mark.parametrize("strategy", ["mean", "median", "most_frequent"]) @pytest.mark.parametrize("keep_empty_features", [True, False]) def test_simple_imputer_keep_empty_features(strategy, array_type, keep_empty_features): """Check the behaviour of `keep_empty_features` with all strategies but 'constant'. """ X = np.array([[np.nan, 2], [np.nan, 3], [np.nan, 6]]) X = _convert_container(X, array_type) imputer = SimpleImputer(strategy=strategy, keep_empty_features=keep_empty_features) for method in ["fit_transform", "transform"]: X_imputed = getattr(imputer, method)(X) if keep_empty_features: assert X_imputed.shape == X.shape constant_feature = ( X_imputed[:, 0].toarray() if array_type == "sparse" else X_imputed[:, 0] ) assert_array_equal(constant_feature, 0) else: assert X_imputed.shape == (X.shape[0], X.shape[1] - 1) def test_simple_imputer_constant_fill_value_casting(): """Check that we raise a proper error message when we cannot cast the fill value to the input data type. Otherwise, check that the casting is done properly. Non-regression test for: https://github.com/scikit-learn/scikit-learn/issues/28309 """ # cannot cast fill_value at fit fill_value = 1.5 X_int64 = np.array([[1, 2, 3], [2, 3, 4]], dtype=np.int64) imputer = SimpleImputer( strategy="constant", fill_value=fill_value, missing_values=2 ) err_msg = f"fill_value={fill_value!r} (of type {type(fill_value)!r}) cannot be cast" with pytest.raises(ValueError, match=re.escape(err_msg)): imputer.fit(X_int64) # cannot cast fill_value at transform X_float64 = np.array([[1, 2, 3], [2, 3, 4]], dtype=np.float64) imputer.fit(X_float64) err_msg = ( f"The dtype of the filling value (i.e. {imputer.statistics_.dtype!r}) " "cannot be cast" ) with pytest.raises(ValueError, match=re.escape(err_msg)): imputer.transform(X_int64) # check that no error is raised when having the same kind of dtype fill_value_list = [np.float64(1.5), 1.5, 1] X_float32 = X_float64.astype(np.float32) for fill_value in fill_value_list: imputer = SimpleImputer( strategy="constant", fill_value=fill_value, missing_values=2 ) X_trans = imputer.fit_transform(X_float32) assert X_trans.dtype == X_float32.dtype