diff --git a/pyrecest/filters/hyperhemispherical_grid_filter.py b/pyrecest/filters/hyperhemispherical_grid_filter.py new file mode 100644 index 000000000..8378a6c83 --- /dev/null +++ b/pyrecest/filters/hyperhemispherical_grid_filter.py @@ -0,0 +1,83 @@ +import numpy as np +import warnings +from .abstract_grid_filter import AbstractGridFilter +from .abstract_hyperhemispherical_filter import AbstractHyperhemisphericalFilter +from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution +from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution +from pyrecest.distributions import BinghamDistribution +from pyrecest.distributions import WatsonDistribution +from pyrecest.distributions import VonMisesFisherDistribution +from pyrecest.distributions import HypersphericalMixture +from pyrecest.distributions import HyperhemisphericalUniformDistribution +from pyrecest.distributions import AbstractHyperhemisphericalDistribution +from pyrecest.distributions import HyperhemisphericalWatsonDistribution + +class HyperhemisphericalGridFilter(AbstractGridFilter, AbstractHyperhemisphericalFilter): + def __init__(self, no_of_coefficients, dim, grid_type='eq_point_set_symm'): + self.gd = HyperhemisphericalGridDistribution.from_distribution( + HyperhemisphericalUniformDistribution(dim), no_of_coefficients, grid_type) + + def set_state(self, new_state): + assert self.dim == new_state.dim + assert isinstance(new_state, AbstractHyperhemisphericalDistribution) + self.gd = new_state + + def predict_identity(self, d_sys): + assert isinstance(d_sys, AbstractHyperhemisphericalDistribution) + sd_half_cond_sd_half = HyperhemisphericalGridFilter.sys_noise_to_transition_density( + d_sys, self.gd.grid_values.shape[0]) + self.predict_nonlinear_via_transition_density(sd_half_cond_sd_half) + + def update_identity(self, meas_noise, z=None): + assert isinstance(meas_noise, AbstractHyperhemisphericalDistribution) + if not z==None: + measNoise = measNoise.setMode(z) + curr_grid = self.gd.get_grid() + self.gd = self.gd.multiply(HyperhemisphericalGridDistribution(curr_grid, 2 * meas_noise.pdf(curr_grid).T)) + + def update_nonlinear(self, likelihood, z): + self.gd.grid_values = self.gd.grid_values * likelihood(z, self.gd.get_grid()).T + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=RuntimeWarning) + self.gd = self.gd.normalize() + + def predict_nonlinear_via_transition_density(self, f_trans): + assert np.array_equal(self.gd.get_grid(), f_trans.get_grid()), \ + "fTrans is using an incompatible grid." + self.gd = self.gd.normalize() + grid_values_new = self.gd.get_manifold_size() / self.gd.grid_values.shape[0] * f_trans.grid_values.dot( + self.gd.grid_values) + self.gd = HyperhemisphericalGridDistribution(self.gd.get_grid(), grid_values_new) + + def get_point_estimate(self): + gd_full_sphere = self.gd.to_full_sphere() + p = BinghamDistribution.fit(gd_full_sphere.get_grid(), gd_full_sphere.grid_values.T / np.sum( + gd_full_sphere.grid_values)).mode() + if p[-1] < 0: + p = -p + return p + + @staticmethod + def sys_noise_to_transition_density(d_sys, no_grid_points): + if isinstance(d_sys, AbstractDistribution): + if isinstance(d_sys, (HyperhemisphericalWatsonDistribution, WatsonDistribution)): + def trans(xkk, xk): + return np.array([2 * WatsonDistribution(xk[:, i], d_sys.kappa).pdf(xkk) for i in range(xk.shape[1])]).T + + elif (isinstance(d_sys, HypersphericalMixture) and len(d_sys.dists) == 2 and + np.all(d_sys.w == 0.5) and np.array_equal(d_sys.dists[0].mu, -d_sys.dists[1].mu) and + d_sys.dists[0].kappa == d_sys.dists[1].kappa): + def trans(xkk, xk): + return np.array([(VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(xkk) + + VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(-xkk)) + for i in range(xk.shape[1])]).T + else: + raise ValueError("Distribution not supported for predict identity. Must be zonal (rotationally symmetric around last dimension)") + + print("PredictIdentity:Inefficient - Using inefficient prediction. Consider precalculating the SdHalfCondSdHalfGridDistribution and using predictNonlinearViaTransitionDensity.") + sd_half_cond_sd_half = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, 'eq_point_set_symm', 2 * d_sys.dim) + return sd_half_cond_sd_half + + else: + raise TypeError("d_sys must be an instance of AbstractDistribution") + diff --git a/pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py b/pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py new file mode 100644 index 000000000..77463e1d0 --- /dev/null +++ b/pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py @@ -0,0 +1,96 @@ +import unittest +import numpy as np +from pyrecest.filters.hyperhemispherical_grid_filter import HyperhemisphericalGridFilter +from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution +from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution +from pyrecest.distributions import BinghamDistribution, HypersphericalMixture, VonMisesFisherDistribution + +class HyperhemisphericalGridFilterTest(unittest.TestCase): + def test_set_state_s2(self): + np.random.seed(0) + no_grid_points = 1001 + sgf = HyperhemisphericalGridFilter(no_grid_points, 3) + + self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1)) + + # Test if it is uniform at the beginning + self.assertAlmostEqual(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size() * np.ones((no_grid_points, 1))))), 0, delta=1e-13) + + M = np.eye(3) + Z = np.array([-2, -1, 0]).reshape(-1, 1) + bd = BinghamDistribution(Z, M) + bd.F = bd.F * bd.integrate_numerically() + + sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points) + self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) + sgf.set_state(sgd_state) + self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) + + # Verify that it is no longer a uniform distribution + self.assertGreater(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size()))), 60) + + # Verify estimate matches a mode of the Bingham + self.assertAlmostEqual(np.min(np.linalg.norm(sgf.get_point_estimate() - np.hstack((bd.mode(), -bd.mode())), axis=0)), 0, delta=1e-11) + + # Verify errors + full_grid = sgd_state.get_grid() + sgd_state.grid = full_grid[:, -1] + sgd_state.grid_values = sgd_state.grid_values[:-1] + self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution) + sgf_tmp = sgf.copy() + + with self.assertRaises(ValueError): + sgf_tmp.set_state(sgd_state) + + with self.assertRaises(ValueError): + sgf.set_state(bd) + + def test_set_state_s3(self): + no_grid_points = 1001 + sgf = HyperhemisphericalGridFilter(no_grid_points, 4) + self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1)) + + # Test if it is uniform at the beginning + self.assertAlmostEqual(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 0) + + M = np.eye(4) + Z = np.array([-2, -1, -0.5, 0]).reshape(-1, 1) + bd = BinghamDistribution(Z, M) + bd.F = bd.F * bd.integrate_numerically() + + sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points) + sgf.set_state(sgd_state) + + # Verify that it is no longer a uniform distribution + self.assertGreater(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 5) + def test_predict_converges_to_uniform_S2_S3(self): + test_predict_converges_to_uniform(3, 501, [-2, -1, 0], 3, 5e-5, 'eq_point_set_symm', 6) + test_predict_converges_to_uniform(4, 1001, [-2, -1, -0.5, 0], 5, 1e-3, 'eq_point_set', 8) + def test_predict_converges_to_uniform(dim, no_grid_points, z_values, tol_verify_greater, tol_verify_equal, eq_point_set_type, eq_point_set_arg): + sgf = HyperhemisphericalGridFilter(no_grid_points, dim) + M = np.eye(dim) + Z = np.array(z_values).reshape(-1, 1) + bd = BinghamDistribution(Z, M) + bd.F = bd.F * bd.integrate_numerically() + sgf.set_state(HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)) + + # Verify that it is not a uniform distribution + assert sum(abs(np.diff(sgf.get_estimate().grid_values.T))) > tol_verify_greater + + # Predict 10 times with VM-distributed noise + def trans(xkk, xk): + return 2 * np.hstack([HypersphericalMixture([VonMisesFisherDistribution(xk[:, i], 1), VonMisesFisherDistribution(-xk[:, i], 1)], [0.5, 0.5]).pdf(xkk) for i in range(xk.shape[1])]) + + sdsd = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, eq_point_set_type, eq_point_set_arg) + manifold_size = sgf.get_estimate().get_manifold_size() + + for i in range(10): + values_alternative_formula = (manifold_size / sgf.get_estimate().get_grid().shape[1]) * np.sum(sgf.get_estimate().grid_values.T * sdsd.grid_values, axis=1) + sgf.predict_nonlinear_via_transition_density(sdsd) + assert np.allclose(sgf.get_estimate().grid_values, values_alternative_formula, atol=1e-12) + + # Verify that it is approximately uniform now + assert np.isclose(sum(abs(np.diff(sgf.get_estimate().grid_values.T))), 0, atol=tol_verify_equal) + +if __name__ == '__main__': + unittest.main()