Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions embodichain/lab/sim/planners/base_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import numpy as np
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Union
import torch
import matplotlib.pyplot as plt

from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.utils import logger
from embodichain.lab.sim.planners.utils import PlanState


class BasePlanner(ABC):
Expand All @@ -34,22 +36,23 @@ class BasePlanner(ABC):
max_constraints: Dictionary containing 'velocity' and 'acceleration' constraints
"""

def __init__(self, dofs: int, max_constraints: Dict[str, List[float]]):
self.dofs = dofs
self.max_constraints = max_constraints
def __init__(self, **kwargs):
self.dofs = kwargs.get("dofs", None)
self.max_constraints = kwargs.get("max_constraints", None)
Comment on lines 28 to +41
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class docstring for BasePlanner (and its Args: section) still documents dofs/max_constraints as explicit constructor parameters, but __init__ now takes **kwargs. Please update the documentation (or restore explicit parameters) to avoid confusing subclasses/callers.

Copilot uses AI. Check for mistakes.
self.device = kwargs.get("device", torch.device("cpu"))

Comment on lines +39 to 43
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BasePlanner.__init__ now silently accepts missing dofs / max_constraints (setting them to None), which will later crash in places like constraint checks or waypoint validation. Since these are required for any planner, consider making them explicit parameters again or validating kwargs and raising via logger.log_error when required values are absent.

Suggested change
def __init__(self, **kwargs):
self.dofs = kwargs.get("dofs", None)
self.max_constraints = kwargs.get("max_constraints", None)
def __init__(self, **kwargs):
# Required configuration parameters for any planner
self.dofs = kwargs.get("dofs", None)
self.max_constraints = kwargs.get("max_constraints", None)
missing_args = []
if self.dofs is None:
missing_args.append("dofs")
if self.max_constraints is None:
missing_args.append("max_constraints")
if missing_args:
logger.log_error(
f"Missing required planner initialization argument(s): {', '.join(missing_args)}",
ValueError,
)

Copilot uses AI. Check for mistakes.
@abstractmethod
def plan(
self,
current_state: Dict,
target_states: List[Dict],
current_state: PlanState,
target_states: list[PlanState],
**kwargs,
Comment on lines 45 to 49
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan signature has been updated to use PlanState, but the docstring (Args section) still describes current_state / target_states as dictionaries. Please update the docstring to match the new PlanState-based API so callers know what fields are expected.

Copilot uses AI. Check for mistakes.
Comment on lines 45 to 49
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan() docstring still describes current_state / target_states as dictionaries, but the signature now uses PlanState and the return types are torch tensors. Please update the docstring to match the new API to avoid misleading callers.

Copilot uses AI. Check for mistakes.
) -> Tuple[
bool,
np.ndarray | None,
np.ndarray | None,
np.ndarray | None,
np.ndarray | None,
torch.Tensor | None,
torch.Tensor | None,
torch.Tensor | None,
torch.Tensor | None,
float,
]:
r"""Execute trajectory planning.
Expand All @@ -64,10 +67,10 @@ def plan(
Returns:
Tuple of (success, positions, velocities, accelerations, times, duration):
- success: bool, whether planning succeeded
- positions: np.ndarray (N, DOF), joint positions along trajectory
- velocities: np.ndarray (N, DOF), joint velocities along trajectory
- accelerations: np.ndarray (N, DOF), joint accelerations along trajectory
- times: np.ndarray (N,), time stamps for each point
- positions: torch.Tensor (N, DOF), joint positions along trajectory
- velocities: torch.Tensor (N, DOF), joint velocities along trajectory
- accelerations: torch.Tensor (N, DOF), joint accelerations along trajectory
- times: torch.Tensor (N,), time stamps for each point
- duration: float, total trajectory duration
"""
logger.log_error("Subclasses must implement plan() method", NotImplementedError)
Expand Down
140 changes: 55 additions & 85 deletions embodichain/lab/sim/planners/motion_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.objects.robot import Robot
from embodichain.utils import logger


class PlannerType(Enum):
r"""Enumeration for different planner types."""

TOPPRA = "toppra"
"""TOPPRA planner for time-optimal trajectory planning."""
from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart


class MotionGenerator:
Expand All @@ -51,12 +45,23 @@ class MotionGenerator:
**kwargs: Additional arguments passed to planner initialization
"""

_support_planner_dict = {
"toppra": ToppraPlanner,
}

@classmethod
def register_planner_type(cls, name: str, planner_class):
"""
Register a new planner type.
"""
cls._support_planner_dict[name] = planner_class

def __init__(
self,
robot: Robot,
uid: str,
sim=None,
planner_type: Union[str, PlannerType] = "toppra",
planner_type: str = "toppra",
default_velocity: float = 0.2,
default_acceleration: float = 0.5,
collision_margin: float = 0.01,
Expand All @@ -65,45 +70,16 @@ def __init__(
self.robot = robot
self.sim = sim
self.collision_margin = collision_margin
self.uid = uid
self.uid = uid # control part

# Get robot DOF using get_joint_ids for specified control part (None for whole body)
self.dof = len(robot.get_joint_ids(uid))

# Create planner based on planner_type
self.planner_type = self._parse_planner_type(planner_type)
self.planner = self._create_planner(
self.planner_type, default_velocity, default_acceleration, **kwargs
planner_type, default_velocity, default_acceleration, **kwargs
)
Comment on lines 59 to 81
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MotionGenerator used to accept planner types case-insensitively (and via PlannerType enum). After removing _parse_planner_type, planner_type must exactly match a key in _support_planner_dict, so values like "TOPPRA" will now fail. Please normalize (e.g., planner_type = planner_type.lower()) and keep the previous validation/warning behavior to avoid an API regression.

Copilot uses AI. Check for mistakes.

def _parse_planner_type(self, planner_type: Union[str, PlannerType]) -> str:
r"""Parse planner type from string or enum.

Args:
planner_type: Planner type as string or PlannerType enum

Returns:
Planner type as string
"""
if isinstance(planner_type, PlannerType):
return planner_type.value
elif isinstance(planner_type, str):
planner_type_lower = planner_type.lower()
# Validate planner type
valid_types = [e.value for e in PlannerType]
if planner_type_lower not in valid_types:
logger.log_warning(
f"Unknown planner type '{planner_type}', using 'toppra'. "
f"Valid types: {valid_types}"
)
return "toppra"
return planner_type_lower
else:
logger.log_error(
f"planner_type must be str or PlannerType, got {type(planner_type)}",
TypeError,
)

def _create_planner(
self,
planner_type: str,
Expand All @@ -123,18 +99,20 @@ def _create_planner(
Planner instance
"""
# Get constraints from robot or use defaults
max_constraints = self._get_constraints(
default_velocity, default_acceleration, **kwargs
)

if planner_type == "toppra":
return ToppraPlanner(self.dof, max_constraints)
else:
planner_class = self._support_planner_dict.get(planner_type, None)
if planner_class is None:
logger.log_error(
f"Unknown planner type '{planner_type}'. "
f"Supported types: {[e.value for e in PlannerType]}",
f"Unsupported planner type '{planner_type}'. "
f"Supported types: {[e for e in self._support_planner_dict.keys()]}",
ValueError,
)
cfg = kwargs.copy()
cfg["dofs"] = self.dof
cfg["max_constraints"] = self._get_constraints(
default_velocity, default_acceleration, **kwargs
)
cfg["robot"] = self.robot
return planner_class(**cfg)

def _get_constraints(
self, default_velocity: float, default_acceleration: float, **kwargs
Expand Down Expand Up @@ -200,17 +178,17 @@ def _create_state_dict(

def plan(
self,
current_state: Dict,
target_states: List[Dict],
current_state: PlanState,
target_states: List[PlanState],
sample_method: TrajectorySampleMethod = TrajectorySampleMethod.TIME,
sample_interval: Union[float, int] = 0.01,
**kwargs,
) -> Tuple[
bool,
np.ndarray | None,
np.ndarray | None,
np.ndarray | None,
np.ndarray | None,
torch.Tensor | None,
torch.Tensor | None,
torch.Tensor | None,
torch.Tensor | None,
float,
]:
r"""Plan trajectory without collision checking.
Expand All @@ -219,40 +197,21 @@ def plan(
velocity and acceleration constraints, but does not check for collisions.

Args:
current_state: Dictionary containing current state:
- "position": Current joint positions (required)
- "velocity": Current joint velocities (optional, defaults to zeros)
- "acceleration": Current joint accelerations (optional, defaults to zeros)
target_states: List of target state dictionaries, each with same format as current_state
current_state: PlanState
target_states: List of PlanState
sample_method: Sampling method (TIME or QUANTITY)
sample_interval: Sampling interval (time in seconds for TIME method, or number of points for QUANTITY)
**kwargs: Additional arguments

Returns:
Tuple of (success, positions, velocities, accelerations, times, duration):
- success: bool, whether planning succeeded
- positions: np.ndarray (N, DOF), joint positions along trajectory
- velocities: np.ndarray (N, DOF), joint velocities along trajectory
- accelerations: np.ndarray (N, DOF), joint accelerations along trajectory
- times: np.ndarray (N,), time stamps for each point
- positions: torch.Tensor (N, DOF), joint positions along trajectory
- velocities: torch.Tensor (N, DOF), joint velocities along trajectory
- accelerations: torch.Tensor (N, DOF), joint accelerations along trajectory
- times: torch.Tensor (N,), time stamps for each point
- duration: float, total trajectory duration
"""
# Validate inputs
if len(current_state["position"]) != self.dof:
logger.log_warning(
f"Current state position dimension {len(current_state['position'])} "
f"does not match robot DOF {self.dof}"
)
return False, None, None, None, None, 0.0

for i, target in enumerate(target_states):
if len(target["position"]) != self.dof:
logger.log_warning(
f"Target state {i} position dimension {len(target['position'])} "
f"does not match robot DOF {self.dof}"
)
return False, None, None, None, None, 0.0

# Plan trajectory using selected planner
(
success,
Expand Down Expand Up @@ -487,10 +446,24 @@ def calculate_point_allocations(
self._create_state_dict(pos) for pos in interpolate_qpos_list[1:]
]

init_plan_state = PlanState(
move_type=MoveType.JOINT_MOVE,
move_part=MovePart.ALL,
qpos=current_state["position"],
qvel=current_state["velocity"],
qacc=current_state["acceleration"],
)
target_plan_states = []
for state in target_states:
plan_state = PlanState(
move_type=MoveType.JOINT_MOVE, qpos=state["position"]
)
target_plan_states.append(plan_state)

# Plan trajectory using internal plan method
success, positions, velocities, accelerations, times, duration = self.plan(
current_state=current_state,
target_states=target_states,
current_state=init_plan_state,
target_states=target_plan_states,
sample_method=sample_method,
sample_interval=sample_num,
**kwargs,
Expand All @@ -500,10 +473,7 @@ def calculate_point_allocations(
logger.log_error("Failed to plan trajectory")

# Convert positions to list
out_qpos_list = (
positions.tolist() if isinstance(positions, np.ndarray) else positions
)

out_qpos_list = positions.to("cpu").numpy().tolist()
out_qpos_list = (
torch.tensor(out_qpos_list)
if not isinstance(out_qpos_list, torch.Tensor)
Expand Down
37 changes: 19 additions & 18 deletions embodichain/lab/sim/planners/toppra_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from embodichain.utils import logger
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.planners.base_planner import BasePlanner
from embodichain.lab.sim.planners.utils import PlanState
import torch

from typing import TYPE_CHECKING, Union, Tuple

Comment on lines 24 to 25
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TYPE_CHECKING, Union, and Tuple are imported but unused in this module. Please remove them to avoid dead imports and keep lint/static analysis clean.

Suggested change
from typing import TYPE_CHECKING, Union, Tuple

Copilot uses AI. Check for mistakes.
Expand All @@ -33,24 +35,25 @@


class ToppraPlanner(BasePlanner):
def __init__(self, dofs, max_constraints):
def __init__(self, **kwargs):
r"""Initialize the TOPPRA trajectory planner.

Args:
dofs: Number of degrees of freedom
max_constraints: Dictionary containing 'velocity' and 'acceleration' constraints
"""
super().__init__(dofs, max_constraints)
super().__init__(**kwargs)
Comment on lines +38 to +45
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToppraPlanner.__init__ now accepts only **kwargs, but the docstring still documents positional dofs/max_constraints arguments. Please update the docstring (and ideally add explicit keyword-only parameters) so it’s clear what must be provided.

Copilot uses AI. Check for mistakes.

# Create TOPPRA-specific constraint arrays (symmetric format)
# This format is required by TOPPRA library
max_constraints = kwargs.get("max_constraints", None)
self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]])
self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]])

Comment on lines 47 to 52
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_constraints is fetched from kwargs and then indexed without validation; if it’s missing/None this will raise a non-obvious TypeError. Since BasePlanner already stores self.max_constraints, consider using that and explicitly validating it (and self.dofs) with logger.log_error when they are not provided.

Suggested change
# Create TOPPRA-specific constraint arrays (symmetric format)
# This format is required by TOPPRA library
max_constraints = kwargs.get("max_constraints", None)
self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]])
self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]])
# Validate required planner configuration
if self.max_constraints is None:
logger.log_error(
"ToppraPlanner requires 'max_constraints' to be provided in BasePlanner.",
ValueError,
)
if self.dofs is None:
logger.log_error(
"ToppraPlanner requires 'dofs' to be provided in BasePlanner.",
ValueError,
)
# Create TOPPRA-specific constraint arrays (symmetric format)
# This format is required by TOPPRA library
try:
velocity_limits = self.max_constraints["velocity"]
acceleration_limits = self.max_constraints["acceleration"]
except (TypeError, KeyError):
logger.log_error(
"ToppraPlanner 'max_constraints' must be a mapping with "
"'velocity' and 'acceleration' entries.",
ValueError,
)
self.vlims = np.array([[-v, v] for v in velocity_limits])
self.alims = np.array([[-a, a] for a in acceleration_limits])

Copilot uses AI. Check for mistakes.
def plan(
self,
current_state: dict,
target_states: list[dict],
current_state: PlanState,
target_states: list[PlanState],
**kwargs,
Comment on lines 53 to 57
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan method now takes PlanState objects, but the docstring still describes current_state / target_states as dictionaries with keys like 'position'. Please update the docstring to reflect the new PlanState fields (e.g., qpos) so the public API stays accurate.

Copilot uses AI. Check for mistakes.
):
r"""Execute trajectory planning.
Expand All @@ -75,40 +78,36 @@ def plan(
logger.log_error("At least 2 sample points required", ValueError)

# Check waypoints
if len(current_state["position"]) != self.dofs:
if len(current_state.qpos) != self.dofs:
logger.log_info("Current wayponit does not align")
return False, None, None, None, None, None
for target in target_states:
if len(target["position"]) != self.dofs:
if len(target.qpos) != self.dofs:
logger.log_info("Target Wayponits does not align")
return False, None, None, None, None, None

if (
len(target_states) == 1
and np.sum(
np.abs(
np.array(target_states[0]["position"])
- np.array(current_state["position"])
)
np.abs(np.array(target_states[0].qpos) - np.array(current_state.qpos))
)
< 1e-3
):
logger.log_info("Only two same waypoints, do not plan")
return (
True,
np.array([current_state["position"], target_states[0]["position"]]),
np.array([current_state.qpos, target_states[0].qpos]),
np.array([[0.0] * self.dofs, [0.0] * self.dofs]),
np.array([[0.0] * self.dofs, [0.0] * self.dofs]),
0,
0,
)
Comment on lines 100 to 104
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the early-return case for identical waypoints, times and duration are returned as 0 (ints). Callers (and the BasePlanner.plan return type) expect times to be an array-like and duration to be a float; please return consistent types (e.g., a 2-element times array and a float duration) to avoid downstream type/shape errors.

Copilot uses AI. Check for mistakes.
Comment on lines +81 to 104
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several failure/degenerate returns from ToppraPlanner.plan() don’t match the declared return contract: (1) waypoint-dimension mismatch returns duration=None; (2) the "same waypoints" fast-path returns times=0 and duration=0 instead of an array of timestamps and a float duration. Please standardize these returns (e.g., times=np.array([...]) and duration=0.0) so callers can rely on types even when planning is skipped/failed.

Copilot uses AI. Check for mistakes.
Comment on lines 89 to 104
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the "two same waypoints" early-return branch, the function returns NumPy arrays and scalar 0s for times/duration. After this PR, callers (e.g., MotionGenerator.create_discrete_trajectory) assume positions/times are torch tensors and will crash when calling .to(...). Please make this early-return match the new contract (torch tensors on self.device, and a float duration).

Copilot uses AI. Check for mistakes.

# Build waypoints
waypoints = [np.array(current_state["position"])]
waypoints = [np.array(current_state.qpos)]
for target in target_states:
waypoints.append(np.array(target["position"]))
waypoints.append(np.array(target.qpos))
waypoints = np.array(waypoints)
Comment on lines 106 to 110
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np.array(current_state.qpos) / np.array(target.qpos) will produce a dtype=object array when qpos is a torch.Tensor (common in this codebase), which can break TOPPRA interpolation. Please explicitly convert torch tensors to NumPy (e.g., .detach().cpu().numpy()) before building waypoints.

Copilot uses AI. Check for mistakes.

# Create spline interpolation
# NOTE: Suitable for dense waypoints
ss = np.linspace(0, 1, len(waypoints))
Expand Down Expand Up @@ -164,9 +163,11 @@ def plan(

return (
True,
np.array(positions),
np.array(velocities),
np.array(accelerations),
ts,
torch.tensor(np.array(positions), dtype=torch.float32, device=self.device),
torch.tensor(np.array(velocities), dtype=torch.float32, device=self.device),
torch.tensor(
np.array(accelerations), dtype=torch.float32, device=self.device
),
torch.tensor(ts, dtype=torch.float32, device=self.device),
duration,
)
Loading
Loading