Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 193 additions & 50 deletions PathPlanning/DStarLite/d_star_lite.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""
D* Lite grid planning
author: vss2sn (28676655+vss2sn@users.noreply.github.com)
author: Taha Zahid (@TahaZahid05)
Original author: vss2sn

Link to papers:
D* Lite (Link: http://idm-lab.org/bib/abstracts/papers/aaai02b.pdf)
Improved Fast Replanning for Robot Navigation in Unknown Terrain
(Link: http://www.cs.cmu.edu/~maxim/files/dlite_icra02.pdf)
Implemented maintaining similarity with the pseudocode for understanding.
Code can be significantly optimized by using a priority queue for U, etc.
Avoiding additional imports based on repository philosophy.

Optimized using heapq (priority queue) with lazy deletion for O(log n)
priority queue operations.
"""
import heapq
import math
import matplotlib.pyplot as plt
import random
Expand All @@ -25,6 +28,17 @@ def __init__(self, x: int = 0, y: int = 0, cost: float = 0.0):
self.y = y
self.cost = cost

def __lt__(self, other):
return False

def __eq__(self, other):
if not isinstance(other, Node):
return False
return self.x == other.x and self.y == other.y

def __hash__(self):
return hash((self.x, self.y))


def add_coordinates(node1: Node, node2: Node):
new_node = Node()
Expand Down Expand Up @@ -66,13 +80,14 @@ def __init__(self, ox: list, oy: list):
self.obstacles_xy = {(obstacle.x, obstacle.y) for obstacle in self.obstacles}
self.start = Node(0, 0)
self.goal = Node(0, 0)
self.U = list() # type: ignore
# Priority queue implemented with heapq for O(log n) operations
self.U: list = [] # Min-heap for open set
self.entry_finder: dict = {} # Maps nodes to heap entries for O(1) lookup
self.counter = 0 # Unique sequence count for tie-breaking
self.km = 0.0
self.kold = 0.0
self.rhs = self.create_grid(float("inf"))
self.g = self.create_grid(float("inf"))
self.detected_obstacles_xy: set[tuple[int, int]] = set()
self.xy = np.empty((0, 2))
if show_animation:
self.detected_obstacles_for_plotting_x = list() # type: ignore
self.detected_obstacles_for_plotting_y = list() # type: ignore
Expand Down Expand Up @@ -110,9 +125,9 @@ def h(self, s: Node):
# return max(abs(self.start.x - s.x), abs(self.start.y - s.y))
return 1

def calculate_key(self, s: Node):
return (min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s)
+ self.km, min(self.g[s.x][s.y], self.rhs[s.x][s.y]))
def calculate_key(self, s: Node) -> list[float]:
return [min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s) + self.km,
min(self.g[s.x][s.y], self.rhs[s.x][s.y])]

def is_valid(self, node: Node):
if 0 <= node.x < self.x_max and 0 <= node.y < self.y_max:
Expand All @@ -131,6 +146,99 @@ def succ(self, u: Node):
# Grid, so each vertex is connected to the ones around it
return self.get_neighbours(u)

def push(self, task: Node, priority: list):
"""
Add a new node to priority queue or update its priority.
Uses lazy deletion pattern for efficient priority updates.

Args:
task: Node to add/update
priority: Priority key [f-value, g-value]
"""
if task in self.entry_finder:
self.remove(task)
count = self.counter
self.counter += 1
entry = [priority, count, task]
self.entry_finder[task] = entry
heapq.heappush(self.U, entry)

def remove(self, task: Node):
"""
Mark an existing task as removed (lazy deletion).
The actual removal from heap happens during pop/peek operations.

Args:
task: Node to mark as removed
"""
entry = self.entry_finder.pop(task)
entry[-1] = None # Mark as removed

def pop(self):
"""
Remove and return the lowest priority task.
Skips over entries marked as removed (lazy deletion).

Returns:
tuple: (task Node, priority list)

Raises:
KeyError: If heap is empty
"""
while self.U:
priority, count, task = heapq.heappop(self.U)
if task is not None:
del self.entry_finder[task]
return task, priority
raise KeyError("empty heap")

def contains(self, task: Node):
"""
Check if a node is in the priority queue.

Args:
task: Node to check

Returns:
bool: True if node is in queue
"""
return task in self.entry_finder

def peek(self):
"""
Return the lowest priority task without removing it.
Cleans up entries marked as removed.

Returns:
tuple: (task Node or None, priority list)
"""
if not self.U:
return None, [float('inf'), float('inf')]

while self.U:
entry = self.U[0]
priority, count, task = entry

if task is not None:
return task, priority

heapq.heappop(self.U) # Remove invalid entries

return None, [float('inf'), float('inf')]

def key_less_than(self, k1: list, k2: list):
"""
Lexicographical comparison of priority keys.

Args:
k1: First key [f-value, g-value]
k2: Second key [f-value, g-value]

Returns:
bool: True if k1 < k2 lexicographically
"""
return k1[0] < k2[0] or (k1[0] == k2[0] and k1[1] < k2[1])

def initialize(self, start: Node, goal: Node):
self.start.x = start.x - self.x_min_world
self.start.y = start.y - self.y_min_world
Expand All @@ -139,61 +247,96 @@ def initialize(self, start: Node, goal: Node):
if not self.initialized:
self.initialized = True
print('Initializing')
self.U = list() # Would normally be a priority queue
self.U = []
self.entry_finder.clear()
self.counter = 0
self.km = 0.0
self.rhs = self.create_grid(math.inf)
self.g = self.create_grid(math.inf)
self.rhs[self.goal.x][self.goal.y] = 0
self.U.append((self.goal, self.calculate_key(self.goal)))
self.push(self.goal, self.calculate_key(self.goal))
self.detected_obstacles_xy = set()

def update_vertex(self, u: Node):
if not compare_coordinates(u, self.goal):
self.rhs[u.x][u.y] = min([self.c(u, sprime) +
self.g[sprime.x][sprime.y]
for sprime in self.succ(u)])
if any([compare_coordinates(u, node) for node, key in self.U]):
self.U = [(node, key) for node, key in self.U
if not compare_coordinates(node, u)]
self.U.sort(key=lambda x: x[1])
if self.g[u.x][u.y] != self.rhs[u.x][u.y]:
self.U.append((u, self.calculate_key(u)))
self.U.sort(key=lambda x: x[1])

def compare_keys(self, key_pair1: tuple[float, float],
key_pair2: tuple[float, float]):
return key_pair1[0] < key_pair2[0] or \
(key_pair1[0] == key_pair2[0] and key_pair1[1] < key_pair2[1])
if self.contains(u):
self.remove(u)
self.push(u, self.calculate_key(u))
elif self.g[u.x][u.y] == self.rhs[u.x][u.y] and self.contains(u):
self.remove(u)

def compute_shortest_path(self):
self.U.sort(key=lambda x: x[1])
has_elements = len(self.U) > 0
start_key_not_updated = self.compare_keys(
self.U[0][1], self.calculate_key(self.start)
)
rhs_not_equal_to_g = self.rhs[self.start.x][self.start.y] != \
self.g[self.start.x][self.start.y]
while has_elements and start_key_not_updated or rhs_not_equal_to_g:
self.kold = self.U[0][1]
u = self.U[0][0]
self.U.pop(0)
if self.compare_keys(self.kold, self.calculate_key(u)):
self.U.append((u, self.calculate_key(u)))
self.U.sort(key=lambda x: x[1])
elif (self.g[u.x, u.y] > self.rhs[u.x, u.y]).any():
self.g[u.x, u.y] = self.rhs[u.x, u.y]
for s in self.pred(u):
self.update_vertex(s)
while True:
task, k_old = self.peek()
if task is None:
break

k_start = self.calculate_key(self.start)

# Stop condition: Start is consistent AND top of heap >= Start Key
if (not self.key_less_than(k_old, k_start) and
self.rhs[self.start.x][self.start.y] == self.g[self.start.x][self.start.y]):
break

u, k_old = self.pop()
k_new = self.calculate_key(u)

if self.key_less_than(k_old, k_new):
# Node priority has improved, re-insert
self.push(u, k_new)

elif self.g[u.x][u.y] > self.rhs[u.x][u.y]:
# Overconsistent (path found/improved): Propagate cost to neighbors
self.g[u.x][u.y] = self.rhs[u.x][u.y]

neighbors_lst = self.pred(u)

for curr in neighbors_lst:
if curr != self.goal:
edge_cost = self.c(curr, u)
self.rhs[curr.x][curr.y] = min(self.rhs[curr.x][curr.y],
edge_cost + self.g[u.x][u.y])
self.update_vertex(curr)
else:
self.g[u.x, u.y] = math.inf
for s in self.pred(u) + [u]:
self.update_vertex(s)
self.U.sort(key=lambda x: x[1])
start_key_not_updated = self.compare_keys(
self.U[0][1], self.calculate_key(self.start)
)
rhs_not_equal_to_g = self.rhs[self.start.x][self.start.y] != \
self.g[self.start.x][self.start.y]
# Underconsistent (obstacle detected): Reset g to infinity and re-evaluate neighbors
g_old = self.g[u.x][u.y]
self.g[u.x][u.y] = math.inf

neighbors_lst = self.pred(u)

for curr in (neighbors_lst + [u]):
if curr == u:
# When curr is u itself, recalculate rhs for u
if curr != self.goal:
temp_rhs = float('inf')

curr_neighbors_lst = self.succ(curr)

for j in curr_neighbors_lst:
edge_cost = self.c(curr, j)
temp_rhs = min(temp_rhs, (edge_cost + self.g[j.x][j.y]))

self.rhs[curr.x][curr.y] = temp_rhs
else:
# For neighbors of u, check if they need rhs recalculation
edge_cost = self.c(curr, u)
if self.rhs[curr.x][curr.y] == (edge_cost + g_old):
if curr != self.goal:
temp_rhs = float('inf')

curr_neighbors_lst = self.succ(curr)

for j in curr_neighbors_lst:
edge_cost = self.c(curr, j)
temp_rhs = min(temp_rhs, (edge_cost + self.g[j.x][j.y]))

self.rhs[curr.x][curr.y] = temp_rhs

self.update_vertex(curr)

def detect_changes(self):
changed_vertices = list()
Expand Down
53 changes: 53 additions & 0 deletions tests/test_d_star_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,62 @@


def test_1():
"""Test D* Lite with default configuration"""
m.show_animation = False
m.main()


def test_path_found():
"""Test that D* Lite successfully finds a path"""
m.show_animation = False

# Start and goal position
sx = 10
sy = 10
gx = 50
gy = 50

ox, oy = [], []
for i in range(-10, 60):
ox.append(i)
oy.append(-10.0)
for i in range(-10, 60):
ox.append(60.0)
oy.append(i)
for i in range(-10, 61):
ox.append(i)
oy.append(60.0)
for i in range(-10, 61):
ox.append(-10.0)
oy.append(i)
for i in range(-10, 40):
ox.append(20.0)
oy.append(i)
for i in range(0, 40):
ox.append(40.0)
oy.append(60.0 - i)

spoofed_ox = [[], [], [],
[i for i in range(0, 21)] + [0 for _ in range(0, 20)]]
spoofed_oy = [[], [], [],
[20 for _ in range(0, 21)] + [i for i in range(0, 20)]]

dstarlite = m.DStarLite(ox, oy)
path_found, pathx, pathy = dstarlite.main(
m.Node(x=sx, y=sy),
m.Node(x=gx, y=gy),
spoofed_ox=spoofed_ox,
spoofed_oy=spoofed_oy
)

assert path_found, "D* Lite should find a path"
assert len(pathx) > 0, "Path should contain points"
assert len(pathy) > 0, "Path should contain points"
assert pathx[0] == sx, "Path should start at start position"
assert pathy[0] == sy, "Path should start at start position"
assert pathx[-1] == gx, "Path should end at goal position"
assert pathy[-1] == gy, "Path should end at goal position"


if __name__ == '__main__':
conftest.run_this_test(__file__)