|
| 1 | +"""Tests for Q10 StatusTrait.""" |
| 2 | + |
| 3 | +import json |
| 4 | +from typing import Any |
| 5 | + |
| 6 | +import pytest |
| 7 | + |
| 8 | +from roborock.data.b01_q10.b01_q10_code_mappings import ( |
| 9 | + B01_Q10_DP, |
| 10 | + YXDeviceState, |
| 11 | + YXDeviceWorkMode, |
| 12 | + YXFanLevel, |
| 13 | +) |
| 14 | +from roborock.devices.traits.b01.q10.status import StatusTrait |
| 15 | +from roborock.roborock_message import RoborockMessage |
| 16 | +from tests.fixtures.channel_fixtures import FakeChannel |
| 17 | + |
| 18 | + |
| 19 | +@pytest.fixture(name="fake_channel") |
| 20 | +def fake_channel_fixture() -> FakeChannel: |
| 21 | + return FakeChannel() |
| 22 | + |
| 23 | + |
| 24 | +@pytest.fixture(name="status_trait") |
| 25 | +def status_trait_fixture(fake_channel: FakeChannel) -> StatusTrait: |
| 26 | + return StatusTrait(fake_channel) # type: ignore[arg-type] |
| 27 | + |
| 28 | + |
| 29 | +def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: |
| 30 | + """Build a Q10 MQTT response message.""" |
| 31 | + payload = {"dps": dps} |
| 32 | + return RoborockMessage( |
| 33 | + protocol=11, # MQTT_PROTO |
| 34 | + payload=json.dumps(payload).encode(), |
| 35 | + seq=0, |
| 36 | + version=b"B01", |
| 37 | + ) |
| 38 | + |
| 39 | + |
| 40 | +async def test_status_trait_battery(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 41 | + """Test getting battery status.""" |
| 42 | + # Queue a response with battery data |
| 43 | + fake_channel.response_queue.append(build_q10_response({"122": 85})) |
| 44 | + |
| 45 | + result = await status_trait.refresh() |
| 46 | + |
| 47 | + assert status_trait.battery == 85 |
| 48 | + assert B01_Q10_DP.BATTERY in result |
| 49 | + |
| 50 | + |
| 51 | +async def test_status_trait_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 52 | + """Test getting device state.""" |
| 53 | + # CLEANING_STATE = 5 |
| 54 | + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100})) |
| 55 | + |
| 56 | + result = await status_trait.refresh() |
| 57 | + |
| 58 | + assert status_trait.state == YXDeviceState.CLEANING_STATE |
| 59 | + assert B01_Q10_DP.STATUS in result |
| 60 | + |
| 61 | + |
| 62 | +async def test_status_trait_fan_level(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 63 | + """Test getting fan level.""" |
| 64 | + # FAN_LEVEL NORMAL = 2 |
| 65 | + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "123": 2})) |
| 66 | + |
| 67 | + result = await status_trait.refresh() |
| 68 | + |
| 69 | + assert status_trait.fan_level == YXFanLevel.NORMAL |
| 70 | + assert B01_Q10_DP.FAN_LEVEL in result |
| 71 | + |
| 72 | + |
| 73 | +async def test_status_trait_clean_mode(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 74 | + """Test getting cleaning mode.""" |
| 75 | + # CLEAN_MODE BOTH_WORK = 1 |
| 76 | + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "137": 1})) |
| 77 | + |
| 78 | + result = await status_trait.refresh() |
| 79 | + |
| 80 | + assert status_trait.clean_mode == YXDeviceWorkMode.BOTH_WORK |
| 81 | + assert B01_Q10_DP.CLEAN_MODE in result |
| 82 | + |
| 83 | + |
| 84 | +async def test_status_trait_cleaning_progress(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 85 | + """Test getting cleaning progress.""" |
| 86 | + fake_channel.response_queue.append( |
| 87 | + build_q10_response({"121": 5, "122": 100, "141": 25}) |
| 88 | + ) |
| 89 | + |
| 90 | + result = await status_trait.refresh() |
| 91 | + |
| 92 | + assert status_trait.cleaning_progress == 25 |
| 93 | + assert B01_Q10_DP.CLEANING_PROGRESS in result |
| 94 | + |
| 95 | + |
| 96 | +async def test_status_trait_empty_data(status_trait: StatusTrait) -> None: |
| 97 | + """Test status trait with no data queued.""" |
| 98 | + # Test that properties return None when data is empty |
| 99 | + assert status_trait.battery is None |
| 100 | + assert status_trait.state is None |
| 101 | + assert status_trait.fan_level is None |
| 102 | + assert status_trait.clean_mode is None |
| 103 | + assert status_trait.cleaning_progress is None |
| 104 | + |
| 105 | + |
| 106 | +async def test_status_trait_data_property(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 107 | + """Test that data property returns the raw data.""" |
| 108 | + test_data = {"121": 5, "122": 100, "123": 2} |
| 109 | + fake_channel.response_queue.append(build_q10_response(test_data)) |
| 110 | + |
| 111 | + await status_trait.refresh() |
| 112 | + |
| 113 | + # Convert string keys to B01_Q10_DP keys |
| 114 | + assert B01_Q10_DP.STATUS in status_trait.data |
| 115 | + assert B01_Q10_DP.BATTERY in status_trait.data |
| 116 | + assert B01_Q10_DP.FAN_LEVEL in status_trait.data |
| 117 | + |
| 118 | + |
| 119 | +async def test_status_trait_unknown_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 120 | + """Test handling of unknown state code.""" |
| 121 | + # Use a code that doesn't map to any state |
| 122 | + fake_channel.response_queue.append(build_q10_response({"121": 999, "122": 100})) |
| 123 | + |
| 124 | + await status_trait.refresh() |
| 125 | + |
| 126 | + # Should return UNKNOWN or None |
| 127 | + assert status_trait.state == YXDeviceState.UNKNOWN or status_trait.state is None |
| 128 | + |
| 129 | + |
| 130 | +async def test_status_trait_multiple_refreshes(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: |
| 131 | + """Test that multiple refreshes update the status.""" |
| 132 | + # First refresh |
| 133 | + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 80})) |
| 134 | + await status_trait.refresh() |
| 135 | + assert status_trait.battery == 80 |
| 136 | + |
| 137 | + # Second refresh with different battery |
| 138 | + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 60})) |
| 139 | + await status_trait.refresh() |
| 140 | + assert status_trait.battery == 60 |
0 commit comments