#!/usr/bin/env python # -*- coding: utf-8 -*- # # Author: Tobias B """ Automated tests for checking the EnsembleLda Class """ import os import logging import unittest import numpy as np from copy import deepcopy import pytest from gensim.models import EnsembleLda, LdaMulticore, LdaModel from gensim.test.utils import datapath, get_tmpfile, common_corpus, common_dictionary NUM_TOPICS = 2 NUM_MODELS = 4 PASSES = 50 RANDOM_STATE = 0 # windows tests fail due to the required assertion precision being too high RTOL = 1e-04 if os.name == 'nt' else 1e-05 class TestEnsembleLda(unittest.TestCase): def get_elda(self): return EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=NUM_TOPICS, passes=PASSES, num_models=NUM_MODELS, random_state=RANDOM_STATE, topic_model_class=LdaModel, ) def get_elda_mem_unfriendly(self): return EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=NUM_TOPICS, passes=PASSES, num_models=NUM_MODELS, random_state=RANDOM_STATE, memory_friendly_ttda=False, topic_model_class=LdaModel, ) def assert_ttda_is_valid(self, elda): """Check that ttda has one or more topic and that term probabilities add to one.""" assert len(elda.ttda) > 0 sum_over_terms = elda.ttda.sum(axis=1) expected_sum_over_terms = np.ones(len(elda.ttda)).astype(np.float32) np.testing.assert_allclose(sum_over_terms, expected_sum_over_terms, rtol=1e-04) def test_elda(self): elda = self.get_elda() # given that the random_state doesn't change, it should # always be 2 detected topics in this setup. assert elda.stable_topics.shape[1] == len(common_dictionary) assert len(elda.ttda) == NUM_MODELS * NUM_TOPICS self.assert_ttda_is_valid(elda) def test_backwards_compatibility_with_persisted_model(self): elda = self.get_elda() # compare with a pre-trained reference model loaded_elda = EnsembleLda.load(datapath('ensemblelda')) np.testing.assert_allclose(elda.ttda, loaded_elda.ttda, rtol=RTOL) atol = loaded_elda.asymmetric_distance_matrix.max() * 1e-05 np.testing.assert_allclose( elda.asymmetric_distance_matrix, loaded_elda.asymmetric_distance_matrix, atol=atol, ) def test_recluster(self): # the following test is quite specific to the current implementation and not part of any api, # but it makes improving those sections of the code easier as long as sorted_clusters and the # cluster_model results are supposed to stay the same. Potentially this test will deprecate. elda = EnsembleLda.load(datapath('ensemblelda')) loaded_cluster_model_results = deepcopy(elda.cluster_model.results) loaded_valid_clusters = deepcopy(elda.valid_clusters) loaded_stable_topics = deepcopy(elda.get_topics()) # continue training with the distance matrix of the pretrained reference and see if # the generated clusters match. elda.asymmetric_distance_matrix_outdated = True elda.recluster() self.assert_clustering_results_equal(elda.cluster_model.results, loaded_cluster_model_results) assert elda.valid_clusters == loaded_valid_clusters np.testing.assert_allclose(elda.get_topics(), loaded_stable_topics, rtol=RTOL) def test_recluster_does_nothing_when_stable_topics_already_found(self): elda = self.get_elda() # reclustering shouldn't change anything without # added models or different parameters elda.recluster() assert elda.stable_topics.shape[1] == len(common_dictionary) assert len(elda.ttda) == NUM_MODELS * NUM_TOPICS self.assert_ttda_is_valid(elda) def test_not_trained_given_zero_passes(self): elda = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=NUM_TOPICS, passes=0, num_models=NUM_MODELS, random_state=RANDOM_STATE, ) assert len(elda.ttda) == 0 def test_not_trained_given_no_corpus(self): elda = EnsembleLda( id2word=common_dictionary, num_topics=NUM_TOPICS, passes=PASSES, num_models=NUM_MODELS, random_state=RANDOM_STATE, ) assert len(elda.ttda) == 0 def test_not_trained_given_zero_iterations(self): elda = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=NUM_TOPICS, iterations=0, num_models=NUM_MODELS, random_state=RANDOM_STATE, ) assert len(elda.ttda) == 0 def test_not_trained_given_zero_models(self): elda = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=NUM_TOPICS, passes=PASSES, num_models=0, random_state=RANDOM_STATE ) assert len(elda.ttda) == 0 def test_mem_unfriendly(self): # elda_mem_unfriendly and self.elda should have topics that are # the same up to floating point variations caused by the two different # implementations elda = self.get_elda() elda_mem_unfriendly = self.get_elda_mem_unfriendly() assert len(elda_mem_unfriendly.tms) == NUM_MODELS np.testing.assert_allclose(elda.ttda, elda_mem_unfriendly.ttda, rtol=RTOL) np.testing.assert_allclose(elda.get_topics(), elda_mem_unfriendly.get_topics(), rtol=RTOL) self.assert_ttda_is_valid(elda_mem_unfriendly) def test_generate_gensim_representation(self): elda = self.get_elda() gensim_model = elda.generate_gensim_representation() topics = gensim_model.get_topics() np.testing.assert_allclose(elda.get_topics(), topics, rtol=RTOL) def assert_clustering_results_equal(self, clustering_results_1, clustering_results_2): """Assert important attributes of the cluster results""" np.testing.assert_array_equal( [element.label for element in clustering_results_1], [element.label for element in clustering_results_2], ) np.testing.assert_array_equal( [element.is_core for element in clustering_results_1], [element.is_core for element in clustering_results_2], ) def test_persisting(self): elda = self.get_elda() elda_mem_unfriendly = self.get_elda_mem_unfriendly() fname = get_tmpfile('gensim_models_ensemblelda') elda.save(fname) loaded_elda = EnsembleLda.load(fname) # storing the ensemble without memory_friendy_ttda elda_mem_unfriendly.save(fname) loaded_elda_mem_unfriendly = EnsembleLda.load(fname) # topic_model_class will be lazy loaded and should be None first assert loaded_elda.topic_model_class is None # was it stored and loaded correctly? # memory friendly. loaded_elda_representation = loaded_elda.generate_gensim_representation() # generating the representation also lazily loads the topic_model_class assert loaded_elda.topic_model_class == LdaModel topics = loaded_elda_representation.get_topics() ttda = loaded_elda.ttda amatrix = loaded_elda.asymmetric_distance_matrix np.testing.assert_allclose(elda.get_topics(), topics, rtol=RTOL) np.testing.assert_allclose(elda.ttda, ttda, rtol=RTOL) np.testing.assert_allclose(elda.asymmetric_distance_matrix, amatrix, rtol=RTOL) expected_clustering_results = elda.cluster_model.results loaded_clustering_results = loaded_elda.cluster_model.results self.assert_clustering_results_equal(expected_clustering_results, loaded_clustering_results) # memory unfriendly loaded_elda_mem_unfriendly_representation = loaded_elda_mem_unfriendly.generate_gensim_representation() topics = loaded_elda_mem_unfriendly_representation.get_topics() np.testing.assert_allclose(elda.get_topics(), topics, rtol=RTOL) def test_multiprocessing(self): # same configuration random_state = RANDOM_STATE # use 3 processes for the ensemble and the distance, # so that the 4 models and 8 topics cannot be distributed # to each worker evenly workers = 3 # memory friendly. contains List of topic word distributions elda = self.get_elda() elda_multiprocessing = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, topic_model_class=LdaModel, num_topics=NUM_TOPICS, passes=PASSES, num_models=NUM_MODELS, random_state=random_state, ensemble_workers=workers, distance_workers=workers, ) # memory unfriendly. contains List of models elda_mem_unfriendly = self.get_elda_mem_unfriendly() elda_multiprocessing_mem_unfriendly = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, topic_model_class=LdaModel, num_topics=NUM_TOPICS, passes=PASSES, num_models=NUM_MODELS, random_state=random_state, ensemble_workers=workers, distance_workers=workers, memory_friendly_ttda=False, ) np.testing.assert_allclose( elda.get_topics(), elda_multiprocessing.get_topics(), rtol=RTOL ) np.testing.assert_allclose( elda_mem_unfriendly.get_topics(), elda_multiprocessing_mem_unfriendly.get_topics(), rtol=RTOL ) def test_add_models_to_empty(self): elda = self.get_elda() ensemble = EnsembleLda(id2word=common_dictionary, num_models=0) ensemble.add_model(elda.ttda[0:1]) ensemble.add_model(elda.ttda[1:]) ensemble.recluster() np.testing.assert_allclose(ensemble.get_topics(), elda.get_topics(), rtol=RTOL) # persisting an ensemble that is entirely built from existing ttdas fname = get_tmpfile('gensim_models_ensemblelda') ensemble.save(fname) loaded_ensemble = EnsembleLda.load(fname) np.testing.assert_allclose(loaded_ensemble.get_topics(), elda.get_topics(), rtol=RTOL) self.test_inference(loaded_ensemble) def test_add_models(self): # make sure countings and sizes after adding are correct # create new models and add other models to them. # there are a ton of configurations for the first parameter possible, # try them all # quickly train something that can be used for counting results num_new_models = 3 num_new_topics = 3 # 1. memory friendly base_elda = self.get_elda() cumulative_elda = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=num_new_topics, passes=1, num_models=num_new_models, iterations=1, random_state=RANDOM_STATE, topic_model_class=LdaMulticore, workers=3, ensemble_workers=2, ) # 1.1 ttda num_topics_before_add_model = len(cumulative_elda.ttda) num_models_before_add_model = cumulative_elda.num_models cumulative_elda.add_model(base_elda.ttda) assert len(cumulative_elda.ttda) == num_topics_before_add_model + len(base_elda.ttda) assert cumulative_elda.num_models == num_models_before_add_model + 1 # defaults to 1 for one ttda matrix # 1.2 an ensemble num_topics_before_add_model = len(cumulative_elda.ttda) num_models_before_add_model = cumulative_elda.num_models cumulative_elda.add_model(base_elda, 5) assert len(cumulative_elda.ttda) == num_topics_before_add_model + len(base_elda.ttda) assert cumulative_elda.num_models == num_models_before_add_model + 5 # 1.3 a list of ensembles num_topics_before_add_model = len(cumulative_elda.ttda) num_models_before_add_model = cumulative_elda.num_models # it should be totally legit to add a memory unfriendly object to a memory friendly one base_elda_mem_unfriendly = self.get_elda_mem_unfriendly() cumulative_elda.add_model([base_elda, base_elda_mem_unfriendly]) assert len(cumulative_elda.ttda) == num_topics_before_add_model + 2 * len(base_elda.ttda) assert cumulative_elda.num_models == num_models_before_add_model + 2 * NUM_MODELS # 1.4 a single gensim model model = base_elda.classic_model_representation num_topics_before_add_model = len(cumulative_elda.ttda) num_models_before_add_model = cumulative_elda.num_models cumulative_elda.add_model(model) assert len(cumulative_elda.ttda) == num_topics_before_add_model + len(model.get_topics()) assert cumulative_elda.num_models == num_models_before_add_model + 1 # 1.5 a list gensim models num_topics_before_add_model = len(cumulative_elda.ttda) num_models_before_add_model = cumulative_elda.num_models cumulative_elda.add_model([model, model]) assert len(cumulative_elda.ttda) == num_topics_before_add_model + 2 * len(model.get_topics()) assert cumulative_elda.num_models == num_models_before_add_model + 2 self.assert_ttda_is_valid(cumulative_elda) # 2. memory unfriendly elda_mem_unfriendly = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=num_new_topics, passes=1, num_models=num_new_models, iterations=1, random_state=RANDOM_STATE, topic_model_class=LdaMulticore, workers=3, ensemble_workers=2, memory_friendly_ttda=False, ) # 2.1 a single ensemble num_topics_before_add_model = len(elda_mem_unfriendly.tms) num_models_before_add_model = elda_mem_unfriendly.num_models elda_mem_unfriendly.add_model(base_elda_mem_unfriendly) assert len(elda_mem_unfriendly.tms) == num_topics_before_add_model + NUM_MODELS assert elda_mem_unfriendly.num_models == num_models_before_add_model + NUM_MODELS # 2.2 a list of ensembles num_topics_before_add_model = len(elda_mem_unfriendly.tms) num_models_before_add_model = elda_mem_unfriendly.num_models elda_mem_unfriendly.add_model([base_elda_mem_unfriendly, base_elda_mem_unfriendly]) assert len(elda_mem_unfriendly.tms) == num_topics_before_add_model + 2 * NUM_MODELS assert elda_mem_unfriendly.num_models == num_models_before_add_model + 2 * NUM_MODELS # 2.3 a single gensim model num_topics_before_add_model = len(elda_mem_unfriendly.tms) num_models_before_add_model = elda_mem_unfriendly.num_models elda_mem_unfriendly.add_model(base_elda_mem_unfriendly.tms[0]) assert len(elda_mem_unfriendly.tms) == num_topics_before_add_model + 1 assert elda_mem_unfriendly.num_models == num_models_before_add_model + 1 # 2.4 a list of gensim models num_topics_before_add_model = len(elda_mem_unfriendly.tms) num_models_before_add_model = elda_mem_unfriendly.num_models elda_mem_unfriendly.add_model(base_elda_mem_unfriendly.tms) assert len(elda_mem_unfriendly.tms) == num_topics_before_add_model + NUM_MODELS assert elda_mem_unfriendly.num_models == num_models_before_add_model + NUM_MODELS # 2.5 topic term distributions should throw errors, because the # actual models are needed for the memory unfriendly ensemble num_topics_before_add_model = len(elda_mem_unfriendly.tms) num_models_before_add_model = elda_mem_unfriendly.num_models with pytest.raises(ValueError): elda_mem_unfriendly.add_model(base_elda_mem_unfriendly.tms[0].get_topics()) # remains unchanged assert len(elda_mem_unfriendly.tms) == num_topics_before_add_model assert elda_mem_unfriendly.num_models == num_models_before_add_model assert elda_mem_unfriendly.num_models == len(elda_mem_unfriendly.tms) self.assert_ttda_is_valid(elda_mem_unfriendly) def test_add_and_recluster(self): # See if after adding a model, the model still makes sense num_new_models = 3 num_new_topics = 3 random_state = 1 # train models two sets of models (mem friendly and unfriendly) elda_1 = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=num_new_topics, passes=10, num_models=num_new_models, iterations=30, random_state=random_state, topic_model_class='lda', distance_workers=4, ) elda_mem_unfriendly_1 = EnsembleLda( corpus=common_corpus, id2word=common_dictionary, num_topics=num_new_topics, passes=10, num_models=num_new_models, iterations=30, random_state=random_state, topic_model_class=LdaModel, distance_workers=4, memory_friendly_ttda=False, ) elda_2 = self.get_elda() elda_mem_unfriendly_2 = self.get_elda_mem_unfriendly() assert elda_1.random_state != elda_2.random_state assert elda_mem_unfriendly_1.random_state != elda_mem_unfriendly_2.random_state # both should be similar np.testing.assert_allclose(elda_1.ttda, elda_mem_unfriendly_1.ttda, rtol=RTOL) np.testing.assert_allclose(elda_1.get_topics(), elda_mem_unfriendly_1.get_topics(), rtol=RTOL) # and every next step applied to both should result in similar results # 1. adding to ttda and tms elda_1.add_model(elda_2) elda_mem_unfriendly_1.add_model(elda_mem_unfriendly_2) np.testing.assert_allclose(elda_1.ttda, elda_mem_unfriendly_1.ttda, rtol=RTOL) assert len(elda_1.ttda) == len(elda_2.ttda) + num_new_models * num_new_topics assert len(elda_mem_unfriendly_1.ttda) == len(elda_mem_unfriendly_2.ttda) + num_new_models * num_new_topics assert len(elda_mem_unfriendly_1.tms) == NUM_MODELS + num_new_models self.assert_ttda_is_valid(elda_1) self.assert_ttda_is_valid(elda_mem_unfriendly_1) # 2. distance matrix elda_1._generate_asymmetric_distance_matrix() elda_mem_unfriendly_1._generate_asymmetric_distance_matrix() np.testing.assert_allclose( elda_1.asymmetric_distance_matrix, elda_mem_unfriendly_1.asymmetric_distance_matrix, ) # 3. CBDBSCAN results elda_1._generate_topic_clusters() elda_mem_unfriendly_1._generate_topic_clusters() clustering_results = elda_1.cluster_model.results mem_unfriendly_clustering_results = elda_mem_unfriendly_1.cluster_model.results self.assert_clustering_results_equal(clustering_results, mem_unfriendly_clustering_results) # 4. finally, the stable topics elda_1._generate_stable_topics() elda_mem_unfriendly_1._generate_stable_topics() np.testing.assert_allclose( elda_1.get_topics(), elda_mem_unfriendly_1.get_topics(), ) elda_1.generate_gensim_representation() elda_mem_unfriendly_1.generate_gensim_representation() # same random state, hence topics should be still similar np.testing.assert_allclose(elda_1.get_topics(), elda_mem_unfriendly_1.get_topics(), rtol=RTOL) def test_inference(self, elda=None): if elda is None: elda = self.get_elda() # get the most likely token id from topic 0 max_id = np.argmax(elda.get_topics()[0, :]) assert elda.classic_model_representation.iterations > 0 # topic 0 should be dominant in the inference. # the difference between the probabilities should be significant and larger than 0.3 inferred = elda[[(max_id, 1)]] assert inferred[0][1] - 0.3 > inferred[1][1] if __name__ == '__main__': logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.WARN) unittest.main()