Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pycyphal/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.24.4"
__version__ = "1.24.5"
46 changes: 25 additions & 21 deletions pycyphal/transport/can/media/pythoncan/_pythoncan.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ def __init__(
Example: ``pcan:PCAN_USBBUS1``

- Interface ``virtual`` is described in https://python-can.readthedocs.io/en/master/interfaces/virtual.html.
The channel name should be empty.
Example: ``virtual:``
The channel name may be empty.
Example: ``virtual:``, ``virtual:foo-can``

- Interface ``usb2can`` is described in https://python-can.readthedocs.io/en/stable/interfaces/usb2can.html.
Example: ``usb2can:ED000100``
Expand Down Expand Up @@ -348,9 +348,17 @@ def close(self) -> None:
@staticmethod
def list_available_interface_names() -> typing.Iterable[str]:
"""
Returns an empty list. TODO: provide minimally functional implementation.
Returns a list of available interfaces.
"""
return []
available_configs: typing.List[can.typechecking.AutoDetectedConfig] = []
for interface in _CONSTRUCTORS.keys():
# try each interface on its own to catch errors if the interface library is not available
try:
available_configs.extend(can.detect_available_configs(interfaces=[interface]))
except NotImplementedError:
_logger.debug("%s: Interface not supported", interface)
continue
return [f"{config['interface']}:{config['channel']}" for config in available_configs]

def _invoke_rx_handler(self, frs: typing.List[typing.Tuple[Timestamp, Envelope]]) -> None:
try:
Expand Down Expand Up @@ -425,7 +433,7 @@ class _FDInterfaceParameters(_InterfaceParameters):
bitrate: typing.Tuple[int, int]


def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_socketcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -439,7 +447,7 @@ def _construct_socketcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
assert False, "Internal error"


def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_kvaser(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -464,7 +472,7 @@ def _construct_kvaser(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
assert False, "Internal error"


def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_slcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -479,7 +487,7 @@ def _construct_slcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
assert False, "Internal error"


def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_pcan(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand Down Expand Up @@ -514,18 +522,14 @@ def _construct_pcan(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
assert False, "Internal error"


def _construct_virtual(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
can.ThreadSafeBus(interface=parameters.interface_name, bitrate=parameters.bitrate),
)
if isinstance(parameters, _FDInterfaceParameters):
return (PythonCANBusOptions(), can.ThreadSafeBus(interface=parameters.interface_name))
assert False, "Internal error"
def _construct_virtual(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
return (
PythonCANBusOptions(),
can.ThreadSafeBus(interface=parameters.interface_name, channel=parameters.channel_name),
)


def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_usb2can(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -540,7 +544,7 @@ def _construct_usb2can(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
assert False, "Internal error"


def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_canalystii(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -553,7 +557,7 @@ def _construct_canalystii(parameters: _InterfaceParameters) -> can.ThreadSafeBus
assert False, "Internal error"


def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_seeedstudio(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
return (
PythonCANBusOptions(),
Expand All @@ -568,7 +572,7 @@ def _construct_seeedstudio(parameters: _InterfaceParameters) -> can.ThreadSafeBu
assert False, "Internal error"


def _construct_gs_usb(parameters: _InterfaceParameters) -> can.ThreadSafeBus:
def _construct_gs_usb(parameters: _InterfaceParameters) -> typing.Tuple[PythonCANBusOptions, can.ThreadSafeBus]:
if isinstance(parameters, _ClassicInterfaceParameters):
try:
index = int(parameters.channel_name)
Expand Down
9 changes: 9 additions & 0 deletions tests/transport/can/media/_pythoncan.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ def _unittest_can_pythoncan_iface_name() -> None:
media.close()


def _unittest_can_pythoncan_list_iface_names() -> None:
available_iface_names = list(PythonCANMedia.list_available_interface_names())
assert len(available_iface_names) > 0
# https://python-can.readthedocs.io/en/stable/interfaces/virtual.html#can.interfaces.virtual.VirtualBus._detect_available_configs
assert any(
name.startswith("virtual:") for name in available_iface_names
), "At least one virtual interface should be available"


def _unittest_can_pythoncan_errors() -> None:
with pytest.raises(InvalidMediaConfigurationError, match=r".*interface:channel.*"):
PythonCANMedia("malformed_name", 1_000_000)
Expand Down