diff --git a/src/tether/pro/license.py b/src/tether/pro/license.py index 019e39d..ec4a557 100644 --- a/src/tether/pro/license.py +++ b/src/tether/pro/license.py @@ -267,7 +267,8 @@ def load_license( "will require a signed license." ) - # Refresh local heartbeat so the next validation passes for another 24h + # Refresh local heartbeat so the next validation passes for another 24h. + new_heartbeat = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") refreshed = ProLicense( license_version=license.license_version, customer_id=license.customer_id, @@ -276,13 +277,21 @@ def load_license( expires_at=license.expires_at, hardware_binding=license.hardware_binding, signature=license.signature, - last_heartbeat_at=datetime.now(timezone.utc).strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" - ), + last_heartbeat_at=new_heartbeat, ) try: + # Persist the FULL on-disk envelope with only last_heartbeat_at bumped. + # ProLicense models a subset of the license fields, so writing + # refreshed.to_dict() would silently DROP license_id / max_seats / + # key_id (and any future fields) — and since those are part of the + # signed payload (pro/signature.py), the NEXT load_license would fail + # signature verification with LicenseCorrupt: a v2 license that locks + # itself out on the second startup. last_heartbeat_at is NOT in the + # signed payload, so bumping it here never invalidates the signature. + persisted = dict(data) + persisted["last_heartbeat_at"] = new_heartbeat tmp = path_obj.with_suffix(path_obj.suffix + ".tmp") - tmp.write_text(json.dumps(refreshed.to_dict(), indent=2, sort_keys=True)) + tmp.write_text(json.dumps(persisted, indent=2, sort_keys=True)) tmp.replace(path_obj) os.chmod(path_obj, 0o600) except Exception as exc: # noqa: BLE001 — heartbeat write failure shouldn't kill startup diff --git a/tests/test_pro_license.py b/tests/test_pro_license.py index 2a9bd62..46e9283 100644 --- a/tests/test_pro_license.py +++ b/tests/test_pro_license.py @@ -292,6 +292,62 @@ def test_load_refreshes_heartbeat_on_success(tmp_path): assert (datetime.now(timezone.utc) - new_hb).total_seconds() < 60 +def test_heartbeat_rewrite_preserves_unmodelled_signed_fields(tmp_path): + """The heartbeat refresh must NOT drop signed-envelope fields. + + ProLicense models only a subset of the on-disk license. The old code + rewrote the file from ProLicense.to_dict(), silently dropping license_id, + max_seats, and key_id — which are part of the signed payload, so the NEXT + load_license would fail signature verification (a v2 license that locks + itself out on the second startup). Persisting the raw envelope (only + bumping last_heartbeat_at) fixes it; last_heartbeat_at is not signed. + """ + path = tmp_path / "pro.license" + expires = (datetime.now(timezone.utc) + timedelta(days=30)).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + old_hb = (datetime.now(timezone.utc) - timedelta(hours=12)).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + # A v1 (legacy unsigned) license — skips the signature gate — but carrying + # the same extra envelope fields a real signed v2 license would have. + data = { + "license_version": 1, + "customer_id": "acme", + "tier": "pro", + "issued_at": old_hb, + "expires_at": expires, + "hardware_binding": { + "gpu_uuid": "GPU-abc-123", + "gpu_name": "NVIDIA A10G", + "cpu_count": 8, + }, + "signature": "", + "last_heartbeat_at": old_hb, + # Signed-payload fields the ProLicense dataclass does not model: + "license_id": "lic_preserve_me", + "max_seats": 5, + "key_id": "key_abc123", + } + path.write_text(json.dumps(data)) + + load_license(path=path, current_hardware=_mk_hw()) + + persisted = json.loads(path.read_text()) + # The extra signed fields survive the heartbeat rewrite... + assert persisted["license_id"] == "lic_preserve_me" + assert persisted["max_seats"] == 5 + assert persisted["key_id"] == "key_abc123" + # ...and the heartbeat was still refreshed. + assert persisted["last_heartbeat_at"] != old_hb + + # A second load must also succeed (no LicenseCorrupt from dropped fields). + load_license(path=path, current_hardware=_mk_hw()) + again = json.loads(path.read_text()) + assert again["license_id"] == "lic_preserve_me" + assert again["max_seats"] == 5 + + # --------------------------------------------------------------------------- # issue_dev_license # ---------------------------------------------------------------------------