Source code for easyleed.kalman

"""
easyleed.kalman
----------------

Kalman filters for tracking the spots

"""

import numpy as np

[docs]class AbstractKalmanFilter(object): """ Abstract implementation of a Kalman filter. Matrices and Vectors can be given in any input format np.matrix() understands. Vectors are internally transposed and should therefore be given as column vectors. """ def __init__(self, x, P, H): """ Initialize Kalman filter. x: start state vector P: start state covariance matrix H: measurement matrix """ super(AbstractKalmanFilter, self).__init__() self.x = np.asmatrix(x).T self.P = np.asmatrix(P) self.H = np.asmatrix(H) # identity matrix of state vector size self._1 = np.asmatrix(np.identity(max(self.x.shape)))
[docs] def predict(self, F, Q=np.zeros((4, 4))): """ Predict next state. F: state transition matrix Q: process covariance matrix """ F = np.asmatrix(F) Q = np.asmatrix(Q) self.x = F * self.x self.P = F * self.P * F.T + Q
[docs] def predict_measurement_covariance(self, R=None): """ Returns the covariance matrix of a predicted measurement. """ if not R is None: return self.H * self.P * self.H.T + R else: return self.H * self.P * self.H.T
[docs] def predict_measurement(self): """ Returns the predicted measurement. """ return self.H * self.x
[docs] def update(self, z, R): """ Update state estimate. z: measurement vector R: measurement covariance matrix """ z = np.asmatrix(z).T R = np.asmatrix(R) K = self.P * self.H.T * self.predict_measurement_covariance(R).I # print np.sum((np.asarray(z).flatten() - np.asarray(self.predict_measurement()).flatten())**2), \ # np.sum(np.diag(self.predict_measurement_covariance(R))) self.x = self.x + K * (z - self.predict_measurement()) self.P = (self._1 - K * self.H) * self.P
[docs] def measurement_distance(self, z, R=None): """ Returns the squared Mahalanobis distance of the given measurement. z: measurement vector """ z = np.asmatrix(z).T R = np.diag([0, 0]) if R is None else np.asarray(R) z_predicted = self.predict_measurement() # calculate the measurement residual residual = z - z_predicted return residual.T * self.predict_measurement_covariance(R).I * residual
[docs]class AbstractPVKalmanFilter(AbstractKalmanFilter): """ Kalman filter for 2d-tracking using position and velocity as state variables.""" def __init__(self, x_in, y_in, P, time, vx_in=0, vy_in=0): self.old_time = time x = [x_in, y_in, vx_in, vy_in] H = [[1, 0, 0, 0], [0, 1, 0, 0]] super(AbstractPVKalmanFilter, self).__init__(x, P, H) def get_position(self): return float(self.x[0]), float(self.x[1]) def get_position_err(self): return float(self.P[0, 0])**0.5, float(self.P[1, 1])**0.5
class PVKalmanFilter0(AbstractPVKalmanFilter): def predict(self, time, *args, **kwargs): dt = time - self.old_time F = [[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]] super(PVKalmanFilter0, self).predict(F, *args, **kwargs) self.old_time = time class PVKalmanFilter1(AbstractPVKalmanFilter): def predict(self, time, *args, **kwargs): dt = time - self.old_time a = - 1.5 / self.old_time v_up = 1 + a * dt pos_up = dt + 0.5 * a * dt**2 F = [[1, 0, pos_up, 0], [0, 1, 0, pos_up], [0, 0, v_up, 0], [0, 0, 0, v_up]] super(PVKalmanFilter1, self).predict(F, *args, **kwargs) self.old_time = time class PVKalmanFilter2(AbstractPVKalmanFilter): def predict(self, time, *args, **kwargs): dt = time - self.old_time a = - 1.5 / self.old_time a_dot = 1.875 / self.old_time**2 v_up = 1 + a * dt + a_dot * dt**2 pos_up = dt + 0.5 * a * dt**2 + (1.0 / 3.0) * a_dot * dt**3 F = [[1, 0, pos_up, 0], [0, 1, 0, pos_up], [0, 0, v_up, 0], [0, 0, 0, v_up]] super(PVKalmanFilter2, self).predict(F, *args, **kwargs) self.old_time = time class PVKalmanFilter3(AbstractPVKalmanFilter): def predict(self, time, *args, **kwargs): dt = time - self.old_time a = - 1.5 / self.old_time a_dot = 1.875 / self.old_time**2 a_ddot = - 2.1875 / self.old_time**3 v_up = 1 + a * dt + a_dot * dt**2 + a_ddot * dt**3 pos_up = dt + 0.5 * a * dt**2 + (1.0 / 3.0) * a_dot * dt**3 + (1.0 / 4.0) * a_ddot * dt**4 F = [[1, 0, pos_up, 0], [0, 1, 0, pos_up], [0, 0, v_up, 0], [0, 0, 0, v_up]] super(PVKalmanFilter3, self).predict(F, *args, **kwargs) self.old_time = time
Fork me on GitHub