diff --git a/src/components/landmark/landmark.py b/src/components/landmark/landmark.py new file mode 100644 index 00000000..19f60848 --- /dev/null +++ b/src/components/landmark/landmark.py @@ -0,0 +1,78 @@ +""" +landmark.py + +Static point landmark for SLAM (mirrors obstacle.Obstacle: data + draw). +Noise-free range–bearing prediction lives here; EKF Jacobians stay in observation_model. +""" + +import numpy as np +from math import atan2, sqrt + + +class Landmark: + """ + A single 2D point landmark at fixed world coordinates. + """ + + def __init__(self, x_m, y_m, color="k", marker_size=8): + """ + x_m, y_m: position in global frame [m] + color: matplotlib color for draw() + marker_size: marker size for draw() + """ + self.x_m = float(x_m) + self.y_m = float(y_m) + self.DRAW_COLOR = color + self.marker_size = marker_size + + def get_x_m(self): + return self.x_m + + def get_y_m(self): + return self.y_m + + def position_xy(self): + """Returns (x_m, y_m) tuple.""" + return self.x_m, self.y_m + + def predicted_range_bearing(self, robot_state): + """ + Noise-free predicted [range, bearing] from robot pose to this landmark. + robot_state: (3, 1) or (4, 1); only x, y, yaw are used. + Returns: (2, 1) ndarray + """ + return Landmark.predicted_range_bearing_at(robot_state, self.x_m, self.y_m) + + @staticmethod + def predicted_range_bearing_at(robot_state, lx, ly): + """ + Same geometry as legacy observe_landmark; for EKF when reading lx, ly from state. + robot_state: (3, 1) [x, y, yaw] + lx, ly: landmark position [m] + """ + x = robot_state[0, 0] + y = robot_state[1, 0] + yaw = robot_state[2, 0] + dx = lx - x + dy = ly - y + r = sqrt(dx * dx + dy * dy) + if r < 1e-9: + return np.array([[0.0], [0.0]]) + global_bearing = atan2(dy, dx) + bearing = (global_bearing - yaw + np.pi) % (2 * np.pi) - np.pi + return np.array([[r], [bearing]]) + + def draw(self, axes, elems, label=None): + """ + Draw landmark as a point marker (similar role to Obstacle.draw). + """ + plot_kwargs = { + "markersize": self.marker_size, + "color": self.DRAW_COLOR, + "linestyle": "None", + "marker": "o", + } + if label is not None: + plot_kwargs["label"] = label + p, = axes.plot([self.x_m], [self.y_m], **plot_kwargs) + elems.append(p) diff --git a/src/components/landmark/landmark_list.py b/src/components/landmark/landmark_list.py new file mode 100644 index 00000000..e4b8f1dd --- /dev/null +++ b/src/components/landmark/landmark_list.py @@ -0,0 +1,48 @@ +""" +landmark_list.py + +Collection of landmarks (mirrors obstacle.ObstacleList). +""" + + +class LandmarkList: + """ + Holds Landmark objects; supports drawing and iteration for sensors. + """ + + def __init__(self): + self.list = [] + + def add_landmark(self, landmark): + """Append a Landmark instance.""" + self.list.append(landmark) + + def get_list(self): + return self.list + + def __iter__(self): + return iter(self.list) + + def __len__(self): + return len(self.list) + + def update(self, _time_s): + """ + Landmarks are static; this lets LandmarkList be added to the visualizer. + """ + pass + + def draw(self, axes, elems, label_first_only=True): + """ + Draw all landmarks. If label_first_only, only the first gets a legend label. + """ + for i, lm in enumerate(self.list): + lm.draw( + axes, + elems, + label="True landmarks" if (label_first_only and i == 0) else None, + ) + + def as_xy_tuples(self): + """List of (x, y) for plotting or legacy paths.""" + return [(lm.get_x_m(), lm.get_y_m()) for lm in self.list] diff --git a/src/components/localization/ekf_slam/data_association.py b/src/components/localization/ekf_slam/data_association.py new file mode 100644 index 00000000..61f2ffc8 --- /dev/null +++ b/src/components/localization/ekf_slam/data_association.py @@ -0,0 +1,76 @@ +""" +data_association.py + +Nearest-neighbor data association with Mahalanobis distance gate. +""" + +import numpy as np + + +def mahalanobis_distance(z_actual, z_predicted, S): + """ + Mahalanobis distance: (z_actual - z_predicted)^T @ inv(S) @ (z_actual - z_predicted) + z_actual, z_predicted: (2, 1) measurement vectors + S: (2, 2) innovation covariance + Returns: scalar distance + """ + diff = z_actual - z_predicted + # Normalize bearing difference to [-pi, pi] + diff[1, 0] = _normalize_angle(diff[1, 0]) + try: + s_diff = np.linalg.solve(S, diff) + d_sq = (diff.T @ s_diff)[0, 0] + return np.sqrt(max(0, d_sq)) + except np.linalg.LinAlgError: + return np.inf + + +def _normalize_angle(angle): + """Normalize angle to [-pi, pi].""" + return (angle + np.pi) % (2 * np.pi) - np.pi + + +# Landmark id returned when observation should not be added as new (potential duplicate) +POTENTIAL_DUPLICATE = -2 + + +def nearest_neighbor_association(observations, predicted_observations, innovation_covs, + landmark_ids, gate_threshold): + """ + For each observation, find nearest landmark within Mahalanobis gate. + observations: list of (2, 1) arrays - actual measurements + predicted_observations: list of (2, 1) arrays - one per known landmark + innovation_covs: list of (2, 2) arrays - S matrix per landmark + landmark_ids: list of landmark indices (0-based) + gate_threshold: max Mahalanobis distance to accept a match + Returns: list of (obs_index, landmark_id). + landmark_id >= 0: match to that landmark (state index). + landmark_id == -1: truly new landmark (add to state). + landmark_id == POTENTIAL_DUPLICATE (-2): within gate of an already-used landmark; + do NOT add as new (duplicate-landmark guard). + """ + num_obs = len(observations) + num_landmarks = len(landmark_ids) + matches = [] + used_landmarks = set() + for o in range(num_obs): + z = observations[o] + best_landmark = -1 + best_dist = gate_threshold + 1e-6 + min_dist_any = np.inf # min distance to any landmark (including used) + for L in range(num_landmarks): + d = mahalanobis_distance(z, predicted_observations[L], innovation_covs[L]) + min_dist_any = min(min_dist_any, d) + if landmark_ids[L] in used_landmarks: + continue + if d < best_dist: + best_dist = d + best_landmark = landmark_ids[L] + if best_landmark >= 0: + used_landmarks.add(best_landmark) + matches.append((o, best_landmark)) + elif min_dist_any <= gate_threshold: + matches.append((o, POTENTIAL_DUPLICATE)) + else: + matches.append((o, -1)) # truly new landmark + return matches diff --git a/src/components/localization/ekf_slam/ekf_slam_localizer.py b/src/components/localization/ekf_slam/ekf_slam_localizer.py new file mode 100644 index 00000000..6f9d14bc --- /dev/null +++ b/src/components/localization/ekf_slam/ekf_slam_localizer.py @@ -0,0 +1,178 @@ +""" +ekf_slam_localizer.py + +EKF-SLAM localizer component following the ExtendedKalmanFilterLocalizer pattern. +Jointly estimates robot state [x, y, yaw, speed] and landmark positions [lx, ly, ...] +using range-bearing observations. Prediction uses State.motion_model. +""" + +import sys +from pathlib import Path + +import numpy as np +from math import sqrt, pi + +sys.path.append(str(Path(__file__).absolute().parent) + "/../../state") +sys.path.append(str(Path(__file__).absolute().parent.parent.parent) + "/landmark") +from state import State +from landmark import Landmark + +from motion_model import jacobian_F, jacobian_G +from observation_model import build_H_matrix +from data_association import nearest_neighbor_association, POTENTIAL_DUPLICATE +from landmark_manager import augment_state, initialize_landmark_from_observation + + +class EKFSLAMLocalizer: + """ + EKF-SLAM localizer: simultaneous localization and mapping via Extended Kalman Filter. + Interface mirrors ExtendedKalmanFilterLocalizer where possible. + """ + + def __init__(self, init_x=0.0, init_y=0.0, init_yaw=0.0, init_speed_mps=0.0, + sigma_r=0.2, sigma_phi=0.1, sigma_v=0.15, sigma_omega=0.08, + gate_threshold=2.0, max_range=12.0, + duplicate_position_threshold=4.5, color='r'): + self.R_obs = np.diag([sigma_r ** 2, sigma_phi ** 2]) + self.Q_input = np.diag([sigma_v ** 2, sigma_omega ** 2]) + self.sigma_r = sigma_r + self.sigma_phi = sigma_phi + self.gate_threshold = gate_threshold + self.duplicate_position_threshold = duplicate_position_threshold + self.max_range = max_range + self.DRAW_COLOR = color + + self.mu = np.array([[init_x], [init_y], [init_yaw], [init_speed_mps]]) + self.Sigma = np.eye(4) * 0.1 + + def predict(self, control, dt): + """ + EKF prediction step. + control: (2,1) [accel_mps2, yaw_rate_rps] + dt: time step [s] + """ + n = self.mu.shape[0] + robot_state = self.mu[0:4] + robot_pred = State.motion_model(robot_state, control, dt) + F = jacobian_F(robot_pred, control, dt) + G = jacobian_G(robot_pred, dt) + Sigma_rr = self.Sigma[0:4, 0:4] + Sigma_rr_new = F @ Sigma_rr @ F.T + G @ self.Q_input @ G.T + self.mu[0:4] = robot_pred + if n == 4: + self.Sigma[0:4, 0:4] = Sigma_rr_new + else: + Sigma_rl = self.Sigma[0:4, 4:].copy() + self.Sigma[0:4, 0:4] = Sigma_rr_new + self.Sigma[0:4, 4:] = F @ Sigma_rl + self.Sigma[4:, 0:4] = Sigma_rl.T @ F.T + + def update(self, observations): + """ + EKF update step with data association and landmark augmentation. + observations: list of (2,1) [range, bearing] measurements + """ + if not observations: + return + + n = self.mu.shape[0] + num_landmarks = (n - 4) // 2 + pred_obs_list = [] + S_list = [] + for j in range(num_landmarks): + lx = self.mu[4 + 2 * j, 0] + ly = self.mu[4 + 2 * j + 1, 0] + z_pred = Landmark.predicted_range_bearing_at(self.mu[0:3], lx, ly) + pred_obs_list.append(z_pred) + H = build_H_matrix(self.mu[0:3], lx, ly, j, n) + S = H @ self.Sigma @ H.T + self.R_obs + S_list.append(S) + if not pred_obs_list: + matches = [(o, -1) for o in range(len(observations))] + else: + matches = nearest_neighbor_association( + observations, pred_obs_list, S_list, + list(range(num_landmarks)), self.gate_threshold + ) + + for (obs_idx, land_id) in matches: + n = self.mu.shape[0] + z = observations[obs_idx] + if land_id >= 0: + j = land_id + lx = self.mu[4 + 2 * j, 0] + ly = self.mu[4 + 2 * j + 1, 0] + z_pred = Landmark.predicted_range_bearing_at(self.mu[0:3], lx, ly) + if z_pred[0, 0] < 0.5: + continue + H = build_H_matrix(self.mu[0:3], lx, ly, j, n) + S = H @ self.Sigma @ H.T + self.R_obs + try: + K = self.Sigma @ H.T @ np.linalg.inv(S) + except np.linalg.LinAlgError: + continue + innov = z - z_pred + innov[1, 0] = (innov[1, 0] + pi) % (2 * pi) - pi + self.mu = self.mu + K @ innov + IKH = np.eye(n) - K @ H + self.Sigma = IKH @ self.Sigma + elif land_id == POTENTIAL_DUPLICATE: + continue + else: + lm_new, _, _ = initialize_landmark_from_observation( + self.mu[0:3], z, None, None + ) + lx_new, ly_new = float(lm_new[0, 0]), float(lm_new[1, 0]) + is_duplicate = False + for j in range((self.mu.shape[0] - 4) // 2): + lx_j = self.mu[4 + 2 * j, 0] + ly_j = self.mu[4 + 2 * j + 1, 0] + d = sqrt((lx_new - lx_j) ** 2 + (ly_new - ly_j) ** 2) + if d < self.duplicate_position_threshold: + is_duplicate = True + break + if is_duplicate: + continue + self.mu, self.Sigma = augment_state( + self.mu, self.Sigma, z, self.mu[0:3], self.R_obs + ) + + def get_robot_state(self): + """Returns (4,1) array [x, y, yaw, speed].""" + return self.mu[0:4].copy() + + def get_estimated_landmarks(self): + """Returns list of (lx, ly) tuples for all mapped landmarks.""" + n = self.mu.shape[0] + if n <= 4: + return [] + return [ + (self.mu[4 + 2 * j, 0], self.mu[4 + 2 * j + 1, 0]) + for j in range((n - 4) // 2) + ] + + def draw(self, axes, elems, pose): + """ + Draw uncertainty circle and estimated landmarks. + axes: Axes object + elems: list of plot elements + pose: (3,1) [x, y, yaw] for circle center + """ + est_lm = self.get_estimated_landmarks() + if est_lm: + lx_est = [p[0] for p in est_lm] + ly_est = [p[1] for p in est_lm] + p, = axes.plot(lx_est, ly_est, "rx", markersize=6, label="Est. landmarks") + elems.append(p) + + Sigma_xy = self.Sigma[0:2, 0:2] + try: + trace_xy = np.trace(Sigma_xy) / 2.0 + r = 3.0 * sqrt(max(0.0, trace_xy)) + t = np.arange(0, 2 * pi + 0.1, 0.1) + cx = pose[0, 0] + r * np.cos(t) + cy = pose[1, 0] + r * np.sin(t) + p, = axes.plot(cx, cy, color=self.DRAW_COLOR, linewidth=0.8) + elems.append(p) + except (np.linalg.LinAlgError, ValueError): + pass diff --git a/src/components/localization/ekf_slam/ekf_slam_vehicle_localizer.py b/src/components/localization/ekf_slam/ekf_slam_vehicle_localizer.py new file mode 100644 index 00000000..fa44c8f0 --- /dev/null +++ b/src/components/localization/ekf_slam/ekf_slam_vehicle_localizer.py @@ -0,0 +1,86 @@ +""" +ekf_slam_vehicle_localizer.py + +Adapter that lets EKF-SLAM plug into FourWheelsVehicle like other localizers. +""" + +import numpy as np + +from ekf_slam_localizer import EKFSLAMLocalizer +from landmark_range_bearing_sensor import LandmarkRangeBearingSensor + + +class EKFSLAMVehicleLocalizer: + """ + Adapts EKFSLAMLocalizer to the FourWheelsVehicle localizer interface. + """ + + def __init__( + self, + true_state, + landmark_list, + landmark_sensor=None, + init_x=0.0, + init_y=0.0, + init_yaw=0.0, + init_speed_mps=0.0, + sigma_r=0.2, + sigma_phi=0.1, + sigma_v=0.15, + sigma_omega=0.08, + gate_threshold=2.0, + max_range=15.0, + duplicate_position_threshold=4.5, + color="r", + ): + self.true_state = true_state + self.landmark_list = landmark_list + self.landmark_sensor = landmark_sensor or LandmarkRangeBearingSensor( + sigma_r=sigma_r, + sigma_phi=sigma_phi, + max_range_m=max_range, + ) + self.localizer = EKFSLAMLocalizer( + init_x=init_x, + init_y=init_y, + init_yaw=init_yaw, + init_speed_mps=init_speed_mps, + sigma_r=sigma_r, + sigma_phi=sigma_phi, + sigma_v=sigma_v, + sigma_omega=sigma_omega, + gate_threshold=gate_threshold, + max_range=max_range, + duplicate_position_threshold=duplicate_position_threshold, + color=color, + ) + + def update(self, state, accel_mps2, yaw_rate_rps, time_s, _unused_sensor_data): + """ + Update the wrapped EKF-SLAM localizer and return the estimated 4D state. + """ + control = np.array([[accel_mps2], [yaw_rate_rps]]) + self.localizer.predict(control, time_s) + + true_robot = np.array( + [ + [self.true_state.get_x_m()], + [self.true_state.get_y_m()], + [self.true_state.get_yaw_rad()], + [self.true_state.get_speed_mps()], + ] + ) + observations = self.landmark_sensor.observe_visible_landmarks( + true_robot, self.landmark_list + ) + self.localizer.update(observations) + return self.localizer.get_robot_state() + + def draw(self, axes, elems, pose): + self.localizer.draw(axes, elems, pose) + + def get_estimated_landmarks(self): + return self.localizer.get_estimated_landmarks() + + def get_robot_state(self): + return self.localizer.get_robot_state() diff --git a/src/components/localization/ekf_slam/landmark_manager.py b/src/components/localization/ekf_slam/landmark_manager.py new file mode 100644 index 00000000..cd097c71 --- /dev/null +++ b/src/components/localization/ekf_slam/landmark_manager.py @@ -0,0 +1,59 @@ +""" +landmark_manager.py + +State vector augmentation and new landmark initialization for EKF-SLAM. +""" + +import numpy as np +from math import cos, sin + + +def initialize_landmark_from_observation(robot_state, z, G_robot, G_landmark): + """ + Compute landmark position (lx, ly) from range-bearing observation and robot pose. + robot_state: (3, 1) [x, y, yaw] + z: (2, 1) [range, bearing] + G_robot: (2, 4) Jacobian of [lx, ly] w.r.t. [x, y, yaw, speed] (speed column is zero) + G_landmark: (2, 2) Jacobian of [lx, ly] w.r.t. [r, phi] + Returns: (2, 1) landmark position, and G_robot, G_landmark for covariance update. + """ + x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0] + r, phi = z[0, 0], z[1, 0] + lx = x + r * cos(yaw + phi) + ly = y + r * sin(yaw + phi) + # Jacobian of (lx, ly) w.r.t. (x, y, yaw): dlx/dx=1, dlx/dy=0, dlx/dyaw=-r*sin(yaw+phi) + G_robot = np.array([ + [1, 0, -r * sin(yaw + phi), 0], + [0, 1, r * cos(yaw + phi), 0] + ]) + G_landmark = np.array([ + [cos(yaw + phi), -r * sin(yaw + phi)], + [sin(yaw + phi), r * cos(yaw + phi)] + ]) + return np.array([[lx], [ly]]), G_robot, G_landmark + + +def augment_state(mu, Sigma, z, robot_state, R_obs): + """ + Augment state vector and covariance with a new landmark from observation z. + mu: (n, 1) state vector [x, y, yaw, speed, l1_x, l1_y, ...] + Sigma: (n, n) covariance + z: (2, 1) [range, bearing] + robot_state: (3, 1) current robot pose (x, y, yaw) from mu + R_obs: (2, 2) observation noise covariance + Returns: mu_new (n+2, 1), Sigma_new (n+2, n+2) + """ + lm, G_robot, G_landmark = initialize_landmark_from_observation(robot_state, z, None, None) + n = mu.shape[0] + Sigma_rr = Sigma[0:4, 0:4] + Sigma_xr = Sigma[:, 0:4] + # New landmark covariance and cross terms + sigma_new_lm = Sigma_xr @ G_robot.T # (n, 2) + sigma_new_lm_T = sigma_new_lm.T # (2, n) + Sigma_ll = G_robot @ Sigma_rr @ G_robot.T + G_landmark @ R_obs @ G_landmark.T + mu_new = np.vstack([mu, lm]) + Sigma_new = np.block([ + [Sigma, sigma_new_lm], + [sigma_new_lm_T, Sigma_ll] + ]) + return mu_new, Sigma_new diff --git a/src/components/localization/ekf_slam/motion_model.py b/src/components/localization/ekf_slam/motion_model.py new file mode 100644 index 00000000..4f816a47 --- /dev/null +++ b/src/components/localization/ekf_slam/motion_model.py @@ -0,0 +1,60 @@ +""" +motion_model.py + +Jacobians for EKF-SLAM prediction step. Mean propagation uses State.motion_model +(see state.py): robot state [x, y, yaw, speed], control [accel, yaw_rate]. +""" + +from math import cos, sin + +import numpy as np + + +def jacobian_F(pred_state, control, time_s): + """ + Jacobian of State.motion_model w.r.t. state [x, y, yaw, speed]. + pred_state: (4, 1) predicted state after State.motion_model + control: (2, 1) [accel_mps2, yaw_rate_rps] + time_s: time step [s] + Returns: (4, 4) Jacobian F (matches ExtendedKalmanFilterLocalizer._jacobian_F) + """ + yaw = pred_state[2, 0] + spd = pred_state[3, 0] + acl = control[0, 0] + t = time_s + + sin_yaw = sin(yaw) + cos_yaw = cos(yaw) + + jF = np.array( + [ + [1, 0, -spd * sin_yaw * t - acl * sin_yaw * t**2 / 2, cos_yaw * t], + [0, 1, spd * cos_yaw * t + acl * cos_yaw * t**2 / 2, sin_yaw * t], + [0, 0, 1, 0], + [0, 0, 0, 1], + ] + ) + + return jF + + +def jacobian_G(pred_state, time_s): + """ + Jacobian of State.motion_model w.r.t. control [accel, yaw_rate]. + pred_state: (4, 1) predicted state + time_s: time step [s] + Returns: (4, 2) Jacobian G (matches ExtendedKalmanFilterLocalizer._jacobian_G) + """ + yaw = pred_state[2, 0] + t = time_s + + jG = np.array( + [ + [cos(yaw) * t**2 / 2, 0], + [sin(yaw) * t**2 / 2, 0], + [0, t], + [t, 0], + ] + ) + + return jG diff --git a/src/components/localization/ekf_slam/observation_model.py b/src/components/localization/ekf_slam/observation_model.py new file mode 100644 index 00000000..39744101 --- /dev/null +++ b/src/components/localization/ekf_slam/observation_model.py @@ -0,0 +1,59 @@ +""" +observation_model.py + +EKF linearization only: Jacobians for range–bearing measurements. +Noise-free measurement prediction lives on Landmark; noisy simulation on LandmarkRangeBearingSensor. + +Measurement: range r, bearing phi (relative to robot yaw). +""" + +import numpy as np +from math import sqrt + + +def jacobian_H(robot_state, lx, ly, landmark_index): + """ + Jacobian of observation [range, bearing] w.r.t. state vector. + State vector is [x, y, yaw, speed, l1_x, l1_y, l2_x, l2_y, ...]. + landmark_index: 0-based index of this landmark (state indices 4+2*j, 4+2*j+1). + Returns partial blocks; build_H_matrix adds zero columns for speed and places landmark block. + """ + x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0] + dx = lx - x + dy = ly - y + r_sq = dx * dx + dy * dy + r = sqrt(r_sq) + r_min_jacobian = 0.3 + if r < r_min_jacobian: + r = r_min_jacobian + r_sq = r * r + elif r < 1e-9: + r = 1e-9 + r_sq = 1e-18 + dr_dx = -dx / r + dr_dy = -dy / r + dphi_dx = dy / r_sq + dphi_dy = -dx / r_sq + dphi_dyaw = -1.0 + dr_dlx = dx / r + dr_dly = dy / r + dphi_dlx = -dy / r_sq + dphi_dly = dx / r_sq + H_robot = np.array([[dr_dx, dr_dy, 0], + [dphi_dx, dphi_dy, dphi_dyaw]]) + H_landmark = np.array([[dr_dlx, dr_dly], + [dphi_dlx, dphi_dly]]) + return H_robot, H_landmark, landmark_index + + +def build_H_matrix(robot_state, lx, ly, landmark_index, state_dim): + """ + Build full (2, state_dim) Jacobian H for one landmark. + Robot block is 4 columns: (x, y, yaw, speed); range/bearing do not depend on speed. + """ + H_robot, H_landmark, j = jacobian_H(robot_state, lx, ly, landmark_index) + H = np.zeros((2, state_dim)) + H[:, 0:3] = H_robot + H[:, 3] = 0.0 + H[:, 4 + 2 * j:4 + 2 * j + 2] = H_landmark + return H diff --git a/src/components/sensors/landmark_range_bearing/landmark_range_bearing_sensor.py b/src/components/sensors/landmark_range_bearing/landmark_range_bearing_sensor.py new file mode 100644 index 00000000..8dc815a2 --- /dev/null +++ b/src/components/sensors/landmark_range_bearing/landmark_range_bearing_sensor.py @@ -0,0 +1,69 @@ +""" +landmark_range_bearing_sensor.py + +Range–bearing landmark sensor (simulation), following the pattern of gnss.Gnss. +Noise-free prediction uses Landmark; EKF Jacobians remain in observation_model. +""" + +import sys +from pathlib import Path + +import numpy as np + +_landmark_path = str(Path(__file__).absolute().parent.parent.parent) + "/landmark" +sys.path.append(_landmark_path) +from landmark import Landmark + + +class LandmarkRangeBearingSensor: + """ + Simulates noisy range and bearing measurements to point landmarks in 2D. + """ + + def __init__(self, sigma_r=0.2, sigma_phi=0.1, max_range_m=15.0, color="c"): + """ + sigma_r: range noise standard deviation [m] + sigma_phi: bearing noise standard deviation [rad] + max_range_m: landmarks beyond this predicted range are not observed + color: optional draw color (reserved for future visualization) + """ + self.sigma_r = sigma_r + self.sigma_phi = sigma_phi + self.max_range_m = max_range_m + self.DRAW_COLOR = color + self.NOISE_VAR_MAT = np.diag([sigma_r ** 2, sigma_phi ** 2]) + + @staticmethod + def observation_model(robot_state, landmark): + """ + Noise-free predicted measurement [range, bearing] from robot pose to landmark. + robot_state: (3, 1) or (4, 1); only x, y, yaw are used. + landmark: Landmark instance + """ + return landmark.predicted_range_bearing(robot_state) + + def observe_noisy(self, robot_state, landmark): + """Noisy range–bearing sample for one landmark.""" + z = landmark.predicted_range_bearing(robot_state).copy() + z[0, 0] = max(0.0, z[0, 0] + self.sigma_r * np.random.randn()) + z[1, 0] = ( + z[1, 0] + self.sigma_phi * np.random.randn() + np.pi + ) % (2 * np.pi) - np.pi + return z + + def observe_visible_landmarks(self, robot_state, landmarks): + """ + For each Landmark, if predicted range <= max_range_m, append one noisy observation. + landmarks: LandmarkList or iterable of Landmark + Returns: list of (2, 1) arrays in iteration order (only in-range). + """ + if hasattr(landmarks, "get_list"): + iterable = landmarks.get_list() + else: + iterable = landmarks + observations = [] + for lm in iterable: + z_pred = lm.predicted_range_bearing(robot_state) + if z_pred[0, 0] <= self.max_range_m: + observations.append(self.observe_noisy(robot_state, lm)) + return observations diff --git a/src/simulations/localization/ekf_slam/ekf_slam.py b/src/simulations/localization/ekf_slam/ekf_slam.py new file mode 100644 index 00000000..2ebc251b --- /dev/null +++ b/src/simulations/localization/ekf_slam/ekf_slam.py @@ -0,0 +1,263 @@ +""" +ekf_slam.py + +EKF-SLAM (Extended Kalman Filter based Simultaneous Localization and Mapping). +Real-time landmark-based SLAM in 2D with range-bearing sensor. +Demonstrates loop closure via covariance convergence when re-observing landmarks. +""" + +# import path setting +import sys +import numpy as np +from pathlib import Path +from math import pi + +abs_dir_path = str(Path(__file__).absolute().parent) +relative_path = "/../../../components/" + +sys.path.append(abs_dir_path + relative_path + "visualization") +sys.path.append(abs_dir_path + relative_path + "state") +sys.path.append(abs_dir_path + relative_path + "vehicle") +sys.path.append(abs_dir_path + relative_path + "sensors") +sys.path.append(abs_dir_path + relative_path + "localization/ekf_slam") +sys.path.append(abs_dir_path + relative_path + "sensors/landmark_range_bearing") +sys.path.append(abs_dir_path + relative_path + "landmark") + +import matplotlib.pyplot as plt + +# import component modules +from global_xy_visualizer import GlobalXYVisualizer +from min_max import MinMax +from time_parameters import TimeParameters +from vehicle_specification import VehicleSpecification +from state import State +from four_wheels_vehicle import FourWheelsVehicle +from sensors import Sensors +from ekf_slam_vehicle_localizer import EKFSLAMVehicleLocalizer +from landmark_range_bearing_sensor import LandmarkRangeBearingSensor +from landmark import Landmark +from landmark_list import LandmarkList + +# flag to show plot figure +# when executed as unit test, this flag is set as false +show_plot = True + +SIGMA_R = 0.2 +SIGMA_PHI = 0.1 +SIGMA_V = 0.15 +SIGMA_OMEGA = 0.08 +GATE_THRESHOLD = 2.0 +MAX_RANGE = 15.0 +NUM_LANDMARKS = 18 +INTERVAL_SEC = 0.2 +# Min distance between landmarks (so they stay distinct) +MIN_LANDMARK_SPACING = 4.5 +# Stop after one lap (circumference / speed); slight overshoot for loop closure +LAP_OVERSHOOT = 1.2 + + +class CircularMotionController: + """ + Minimal controller to keep the vehicle on a circle at constant speed. + """ + + def __init__(self, radius=10.0, speed=1.0): + self.radius = radius + self.speed = speed + + def update(self, state, time_s): + return None + + def draw(self, axes, elems): + return None + + def get_target_accel_mps2(self): + return 0.0 + + def get_target_yaw_rate_rps(self): + return -self.speed / self.radius + + def get_target_steer_rad(self): + return 0.0 + + +class LegendOverlay: + """ + Draw legend entries for the true and estimated vehicles. + """ + + def update(self, _time_s): + pass + + def draw(self, axes, elems): + true_plot, = axes.plot([], [], "b-", linewidth=2, label="True pose") + est_plot, = axes.plot([], [], "g-", linewidth=2, label="Est. pose") + elems.extend([true_plot, est_plot]) + axes.legend(loc="upper left") + + +def draw_final_comparison(landmark_list, true_state, est_state, localizer): + """ + After simulation ends, show a side-by-side: true map vs estimated map. + """ + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6)) + fig.suptitle("EKF-SLAM: True map vs estimated map", fontsize=14) + + # Left: true map + ax1.set_title("True map (ground truth)") + ax1.set_aspect("equal") + xy = landmark_list.as_xy_tuples() + ax1.plot( + [p[0] for p in xy], + [p[1] for p in xy], + "ko", + markersize=8, + label="Landmarks", + ) + ax1.plot( + true_state.x_history, + true_state.y_history, + "b-", + linewidth=1.5, + alpha=0.8, + label="True path", + ) + ax1.set_xlabel("X [m]") + ax1.set_ylabel("Y [m]") + ax1.legend(loc="upper right") + ax1.grid(True, alpha=0.3) + + # Right: estimated map + ax2.set_title("Estimated map (EKF-SLAM)") + ax2.set_aspect("equal") + est_lm = localizer.get_estimated_landmarks() + if est_lm: + ax2.plot( + [p[0] for p in est_lm], + [p[1] for p in est_lm], + "rx", + markersize=8, + label="Est. landmarks", + ) + ax2.plot( + est_state.x_history, + est_state.y_history, + "g-", + linewidth=1.5, + alpha=0.8, + label="Est. path", + ) + ax2.set_xlabel("X [m]") + ax2.set_ylabel("Y [m]") + ax2.legend(loc="upper right") + ax2.grid(True, alpha=0.3) + + plt.tight_layout() + plt.show() + + +def main(): + """ + Main process function. + """ + np.random.seed(42) + + # set simulation parameters + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) + spec = VehicleSpecification(area_size=20.0, x_lim=x_lim, y_lim=y_lim) + + center_x, center_y = 25.0, 2.5 + radius_lm = 20.0 + speed = 1.0 + lap_time_sec = (2 * pi * radius_lm / speed) * LAP_OVERSHOOT + time_params = TimeParameters(span_sec=lap_time_sec, interval_sec=INTERVAL_SEC) + vis = GlobalXYVisualizer(x_lim, y_lim, time_params, show_zoom=True) + + # create landmark map + margin = 5.0 + x_min, x_max = x_lim.min_value() + margin, x_lim.max_value() - margin + y_min, y_max = y_lim.min_value() + margin, y_lim.max_value() - margin + landmark_coords = [] + max_attempts = 500 + for _ in range(NUM_LANDMARKS): + for _ in range(max_attempts): + x, y = float(np.random.uniform(x_min, x_max)), float(np.random.uniform(y_min, y_max)) + if all( + (x - lx) ** 2 + (y - ly) ** 2 >= MIN_LANDMARK_SPACING ** 2 + for (lx, ly) in landmark_coords + ): + landmark_coords.append((x, y)) + break + else: + landmark_coords.append((float(np.random.uniform(x_min, x_max)), float(np.random.uniform(y_min, y_max)))) + + landmark_list = LandmarkList() + for x, y in landmark_coords: + landmark_list.add_landmark(Landmark(x, y)) + vis.add_object(landmark_list) + + # create vehicle's state instances + est_state = State( + x_m=center_x + radius_lm, + y_m=center_y, + yaw_rad=-pi / 2, + speed_mps=speed, + color="g", + ) + true_state = State( + x_m=center_x + radius_lm, + y_m=center_y, + yaw_rad=-pi / 2, + speed_mps=speed, + color="b", + ) + + # create controller, sensor, localizer and vehicles + controller = CircularMotionController(radius=radius_lm, speed=speed) + landmark_sensor = LandmarkRangeBearingSensor(SIGMA_R, SIGMA_PHI, MAX_RANGE) + localizer = EKFSLAMVehicleLocalizer( + true_state=true_state, + landmark_list=landmark_list, + landmark_sensor=landmark_sensor, + init_x=center_x + radius_lm, + init_y=center_y, + init_yaw=-pi / 2, + init_speed_mps=speed, + sigma_r=SIGMA_R, + sigma_phi=SIGMA_PHI, + sigma_v=SIGMA_V, + sigma_omega=SIGMA_OMEGA, + gate_threshold=GATE_THRESHOLD, + max_range=MAX_RANGE, + duplicate_position_threshold=MIN_LANDMARK_SPACING, + ) + + true_vehicle = FourWheelsVehicle( + true_state, + spec, + controller=controller, + show_zoom=False, + ) + est_vehicle = FourWheelsVehicle( + est_state, + spec, + controller=controller, + sensors=Sensors(), + localizer=localizer, + show_zoom=True, + ) + vis.add_object(true_vehicle) + vis.add_object(est_vehicle) + vis.add_object(LegendOverlay()) + + if not show_plot: + vis.not_show_plot() + vis.draw() + + # After animation ends, show true map vs estimated map comparison + if show_plot: + draw_final_comparison(landmark_list, true_state, est_state, localizer) + + +if __name__ == "__main__": + main() diff --git a/test/test_ekf_slam.py b/test/test_ekf_slam.py new file mode 100644 index 00000000..a605c4e2 --- /dev/null +++ b/test/test_ekf_slam.py @@ -0,0 +1,97 @@ +""" +Test of EKF-SLAM (Simultaneous Localization and Mapping) Simulation + +Author: Contribution following HOWTOCONTRIBUTE.md +""" + +from pathlib import Path +import sys +import numpy as np + +ekf_slam_path = str(Path(__file__).absolute().parent) + "/../src/simulations/localization/ekf_slam" +sys.path.append(ekf_slam_path) +import ekf_slam +from state import State +from motion_model import jacobian_F, jacobian_G +from observation_model import build_H_matrix + +sys.path.append(str(Path(__file__).absolute().parent) + "/../src/components/landmark") +from landmark import Landmark +from data_association import mahalanobis_distance, nearest_neighbor_association +from landmark_manager import initialize_landmark_from_observation, augment_state + + +def test_simulation(): + ekf_slam.show_plot = False + ekf_slam.main() + + +def test_motion_model_predict(): + robot = np.array([[0.0], [0.0], [0.0], [1.0]]) + u = np.array([[0.0], [0.1]]) + next_state = State.motion_model(robot, u, 0.1) + assert next_state.shape == (4, 1) + assert np.isfinite(next_state).all() + + +def test_motion_model_jacobians(): + robot = np.array([[1.0], [2.0], [0.5], [1.0]]) + u = np.array([[0.5], [0.05]]) + pred = State.motion_model(robot, u, 0.1) + F = jacobian_F(pred, u, 0.1) + G = jacobian_G(pred, 0.1) + assert F.shape == (4, 4) + assert G.shape == (4, 2) + assert np.isfinite(F).all() and np.isfinite(G).all() + + +def test_landmark_predicted_range_bearing(): + robot = np.array([[0.0], [0.0], [0.0]]) + z = Landmark.predicted_range_bearing_at(robot, 3.0, 4.0) + assert z.shape == (2, 1) + assert abs(z[0, 0] - 5.0) < 1e-6 + assert abs(z[1, 0] - np.arctan2(4, 3)) < 1e-6 + + +def test_observation_jacobian(): + robot = np.array([[0.0], [0.0], [0.0]]) + H = build_H_matrix(robot, 3.0, 4.0, 0, 6) + assert H.shape == (2, 6) + assert np.isfinite(H).all() + + +def test_data_association_mahalanobis(): + z1 = np.array([[1.0], [0.0]]) + z2 = np.array([[1.1], [0.05]]) + S = np.eye(2) * 0.01 + d = mahalanobis_distance(z1, z2, S) + assert d >= 0 and np.isfinite(d) + + +def test_data_association_nearest_neighbor(): + obs = [np.array([[5.0], [0.1]])] + pred = [np.array([[5.0], [0.0]])] + S_list = [np.eye(2)] + matches = nearest_neighbor_association(obs, pred, S_list, [0], 3.0) + assert len(matches) == 1 + assert matches[0][1] == 0 + + +def test_landmark_initialization(): + robot = np.array([[0.0], [0.0], [0.0]]) + z = np.array([[5.0], [np.arctan2(4, 3)]]) + lm, G_r, G_l = initialize_landmark_from_observation(robot, z, None, None) + assert lm.shape == (2, 1) + assert G_r.shape == (2, 4) + assert abs(lm[0, 0] - 3.0) < 0.1 and abs(lm[1, 0] - 4.0) < 0.1 + + +def test_augment_state(): + mu = np.array([[0.0], [0.0], [0.0], [0.0]]) + Sigma = np.eye(4) * 0.1 + z = np.array([[5.0], [0.0]]) + R = np.diag([0.1, 0.01]) + mu_new, Sigma_new = augment_state(mu, Sigma, z, mu[0:3], R) + assert mu_new.shape[0] == 6 + assert Sigma_new.shape == (6, 6) + assert np.allclose(Sigma_new, Sigma_new.T) diff --git a/test/test_landmark.py b/test/test_landmark.py new file mode 100644 index 00000000..0aaebc91 --- /dev/null +++ b/test/test_landmark.py @@ -0,0 +1,34 @@ +"""Tests for Landmark and LandmarkList.""" + +from pathlib import Path +import sys + +import numpy as np + +_components = str(Path(__file__).absolute().parent) + "/../src/components/landmark" +sys.path.append(_components) + +from landmark import Landmark +from landmark_list import LandmarkList + + +def test_landmark_predicted_range_bearing_at(): + robot = np.array([[0.0], [0.0], [0.0]]) + z = Landmark.predicted_range_bearing_at(robot, 3.0, 4.0) + assert abs(z[0, 0] - 5.0) < 1e-6 + assert abs(z[1, 0] - np.arctan2(4, 3)) < 1e-6 + + +def test_landmark_instance_method(): + robot = np.array([[0.0], [0.0], [0.0]]) + lm = Landmark(3.0, 4.0) + z = lm.predicted_range_bearing(robot) + assert np.allclose(z, Landmark.predicted_range_bearing_at(robot, 3.0, 4.0)) + + +def test_landmark_list_as_xy_tuples(): + ll = LandmarkList() + ll.add_landmark(Landmark(1.0, 2.0)) + ll.add_landmark(Landmark(3.0, 4.0)) + assert ll.as_xy_tuples() == [(1.0, 2.0), (3.0, 4.0)] + assert len(ll) == 2 diff --git a/test/test_landmark_range_bearing_sensor.py b/test/test_landmark_range_bearing_sensor.py new file mode 100644 index 00000000..0edc7c86 --- /dev/null +++ b/test/test_landmark_range_bearing_sensor.py @@ -0,0 +1,48 @@ +""" +Tests for LandmarkRangeBearingSensor (range–bearing landmark observation). +""" + +from pathlib import Path +import sys + +import numpy as np + +_components = str(Path(__file__).absolute().parent) + "/../src/components" +sys.path.append(_components + "/localization/ekf_slam") +sys.path.append(_components + "/sensors/landmark_range_bearing") +sys.path.append(_components + "/landmark") + +from landmark import Landmark +from landmark_list import LandmarkList +from landmark_range_bearing_sensor import LandmarkRangeBearingSensor + + +def test_observation_model_matches_landmark(): + robot = np.array([[0.0], [0.0], [0.0], [1.0]]) + lm = Landmark(3.0, 4.0) + z_sensor = LandmarkRangeBearingSensor.observation_model(robot, lm) + z_lm = lm.predicted_range_bearing(robot) + assert np.allclose(z_sensor, z_lm) + + +def test_observe_visible_landmarks_in_range(): + np.random.seed(0) + robot = np.array([[0.0], [0.0], [0.0], [0.0]]) + ll = LandmarkList() + ll.add_landmark(Landmark(5.0, 0.0)) + ll.add_landmark(Landmark(1000.0, 0.0)) + sensor_short = LandmarkRangeBearingSensor( + sigma_r=0.01, sigma_phi=0.01, max_range_m=10.0 + ) + obs = sensor_short.observe_visible_landmarks(robot, ll) + assert len(obs) == 1 + assert obs[0].shape == (2, 1) + assert abs(obs[0][0, 0] - 5.0) < 0.5 + + +def test_observe_noisy_shape(): + sensor = LandmarkRangeBearingSensor(sigma_r=0.2, sigma_phi=0.1, max_range_m=15.0) + robot = np.array([[1.0], [2.0], [0.5], [1.0]]) + z = sensor.observe_noisy(robot, Landmark(4.0, 6.0)) + assert z.shape == (2, 1) + assert np.isfinite(z).all()