Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates the planning API to use a structured PlanState object (instead of raw dicts) and adapts the TOPPRA planner and motion generator to the new interface.
Changes:
- Introduces
PlanState(plusMoveType/MovePart) in planner utils. - Refactors
BasePlannerandToppraPlannerto acceptPlanStateinputs and initializes planners via**kwargs. - Updates
MotionGeneratorto constructPlanStateobjects before invoking the planner.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
embodichain/lab/sim/planners/utils.py |
Adds PlanState and related enums used to represent planning inputs more explicitly. |
embodichain/lab/sim/planners/base_planner.py |
Updates base planner API to accept PlanState and switches init to **kwargs. |
embodichain/lab/sim/planners/toppra_planner.py |
Updates TOPPRA planner to accept PlanState and adjusts waypoint handling accordingly. |
embodichain/lab/sim/planners/motion_generator.py |
Packs dict-based states into PlanState before calling the underlying planner. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| move_part: MovePart = MovePart.LEFT | ||
| xpos: torch.Tensor = None | ||
| qpos: torch.Tensor = None | ||
| qacc: torch.Tensor = None | ||
| qvel: torch.Tensor = None |
There was a problem hiding this comment.
PlanState fields are annotated as torch.Tensor but default to None (and are also populated with lists in MotionGenerator.plan). Consider widening these types (e.g., torch.Tensor | np.ndarray | Sequence[float] | None) and/or using Optional[...] to match the actual inputs and avoid type-checking/runtime surprises.
| def __init__(self, **kwargs): | ||
| self.dofs = kwargs.get("dofs", None) | ||
| self.max_constraints = kwargs.get("max_constraints", None) | ||
|
|
There was a problem hiding this comment.
BasePlanner.__init__ now silently accepts missing dofs / max_constraints (setting them to None), which will later crash in places like constraint checks or waypoint validation. Since these are required for any planner, consider making them explicit parameters again or validating kwargs and raising via logger.log_error when required values are absent.
| def __init__(self, **kwargs): | |
| self.dofs = kwargs.get("dofs", None) | |
| self.max_constraints = kwargs.get("max_constraints", None) | |
| def __init__(self, **kwargs): | |
| # Required configuration parameters for any planner | |
| self.dofs = kwargs.get("dofs", None) | |
| self.max_constraints = kwargs.get("max_constraints", None) | |
| missing_args = [] | |
| if self.dofs is None: | |
| missing_args.append("dofs") | |
| if self.max_constraints is None: | |
| missing_args.append("max_constraints") | |
| if missing_args: | |
| logger.log_error( | |
| f"Missing required planner initialization argument(s): {', '.join(missing_args)}", | |
| ValueError, | |
| ) |
| def plan( | ||
| self, | ||
| current_state: Dict, | ||
| target_states: List[Dict], | ||
| current_state: PlanState, | ||
| target_states: list[PlanState], | ||
| **kwargs, |
There was a problem hiding this comment.
The plan signature has been updated to use PlanState, but the docstring (Args section) still describes current_state / target_states as dictionaries. Please update the docstring to match the new PlanState-based API so callers know what fields are expected.
| # Create TOPPRA-specific constraint arrays (symmetric format) | ||
| # This format is required by TOPPRA library | ||
| max_constraints = kwargs.get("max_constraints", None) | ||
| self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]]) | ||
| self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]]) | ||
|
|
There was a problem hiding this comment.
max_constraints is fetched from kwargs and then indexed without validation; if it’s missing/None this will raise a non-obvious TypeError. Since BasePlanner already stores self.max_constraints, consider using that and explicitly validating it (and self.dofs) with logger.log_error when they are not provided.
| # Create TOPPRA-specific constraint arrays (symmetric format) | |
| # This format is required by TOPPRA library | |
| max_constraints = kwargs.get("max_constraints", None) | |
| self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]]) | |
| self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]]) | |
| # Validate required planner configuration | |
| if self.max_constraints is None: | |
| logger.log_error( | |
| "ToppraPlanner requires 'max_constraints' to be provided in BasePlanner.", | |
| ValueError, | |
| ) | |
| if self.dofs is None: | |
| logger.log_error( | |
| "ToppraPlanner requires 'dofs' to be provided in BasePlanner.", | |
| ValueError, | |
| ) | |
| # Create TOPPRA-specific constraint arrays (symmetric format) | |
| # This format is required by TOPPRA library | |
| try: | |
| velocity_limits = self.max_constraints["velocity"] | |
| acceleration_limits = self.max_constraints["acceleration"] | |
| except (TypeError, KeyError): | |
| logger.log_error( | |
| "ToppraPlanner 'max_constraints' must be a mapping with " | |
| "'velocity' and 'acceleration' entries.", | |
| ValueError, | |
| ) | |
| self.vlims = np.array([[-v, v] for v in velocity_limits]) | |
| self.alims = np.array([[-a, a] for a in acceleration_limits]) |
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| 0, | ||
| 0, | ||
| ) |
There was a problem hiding this comment.
In the early-return case for identical waypoints, times and duration are returned as 0 (ints). Callers (and the BasePlanner.plan return type) expect times to be an array-like and duration to be a float; please return consistent types (e.g., a 2-element times array and a float duration) to avoid downstream type/shape errors.
| qvel=current_state["velocity"], | ||
| qacc=current_state["acceleration"], |
There was a problem hiding this comment.
MotionGenerator.plan previously worked when callers provided only position (docstring says velocity/acceleration are optional), but this new packing code unconditionally indexes current_state["velocity"] and ["acceleration"], which will raise KeyError for minimal inputs. Please default missing values (e.g., zeros) or reuse _create_state_dict-style normalization before building PlanState.
| qvel=current_state["velocity"], | |
| qacc=current_state["acceleration"], | |
| qvel=current_state.get("velocity", np.zeros(self.dof)), | |
| qacc=current_state.get("acceleration", np.zeros(self.dof)), |
| def plan( | ||
| self, | ||
| current_state: dict, | ||
| target_states: list[dict], | ||
| current_state: PlanState, | ||
| target_states: list[PlanState], | ||
| **kwargs, |
There was a problem hiding this comment.
The plan method now takes PlanState objects, but the docstring still describes current_state / target_states as dictionaries with keys like 'position'. Please update the docstring to reflect the new PlanState fields (e.g., qpos) so the public API stays accurate.
| from typing import Union | ||
| from embodichain.utils import logger | ||
| import torch | ||
| from enum import Enum |
There was a problem hiding this comment.
There is a duplicated from enum import Enum import here (lines 17 and 21). Please remove the duplicate and keep imports grouped/ordered to avoid confusion and unnecessary re-imports.
| from enum import Enum |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
embodichain/lab/sim/planners/toppra_planner.py:63
plan()now takesPlanState, but the docstring still refers to state dictionaries. Please update the parameter documentation to matchPlanState(e.g., which fields are required such asqpos).
r"""Execute trajectory planning.
Args:
current_state: Dictionary containing 'position', 'velocity', 'acceleration' for current state
target_states: List of dictionaries containing target states
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| @@ -34,15 +35,15 @@ class BasePlanner(ABC): | |||
| max_constraints: Dictionary containing 'velocity' and 'acceleration' constraints | |||
| """ | |||
|
|
|||
| def __init__(self, dofs: int, max_constraints: Dict[str, List[float]]): | |||
| self.dofs = dofs | |||
| self.max_constraints = max_constraints | |||
| def __init__(self, **kwargs): | |||
| self.dofs = kwargs.get("dofs", None) | |||
| self.max_constraints = kwargs.get("max_constraints", None) | |||
There was a problem hiding this comment.
The class docstring for BasePlanner (and its Args: section) still documents dofs/max_constraints as explicit constructor parameters, but __init__ now takes **kwargs. Please update the documentation (or restore explicit parameters) to avoid confusing subclasses/callers.
| if len(current_state.qpos) != self.dofs: | ||
| logger.log_info("Current wayponit does not align") | ||
| return False, None, None, None, None, None | ||
| for target in target_states: | ||
| if len(target["position"]) != self.dofs: | ||
| if len(target.qpos) != self.dofs: | ||
| logger.log_info("Target Wayponits does not align") | ||
| return False, None, None, None, None, None | ||
|
|
||
| if ( | ||
| len(target_states) == 1 | ||
| and np.sum( | ||
| np.abs( | ||
| np.array(target_states[0]["position"]) | ||
| - np.array(current_state["position"]) | ||
| ) | ||
| np.abs(np.array(target_states[0].qpos) - np.array(current_state.qpos)) | ||
| ) | ||
| < 1e-3 | ||
| ): | ||
| logger.log_info("Only two same waypoints, do not plan") | ||
| return ( | ||
| True, | ||
| np.array([current_state["position"], target_states[0]["position"]]), | ||
| np.array([current_state.qpos, target_states[0].qpos]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| 0, | ||
| 0, | ||
| ) |
There was a problem hiding this comment.
Several failure/degenerate returns from ToppraPlanner.plan() don’t match the declared return contract: (1) waypoint-dimension mismatch returns duration=None; (2) the "same waypoints" fast-path returns times=0 and duration=0 instead of an array of timestamps and a float duration. Please standardize these returns (e.g., times=np.array([...]) and duration=0.0) so callers can rely on types even when planning is skipped/failed.
| from enum import Enum | ||
| from typing import Union | ||
| from embodichain.utils import logger | ||
| import torch | ||
| from enum import Enum | ||
| from dataclasses import dataclass | ||
|
|
There was a problem hiding this comment.
Enum is imported twice and the import block is out of order (stdlib/third-party/local). Please remove the duplicate from enum import Enum and re-order imports to avoid lint/format churn.
| from enum import Enum | |
| from typing import Union | |
| from embodichain.utils import logger | |
| import torch | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Union | |
| import torch | |
| from embodichain.utils import logger |
| import torch | ||
| import numpy as np | ||
| from typing import Dict, List, Tuple, Union, Any | ||
| from enum import Enum | ||
| from scipy.spatial.transform import Rotation, Slerp | ||
|
|
||
| from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner | ||
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod | ||
| from embodichain.lab.sim.objects.robot import Robot | ||
| from embodichain.utils import logger | ||
|
|
||
|
|
||
| class PlannerType(Enum): | ||
| r"""Enumeration for different planner types.""" | ||
|
|
||
| TOPPRA = "toppra" | ||
| """TOPPRA planner for time-optimal trajectory planning.""" | ||
| from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart | ||
|
|
There was a problem hiding this comment.
After removing PlannerType, from enum import Enum appears unused in this module. Please drop the unused import to avoid lint failures and keep the import list accurate.
| def __init__(self, **kwargs): | ||
| r"""Initialize the TOPPRA trajectory planner. | ||
|
|
||
| Args: | ||
| dofs: Number of degrees of freedom | ||
| max_constraints: Dictionary containing 'velocity' and 'acceleration' constraints | ||
| """ | ||
| super().__init__(dofs, max_constraints) | ||
| super().__init__(**kwargs) |
There was a problem hiding this comment.
ToppraPlanner.__init__ now accepts only **kwargs, but the docstring still documents positional dofs/max_constraints arguments. Please update the docstring (and ideally add explicit keyword-only parameters) so it’s clear what must be provided.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| if ( | ||
| len(target_states) == 1 | ||
| and np.sum( | ||
| np.abs( | ||
| np.array(target_states[0]["position"]) | ||
| - np.array(current_state["position"]) | ||
| ) | ||
| np.abs(np.array(target_states[0].qpos) - np.array(current_state.qpos)) | ||
| ) | ||
| < 1e-3 | ||
| ): | ||
| logger.log_info("Only two same waypoints, do not plan") | ||
| return ( | ||
| True, | ||
| np.array([current_state["position"], target_states[0]["position"]]), | ||
| np.array([current_state.qpos, target_states[0].qpos]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| 0, | ||
| 0, | ||
| ) |
There was a problem hiding this comment.
In the "two same waypoints" early-return branch, the function returns NumPy arrays and scalar 0s for times/duration. After this PR, callers (e.g., MotionGenerator.create_discrete_trajectory) assume positions/times are torch tensors and will crash when calling .to(...). Please make this early-return match the new contract (torch tensors on self.device, and a float duration).
| from typing import TYPE_CHECKING, Union, Tuple | ||
|
|
There was a problem hiding this comment.
TYPE_CHECKING, Union, and Tuple are imported but unused in this module. Please remove them to avoid dead imports and keep lint/static analysis clean.
| from typing import TYPE_CHECKING, Union, Tuple |
| # Build waypoints | ||
| waypoints = [np.array(current_state["position"])] | ||
| waypoints = [np.array(current_state.qpos)] | ||
| for target in target_states: | ||
| waypoints.append(np.array(target["position"])) | ||
| waypoints.append(np.array(target.qpos)) | ||
| waypoints = np.array(waypoints) |
There was a problem hiding this comment.
np.array(current_state.qpos) / np.array(target.qpos) will produce a dtype=object array when qpos is a torch.Tensor (common in this codebase), which can break TOPPRA interpolation. Please explicitly convert torch tensors to NumPy (e.g., .detach().cpu().numpy()) before building waypoints.
| @@ -65,45 +70,16 @@ | |||
| self.robot = robot | |||
| self.sim = sim | |||
| self.collision_margin = collision_margin | |||
| self.uid = uid | |||
| self.uid = uid # control part | |||
|
|
|||
| # Get robot DOF using get_joint_ids for specified control part (None for whole body) | |||
| self.dof = len(robot.get_joint_ids(uid)) | |||
|
|
|||
| # Create planner based on planner_type | |||
| self.planner_type = self._parse_planner_type(planner_type) | |||
| self.planner = self._create_planner( | |||
| self.planner_type, default_velocity, default_acceleration, **kwargs | |||
| planner_type, default_velocity, default_acceleration, **kwargs | |||
| ) | |||
There was a problem hiding this comment.
MotionGenerator used to accept planner types case-insensitively (and via PlannerType enum). After removing _parse_planner_type, planner_type must exactly match a key in _support_planner_dict, so values like "TOPPRA" will now fail. Please normalize (e.g., planner_type = planner_type.lower()) and keep the previous validation/warning behavior to avoid an API regression.
| if not success or positions is None: | ||
| logger.log_error("Failed to plan trajectory") | ||
|
|
||
| # Convert positions to list | ||
| out_qpos_list = ( | ||
| positions.tolist() if isinstance(positions, np.ndarray) else positions | ||
| ) | ||
|
|
||
| out_qpos_list = positions.to("cpu").numpy().tolist() |
There was a problem hiding this comment.
out_qpos_list = positions.to("cpu").numpy().tolist() assumes positions is always a torch.Tensor. With the current ToppraPlanner.plan early-return branch, positions can be a NumPy array, causing an AttributeError here. Either enforce the torch return type in all planners (preferred) or add a safe conversion/branch here.
| def plan( | ||
| self, | ||
| current_state: Dict, | ||
| target_states: List[Dict], | ||
| current_state: PlanState, | ||
| target_states: list[PlanState], | ||
| **kwargs, |
There was a problem hiding this comment.
The plan() docstring still describes current_state / target_states as dictionaries, but the signature now uses PlanState and the return types are torch tensors. Please update the docstring to match the new API to avoid misleading callers.
Description
Update BasePlanner.
Type of change
Checklist
black .command to format the code base.