""" Testing for Clustering methods """ import warnings import numpy as np import pytest from sklearn.cluster import AffinityPropagation, affinity_propagation from sklearn.cluster._affinity_propagation import _equal_similarities_and_preferences from sklearn.datasets import make_blobs from sklearn.exceptions import ConvergenceWarning, NotFittedError from sklearn.metrics import euclidean_distances from sklearn.utils._testing import assert_allclose, assert_array_equal from sklearn.utils.fixes import CSR_CONTAINERS n_clusters = 3 centers = np.array([[1, 1], [-1, -1], [1, -1]]) + 10 X, _ = make_blobs( n_samples=60, n_features=2, centers=centers, cluster_std=0.4, shuffle=True, random_state=0, ) # TODO: AffinityPropagation must preserve dtype for its fitted attributes # and test must be created accordingly to this new behavior. # For more details, see: https://github.com/scikit-learn/scikit-learn/issues/11000 def test_affinity_propagation(global_random_seed, global_dtype): """Test consistency of the affinity propagations.""" S = -euclidean_distances(X.astype(global_dtype, copy=False), squared=True) preference = np.median(S) * 10 cluster_centers_indices, labels = affinity_propagation( S, preference=preference, random_state=global_random_seed ) n_clusters_ = len(cluster_centers_indices) assert n_clusters == n_clusters_ def test_affinity_propagation_precomputed(): """Check equality of precomputed affinity matrix to internally computed affinity matrix. """ S = -euclidean_distances(X, squared=True) preference = np.median(S) * 10 af = AffinityPropagation( preference=preference, affinity="precomputed", random_state=28 ) labels_precomputed = af.fit(S).labels_ af = AffinityPropagation(preference=preference, verbose=True, random_state=37) labels = af.fit(X).labels_ assert_array_equal(labels, labels_precomputed) cluster_centers_indices = af.cluster_centers_indices_ n_clusters_ = len(cluster_centers_indices) assert np.unique(labels).size == n_clusters_ assert n_clusters == n_clusters_ def test_affinity_propagation_no_copy(): """Check behaviour of not copying the input data.""" S = -euclidean_distances(X, squared=True) S_original = S.copy() preference = np.median(S) * 10 assert not np.allclose(S.diagonal(), preference) # with copy=True S should not be modified affinity_propagation(S, preference=preference, copy=True, random_state=0) assert_allclose(S, S_original) assert not np.allclose(S.diagonal(), preference) assert_allclose(S.diagonal(), np.zeros(S.shape[0])) # with copy=False S will be modified inplace affinity_propagation(S, preference=preference, copy=False, random_state=0) assert_allclose(S.diagonal(), preference) # test that copy=True and copy=False lead to the same result S = S_original.copy() af = AffinityPropagation(preference=preference, verbose=True, random_state=0) labels = af.fit(X).labels_ _, labels_no_copy = affinity_propagation( S, preference=preference, copy=False, random_state=74 ) assert_array_equal(labels, labels_no_copy) def test_affinity_propagation_affinity_shape(): """Check the shape of the affinity matrix when using `affinity_propagation.""" S = -euclidean_distances(X, squared=True) err_msg = "The matrix of similarities must be a square array" with pytest.raises(ValueError, match=err_msg): affinity_propagation(S[:, :-1]) @pytest.mark.parametrize("csr_container", CSR_CONTAINERS) def test_affinity_propagation_precomputed_with_sparse_input(csr_container): err_msg = "Sparse data was passed for X, but dense data is required" with pytest.raises(TypeError, match=err_msg): AffinityPropagation(affinity="precomputed").fit(csr_container((3, 3))) def test_affinity_propagation_predict(global_random_seed, global_dtype): # Test AffinityPropagation.predict af = AffinityPropagation(affinity="euclidean", random_state=global_random_seed) X_ = X.astype(global_dtype, copy=False) labels = af.fit_predict(X_) labels2 = af.predict(X_) assert_array_equal(labels, labels2) def test_affinity_propagation_predict_error(): # Test exception in AffinityPropagation.predict # Not fitted. af = AffinityPropagation(affinity="euclidean") with pytest.raises(NotFittedError): af.predict(X) # Predict not supported when affinity="precomputed". S = np.dot(X, X.T) af = AffinityPropagation(affinity="precomputed", random_state=57) af.fit(S) with pytest.raises(ValueError, match="expecting 60 features as input"): af.predict(X) def test_affinity_propagation_fit_non_convergence(global_dtype): # In case of non-convergence of affinity_propagation(), the cluster # centers should be an empty array and training samples should be labelled # as noise (-1) X = np.array([[0, 0], [1, 1], [-2, -2]], dtype=global_dtype) # Force non-convergence by allowing only a single iteration af = AffinityPropagation(preference=-10, max_iter=1, random_state=82) with pytest.warns(ConvergenceWarning): af.fit(X) assert_allclose(np.empty((0, 2)), af.cluster_centers_) assert_array_equal(np.array([-1, -1, -1]), af.labels_) def test_affinity_propagation_equal_mutual_similarities(global_dtype): X = np.array([[-1, 1], [1, -1]], dtype=global_dtype) S = -euclidean_distances(X, squared=True) # setting preference > similarity with pytest.warns(UserWarning, match="mutually equal"): cluster_center_indices, labels = affinity_propagation(S, preference=0) # expect every sample to become an exemplar assert_array_equal([0, 1], cluster_center_indices) assert_array_equal([0, 1], labels) # setting preference < similarity with pytest.warns(UserWarning, match="mutually equal"): cluster_center_indices, labels = affinity_propagation(S, preference=-10) # expect one cluster, with arbitrary (first) sample as exemplar assert_array_equal([0], cluster_center_indices) assert_array_equal([0, 0], labels) # setting different preferences with warnings.catch_warnings(): warnings.simplefilter("error", UserWarning) cluster_center_indices, labels = affinity_propagation( S, preference=[-20, -10], random_state=37 ) # expect one cluster, with highest-preference sample as exemplar assert_array_equal([1], cluster_center_indices) assert_array_equal([0, 0], labels) def test_affinity_propagation_predict_non_convergence(global_dtype): # In case of non-convergence of affinity_propagation(), the cluster # centers should be an empty array X = np.array([[0, 0], [1, 1], [-2, -2]], dtype=global_dtype) # Force non-convergence by allowing only a single iteration with pytest.warns(ConvergenceWarning): af = AffinityPropagation(preference=-10, max_iter=1, random_state=75).fit(X) # At prediction time, consider new samples as noise since there are no # clusters to_predict = np.array([[2, 2], [3, 3], [4, 4]]) with pytest.warns(ConvergenceWarning): y = af.predict(to_predict) assert_array_equal(np.array([-1, -1, -1]), y) def test_affinity_propagation_non_convergence_regressiontest(global_dtype): X = np.array( [[1, 0, 0, 0, 0, 0], [0, 1, 1, 1, 0, 0], [0, 0, 1, 0, 0, 1]], dtype=global_dtype ) af = AffinityPropagation(affinity="euclidean", max_iter=2, random_state=34) msg = ( "Affinity propagation did not converge, this model may return degenerate" " cluster centers and labels." ) with pytest.warns(ConvergenceWarning, match=msg): af.fit(X) assert_array_equal(np.array([0, 0, 0]), af.labels_) def test_equal_similarities_and_preferences(global_dtype): # Unequal distances X = np.array([[0, 0], [1, 1], [-2, -2]], dtype=global_dtype) S = -euclidean_distances(X, squared=True) assert not _equal_similarities_and_preferences(S, np.array(0)) assert not _equal_similarities_and_preferences(S, np.array([0, 0])) assert not _equal_similarities_and_preferences(S, np.array([0, 1])) # Equal distances X = np.array([[0, 0], [1, 1]], dtype=global_dtype) S = -euclidean_distances(X, squared=True) # Different preferences assert not _equal_similarities_and_preferences(S, np.array([0, 1])) # Same preferences assert _equal_similarities_and_preferences(S, np.array([0, 0])) assert _equal_similarities_and_preferences(S, np.array(0)) def test_affinity_propagation_random_state(): """Check that different random states lead to different initialisations by looking at the center locations after two iterations. """ centers = [[1, 1], [-1, -1], [1, -1]] X, labels_true = make_blobs( n_samples=300, centers=centers, cluster_std=0.5, random_state=0 ) # random_state = 0 ap = AffinityPropagation(convergence_iter=1, max_iter=2, random_state=0) ap.fit(X) centers0 = ap.cluster_centers_ # random_state = 76 ap = AffinityPropagation(convergence_iter=1, max_iter=2, random_state=76) ap.fit(X) centers76 = ap.cluster_centers_ # check that the centers have not yet converged to the same solution assert np.mean((centers0 - centers76) ** 2) > 1 @pytest.mark.parametrize("container", CSR_CONTAINERS + [np.array]) def test_affinity_propagation_convergence_warning_dense_sparse(container, global_dtype): """ Check that having sparse or dense `centers` format should not influence the convergence. Non-regression test for gh-13334. """ centers = container(np.zeros((1, 10))) rng = np.random.RandomState(42) X = rng.rand(40, 10).astype(global_dtype, copy=False) y = (4 * rng.rand(40)).astype(int) ap = AffinityPropagation(random_state=46) ap.fit(X, y) ap.cluster_centers_ = centers with warnings.catch_warnings(): warnings.simplefilter("error", ConvergenceWarning) assert_array_equal(ap.predict(X), np.zeros(X.shape[0], dtype=int)) # FIXME; this test is broken with different random states, needs to be revisited def test_correct_clusters(global_dtype): # Test to fix incorrect clusters due to dtype change # (non-regression test for issue #10832) X = np.array( [[1, 0, 0, 0], [0, 1, 1, 0], [0, 1, 1, 0], [0, 0, 0, 1]], dtype=global_dtype ) afp = AffinityPropagation(preference=1, affinity="precomputed", random_state=0).fit( X ) expected = np.array([0, 1, 1, 2]) assert_array_equal(afp.labels_, expected) @pytest.mark.parametrize("csr_container", CSR_CONTAINERS) def test_sparse_input_for_predict(csr_container): # Test to make sure sparse inputs are accepted for predict # (non-regression test for issue #20049) af = AffinityPropagation(affinity="euclidean", random_state=42) af.fit(X) labels = af.predict(csr_container((2, 2))) assert_array_equal(labels, (2, 2)) @pytest.mark.parametrize("csr_container", CSR_CONTAINERS) def test_sparse_input_for_fit_predict(csr_container): # Test to make sure sparse inputs are accepted for fit_predict # (non-regression test for issue #20049) af = AffinityPropagation(affinity="euclidean", random_state=42) rng = np.random.RandomState(42) X = csr_container(rng.randint(0, 2, size=(5, 5))) labels = af.fit_predict(X) assert_array_equal(labels, (0, 1, 1, 2, 3)) def test_affinity_propagation_equal_points(): """Make sure we do not assign multiple clusters to equal points. Non-regression test for: https://github.com/scikit-learn/scikit-learn/pull/20043 """ X = np.zeros((8, 1)) af = AffinityPropagation(affinity="euclidean", damping=0.5, random_state=42).fit(X) assert np.all(af.labels_ == 0)