import numpy as np from numpy.testing import assert_equal, assert_, assert_allclose from statsmodels.regression.linear_model import OLS from statsmodels.genmod.generalized_linear_model import GLM from statsmodels.genmod.families import Binomial from statsmodels.base.distributed_estimation import _calc_grad, \ _calc_wdesign_mat, _est_regularized_debiased, _join_debiased, \ _est_regularized_naive, _est_unregularized_naive, _join_naive, \ DistributedModel def _data_gen(endog, exog, partitions): """partitions data""" n_exog = exog.shape[0] n_part = np.ceil(n_exog / partitions) n_part = np.floor(n_exog / partitions) rem = n_exog - n_part * partitions stp = 0 while stp < (partitions - 1): ii = int(n_part * stp) jj = int(n_part * (stp + 1)) yield endog[ii:jj], exog[ii:jj, :] stp += 1 ii = int(n_part * stp) jj = int(n_part * (stp + 1) + rem) yield endog[ii:jj], exog[ii:jj, :] def test_calc_grad(): # separately tests that _calc_grad returns # sensible results # # regression test np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) beta = np.random.normal(size=3) mod = OLS(y, X) grad = _calc_grad(mod, beta, 0.01, 1, {}) assert_allclose(grad, np.array([19.75816, -6.62307, 7.324644]), atol=1e-6, rtol=0) def test_calc_wdesign_mat(): # separately tests that _calc_wdesign_mat # returns sensible results # # regression test np.random.seed(435265) X = np.random.normal(size=(3, 3)) y = np.random.randint(0, 2, size=3) beta = np.random.normal(size=3) mod = OLS(y, X) dmat = _calc_wdesign_mat(mod, beta, {}) assert_allclose(dmat, np.array([[1.306314, -0.024897, 1.326498], [-0.539219, -0.483028, -0.703503], [-3.327987, 0.524541, -0.139761]]), atol=1e-6, rtol=0) mod = GLM(y, X, family=Binomial()) dmat = _calc_wdesign_mat(mod, beta, {}) assert_allclose(dmat, np.array([[0.408616, -0.007788, 0.41493], [-0.263292, -0.235854, -0.343509], [-0.11241, 0.017718, -0.004721]]), atol=1e-6, rtol=0) def test_est_regularized_debiased(): # tests that the shape of all the intermediate steps # remains correct for regularized debiased estimation, # does this for OLS and GLM np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) beta = np.random.normal(size=3) mod = OLS(y, X) res = _est_regularized_debiased(mod, 0, 2, fit_kwds={"alpha": 0.5}) bhat = res[0] grad = res[1] ghat_l = res[2] that_l = res[3] assert_(isinstance(res, tuple)) assert_equal(bhat.shape, beta.shape) assert_equal(grad.shape, beta.shape) assert_(isinstance(ghat_l, list)) assert_(isinstance(that_l, list)) assert_equal(len(ghat_l), len(that_l)) assert_equal(ghat_l[0].shape, (2,)) assert_(isinstance(that_l[0], float)) mod = GLM(y, X, family=Binomial()) res = _est_regularized_debiased(mod, 0, 2, fit_kwds={"alpha": 0.5}) bhat = res[0] grad = res[1] ghat_l = res[2] that_l = res[3] assert_(isinstance(res, tuple)) assert_equal(bhat.shape, beta.shape) assert_equal(grad.shape, beta.shape) assert_(isinstance(ghat_l, list)) assert_(isinstance(that_l, list)) assert_equal(len(ghat_l), len(that_l)) assert_equal(ghat_l[0].shape, (2,)) assert_(isinstance(that_l[0], float)) def test_est_regularized_naive(): # tests that the shape of all the intermediate steps # remains correct for regularized naive estimation, # does this for OLS and GLM np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) beta = np.random.normal(size=3) mod = OLS(y, X) res = _est_regularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5}) assert_equal(res.shape, beta.shape) mod = GLM(y, X, family=Binomial()) res = _est_regularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5}) assert_equal(res.shape, beta.shape) def test_est_unregularized_naive(): # tests that the shape of all the intermediate steps # remains correct for unregularized naive estimation, # does this for OLS and GLM np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) beta = np.random.normal(size=3) mod = OLS(y, X) res = _est_unregularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5}) assert_equal(res.shape, beta.shape) mod = GLM(y, X, family=Binomial()) res = _est_unregularized_naive(mod, 0, 2, fit_kwds={"alpha": 0.5}) assert_equal(res.shape, beta.shape) def test_join_debiased(): # tests that the results of all the intermediate steps # remains correct for debiased join, does this for OLS and GLM # # regression test np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) mod = OLS(y, X) res_l = [] for i in range(2): res = _est_regularized_debiased(mod, i, 2, fit_kwds={"alpha": 0.1}) res_l.append(res) joined = _join_debiased(res_l) assert_allclose(joined, np.array([-0.167548, -0.016567, -0.34414]), atol=1e-6, rtol=0) mod = GLM(y, X, family=Binomial()) res_l = [] for i in range(2): res = _est_regularized_debiased(mod, i, 2, fit_kwds={"alpha": 0.1}) res_l.append(res) joined = _join_debiased(res_l) assert_allclose(joined, np.array([-0.164515, -0.412854, -0.223955]), atol=1e-6, rtol=0) def test_join_naive(): # tests that the results of all the intermediate steps # remains correct for naive join, does this for OLS and GLM # # regression test np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) mod = OLS(y, X) res_l = [] for i in range(2): res = _est_regularized_naive(mod, i, 2, fit_kwds={"alpha": 0.1}) res_l.append(res) joined = _join_naive(res_l) assert_allclose(joined, np.array([-0.020757, 0., 0.]), atol=1e-6, rtol=0) mod = GLM(y, X, family=Binomial()) res_l = [] for i in range(2): res = _est_regularized_naive(mod, i, 2, fit_kwds={"alpha": 0.1}) res_l.append(res) joined = _join_naive(res_l) assert_allclose(joined, np.array([0., 0., 0.]), atol=1e-6, rtol=0) def test_fit_sequential(): # tests that the results of all the intermediate steps # remains correct for sequential fit, does this for OLS and GLM # and a variety of model sizes # # regression test np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) mod = DistributedModel(1, model_class=OLS) fit = mod.fit(_data_gen(y, X, 1), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.191606, -0.012565, -0.351398]), atol=1e-6, rtol=0) mod = DistributedModel(2, model_class=OLS) fit = mod.fit(_data_gen(y, X, 2), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.157416, -0.029643, -0.471653]), atol=1e-6, rtol=0) mod = DistributedModel(3, model_class=OLS) fit = mod.fit(_data_gen(y, X, 3), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.124891, -0.050934, -0.403354]), atol=1e-6, rtol=0) mod = DistributedModel(1, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 1), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.164515, -0.412854, -0.223955]), atol=1e-6, rtol=0) mod = DistributedModel(2, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 2), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.142513, -0.360324, -0.295485]), atol=1e-6, rtol=0) mod = DistributedModel(3, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 3), parallel_method="sequential", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.110487, -0.306431, -0.243921]), atol=1e-6, rtol=0) def test_fit_joblib(): # tests that the results of all the intermediate steps # remains correct for joblib fit, does this for OLS and GLM # and a variety of model sizes # # regression test np.random.seed(435265) X = np.random.normal(size=(50, 3)) y = np.random.randint(0, 2, size=50) mod = DistributedModel(1, model_class=OLS) fit = mod.fit(_data_gen(y, X, 1), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.191606, -0.012565, -0.351398]), atol=1e-6, rtol=0) mod = DistributedModel(2, model_class=OLS) fit = mod.fit(_data_gen(y, X, 2), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.157416, -0.029643, -0.471653]), atol=1e-6, rtol=0) mod = DistributedModel(3, model_class=OLS) fit = mod.fit(_data_gen(y, X, 3), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.124891, -0.050934, -0.403354]), atol=1e-6, rtol=0) mod = DistributedModel(1, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 1), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.164515, -0.412854, -0.223955]), atol=1e-6, rtol=0) mod = DistributedModel(2, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 2), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.142513, -0.360324, -0.295485]), atol=1e-6, rtol=0) mod = DistributedModel(3, model_class=GLM, init_kwds={"family": Binomial()}) fit = mod.fit(_data_gen(y, X, 3), parallel_method="joblib", fit_kwds={"alpha": 0.5}) assert_allclose(fit.params, np.array([-0.110487, -0.306431, -0.243921]), atol=1e-6, rtol=0) def test_single_partition(): # tests that the results make sense if we have a single partition np.random.seed(435265) N = 200 p = 10 m = 1 beta = np.random.normal(size=p) beta = beta * np.random.randint(0, 2, p) X = np.random.normal(size=(N, p)) y = X.dot(beta) + np.random.normal(size=N) # test regularized OLS v. naive db_mod = DistributedModel(m) fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0}) nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive, join_method=_join_naive) fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0}) ols_mod = OLS(y, X) fitOLS = ols_mod.fit(alpha=0) assert_allclose(fitOLSdb.params, fitOLS.params) assert_allclose(fitOLSnv.params, fitOLS.params) # test regularized nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive, join_method=_join_naive) fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1}) ols_mod = OLS(y, X) fitOLS = ols_mod.fit_regularized(alpha=0.1) assert_allclose(fitOLSnv.params, fitOLS.params) def test_larger_p(): # tests when p > N / m for the debiased and naive case np.random.seed(435265) N = 40 p = 40 m = 5 beta = np.random.normal(size=p) beta = beta * np.random.randint(0, 2, p) X = np.random.normal(size=(N, p)) y = X.dot(beta) + np.random.normal(size=N) db_mod = DistributedModel(m) fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1}) assert_equal(np.sum(np.isnan(fitOLSdb.params)), 0) nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive, join_method=_join_naive) fitOLSnv = nv_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1}) assert_equal(np.sum(np.isnan(fitOLSnv.params)), 0) def test_non_zero_params(): # tests that the thresholding does not cause any issues np.random.seed(435265) N = 200 p = 10 m = 5 beta = np.random.normal(size=p) beta = beta * np.random.randint(0, 2, p) X = np.random.normal(size=(N, p)) y = X.dot(beta) + np.random.normal(size=N) db_mod = DistributedModel(m, join_kwds={"threshold": 0.13}) fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.1}) ols_mod = OLS(y, X) fitOLS = ols_mod.fit_regularized(alpha=0.1) nz_params_db = 1 * (fitOLSdb.params != 0) nz_params_ols = 1 * (fitOLS.params != 0) assert_allclose(nz_params_db, nz_params_ols) def test_repeat_partition(): # tests that if we use identical partitions the average is the same # as the estimate for the full data np.random.seed(435265) N = 200 p = 10 m = 1 beta = np.random.normal(size=p) beta = beta * np.random.randint(0, 2, p) X = np.random.normal(size=(N, p)) y = X.dot(beta) + np.random.normal(size=N) def _rep_data_gen(endog, exog, partitions): """partitions data""" n_exog = exog.shape[0] n_part = np.ceil(n_exog / partitions) ii = 0 while ii < n_exog: yield endog, exog ii += int(n_part) nv_mod = DistributedModel(m, estimation_method=_est_regularized_naive, join_method=_join_naive) fitOLSnv = nv_mod.fit(_rep_data_gen(y, X, m), fit_kwds={"alpha": 0.1}) ols_mod = OLS(y, X) fitOLS = ols_mod.fit_regularized(alpha=0.1) assert_allclose(fitOLSnv.params, fitOLS.params) def test_debiased_v_average(): # tests that the debiased method performs better than the standard # average. Does this for both OLS and GLM. np.random.seed(435265) N = 200 p = 10 m = 4 beta = np.random.normal(size=p) beta = beta * np.random.randint(0, 2, p) X = np.random.normal(size=(N, p)) y = X.dot(beta) + np.random.normal(size=N) db_mod = DistributedModel(m) fitOLSdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2}) olsdb = np.linalg.norm(fitOLSdb.params - beta) n_mod = DistributedModel(m, estimation_method=_est_regularized_naive, join_method=_join_naive) fitOLSn = n_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2}) olsn = np.linalg.norm(fitOLSn.params - beta) assert_(olsdb < olsn) prob = 1 / (1 + np.exp(-X.dot(beta) + np.random.normal(size=N))) y = 1. * (prob > 0.5) db_mod = DistributedModel(m, model_class=GLM, init_kwds={"family": Binomial()}) fitGLMdb = db_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2}) glmdb = np.linalg.norm(fitGLMdb.params - beta) n_mod = DistributedModel(m, model_class=GLM, init_kwds={"family": Binomial()}, estimation_method=_est_regularized_naive, join_method=_join_naive) fitGLMn = n_mod.fit(_data_gen(y, X, m), fit_kwds={"alpha": 0.2}) glmn = np.linalg.norm(fitGLMn.params - beta) assert_(glmdb < glmn)