""" Tests for miscellaneous models Author: Chad Fulton License: Simplified-BSD """ import numpy as np import pandas as pd import os import pytest from statsmodels.tsa.statespace import mlemodel, sarimax from statsmodels import datasets from numpy.testing import assert_equal, assert_allclose, assert_raises current_path = os.path.dirname(os.path.abspath(__file__)) class Intercepts(mlemodel.MLEModel): """ Test class for observation and state intercepts (which usually do not get tested in other models). """ def __init__(self, endog, **kwargs): k_states = 3 k_posdef = 3 super(Intercepts, self).__init__( endog, k_states=k_states, k_posdef=k_posdef, **kwargs) self['design'] = np.eye(3) self['obs_cov'] = np.eye(3) self['transition'] = np.eye(3) self['selection'] = np.eye(3) self['state_cov'] = np.eye(3) self.initialize_approximate_diffuse() @property def param_names(self): return ['d.1', 'd.2', 'd.3', 'c.1', 'c.2', 'c.3'] @property def start_params(self): return np.arange(6) def update(self, params, **kwargs): params = super(Intercepts, self).update(params, **kwargs) self['obs_intercept'] = params[:3] self['state_intercept'] = params[3:] class TestIntercepts(object): @classmethod def setup_class(cls, which='mixed', **kwargs): # Results path = current_path + os.sep + 'results/results_intercepts_R.csv' cls.desired = pd.read_csv(path) # Data dta = datasets.macrodata.load_pandas().data dta.index = pd.date_range(start='1959-01-01', end='2009-7-01', freq='QS') obs = dta[['realgdp', 'realcons', 'realinv']].copy() obs = obs / obs.std() if which == 'all': obs.iloc[:50, :] = np.nan obs.iloc[119:130, :] = np.nan elif which == 'partial': obs.iloc[0:50, 0] = np.nan obs.iloc[119:130, 0] = np.nan elif which == 'mixed': obs.iloc[0:50, 0] = np.nan obs.iloc[19:70, 1] = np.nan obs.iloc[39:90, 2] = np.nan obs.iloc[119:130, 0] = np.nan obs.iloc[119:130, 2] = np.nan mod = Intercepts(obs, **kwargs) cls.params = np.arange(6) + 1 cls.model = mod cls.results = mod.smooth(cls.params, return_ssm=True) # Calculate the determinant of the covariance matrices (for easy # comparison to other languages without having to store 2-dim arrays) cls.results.det_scaled_smoothed_estimator_cov = ( np.zeros((1, cls.model.nobs))) cls.results.det_predicted_state_cov = np.zeros((1, cls.model.nobs)) cls.results.det_smoothed_state_cov = np.zeros((1, cls.model.nobs)) cls.results.det_smoothed_state_disturbance_cov = ( np.zeros((1, cls.model.nobs))) for i in range(cls.model.nobs): cls.results.det_scaled_smoothed_estimator_cov[0, i] = ( np.linalg.det( cls.results.scaled_smoothed_estimator_cov[:, :, i])) cls.results.det_predicted_state_cov[0, i] = np.linalg.det( cls.results.predicted_state_cov[:, :, i+1]) cls.results.det_smoothed_state_cov[0, i] = np.linalg.det( cls.results.smoothed_state_cov[:, :, i]) cls.results.det_smoothed_state_disturbance_cov[0, i] = ( np.linalg.det( cls.results.smoothed_state_disturbance_cov[:, :, i])) def test_loglike(self): assert_allclose(np.sum(self.results.llf_obs), -7924.03893566) def test_scaled_smoothed_estimator(self): assert_allclose( self.results.scaled_smoothed_estimator.T, self.desired[['r1', 'r2', 'r3']] ) def test_scaled_smoothed_estimator_cov(self): assert_allclose( self.results.det_scaled_smoothed_estimator_cov.T, self.desired[['detN']] ) def test_forecasts(self): assert_allclose( self.results.forecasts.T, self.desired[['m1', 'm2', 'm3']] ) def test_forecasts_error(self): assert_allclose( self.results.forecasts_error.T, self.desired[['v1', 'v2', 'v3']] ) def test_forecasts_error_cov(self): assert_allclose( self.results.forecasts_error_cov.diagonal(), self.desired[['F1', 'F2', 'F3']] ) def test_predicted_states(self): assert_allclose( self.results.predicted_state[:, 1:].T, self.desired[['a1', 'a2', 'a3']] ) def test_predicted_states_cov(self): assert_allclose( self.results.det_predicted_state_cov.T, self.desired[['detP']] ) def test_smoothed_states(self): assert_allclose( self.results.smoothed_state.T, self.desired[['alphahat1', 'alphahat2', 'alphahat3']] ) def test_smoothed_states_cov(self): assert_allclose( self.results.det_smoothed_state_cov.T, self.desired[['detV']] ) def test_smoothed_forecasts(self): assert_allclose( self.results.smoothed_forecasts.T, self.desired[['muhat1', 'muhat2', 'muhat3']] ) def test_smoothed_state_disturbance(self): assert_allclose( self.results.smoothed_state_disturbance.T, self.desired[['etahat1', 'etahat2', 'etahat3']] ) def test_smoothed_state_disturbance_cov(self): assert_allclose( self.results.det_smoothed_state_disturbance_cov.T, self.desired[['detVeta']] ) def test_smoothed_measurement_disturbance(self): assert_allclose( self.results.smoothed_measurement_disturbance.T, self.desired[['epshat1', 'epshat2', 'epshat3']], atol=1e-9 ) def test_smoothed_measurement_disturbance_cov(self): assert_allclose( self.results.smoothed_measurement_disturbance_cov.diagonal(), self.desired[['Veps1', 'Veps2', 'Veps3']] ) class LargeStateCovAR1(mlemodel.MLEModel): """ Test class for k_posdef > k_states (which usually do not get tested in other models). This is just an AR(1) model with an extra unused state innovation """ def __init__(self, endog, **kwargs): k_states = 1 k_posdef = 2 super(LargeStateCovAR1, self).__init__( endog, k_states=k_states, k_posdef=k_posdef, **kwargs) self['design', 0, 0] = 1 self['selection', 0, 0] = 1 self['state_cov', 1, 1] = 1 self.initialize_stationary() @property def param_names(self): return ['phi', 'sigma2'] @property def start_params(self): return [0.5, 1] def update(self, params, **kwargs): params = super(LargeStateCovAR1, self).update(params, **kwargs) self['transition', 0, 0] = params[0] self['state_cov', 0, 0] = params[1] def test_large_kposdef(): assert_raises(ValueError, LargeStateCovAR1, np.arange(10)) class TestLargeStateCovAR1(object): @classmethod def setup_class(cls): pytest.skip( 'TODO: This test is skipped since an exception is currently ' 'raised if k_posdef > k_states. However, this test could be ' 'used if models of those types were allowed' ) # Data: just some sample data endog = [0.2, -1.5, -.3, -.1, 1.5, 0.2, -0.3, 0.2, 0.5, 0.8] # Params params = [0.5, 1] # Desired model: AR(1) mod_desired = sarimax.SARIMAX(endog) cls.res_desired = mod_desired.smooth(params) # Test class mod = LargeStateCovAR1(endog) cls.res = mod.smooth(params) def test_dimensions(self): assert_equal(self.res.filter_results.k_states, 1) assert_equal(self.res.filter_results.k_posdef, 2) assert_equal(self.res.smoothed_state_disturbance.shape, (2, 10)) assert_equal(self.res_desired.filter_results.k_states, 1) assert_equal(self.res_desired.filter_results.k_posdef, 1) assert_equal(self.res_desired.smoothed_state_disturbance.shape, (1, 10)) def test_loglike(self): assert_allclose(self.res.llf_obs, self.res_desired.llf_obs) def test_scaled_smoothed_estimator(self): assert_allclose(self.res.scaled_smoothed_estimator[0], self.res_desired.scaled_smoothed_estimator[0]) def test_scaled_smoothed_estimator_cov(self): assert_allclose(self.res.scaled_smoothed_estimator_cov[0], self.res_desired.scaled_smoothed_estimator_cov[0]) def test_forecasts(self): assert_allclose(self.res.forecasts, self.res_desired.forecasts) def test_forecasts_error(self): assert_allclose(self.res.forecasts_error, self.res_desired.forecasts_error) def test_forecasts_error_cov(self): assert_allclose(self.res.forecasts_error_cov, self.res_desired.forecasts_error_cov) def test_predicted_states(self): assert_allclose(self.res.predicted_state[0], self.res_desired.predicted_state[0]) def test_predicted_states_cov(self): assert_allclose(self.res.predicted_state_cov[0, 0], self.res_desired.predicted_state_cov[0, 0]) def test_smoothed_states(self): assert_allclose(self.res.smoothed_state[0], self.res_desired.smoothed_state[0]) def test_smoothed_states_cov(self): assert_allclose(self.res.smoothed_state_cov[0, 0], self.res_desired.smoothed_state_cov[0, 0]) def test_smoothed_state_disturbance(self): assert_allclose(self.res.smoothed_state_disturbance[0], self.res_desired.smoothed_state_disturbance[0]) assert_allclose(self.res.smoothed_state_disturbance[1], 0) def test_smoothed_state_disturbance_cov(self): assert_allclose(self.res.smoothed_state_disturbance_cov[0, 0], self.res_desired.smoothed_state_disturbance_cov[0, 0]) assert_allclose(self.res.smoothed_state_disturbance[1, 1], 0) def test_smoothed_measurement_disturbance(self): assert_allclose(self.res.smoothed_measurement_disturbance, self.res_desired.smoothed_measurement_disturbance) def test_smoothed_measurement_disturbance_cov(self): assert_allclose(self.res.smoothed_measurement_disturbance_cov, self.res_desired.smoothed_measurement_disturbance_cov)