diff --git a/pycyphal/_version.py b/pycyphal/_version.py index d0702e5a..b1b8e08c 100644 --- a/pycyphal/_version.py +++ b/pycyphal/_version.py @@ -1 +1 @@ -__version__ = "1.24.4" +__version__ = "1.24.5" diff --git a/pycyphal/transport/can/media/pythoncan/_pythoncan.py b/pycyphal/transport/can/media/pythoncan/_pythoncan.py index 21b5c7d9..ed977ee5 100644 --- a/pycyphal/transport/can/media/pythoncan/_pythoncan.py +++ b/pycyphal/transport/can/media/pythoncan/_pythoncan.py @@ -105,8 +105,8 @@ def __init__( Example: ``pcan:PCAN_USBBUS1`` - Interface ``virtual`` is described in https://python-can.readthedocs.io/en/master/interfaces/virtual.html. - The channel name should be empty. - Example: ``virtual:`` + The channel name may be empty. + Example: ``virtual:``, ``virtual:foo-can`` - Interface ``usb2can`` is described in https://python-can.readthedocs.io/en/stable/interfaces/usb2can.html. Example: ``usb2can:ED000100`` @@ -348,9 +348,17 @@ def close(self) -> None: @staticmethod def list_available_interface_names() -> typing.Iterable[str]: """ - Returns an empty list. TODO: provide minimally functional implementation. + Returns a list of available interfaces. """ - return [] + available_configs: typing.List[can.typechecking.AutoDetectedConfig] = [] + for interface in _CONSTRUCTORS.keys(): + # try each interface on its own to catch errors if the interface library is not available + try: + available_configs.extend(can.detect_available_configs(interfaces=[interface])) + except NotImplementedError: + _logger.debug("%s: Interface not supported", interface) + continue + return [f"{config['interface']}:{config['channel']}" for config in available_configs] def _invoke_rx_handler(self, frs: typing.List[typing.Tuple[Timestamp, Envelope]]) -> None: try: @@ -425,7 +433,7 @@ class _FDInterfaceParameters(_InterfaceParameters): bitrate: typing.Tuple[int, int] -def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_socketcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -439,7 +447,7 @@ def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_kvaser(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -464,7 +472,7 @@ def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_slcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -479,7 +487,7 @@ def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_pcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -514,18 +522,14 @@ def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_virtual(parameters: _InterfaceParameters) -> can.ThreadSafeBus: - if isinstance(parameters, _ClassicInterfaceParameters): - return ( - PythonCANBusOptions(), - can.ThreadSafeBus(interface=parameters.interface_name, bitrate=parameters.bitrate), - ) - if isinstance(parameters, _FDInterfaceParameters): - return (PythonCANBusOptions(), can.ThreadSafeBus(interface=parameters.interface_name)) - assert False, "Internal error" +def _construct_virtual(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: + return ( + PythonCANBusOptions(), + can.ThreadSafeBus(interface=parameters.interface_name, channel=parameters.channel_name), + ) -def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_usb2can(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -540,7 +544,7 @@ def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_canalystii(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -553,7 +557,7 @@ def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus assert False, "Internal error" -def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_seeedstudio(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -568,7 +572,7 @@ def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBu assert False, "Internal error" -def _construct_gs_usb(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_gs_usb(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): try: index = int(parameters.channel_name) diff --git a/tests/transport/can/media/_pythoncan.py b/tests/transport/can/media/_pythoncan.py index 4e503cd8..4bfead7e 100644 --- a/tests/transport/can/media/_pythoncan.py +++ b/tests/transport/can/media/_pythoncan.py @@ -167,6 +167,15 @@ def _unittest_can_pythoncan_iface_name() -> None: media.close() +def _unittest_can_pythoncan_list_iface_names() -> None: + available_iface_names = list(PythonCANMedia.list_available_interface_names()) + assert len(available_iface_names) > 0 + # https://python-can.readthedocs.io/en/stable/interfaces/virtual.html#can.interfaces.virtual.VirtualBus._detect_available_configs + assert any( + name.startswith("virtual:") for name in available_iface_names + ), "At least one virtual interface should be available" + + def _unittest_can_pythoncan_errors() -> None: with pytest.raises(InvalidMediaConfigurationError, match=r".*interface:channel.*"): PythonCANMedia("malformed_name", 1_000_000)