Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions flowquery-py/src/graph/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ def clone(self) -> "IndexEntry":
class Layer:
"""Layer for managing index state at a specific level."""

def __init__(self, index: Dict[str, IndexEntry]):
self._index: Dict[str, IndexEntry] = index
def __init__(self, indexes: Dict[str, Dict[str, IndexEntry]]):
self._indexes: Dict[str, Dict[str, IndexEntry]] = indexes
self._current: int = -1

def index(self, name: str) -> Dict[str, IndexEntry]:
    """Return the index registered under ``name``.

    When ``name`` has not been seen before, an empty index is stored under
    it first, so the returned dict is always the one held by this layer and
    mutations made by the caller are visible on later lookups.
    """
    return self._indexes.setdefault(name, {})

@property
def index(self) -> Dict[str, IndexEntry]:
"""Get the index dictionary."""
return self._index
def indexes(self) -> Dict[str, Dict[str, IndexEntry]]:
"""Get all indexes."""
return self._indexes

@property
def current(self) -> int:
Expand All @@ -67,30 +73,40 @@ def __init__(self, records: Optional[List[Dict[str, Any]]] = None):

def _build_index(self, key: str, level: int = 0) -> None:
"""Build an index for the given key at the specified level."""
self.layer(level).index.clear()
for idx, record in enumerate(self._records):
idx = self.layer(level).index(key)
idx.clear()
for i, record in enumerate(self._records):
if key in record:
if record[key] not in self.layer(level).index:
self.layer(level).index[record[key]] = IndexEntry()
self.layer(level).index[record[key]].add(idx)
if record[key] not in idx:
idx[record[key]] = IndexEntry()
idx[record[key]].add(i)

def layer(self, level: int = 0) -> Layer:
"""Get or create a layer at the specified level."""
if level not in self._layers:
first = self._layers[0]
cloned = {}
for key, entry in first.index.items():
cloned[key] = entry.clone()
self._layers[level] = Layer(cloned)
cloned_indexes = {}
for name, index_map in first.indexes.items():
cloned_map = {}
for key, entry in index_map.items():
cloned_map[key] = entry.clone()
cloned_indexes[name] = cloned_map
self._layers[level] = Layer(cloned_indexes)
return self._layers[level]

def _find(self, key: str, level: int = 0) -> bool:
def _find(self, key: str, level: int = 0, index_name: Optional[str] = None) -> bool:
"""Find the next record with the given key value."""
if key not in self.layer(level).index:
idx: Optional[Dict[str, IndexEntry]] = None
if index_name:
idx = self.layer(level).index(index_name)
else:
indexes = self.layer(level).indexes
idx = next(iter(indexes.values())) if indexes else None
if not idx or key not in idx:
self.layer(level).current = len(self._records) # Move to end
return False
else:
entry = self.layer(level).index[key]
entry = idx[key]
more = entry.next()
if not more:
self.layer(level).current = len(self._records) # Move to end
Expand All @@ -102,8 +118,9 @@ def reset(self) -> None:
"""Reset iteration to the beginning."""
for layer in self._layers.values():
layer.current = -1
for entry in layer.index.values():
entry.reset()
for index_map in layer.indexes.values():
for entry in index_map.values():
entry.reset()

def next(self, level: int = 0) -> bool:
"""Move to the next record. Returns True if successful."""
Expand Down
23 changes: 23 additions & 0 deletions flowquery-py/src/graph/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,31 @@ def label(self, value: Optional[str]) -> None:
def properties(self) -> Dict[str, Expression]:
return self._properties

@properties.setter
def properties(self, value: Dict[str, Expression]) -> None:
"""Replace the whole property mapping stored on this node with ``value``."""
self._properties = value

def set_property(self, key: str, value: Expression) -> None:
"""Store ``value`` under ``key`` in the property mapping, overwriting any existing entry."""
self._properties[key] = value

def get_property(self, key: str) -> Optional[Expression]:
"""Return the expression stored under ``key``, or ``None`` when the key is absent."""
return self._properties.get(key)

def _matches_properties(self, hop: int = 0) -> bool:
"""Check if current record matches all constraint properties."""
if not self._properties:
return True
if self._data is None:
return True
for key, expression in self._properties.items():
record = self._data.current(hop)
if record is None:
raise ValueError("No current node data available")
if key not in record:
raise ValueError("Node does not have property")
return bool(record[key] == expression.value())
return True

def set_value(self, value: Dict[str, Any]) -> None:
"""Store ``value`` as this node's current value."""
self._value = value # type: ignore[assignment]

Expand Down Expand Up @@ -88,6 +107,8 @@ async def next(self) -> None:
current = self._data.current()
if current is not None:
self.set_value(current)
if not self._matches_properties():
continue
if self._outgoing and self._value:
await self._outgoing.find(self._value['id'])
await self.run_todo_next()
Expand All @@ -99,6 +120,8 @@ async def find(self, id_: str, hop: int = 0) -> None:
current = self._data.current(hop)
if current is not None:
self.set_value(current)
if not self._matches_properties(hop):
continue
if self._incoming:
self._incoming.set_end_node(self)
if self._outgoing and self._value:
Expand Down
2 changes: 1 addition & 1 deletion flowquery-py/src/graph/node_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, records: Optional[List[Dict[str, Any]]] = None):

def find(self, id_: str, hop: int = 0) -> bool:
"""Find a record by ID."""
return self._find(id_, hop)
return self._find(id_, hop, "id")

def current(self, hop: int = 0) -> Optional[Dict[str, Any]]:
"""Get the current record."""
Expand Down
57 changes: 48 additions & 9 deletions flowquery-py/src/graph/relationship.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self) -> None:
self._hops: Hops = Hops()
self._source: Optional['Node'] = None
self._target: Optional['Node'] = None
self._direction: str = "right"
self._data: Optional['RelationshipData'] = None
self._value: Optional[Union[RelationshipMatchRecord, List[RelationshipMatchRecord]]] = None
self._matches: RelationshipMatchCollector = RelationshipMatchCollector()
Expand Down Expand Up @@ -54,10 +55,26 @@ def hops(self, value: Hops) -> None:

@property
def properties(self) -> Dict[str, Any]:
"""Get properties from relationship data."""
if self._data:
return self._data.properties() or {}
return {}
return self._properties

@properties.setter
def properties(self, value: Dict[str, Any]) -> None:
"""Replace the whole property mapping stored on this relationship with ``value``."""
self._properties = value

def _matches_properties(self, hop: int = 0) -> bool:
"""Check if current record matches all constraint properties."""
if not self._properties:
return True
if self._data is None:
return True
for key, expression in self._properties.items():
record = self._data.current(hop)
if record is None:
raise ValueError("No current relationship data available")
if key not in record:
raise ValueError("Relationship does not have property")
return bool(record[key] == expression.value())
return True

@property
def source(self) -> Optional['Node']:
Expand All @@ -75,6 +92,14 @@ def target(self) -> Optional['Node']:
def target(self, value: 'Node') -> None:
self._target = value

@property
def direction(self) -> str:
"""Return the direction string stored on this relationship."""
return self._direction

@direction.setter
def direction(self, value: str) -> None:
"""Store ``value`` as this relationship's direction string (no validation is performed)."""
self._direction = value

# Keep start/end aliases for backward compatibility
@property
def start(self) -> Optional['Node']:
Expand All @@ -95,6 +120,9 @@ def end(self, value: 'Node') -> None:
def set_data(self, data: Optional['RelationshipData']) -> None:
"""Attach ``data`` as this relationship's data source; ``None`` detaches it."""
self._data = data

def get_data(self) -> Optional['RelationshipData']:
"""Return the attached relationship data object, or ``None`` when none is set."""
return self._data

def set_value(self, relationship: 'Relationship') -> None:
"""Set value by pushing match to collector."""
self._matches.push(relationship)
Expand All @@ -115,11 +143,13 @@ async def find(self, left_id: str, hop: int = 0) -> None:
"""Find relationships starting from the given node ID."""
# Save original source node
original = self._source
is_left = self._direction == "left"
if hop > 0:
# For hops greater than 0, the source becomes the target of the previous hop
self._source = self._target
if hop == 0:
self._data.reset() if self._data else None
if self._data:
self._data.reset()

# Handle zero-hop case: when min is 0 on a variable-length relationship,
# match source node as target (no traversal)
Expand All @@ -128,16 +158,25 @@ async def find(self, left_id: str, hop: int = 0) -> None:
# No relationship match is pushed since no edge is traversed
await self._target.find(left_id, hop)

while self._data and self._data.find(left_id, hop):
def find_match(id_: str, h: int) -> bool:
if self._data is None:
return False
if is_left:
return self._data.find_reverse(id_, h)
return self._data.find(id_, h)
follow_id = 'left_id' if is_left else 'right_id'
while self._data and find_match(left_id, hop):
data = self._data.current(hop)
if data and self._hops and hop >= self._hops.min:
self.set_value(self)
if self._target and 'right_id' in data:
await self._target.find(data['right_id'], hop)
if not self._matches_properties(hop):
continue
if self._target and follow_id in data:
await self._target.find(data[follow_id], hop)
if self._matches.is_circular():
raise ValueError("Circular relationship detected")
if self._hops and hop + 1 < self._hops.max:
await self.find(data['right_id'], hop + 1)
await self.find(data[follow_id], hop + 1)
self._matches.pop()

# Restore original source node
Expand Down
9 changes: 7 additions & 2 deletions flowquery-py/src/graph/relationship_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ class RelationshipRecord(TypedDict, total=False):


class RelationshipData(Data):
"""Relationship data class extending Data with left_id-based indexing."""
"""Relationship data class extending Data with left_id and right_id indexing."""

def __init__(self, records: Optional[List[Dict[str, Any]]] = None):
super().__init__(records)
self._build_index("left_id")
self._build_index("right_id")

def find(self, left_id: str, hop: int = 0) -> bool:
"""Find a relationship by start node ID."""
return self._find(left_id, hop)
return self._find(left_id, hop, "left_id")

def find_reverse(self, right_id: str, hop: int = 0) -> bool:
"""Find a relationship by end node ID (reverse direction)."""
return self._find(right_id, hop, "right_id")

def properties(self) -> Optional[Dict[str, Any]]:
"""Get properties of current relationship, excluding left_id and right_id."""
Expand Down
4 changes: 3 additions & 1 deletion flowquery-py/src/graph/relationship_match_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ def __init__(self) -> None:
def push(self, relationship: 'Relationship') -> RelationshipMatchRecord:
"""Push a new match onto the collector."""
start_node_value = relationship.source.value() if relationship.source else None
rel_data = relationship.get_data()
rel_props: Dict[str, Any] = (rel_data.properties() or {}) if rel_data else {}
match: RelationshipMatchRecord = {
"type": relationship.type or "",
"startNode": start_node_value or {},
"endNode": None,
"properties": relationship.properties,
"properties": rel_props,
}
self._matches.append(match)
if isinstance(start_node_value, dict):
Expand Down
41 changes: 40 additions & 1 deletion flowquery-py/src/parsing/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Main parser for FlowQuery statements."""

import sys
from typing import Dict, Iterator, List, Optional, cast
from typing import Dict, Iterator, List, Optional, Tuple, cast

from ..graph.hops import Hops
from ..graph.node import Node
Expand Down Expand Up @@ -467,6 +467,7 @@ def _parse_node(self) -> Optional[Node]:
self._skip_whitespace_and_comments()
node = Node()
node.label = label
node.properties = dict(self._parse_properties())
if label is not None and identifier is not None:
node.identifier = identifier
self._variables[identifier] = node
Expand All @@ -481,7 +482,9 @@ def _parse_node(self) -> Optional[Node]:
return node

def _parse_relationship(self) -> Optional[Relationship]:
direction = "right"
if self.token.is_less_than() and self.peek() is not None and self.peek().is_subtract():
direction = "left"
self.set_next_token()
self.set_next_token()
elif self.token.is_subtract():
Expand All @@ -503,6 +506,7 @@ def _parse_relationship(self) -> Optional[Relationship]:
rel_type: str = self.token.value or ""
self.set_next_token()
hops = self._parse_relationship_hops()
properties: Dict[str, Expression] = dict(self._parse_properties())
if not self.token.is_closing_bracket():
raise ValueError("Expected closing bracket for relationship definition")
self.set_next_token()
Expand All @@ -512,6 +516,8 @@ def _parse_relationship(self) -> Optional[Relationship]:
if self.token.is_greater_than():
self.set_next_token()
relationship = Relationship()
relationship.direction = direction
relationship.properties = properties
if rel_type is not None and variable is not None:
relationship.identifier = variable
self._variables[variable] = relationship
Expand All @@ -525,6 +531,39 @@ def _parse_relationship(self) -> Optional[Relationship]:
relationship.type = rel_type
return relationship

def _parse_properties(self) -> Iterator[Tuple[str, Expression]]:
parts: int = 0
while True:
self._skip_whitespace_and_comments()
if not self.token.is_opening_brace() and parts == 0:
return
elif not self.token.is_opening_brace() and parts > 0:
raise ValueError("Expected opening brace")
self.set_next_token()
self._skip_whitespace_and_comments()
if not self.token.is_identifier():
raise ValueError("Expected identifier")
key: str = self.token.value or ""
self.set_next_token()
self._skip_whitespace_and_comments()
if not self.token.is_colon():
raise ValueError("Expected colon")
self.set_next_token()
self._skip_whitespace_and_comments()
expression = self._parse_expression()
if expression is None:
raise ValueError("Expected expression")
self._skip_whitespace_and_comments()
if not self.token.is_closing_brace():
raise ValueError("Expected closing brace")
self.set_next_token()
yield (key, expression)
self._skip_whitespace_and_comments()
if not self.token.is_comma():
break
self.set_next_token()
parts += 1

def _parse_relationship_hops(self) -> Optional[Hops]:
if not self.token.is_multiply():
return None
Expand Down
Loading