Skip to content

Commit 38b0445

Browse files
committed
Test energy scanner tool
1 parent ca7baa6 commit 38b0445

File tree

3 files changed

+89
-10
lines changed

3 files changed

+89
-10
lines changed

tests/test_application.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,7 @@ def passthrough_serial_conn(loop, protocol_factory, url, *args, **kwargs):
162162
return server_znp
163163

164164

165-
@pytest.fixture
166-
def application(znp_server):
165+
def make_application(znp_server):
167166
app = ControllerApplication(config_for_port_path("/dev/ttyFAKE0"))
168167

169168
# Handle the entire startup sequence
@@ -291,6 +290,11 @@ def on_endpoint_deletion(req):
291290
return app, znp_server
292291

293292

293+
@pytest.fixture
294+
def application(znp_server):
295+
return make_application(znp_server)
296+
297+
294298
@pytest_mark_asyncio_timeout(seconds=5)
295299
async def test_application_startup_skip_bootloader(application, mocker):
296300
app, znp_server = application

tests/test_tools_energy_scan.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import asyncio
2+
3+
import zigpy_znp.types as t
4+
import zigpy_znp.commands as c
5+
6+
from zigpy_znp.tools.energy_scan import channels_from_channel_mask, main as energy_scan
7+
8+
from test_api import pytest_mark_asyncio_timeout # noqa: F401
9+
from test_application import make_application, znp_server # noqa: F401
10+
from test_tools_nvram import openable_serial_znp_server # noqa: F401
11+
12+
13+
def test_channels_from_channel_mask():
14+
def channel_list(channels):
15+
return list(channels_from_channel_mask(channels))
16+
17+
assert channel_list(t.Channels.ALL_CHANNELS) == list(range(11, 26 + 1))
18+
assert channel_list(t.Channels.NO_CHANNELS) == []
19+
assert channel_list(t.Channels.CHANNEL_11 | t.Channels.CHANNEL_20) == [11, 20]
20+
assert channel_list(t.Channels.CHANNEL_15) == [15]
21+
22+
23+
@pytest_mark_asyncio_timeout(seconds=5)
24+
async def test_energy_scan(openable_serial_znp_server, capsys): # noqa: F811
25+
app, openable_serial_znp_server = make_application(openable_serial_znp_server)
26+
27+
def fake_scanner(request):
28+
async def response(request):
29+
openable_serial_znp_server.send(
30+
c.ZDO.MgmtNWKUpdateReq.Rsp(Status=t.Status.SUCCESS)
31+
)
32+
33+
delay = 2 ** request.ScanDuration
34+
num_channels = len(list(channels_from_channel_mask(request.Channels)))
35+
36+
for i in range(request.ScanCount):
37+
await asyncio.sleep(delay / 100)
38+
39+
openable_serial_znp_server.send(
40+
c.ZDO.MgmtNWKUpdateNotify.Callback(
41+
Src=0x0000,
42+
Status=t.ZDOStatus.SUCCESS,
43+
ScannedChannels=request.Channels,
44+
TotalTransmissions=998,
45+
TransmissionFailures=2,
46+
EnergyValues=list(range(num_channels)),
47+
)
48+
)
49+
50+
asyncio.create_task(response(request))
51+
52+
openable_serial_znp_server.callback_for_response(
53+
c.ZDO.MgmtNWKUpdateReq.Req(
54+
Dst=0x0000, DstAddrMode=t.AddrMode.NWK, NwkManagerAddr=0x0000, partial=True,
55+
),
56+
fake_scanner,
57+
)
58+
59+
await energy_scan(["-n", "1", openable_serial_znp_server._port_path, "-v", "-v"])
60+
61+
captured = capsys.readouterr()
62+
63+
for i in range(11, 26 + 1):
64+
assert str(i) in captured.out

zigpy_znp/tools/energy_scan.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import logging
44
import argparse
5+
import itertools
56

67
from collections import defaultdict, deque
78

@@ -19,7 +20,7 @@ def channels_from_channel_mask(channels: t.Channels):
1920
yield channel
2021

2122

22-
async def perform_energy_scan(radio_path, auto_form=False):
23+
async def perform_energy_scan(radio_path, num_scans=None, auto_form=False):
2324
LOGGER.info("Starting up zigpy-znp")
2425

2526
app = ControllerApplication(
@@ -30,18 +31,18 @@ async def perform_energy_scan(radio_path, auto_form=False):
3031
await app.startup(auto_form=auto_form, write_nvram=auto_form)
3132
except RuntimeError:
3233
LOGGER.error("The hardware needs to be configured before this tool can work.")
33-
LOGGER.error(
34-
"Either re-run this command with -f (--form) or use the hardware "
35-
"once with either ZHA or Zigbee2Mqtt."
36-
)
34+
LOGGER.error("Re-run this command with -f (--form).")
3735
return
3836

3937
LOGGER.info("Running scan...")
4038

4139
# We compute an average over the last 5 scans
4240
channel_energies = defaultdict(lambda: deque([], maxlen=5))
4341

44-
while True:
42+
for i in itertools.count(start=1):
43+
if num_scans is not None and i > num_scans:
44+
break
45+
4546
rsp = await app._znp.request_callback_rsp(
4647
request=c.ZDO.MgmtNWKUpdateReq.Req(
4748
Dst=0x0000,
@@ -61,7 +62,7 @@ async def perform_energy_scan(radio_path, auto_form=False):
6162
energies = channel_energies[channel]
6263
energies.append(energy)
6364

64-
total = 0xFF * energies.maxlen
65+
total = 0xFF * len(energies)
6566

6667
print(f"Channel energy ({len(energies)} / {energies.maxlen}):")
6768

@@ -89,6 +90,14 @@ async def main(argv):
8990
default=0,
9091
help="Increases verbosity",
9192
)
93+
parser.add_argument(
94+
"-n",
95+
"--num-scans",
96+
dest="num_scans",
97+
type=int,
98+
default=None,
99+
help="Number of scans to perform before exiting",
100+
)
92101
parser.add_argument(
93102
"-f",
94103
"--form",
@@ -107,7 +116,9 @@ async def main(argv):
107116
# We just want to make sure it exists
108117
args.serial.close()
109118

110-
await perform_energy_scan(args.serial.name, auto_form=args.form)
119+
await perform_energy_scan(
120+
args.serial.name, auto_form=args.form, num_scans=args.num_scans
121+
)
111122

112123

113124
if __name__ == "__main__":

0 commit comments

Comments
 (0)