Skip to content
78 changes: 78 additions & 0 deletions src/components/landmark/landmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
landmark.py

Static point landmark for SLAM (mirrors obstacle.Obstacle: data + draw).
Noise-free range–bearing prediction lives here; EKF Jacobians stay in observation_model.
"""

import numpy as np
from math import atan2, sqrt


class Landmark:
"""
A single 2D point landmark at fixed world coordinates.
"""

def __init__(self, x_m, y_m, color="k", marker_size=8):
"""
x_m, y_m: position in global frame [m]
color: matplotlib color for draw()
marker_size: marker size for draw()
"""
self.x_m = float(x_m)
self.y_m = float(y_m)
self.DRAW_COLOR = color
self.marker_size = marker_size

def get_x_m(self):
return self.x_m

def get_y_m(self):
return self.y_m

def position_xy(self):
"""Returns (x_m, y_m) tuple."""
return self.x_m, self.y_m

def predicted_range_bearing(self, robot_state):
"""
Noise-free predicted [range, bearing] from robot pose to this landmark.
robot_state: (3, 1) or (4, 1); only x, y, yaw are used.
Returns: (2, 1) ndarray
"""
return Landmark.predicted_range_bearing_at(robot_state, self.x_m, self.y_m)

@staticmethod
def predicted_range_bearing_at(robot_state, lx, ly):
"""
Same geometry as legacy observe_landmark; for EKF when reading lx, ly from state.
robot_state: (3, 1) [x, y, yaw]
lx, ly: landmark position [m]
"""
x = robot_state[0, 0]
y = robot_state[1, 0]
yaw = robot_state[2, 0]
dx = lx - x
dy = ly - y
r = sqrt(dx * dx + dy * dy)
if r < 1e-9:
return np.array([[0.0], [0.0]])
global_bearing = atan2(dy, dx)
bearing = (global_bearing - yaw + np.pi) % (2 * np.pi) - np.pi
return np.array([[r], [bearing]])

def draw(self, axes, elems, label=None):
"""
Draw landmark as a point marker (similar role to Obstacle.draw).
"""
plot_kwargs = {
"markersize": self.marker_size,
"color": self.DRAW_COLOR,
"linestyle": "None",
"marker": "o",
}
if label is not None:
plot_kwargs["label"] = label
p, = axes.plot([self.x_m], [self.y_m], **plot_kwargs)
elems.append(p)
48 changes: 48 additions & 0 deletions src/components/landmark/landmark_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
landmark_list.py

Collection of landmarks (mirrors obstacle.ObstacleList).
"""


class LandmarkList:
"""
Holds Landmark objects; supports drawing and iteration for sensors.
"""

def __init__(self):
self.list = []

def add_landmark(self, landmark):
"""Append a Landmark instance."""
self.list.append(landmark)

def get_list(self):
return self.list

def __iter__(self):
return iter(self.list)

def __len__(self):
return len(self.list)

def update(self, _time_s):
"""
Landmarks are static; this lets LandmarkList be added to the visualizer.
"""
pass

def draw(self, axes, elems, label_first_only=True):
"""
Draw all landmarks. If label_first_only, only the first gets a legend label.
"""
for i, lm in enumerate(self.list):
lm.draw(
axes,
elems,
label="True landmarks" if (label_first_only and i == 0) else None,
)

def as_xy_tuples(self):
"""List of (x, y) for plotting or legacy paths."""
return [(lm.get_x_m(), lm.get_y_m()) for lm in self.list]
76 changes: 76 additions & 0 deletions src/components/localization/ekf_slam/data_association.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
data_association.py

Nearest-neighbor data association with Mahalanobis distance gate.
"""

import numpy as np


def mahalanobis_distance(z_actual, z_predicted, S):
"""
Mahalanobis distance: (z_actual - z_predicted)^T @ inv(S) @ (z_actual - z_predicted)
z_actual, z_predicted: (2, 1) measurement vectors
S: (2, 2) innovation covariance
Returns: scalar distance
"""
diff = z_actual - z_predicted
# Normalize bearing difference to [-pi, pi]
diff[1, 0] = _normalize_angle(diff[1, 0])
try:
s_diff = np.linalg.solve(S, diff)
d_sq = (diff.T @ s_diff)[0, 0]
return np.sqrt(max(0, d_sq))
except np.linalg.LinAlgError:
return np.inf


def _normalize_angle(angle):
"""Normalize angle to [-pi, pi]."""
return (angle + np.pi) % (2 * np.pi) - np.pi


# Landmark id returned when observation should not be added as new (potential duplicate)
POTENTIAL_DUPLICATE = -2


def nearest_neighbor_association(observations, predicted_observations, innovation_covs,
landmark_ids, gate_threshold):
"""
For each observation, find nearest landmark within Mahalanobis gate.
observations: list of (2, 1) arrays - actual measurements
predicted_observations: list of (2, 1) arrays - one per known landmark
innovation_covs: list of (2, 2) arrays - S matrix per landmark
landmark_ids: list of landmark indices (0-based)
gate_threshold: max Mahalanobis distance to accept a match
Returns: list of (obs_index, landmark_id).
landmark_id >= 0: match to that landmark (state index).
landmark_id == -1: truly new landmark (add to state).
landmark_id == POTENTIAL_DUPLICATE (-2): within gate of an already-used landmark;
do NOT add as new (duplicate-landmark guard).
"""
num_obs = len(observations)
num_landmarks = len(landmark_ids)
matches = []
used_landmarks = set()
for o in range(num_obs):
z = observations[o]
best_landmark = -1
best_dist = gate_threshold + 1e-6
min_dist_any = np.inf # min distance to any landmark (including used)
for L in range(num_landmarks):
d = mahalanobis_distance(z, predicted_observations[L], innovation_covs[L])
min_dist_any = min(min_dist_any, d)
if landmark_ids[L] in used_landmarks:
continue
if d < best_dist:
best_dist = d
best_landmark = landmark_ids[L]
if best_landmark >= 0:
used_landmarks.add(best_landmark)
matches.append((o, best_landmark))
elif min_dist_any <= gate_threshold:
matches.append((o, POTENTIAL_DUPLICATE))
else:
matches.append((o, -1)) # truly new landmark
return matches
178 changes: 178 additions & 0 deletions src/components/localization/ekf_slam/ekf_slam_localizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""
ekf_slam_localizer.py

EKF-SLAM localizer component following the ExtendedKalmanFilterLocalizer pattern.
Jointly estimates robot state [x, y, yaw, speed] and landmark positions [lx, ly, ...]
using range-bearing observations. Prediction uses State.motion_model.
"""

import sys
from pathlib import Path

import numpy as np
from math import sqrt, pi

sys.path.append(str(Path(__file__).absolute().parent) + "/../../state")
sys.path.append(str(Path(__file__).absolute().parent.parent.parent) + "/landmark")
from state import State
from landmark import Landmark

from motion_model import jacobian_F, jacobian_G
from observation_model import build_H_matrix
from data_association import nearest_neighbor_association, POTENTIAL_DUPLICATE
from landmark_manager import augment_state, initialize_landmark_from_observation


class EKFSLAMLocalizer:
"""
EKF-SLAM localizer: simultaneous localization and mapping via Extended Kalman Filter.
Interface mirrors ExtendedKalmanFilterLocalizer where possible.
"""

def __init__(self, init_x=0.0, init_y=0.0, init_yaw=0.0, init_speed_mps=0.0,
sigma_r=0.2, sigma_phi=0.1, sigma_v=0.15, sigma_omega=0.08,
gate_threshold=2.0, max_range=12.0,
duplicate_position_threshold=4.5, color='r'):
self.R_obs = np.diag([sigma_r ** 2, sigma_phi ** 2])
self.Q_input = np.diag([sigma_v ** 2, sigma_omega ** 2])
self.sigma_r = sigma_r
self.sigma_phi = sigma_phi
self.gate_threshold = gate_threshold
self.duplicate_position_threshold = duplicate_position_threshold
self.max_range = max_range
self.DRAW_COLOR = color

self.mu = np.array([[init_x], [init_y], [init_yaw], [init_speed_mps]])
self.Sigma = np.eye(4) * 0.1

def predict(self, control, dt):
"""
EKF prediction step.
control: (2,1) [accel_mps2, yaw_rate_rps]
dt: time step [s]
"""
n = self.mu.shape[0]
robot_state = self.mu[0:4]
robot_pred = State.motion_model(robot_state, control, dt)
F = jacobian_F(robot_pred, control, dt)
G = jacobian_G(robot_pred, dt)
Sigma_rr = self.Sigma[0:4, 0:4]
Sigma_rr_new = F @ Sigma_rr @ F.T + G @ self.Q_input @ G.T
self.mu[0:4] = robot_pred
if n == 4:
self.Sigma[0:4, 0:4] = Sigma_rr_new
else:
Sigma_rl = self.Sigma[0:4, 4:].copy()
self.Sigma[0:4, 0:4] = Sigma_rr_new
self.Sigma[0:4, 4:] = F @ Sigma_rl
self.Sigma[4:, 0:4] = Sigma_rl.T @ F.T

def update(self, observations):
"""
EKF update step with data association and landmark augmentation.
observations: list of (2,1) [range, bearing] measurements
"""
if not observations:
return

n = self.mu.shape[0]
num_landmarks = (n - 4) // 2
pred_obs_list = []
S_list = []
for j in range(num_landmarks):
lx = self.mu[4 + 2 * j, 0]
ly = self.mu[4 + 2 * j + 1, 0]
z_pred = Landmark.predicted_range_bearing_at(self.mu[0:3], lx, ly)
pred_obs_list.append(z_pred)
H = build_H_matrix(self.mu[0:3], lx, ly, j, n)
S = H @ self.Sigma @ H.T + self.R_obs
S_list.append(S)
if not pred_obs_list:
matches = [(o, -1) for o in range(len(observations))]
else:
matches = nearest_neighbor_association(
observations, pred_obs_list, S_list,
list(range(num_landmarks)), self.gate_threshold
)

for (obs_idx, land_id) in matches:
n = self.mu.shape[0]
z = observations[obs_idx]
if land_id >= 0:
j = land_id
lx = self.mu[4 + 2 * j, 0]
ly = self.mu[4 + 2 * j + 1, 0]
z_pred = Landmark.predicted_range_bearing_at(self.mu[0:3], lx, ly)
if z_pred[0, 0] < 0.5:
continue
H = build_H_matrix(self.mu[0:3], lx, ly, j, n)
S = H @ self.Sigma @ H.T + self.R_obs
try:
K = self.Sigma @ H.T @ np.linalg.inv(S)
except np.linalg.LinAlgError:
continue
innov = z - z_pred
innov[1, 0] = (innov[1, 0] + pi) % (2 * pi) - pi
self.mu = self.mu + K @ innov
IKH = np.eye(n) - K @ H
self.Sigma = IKH @ self.Sigma
elif land_id == POTENTIAL_DUPLICATE:
continue
else:
lm_new, _, _ = initialize_landmark_from_observation(
self.mu[0:3], z, None, None
)
lx_new, ly_new = float(lm_new[0, 0]), float(lm_new[1, 0])
is_duplicate = False
for j in range((self.mu.shape[0] - 4) // 2):
lx_j = self.mu[4 + 2 * j, 0]
ly_j = self.mu[4 + 2 * j + 1, 0]
d = sqrt((lx_new - lx_j) ** 2 + (ly_new - ly_j) ** 2)
if d < self.duplicate_position_threshold:
is_duplicate = True
break
if is_duplicate:
continue
self.mu, self.Sigma = augment_state(
self.mu, self.Sigma, z, self.mu[0:3], self.R_obs
)

def get_robot_state(self):
"""Returns (4,1) array [x, y, yaw, speed]."""
return self.mu[0:4].copy()

def get_estimated_landmarks(self):
"""Returns list of (lx, ly) tuples for all mapped landmarks."""
n = self.mu.shape[0]
if n <= 4:
return []
return [
(self.mu[4 + 2 * j, 0], self.mu[4 + 2 * j + 1, 0])
for j in range((n - 4) // 2)
]

def draw(self, axes, elems, pose):
"""
Draw uncertainty circle and estimated landmarks.
axes: Axes object
elems: list of plot elements
pose: (3,1) [x, y, yaw] for circle center
"""
est_lm = self.get_estimated_landmarks()
if est_lm:
lx_est = [p[0] for p in est_lm]
ly_est = [p[1] for p in est_lm]
p, = axes.plot(lx_est, ly_est, "rx", markersize=6, label="Est. landmarks")
elems.append(p)

Sigma_xy = self.Sigma[0:2, 0:2]
try:
trace_xy = np.trace(Sigma_xy) / 2.0
r = 3.0 * sqrt(max(0.0, trace_xy))
t = np.arange(0, 2 * pi + 0.1, 0.1)
cx = pose[0, 0] + r * np.cos(t)
cy = pose[1, 0] + r * np.sin(t)
p, = axes.plot(cx, cy, color=self.DRAW_COLOR, linewidth=0.8)
elems.append(p)
except (np.linalg.LinAlgError, ValueError):
pass
Loading