Skip to content
156 changes: 135 additions & 21 deletions pyk/src/pyk/kcfg/kcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def write_cfg_data(
for node_id in deleted_nodes:
self.kcfg_node_path(node_id).unlink(missing_ok=True)
for node_id in created_nodes:
self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict()))
node_path = self.kcfg_node_path(node_id)
node_path.write_text(json.dumps(kcfg._nodes[node_id].to_dict()))
kcfg._nodes[node_id]._node_path = node_path
self.kcfg_json_path.write_text(json.dumps(dct))

def read_cfg_data(self) -> dict[str, Any]:
Expand All @@ -97,7 +99,38 @@ def read_cfg_data(self) -> dict[str, Any]:
attrs.append(KCFGNodeAttr.VACUOUS.value)
if node['id'] in dct['stuck']:
attrs.append(KCFGNodeAttr.STUCK.value)
new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})
new_nodes.append(
{
'id': node['id'],
'cterm': node['cterm'],
'attrs': attrs,
'node_path': str(self.kcfg_node_path(node['id'])),
}
)

dct['nodes'] = new_nodes

del dct['vacuous']
del dct['stuck']

return dct

def read_cfg_data_lazy(self) -> dict[str, Any]:
"""Read kcfg.json without loading node CTerms.

Node entries contain id, attrs, and the path to load the CTerm from,
but no 'cterm' key. Cover CSubsts are left as raw dicts.
"""
dct = json.loads(self.kcfg_json_path.read_text())

new_nodes = []
for node_id in dct.get('nodes') or []:
attrs = []
if node_id in dct['vacuous']:
attrs.append(KCFGNodeAttr.VACUOUS.value)
if node_id in dct['stuck']:
attrs.append(KCFGNodeAttr.STUCK.value)
new_nodes.append({'id': node_id, 'node_path': str(self.kcfg_node_path(node_id)), 'attrs': attrs})

dct['nodes'] = new_nodes

Expand All @@ -112,23 +145,47 @@ def read_node_data(self, node_id: int) -> dict[str, Any]:

class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
@final
@dataclass(frozen=True, order=True)
class Node:
id: int
cterm: CTerm
_cterm: CTerm | None
_node_path: Path | None
attrs: frozenset[NodeAttr]

def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
object.__setattr__(self, 'id', id)
object.__setattr__(self, 'cterm', cterm)
object.__setattr__(self, 'attrs', frozenset(attrs))
def __init__(
self, id: int, cterm: CTerm | None = None, attrs: Iterable[NodeAttr] = (), node_path: Path | None = None
) -> None:
self.id = id
self._cterm = cterm
self._node_path = node_path
self.attrs = frozenset(attrs)

@property
def cterm(self) -> CTerm:
if self._cterm is None:
if self._node_path is None:
raise ValueError(f'Node {self.id} has no CTerm and no path to load from')
import json

node_dict = json.loads(self._node_path.read_text())
self._cterm = CTerm.from_dict(node_dict['cterm'])
return self._cterm

def evict(self) -> None:
"""Release the loaded CTerm from memory. Reloads from disk on next .cterm access."""
if self._node_path is not None:
self._cterm = None

def to_dict(self) -> dict[str, Any]:
return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}

@staticmethod
def from_dict(dct: dict[str, Any]) -> KCFG.Node:
return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
from pathlib import Path

node_path = Path(dct['node_path']) if 'node_path' in dct else None
return KCFG.Node(
dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']], node_path=node_path
)

def add_attr(self, attr: NodeAttr) -> KCFG.Node:
return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
Expand All @@ -150,6 +207,27 @@ def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None =
def free_vars(self) -> frozenset[str]:
return frozenset(self.cterm.free_vars)

def __eq__(self, other: object) -> bool:
if not isinstance(other, KCFG.Node):
return NotImplemented
return self.id == other.id

def __hash__(self) -> int:
return hash(self.id)

def __lt__(self, other: object) -> bool:
if not isinstance(other, KCFG.Node):
return NotImplemented
return self.id < other.id

def __le__(self, other: object) -> bool:
if not isinstance(other, KCFG.Node):
return NotImplemented
return self.id <= other.id

def __repr__(self) -> str:
return f'Node(id={self.id}, attrs={self.attrs})'

class Successor(ABC):
source: KCFG.Node

Expand Down Expand Up @@ -441,8 +519,11 @@ def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:

_kcfg_store: KCFGStore | None

def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
_auto_evict: bool

def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True, auto_evict: bool = False) -> None:
self._node_id = 1
self._auto_evict = auto_evict
if optimize_memory:
from .store import OptimizedNodeStore

Expand Down Expand Up @@ -521,9 +602,13 @@ def uncovered(self) -> list[KCFG.Node]:

@staticmethod
def from_claim(
defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
defn: KDefinition,
claim: KClaim,
cfg_dir: Path | None = None,
optimize_memory: bool = True,
auto_evict: bool = False,
) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory, auto_evict=auto_evict)
claim_body = claim.body
claim_body = defn.instantiate_cell_vars(claim_body)
claim_body = rename_generated_vars(claim_body)
Expand Down Expand Up @@ -664,14 +749,29 @@ def to_dict(self) -> dict[str, Any]:
return {k: v for k, v in res.items() if v}

@staticmethod
def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
cfg = KCFG(optimize_memory=optimize_memory)
def from_dict(
dct: Mapping[str, Any], optimize_memory: bool = True, lazy: bool = False, auto_evict: bool = False
) -> KCFG:
cfg = KCFG(optimize_memory=(optimize_memory and not lazy), auto_evict=auto_evict)

for node_dict in dct.get('nodes') or []:
node = KCFG.Node.from_dict(node_dict)
cfg.add_node(node)
if lazy:
from pathlib import Path

max_id = max([node.id for node in cfg.nodes], default=0)
from .lazy import LazyCSubst

for node_dict in dct.get('nodes') or []:
node = KCFG.Node(
id=node_dict['id'],
attrs=[NodeAttr(a) for a in node_dict.get('attrs', [])],
node_path=Path(node_dict['node_path']),
)
cfg.add_node(node)
else:
for node_dict in dct.get('nodes') or []:
node = KCFG.Node.from_dict(node_dict)
cfg.add_node(node)

max_id = max(cfg._nodes.keys(), default=0)
cfg._node_id = dct.get('next', max_id + 1)

for edge_dict in dct.get('edges') or []:
Expand All @@ -683,7 +783,12 @@ def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
cfg.add_successor(merged_edge)

for cover_dict in dct.get('covers') or []:
cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
if lazy:
src = cfg._nodes[cover_dict['source']]
tgt = cfg._nodes[cover_dict['target']]
cover = KCFG.Cover(src, tgt, LazyCSubst(cover_dict['csubst'])) # type: ignore[arg-type]
else:
cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
cfg.add_successor(cover)

for split_dict in dct.get('splits') or []:
Expand Down Expand Up @@ -907,6 +1012,8 @@ def add_successor(self, succ: KCFG.Successor) -> None:
self._splits[succ.source.id] = succ
elif type(succ) is KCFG.NDBranch:
self._ndbranches[succ.source.id] = succ
if self._auto_evict:
succ.source.evict()

def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
source_id = self._resolve(source_id)
Expand Down Expand Up @@ -1295,9 +1402,16 @@ def write_cfg_data(self) -> None:
self._created_nodes.clear()

@staticmethod
def read_cfg_data(cfg_dir: Path) -> KCFG:
def read_cfg_data(cfg_dir: Path, auto_evict: bool = False) -> KCFG:
store = KCFGStore(cfg_dir)
cfg = KCFG.from_dict(store.read_cfg_data(), auto_evict=auto_evict)
cfg._kcfg_store = store
return cfg

@staticmethod
def read_cfg_data_lazy(cfg_dir: Path) -> KCFG:
store = KCFGStore(cfg_dir)
cfg = KCFG.from_dict(store.read_cfg_data())
cfg = KCFG.from_dict(store.read_cfg_data_lazy(), lazy=True)
cfg._kcfg_store = store
return cfg

Expand Down
102 changes: 102 additions & 0 deletions pyk/src/pyk/kcfg/lazy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Stubs for memory-efficient proof display.

LazyCSubst defers CSubst parsing until accessed.
APRProofStub answers proof-level queries from proof.json metadata.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any

from ..cterm import CSubst
from ..kast.inner import KInner
from .kcfg import KCFG


class APRProofStub:
"""Lightweight stub for APRProof — answers proof-level queries without loading the full proof.

Duck-types enough of APRProof for APRProofNodePrinter.node_attrs() to work.
Uses proof.json metadata + the KCFG for graph queries.
"""

def __init__(self, proof_dict: dict[str, Any], kcfg: KCFG) -> None:
self.init = int(proof_dict['init'])
self.target = int(proof_dict['target'])
self._terminal_ids = set(proof_dict.get('terminal') or [])
self._bounded_ids = set(proof_dict.get('bounded') or [])
self._refuted_ids = {int(k) for k in (proof_dict.get('node_refutations') or {}).keys()}
self.kcfg = kcfg

def _resolve(self, node_id: int) -> int:
return node_id

def is_init(self, node_id: int) -> bool:
return node_id == self.init

def is_target(self, node_id: int) -> bool:
return node_id == self.target

def is_terminal(self, node_id: int) -> bool:
return node_id in self._terminal_ids

def is_explorable(self, node_id: int) -> bool:
return (
self.kcfg.is_leaf(node_id)
and not self.is_terminal(node_id)
and not self.kcfg.is_stuck(node_id)
and not self.kcfg.is_vacuous(node_id)
)

def is_pending(self, node_id: int) -> bool:
return (
self.is_explorable(node_id)
and not self.is_target(node_id)
and not self.is_refuted(node_id)
and not self.is_bounded(node_id)
)

def is_refuted(self, node_id: int) -> bool:
return node_id in self._refuted_ids

def is_bounded(self, node_id: int) -> bool:
return node_id in self._bounded_ids


class LazyCSubst:
"""Duck-types CSubst. Loads from a JSON dict on first access."""

_raw: dict[str, Any]
_csubst: CSubst | None

def __init__(self, raw_dict: dict[str, Any]) -> None:
self._raw: dict[str, Any] = raw_dict
self._csubst = None

def _load(self) -> CSubst:
if self._csubst is None:
from ..cterm import CSubst

self._csubst = CSubst.from_dict(self._raw)
return self._csubst

@property
def constraints(self) -> tuple:
return self._load().constraints

@property
def subst(self) -> object:
return self._load().subst

def pred(self, *args: Any, **kwargs: Any) -> KInner:
return self._load().pred(*args, **kwargs)

def to_dict(self) -> dict:
return self._load().to_dict()

def evict(self) -> None:
"""Release the loaded CSubst from memory, keep raw dict for reload."""
self._csubst = None
Loading
Loading