Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/batcontrol/inverter/baseclass.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
""" Parent Class for implementing common functions for all inverters """
from .inverter_interface import InverterInterface

DEFAULT_MIN_SOC = 5
DEFAULT_MAX_SOC = 100


class InverterBaseclass(InverterInterface):
def __init__(self, config):
self.min_soc = -1
Expand Down
6 changes: 3 additions & 3 deletions src/batcontrol/inverter/fronius.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import requests
from packaging import version
from cachetools import TTLCache
from .baseclass import InverterBaseclass
from .baseclass import DEFAULT_MAX_SOC, DEFAULT_MIN_SOC, InverterBaseclass

logger = logging.getLogger(__name__)
logger.info('Loading module ')
Expand Down Expand Up @@ -201,8 +201,8 @@ def __init__(self, config: dict) -> None:
self.previous_battery_config = self.get_battery_config()
self.previous_backup_power_config = None
# default values
self.max_soc = 100
self.min_soc = 5
self.max_soc = DEFAULT_MAX_SOC
self.min_soc = DEFAULT_MIN_SOC
# Energy Management (EM)
# 0 - On (Automatic , Default)
# 1 - Off (Adjustable)
Expand Down
4 changes: 4 additions & 0 deletions src/batcontrol/inverter/fronius_modbus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .inverter import FroniusModbusInverter
from .tcp_transport import FroniusModbusTcpTransport

__all__ = ["FroniusModbusInverter", "FroniusModbusTcpTransport"]
112 changes: 112 additions & 0 deletions src/batcontrol/inverter/fronius_modbus/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Pure Fronius GEN24 Modbus command building helpers."""

from .types import RegisterWrite

REG_STORCTL_MOD = 40348
REG_OUTWRTE = 40355
REG_INWRTE = 40356
REG_RVRT_TMS = 40358
REG_CHAGRISET = 40360

STORCTL_CHARGE_LIMIT = 1
STORCTL_DISCHARGE_LIMIT = 2
DEFAULT_RATE_SCALE_FACTOR = -2
FULL_RATE_PERCENT = 10000


def signed_to_unsigned_16(value: int) -> int:
"""Convert any integer to unsigned 16-bit (two's complement)."""
return value & 0xFFFF


def watts_to_pct_register_value(
watts: float,
max_charge_rate: float,
scale_factor: int = DEFAULT_RATE_SCALE_FACTOR,
) -> int:
"""Convert watts to a scaled percentage register value.

Raises:
ValueError: If ``max_charge_rate`` is zero or negative.
"""
if max_charge_rate <= 0:
raise ValueError(
f"max_charge_rate must be greater than 0, got {max_charge_rate}"
)

pct = max(0.0, min(100.0, (watts / max_charge_rate) * 100.0))
return int(pct * (10 ** (-scale_factor)))



def validate_revert_seconds(revert_seconds: int) -> int:
"""Validate that revert_seconds fits into an unsigned 16-bit register."""
if not 0 <= revert_seconds <= 65535:
raise ValueError("revert_seconds must be between 0 and 65535")
return revert_seconds



def build_force_charge_register_writes(
rate_watts: float,
max_charge_rate: float,
revert_seconds: int = 0,
) -> list[RegisterWrite]:
"""Build register writes for force-charge mode."""
rate_value = watts_to_pct_register_value(rate_watts, max_charge_rate)
revert_seconds = validate_revert_seconds(revert_seconds)

return [
RegisterWrite(REG_CHAGRISET, 1),
RegisterWrite(REG_RVRT_TMS, revert_seconds),
RegisterWrite(REG_OUTWRTE, signed_to_unsigned_16(-rate_value)),
Comment thread
filiplajszczak marked this conversation as resolved.
RegisterWrite(REG_INWRTE, FULL_RATE_PERCENT),
RegisterWrite(REG_STORCTL_MOD, STORCTL_DISCHARGE_LIMIT),
]



def build_avoid_discharge_register_writes(
revert_seconds: int = 0,
) -> list[RegisterWrite]:
"""Build register writes for hold/avoid-discharge mode."""
revert_seconds = validate_revert_seconds(revert_seconds)

return [
RegisterWrite(REG_RVRT_TMS, revert_seconds),
RegisterWrite(REG_OUTWRTE, 0),
Comment thread
filiplajszczak marked this conversation as resolved.
RegisterWrite(REG_INWRTE, FULL_RATE_PERCENT),
RegisterWrite(REG_STORCTL_MOD, STORCTL_DISCHARGE_LIMIT),
]



def build_allow_discharge_register_writes() -> list[RegisterWrite]:
"""Build register writes for returning to automatic mode."""
return [
RegisterWrite(REG_STORCTL_MOD, 0),
RegisterWrite(REG_OUTWRTE, FULL_RATE_PERCENT),
RegisterWrite(REG_INWRTE, FULL_RATE_PERCENT),
RegisterWrite(REG_RVRT_TMS, 0),
]



def build_limit_battery_charge_register_writes(
limit_charge_rate_watts: float,
max_charge_rate: float,
revert_seconds: int = 0,
) -> list[RegisterWrite]:
"""Build register writes for limiting battery charge while allowing discharge."""
rate_value = watts_to_pct_register_value(
limit_charge_rate_watts,
max_charge_rate,
)
revert_seconds = validate_revert_seconds(revert_seconds)

return [
RegisterWrite(REG_RVRT_TMS, revert_seconds),
RegisterWrite(REG_OUTWRTE, FULL_RATE_PERCENT),
Comment thread
filiplajszczak marked this conversation as resolved.
RegisterWrite(REG_INWRTE, rate_value),
RegisterWrite(REG_STORCTL_MOD, STORCTL_CHARGE_LIMIT),
]
47 changes: 47 additions & 0 deletions src/batcontrol/inverter/fronius_modbus/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from .commands import (
build_allow_discharge_register_writes,
build_avoid_discharge_register_writes,
build_force_charge_register_writes,
build_limit_battery_charge_register_writes,
)
from .types import FroniusModbusTransport


class FroniusModbusControl:
def __init__(
self,
transport: FroniusModbusTransport,
max_charge_rate: float,
revert_seconds: int = 0,
):
self.transport = transport
self.max_charge_rate = max_charge_rate
self.revert_seconds = revert_seconds

def set_mode_force_charge(self, rate_watts: float):
self.transport.write_registers(
build_force_charge_register_writes(
rate_watts,
self.max_charge_rate,
revert_seconds=self.revert_seconds,
)
)

def set_mode_avoid_discharge(self):
self.transport.write_registers(
build_avoid_discharge_register_writes(
revert_seconds=self.revert_seconds,
)
)

def set_mode_allow_discharge(self):
self.transport.write_registers(build_allow_discharge_register_writes())

def set_mode_limit_battery_charge(self, rate_watts: float):
self.transport.write_registers(
build_limit_battery_charge_register_writes(
rate_watts,
self.max_charge_rate,
revert_seconds=self.revert_seconds,
)
)
80 changes: 80 additions & 0 deletions src/batcontrol/inverter/fronius_modbus/inverter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging

from ..baseclass import DEFAULT_MAX_SOC, DEFAULT_MIN_SOC, InverterBaseclass
from .control import FroniusModbusControl
from .storage_reader import FroniusModbusStorageReader
from .types import FroniusModbusTransport

logger = logging.getLogger(__name__)


class FroniusModbusInverter(InverterBaseclass):
def __init__(
self,
transport: FroniusModbusTransport,
max_charge_rate: float,
capacity: float = -1,
min_soc: float = DEFAULT_MIN_SOC,
max_soc: float = DEFAULT_MAX_SOC,
revert_seconds: int = 0,
):
super().__init__({})
Comment thread
filiplajszczak marked this conversation as resolved.
self.transport = transport
self.capacity = capacity
self.min_soc = min_soc
self.max_soc = max_soc
self.control = FroniusModbusControl(
transport,
max_charge_rate=max_charge_rate,
revert_seconds=revert_seconds,
)
self.storage_reader = FroniusModbusStorageReader(transport)

def set_mode_force_charge(self, chargerate: float):
self.control.set_mode_force_charge(chargerate)

def set_mode_avoid_discharge(self):
self.control.set_mode_avoid_discharge()

def set_mode_allow_discharge(self):
self.control.set_mode_allow_discharge()

def set_mode_limit_battery_charge(self, limit_charge_rate: int):
self.control.set_mode_limit_battery_charge(limit_charge_rate)

def get_capacity(self) -> float:
return self.capacity

def read_storage_status(self):
return self.storage_reader.read_storage_status()

def get_SOC(self) -> float:
return self.read_storage_status().soc_pct

def get_max_charge_rate(self) -> float:
return self.read_storage_status().max_charge_rate_w

def is_grid_charging_enabled(self) -> bool:
return self.read_storage_status().grid_charging_enabled

def get_min_reserve_soc(self) -> float:
return self.read_storage_status().minimum_reserve_pct

def get_charge_status(self) -> int:
return self.read_storage_status().charge_status

def shutdown(self):
try:
self.control.set_mode_allow_discharge()
except Exception as exc:
logger.warning(
"Failed to restore automatic mode during shutdown: %s",
exc,
)
finally:
close = getattr(self.transport, "close", None)
if close is not None:
close()

def activate_mqtt(self, api_mqtt_api: object):
pass
106 changes: 106 additions & 0 deletions src/batcontrol/inverter/fronius_modbus/reads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Pure Fronius GEN24 Modbus storage-register decoding helpers."""

from dataclasses import dataclass


REG_WCHAMAX = 40345
REG_STORCTL_MOD = 40348
REG_MIN_RSV_PCT = 40350
REG_CHASTATE = 40351
REG_CHAST = 40354
REG_OUTWRTE = 40355
REG_INWRTE = 40356
REG_RVRT_TMS = 40358
REG_CHAGRISET = 40360
REG_CHASTATE_SF = 40365
REG_INOUTWRTE_SF = 40368

CHAGRISET_ENABLED = 1


@dataclass(frozen=True)
class FroniusStorageStatus:
max_charge_rate_w: int
storage_control_mode: int
minimum_reserve_pct: float
soc_pct: float
charge_status: int
discharge_rate_pct: float
charge_rate_pct: float
revert_seconds: int
grid_charging_enabled: bool
soc_scale_factor: int
rate_scale_factor: int


def unsigned_to_signed_16(value: int) -> int:
"""Convert an unsigned 16-bit Modbus value to signed."""
if value >= 32768:
return value - 65536
return value


def decode_scaled_percent(raw_value: int, scale_factor: int) -> float:
"""Decode a SunSpec-style scaled percent value."""
return raw_value * (10 ** scale_factor)


def decode_storage_status(registers: dict[int, int]) -> FroniusStorageStatus:
"""Decode the known Fronius storage control/status registers.

The register set and scale handling here match the behavior observed in:
- local live read-only probing
- `fronius-modbus-control`
- `redpomodoro/fronius_modbus`

Note: `ChaGriSet` semantics are somewhat inconsistently documented across
ecosystem code. This module currently follows the live probe and
`fronius-modbus-control` interpretation that `1` means grid charging is
enabled.
"""
required_registers = [
REG_WCHAMAX,
REG_STORCTL_MOD,
REG_MIN_RSV_PCT,
REG_CHASTATE,
REG_CHAST,
REG_OUTWRTE,
REG_INWRTE,
REG_RVRT_TMS,
REG_CHAGRISET,
REG_CHASTATE_SF,
REG_INOUTWRTE_SF,
]

for register in required_registers:
if register not in registers:
raise KeyError(f"Missing required register {register}")

soc_scale_factor = unsigned_to_signed_16(registers[REG_CHASTATE_SF])
rate_scale_factor = unsigned_to_signed_16(registers[REG_INOUTWRTE_SF])

return FroniusStorageStatus(
max_charge_rate_w=registers[REG_WCHAMAX],
storage_control_mode=registers[REG_STORCTL_MOD],
minimum_reserve_pct=decode_scaled_percent(
registers[REG_MIN_RSV_PCT],
soc_scale_factor,
),
soc_pct=decode_scaled_percent(
registers[REG_CHASTATE],
soc_scale_factor,
),
charge_status=registers[REG_CHAST],
discharge_rate_pct=decode_scaled_percent(
unsigned_to_signed_16(registers[REG_OUTWRTE]),
rate_scale_factor,
),
charge_rate_pct=decode_scaled_percent(
unsigned_to_signed_16(registers[REG_INWRTE]),
rate_scale_factor,
),
revert_seconds=registers[REG_RVRT_TMS],
grid_charging_enabled=registers[REG_CHAGRISET] == CHAGRISET_ENABLED,
soc_scale_factor=soc_scale_factor,
rate_scale_factor=rate_scale_factor,
)
Loading