From ef2f46f86f6a4df3527be52c687b41a024191698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lasse=20Fr=C3=B6hner?= Date: Thu, 28 Aug 2025 14:20:45 +0200 Subject: [PATCH 1/4] Allow different virtual CAN channels The python-can virtual bus supports multiple distinct channels, which are identified by an arbitrary channel name. This commit adds support for these different channels to the PythonCANMedia class, using the existing "virtual:" syntax. The previous behavior of using an empty channel name is preserved. --- .../can/media/pythoncan/_pythoncan.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/pycyphal/transport/can/media/pythoncan/_pythoncan.py b/pycyphal/transport/can/media/pythoncan/_pythoncan.py index 21b5c7d9..164eb871 100644 --- a/pycyphal/transport/can/media/pythoncan/_pythoncan.py +++ b/pycyphal/transport/can/media/pythoncan/_pythoncan.py @@ -105,8 +105,8 @@ def __init__( Example: ``pcan:PCAN_USBBUS1`` - Interface ``virtual`` is described in https://python-can.readthedocs.io/en/master/interfaces/virtual.html. - The channel name should be empty. - Example: ``virtual:`` + The channel name may be empty. + Example: ``virtual:``, ``virtual:foo-can`` - Interface ``usb2can`` is described in https://python-can.readthedocs.io/en/stable/interfaces/usb2can.html. Example: ``usb2can:ED000100`` @@ -514,15 +514,11 @@ def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_virtual(parameters: _InterfaceParameters) -> can.ThreadSafeBus: - if isinstance(parameters, _ClassicInterfaceParameters): - return ( - PythonCANBusOptions(), - can.ThreadSafeBus(interface=parameters.interface_name, bitrate=parameters.bitrate), - ) - if isinstance(parameters, _FDInterfaceParameters): - return (PythonCANBusOptions(), can.ThreadSafeBus(interface=parameters.interface_name)) - assert False, "Internal error" +def _construct_virtual(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: + return ( + PythonCANBusOptions(), + can.ThreadSafeBus(interface=parameters.interface_name, channel=parameters.channel_name), + ) def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus: From 69a389e8417c8f8e193baeb6081187aced4a8752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lasse=20Fr=C3=B6hner?= Date: Thu, 28 Aug 2025 14:24:52 +0200 Subject: [PATCH 2/4] Update type annotations of constructors in _pythoncan.py --- .../transport/can/media/pythoncan/_pythoncan.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pycyphal/transport/can/media/pythoncan/_pythoncan.py b/pycyphal/transport/can/media/pythoncan/_pythoncan.py index 164eb871..b0e4a540 100644 --- a/pycyphal/transport/can/media/pythoncan/_pythoncan.py +++ b/pycyphal/transport/can/media/pythoncan/_pythoncan.py @@ -425,7 +425,7 @@ class _FDInterfaceParameters(_InterfaceParameters): bitrate: typing.Tuple[int, int] -def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_socketcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -439,7 +439,7 @@ def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_kvaser(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -464,7 +464,7 @@ def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_slcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -479,7 +479,7 @@ def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_pcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -521,7 +521,7 @@ def _construct_virtual(parameters: _InterfaceParameters) -> typing.Tuple[PythonC ) -def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_usb2can(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -536,7 +536,7 @@ def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus: assert False, "Internal error" -def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_canalystii(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -549,7 +549,7 @@ def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus assert False, "Internal error" -def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_seeedstudio(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): return ( PythonCANBusOptions(), @@ -564,7 +564,7 @@ def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBu assert False, "Internal error" -def _construct_gs_usb(parameters: _InterfaceParameters) -> can.ThreadSafeBus: +def _construct_gs_usb(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]: if isinstance(parameters, _ClassicInterfaceParameters): try: index = int(parameters.channel_name) From 967787337b3f8534d6c2dc0bdff948fe15b6067d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lasse=20Fr=C3=B6hner?= Date: Thu, 28 Aug 2025 14:33:07 +0200 Subject: [PATCH 3/4] Implement PythonCANMedia.list_available_interface_names() The functionality comes from python-can's detect_available_configs(). Each interface is tried on its own to catch errors in case the respective driver library is not available. --- pycyphal/transport/can/media/pythoncan/_pythoncan.py | 12 ++++++++++-- tests/transport/can/media/_pythoncan.py | 9 +++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pycyphal/transport/can/media/pythoncan/_pythoncan.py b/pycyphal/transport/can/media/pythoncan/_pythoncan.py index b0e4a540..ed977ee5 100644 --- a/pycyphal/transport/can/media/pythoncan/_pythoncan.py +++ b/pycyphal/transport/can/media/pythoncan/_pythoncan.py @@ -348,9 +348,17 @@ def close(self) -> None: @staticmethod def list_available_interface_names() -> typing.Iterable[str]: """ - Returns an empty list. TODO: provide minimally functional implementation. + Returns a list of available interfaces. """ - return [] + available_configs: typing.List[can.typechecking.AutoDetectedConfig] = [] + for interface in _CONSTRUCTORS.keys(): + # try each interface on its own to catch errors if the interface library is not available + try: + available_configs.extend(can.detect_available_configs(interfaces=[interface])) + except NotImplementedError: + _logger.debug("%s: Interface not supported", interface) + continue + return [f"{config['interface']}:{config['channel']}" for config in available_configs] def _invoke_rx_handler(self, frs: typing.List[typing.Tuple[Timestamp, Envelope]]) -> None: try: diff --git a/tests/transport/can/media/_pythoncan.py b/tests/transport/can/media/_pythoncan.py index 4e503cd8..4bfead7e 100644 --- a/tests/transport/can/media/_pythoncan.py +++ b/tests/transport/can/media/_pythoncan.py @@ -167,6 +167,15 @@ def _unittest_can_pythoncan_iface_name() -> None: media.close() +def _unittest_can_pythoncan_list_iface_names() -> None: + available_iface_names = list(PythonCANMedia.list_available_interface_names()) + assert len(available_iface_names) > 0 + # https://python-can.readthedocs.io/en/stable/interfaces/virtual.html#can.interfaces.virtual.VirtualBus._detect_available_configs + assert any( + name.startswith("virtual:") for name in available_iface_names + ), "At least one virtual interface should be available" + + def _unittest_can_pythoncan_errors() -> None: with pytest.raises(InvalidMediaConfigurationError, match=r".*interface:channel.*"): PythonCANMedia("malformed_name", 1_000_000) From 67f55677109711ad3215993e1e860fda9b7bfd81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lasse=20Fr=C3=B6hner?= Date: Thu, 28 Aug 2025 14:36:04 +0200 Subject: [PATCH 4/4] Bump version to 1.24.5 --- pycyphal/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pycyphal/_version.py b/pycyphal/_version.py index d0702e5a..b1b8e08c 100644 --- a/pycyphal/_version.py +++ b/pycyphal/_version.py @@ -1 +1 @@ -__version__ = "1.24.4" +__version__ = "1.24.5"