diff --git a/PathPlanning/DStarLite/d_star_lite.py b/PathPlanning/DStarLite/d_star_lite.py index 1a44d84fa5..43ee8924ae 100644 --- a/PathPlanning/DStarLite/d_star_lite.py +++ b/PathPlanning/DStarLite/d_star_lite.py @@ -1,14 +1,17 @@ """ D* Lite grid planning -author: vss2sn (28676655+vss2sn@users.noreply.github.com) +author: Taha Zahid (@TahaZahid05) +Original author: vss2sn + Link to papers: D* Lite (Link: http://idm-lab.org/bib/abstracts/papers/aaai02b.pdf) Improved Fast Replanning for Robot Navigation in Unknown Terrain (Link: http://www.cs.cmu.edu/~maxim/files/dlite_icra02.pdf) -Implemented maintaining similarity with the pseudocode for understanding. -Code can be significantly optimized by using a priority queue for U, etc. -Avoiding additional imports based on repository philosophy. + +Optimized using heapq (priority queue) with lazy deletion for O(log n) +priority queue operations. """ +import heapq import math import matplotlib.pyplot as plt import random @@ -25,6 +28,17 @@ def __init__(self, x: int = 0, y: int = 0, cost: float = 0.0): self.y = y self.cost = cost + def __lt__(self, other): + return False + + def __eq__(self, other): + if not isinstance(other, Node): + return False + return self.x == other.x and self.y == other.y + + def __hash__(self): + return hash((self.x, self.y)) + def add_coordinates(node1: Node, node2: Node): new_node = Node() @@ -66,13 +80,14 @@ def __init__(self, ox: list, oy: list): self.obstacles_xy = {(obstacle.x, obstacle.y) for obstacle in self.obstacles} self.start = Node(0, 0) self.goal = Node(0, 0) - self.U = list() # type: ignore + # Priority queue implemented with heapq for O(log n) operations + self.U: list = [] # Min-heap for open set + self.entry_finder: dict = {} # Maps nodes to heap entries for O(1) lookup + self.counter = 0 # Unique sequence count for tie-breaking self.km = 0.0 - self.kold = 0.0 self.rhs = self.create_grid(float("inf")) self.g = self.create_grid(float("inf")) self.detected_obstacles_xy: set[tuple[int, int]] = set() - self.xy = np.empty((0, 2)) if show_animation: self.detected_obstacles_for_plotting_x = list() # type: ignore self.detected_obstacles_for_plotting_y = list() # type: ignore @@ -110,9 +125,9 @@ def h(self, s: Node): # return max(abs(self.start.x - s.x), abs(self.start.y - s.y)) return 1 - def calculate_key(self, s: Node): - return (min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s) - + self.km, min(self.g[s.x][s.y], self.rhs[s.x][s.y])) + def calculate_key(self, s: Node) -> list[float]: + return [min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s) + self.km, + min(self.g[s.x][s.y], self.rhs[s.x][s.y])] def is_valid(self, node: Node): if 0 <= node.x < self.x_max and 0 <= node.y < self.y_max: @@ -131,6 +146,99 @@ def succ(self, u: Node): # Grid, so each vertex is connected to the ones around it return self.get_neighbours(u) + def push(self, task: Node, priority: list): + """ + Add a new node to priority queue or update its priority. + Uses lazy deletion pattern for efficient priority updates. + + Args: + task: Node to add/update + priority: Priority key [f-value, g-value] + """ + if task in self.entry_finder: + self.remove(task) + count = self.counter + self.counter += 1 + entry = [priority, count, task] + self.entry_finder[task] = entry + heapq.heappush(self.U, entry) + + def remove(self, task: Node): + """ + Mark an existing task as removed (lazy deletion). + The actual removal from heap happens during pop/peek operations. + + Args: + task: Node to mark as removed + """ + entry = self.entry_finder.pop(task) + entry[-1] = None # Mark as removed + + def pop(self): + """ + Remove and return the lowest priority task. + Skips over entries marked as removed (lazy deletion). + + Returns: + tuple: (task Node, priority list) + + Raises: + KeyError: If heap is empty + """ + while self.U: + priority, count, task = heapq.heappop(self.U) + if task is not None: + del self.entry_finder[task] + return task, priority + raise KeyError("empty heap") + + def contains(self, task: Node): + """ + Check if a node is in the priority queue. + + Args: + task: Node to check + + Returns: + bool: True if node is in queue + """ + return task in self.entry_finder + + def peek(self): + """ + Return the lowest priority task without removing it. + Cleans up entries marked as removed. + + Returns: + tuple: (task Node or None, priority list) + """ + if not self.U: + return None, [float('inf'), float('inf')] + + while self.U: + entry = self.U[0] + priority, count, task = entry + + if task is not None: + return task, priority + + heapq.heappop(self.U) # Remove invalid entries + + return None, [float('inf'), float('inf')] + + def key_less_than(self, k1: list, k2: list): + """ + Lexicographical comparison of priority keys. + + Args: + k1: First key [f-value, g-value] + k2: Second key [f-value, g-value] + + Returns: + bool: True if k1 < k2 lexicographically + """ + return k1[0] < k2[0] or (k1[0] == k2[0] and k1[1] < k2[1]) + def initialize(self, start: Node, goal: Node): self.start.x = start.x - self.x_min_world self.start.y = start.y - self.y_min_world @@ -139,12 +247,14 @@ def initialize(self, start: Node, goal: Node): if not self.initialized: self.initialized = True print('Initializing') - self.U = list() # Would normally be a priority queue + self.U = [] + self.entry_finder.clear() + self.counter = 0 self.km = 0.0 self.rhs = self.create_grid(math.inf) self.g = self.create_grid(math.inf) self.rhs[self.goal.x][self.goal.y] = 0 - self.U.append((self.goal, self.calculate_key(self.goal))) + self.push(self.goal, self.calculate_key(self.goal)) self.detected_obstacles_xy = set() def update_vertex(self, u: Node): @@ -152,48 +262,81 @@ def update_vertex(self, u: Node): self.rhs[u.x][u.y] = min([self.c(u, sprime) + self.g[sprime.x][sprime.y] for sprime in self.succ(u)]) - if any([compare_coordinates(u, node) for node, key in self.U]): - self.U = [(node, key) for node, key in self.U - if not compare_coordinates(node, u)] - self.U.sort(key=lambda x: x[1]) if self.g[u.x][u.y] != self.rhs[u.x][u.y]: - self.U.append((u, self.calculate_key(u))) - self.U.sort(key=lambda x: x[1]) - - def compare_keys(self, key_pair1: tuple[float, float], - key_pair2: tuple[float, float]): - return key_pair1[0] < key_pair2[0] or \ - (key_pair1[0] == key_pair2[0] and key_pair1[1] < key_pair2[1]) + if self.contains(u): + self.remove(u) + self.push(u, self.calculate_key(u)) + elif self.g[u.x][u.y] == self.rhs[u.x][u.y] and self.contains(u): + self.remove(u) def compute_shortest_path(self): - self.U.sort(key=lambda x: x[1]) - has_elements = len(self.U) > 0 - start_key_not_updated = self.compare_keys( - self.U[0][1], self.calculate_key(self.start) - ) - rhs_not_equal_to_g = self.rhs[self.start.x][self.start.y] != \ - self.g[self.start.x][self.start.y] - while has_elements and start_key_not_updated or rhs_not_equal_to_g: - self.kold = self.U[0][1] - u = self.U[0][0] - self.U.pop(0) - if self.compare_keys(self.kold, self.calculate_key(u)): - self.U.append((u, self.calculate_key(u))) - self.U.sort(key=lambda x: x[1]) - elif (self.g[u.x, u.y] > self.rhs[u.x, u.y]).any(): - self.g[u.x, u.y] = self.rhs[u.x, u.y] - for s in self.pred(u): - self.update_vertex(s) + while True: + task, k_old = self.peek() + if task is None: + break + + k_start = self.calculate_key(self.start) + + # Stop condition: Start is consistent AND top of heap >= Start Key + if (not self.key_less_than(k_old, k_start) and + self.rhs[self.start.x][self.start.y] == self.g[self.start.x][self.start.y]): + break + + u, k_old = self.pop() + k_new = self.calculate_key(u) + + if self.key_less_than(k_old, k_new): + # Node priority has improved, re-insert + self.push(u, k_new) + + elif self.g[u.x][u.y] > self.rhs[u.x][u.y]: + # Overconsistent (path found/improved): Propagate cost to neighbors + self.g[u.x][u.y] = self.rhs[u.x][u.y] + + neighbors_lst = self.pred(u) + + for curr in neighbors_lst: + if curr != self.goal: + edge_cost = self.c(curr, u) + self.rhs[curr.x][curr.y] = min(self.rhs[curr.x][curr.y], + edge_cost + self.g[u.x][u.y]) + self.update_vertex(curr) else: - self.g[u.x, u.y] = math.inf - for s in self.pred(u) + [u]: - self.update_vertex(s) - self.U.sort(key=lambda x: x[1]) - start_key_not_updated = self.compare_keys( - self.U[0][1], self.calculate_key(self.start) - ) - rhs_not_equal_to_g = self.rhs[self.start.x][self.start.y] != \ - self.g[self.start.x][self.start.y] + # Underconsistent (obstacle detected): Reset g to infinity and re-evaluate neighbors + g_old = self.g[u.x][u.y] + self.g[u.x][u.y] = math.inf + + neighbors_lst = self.pred(u) + + for curr in (neighbors_lst + [u]): + if curr == u: + # When curr is u itself, recalculate rhs for u + if curr != self.goal: + temp_rhs = float('inf') + + curr_neighbors_lst = self.succ(curr) + + for j in curr_neighbors_lst: + edge_cost = self.c(curr, j) + temp_rhs = min(temp_rhs, (edge_cost + self.g[j.x][j.y])) + + self.rhs[curr.x][curr.y] = temp_rhs + else: + # For neighbors of u, check if they need rhs recalculation + edge_cost = self.c(curr, u) + if self.rhs[curr.x][curr.y] == (edge_cost + g_old): + if curr != self.goal: + temp_rhs = float('inf') + + curr_neighbors_lst = self.succ(curr) + + for j in curr_neighbors_lst: + edge_cost = self.c(curr, j) + temp_rhs = min(temp_rhs, (edge_cost + self.g[j.x][j.y])) + + self.rhs[curr.x][curr.y] = temp_rhs + + self.update_vertex(curr) def detect_changes(self): changed_vertices = list() diff --git a/tests/test_d_star_lite.py b/tests/test_d_star_lite.py index b60a140a89..ad3b2073fd 100644 --- a/tests/test_d_star_lite.py +++ b/tests/test_d_star_lite.py @@ -3,9 +3,62 @@ def test_1(): + """Test D* Lite with default configuration""" m.show_animation = False m.main() +def test_path_found(): + """Test that D* Lite successfully finds a path""" + m.show_animation = False + + # Start and goal position + sx = 10 + sy = 10 + gx = 50 + gy = 50 + + ox, oy = [], [] + for i in range(-10, 60): + ox.append(i) + oy.append(-10.0) + for i in range(-10, 60): + ox.append(60.0) + oy.append(i) + for i in range(-10, 61): + ox.append(i) + oy.append(60.0) + for i in range(-10, 61): + ox.append(-10.0) + oy.append(i) + for i in range(-10, 40): + ox.append(20.0) + oy.append(i) + for i in range(0, 40): + ox.append(40.0) + oy.append(60.0 - i) + + spoofed_ox = [[], [], [], + [i for i in range(0, 21)] + [0 for _ in range(0, 20)]] + spoofed_oy = [[], [], [], + [20 for _ in range(0, 21)] + [i for i in range(0, 20)]] + + dstarlite = m.DStarLite(ox, oy) + path_found, pathx, pathy = dstarlite.main( + m.Node(x=sx, y=sy), + m.Node(x=gx, y=gy), + spoofed_ox=spoofed_ox, + spoofed_oy=spoofed_oy + ) + + assert path_found, "D* Lite should find a path" + assert len(pathx) > 0, "Path should contain points" + assert len(pathy) > 0, "Path should contain points" + assert pathx[0] == sx, "Path should start at start position" + assert pathy[0] == sy, "Path should start at start position" + assert pathx[-1] == gx, "Path should end at goal position" + assert pathy[-1] == gy, "Path should end at goal position" + + if __name__ == '__main__': conftest.run_this_test(__file__)