Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pyrecest/filters/hyperhemispherical_grid_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import numpy as np
import warnings
from .abstract_grid_filter import AbstractGridFilter
from .abstract_hyperhemispherical_filter import AbstractHyperhemisphericalFilter
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
from pyrecest.distributions import BinghamDistribution
from pyrecest.distributions import WatsonDistribution
from pyrecest.distributions import VonMisesFisherDistribution
from pyrecest.distributions import HypersphericalMixture
from pyrecest.distributions import HyperhemisphericalUniformDistribution
from pyrecest.distributions import AbstractHyperhemisphericalDistribution
from pyrecest.distributions import HyperhemisphericalWatsonDistribution

class HyperhemisphericalGridFilter(AbstractGridFilter, AbstractHyperhemisphericalFilter):
def __init__(self, no_of_coefficients, dim, grid_type='eq_point_set_symm'):
self.gd = HyperhemisphericalGridDistribution.from_distribution(
HyperhemisphericalUniformDistribution(dim), no_of_coefficients, grid_type)

def set_state(self, new_state):
assert self.dim == new_state.dim
assert isinstance(new_state, AbstractHyperhemisphericalDistribution)
self.gd = new_state

def predict_identity(self, d_sys):
assert isinstance(d_sys, AbstractHyperhemisphericalDistribution)
sd_half_cond_sd_half = HyperhemisphericalGridFilter.sys_noise_to_transition_density(
d_sys, self.gd.grid_values.shape[0])
self.predict_nonlinear_via_transition_density(sd_half_cond_sd_half)

def update_identity(self, meas_noise, z=None):
assert isinstance(meas_noise, AbstractHyperhemisphericalDistribution)
if not z==None:
measNoise = measNoise.setMode(z)
curr_grid = self.gd.get_grid()
self.gd = self.gd.multiply(HyperhemisphericalGridDistribution(curr_grid, 2 * meas_noise.pdf(curr_grid).T))

def update_nonlinear(self, likelihood, z):
self.gd.grid_values = self.gd.grid_values * likelihood(z, self.gd.get_grid()).T
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
self.gd = self.gd.normalize()

def predict_nonlinear_via_transition_density(self, f_trans):
assert np.array_equal(self.gd.get_grid(), f_trans.get_grid()), \
"fTrans is using an incompatible grid."
self.gd = self.gd.normalize()
grid_values_new = self.gd.get_manifold_size() / self.gd.grid_values.shape[0] * f_trans.grid_values.dot(
self.gd.grid_values)
self.gd = HyperhemisphericalGridDistribution(self.gd.get_grid(), grid_values_new)

def get_point_estimate(self):
gd_full_sphere = self.gd.to_full_sphere()
p = BinghamDistribution.fit(gd_full_sphere.get_grid(), gd_full_sphere.grid_values.T / np.sum(
gd_full_sphere.grid_values)).mode()
if p[-1] < 0:
p = -p
return p

@staticmethod
def sys_noise_to_transition_density(d_sys, no_grid_points):
if isinstance(d_sys, AbstractDistribution):
if isinstance(d_sys, (HyperhemisphericalWatsonDistribution, WatsonDistribution)):
def trans(xkk, xk):
return np.array([2 * WatsonDistribution(xk[:, i], d_sys.kappa).pdf(xkk) for i in range(xk.shape[1])]).T

elif (isinstance(d_sys, HypersphericalMixture) and len(d_sys.dists) == 2 and
np.all(d_sys.w == 0.5) and np.array_equal(d_sys.dists[0].mu, -d_sys.dists[1].mu) and
d_sys.dists[0].kappa == d_sys.dists[1].kappa):
def trans(xkk, xk):
return np.array([(VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(xkk) +
VonMisesFisherDistribution(xk[:, i], d_sys.dists[0].kappa).pdf(-xkk))
for i in range(xk.shape[1])]).T
else:
raise ValueError("Distribution not supported for predict identity. Must be zonal (rotationally symmetric around last dimension)")

print("PredictIdentity:Inefficient - Using inefficient prediction. Consider precalculating the SdHalfCondSdHalfGridDistribution and using predictNonlinearViaTransitionDensity.")
sd_half_cond_sd_half = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, 'eq_point_set_symm', 2 * d_sys.dim)
return sd_half_cond_sd_half

else:
raise TypeError("d_sys must be an instance of AbstractDistribution")

96 changes: 96 additions & 0 deletions pyrecest/tests/filters/test_hyperhemispherical_grid_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import unittest
import numpy as np
from pyrecest.filters.hyperhemispherical_grid_filter import HyperhemisphericalGridFilter
from pyrecest.distributions.hypersphere_subset.hyperhemispherical_grid_distribution import HyperhemisphericalGridDistribution
from pyrecest.distributions.conditional.sd_half_cond_sd_half_grid_distribution import SdHalfCondSdHalfGridDistribution
from pyrecest.distributions import BinghamDistribution, HypersphericalMixture, VonMisesFisherDistribution

class HyperhemisphericalGridFilterTest(unittest.TestCase):
def test_set_state_s2(self):
np.random.seed(0)
no_grid_points = 1001
sgf = HyperhemisphericalGridFilter(no_grid_points, 3)

self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))

# Test if it is uniform at the beginning
self.assertAlmostEqual(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size() * np.ones((no_grid_points, 1))))), 0, delta=1e-13)

M = np.eye(3)
Z = np.array([-2, -1, 0]).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()

sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
sgf.set_state(sgd_state)
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)

# Verify that it is no longer a uniform distribution
self.assertGreater(np.sum(np.abs(sgf.get_estimate().grid_values - (1 / sgf.get_estimate().get_manifold_size()))), 60)

# Verify estimate matches a mode of the Bingham
self.assertAlmostEqual(np.min(np.linalg.norm(sgf.get_point_estimate() - np.hstack((bd.mode(), -bd.mode())), axis=0)), 0, delta=1e-11)

# Verify errors
full_grid = sgd_state.get_grid()
sgd_state.grid = full_grid[:, -1]
sgd_state.grid_values = sgd_state.grid_values[:-1]
self.assertIsInstance(sgf.gd, HyperhemisphericalGridDistribution)
sgf_tmp = sgf.copy()

with self.assertRaises(ValueError):
sgf_tmp.set_state(sgd_state)

with self.assertRaises(ValueError):
sgf.set_state(bd)

def test_set_state_s3(self):
no_grid_points = 1001
sgf = HyperhemisphericalGridFilter(no_grid_points, 4)
self.assertEqual(sgf.get_estimate().grid_values.shape, (no_grid_points, 1))

# Test if it is uniform at the beginning
self.assertAlmostEqual(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 0)

M = np.eye(4)
Z = np.array([-2, -1, -0.5, 0]).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()

sgd_state = HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points)
sgf.set_state(sgd_state)

# Verify that it is no longer a uniform distribution
self.assertGreater(np.sum(np.abs(np.diff(sgf.get_estimate().grid_values.T))), 5)
def test_predict_converges_to_uniform_S2_S3(self):
test_predict_converges_to_uniform(3, 501, [-2, -1, 0], 3, 5e-5, 'eq_point_set_symm', 6)
test_predict_converges_to_uniform(4, 1001, [-2, -1, -0.5, 0], 5, 1e-3, 'eq_point_set', 8)
def test_predict_converges_to_uniform(dim, no_grid_points, z_values, tol_verify_greater, tol_verify_equal, eq_point_set_type, eq_point_set_arg):
sgf = HyperhemisphericalGridFilter(no_grid_points, dim)
M = np.eye(dim)
Z = np.array(z_values).reshape(-1, 1)
bd = BinghamDistribution(Z, M)
bd.F = bd.F * bd.integrate_numerically()
sgf.set_state(HyperhemisphericalGridDistribution.from_distribution(bd, no_grid_points))

# Verify that it is not a uniform distribution
assert sum(abs(np.diff(sgf.get_estimate().grid_values.T))) > tol_verify_greater

# Predict 10 times with VM-distributed noise
def trans(xkk, xk):
return 2 * np.hstack([HypersphericalMixture([VonMisesFisherDistribution(xk[:, i], 1), VonMisesFisherDistribution(-xk[:, i], 1)], [0.5, 0.5]).pdf(xkk) for i in range(xk.shape[1])])

sdsd = SdHalfCondSdHalfGridDistribution.from_function(trans, no_grid_points, True, eq_point_set_type, eq_point_set_arg)
manifold_size = sgf.get_estimate().get_manifold_size()

for i in range(10):
values_alternative_formula = (manifold_size / sgf.get_estimate().get_grid().shape[1]) * np.sum(sgf.get_estimate().grid_values.T * sdsd.grid_values, axis=1)
sgf.predict_nonlinear_via_transition_density(sdsd)
assert np.allclose(sgf.get_estimate().grid_values, values_alternative_formula, atol=1e-12)

# Verify that it is approximately uniform now
assert np.isclose(sum(abs(np.diff(sgf.get_estimate().grid_values.T))), 0, atol=tol_verify_equal)

if __name__ == '__main__':
unittest.main()
Loading