pykalmanでUKF

pythonでカルマンフィルタを扱えるpykalmanというライブラリがあるんですが,使い方をよく忘れるので簡単にメモしておきます.

pykalmanには線形カルマンフィルタと非線形のUnscentedカルマンフィルタ(UKF)の実装が用意されていて,更にUKFには通常のUnscentedKalmanFilterクラスとAdditiveUnscentedKalmanFilterクラスがあります.AdditiveUnscentedKalmanFilterは通常のUKFより計算が軽くてロバストですが,ノイズが単純な加算でモデル化できる場合のみに使用できます.

どちらのUKFも基本的に 1) 状態遷移関数&観測関数を定義, 2) UKF初期化, 3) 観測を基にUKF更新, で状態推定を行うことができます.

# 1) definition of the transition and observation functions
# calculate the next state
def transition(state, noise):
  next_state = ...
  return next_state + noise

# calculate the expected observation
def observation(state, noise):
  expected_observation = ...
  return expected_observation + noise

# 2) initialize UKF
transition_cov = numpy.eye(N)  # transition noise covariance matrix
observation_cov = numpy.eye(M) # observation noise covariance matrix
mean = numpy.zeros(N)          # initial mean
covs = numpy.eye(N)            # initial covariance

ukf = pykalman.UnscentedKalmanFilter(transition, observation, transition_cov, observation_cov, mean, cov)

# 3) update mean and cov with observations
while True:
  observation = ...
  mean, cov = ukf.filter_update(mean, cov, observation)

前述したように今回の例のようにノイズが状態,観測に加算されるだけの場合にはAdditiveUnscentedKalmanFilterを使ったほうが,計算量・安定性的に良いです.AdditiveUnscentedKalmanFilterの場合にはノイズは加算されるだけと決まっているので,状態遷移・観測関数には現状態ベクトルのみ渡されて,pykalman側で勝手にノイズを足してくれます.下のような感じ.

def transition(state):
  next_state = ...
  return next_state

def observation(state):
  expected_observation = ...
  return expected_observation

ukf = pykalman.AdditiveUnscentedKalmanFilter(...)

デモとして,前にも作ってた距離センサを使った位置推定プログラムをpykalmanで書き直しました.github

#!/usr/bin/python
import cv2
import math
import numpy
import pykalman


# System to be estimated. We estimate a target position from observations taken from several range sensors.
# target position : p = [p.x, p.y]
# sensor positions: s0 = [s0.x, s0.y], ..., sn = [sn.x, sn.y]
# state space     : x = p
# observation     : y = [|p - s0|, |p - s1|, ..., |p - sn|]
class System:
	# constructor
	def __init__(self):
		# true target and sensor positions
		self.target = numpy.float32((256, 256))
		self.sensors = numpy.float32([(128, 128), (384, 384), (384, 128)])

		cv2.namedWindow('canvas')
		cv2.setMouseCallback('canvas', self.mouse_callback)

	# generates an observation vector
	# [|p - s0|, |p - s1|, ..., |p - sn|] + noise
	def measurement(self):
		dists = numpy.linalg.norm(self.sensors - self.target, axis=1)
		return dists + numpy.abs(numpy.random.normal(0.0, 25.0, dists.size))

	# updates the target position
	def mouse_callback(self, event, x, y, flags, userdata):
		if event == cv2.EVENT_LBUTTONDOWN:
			self.pushed = True
		elif event == cv2.EVENT_LBUTTONUP:
			self.pushed = False

		if hasattr(self, 'pushed') and self.pushed:
			self.target = numpy.float32((x, y))

	# draw
	def draw(self, filter, measurement):
		canvas = numpy.ones((512, 512, 3), dtype=numpy.uint8) * 255

		# draw sensors and observations
		for r, sensor in zip(measurement, self.sensors):
			cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), 5, (0, 255, 0), 2)
			cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), int(r), (128, 128, 128), 1)

		# draw true target position
		cv2.circle(canvas, tuple(self.target.astype(numpy.int32)), 2, (255, 0, 0), 2)

		# draw estimated target position and error ellipse
		cv2.circle(canvas, tuple(filter.mean.astype(numpy.int32)), 5, (0, 0, 255), 2)
		eigenvalues, eigenvectors = numpy.linalg.eig(filter.cov)
		idx = [x[0] for x in sorted(enumerate(eigenvalues), key=lambda x: x[1])]
		angle = math.degrees(math.atan2(eigenvectors[0, idx[1]], eigenvectors[1, idx[1]]))
		axes = (int(eigenvalues[idx[1]]), int(eigenvalues[idx[0]]))
		cv2.ellipse(canvas, tuple(filter.mean.astype(numpy.int32)), axes, angle, 0, 360, (0, 0, 255))

		cv2.imshow('canvas', canvas)


# Target position estimator using UKF
class Filter:
	def __init__(self, sensors):
		self.sensors = sensors

		# initialize ukf
		trans_cov = numpy.eye(2) * 20
		obs_cov = numpy.eye(sensors.shape[0]) * 100
		self.mean = numpy.random.normal(256, 256, 2)
		self.cov = numpy.eye(2) * 128
		# self.ukf = pykalman.UnscentedKalmanFilter(self.transition, self.observation, trans_cov, obs_cov, self.mean, self.cov)
		self.ukf = pykalman.AdditiveUnscentedKalmanFilter(self.transition_, self.observation_, trans_cov, obs_cov, self.mean, self.cov)

	# transition function for usual UKF
	def transition(self, state, noise):
		return state + noise

	# observation function for usual UKF
	def observation(self, state, noise):
		expected = numpy.linalg.norm(self.sensors - state, axis=-1)
		return expected + noise

	# transition function for Additive UKF
	def transition_(self, state):
		return state

	# observation function for Additive UKF
	def observation_(self, state):
		expected = numpy.linalg.norm(self.sensors - state, axis=-1)
		return expected

	# update state using ukf
	def update(self, measurement):
		self.mean, self.cov = self.ukf.filter_update(self.mean, self.cov, measurement)


# entry point
def main():
	system = System()
	filter = Filter(system.sensors)

	while cv2.waitKey(50) != 0x1b:
		obs = system.measurement()
		filter.update(obs)
		system.draw(filter, obs)


if __name__ == '__main__':
	main()

pykalmanですが,通常の線形カルマンフィルタに関してもパラメータ推定など,いい感じの機能が揃っているので使い勝手が良いです.それから観測データが全て揃ってからのオフライン推定についてもカルマンスムーサーをKF, UKFどちらにも提供しているのでそちらについてもそのうち書きたいと思います.

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください