From a215f5b66d6a311fa70dcd2bc365682a3ace37aa Mon Sep 17 00:00:00 2001 From: Abdylreshit Date: Fri, 24 Apr 2026 20:38:55 +0500 Subject: [PATCH] feat(plc4py/simulated): add in-process simulated PLC driver Introduces a new 'simulated' driver under plc4py.drivers.simulated, mirroring the existing Java and Go implementations. The mspec-generated protocol code under plc4py/protocols/simulated/readwrite/ was already present in the tree, but no Python driver consumed it; this change fills that gap. The driver does not open a network socket -- it dispatches read and write requests directly to an in-memory SimulatedDevice. That makes it useful both as a reference implementation of the driver SPI and in tests that need a working driver without external infrastructure. Tag format mirrors the Java/Go drivers: TYPE/name:DATA_TYPE[count] (e.g. STATE/counter:INT, RANDOM/temperature:REAL, STDOUT/msg:STRING). Supported tag types: - STATE -- read/write in-memory key-value store - RANDOM -- reads return a freshly generated random value; writes are rejected with ACCESS_DENIED - STDOUT -- writes are logged; reads return ACCESS_DENIED All 19 data types from simulated.mspec are supported (BOOL, BYTE, WORD, DWORD, LWORD, SINT, INT, DINT, LINT, USINT, UINT, UDINT, ULINT, REAL, LREAL, CHAR, WCHAR, STRING, WSTRING), both scalar and arrays. The driver is registered via the plc4py.drivers setuptools entry point. Adds 20 unit tests (11 tag-parser tests + 9 end-to-end connection tests covering STATE round-trip, RANDOM reads, STDOUT writes, arrays, and multi-tag requests). Full test suite goes from 77 passed, 36 xfailed to 97 passed, 36 xfailed -- no regressions. --- .../simulated/SimulatedConfiguration.py | 33 ++++ .../drivers/simulated/SimulatedConnection.py | 115 +++++++++++ .../drivers/simulated/SimulatedDevice.py | 178 ++++++++++++++++++ .../plc4py/drivers/simulated/SimulatedTag.py | 79 ++++++++ plc4py/plc4py/drivers/simulated/__init__.py | 18 ++ plc4py/pyproject.toml | 1 + .../unit/plc4py/drivers/simulated/__init__.py | 18 ++ .../simulated/test_simulated_connection.py | 164 ++++++++++++++++ .../drivers/simulated/test_simulated_tag.py | 94 +++++++++ 9 files changed, 700 insertions(+) create mode 100644 plc4py/plc4py/drivers/simulated/SimulatedConfiguration.py create mode 100644 plc4py/plc4py/drivers/simulated/SimulatedConnection.py create mode 100644 plc4py/plc4py/drivers/simulated/SimulatedDevice.py create mode 100644 plc4py/plc4py/drivers/simulated/SimulatedTag.py create mode 100644 plc4py/plc4py/drivers/simulated/__init__.py create mode 100644 plc4py/tests/unit/plc4py/drivers/simulated/__init__.py create mode 100644 plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_connection.py create mode 100644 plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_tag.py diff --git a/plc4py/plc4py/drivers/simulated/SimulatedConfiguration.py b/plc4py/plc4py/drivers/simulated/SimulatedConfiguration.py new file mode 100644 index 00000000000..b0eeb3c62ac --- /dev/null +++ b/plc4py/plc4py/drivers/simulated/SimulatedConfiguration.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from plc4py.spi.configuration.PlcConfiguration import PlcConfiguration + + +class SimulatedConfiguration(PlcConfiguration): + """ + Configuration for the simulated driver. + + The host part of the connection URL is treated as the device name, + e.g. `simulated://plc-1` yields a device named `plc-1`. If the URL + does not provide a host, the device name defaults to `simulated`. + """ + + def __init__(self, url: str): + super().__init__(url) + self.device_name = self.host or "simulated" diff --git a/plc4py/plc4py/drivers/simulated/SimulatedConnection.py b/plc4py/plc4py/drivers/simulated/SimulatedConnection.py new file mode 100644 index 00000000000..983b854645d --- /dev/null +++ b/plc4py/plc4py/drivers/simulated/SimulatedConnection.py @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from dataclasses import dataclass, field +from typing import Type, Union + +import plc4py +from plc4py.api.authentication.PlcAuthentication import PlcAuthentication +from plc4py.api.messages.PlcRequest import ( + PlcReadRequest, + PlcRequest, + PlcWriteRequest, + ReadRequestBuilder, + WriteRequestBuilder, +) +from plc4py.api.messages.PlcResponse import PlcResponse +from plc4py.api.PlcConnection import PlcConnection +from plc4py.api.PlcDriver import PlcDriver +from plc4py.api.value.PlcValue import PlcResponseCode +from plc4py.drivers.PlcDriverLoader import PlcDriverLoader +from plc4py.drivers.simulated.SimulatedConfiguration import SimulatedConfiguration +from plc4py.drivers.simulated.SimulatedDevice import SimulatedDevice +from plc4py.drivers.simulated.SimulatedTag import SimulatedTagBuilder +from plc4py.spi.messages.PlcReader import DefaultPlcReader +from plc4py.spi.messages.PlcRequest import ( + DefaultReadRequestBuilder, + DefaultWriteRequestBuilder, +) +from plc4py.spi.messages.PlcWriter import DefaultPlcWriter +from plc4py.spi.transport.MockTransport import MockTransport +from plc4py.spi.transport.Plc4xBaseTransport import Plc4xBaseTransport + + +@dataclass +class SimulatedConnection(PlcConnection, DefaultPlcReader, DefaultPlcWriter): + """ + In-process PLC connection backed by a :class:`SimulatedDevice`. + + The connection does not open a network socket; it dispatches read and + write requests straight to the device. This makes it useful as a + reference driver and in tests that should not depend on a real PLC. + """ + + _device: SimulatedDevice = field(default_factory=lambda: SimulatedDevice()) + _transport: Union[Plc4xBaseTransport, None] = None + + @staticmethod + async def create(url: str) -> "SimulatedConnection": + config = SimulatedConfiguration(url) + connection = SimulatedConnection( + _device=SimulatedDevice(name=config.device_name) + ) + connection._transport = await MockTransport.create(None, None, None) + return connection + + def read_request_builder(self) -> ReadRequestBuilder: + return DefaultReadRequestBuilder(SimulatedTagBuilder) + + def write_request_builder(self) -> WriteRequestBuilder: + return DefaultWriteRequestBuilder(SimulatedTagBuilder) + + async def execute(self, request: PlcRequest) -> PlcResponse: + if not self.is_connected(): + return self._default_failed_request(PlcResponseCode.NOT_CONNECTED) + if isinstance(request, PlcReadRequest): + return await self._read(request) + if isinstance(request, PlcWriteRequest): + return await self._write(request) + return self._default_failed_request(PlcResponseCode.NOT_CONNECTED) + + +class SimulatedDriver(PlcDriver): + def __init__(self): + super().__init__() + self.protocol_code = "simulated" + self.protocol_name = "Simulated" + + async def get_connection( + self, + url: str, + authentication: PlcAuthentication = PlcAuthentication(), + ) -> PlcConnection: + return await SimulatedConnection.create(url) + + +class SimulatedDriverLoader(PlcDriverLoader): + """ + Pluggy hook implementation that lets ``PlcDriverManager`` discover the + simulated driver via the ``plc4py.drivers`` entry point. + """ + + @staticmethod + @plc4py.drivers.hookimpl + def get_driver() -> Type[SimulatedDriver]: + return SimulatedDriver + + @staticmethod + @plc4py.drivers.hookimpl + def key() -> str: + return "simulated" diff --git a/plc4py/plc4py/drivers/simulated/SimulatedDevice.py b/plc4py/plc4py/drivers/simulated/SimulatedDevice.py new file mode 100644 index 00000000000..ee8fc399c54 --- /dev/null +++ b/plc4py/plc4py/drivers/simulated/SimulatedDevice.py @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import random +import string +from asyncio import Transport +from dataclasses import dataclass, field +from typing import Callable, Dict, Optional + +from plc4py.api.exceptions.exceptions import PlcRuntimeException +from plc4py.api.messages.PlcRequest import PlcReadRequest, PlcWriteRequest +from plc4py.api.messages.PlcResponse import PlcReadResponse, PlcWriteResponse +from plc4py.api.value.PlcValue import PlcResponseCode, PlcValue +from plc4py.drivers.simulated.SimulatedTag import SimulatedTag, SimulatedTagType +from plc4py.spi.messages.utils.ResponseItem import ResponseItem +from plc4py.spi.values.PlcValues import ( + PlcBOOL, + PlcBYTE, + PlcCHAR, + PlcDINT, + PlcDWORD, + PlcINT, + PlcLINT, + PlcLREAL, + PlcLWORD, + PlcList, + PlcREAL, + PlcSINT, + PlcSTRING, + PlcUDINT, + PlcUINT, + PlcULINT, + PlcUSINT, + PlcWCHAR, + PlcWORD, +) + + +def _random_scalar(data_type: str) -> Optional[PlcValue]: + if data_type == "BOOL": + return PlcBOOL(bool(random.getrandbits(1))) + if data_type == "BYTE": + return PlcBYTE(random.randint(0, 0xFF)) + if data_type == "WORD": + return PlcWORD(random.randint(0, 0xFFFF)) + if data_type == "DWORD": + return PlcDWORD(random.randint(0, 0xFFFFFFFF)) + if data_type == "LWORD": + return PlcLWORD(random.randint(0, 0xFFFFFFFFFFFFFFFF)) + if data_type == "SINT": + return PlcSINT(random.randint(-0x80, 0x7F)) + if data_type == "INT": + return PlcINT(random.randint(-0x8000, 0x7FFF)) + if data_type == "DINT": + return PlcDINT(random.randint(-0x80000000, 0x7FFFFFFF)) + if data_type == "LINT": + return PlcLINT( + random.randint(-0x8000000000000000, 0x7FFFFFFFFFFFFFFF) + ) + if data_type == "USINT": + return PlcUSINT(random.randint(0, 0xFF)) + if data_type == "UINT": + return PlcUINT(random.randint(0, 0xFFFF)) + if data_type == "UDINT": + return PlcUDINT(random.randint(0, 0xFFFFFFFF)) + if data_type == "ULINT": + return PlcULINT(random.randint(0, 0xFFFFFFFFFFFFFFFF)) + if data_type == "REAL": + return PlcREAL(random.uniform(-1.0e6, 1.0e6)) + if data_type == "LREAL": + return PlcLREAL(random.uniform(-1.0e12, 1.0e12)) + if data_type == "CHAR": + return PlcCHAR(random.choice(string.ascii_letters)) + if data_type == "WCHAR": + return PlcWCHAR(random.choice(string.ascii_letters)) + if data_type in ("STRING", "WSTRING"): + length = random.randint(1, 16) + return PlcSTRING( + "".join(random.choices(string.ascii_letters, k=length)) + ) + return None + + +def _random_value(data_type: str, quantity: int) -> Optional[PlcValue]: + if quantity <= 1: + return _random_scalar(data_type) + values = [_random_scalar(data_type) for _ in range(quantity)] + if any(v is None for v in values): + return None + return PlcList(values) + + +@dataclass +class SimulatedDevice: + """ + Virtual PLC that keeps STATE tags in-memory, serves RANDOM tags with + freshly generated values, and writes STDOUT tags to the log. + """ + + name: str = "simulated" + _state: Dict[str, PlcValue] = field(default_factory=dict) + + async def read( + self, request: PlcReadRequest, transport: Transport + ) -> PlcReadResponse: + if len(request.tags) == 0: + raise PlcRuntimeException("No tags have been specified to read") + response_items: Dict[str, ResponseItem] = {} + for tag_name, tag in request.tags.items(): + if not isinstance(tag, SimulatedTag): + response_items[tag_name] = ResponseItem( + PlcResponseCode.INVALID_ADDRESS, None + ) + continue + response_items[tag_name] = self._read_tag(tag) + return PlcReadResponse(PlcResponseCode.OK, response_items) + + async def write( + self, request: PlcWriteRequest, transport: Transport + ) -> PlcWriteResponse: + if len(request.tags) == 0: + raise PlcRuntimeException("No tags have been specified to write") + response_items: Dict[str, ResponseItem] = {} + for tag_name, tag in request.tags.items(): + value = request.values.get(tag_name) + if not isinstance(tag, SimulatedTag): + response_items[tag_name] = ResponseItem( + PlcResponseCode.INVALID_ADDRESS, None + ) + continue + response_items[tag_name] = self._write_tag(tag, value) + return PlcWriteResponse(PlcResponseCode.OK, response_items) + + def _read_tag(self, tag: SimulatedTag) -> ResponseItem: + if tag.tag_type == SimulatedTagType.RANDOM: + value = _random_value(tag.data_type, tag.quantity) + if value is None: + return ResponseItem(PlcResponseCode.INVALID_DATATYPE, None) + return ResponseItem(PlcResponseCode.OK, value) + if tag.tag_type == SimulatedTagType.STATE: + value = self._state.get(tag.name) + if value is None: + return ResponseItem(PlcResponseCode.NOT_FOUND, None) + return ResponseItem(PlcResponseCode.OK, value) + # STDOUT tags cannot be read + return ResponseItem(PlcResponseCode.ACCESS_DENIED, None) + + def _write_tag( + self, tag: SimulatedTag, value: Optional[PlcValue] + ) -> ResponseItem: + if value is None: + return ResponseItem(PlcResponseCode.INVALID_DATA, None) + if tag.tag_type == SimulatedTagType.STATE: + self._state[tag.name] = value + return ResponseItem(PlcResponseCode.OK, None) + if tag.tag_type == SimulatedTagType.STDOUT: + logging.info( + "TEST PLC STDOUT [%s/%s]: %s", self.name, tag.name, value.get_raw() + ) + return ResponseItem(PlcResponseCode.OK, None) + # RANDOM tags cannot be written + return ResponseItem(PlcResponseCode.ACCESS_DENIED, None) diff --git a/plc4py/plc4py/drivers/simulated/SimulatedTag.py b/plc4py/plc4py/drivers/simulated/SimulatedTag.py new file mode 100644 index 00000000000..397e13ee6e4 --- /dev/null +++ b/plc4py/plc4py/drivers/simulated/SimulatedTag.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import re +from dataclasses import dataclass +from enum import Enum +from typing import ClassVar, Pattern + +from plc4py.api.exceptions.exceptions import PlcFieldParseException +from plc4py.api.messages.PlcField import PlcTag +from plc4py.spi.messages.PlcRequest import TagBuilder + + +class SimulatedTagType(Enum): + RANDOM = "RANDOM" + STATE = "STATE" + STDOUT = "STDOUT" + + +@dataclass +class SimulatedTag(PlcTag): + _ADDRESS_PATTERN: ClassVar[Pattern] = re.compile( + r"^(?P\w+)/(?P[a-zA-Z0-9_.]+):(?P[a-zA-Z0-9]+)(\[(?P\d+)\])?$" + ) + + tag_type: SimulatedTagType + name: str + data_type: str + quantity: int = 1 + + @classmethod + def matches(cls, address_string: str) -> bool: + return cls._ADDRESS_PATTERN.match(address_string) is not None + + @classmethod + def create(cls, address_string: str) -> "SimulatedTag": + matcher = cls._ADDRESS_PATTERN.match(address_string) + if matcher is None: + raise PlcFieldParseException( + f"Unable to parse address: {address_string}" + ) + try: + tag_type = SimulatedTagType[matcher.group("type").upper()] + except KeyError: + raise PlcFieldParseException( + f"Invalid simulated tag type: {matcher.group('type')}" + ) + name = matcher.group("name") + data_type = matcher.group("dataType").upper() + num_elements = matcher.group("numElements") + quantity = int(num_elements) if num_elements is not None else 1 + return cls( + address=address_string, + tag_type=tag_type, + name=name, + data_type=data_type, + quantity=quantity, + ) + + +class SimulatedTagBuilder(TagBuilder): + @staticmethod + def create(address_string: str) -> SimulatedTag: + return SimulatedTag.create(address_string) diff --git a/plc4py/plc4py/drivers/simulated/__init__.py b/plc4py/plc4py/drivers/simulated/__init__.py new file mode 100644 index 00000000000..342be717994 --- /dev/null +++ b/plc4py/plc4py/drivers/simulated/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/plc4py/pyproject.toml b/plc4py/pyproject.toml index b4abd5faad7..28c32f1f81b 100644 --- a/plc4py/pyproject.toml +++ b/plc4py/pyproject.toml @@ -78,6 +78,7 @@ dev = [ [project.entry-points."plc4py.drivers"] mock = "plc4py.drivers.mock.MockConnection:MockDriverLoader" modbus = "plc4py.drivers.modbus.ModbusConnection:ModbusDriverLoader" +simulated = "plc4py.drivers.simulated.SimulatedConnection:SimulatedDriverLoader" umas = "plc4py.drivers.umas.UmasConnection:UmasDriverLoader" [project.entry-points."plc4py.transports"] diff --git a/plc4py/tests/unit/plc4py/drivers/simulated/__init__.py b/plc4py/tests/unit/plc4py/drivers/simulated/__init__.py new file mode 100644 index 00000000000..342be717994 --- /dev/null +++ b/plc4py/tests/unit/plc4py/drivers/simulated/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_connection.py b/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_connection.py new file mode 100644 index 00000000000..d44476d5868 --- /dev/null +++ b/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_connection.py @@ -0,0 +1,164 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import pytest + +from plc4py.PlcDriverManager import PlcDriverManager +from plc4py.api.PlcConnection import PlcConnection +from plc4py.api.value.PlcValue import PlcResponseCode +from plc4py.drivers.simulated.SimulatedConnection import SimulatedConnection +from plc4py.spi.values.PlcValues import PlcBOOL, PlcDINT, PlcINT, PlcREAL, PlcSTRING + + +@pytest.mark.asyncio +async def test_driver_manager_returns_simulated_connection(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + assert isinstance(connection, PlcConnection) + assert isinstance(connection, SimulatedConnection) + + +@pytest.mark.asyncio +async def test_state_tag_round_trip(): + """Writing a STATE tag and then reading it back returns the written value.""" + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.write_request_builder() as builder: + builder.add_item("counter", "STATE/counter:INT", PlcINT(42)) + write_request = builder.build() + write_response = await connection.execute(write_request) + assert write_response.response_code == PlcResponseCode.OK + assert write_response.tags["counter"].response_code == PlcResponseCode.OK + + with connection.read_request_builder() as builder: + builder.add_item("counter", "STATE/counter:INT") + read_request = builder.build() + read_response = await connection.execute(read_request) + assert read_response.response_code == PlcResponseCode.OK + assert read_response.tags["counter"].response_code == PlcResponseCode.OK + assert read_response.tags["counter"].value == PlcINT(42) + + +@pytest.mark.asyncio +async def test_state_tag_read_before_write_is_not_found(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.read_request_builder() as builder: + builder.add_item("unknown", "STATE/unknown:INT") + request = builder.build() + response = await connection.execute(request) + assert response.response_code == PlcResponseCode.OK + assert response.tags["unknown"].response_code == PlcResponseCode.NOT_FOUND + + +@pytest.mark.asyncio +async def test_random_tag_returns_value(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.read_request_builder() as builder: + builder.add_item("temperature", "RANDOM/temperature:REAL") + request = builder.build() + response = await connection.execute(request) + assert response.response_code == PlcResponseCode.OK + assert response.tags["temperature"].response_code == PlcResponseCode.OK + assert isinstance(response.tags["temperature"].value, PlcREAL) + + +@pytest.mark.asyncio +async def test_random_tag_cannot_be_written(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.write_request_builder() as builder: + builder.add_item("rnd", "RANDOM/rnd:INT", PlcINT(1)) + request = builder.build() + response = await connection.execute(request) + assert response.response_code == PlcResponseCode.OK + assert ( + response.tags["rnd"].response_code == PlcResponseCode.ACCESS_DENIED + ) + + +@pytest.mark.asyncio +async def test_stdout_tag_write_ok(caplog): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.write_request_builder() as builder: + builder.add_item( + "msg", "STDOUT/msg:STRING", PlcSTRING("hello") + ) + request = builder.build() + response = await connection.execute(request) + assert response.response_code == PlcResponseCode.OK + assert response.tags["msg"].response_code == PlcResponseCode.OK + + +@pytest.mark.asyncio +async def test_stdout_tag_cannot_be_read(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.read_request_builder() as builder: + builder.add_item("msg", "STDOUT/msg:STRING") + request = builder.build() + response = await connection.execute(request) + assert response.response_code == PlcResponseCode.OK + assert ( + response.tags["msg"].response_code == PlcResponseCode.ACCESS_DENIED + ) + + +@pytest.mark.asyncio +async def test_state_tag_array_round_trip(): + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + from plc4py.spi.values.PlcValues import PlcList + + values = PlcList([PlcDINT(i) for i in range(5)]) + with connection.write_request_builder() as builder: + builder.add_item("samples", "STATE/samples:DINT[5]", values) + write_request = builder.build() + write_response = await connection.execute(write_request) + assert write_response.tags["samples"].response_code == PlcResponseCode.OK + + with connection.read_request_builder() as builder: + builder.add_item("samples", "STATE/samples:DINT[5]") + read_request = builder.build() + read_response = await connection.execute(read_request) + assert read_response.tags["samples"].response_code == PlcResponseCode.OK + stored = read_response.tags["samples"].value + assert isinstance(stored, PlcList) + assert [v.get_int() for v in stored.get_list()] == [0, 1, 2, 3, 4] + + +@pytest.mark.asyncio +async def test_multi_tag_read(): + """Reads with multiple tags are dispatched to each tag independently.""" + driver_manager = PlcDriverManager() + async with driver_manager.connection("simulated://plc-1") as connection: + with connection.write_request_builder() as builder: + builder.add_item("flag", "STATE/flag:BOOL", PlcBOOL(True)) + await connection.execute(builder.build()) + + with connection.read_request_builder() as builder: + builder.add_item("flag", "STATE/flag:BOOL") + builder.add_item("rnd", "RANDOM/rnd:INT") + request = builder.build() + response = await connection.execute(request) + + assert response.tags["flag"].response_code == PlcResponseCode.OK + assert response.tags["flag"].value == PlcBOOL(True) + assert response.tags["rnd"].response_code == PlcResponseCode.OK diff --git a/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_tag.py b/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_tag.py new file mode 100644 index 00000000000..cee4140af03 --- /dev/null +++ b/plc4py/tests/unit/plc4py/drivers/simulated/test_simulated_tag.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import pytest + +from plc4py.api.exceptions.exceptions import PlcFieldParseException +from plc4py.drivers.simulated.SimulatedTag import ( + SimulatedTag, + SimulatedTagBuilder, + SimulatedTagType, +) + + +def test_parse_state_scalar(): + tag = SimulatedTag.create("STATE/counter:INT") + assert tag.tag_type == SimulatedTagType.STATE + assert tag.name == "counter" + assert tag.data_type == "INT" + assert tag.quantity == 1 + + +def test_parse_random_scalar(): + tag = SimulatedTag.create("RANDOM/temperature:REAL") + assert tag.tag_type == SimulatedTagType.RANDOM + assert tag.name == "temperature" + assert tag.data_type == "REAL" + assert tag.quantity == 1 + + +def test_parse_stdout_string(): + tag = SimulatedTag.create("STDOUT/message:STRING") + assert tag.tag_type == SimulatedTagType.STDOUT + assert tag.name == "message" + assert tag.data_type == "STRING" + + +def test_parse_array(): + tag = SimulatedTag.create("STATE/samples:DINT[5]") + assert tag.tag_type == SimulatedTagType.STATE + assert tag.name == "samples" + assert tag.data_type == "DINT" + assert tag.quantity == 5 + + +def test_parse_dotted_name(): + tag = SimulatedTag.create("STATE/motor.speed:REAL") + assert tag.name == "motor.speed" + + +def test_tag_type_is_case_insensitive(): + tag = SimulatedTag.create("state/x:BOOL") + assert tag.tag_type == SimulatedTagType.STATE + + +def test_data_type_is_normalized_to_upper(): + tag = SimulatedTag.create("STATE/x:int") + assert tag.data_type == "INT" + + +def test_matches_returns_false_for_invalid(): + assert SimulatedTag.matches("not a tag") is False + assert SimulatedTag.matches("STATE/x") is False + assert SimulatedTag.matches(":INT") is False + + +def test_create_raises_on_invalid_address(): + with pytest.raises(PlcFieldParseException): + SimulatedTag.create("not a tag") + + +def test_create_raises_on_unknown_tag_type(): + with pytest.raises(PlcFieldParseException): + SimulatedTag.create("BOGUS/x:INT") + + +def test_builder_creates_same_tag(): + built = SimulatedTagBuilder.create("STATE/x:INT") + direct = SimulatedTag.create("STATE/x:INT") + assert built == direct