#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (C) 2016 Radim Rehurek # Copyright (C) 2016 Olavur Mortensen # Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html """ Automated tests for the author-topic model (AuthorTopicModel class). These tests are based on the unit tests of LDA; the classes are quite similar, and the tests needed are thus quite similar. """ import logging import unittest import numbers from os import remove import numpy as np from gensim.corpora import mmcorpus, Dictionary from gensim.models import atmodel from gensim import matutils from gensim.test import basetmtests from gensim.test.utils import (datapath, get_tmpfile, common_texts, common_dictionary as dictionary, common_corpus as corpus) from gensim.matutils import jensen_shannon # TODO: # Test that computing the bound on new unseen documents works as expected (this is somewhat different # in the author-topic model than in LDA). # Perhaps test that the bound increases, in general (i.e. in several of the tests below where it makes # sense. This is not tested in LDA either. Tests can also be made to check that automatic prior learning # increases the bound. # Test that models are compatiple across versions, as done in LdaModel. # Assign some authors randomly to the documents above. author2doc = { 'john': [0, 1, 2, 3, 4, 5, 6], 'jane': [2, 3, 4, 5, 6, 7, 8], 'jack': [0, 2, 4, 6, 8], 'jill': [1, 3, 5, 7] } doc2author = { 0: ['john', 'jack'], 1: ['john', 'jill'], 2: ['john', 'jane', 'jack'], 3: ['john', 'jane', 'jill'], 4: ['john', 'jane', 'jack'], 5: ['john', 'jane', 'jill'], 6: ['john', 'jane', 'jack'], 7: ['jane', 'jill'], 8: ['jane', 'jack'] } # More data with new and old authors (to test update method). # Although the text is just a subset of the previous, the model # just sees it as completely new data. texts_new = common_texts[0:3] author2doc_new = {'jill': [0], 'bob': [0, 1], 'sally': [1, 2]} dictionary_new = Dictionary(texts_new) corpus_new = [dictionary_new.doc2bow(text) for text in texts_new] class TestAuthorTopicModel(unittest.TestCase, basetmtests.TestBaseTopicModel): def setUp(self): self.corpus = mmcorpus.MmCorpus(datapath('testcorpus.mm')) self.class_ = atmodel.AuthorTopicModel self.model = self.class_(corpus, id2word=dictionary, author2doc=author2doc, num_topics=2, passes=100) def test_transform(self): passed = False # sometimes, training gets stuck at a local minimum # in that case try re-training the model from scratch, hoping for a # better random initialization for i in range(25): # restart at most 5 times # create the transformation model model = self.class_(id2word=dictionary, num_topics=2, passes=100, random_state=0) model.update(corpus, author2doc) jill_topics = model.get_author_topics('jill') # NOTE: this test may easily fail if the author-topic model is altered in any way. The model's # output is sensitive to a lot of things, like the scheduling of the updates, or like the # author2id (because the random initialization changes when author2id changes). If it does # fail, simply be aware of whether we broke something, or if it just naturally changed the # output of the model slightly. vec = matutils.sparse2full(jill_topics, 2) # convert to dense vector, for easier equality tests expected = [0.91, 0.08] # must contain the same values, up to re-ordering passed = np.allclose(sorted(vec), sorted(expected), atol=1e-1) if passed: break logging.warning( "Author-topic model failed to converge on attempt %i (got %s, expected %s)", i, sorted(vec), sorted(expected) ) self.assertTrue(passed) def test_basic(self): # Check that training the model produces a positive topic vector for some author # Otherwise, many of the other tests are invalid. model = self.class_(corpus, author2doc=author2doc, id2word=dictionary, num_topics=2) jill_topics = model.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) self.assertTrue(all(jill_topics > 0)) def test_empty_document(self): local_texts = common_texts + [['only_occurs_once_in_corpus_and_alone_in_doc']] dictionary = Dictionary(local_texts) dictionary.filter_extremes(no_below=2) corpus = [dictionary.doc2bow(text) for text in local_texts] a2d = author2doc.copy() a2d['joaquin'] = [len(local_texts) - 1] self.class_(corpus, author2doc=a2d, id2word=dictionary, num_topics=2) def test_author2doc_missing(self): # Check that the results are the same if author2doc is constructed automatically from doc2author. model = self.class_( corpus, author2doc=author2doc, doc2author=doc2author, id2word=dictionary, num_topics=2, random_state=0 ) model2 = self.class_( corpus, doc2author=doc2author, id2word=dictionary, num_topics=2, random_state=0 ) # Compare Jill's topics before in both models. jill_topics = model.get_author_topics('jill') jill_topics2 = model2.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) self.assertTrue(np.allclose(jill_topics, jill_topics2)) def test_doc2author_missing(self): # Check that the results are the same if doc2author is constructed automatically from author2doc. model = self.class_( corpus, author2doc=author2doc, doc2author=doc2author, id2word=dictionary, num_topics=2, random_state=0 ) model2 = self.class_( corpus, author2doc=author2doc, id2word=dictionary, num_topics=2, random_state=0 ) # Compare Jill's topics before in both models. jill_topics = model.get_author_topics('jill') jill_topics2 = model2.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) self.assertTrue(np.allclose(jill_topics, jill_topics2)) def test_update(self): # Check that calling update after the model already has been trained works. model = self.class_(corpus, author2doc=author2doc, id2word=dictionary, num_topics=2) jill_topics = model.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) model.update() jill_topics2 = model.get_author_topics('jill') jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) # Did we learn something? self.assertFalse(all(np.equal(jill_topics, jill_topics2))) def test_update_new_data_old_author(self): # Check that calling update with new documents and/or authors after the model already has # been trained works. # Test an author that already existed in the old dataset. model = self.class_(corpus, author2doc=author2doc, id2word=dictionary, num_topics=2) jill_topics = model.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) model.update(corpus_new, author2doc_new) jill_topics2 = model.get_author_topics('jill') jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) # Did we learn more about Jill? self.assertFalse(all(np.equal(jill_topics, jill_topics2))) def test_update_new_data_new_author(self): # Check that calling update with new documents and/or authors after the model already has # been trained works. # Test a new author, that didn't exist in the old dataset. model = self.class_(corpus, author2doc=author2doc, id2word=dictionary, num_topics=2) model.update(corpus_new, author2doc_new) # Did we learn something about Sally? sally_topics = model.get_author_topics('sally') sally_topics = matutils.sparse2full(sally_topics, model.num_topics) self.assertTrue(all(sally_topics > 0)) def test_serialized(self): # Test the model using serialized corpora. Basic tests, plus test of update functionality. model = self.class_( self.corpus, author2doc=author2doc, id2word=dictionary, num_topics=2, serialized=True, serialization_path=datapath('testcorpus_serialization.mm') ) jill_topics = model.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) self.assertTrue(all(jill_topics > 0)) model.update() jill_topics2 = model.get_author_topics('jill') jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) # Did we learn more about Jill? self.assertFalse(all(np.equal(jill_topics, jill_topics2))) model.update(corpus_new, author2doc_new) # Did we learn something about Sally? sally_topics = model.get_author_topics('sally') sally_topics = matutils.sparse2full(sally_topics, model.num_topics) self.assertTrue(all(sally_topics > 0)) # Delete the MmCorpus used for serialization inside the author-topic model. remove(datapath('testcorpus_serialization.mm')) def test_transform_serialized(self): # Same as testTransform, using serialized corpora. passed = False # sometimes, training gets stuck at a local minimum # in that case try re-training the model from scratch, hoping for a # better random initialization for i in range(25): # restart at most 5 times # create the transformation model model = self.class_( id2word=dictionary, num_topics=2, passes=100, random_state=0, serialized=True, serialization_path=datapath('testcorpus_serialization.mm') ) model.update(self.corpus, author2doc) jill_topics = model.get_author_topics('jill') # NOTE: this test may easily fail if the author-topic model is altered in any way. The model's # output is sensitive to a lot of things, like the scheduling of the updates, or like the # author2id (because the random initialization changes when author2id changes). If it does # fail, simply be aware of whether we broke something, or if it just naturally changed the # output of the model slightly. vec = matutils.sparse2full(jill_topics, 2) # convert to dense vector, for easier equality tests expected = [0.91, 0.08] # must contain the same values, up to re-ordering passed = np.allclose(sorted(vec), sorted(expected), atol=1e-1) # Delete the MmCorpus used for serialization inside the author-topic model. remove(datapath('testcorpus_serialization.mm')) if passed: break logging.warning( "Author-topic model failed to converge on attempt %i (got %s, expected %s)", i, sorted(vec), sorted(expected) ) self.assertTrue(passed) def test_alpha_auto(self): model1 = self.class_( corpus, author2doc=author2doc, id2word=dictionary, alpha='symmetric', passes=10, num_topics=2 ) modelauto = self.class_( corpus, author2doc=author2doc, id2word=dictionary, alpha='auto', passes=10, num_topics=2 ) # did we learn something? self.assertFalse(all(np.equal(model1.alpha, modelauto.alpha))) def test_alpha(self): kwargs = dict( author2doc=author2doc, id2word=dictionary, num_topics=2, alpha=None ) expected_shape = (2,) # should not raise anything self.class_(**kwargs) kwargs['alpha'] = 'symmetric' model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(all(model.alpha == np.array([0.5, 0.5]))) kwargs['alpha'] = 'asymmetric' model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(np.allclose(model.alpha, [0.630602, 0.369398])) kwargs['alpha'] = 0.3 model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(all(model.alpha == np.array([0.3, 0.3]))) kwargs['alpha'] = 3 model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(all(model.alpha == np.array([3, 3]))) kwargs['alpha'] = [0.3, 0.3] model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(all(model.alpha == np.array([0.3, 0.3]))) kwargs['alpha'] = np.array([0.3, 0.3]) model = self.class_(**kwargs) self.assertEqual(model.alpha.shape, expected_shape) self.assertTrue(all(model.alpha == np.array([0.3, 0.3]))) # all should raise an exception for being wrong shape kwargs['alpha'] = [0.3, 0.3, 0.3] self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['alpha'] = [[0.3], [0.3]] self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['alpha'] = [0.3] self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['alpha'] = "gensim is cool" self.assertRaises(ValueError, self.class_, **kwargs) def test_eta_auto(self): model1 = self.class_( corpus, author2doc=author2doc, id2word=dictionary, eta='symmetric', passes=10, num_topics=2 ) modelauto = self.class_( corpus, author2doc=author2doc, id2word=dictionary, eta='auto', passes=10, num_topics=2 ) # did we learn something? self.assertFalse(all(np.equal(model1.eta, modelauto.eta))) def test_eta(self): kwargs = dict( author2doc=author2doc, id2word=dictionary, num_topics=2, eta=None ) num_terms = len(dictionary) expected_shape = (num_terms,) # should not raise anything model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([0.5] * num_terms))) kwargs['eta'] = 'symmetric' model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([0.5] * num_terms))) kwargs['eta'] = 0.3 model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([0.3] * num_terms))) kwargs['eta'] = 3 model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([3] * num_terms))) kwargs['eta'] = [0.3] * num_terms model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([0.3] * num_terms))) kwargs['eta'] = np.array([0.3] * num_terms) model = self.class_(**kwargs) self.assertEqual(model.eta.shape, expected_shape) self.assertTrue(all(model.eta == np.array([0.3] * num_terms))) # should be ok with num_topics x num_terms testeta = np.array([[0.5] * len(dictionary)] * 2) kwargs['eta'] = testeta self.class_(**kwargs) # all should raise an exception for being wrong shape kwargs['eta'] = testeta.reshape(tuple(reversed(testeta.shape))) self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['eta'] = [0.3] self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['eta'] = [0.3] * (num_terms + 1) self.assertRaises(AssertionError, self.class_, **kwargs) kwargs['eta'] = "gensim is cool" self.assertRaises(ValueError, self.class_, **kwargs) kwargs['eta'] = "asymmetric" self.assertRaises(ValueError, self.class_, **kwargs) def test_top_topics(self): top_topics = self.model.top_topics(corpus) for topic, score in top_topics: self.assertTrue(isinstance(topic, list)) self.assertTrue(isinstance(score, float)) for v, k in topic: self.assertTrue(isinstance(k, str)) self.assertTrue(isinstance(v, float)) def test_get_topic_terms(self): topic_terms = self.model.get_topic_terms(1) for k, v in topic_terms: self.assertTrue(isinstance(k, numbers.Integral)) self.assertTrue(isinstance(v, float)) def test_get_author_topics(self): model = self.class_( corpus, author2doc=author2doc, id2word=dictionary, num_topics=2, passes=100, random_state=np.random.seed(0) ) author_topics = [] for a in model.id2author.values(): author_topics.append(model.get_author_topics(a)) for topic in author_topics: self.assertTrue(isinstance(topic, list)) for k, v in topic: self.assertTrue(isinstance(k, int)) self.assertTrue(isinstance(v, float)) def test_term_topics(self): model = self.class_( corpus, author2doc=author2doc, id2word=dictionary, num_topics=2, passes=100, random_state=np.random.seed(0) ) # check with word_type result = model.get_term_topics(2) for topic_no, probability in result: self.assertTrue(isinstance(topic_no, int)) self.assertTrue(isinstance(probability, float)) # if user has entered word instead, check with word result = model.get_term_topics(str(model.id2word[2])) for topic_no, probability in result: self.assertTrue(isinstance(topic_no, int)) self.assertTrue(isinstance(probability, float)) def test_new_author_topics(self): model = self.class_( corpus, author2doc=author2doc, id2word=dictionary, num_topics=2, passes=100, random_state=np.random.seed(0) ) author2doc_newauthor = {} author2doc_newauthor["test"] = [0, 1] model.update(corpus=corpus[0:2], author2doc=author2doc_newauthor) # temp save model state vars before get_new_author_topics is called state_gamma_len = len(model.state.gamma) author2doc_len = len(model.author2doc) author2id_len = len(model.author2id) id2author_len = len(model.id2author) doc2author_len = len(model.doc2author) new_author_topics = model.get_new_author_topics(corpus=corpus[0:2]) # sanity check for k, v in new_author_topics: self.assertTrue(isinstance(k, int)) self.assertTrue(isinstance(v, float)) # make sure topics are similar enough similarity = 1 / (1 + jensen_shannon(model["test"], new_author_topics)) self.assertTrue(similarity >= 0.9) # produce an error to test if rollback occurs with self.assertRaises(TypeError): model.get_new_author_topics(corpus=corpus[0]) # assure rollback was successful and the model state is as before self.assertEqual(state_gamma_len, len(model.state.gamma)) self.assertEqual(author2doc_len, len(model.author2doc)) self.assertEqual(author2id_len, len(model.author2id)) self.assertEqual(id2author_len, len(model.id2author)) self.assertEqual(doc2author_len, len(model.doc2author)) def test_passes(self): # long message includes the original error message with a custom one self.longMessage = True # construct what we expect when passes aren't involved test_rhots = [] model = self.class_(id2word=dictionary, chunksize=1, num_topics=2) def final_rhot(model): return pow(model.offset + (1 * model.num_updates) / model.chunksize, -model.decay) # generate 5 updates to test rhot on for _ in range(5): model.update(corpus, author2doc) test_rhots.append(final_rhot(model)) for passes in [1, 5, 10, 50, 100]: model = self.class_(id2word=dictionary, chunksize=1, num_topics=2, passes=passes) self.assertEqual(final_rhot(model), 1.0) # make sure the rhot matches the test after each update for test_rhot in test_rhots: model.update(corpus, author2doc) msg = "{}, {}, {}".format(passes, model.num_updates, model.state.numdocs) self.assertAlmostEqual(final_rhot(model), test_rhot, msg=msg) self.assertEqual(model.state.numdocs, len(corpus) * len(test_rhots)) self.assertEqual(model.num_updates, len(corpus) * len(test_rhots)) def test_persistence(self): fname = get_tmpfile('gensim_models_atmodel.tst') model = self.model model.save(fname) model2 = self.class_.load(fname) self.assertEqual(model.num_topics, model2.num_topics) self.assertTrue(np.allclose(model.expElogbeta, model2.expElogbeta)) self.assertTrue(np.allclose(model.state.gamma, model2.state.gamma)) def test_persistence_ignore(self): fname = get_tmpfile('gensim_models_atmodel_testPersistenceIgnore.tst') model = atmodel.AuthorTopicModel(corpus, author2doc=author2doc, num_topics=2) model.save(fname, ignore='id2word') model2 = atmodel.AuthorTopicModel.load(fname) self.assertTrue(model2.id2word is None) model.save(fname, ignore=['id2word']) model2 = atmodel.AuthorTopicModel.load(fname) self.assertTrue(model2.id2word is None) def test_persistence_compressed(self): fname = get_tmpfile('gensim_models_atmodel.tst.gz') model = self.model model.save(fname) model2 = self.class_.load(fname, mmap=None) self.assertEqual(model.num_topics, model2.num_topics) self.assertTrue(np.allclose(model.expElogbeta, model2.expElogbeta)) # Compare Jill's topics before and after save/load. jill_topics = model.get_author_topics('jill') jill_topics2 = model2.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) self.assertTrue(np.allclose(jill_topics, jill_topics2)) def test_large_mmap(self): fname = get_tmpfile('gensim_models_atmodel.tst') model = self.model # simulate storing large arrays separately model.save(fname, sep_limit=0) # test loading the large model arrays with mmap model2 = self.class_.load(fname, mmap='r') self.assertEqual(model.num_topics, model2.num_topics) self.assertTrue(isinstance(model2.expElogbeta, np.memmap)) self.assertTrue(np.allclose(model.expElogbeta, model2.expElogbeta)) # Compare Jill's topics before and after save/load. jill_topics = model.get_author_topics('jill') jill_topics2 = model2.get_author_topics('jill') jill_topics = matutils.sparse2full(jill_topics, model.num_topics) jill_topics2 = matutils.sparse2full(jill_topics2, model.num_topics) self.assertTrue(np.allclose(jill_topics, jill_topics2)) def test_large_mmap_compressed(self): fname = get_tmpfile('gensim_models_atmodel.tst.gz') model = self.model # simulate storing large arrays separately model.save(fname, sep_limit=0) # test loading the large model arrays with mmap self.assertRaises(IOError, self.class_.load, fname, mmap='r') def test_dtype_backward_compatibility(self): atmodel_3_0_1_fname = datapath('atmodel_3_0_1_model') expected_topics = [(0, 0.068200842977296727), (1, 0.93179915702270333)] # save model to use in test # self.model.save(atmodel_3_0_1_fname) # load a model saved using a 3.0.1 version of Gensim model = self.class_.load(atmodel_3_0_1_fname) # and test it on a predefined document topics = model['jane'] self.assertTrue(np.allclose(expected_topics, topics)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG) unittest.main()