#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010 Radim Rehurek
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html

"""
Automated tests for checking transformation algorithms (the models package).
"""

import logging
import unittest
import os
import bz2
import sys

import numpy as np

from testfixtures import log_capture

try:
    from pyemd import emd  # noqa:F401
    PYEMD_EXT = True
except (ImportError, ValueError):
    PYEMD_EXT = False

from gensim import utils
from gensim.models import word2vec, keyedvectors
from gensim.test.utils import (
    datapath, get_tmpfile, temporary_file, common_texts as sentences,
    LeeCorpus, lee_corpus_list,
)


# Extra sentences used by the online-learning tests to extend an existing vocabulary.
new_sentences = [
    ['computer', 'artificial', 'intelligence'],
    ['artificial', 'trees'],
    ['human', 'intelligence'],
    ['artificial', 'graph'],
    ['intelligence'],
    ['artificial', 'intelligence', 'system']
]


def _rule(word, count, min_count):
    """Vocabulary trim rule used by the tests: discard "human", defer to the default rule otherwise."""
    if word == "human":
        return utils.RULE_DISCARD  # throw out
    else:
        return utils.RULE_DEFAULT  # apply default rule, i.e. min_count


def load_on_instance():
    """Save a trained Word2Vec model, then call `load` on a fresh *instance* and return the result."""
    # Save and load a Word2Vec Model on instance for test
    tmpf = get_tmpfile('gensim_word2vec.tst')
    model = word2vec.Word2Vec(sentences, min_count=1)
    model.save(tmpf)
    model = word2vec.Word2Vec()  # should fail at this point
    return model.load(tmpf)


class TestWord2VecModel(unittest.TestCase):
    """Tests for vocabulary building, training, persistence and querying of `word2vec.Word2Vec`."""

    def test_build_vocab_from_freq(self):
        """Test that the algorithm is able to build vocabulary from given frequency table"""
        freq_dict = {
            'minors': 2, 'graph': 3, 'system': 4,
            'trees': 3, 'eps': 2, 'computer': 2,
            'survey': 2, 'user': 3, 'human': 2,
            'time': 2, 'interface': 2, 'response': 2
        }
        # keep an untouched copy to compare against after the models have consumed `freq_dict`
        freq_dict_orig = freq_dict.copy()
        model_hs = word2vec.Word2Vec(vector_size=10, min_count=0, seed=42, hs=1, negative=0)
        model_neg = word2vec.Word2Vec(vector_size=10, min_count=0, seed=42, hs=0, negative=5)
        model_hs.build_vocab_from_freq(freq_dict)
        model_neg.build_vocab_from_freq(freq_dict)
        self.assertEqual(len(model_hs.wv), 12)
        self.assertEqual(len(model_neg.wv), 12)
        for word, expected_count in freq_dict_orig.items():
            self.assertEqual(model_hs.wv.get_vecattr(word, 'count'), expected_count)
            self.assertEqual(model_neg.wv.get_vecattr(word, 'count'), expected_count)

        new_freq_dict = {
            'computer': 1, 'artificial': 4, 'human': 1, 'graph': 1, 'intelligence': 4, 'system': 1, 'trees': 1
        }
        model_hs.build_vocab_from_freq(new_freq_dict, update=True)
        model_neg.build_vocab_from_freq(new_freq_dict, update=True)
        self.assertEqual(model_hs.wv.get_vecattr('graph', 'count'), 4)
        self.assertEqual(model_hs.wv.get_vecattr('artificial', 'count'), 4)
        self.assertEqual(len(model_hs.wv), 14)
        self.assertEqual(len(model_neg.wv), 14)

    def test_prune_vocab(self):
        """Test Prune vocab while scanning sentences"""
        # NOTE: this local deliberately shadows the module-level `sentences` corpus.
        sentences = [
            ["graph", "system"],
            ["graph", "system"],
            ["system", "eps"],
            ["graph", "system"]
        ]
        model = word2vec.Word2Vec(sentences, vector_size=10, min_count=0, max_vocab_size=2, seed=42, hs=1, negative=0)
        self.assertEqual(len(model.wv), 2)
        self.assertEqual(model.wv.get_vecattr('graph', 'count'), 3)
        self.assertEqual(model.wv.get_vecattr('system', 'count'), 4)

        sentences = [
            ["graph", "system"],
            ["graph", "system"],
            ["system", "eps"],
            ["graph", "system"],
            ["minors", "survey", "minors", "survey", "minors"]
        ]
        model = word2vec.Word2Vec(sentences, vector_size=10, min_count=0, max_vocab_size=2, seed=42, hs=1, negative=0)
        self.assertEqual(len(model.wv), 3)
        self.assertEqual(model.wv.get_vecattr('graph', 'count'), 3)
        self.assertEqual(model.wv.get_vecattr('minors', 'count'), 3)
        self.assertEqual(model.wv.get_vecattr('system', 'count'), 4)

    def test_total_word_count(self):
        """Test that `scan_vocab` reports the expected total number of words for the common corpus."""
        model = word2vec.Word2Vec(vector_size=10, min_count=0, seed=42)
        total_words = model.scan_vocab(sentences)[0]
        self.assertEqual(total_words, 29)

    def test_max_final_vocab(self):
        """Test how `max_final_vocab` interacts with `min_count` when preparing the vocabulary."""
        # Test for less restricting effect of max_final_vocab
        # max_final_vocab is specified but has no effect
        model = word2vec.Word2Vec(vector_size=10, max_final_vocab=4, min_count=4, sample=0)
        model.scan_vocab(sentences)
        reported_values = model.prepare_vocab()
        self.assertEqual(reported_values['drop_unique'], 11)
        self.assertEqual(reported_values['retain_total'], 4)
        self.assertEqual(reported_values['num_retained_words'], 1)
        self.assertEqual(model.effective_min_count, 4)

        # Test for more restricting effect of max_final_vocab
        # results in setting a min_count more restricting than specified min_count
        model = word2vec.Word2Vec(vector_size=10, max_final_vocab=4, min_count=2, sample=0)
        model.scan_vocab(sentences)
        reported_values = model.prepare_vocab()
        self.assertEqual(reported_values['drop_unique'], 8)
        self.assertEqual(reported_values['retain_total'], 13)
        self.assertEqual(reported_values['num_retained_words'], 4)
        self.assertEqual(model.effective_min_count, 3)

    def test_online_learning(self):
        """Test that the algorithm is able to add new words to the
        vocabulary and to a trained model when using a sorted vocabulary"""
        model_hs = word2vec.Word2Vec(sentences, vector_size=10, min_count=0, seed=42, hs=1, negative=0)
        model_neg = word2vec.Word2Vec(sentences, vector_size=10, min_count=0, seed=42, hs=0, negative=5)
        # assertEqual (not assertTrue): assertTrue would treat the expected value as a mere failure message
        self.assertEqual(len(model_hs.wv), 12)
        self.assertEqual(model_hs.wv.get_vecattr('graph', 'count'), 3)
        model_hs.build_vocab(new_sentences, update=True)
        model_neg.build_vocab(new_sentences, update=True)
        self.assertEqual(model_hs.wv.get_vecattr('graph', 'count'), 4)
        self.assertEqual(model_hs.wv.get_vecattr('artificial', 'count'), 4)
        self.assertEqual(len(model_hs.wv), 14)
        self.assertEqual(len(model_neg.wv), 14)

    def test_online_learning_after_save(self):
        """Test that new words can be added to the vocabulary of a model
        that has been saved and loaded back, and that it can then be trained"""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model_neg = word2vec.Word2Vec(sentences, vector_size=10, min_count=0, seed=42, hs=0, negative=5)
        model_neg.save(tmpf)
        model_neg = word2vec.Word2Vec.load(tmpf)
        self.assertEqual(len(model_neg.wv), 12)
        model_neg.build_vocab(new_sentences, update=True)
        model_neg.train(new_sentences, total_examples=model_neg.corpus_count, epochs=model_neg.epochs)
        self.assertEqual(len(model_neg.wv), 14)

    def test_online_learning_from_file(self):
        """Test that new words can be added to the vocabulary and to a
        trained model when the corpora are supplied via `corpus_file`"""
        with temporary_file(get_tmpfile('gensim_word2vec1.tst')) as corpus_file,\
                temporary_file(get_tmpfile('gensim_word2vec2.tst')) as new_corpus_file:
            utils.save_as_line_sentence(sentences, corpus_file)
            utils.save_as_line_sentence(new_sentences, new_corpus_file)

            model_hs = word2vec.Word2Vec(corpus_file=corpus_file, vector_size=10, min_count=0, seed=42,
                                         hs=1, negative=0)
            model_neg = word2vec.Word2Vec(corpus_file=corpus_file, vector_size=10, min_count=0, seed=42,
                                          hs=0, negative=5)
            self.assertEqual(len(model_hs.wv), 12)
            self.assertEqual(model_hs.wv.get_vecattr('graph', 'count'), 3)
            model_hs.build_vocab(corpus_file=new_corpus_file, update=True)
            model_hs.train(corpus_file=new_corpus_file, total_words=model_hs.corpus_total_words, epochs=model_hs.epochs)

            model_neg.build_vocab(corpus_file=new_corpus_file, update=True)
            model_neg.train(
                corpus_file=new_corpus_file, total_words=model_hs.corpus_total_words, epochs=model_hs.epochs)
            self.assertEqual(model_hs.wv.get_vecattr('graph', 'count'), 4)
            self.assertEqual(model_hs.wv.get_vecattr('artificial', 'count'), 4)
            self.assertEqual(len(model_hs.wv), 14)
            self.assertEqual(len(model_neg.wv), 14)

    def test_online_learning_after_save_from_file(self):
        """Test that a saved-and-reloaded model can keep training and extend
        its vocabulary when the corpora are supplied via `corpus_file`"""
        with temporary_file(get_tmpfile('gensim_word2vec1.tst')) as corpus_file,\
                temporary_file(get_tmpfile('gensim_word2vec2.tst')) as new_corpus_file:
            utils.save_as_line_sentence(sentences, corpus_file)
            utils.save_as_line_sentence(new_sentences, new_corpus_file)

            tmpf = get_tmpfile('gensim_word2vec.tst')
            model_neg = word2vec.Word2Vec(corpus_file=corpus_file, vector_size=10, min_count=0, seed=42,
                                          hs=0, negative=5)
            model_neg.save(tmpf)
            model_neg = word2vec.Word2Vec.load(tmpf)
            self.assertEqual(len(model_neg.wv), 12)
            # Check that training works on the same data after load without calling build_vocab
            model_neg.train(corpus_file=corpus_file, total_words=model_neg.corpus_total_words, epochs=model_neg.epochs)
            # Train on new corpus file
            model_neg.build_vocab(corpus_file=new_corpus_file, update=True)
            model_neg.train(corpus_file=new_corpus_file, total_words=model_neg.corpus_total_words,
                            epochs=model_neg.epochs)
            self.assertEqual(len(model_neg.wv), 14)

    def onlineSanity(self, model, trained_model=False):
        """Shared online-learning check: train without 'terrorism', then add it via a vocabulary update.

        The Lee corpus is split into the lines that contain 'terrorism' and all the others. `model` is
        first built/trained on the others (using `update=trained_model`), then updated and trained on
        the 'terrorism' lines; the vectors must change and 'war'/'terrorism' must be positively similar.
        """
        terro, others = [], []
        for line in lee_corpus_list:
            if 'terrorism' in line:
                terro.append(line)
            else:
                others.append(line)
        self.assertTrue(all('terrorism' not in line for line in others))
        model.build_vocab(others, update=trained_model)
        model.train(others, total_examples=model.corpus_count, epochs=model.epochs)
        self.assertNotIn('terrorism', model.wv)
        model.build_vocab(terro, update=True)
        self.assertIn('terrorism', model.wv)
        orig0 = np.copy(model.wv.vectors)
        model.train(terro, total_examples=len(terro), epochs=model.epochs)
        self.assertFalse(np.allclose(model.wv.vectors, orig0))
        sim = model.wv.n_similarity(['war'], ['terrorism'])
        self.assertLess(0., sim)

    def test_sg_hs_online(self):
        """Test skipgram w/ hierarchical softmax"""
        model = word2vec.Word2Vec(sg=1, window=5, hs=1, negative=0, min_count=3, epochs=10, seed=42, workers=2)
        self.onlineSanity(model)

    def test_sg_neg_online(self):
        """Test skipgram w/ negative sampling"""
        model = word2vec.Word2Vec(sg=1, window=4, hs=0, negative=15, min_count=3, epochs=10, seed=42, workers=2)
        self.onlineSanity(model)

    def test_cbow_hs_online(self):
        """Test CBOW w/ hierarchical softmax"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.05, window=5, hs=1, negative=0,
            min_count=3, epochs=20, seed=42, workers=2
        )
        self.onlineSanity(model)

    def test_cbow_neg_online(self):
        """Test CBOW w/ negative sampling"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.05, window=5, hs=0, negative=15,
            min_count=5, epochs=10, seed=42, workers=2, sample=0
        )
        self.onlineSanity(model)

    def test_persistence(self):
        """Test storing/loading the entire model."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.save(tmpf)
        self.models_equal(model, word2vec.Word2Vec.load(tmpf))
        # test persistence of the KeyedVectors of a model
        wv = model.wv
        wv.save(tmpf)
        loaded_wv = keyedvectors.KeyedVectors.load(tmpf)
        self.assertTrue(np.allclose(wv.vectors, loaded_wv.vectors))
        self.assertEqual(len(wv), len(loaded_wv))

    def test_persistence_from_file(self):
        """Test storing/loading the entire model trained with corpus_file argument."""
        with temporary_file(get_tmpfile('gensim_word2vec.tst')) as corpus_file:
            utils.save_as_line_sentence(sentences, corpus_file)

            tmpf = get_tmpfile('gensim_word2vec.tst')
            model = word2vec.Word2Vec(corpus_file=corpus_file, min_count=1)
            model.save(tmpf)
            self.models_equal(model, word2vec.Word2Vec.load(tmpf))
            # test persistence of the KeyedVectors of a model
            wv = model.wv
            wv.save(tmpf)
            loaded_wv = keyedvectors.KeyedVectors.load(tmpf)
            self.assertTrue(np.allclose(wv.vectors, loaded_wv.vectors))
            self.assertEqual(len(wv), len(loaded_wv))

    def test_persistence_with_constructor_rule(self):
        """Test storing/loading the entire model with a vocab trimming rule passed in the constructor."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1, trim_rule=_rule)
        model.save(tmpf)
        self.models_equal(model, word2vec.Word2Vec.load(tmpf))

    def test_rule_with_min_count(self):
        """Test that returning RULE_DEFAULT from trim_rule triggers min_count."""
        model = word2vec.Word2Vec(sentences + [["occurs_only_once"]], min_count=2, trim_rule=_rule)
        self.assertNotIn("human", model.wv)
        self.assertNotIn("occurs_only_once", model.wv)
        self.assertIn("interface", model.wv)

    def test_rule(self):
        """Test applying vocab trim_rule to build_vocab instead of constructor."""
        model = word2vec.Word2Vec(min_count=1)
        model.build_vocab(sentences, trim_rule=_rule)
        self.assertNotIn("human", model.wv)

    def test_lambda_rule(self):
        """Test that lambda trim_rule works."""
        def rule(word, count, min_count):
            return utils.RULE_DISCARD if word == "human" else utils.RULE_DEFAULT

        model = word2vec.Word2Vec(sentences, min_count=1, trim_rule=rule)
        self.assertNotIn("human", model.wv)

    def obsolete_testLoadPreKeyedVectorModel(self):
        """Test loading pre-KeyedVectors word2vec model"""
        # Not collected by the test runner: the name does not start with `test`.

        if sys.version_info[:2] == (3, 4):
            model_file_suffix = '_py3_4'
        elif sys.version_info < (3,):
            model_file_suffix = '_py2'
        else:
            model_file_suffix = '_py3'

        # Model stored in one file
        model_file = 'word2vec_pre_kv%s' % model_file_suffix
        model = word2vec.Word2Vec.load(datapath(model_file))
        self.assertEqual(model.wv.vectors.shape, (len(model.wv), model.vector_size))
        self.assertEqual(model.syn1neg.shape, (len(model.wv), model.vector_size))

        # Model stored in multiple files
        model_file = 'word2vec_pre_kv_sep%s' % model_file_suffix
        model = word2vec.Word2Vec.load(datapath(model_file))
        self.assertEqual(model.wv.vectors.shape, (len(model.wv), model.vector_size))
        self.assertEqual(model.syn1neg.shape, (len(model.wv), model.vector_size))

    def test_load_pre_keyed_vector_model_c_format(self):
        """Test loading pre-KeyedVectors word2vec model saved in word2vec format"""
        model = keyedvectors.KeyedVectors.load_word2vec_format(datapath('word2vec_pre_kv_c'))
        self.assertEqual(model.vectors.shape[0], len(model))

    def test_persistence_word2vec_format(self):
        """Test storing/loading the entire model in word2vec format."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.wv.save_word2vec_format(tmpf, binary=True)
        binary_model_kv = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=True)
        self.assertTrue(np.allclose(model.wv['human'], binary_model_kv['human']))
        norm_only_model = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=True)
        norm_only_model.unit_normalize_all()
        self.assertFalse(np.allclose(model.wv['human'], norm_only_model['human']))
        self.assertTrue(np.allclose(model.wv.get_vector('human', norm=True), norm_only_model['human']))
        limited_model_kv = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=True, limit=3)
        self.assertEqual(len(limited_model_kv.vectors), 3)
        half_precision_model_kv = keyedvectors.KeyedVectors.load_word2vec_format(
            tmpf, binary=True, datatype=np.float16
        )
        self.assertEqual(binary_model_kv.vectors.nbytes, half_precision_model_kv.vectors.nbytes * 2)

    def test_no_training_c_format(self):
        """Test that training raises ValueError when the vectors were loaded from word2vec format."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.wv.save_word2vec_format(tmpf, binary=True)
        kv = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=True)
        binary_model = word2vec.Word2Vec()
        binary_model.wv = kv
        self.assertRaises(ValueError, binary_model.train, sentences)

    def test_too_short_binary_word2vec_format(self):
        """Test that a binary word2vec file whose header overstates the vector count raises EOFError."""
        tfile = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.wv.save_word2vec_format(tfile, binary=True)
        # 'r+b' positions at the start of the file, so this overwrites the first two bytes in place
        with open(tfile, 'r+b') as f:
            f.write(b'13')  # write wrong (too-long) vector count
        self.assertRaises(EOFError, keyedvectors.KeyedVectors.load_word2vec_format, tfile, binary=True)

    def test_too_short_text_word2vec_format(self):
        """Test that a text word2vec file whose header overstates the vector count raises EOFError."""
        tfile = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.wv.save_word2vec_format(tfile, binary=False)
        # 'r+b' positions at the start of the file, so this overwrites the first two bytes in place
        with open(tfile, 'r+b') as f:
            f.write(b'13')  # write wrong (too-long) vector count
        self.assertRaises(EOFError, keyedvectors.KeyedVectors.load_word2vec_format, tfile, binary=False)

    def test_persistence_word2vec_format_non_binary(self):
        """Test storing/loading the entire model in word2vec non-binary format."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        model.wv.save_word2vec_format(tmpf, binary=False)
        text_model = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=False)
        self.assertTrue(np.allclose(model.wv['human'], text_model['human'], atol=1e-6))
        norm_only_model = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=False)
        norm_only_model.unit_normalize_all()
        self.assertFalse(np.allclose(model.wv['human'], norm_only_model['human'], atol=1e-6))
        self.assertTrue(np.allclose(
            model.wv.get_vector('human', norm=True), norm_only_model['human'], atol=1e-4
        ))

    def test_persistence_word2vec_format_with_vocab(self):
        """Test storing/loading the entire model and vocabulary in word2vec format."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        testvocab = get_tmpfile('gensim_word2vec.vocab')
        model.wv.save_word2vec_format(tmpf, testvocab, binary=True)
        binary_model_with_vocab_kv = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, testvocab, binary=True)
        self.assertEqual(
            model.wv.get_vecattr('human', 'count'),
            binary_model_with_vocab_kv.get_vecattr('human', 'count'),
        )

    def test_persistence_keyed_vectors_format_with_vocab(self):
        """Test storing/loading the entire model and vocabulary in word2vec format."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        testvocab = get_tmpfile('gensim_word2vec.vocab')
        model.wv.save_word2vec_format(tmpf, testvocab, binary=True)
        kv_binary_model_with_vocab = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, testvocab, binary=True)
        self.assertEqual(
            model.wv.get_vecattr('human', 'count'),
            kv_binary_model_with_vocab.get_vecattr('human', 'count'),
        )

    def test_persistence_word2vec_format_combination_with_standard_persistence(self):
        """Test storing/loading the entire model and vocabulary in word2vec format chained with
        saving and loading via `save` and `load` methods.
        It was possible prior to 1.0.0 release, now raises Exception"""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)
        testvocab = get_tmpfile('gensim_word2vec.vocab')
        model.wv.save_word2vec_format(tmpf, testvocab, binary=True)
        binary_model_with_vocab_kv = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, testvocab, binary=True)
        binary_model_with_vocab_kv.save(tmpf)
        self.assertRaises(AttributeError, word2vec.Word2Vec.load, tmpf)

    def test_large_mmap(self):
        """Test storing/loading the entire model with arrays in separate files, including mmap loading."""
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model = word2vec.Word2Vec(sentences, min_count=1)

        # test storing the internal arrays into separate files
        model.save(tmpf, sep_limit=0)
        self.models_equal(model, word2vec.Word2Vec.load(tmpf))

        # make sure mmaping the arrays back works, too
        self.models_equal(model, word2vec.Word2Vec.load(tmpf, mmap='r'))

    def test_vocab(self):
        """Test word2vec vocabulary building."""
        corpus = LeeCorpus()
        total_words = sum(len(sentence) for sentence in corpus)

        # try vocab building explicitly, using all words
        model = word2vec.Word2Vec(min_count=1, hs=1, negative=0)
        model.build_vocab(corpus)
        self.assertEqual(len(model.wv), 6981)
        # with min_count=1, we're not throwing away anything,
        # so make sure the word counts add up to be the entire corpus
        self.assertEqual(sum(model.wv.get_vecattr(k, 'count') for k in model.wv.key_to_index), total_words)
        # make sure the binary codes are correct
        # NOTE(review): the result of np.allclose is discarded here, so the code is not actually
        # verified; wrap in an assertion once the expected code has been confirmed.
        np.allclose(model.wv.get_vecattr('the', 'code'), [1, 1, 0, 0])

        # test building vocab with default params
        model = word2vec.Word2Vec(hs=1, negative=0)
        model.build_vocab(corpus)
        self.assertEqual(len(model.wv), 1750)
        # NOTE(review): result discarded here as well -- see the note above.
        np.allclose(model.wv.get_vecattr('the', 'code'), [1, 1, 1, 0])

        # no input => "RuntimeError: you must first build vocabulary before training the model"
        self.assertRaises(RuntimeError, word2vec.Word2Vec, [])

        # input not empty, but rather completely filtered out
        self.assertRaises(RuntimeError, word2vec.Word2Vec, corpus, min_count=total_words + 1)

    def test_training(self):
        """Test word2vec training."""
        # build vocabulary, don't train yet
        model = word2vec.Word2Vec(vector_size=2, min_count=1, hs=1, negative=0)
        model.build_vocab(sentences)

        self.assertEqual(model.wv.vectors.shape, (len(model.wv), 2))
        self.assertEqual(model.syn1.shape, (len(model.wv), 2))

        model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)
        sims = model.wv.most_similar('graph', topn=10)
        # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

        # test querying for "most similar" by vector
        graph_vector = model.wv.get_vector('graph', norm=True)
        sims2 = model.wv.most_similar(positive=[graph_vector], topn=11)
        sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
        self.assertEqual(sims, sims2)

        # build vocab and train in one step; must be the same as above
        model2 = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, hs=1, negative=0)
        self.models_equal(model, model2)

    def test_training_from_file(self):
        """Test word2vec training with corpus_file argument."""
        # build vocabulary, don't train yet
        with temporary_file(get_tmpfile('gensim_word2vec.tst')) as tf:
            utils.save_as_line_sentence(sentences, tf)

            model = word2vec.Word2Vec(vector_size=2, min_count=1, hs=1, negative=0)
            model.build_vocab(corpus_file=tf)

            self.assertEqual(model.wv.vectors.shape, (len(model.wv), 2))
            self.assertEqual(model.syn1.shape, (len(model.wv), 2))

            model.train(corpus_file=tf, total_words=model.corpus_total_words, epochs=model.epochs)
            sims = model.wv.most_similar('graph', topn=10)
            # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

            # test querying for "most similar" by vector
            graph_vector = model.wv.get_vector('graph', norm=True)
            sims2 = model.wv.most_similar(positive=[graph_vector], topn=11)
            sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
            self.assertEqual(sims, sims2)

    def test_scoring(self):
        """Test word2vec scoring."""
        model = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, hs=1, negative=0)

        # just score and make sure they exist
        scores = model.score(sentences, len(sentences))
        self.assertEqual(len(scores), len(sentences))

    def test_locking(self):
        """Test word2vec training doesn't change locked vectors."""
        corpus = LeeCorpus()
        # build vocabulary, don't train yet
        for sg in range(2):  # test both cbow and sg
            model = word2vec.Word2Vec(vector_size=4, hs=1, negative=5, min_count=1, sg=sg, window=5)
            model.build_vocab(corpus)

            # remember two vectors
            locked0 = np.copy(model.wv.vectors[0])
            unlocked1 = np.copy(model.wv.vectors[1])
            # allocate a full lockf array (not just default single val for all)
            model.wv.vectors_lockf = np.ones(len(model.wv), dtype=np.float32)
            # lock the vector in slot 0 against change
            model.wv.vectors_lockf[0] = 0.0

            model.train(corpus, total_examples=model.corpus_count, epochs=model.epochs)
            self.assertFalse((unlocked1 == model.wv.vectors[1]).all())  # unlocked vector should vary
            self.assertTrue((locked0 == model.wv.vectors[0]).all())  # locked vector should not vary

    def test_evaluate_word_analogies(self):
        """Test that evaluating analogies on KeyedVectors give sane results"""
        model = word2vec.Word2Vec(LeeCorpus())
        score, sections = model.wv.evaluate_word_analogies(datapath('questions-words.txt'))
        self.assertGreaterEqual(score, 0.0)
        self.assertLessEqual(score, 1.0)
        self.assertGreater(len(sections), 0)
        # Check that dict contains the right keys
        first_section = sections[0]
        self.assertIn('section', first_section)
        self.assertIn('correct', first_section)
        self.assertIn('incorrect', first_section)

    def test_evaluate_word_pairs(self):
        """Test Spearman and Pearson correlation coefficients give sane results on similarity datasets"""
        corpus = word2vec.LineSentence(datapath('head500.noblanks.cor.bz2'))
        model = word2vec.Word2Vec(corpus, min_count=3, epochs=20)
        correlation = model.wv.evaluate_word_pairs(datapath('wordsim353.tsv'))
        pearson = correlation[0][0]
        spearman = correlation[1][0]
        oov = correlation[2]
        # f-strings so that the failure message shows the offending value
        self.assertTrue(0.1 < pearson < 1.0, f"pearson {pearson} not between 0.1 & 1.0")
        self.assertTrue(0.1 < spearman < 1.0, f"spearman {spearman} not between 0.1 and 1.0")
        self.assertTrue(0.0 <= oov < 90.0, f"OOV {oov} not between 0.0 and 90.0")

    def test_evaluate_word_pairs_from_file(self):
        """Test Spearman and Pearson correlation coefficients give sane results on similarity datasets"""
        with temporary_file(get_tmpfile('gensim_word2vec.tst')) as tf:
            utils.save_as_line_sentence(word2vec.LineSentence(datapath('head500.noblanks.cor.bz2')), tf)

            model = word2vec.Word2Vec(corpus_file=tf, min_count=3, epochs=20)
            correlation = model.wv.evaluate_word_pairs(datapath('wordsim353.tsv'))
            pearson = correlation[0][0]
            spearman = correlation[1][0]
            oov = correlation[2]
            self.assertTrue(0.1 < pearson < 1.0, f"pearson {pearson} not between 0.1 & 1.0")
            self.assertTrue(0.1 < spearman < 1.0, f"spearman {spearman} not between 0.1 and 1.0")
            self.assertTrue(0.0 <= oov < 90.0, f"OOV {oov} not between 0.0 and 90.0")

    def model_sanity(self, model, train=True, with_corpus_file=False, ranks=None):
        """Even tiny models trained on LeeCorpus should pass these sanity checks

        If `train` is true, the vocabulary is built from `lee_corpus_list` and the model is trained
        (from a line-sentence file when `with_corpus_file` is true). If `ranks` is a list, the rank
        of the expected neighbour is appended to it before the rank assertion runs.
        """
        # run extra before/after training tests if train=True
        if train:
            model.build_vocab(lee_corpus_list)
            orig0 = np.copy(model.wv.vectors[0])

            if with_corpus_file:
                tmpfile = get_tmpfile('gensim_word2vec.tst')
                utils.save_as_line_sentence(lee_corpus_list, tmpfile)
                model.train(corpus_file=tmpfile, total_words=model.corpus_total_words, epochs=model.epochs)
            else:
                model.train(lee_corpus_list, total_examples=model.corpus_count, epochs=model.epochs)
            # compare slot 0 with its own pre-training copy (comparing against slot 1 would hold trivially)
            self.assertFalse((orig0 == model.wv.vectors[0]).all())  # vector should vary after training
        query_word = 'attacks'
        expected_word = 'bombings'
        sims = model.wv.most_similar(query_word, topn=len(model.wv.index_to_key))
        t_rank = [word for word, score in sims].index(expected_word)
        # in >200 calibration runs w/ calling parameters, 'terrorism' in 50-most_sim for 'war'
        # NOTE(review): the calibration note above names other words than the pair checked here.
        if ranks is not None:
            ranks.append(t_rank)  # tabulate trial rank if requested
        self.assertLess(t_rank, 50)
        query_vec = model.wv[query_word]
        sims2 = model.wv.most_similar([query_vec], topn=51)
        self.assertIn(query_word, [word for word, score in sims2])
        self.assertIn(expected_word, [word for word, score in sims2])

    def test_sg_hs(self):
        """Test skipgram w/ hierarchical softmax"""
        model = word2vec.Word2Vec(sg=1, window=4, hs=1, negative=0, min_count=5, epochs=10, workers=2)
        self.model_sanity(model)

    def test_sg_hs_fromfile(self):
        """Test skipgram w/ hierarchical softmax, training from a corpus file"""
        model = word2vec.Word2Vec(sg=1, window=4, hs=1, negative=0, min_count=5, epochs=10, workers=2)
        self.model_sanity(model, with_corpus_file=True)

    def test_sg_neg(self):
        """Test skipgram w/ negative sampling"""
        model = word2vec.Word2Vec(sg=1, window=4, hs=0, negative=15, min_count=5, epochs=10, workers=2)
        self.model_sanity(model)

    def test_sg_neg_fromfile(self):
        """Test skipgram w/ negative sampling, training from a corpus file"""
        model = word2vec.Word2Vec(sg=1, window=4, hs=0, negative=15, min_count=5, epochs=10, workers=2)
        self.model_sanity(model, with_corpus_file=True)

    @unittest.skipIf('BULK_TEST_REPS' not in os.environ, reason="bulk test only occasionally run locally")
    def test_method_in_bulk(self):
        """Not run by default testing, but can be run locally to help tune stochastic aspects of tests
        to very-very-rarely fail. EG:
        % BULK_TEST_REPS=200 METHOD_NAME=test_cbow_hs pytest test_word2vec.py -k "test_method_in_bulk"
        Method must accept `ranks` keyword-argument, empty list into which salient internal result can be reported.
        """
        failures = 0
        ranks = []
        reps = int(os.environ['BULK_TEST_REPS'])
        method_name = os.environ.get('METHOD_NAME', 'test_cbow_hs')  # by default test that specially-troublesome one
        method_fn = getattr(self, method_name)
        for _ in range(reps):
            try:
                method_fn(ranks=ranks)
            except Exception as ex:
                # deliberately broad: every kind of failure is counted and reported, not propagated
                print('%s failed: %s' % (method_name, ex))
                failures += 1
        print(ranks)
        print(np.mean(ranks))
        # assertEqual: the `assertEquals` alias is deprecated and no longer exists in Python 3.12+
        self.assertEqual(failures, 0, "too many failures")

    def test_cbow_hs(self, ranks=None):
        """Test CBOW w/ hierarchical softmax"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.1, window=2, hs=1, negative=0,
            min_count=5, epochs=60, workers=2, batch_words=1000
        )
        self.model_sanity(model, ranks=ranks)

    def test_cbow_hs_fromfile(self):
        """Test CBOW w/ hierarchical softmax, training from a corpus file"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.1, window=2, hs=1, negative=0,
            min_count=5, epochs=60, workers=2, batch_words=1000
        )
        self.model_sanity(model, with_corpus_file=True)

    def test_cbow_neg(self, ranks=None):
        """Test CBOW w/ negative sampling"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.05, window=5, hs=0, negative=15,
            min_count=5, epochs=10, workers=2, sample=0
        )
        self.model_sanity(model, ranks=ranks)

    def test_cbow_neg_fromfile(self):
        """Test CBOW w/ negative sampling, training from a corpus file"""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.05, window=5, hs=0, negative=15,
            min_count=5, epochs=10, workers=2, sample=0
        )
        self.model_sanity(model, with_corpus_file=True)

    def test_sg_fixedwindowsize(self):
        """Test skipgram with fixed window size. Use NS."""
        model = word2vec.Word2Vec(
            sg=1, window=5, shrink_windows=False, hs=0,
            negative=15, min_count=5, epochs=10, workers=2
        )
        self.model_sanity(model)

    def test_sg_fixedwindowsize_fromfile(self):
        """Test skipgram with fixed window size. Use HS and train from file."""
        model = word2vec.Word2Vec(
            sg=1, window=5, shrink_windows=False, hs=1,
            negative=0, min_count=5, epochs=10, workers=2
        )
        self.model_sanity(model, with_corpus_file=True)

    def test_cbow_fixedwindowsize(self, ranks=None):
        """Test CBOW with fixed window size. Use HS."""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.1, window=5, shrink_windows=False,
            hs=1, negative=0, min_count=5, epochs=10, workers=2
        )
        self.model_sanity(model, ranks=ranks)

    def test_cbow_fixedwindowsize_fromfile(self):
        """Test CBOW with fixed window size. Use NS and train from file."""
        model = word2vec.Word2Vec(
            sg=0, cbow_mean=1, alpha=0.1, window=5, shrink_windows=False,
            hs=0, negative=15, min_count=5, epochs=10, workers=2
        )
        self.model_sanity(model, with_corpus_file=True)

    def test_cosmul(self):
        """Test that `most_similar_cosmul` gives the same results when queried by word and by vector."""
        model = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, hs=1, negative=0)
        sims = model.wv.most_similar_cosmul('graph', topn=10)
        # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

        # test querying for "most similar" by vector
        graph_vector = model.wv.get_vector('graph', norm=True)
        sims2 = model.wv.most_similar_cosmul(positive=[graph_vector], topn=11)
        sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
        self.assertEqual(sims, sims2)

    def test_training_cbow(self):
        """Test CBOW word2vec training."""
        # to test training, make the corpus larger by repeating its sentences over and over
        # build vocabulary, don't train yet
        model = word2vec.Word2Vec(vector_size=2, min_count=1, sg=0, hs=1, negative=0)
        model.build_vocab(sentences)
        self.assertEqual(model.wv.vectors.shape, (len(model.wv), 2))
        self.assertEqual(model.syn1.shape, (len(model.wv), 2))

        model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)
        sims = model.wv.most_similar('graph', topn=10)
        # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

        # test querying for "most similar" by vector
        graph_vector = model.wv.get_vector('graph', norm=True)
        sims2 = model.wv.most_similar(positive=[graph_vector], topn=11)
        sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
        self.assertEqual(sims, sims2)

        # build vocab and train in one step; must be the same as above
        model2 = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, sg=0, hs=1, negative=0)
        self.models_equal(model, model2)

    def test_training_sg_negative(self):
        """Test skip-gram (negative sampling) word2vec training."""
        # to test training, make the corpus larger by repeating its sentences over and over
        # build vocabulary, don't train yet
        model = word2vec.Word2Vec(vector_size=2, min_count=1, sg=1, hs=0, negative=2)
        model.build_vocab(sentences)
        self.assertEqual(model.wv.vectors.shape, (len(model.wv), 2))
        self.assertEqual(model.syn1neg.shape, (len(model.wv), 2))

        model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)
        sims = model.wv.most_similar('graph', topn=10)
        # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

        # test querying for "most similar" by vector
        graph_vector = model.wv.get_vector('graph', norm=True)
        sims2 = model.wv.most_similar(positive=[graph_vector], topn=11)
        sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
        self.assertEqual(sims, sims2)

        # build vocab and train in one step; must be the same as above
        model2 = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, sg=1, hs=0, negative=2)
        self.models_equal(model, model2)

    def test_training_cbow_negative(self):
        """Test CBOW (negative sampling) word2vec training."""
        # to test training, make the corpus larger by repeating its sentences over and over
        # build vocabulary, don't train yet
        model = word2vec.Word2Vec(vector_size=2, min_count=1, sg=0, hs=0, negative=2)
        model.build_vocab(sentences)
        self.assertEqual(model.wv.vectors.shape, (len(model.wv), 2))
        self.assertEqual(model.syn1neg.shape, (len(model.wv), 2))

        model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)
        sims = model.wv.most_similar('graph', topn=10)
        # self.assertTrue(sims[0][0] == 'trees', sims)  # most similar

        # test querying for "most similar" by vector
        graph_vector = model.wv.get_vector('graph', norm=True)
        sims2 = model.wv.most_similar(positive=[graph_vector], topn=11)
        sims2 = [(w, sim) for w, sim in sims2 if w != 'graph']  # ignore 'graph' itself
        self.assertEqual(sims, sims2)

        # build vocab and train in one step; must be the same as above
        model2 = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, sg=0, hs=0, negative=2)
        self.models_equal(model, model2)

    def test_similarities(self):
        """Test similarity and n_similarity methods."""
        # The model is trained using CBOW
        model = word2vec.Word2Vec(vector_size=2, min_count=1, sg=0, hs=0, negative=2)
        model.build_vocab(sentences)
        model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)

        self.assertTrue(model.wv.n_similarity(['graph', 'trees'], ['trees', 'graph']))
        self.assertEqual(model.wv.n_similarity(['graph'], ['trees']), model.wv.similarity('graph', 'trees'))
        self.assertRaises(ZeroDivisionError, model.wv.n_similarity, ['graph', 'trees'], [])
        self.assertRaises(ZeroDivisionError, model.wv.n_similarity, [], ['graph', 'trees'])
        self.assertRaises(ZeroDivisionError, model.wv.n_similarity, [], [])

    def test_similar_by(self):
        """Test word2vec similar_by_word and similar_by_vector."""
        model = word2vec.Word2Vec(sentences, vector_size=2, min_count=1, hs=1, negative=0)
        wordsims = model.wv.similar_by_word('graph', topn=10)
        wordsims2 = model.wv.most_similar(positive='graph', topn=10)
        vectorsims = model.wv.similar_by_vector(model.wv['graph'], topn=10)
        vectorsims2 = model.wv.most_similar([model.wv['graph']], topn=10)
        self.assertEqual(wordsims, wordsims2)
        self.assertEqual(vectorsims, vectorsims2)

    def test_parallel(self):
        """Test word2vec parallel training."""
        corpus = utils.RepeatCorpus(LeeCorpus(), 10000)  # repeats about 33 times

        for workers in [4, ]:  # [4, 2]
            model = word2vec.Word2Vec(corpus, vector_size=16, min_count=(10 * 33), workers=workers)
            origin_word = 'israeli'
            expected_neighbor = 'palestinian'
            sims = model.wv.most_similar(origin_word, topn=len(model.wv))
            # the exact vectors and therefore similarities may differ, due to different thread collisions/randomization
            # so let's only require the expected neighbor to rank within the top 20
            neighbor_rank = [word for word, sim in sims].index(expected_neighbor)
            self.assertLess(neighbor_rank, 20)

    def test_r_n_g(self):
        """Test word2vec results identical with identical RNG seed."""
        model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1)
        model2 = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1)
        self.models_equal(model, model2)

    def models_equal(self, model, model2):
        """Assert that two models have the same vocabulary size and (numerically close) weights.

        Compares the word vectors, `syn1` when `model.hs` is set, `syn1neg` when `model.negative`
        is set, and finally the vector of the word with the highest stored 'count'.
        """
        self.assertEqual(len(model.wv), len(model2.wv))
        self.assertTrue(np.allclose(model.wv.vectors, model2.wv.vectors))
        if model.hs:
            self.assertTrue(np.allclose(model.syn1, model2.syn1))
        if model.negative:
            self.assertTrue(np.allclose(model.syn1neg, model2.syn1neg))
        # argsort is ascending, so the last index belongs to the largest count
        most_common_word_index = np.argsort(model.wv.expandos['count'])[-1]
        most_common_word = model.wv.index_to_key[most_common_word_index]
        self.assertTrue(np.allclose(model.wv[most_common_word], model2.wv[most_common_word]))

    def test_predict_output_word(self):
        '''Test word2vec predict_output_word method handling for negative sampling scheme'''
        # under normal circumstances
        model_with_neg = word2vec.Word2Vec(sentences, min_count=1)
        predictions_with_neg = model_with_neg.predict_output_word(['system', 'human'], topn=5)
        self.assertTrue(len(predictions_with_neg) == 5)

        # out-of-vobaculary scenario
        predictions_out_of_vocab = model_with_neg.predict_output_word(['some', 'random', 'words'], topn=5)
        self.assertEqual(predictions_out_of_vocab, None)

        # when required model parameters have been deleted
        tmpf = get_tmpfile('gensim_word2vec.tst')
        model_with_neg.wv.save_word2vec_format(tmpf, binary=True)
        kv_model_with_neg = keyedvectors.KeyedVectors.load_word2vec_format(tmpf, binary=True)
        binary_model_with_neg = word2vec.Word2Vec()
binary_model_with_neg.wv = kv_model_with_neg self.assertRaises(RuntimeError, binary_model_with_neg.predict_output_word, ['system', 'human']) # negative sampling scheme not used model_without_neg = word2vec.Word2Vec(sentences, min_count=1, negative=0) self.assertRaises(RuntimeError, model_without_neg.predict_output_word, ['system', 'human']) # passing indices instead of words in context str_context = ['system', 'human'] mixed_context = [model_with_neg.wv.get_index(str_context[0]), str_context[1]] idx_context = [model_with_neg.wv.get_index(w) for w in str_context] prediction_from_str = model_with_neg.predict_output_word(str_context, topn=5) prediction_from_mixed = model_with_neg.predict_output_word(mixed_context, topn=5) prediction_from_idx = model_with_neg.predict_output_word(idx_context, topn=5) self.assertEqual(prediction_from_str, prediction_from_mixed) self.assertEqual(prediction_from_str, prediction_from_idx) def test_load_old_model(self): """Test loading an old word2vec model of indeterminate version""" model_file = 'word2vec_old' # which version?!? 
model = word2vec.Word2Vec.load(datapath(model_file)) self.assertTrue(model.wv.vectors.shape == (12, 100)) self.assertTrue(len(model.wv) == 12) self.assertTrue(len(model.wv.index_to_key) == 12) self.assertTrue(model.syn1neg.shape == (len(model.wv), model.wv.vector_size)) self.assertTrue(len(model.wv.vectors_lockf.shape) > 0) self.assertTrue(model.cum_table.shape == (12,)) self.onlineSanity(model, trained_model=True) def test_load_old_model_separates(self): """Test loading an old word2vec model of indeterminate version""" # Model stored in multiple files model_file = 'word2vec_old_sep' model = word2vec.Word2Vec.load(datapath(model_file)) self.assertTrue(model.wv.vectors.shape == (12, 100)) self.assertTrue(len(model.wv) == 12) self.assertTrue(len(model.wv.index_to_key) == 12) self.assertTrue(model.syn1neg.shape == (len(model.wv), model.wv.vector_size)) self.assertTrue(len(model.wv.vectors_lockf.shape) > 0) self.assertTrue(model.cum_table.shape == (12,)) self.onlineSanity(model, trained_model=True) def obsolete_test_load_old_models_pre_1_0(self): """Test loading pre-1.0 models""" # load really old model model_file = 'w2v-lee-v0.12.0' model = word2vec.Word2Vec.load(datapath(model_file)) self.onlineSanity(model, trained_model=True) old_versions = [ '0.12.0', '0.12.1', '0.12.2', '0.12.3', '0.12.4', '0.13.0', '0.13.1', '0.13.2', '0.13.3', '0.13.4', ] for old_version in old_versions: self._check_old_version(old_version) def test_load_old_models_1_x(self): """Test loading 1.x models""" old_versions = [ '1.0.0', '1.0.1', ] for old_version in old_versions: self._check_old_version(old_version) def test_load_old_models_2_x(self): """Test loading 2.x models""" old_versions = [ '2.0.0', '2.1.0', '2.2.0', '2.3.0', ] for old_version in old_versions: self._check_old_version(old_version) def test_load_old_models_3_x(self): """Test loading 3.x models""" # test for max_final_vocab for model saved in 3.3 model_file = 'word2vec_3.3' model = word2vec.Word2Vec.load(datapath(model_file)) 
self.assertEqual(model.max_final_vocab, None) self.assertEqual(model.max_final_vocab, None) old_versions = [ '3.0.0', '3.1.0', '3.2.0', '3.3.0', '3.4.0' ] for old_version in old_versions: self._check_old_version(old_version) def _check_old_version(self, old_version): logging.info("TESTING LOAD of %s Word2Vec MODEL", old_version) saved_models_dir = datapath('old_w2v_models/w2v_{}.mdl') model = word2vec.Word2Vec.load(saved_models_dir.format(old_version)) self.assertIsNone(model.corpus_total_words) self.assertTrue(len(model.wv) == 3) try: self.assertTrue(model.wv.vectors.shape == (3, 4)) except AttributeError as ae: print("WV") print(model.wv) print(dir(model.wv)) print(model.wv.syn0) raise ae # check if similarity search and online training works. self.assertTrue(len(model.wv.most_similar('sentence')) == 2) model.build_vocab(lee_corpus_list, update=True) model.train(lee_corpus_list, total_examples=model.corpus_count, epochs=model.epochs) # check if similarity search and online training works after saving and loading back the model. tmpf = get_tmpfile('gensim_word2vec.tst') model.save(tmpf) loaded_model = word2vec.Word2Vec.load(tmpf) loaded_model.build_vocab(lee_corpus_list, update=True) loaded_model.train(lee_corpus_list, total_examples=model.corpus_count, epochs=model.epochs) @log_capture() def test_build_vocab_warning(self, loglines): """Test if warning is raised on non-ideal input to a word2vec model""" sentences = ['human', 'machine'] model = word2vec.Word2Vec() model.build_vocab(sentences) warning = "Each 'sentences' item should be a list of words (usually unicode strings)." 
self.assertTrue(warning in str(loglines)) @log_capture() def test_train_warning(self, loglines): """Test if warning is raised if alpha rises during subsequent calls to train()""" sentences = [ ['human'], ['graph', 'trees'] ] model = word2vec.Word2Vec(min_count=1) model.build_vocab(sentences) for epoch in range(10): model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs) model.alpha -= 0.002 model.min_alpha = model.alpha if epoch == 5: model.alpha += 0.05 warning = "Effective 'alpha' higher than previous training cycles" self.assertTrue(warning in str(loglines)) def test_train_with_explicit_param(self): model = word2vec.Word2Vec(vector_size=2, min_count=1, hs=1, negative=0) model.build_vocab(sentences) with self.assertRaises(ValueError): model.train(sentences, total_examples=model.corpus_count) with self.assertRaises(ValueError): model.train(sentences, epochs=model.epochs) with self.assertRaises(ValueError): model.train(sentences) def test_sentences_should_not_be_a_generator(self): """ Is sentences a generator object? 
""" gen = (s for s in sentences) self.assertRaises(TypeError, word2vec.Word2Vec, (gen,)) def test_load_on_class_error(self): """Test if exception is raised when loading word2vec model on instance""" self.assertRaises(AttributeError, load_on_instance) def test_reset_from(self): """Test if reset_from() uses pre-built structures from other model""" model = word2vec.Word2Vec(sentences, min_count=1) other_model = word2vec.Word2Vec(new_sentences, min_count=1) model.reset_from(other_model) self.assertEqual(model.wv.key_to_index, other_model.wv.key_to_index) def test_compute_training_loss(self): model = word2vec.Word2Vec(min_count=1, sg=1, negative=5, hs=1) model.build_vocab(sentences) model.train(sentences, compute_loss=True, total_examples=model.corpus_count, epochs=model.epochs) training_loss_val = model.get_latest_training_loss() self.assertTrue(training_loss_val > 0.0) # endclass TestWord2VecModel class TestWMD(unittest.TestCase): @unittest.skipIf(PYEMD_EXT is False, "pyemd not installed") def test_nonzero(self): '''Test basic functionality with a test sentence.''' model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) sentence1 = ['human', 'interface', 'computer'] sentence2 = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance = model.wv.wmdistance(sentence1, sentence2) # Check that distance is non-zero. 
self.assertFalse(distance == 0.0) @unittest.skipIf(PYEMD_EXT is False, "pyemd not installed") def test_symmetry(self): '''Check that distance is symmetric.''' model = word2vec.Word2Vec(sentences, min_count=2, seed=42, workers=1) sentence1 = ['human', 'interface', 'computer'] sentence2 = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance1 = model.wv.wmdistance(sentence1, sentence2) distance2 = model.wv.wmdistance(sentence2, sentence1) self.assertTrue(np.allclose(distance1, distance2)) @unittest.skipIf(PYEMD_EXT is False, "pyemd not installed") def test_identical_sentences(self): '''Check that the distance from a sentence to itself is zero.''' model = word2vec.Word2Vec(sentences, min_count=1) sentence = ['survey', 'user', 'computer', 'system', 'response', 'time'] distance = model.wv.wmdistance(sentence, sentence) self.assertEqual(0.0, distance) class TestWord2VecSentenceIterators(unittest.TestCase): def test_line_sentence_works_with_filename(self): """Does LineSentence work with a filename argument?""" with utils.open(datapath('lee_background.cor'), 'rb') as orig: sentences = word2vec.LineSentence(datapath('lee_background.cor')) for words in sentences: self.assertEqual(words, utils.to_unicode(orig.readline()).split()) def test_cython_line_sentence_works_with_filename(self): """Does CythonLineSentence work with a filename argument?""" from gensim.models import word2vec_corpusfile with utils.open(datapath('lee_background.cor'), 'rb') as orig: sentences = word2vec_corpusfile.CythonLineSentence(datapath('lee_background.cor')) for words in sentences: self.assertEqual(words, orig.readline().split()) def test_line_sentence_works_with_compressed_file(self): """Does LineSentence work with a compressed file object argument?""" with utils.open(datapath('head500.noblanks.cor'), 'rb') as orig: sentences = word2vec.LineSentence(bz2.BZ2File(datapath('head500.noblanks.cor.bz2'))) for words in sentences: self.assertEqual(words, 
utils.to_unicode(orig.readline()).split()) def test_line_sentence_works_with_normal_file(self): """Does LineSentence work with a file object argument, rather than filename?""" with utils.open(datapath('head500.noblanks.cor'), 'rb') as orig: with utils.open(datapath('head500.noblanks.cor'), 'rb') as fin: sentences = word2vec.LineSentence(fin) for words in sentences: self.assertEqual(words, utils.to_unicode(orig.readline()).split()) def test_path_line_sentences(self): """Does PathLineSentences work with a path argument?""" with utils.open(os.path.join(datapath('PathLineSentences'), '1.txt'), 'rb') as orig1: with utils.open(os.path.join(datapath('PathLineSentences'), '2.txt.bz2'), 'rb') as orig2: sentences = word2vec.PathLineSentences(datapath('PathLineSentences')) orig = orig1.readlines() + orig2.readlines() orig_counter = 0 # to go through orig while matching PathLineSentences for words in sentences: self.assertEqual(words, utils.to_unicode(orig[orig_counter]).split()) orig_counter += 1 def test_path_line_sentences_one_file(self): """Does PathLineSentences work with a single file argument?""" test_file = os.path.join(datapath('PathLineSentences'), '1.txt') with utils.open(test_file, 'rb') as orig: sentences = word2vec.PathLineSentences(test_file) for words in sentences: self.assertEqual(words, utils.to_unicode(orig.readline()).split()) # endclass TestWord2VecSentenceIterators # TODO: get correct path to Python binary # class TestWord2VecScripts(unittest.TestCase): # def test_word2vec_stand_alone_script(self): # """Does Word2Vec script launch standalone?""" # cmd = 'python -m gensim.scripts.word2vec_standalone -train ' + datapath('testcorpus.txt') + \ # ' -output vec.txt -size 200 -sample 1e-4 -binary 0 -iter 3 -min_count 1' # output = check_output(cmd, stderr=PIPE) # self.assertEqual(output, '0') # #endclass TestWord2VecScripts if not hasattr(TestWord2VecModel, 'assertLess'): # workaround for python 2.6 def assertLess(self, a, b, msg=None): self.assertTrue(a < b, 
msg="%s is not less than %s" % (a, b)) setattr(TestWord2VecModel, 'assertLess', assertLess) if __name__ == '__main__': logging.basicConfig( format='%(asctime)s : %(threadName)s : %(levelname)s : %(message)s', level=logging.DEBUG ) unittest.main(module='gensim.test.test_word2vec')