""" State Space Representation, Kalman Filter, Smoother, and Simulation Smoother Author: Chad Fulton License: Simplified-BSD """ import numpy as np from .kalman_smoother import KalmanSmoother from .cfa_simulation_smoother import CFASimulationSmoother from . import tools SIMULATION_STATE = 0x01 SIMULATION_DISTURBANCE = 0x04 SIMULATION_ALL = ( SIMULATION_STATE | SIMULATION_DISTURBANCE ) class SimulationSmoother(KalmanSmoother): r""" State space representation of a time series process, with Kalman filter and smoother, and with simulation smoother. Parameters ---------- k_endog : {array_like, int} The observed time-series process :math:`y` if array like or the number of variables in the process if an integer. k_states : int The dimension of the unobserved state process. k_posdef : int, optional The dimension of a guaranteed positive definite covariance matrix describing the shocks in the measurement equation. Must be less than or equal to `k_states`. Default is `k_states`. simulation_smooth_results_class : class, optional Default results class to use to save output of simulation smoothing. Default is `SimulationSmoothResults`. If specified, class must extend from `SimulationSmoothResults`. simulation_smoother_classes : dict, optional Dictionary with BLAS prefixes as keys and classes as values. **kwargs Keyword arguments may be used to provide default values for state space matrices, for Kalman filtering options, for Kalman smoothing options, or for Simulation smoothing options. See `Representation`, `KalmanFilter`, and `KalmanSmoother` for more details. """ simulation_outputs = [ 'simulate_state', 'simulate_disturbance', 'simulate_all' ] def __init__(self, k_endog, k_states, k_posdef=None, simulation_smooth_results_class=None, simulation_smoother_classes=None, **kwargs): super(SimulationSmoother, self).__init__( k_endog, k_states, k_posdef, **kwargs ) if simulation_smooth_results_class is None: simulation_smooth_results_class = SimulationSmoothResults self.simulation_smooth_results_class = simulation_smooth_results_class self.prefix_simulation_smoother_map = ( simulation_smoother_classes if simulation_smoother_classes is not None else tools.prefix_simulation_smoother_map.copy()) # Holder for an model-level simulation smoother objects, to use in # simulating new time series. self._simulators = {} def get_simulation_output(self, simulation_output=None, simulate_state=None, simulate_disturbance=None, simulate_all=None, **kwargs): r""" Get simulation output bitmask Helper method to get final simulation output bitmask from a set of optional arguments including the bitmask itself and possibly boolean flags. Parameters ---------- simulation_output : int, optional Simulation output bitmask. If this is specified, it is simply returned and the other arguments are ignored. simulate_state : bool, optional Whether or not to include the state in the simulation output. simulate_disturbance : bool, optional Whether or not to include the state and observation disturbances in the simulation output. simulate_all : bool, optional Whether or not to include all simulation output. \*\*kwargs Additional keyword arguments. Present so that calls to this method can use \*\*kwargs without clearing out additional arguments. """ # If we do not explicitly have simulation_output, try to get it from # kwargs if simulation_output is None: simulation_output = 0 if simulate_state: simulation_output |= SIMULATION_STATE if simulate_disturbance: simulation_output |= SIMULATION_DISTURBANCE if simulate_all: simulation_output |= SIMULATION_ALL # Handle case of no information in kwargs if simulation_output == 0: # If some arguments were passed, but we still do not have any # simulation output, raise an exception argument_set = not all([ simulate_state is None, simulate_disturbance is None, simulate_all is None ]) if argument_set: raise ValueError("Invalid simulation output options:" " given options would result in no" " output.") # Otherwise set simulation output to be the same as smoother # output simulation_output = self.smoother_output return simulation_output def _simulate(self, nsimulations, measurement_shocks, state_shocks, initial_state): # Initialize the filter and representation prefix, dtype, create_smoother, create_filter, create_statespace = ( self._initialize_smoother()) # Initialize the state self._initialize_state(prefix=prefix) # Create the simulator if necessary if (prefix not in self._simulators or not nsimulations == self._simulators[prefix].nobs): simulation_output = 0 # Kalman smoother parameters smoother_output = -1 # Kalman filter parameters filter_method = self.filter_method inversion_method = self.inversion_method stability_method = self.stability_method conserve_memory = self.conserve_memory filter_timing = self.filter_timing loglikelihood_burn = self.loglikelihood_burn tolerance = self.tolerance # Create a new simulation smoother object cls = self.prefix_simulation_smoother_map[prefix] self._simulators[prefix] = cls( self._statespaces[prefix], filter_method, inversion_method, stability_method, conserve_memory, filter_timing, tolerance, loglikelihood_burn, smoother_output, simulation_output, nsimulations ) simulator = self._simulators[prefix] # Set the disturbance variates if measurement_shocks is not None and state_shocks is not None: disturbance_variates = np.atleast_1d(np.array( np.r_[measurement_shocks.ravel(), state_shocks.ravel()], dtype=self.dtype ).squeeze()) simulator.set_disturbance_variates(disturbance_variates, pretransformed=True) elif measurement_shocks is None and state_shocks is None: pass elif measurement_shocks is not None: raise ValueError('Must set `state_shocks` if `measurement_shocks`' ' is set.') elif state_shocks is not None: raise ValueError('Must set `measurement_shocks` if `state_shocks`' ' is set.') # Set the intial state vector initial_state = np.atleast_1d(np.array( initial_state, dtype=self.dtype ).squeeze()) simulator.set_initial_state(initial_state) # Perform simulation smoothing # Note: simulation_output=-1 corresponds to whatever was setup when # the simulation smoother was constructed simulator.simulate(-1) simulated_obs = np.array(simulator.generated_obs, copy=True) simulated_state = np.array(simulator.generated_state, copy=True) return ( simulated_obs[:, :nsimulations].T, simulated_state[:, :nsimulations].T ) def simulation_smoother(self, simulation_output=None, method='kfs', results_class=None, prefix=None, **kwargs): r""" Retrieve a simulation smoother for the statespace model. Parameters ---------- simulation_output : int, optional Determines which simulation smoother output is calculated. Default is all (including state and disturbances). method : {'kfs', 'cfa'}, optional Method for simulation smoothing. If `method='kfs'`, then the simulation smoother is based on Kalman filtering and smoothing recursions. If `method='cfa'`, then the simulation smoother is based on the Cholesky Factor Algorithm (CFA) approach. The CFA approach is not applicable to all state space models, but can be faster for the cases in which it is supported. simulation_smooth_results_class : class, optional Default results class to use to save output of simulation smoothing. Default is `SimulationSmoothResults`. If specified, class must extend from `SimulationSmoothResults`. prefix : str The prefix of the datatype. Usually only used internally. **kwargs Additional keyword arguments, used to set the simulation output. See `set_simulation_output` for more details. Returns ------- SimulationSmoothResults """ method = method.lower() # Short-circuit for CFA if method == 'cfa': if simulation_output not in [None, 1, -1]: raise ValueError('Can only retrieve simulations of the state' ' vector using the CFA simulation smoother.') return CFASimulationSmoother(self) elif method != 'kfs': raise ValueError('Invalid simulation smoother method "%s". Valid' ' methods are "kfs" or "cfa".' % method) # Set the class to be the default results class, if None provided if results_class is None: results_class = self.simulation_smooth_results_class # Instantiate a new results object if not issubclass(results_class, SimulationSmoothResults): raise ValueError('Invalid results class provided.') # Make sure we have the required Statespace representation prefix, dtype, create_smoother, create_filter, create_statespace = ( self._initialize_smoother()) # Simulation smoother parameters simulation_output = self.get_simulation_output(simulation_output, **kwargs) # Kalman smoother parameters smoother_output = kwargs.get('smoother_output', simulation_output) # Kalman filter parameters filter_method = kwargs.get('filter_method', self.filter_method) inversion_method = kwargs.get('inversion_method', self.inversion_method) stability_method = kwargs.get('stability_method', self.stability_method) conserve_memory = kwargs.get('conserve_memory', self.conserve_memory) filter_timing = kwargs.get('filter_timing', self.filter_timing) loglikelihood_burn = kwargs.get('loglikelihood_burn', self.loglikelihood_burn) tolerance = kwargs.get('tolerance', self.tolerance) # Create a new simulation smoother object cls = self.prefix_simulation_smoother_map[prefix] simulation_smoother = cls( self._statespaces[prefix], filter_method, inversion_method, stability_method, conserve_memory, filter_timing, tolerance, loglikelihood_burn, smoother_output, simulation_output ) # Create results object results = results_class(self, simulation_smoother) return results class SimulationSmoothResults(object): r""" Results from applying the Kalman smoother and/or filter to a state space model. Parameters ---------- model : Representation A Statespace representation simulation_smoother : {{prefix}}SimulationSmoother object The Cython simulation smoother object with which to simulation smooth. Attributes ---------- model : Representation A Statespace representation dtype : dtype Datatype of representation matrices prefix : str BLAS prefix of representation matrices simulation_output : int Bitmask controlling simulation output. simulate_state : bool Flag for if the state is included in simulation output. simulate_disturbance : bool Flag for if the state and observation disturbances are included in simulation output. simulate_all : bool Flag for if simulation output should include everything. generated_measurement_disturbance : ndarray Measurement disturbance variates used to genereate the observation vector. generated_state_disturbance : ndarray State disturbance variates used to genereate the state and observation vectors. generated_obs : ndarray Generated observation vector produced as a byproduct of simulation smoothing. generated_state : ndarray Generated state vector produced as a byproduct of simulation smoothing. simulated_state : ndarray Simulated state. simulated_measurement_disturbance : ndarray Simulated measurement disturbance. simulated_state_disturbance : ndarray Simulated state disturbance. """ def __init__(self, model, simulation_smoother): self.model = model self.prefix = model.prefix self.dtype = model.dtype self._simulation_smoother = simulation_smoother # Output self._generated_measurement_disturbance = None self._generated_state_disturbance = None self._generated_obs = None self._generated_state = None self._simulated_state = None self._simulated_measurement_disturbance = None self._simulated_state_disturbance = None @property def simulation_output(self): return self._simulation_smoother.simulation_output @simulation_output.setter def simulation_output(self, value): self._simulation_smoother.simulation_output = value @property def simulate_state(self): return bool(self.simulation_output & SIMULATION_STATE) @simulate_state.setter def simulate_state(self, value): if bool(value): self.simulation_output = self.simulation_output | SIMULATION_STATE else: self.simulation_output = self.simulation_output & ~SIMULATION_STATE @property def simulate_disturbance(self): return bool(self.simulation_output & SIMULATION_DISTURBANCE) @simulate_disturbance.setter def simulate_disturbance(self, value): if bool(value): self.simulation_output = ( self.simulation_output | SIMULATION_DISTURBANCE) else: self.simulation_output = ( self.simulation_output & ~SIMULATION_DISTURBANCE) @property def simulate_all(self): return bool(self.simulation_output & SIMULATION_ALL) @simulate_all.setter def simulate_all(self, value): if bool(value): self.simulation_output = self.simulation_output | SIMULATION_ALL else: self.simulation_output = self.simulation_output & ~SIMULATION_ALL @property def generated_measurement_disturbance(self): r""" Randomly drawn measurement disturbance variates Used to construct `generated_obs`. Notes ----- .. math:: \varepsilon_t^+ ~ N(0, H_t) If `disturbance_variates` were provided to the `simulate()` method, then this returns those variates (which were N(0,1)) transformed to the distribution above. """ if self._generated_measurement_disturbance is None: end = self.model.nobs * self.model.k_endog self._generated_measurement_disturbance = np.array( self._simulation_smoother.disturbance_variates[:end], copy=True).reshape(self.model.nobs, self.model.k_endog) return self._generated_measurement_disturbance @property def generated_state_disturbance(self): r""" Randomly drawn state disturbance variates, used to construct `generated_state` and `generated_obs`. Notes ----- .. math:: \eta_t^+ ~ N(0, Q_t) If `disturbance_variates` were provided to the `simulate()` method, then this returns those variates (which were N(0,1)) transformed to the distribution above. """ if self._generated_state_disturbance is None: start = self.model.nobs * self.model.k_endog self._generated_state_disturbance = np.array( self._simulation_smoother.disturbance_variates[start:], copy=True).reshape(self.model.nobs, self.model.k_posdef) return self._generated_state_disturbance @property def generated_obs(self): r""" Generated vector of observations by iterating on the observation and transition equations, given a random initial state draw and random disturbance draws. Notes ----- .. math:: y_t^+ = d_t + Z_t \alpha_t^+ + \varepsilon_t^+ """ if self._generated_obs is None: self._generated_obs = np.array( self._simulation_smoother.generated_obs, copy=True ) return self._generated_obs @property def generated_state(self): r""" Generated vector of states by iterating on the transition equation, given a random initial state draw and random disturbance draws. Notes ----- .. math:: \alpha_{t+1}^+ = c_t + T_t \alpha_t^+ + \eta_t^+ """ if self._generated_state is None: self._generated_state = np.array( self._simulation_smoother.generated_state, copy=True ) return self._generated_state @property def simulated_state(self): r""" Random draw of the state vector from its conditional distribution. Notes ----- .. math:: \alpha ~ p(\alpha \mid Y_n) """ if self._simulated_state is None: self._simulated_state = np.array( self._simulation_smoother.simulated_state, copy=True ) return self._simulated_state @property def simulated_measurement_disturbance(self): r""" Random draw of the measurement disturbance vector from its conditional distribution. Notes ----- .. math:: \varepsilon ~ N(\hat \varepsilon, Var(\hat \varepsilon \mid Y_n)) """ if self._simulated_measurement_disturbance is None: self._simulated_measurement_disturbance = np.array( self._simulation_smoother.simulated_measurement_disturbance, copy=True ) return self._simulated_measurement_disturbance @property def simulated_state_disturbance(self): r""" Random draw of the state disturbance vector from its conditional distribution. Notes ----- .. math:: \eta ~ N(\hat \eta, Var(\hat \eta \mid Y_n)) """ if self._simulated_state_disturbance is None: self._simulated_state_disturbance = np.array( self._simulation_smoother.simulated_state_disturbance, copy=True ) return self._simulated_state_disturbance def simulate(self, simulation_output=-1, disturbance_variates=None, initial_state_variates=None, pretransformed_variates=False): r""" Perform simulation smoothing Does not return anything, but populates the object's `simulated_*` attributes, as specified by simulation output. Parameters ---------- simulation_output : int, optional Bitmask controlling simulation output. Default is to use the simulation output defined in object initialization. disturbance_variates : array_likes, optional Random values to use as disturbance variates, distributed standard Normal. Usually only specified if results are to be replicated (e.g. to enforce a seed) or for testing. If not specified, random variates are drawn. initial_state_variates : array_likes, optional Random values to use as initial state variates. Usually only specified if results are to be replicated (e.g. to enforce a seed) or for testing. If not specified, random variates are drawn. """ # Clear any previous output self._generated_measurement_disturbance = None self._generated_state_disturbance = None self._generated_state = None self._generated_obs = None self._generated_state = None self._simulated_state = None self._simulated_measurement_disturbance = None self._simulated_state_disturbance = None # Re-initialize the _statespace representation prefix, dtype, create_smoother, create_filter, create_statespace = ( self.model._initialize_smoother()) # Initialize the state self.model._initialize_state(prefix=prefix) # Draw the (independent) random variates for disturbances in the # simulation if disturbance_variates is not None: self._simulation_smoother.set_disturbance_variates( np.array(disturbance_variates, dtype=self.dtype), pretransformed=pretransformed_variates ) else: self._simulation_smoother.draw_disturbance_variates() # Draw the (independent) random variates for the initial states in the # simulation if initial_state_variates is not None: self._simulation_smoother.set_initial_state_variates( np.array(initial_state_variates, dtype=self.dtype), pretransformed=pretransformed_variates ) else: self._simulation_smoother.draw_initial_state_variates() # Perform simulation smoothing # Note: simulation_output=-1 corresponds to whatever was setup when # the simulation smoother was constructed self._simulation_smoother.simulate(simulation_output)