""" Tests for simulation smoothing Author: Chad Fulton License: Simplified-BSD """ import os import numpy as np from numpy.testing import assert_allclose, assert_equal import pandas as pd from statsmodels import datasets from statsmodels.tsa.statespace import mlemodel, sarimax, structural from statsmodels.tsa.statespace.simulation_smoother import ( SIMULATION_STATE, SIMULATION_DISTURBANCE, SIMULATION_ALL) current_path = os.path.dirname(os.path.abspath(__file__)) class MultivariateVARKnown(object): """ Tests for simulation smoothing values in a couple of special cases of variates. Both computed values and KFAS values are used for comparison against the simulation smoother output. """ @classmethod def setup_class(cls, missing=None, test_against_KFAS=True, *args, **kwargs): cls.test_against_KFAS = test_against_KFAS # Data dta = datasets.macrodata.load_pandas().data dta.index = pd.date_range(start='1959-01-01', end='2009-7-01', freq='QS') obs = np.log(dta[['realgdp', 'realcons', 'realinv']]).diff().iloc[1:] if missing == 'all': obs.iloc[0:50, :] = np.nan elif missing == 'partial': obs.iloc[0:50, 0] = np.nan elif missing == 'mixed': obs.iloc[0:50, 0] = np.nan obs.iloc[19:70, 1] = np.nan obs.iloc[39:90, 2] = np.nan obs.iloc[119:130, 0] = np.nan obs.iloc[119:130, 2] = np.nan obs.iloc[-10:, :] = np.nan if test_against_KFAS: obs = obs.iloc[:9] # Create the model mod = mlemodel.MLEModel(obs, k_states=3, k_posdef=3, **kwargs) mod['design'] = np.eye(3) mod['obs_cov'] = np.array([ [0.0000640649, 0., 0.], [0., 0.0000572802, 0.], [0., 0., 0.0017088585]]) mod['transition'] = np.array([ [-0.1119908792, 0.8441841604, 0.0238725303], [0.2629347724, 0.4996718412, -0.0173023305], [-3.2192369082, 4.1536028244, 0.4514379215]]) mod['selection'] = np.eye(3) mod['state_cov'] = np.array([ [0.0000640649, 0.0000388496, 0.0002148769], [0.0000388496, 0.0000572802, 0.000001555], [0.0002148769, 0.000001555, 0.0017088585]]) mod.initialize_approximate_diffuse(1e6) mod.ssm.filter_univariate = True cls.model = mod cls.results = mod.smooth([], return_ssm=True) cls.sim = cls.model.simulation_smoother() def test_loglike(self): assert_allclose(np.sum(self.results.llf_obs), self.true_llf) def test_simulate_0(self): n = 10 # Test with all inputs as zeros measurement_shocks = np.zeros((n, self.model.k_endog)) state_shocks = np.zeros((n, self.model.ssm.k_posdef)) initial_state = np.zeros(self.model.k_states) obs, states = self.model.ssm.simulate( nsimulations=n, measurement_shocks=measurement_shocks, state_shocks=state_shocks, initial_state=initial_state) assert_allclose(obs, np.zeros((n, self.model.k_endog))) assert_allclose(states, np.zeros((n, self.model.k_states))) def test_simulate_1(self): n = 10 # Test with np.arange / 10 measurement shocks only measurement_shocks = np.reshape( np.arange(n * self.model.k_endog) / 10., (n, self.model.k_endog)) state_shocks = np.zeros((n, self.model.ssm.k_posdef)) initial_state = np.zeros(self.model.k_states) obs, states = self.model.ssm.simulate( nsimulations=n, measurement_shocks=measurement_shocks, state_shocks=state_shocks, initial_state=initial_state) assert_allclose(obs, np.reshape( np.arange(n * self.model.k_endog) / 10., (n, self.model.k_endog))) assert_allclose(states, np.zeros((n, self.model.k_states))) def test_simulate_2(self): n = 10 Z = self.model['design'] T = self.model['transition'] # Test with non-zero state shocks and initial state measurement_shocks = np.zeros((n, self.model.k_endog)) state_shocks = np.ones((n, self.model.ssm.k_posdef)) initial_state = np.ones(self.model.k_states) * 2.5 obs, states = self.model.ssm.simulate( nsimulations=n, measurement_shocks=measurement_shocks, state_shocks=state_shocks, initial_state=initial_state) desired_obs = np.zeros((n, self.model.k_endog)) desired_state = np.zeros((n, self.model.k_states)) desired_state[0] = initial_state desired_obs[0] = np.dot(Z, initial_state) for i in range(1, n): desired_state[i] = np.dot(T, desired_state[i-1]) + state_shocks[i] desired_obs[i] = np.dot(Z, desired_state[i]) assert_allclose(obs, desired_obs) assert_allclose(states, desired_state) def test_simulation_smoothing_0(self): # Simulation smoothing when setting all variates to zeros # In this case: # - unconditional disturbances are zero, because they are simply # transformed to have the appropriate variance matrix, but keep the # same mean - of zero # - generated states are zeros, because initial state is # zeros and all state disturbances are zeros # - generated observations are zeros, because states are zeros and all # measurement disturbances are zeros # - The simulated state is equal to the smoothed state from the # original model, because # simulated state = (generated state - smoothed generated state + # smoothed state) # and here generated state = smoothed generated state = 0 # - The simulated measurement disturbance is equal to the smoothed # measurement disturbance for very similar reasons, because # simulated measurement disturbance = ( # generated measurement disturbance - # smoothed generated measurement disturbance + # smoothed measurement disturbance) # and here generated measurement disturbance and # smoothed generated measurement disturbance are zero. # - The simulated state disturbance is equal to the smoothed # state disturbance for exactly the same reason as above. sim = self.sim Z = self.model['design'] n_disturbance_variates = ( (self.model.k_endog + self.model.ssm.k_posdef) * self.model.nobs) # Test against known quantities (see above for description) sim.simulate(disturbance_variates=np.zeros(n_disturbance_variates), initial_state_variates=np.zeros(self.model.k_states)) assert_allclose(sim.generated_measurement_disturbance, 0) assert_allclose(sim.generated_state_disturbance, 0) assert_allclose(sim.generated_state, 0) assert_allclose(sim.generated_obs, 0) assert_allclose(sim.simulated_state, self.results.smoothed_state) if not self.model.ssm.filter_collapsed: assert_allclose(sim.simulated_measurement_disturbance, self.results.smoothed_measurement_disturbance) assert_allclose(sim.simulated_state_disturbance, self.results.smoothed_state_disturbance) # Test against R package KFAS values if self.test_against_KFAS: path = os.path.join(current_path, 'results', 'results_simulation_smoothing0.csv') true = pd.read_csv(path) assert_allclose(sim.simulated_state, true[['state1', 'state2', 'state3']].T, atol=1e-7) assert_allclose(sim.simulated_measurement_disturbance, true[['eps1', 'eps2', 'eps3']].T, atol=1e-7) assert_allclose(sim.simulated_state_disturbance, true[['eta1', 'eta2', 'eta3']].T, atol=1e-7) signals = np.zeros((3, self.model.nobs)) for t in range(self.model.nobs): signals[:, t] = np.dot(Z, sim.simulated_state[:, t]) assert_allclose(signals, true[['signal1', 'signal2', 'signal3']].T, atol=1e-7) def test_simulation_smoothing_1(self): # Test with measurement disturbance as np.arange / 10., all other # disturbances are zeros sim = self.sim Z = self.model['design'] # Construct the variates measurement_disturbance_variates = np.reshape( np.arange(self.model.nobs * self.model.k_endog) / 10., (self.model.nobs, self.model.k_endog)) disturbance_variates = np.r_[ measurement_disturbance_variates.ravel(), np.zeros(self.model.nobs * self.model.ssm.k_posdef)] # Compute some additional known quantities generated_measurement_disturbance = np.zeros( measurement_disturbance_variates.shape) chol = np.linalg.cholesky(self.model['obs_cov']) for t in range(self.model.nobs): generated_measurement_disturbance[t] = np.dot( chol, measurement_disturbance_variates[t]) generated_model = mlemodel.MLEModel( generated_measurement_disturbance, k_states=self.model.k_states, k_posdef=self.model.ssm.k_posdef) for name in ['design', 'obs_cov', 'transition', 'selection', 'state_cov']: generated_model[name] = self.model[name] generated_model.initialize_approximate_diffuse(1e6) generated_model.ssm.filter_univariate = True generated_res = generated_model.ssm.smooth() simulated_state = ( 0 - generated_res.smoothed_state + self.results.smoothed_state) if not self.model.ssm.filter_collapsed: simulated_measurement_disturbance = ( generated_measurement_disturbance.T - generated_res.smoothed_measurement_disturbance + self.results.smoothed_measurement_disturbance) simulated_state_disturbance = ( 0 - generated_res.smoothed_state_disturbance + self.results.smoothed_state_disturbance) # Test against known values sim.simulate(disturbance_variates=disturbance_variates, initial_state_variates=np.zeros(self.model.k_states)) assert_allclose(sim.generated_measurement_disturbance, generated_measurement_disturbance) assert_allclose(sim.generated_state_disturbance, 0) assert_allclose(sim.generated_state, 0) assert_allclose(sim.generated_obs, generated_measurement_disturbance.T) assert_allclose(sim.simulated_state, simulated_state) if not self.model.ssm.filter_collapsed: assert_allclose(sim.simulated_measurement_disturbance, simulated_measurement_disturbance) assert_allclose(sim.simulated_state_disturbance, simulated_state_disturbance) # Test against R package KFAS values if self.test_against_KFAS: path = os.path.join(current_path, 'results', 'results_simulation_smoothing1.csv') true = pd.read_csv(path) assert_allclose(sim.simulated_state, true[['state1', 'state2', 'state3']].T, atol=1e-7) assert_allclose(sim.simulated_measurement_disturbance, true[['eps1', 'eps2', 'eps3']].T, atol=1e-7) assert_allclose(sim.simulated_state_disturbance, true[['eta1', 'eta2', 'eta3']].T, atol=1e-7) signals = np.zeros((3, self.model.nobs)) for t in range(self.model.nobs): signals[:, t] = np.dot(Z, sim.simulated_state[:, t]) assert_allclose(signals, true[['signal1', 'signal2', 'signal3']].T, atol=1e-7) def test_simulation_smoothing_2(self): # Test with measurement and state disturbances as np.arange / 10., # initial state variates are zeros. sim = self.sim Z = self.model['design'] T = self.model['transition'] # Construct the variates measurement_disturbance_variates = np.reshape( np.arange(self.model.nobs * self.model.k_endog) / 10., (self.model.nobs, self.model.k_endog)) state_disturbance_variates = np.reshape( np.arange(self.model.nobs * self.model.ssm.k_posdef) / 10., (self.model.nobs, self.model.ssm.k_posdef)) disturbance_variates = np.r_[ measurement_disturbance_variates.ravel(), state_disturbance_variates.ravel()] initial_state_variates = np.zeros(self.model.k_states) # Compute some additional known quantities generated_measurement_disturbance = np.zeros( measurement_disturbance_variates.shape) chol = np.linalg.cholesky(self.model['obs_cov']) for t in range(self.model.nobs): generated_measurement_disturbance[t] = np.dot( chol, measurement_disturbance_variates[t]) generated_state_disturbance = np.zeros( state_disturbance_variates.shape) chol = np.linalg.cholesky(self.model['state_cov']) for t in range(self.model.nobs): generated_state_disturbance[t] = np.dot( chol, state_disturbance_variates[t]) generated_obs = np.zeros((self.model.k_endog, self.model.nobs)) generated_state = np.zeros((self.model.k_states, self.model.nobs+1)) chol = np.linalg.cholesky(self.results.initial_state_cov) generated_state[:, 0] = ( self.results.initial_state + np.dot(chol, initial_state_variates)) for t in range(self.model.nobs): generated_state[:, t+1] = (np.dot(T, generated_state[:, t]) + generated_state_disturbance.T[:, t]) generated_obs[:, t] = (np.dot(Z, generated_state[:, t]) + generated_measurement_disturbance.T[:, t]) generated_model = mlemodel.MLEModel( generated_obs.T, k_states=self.model.k_states, k_posdef=self.model.ssm.k_posdef) for name in ['design', 'obs_cov', 'transition', 'selection', 'state_cov']: generated_model[name] = self.model[name] generated_model.initialize_approximate_diffuse(1e6) generated_model.ssm.filter_univariate = True generated_res = generated_model.ssm.smooth() simulated_state = ( generated_state[:, :-1] - generated_res.smoothed_state + self.results.smoothed_state) if not self.model.ssm.filter_collapsed: simulated_measurement_disturbance = ( generated_measurement_disturbance.T - generated_res.smoothed_measurement_disturbance + self.results.smoothed_measurement_disturbance) simulated_state_disturbance = ( generated_state_disturbance.T - generated_res.smoothed_state_disturbance + self.results.smoothed_state_disturbance) # Test against known values sim.simulate(disturbance_variates=disturbance_variates, initial_state_variates=np.zeros(self.model.k_states)) assert_allclose(sim.generated_measurement_disturbance, generated_measurement_disturbance) assert_allclose(sim.generated_state_disturbance, generated_state_disturbance) assert_allclose(sim.generated_state, generated_state) assert_allclose(sim.generated_obs, generated_obs) assert_allclose(sim.simulated_state, simulated_state) if not self.model.ssm.filter_collapsed: assert_allclose(sim.simulated_measurement_disturbance.T, simulated_measurement_disturbance.T) assert_allclose(sim.simulated_state_disturbance, simulated_state_disturbance) # Test against R package KFAS values if self.test_against_KFAS: path = os.path.join(current_path, 'results', 'results_simulation_smoothing2.csv') true = pd.read_csv(path) assert_allclose(sim.simulated_state.T, true[['state1', 'state2', 'state3']], atol=1e-7) assert_allclose(sim.simulated_measurement_disturbance, true[['eps1', 'eps2', 'eps3']].T, atol=1e-7) assert_allclose(sim.simulated_state_disturbance, true[['eta1', 'eta2', 'eta3']].T, atol=1e-7) signals = np.zeros((3, self.model.nobs)) for t in range(self.model.nobs): signals[:, t] = np.dot(Z, sim.simulated_state[:, t]) assert_allclose(signals, true[['signal1', 'signal2', 'signal3']].T, atol=1e-7) class TestMultivariateVARKnown(MultivariateVARKnown): @classmethod def setup_class(cls, *args, **kwargs): super(TestMultivariateVARKnown, cls).setup_class() cls.true_llf = 39.01246166 class TestMultivariateVARKnownMissingAll(MultivariateVARKnown): """ Notes ----- Cannot test against KFAS because they have a different behavior for missing entries. When an entry is missing, KFAS does not draw a simulation smoothed value for that entry, whereas we draw from the unconditional distribution. It appears there is nothing to definitively recommend one approach over the other, but it makes it difficult to line up the variates correctly in order to replicate results. """ @classmethod def setup_class(cls, *args, **kwargs): super(TestMultivariateVARKnownMissingAll, cls).setup_class( missing='all', test_against_KFAS=False) cls.true_llf = 1305.739288 class TestMultivariateVARKnownMissingPartial(MultivariateVARKnown): @classmethod def setup_class(cls, *args, **kwargs): super(TestMultivariateVARKnownMissingPartial, cls).setup_class( missing='partial', test_against_KFAS=False) cls.true_llf = 1518.449598 class TestMultivariateVARKnownMissingMixed(MultivariateVARKnown): @classmethod def setup_class(cls, *args, **kwargs): super(TestMultivariateVARKnownMissingMixed, cls).setup_class( missing='mixed', test_against_KFAS=False) cls.true_llf = 1117.265303 class TestDFM(TestMultivariateVARKnown): test_against_KFAS = False @classmethod def setup_class(cls, which='none', *args, **kwargs): # Data dta = datasets.macrodata.load_pandas().data dta.index = pd.date_range(start='1959-01-01', end='2009-7-01', freq='QS') levels = dta[['realgdp', 'realcons', 'realinv']] obs = np.log(levels).diff().iloc[1:] * 400 if which == 'all': obs.iloc[:50, :] = np.nan obs.iloc[119:130, :] = np.nan elif which == 'partial': obs.iloc[0:50, 0] = np.nan obs.iloc[119:130, 0] = np.nan elif which == 'mixed': obs.iloc[0:50, 0] = np.nan obs.iloc[19:70, 1] = np.nan obs.iloc[39:90, 2] = np.nan obs.iloc[119:130, 0] = np.nan obs.iloc[119:130, 2] = np.nan # Create the model with typical state space mod = mlemodel.MLEModel(obs, k_states=2, k_posdef=2, **kwargs) mod['design'] = np.array([[-32.47143586, 17.33779024], [-7.40264169, 1.69279859], [-209.04702853, 125.2879374]]) mod['obs_cov'] = np.diag( np.array([0.0622668, 1.95666886, 58.37473642])) mod['transition'] = np.array([[0.29935707, 0.33289005], [-0.7639868, 1.2844237]]) mod['selection'] = np.eye(2) mod['state_cov'] = np.array([[1.2, -0.25], [-0.25, 1.1]]) mod.initialize_approximate_diffuse(1e6) mod.ssm.filter_univariate = True mod.ssm.filter_collapsed = True cls.model = mod cls.results = mod.smooth([], return_ssm=True) cls.sim = cls.model.simulation_smoother() def test_loglike(self): pass class MultivariateVAR(object): """ More generic tests for simulation smoothing; use actual N(0,1) variates """ @classmethod def setup_class(cls, missing='none', *args, **kwargs): # Data dta = datasets.macrodata.load_pandas().data dta.index = pd.date_range(start='1959-01-01', end='2009-7-01', freq='QS') obs = np.log(dta[['realgdp', 'realcons', 'realinv']]).diff().iloc[1:] if missing == 'all': obs.iloc[0:50, :] = np.nan elif missing == 'partial': obs.iloc[0:50, 0] = np.nan elif missing == 'mixed': obs.iloc[0:50, 0] = np.nan obs.iloc[19:70, 1] = np.nan obs.iloc[39:90, 2] = np.nan obs.iloc[119:130, 0] = np.nan obs.iloc[119:130, 2] = np.nan obs.iloc[-10:, :] = np.nan # Create the model mod = mlemodel.MLEModel(obs, k_states=3, k_posdef=3, **kwargs) mod['design'] = np.eye(3) mod['obs_cov'] = np.array([ [0.0000640649, 0., 0.], [0., 0.0000572802, 0.], [0., 0., 0.0017088585]]) mod['transition'] = np.array([ [-0.1119908792, 0.8441841604, 0.0238725303], [0.2629347724, 0.4996718412, -0.0173023305], [-3.2192369082, 4.1536028244, 0.4514379215]]) mod['selection'] = np.eye(3) mod['state_cov'] = np.array([ [0.0000640649, 0.0000388496, 0.0002148769], [0.0000388496, 0.0000572802, 0.000001555], [0.0002148769, 0.000001555, 0.0017088585]]) mod.initialize_approximate_diffuse(1e6) mod.ssm.filter_univariate = True cls.model = mod cls.results = mod.smooth([], return_ssm=True) cls.sim = cls.model.simulation_smoother() def test_loglike(self): assert_allclose(np.sum(self.results.llf_obs), self.true_llf) def test_simulation_smoothing(self): sim = self.sim Z = self.model['design'] # Simulate with known variates sim.simulate(disturbance_variates=self.variates[:-3], initial_state_variates=self.variates[-3:]) # Test against R package KFAS values assert_allclose(sim.simulated_state.T, self.true[['state1', 'state2', 'state3']], atol=1e-7) assert_allclose(sim.simulated_measurement_disturbance, self.true[['eps1', 'eps2', 'eps3']].T, atol=1e-7) assert_allclose(sim.simulated_state_disturbance, self.true[['eta1', 'eta2', 'eta3']].T, atol=1e-7) signals = np.zeros((3, self.model.nobs)) for t in range(self.model.nobs): signals[:, t] = np.dot(Z, sim.simulated_state[:, t]) assert_allclose(signals, self.true[['signal1', 'signal2', 'signal3']].T, atol=1e-7) class TestMultivariateVAR(MultivariateVAR): @classmethod def setup_class(cls): super(TestMultivariateVAR, cls).setup_class() path = os.path.join(current_path, 'results', 'results_simulation_smoothing3_variates.csv') cls.variates = pd.read_csv(path).values.squeeze() path = os.path.join(current_path, 'results', 'results_simulation_smoothing3.csv') cls.true = pd.read_csv(path) cls.true_llf = 1695.34872 def test_misc(): # Create the model and simulation smoother dta = datasets.macrodata.load_pandas().data dta.index = pd.date_range(start='1959-01-01', end='2009-7-01', freq='QS') obs = np.log(dta[['realgdp', 'realcons', 'realinv']]).diff().iloc[1:] mod = sarimax.SARIMAX(obs['realgdp'], order=(1, 0, 0)) mod['design', 0, 0] = 0. mod['obs_cov', 0, 0] = 1. mod.update(np.r_[1., 1.]) sim = mod.simulation_smoother() # Test that the simulation smoother is drawing variates correctly np.random.seed(1234) n_disturbance_variates = mod.nobs * (mod.k_endog + mod.k_states) variates = np.random.normal(size=n_disturbance_variates) np.random.seed(1234) sim.simulate() assert_allclose(sim.generated_measurement_disturbance[:, 0], variates[:mod.nobs]) assert_allclose(sim.generated_state_disturbance[:, 0], variates[mod.nobs:]) # Test that we can change the options of the simulations smoother assert_equal(sim.simulation_output, mod.ssm.smoother_output) sim.simulation_output = 0 assert_equal(sim.simulation_output, 0) sim.simulate_state = True assert_equal(sim.simulation_output, SIMULATION_STATE) sim.simulate_state = False assert_equal(sim.simulation_output, 0) sim.simulate_disturbance = True assert_equal(sim.simulation_output, SIMULATION_DISTURBANCE) sim.simulate_disturbance = False assert_equal(sim.simulation_output, 0) sim.simulate_all = True assert_equal(sim.simulation_output, SIMULATION_ALL) sim.simulate_all = False assert_equal(sim.simulation_output, 0) def test_simulation_smoothing_obs_intercept(): nobs = 10 intercept = 100 endog = np.ones(nobs) * intercept mod = structural.UnobservedComponents(endog, 'rwalk', exog=np.ones(nobs)) mod.update([1, intercept]) sim = mod.simulation_smoother() sim.simulate(disturbance_variates=np.zeros(mod.nobs * 2), initial_state_variates=np.zeros(1)) assert_equal(sim.simulated_state[0], 0) def test_simulation_smoothing_state_intercept(): nobs = 10 intercept = 100 endog = np.ones(nobs) * intercept mod = sarimax.SARIMAX(endog, order=(0, 0, 0), trend='c', measurement_error=True) mod.initialize_known([100], [[0]]) mod.update([intercept, 1., 1.]) sim = mod.simulation_smoother() sim.simulate(disturbance_variates=np.zeros(mod.nobs * 2), initial_state_variates=np.zeros(1)) assert_equal(sim.simulated_state[0], intercept) def test_simulation_smoothing_state_intercept_diffuse(): nobs = 10 intercept = 100 endog = np.ones(nobs) * intercept # Test without missing values mod = sarimax.SARIMAX(endog, order=(0, 0, 0), trend='c', measurement_error=True, initialization='diffuse') mod.update([intercept, 1., 1.]) sim = mod.simulation_smoother() sim.simulate(disturbance_variates=np.zeros(mod.nobs * 2), initial_state_variates=np.zeros(1)) assert_equal(sim.simulated_state[0], intercept) # Test with missing values endog[5] = np.nan mod = sarimax.SARIMAX(endog, order=(0, 0, 0), trend='c', measurement_error=True, initialization='diffuse') mod.update([intercept, 1., 1.]) sim = mod.simulation_smoother() sim.simulate(disturbance_variates=np.zeros(mod.nobs * 2), initial_state_variates=np.zeros(1)) assert_equal(sim.simulated_state[0], intercept)