Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a7eb168
feat: define mem-read schedular message&consumer; add async mem-reade…
CaralHsi Sep 28, 2025
8a24ec7
feat: add fast/fine mode in mem-reader;
CaralHsi Sep 28, 2025
81915a3
feat: add mem-reader in scheduler
CaralHsi Sep 29, 2025
e875ca0
fix: conflict
CaralHsi Oct 13, 2025
b5086e7
feat: change async remove
CaralHsi Oct 13, 2025
b43a9ff
Merge branch 'dev' of github.com:MemTensor/MemOS into feat/Async-add
CaralHsi Oct 14, 2025
0b2649d
feat: modify async-add in core.py
CaralHsi Oct 14, 2025
9dd632f
feat: add 'remove and refresh memory in schedular'
CaralHsi Oct 14, 2025
8c97058
feat: add naive fast mode in mem-reader
CaralHsi Oct 14, 2025
3e08a82
feat: finish fast mode in mem-reader
CaralHsi Oct 15, 2025
37fcff8
feat: add token-based window splitting and concurrency improvements
CaralHsi Oct 16, 2025
5f7e8e0
feat: add split chunker into mode in simple struct mem reader
CaralHsi Oct 16, 2025
2355527
feat: update async-mode add
CaralHsi Oct 16, 2025
2ee4c4c
chore: update gitignore
CaralHsi Oct 16, 2025
593faa5
feat: improve database note write performance
CaralHsi Oct 17, 2025
8d2263a
feat: fix mem-read scheduler
CaralHsi Oct 20, 2025
e250ab8
fix: nebula group-by bug
CaralHsi Oct 20, 2025
14e986e
fix: bug in adding mem scheduler
CaralHsi Oct 20, 2025
1609703
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
31adec0
fix: nebula index; mem-reader chat-time;
CaralHsi Oct 21, 2025
8628a37
fix: conflict
CaralHsi Oct 21, 2025
18f3cc8
format: searcher
CaralHsi Oct 21, 2025
4e0133e
fix: some bug in shceduler and mem-reader
CaralHsi Oct 21, 2025
6653bea
feat: add mem-organize in scheduler
CaralHsi Oct 21, 2025
af5c940
feat: add tree.mode to config; modify scheduler config
CaralHsi Oct 21, 2025
0c6e68b
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
28a20e9
fix: test bug
CaralHsi Oct 21, 2025
b29aa02
Merge branch 'feat/Async-add' of github.com:CaralHsi/MemOSRealPublic …
CaralHsi Oct 21, 2025
d00a553
feat: add organize handler and submit reorganize scheduler
CaralHsi Oct 22, 2025
1f735c5
feat: move all async organization modules in scheduler; add user_name…
CaralHsi Oct 22, 2025
e0f089e
feat: modify tree_textual_memory example
CaralHsi Oct 22, 2025
2ac8201
feat: modify reorganizer and add passing user_name in relation_reason…
CaralHsi Oct 22, 2025
11d4f00
feat: delete reorganize task switch button in core
CaralHsi Oct 28, 2025
87bc80e
feat: fix get candidate nodes; add get neighbors; [TODO]: neo4j and p…
CaralHsi Oct 28, 2025
afa5bcf
feat: update reorganize
CaralHsi Oct 28, 2025
b09552f
fix: mem-reader
CaralHsi Oct 28, 2025
3ad9c8d
feat: update reorganize scheduler
CaralHsi Oct 28, 2025
73ca83b
fix: conflict
CaralHsi Oct 28, 2025
bd47a49
fix: core.py
CaralHsi Oct 28, 2025
20d991b
fix: conflict
CaralHsi Apr 22, 2026
9981feb
Merge pull request #5 from CaralHsi/feat/async/reorganize
CaralHsi Apr 22, 2026
81360d3
format: add comment
CaralHsi Apr 22, 2026
c512491
fix: tiny prompt bug
CaralHsi Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions examples/core_memories/tree_textual_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@
added_ids = my_tree_textual_memory.add(m_list)
for i, id in enumerate(added_ids):
print(f"{i}'th added result is:" + my_tree_textual_memory.get(id).memory)
my_tree_textual_memory.memory_manager.wait_reorganizer()
# wait the synchronous thread
# TODO: USE SCHEDULE MODULE TO WAIT

time.sleep(60)

Expand Down Expand Up @@ -234,7 +235,8 @@

for m_list in doc_memory:
added_ids = my_tree_textual_memory.add(m_list)
my_tree_textual_memory.memory_manager.wait_reorganizer()
# wait the synchronous thread
# TODO: USE SCHEDULE MODULE TO WAIT

results = my_tree_textual_memory.search(
"Tell me about what memos consist of?",
Expand Down Expand Up @@ -273,9 +275,10 @@
print(f"{i}'th similar result is: " + str(r["memory"]))
print(f"Successfully search {len(results)} memories")

# close the synchronous thread in memory manager
my_tree_textual_memory.memory_manager.close()
# close the synchronous thread
# TODO: USE SCHEDULE MODULE TO CLOSE

# my_tree_textual_memory.dump
# Note that you cannot drop this tree when`use_multi_db` ==
# false. my_tree_textual_memory.drop() """
my_tree_textual_memory.dump("tmp/my_tree_textual_memory")
my_tree_textual_memory.drop()
20 changes: 0 additions & 20 deletions src/memos/mem_os/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,6 @@ def mem_scheduler_off(self) -> bool:
logger.error(f"Failed to stop scheduler: {e!s}")
return False

def mem_reorganizer_on(self) -> bool:
pass

def mem_reorganizer_off(self) -> bool:
"""temporally implement"""
for mem_cube in self.mem_cubes.values():
logger.info(f"try to close reorganizer for {mem_cube.text_mem.config.cube_id}")
if mem_cube.text_mem and mem_cube.text_mem.is_reorganize:
logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}")
mem_cube.text_mem.memory_manager.close()
mem_cube.text_mem.memory_manager.wait_reorganizer()

def mem_reorganizer_wait(self) -> bool:
for mem_cube in self.mem_cubes.values():
logger.info(f"try to close reorganizer for {mem_cube.text_mem.config.cube_id}")
if mem_cube.text_mem and mem_cube.text_mem.is_reorganize:
logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}")
mem_cube.text_mem.memory_manager.wait_reorganizer()

def _register_chat_history(
self, user_id: str | None = None, session_id: str | None = None
) -> None:
Expand Down Expand Up @@ -730,7 +711,6 @@ def add(

if mem_cube_id not in self.mem_cubes:
raise ValueError(f"MemCube '{mem_cube_id}' is not loaded. Please register.")

sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode
if sync_mode == "async":
assert self.mem_scheduler is not None, (
Expand Down
5 changes: 5 additions & 0 deletions src/memos/memories/textual/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class SourceMessage(BaseModel):
image_info: dict | None = None
model_config = ConfigDict(extra="allow")

@property
def content_safe(self) -> str:
"""Always return a string, fallback to '' if content is None."""
return self.content or ""


class ArchivedTextualMemory(BaseModel):
"""
Expand Down
130 changes: 2 additions & 128 deletions src/memos/memories/textual/tree_text_memory/organize/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
from memos.graph_dbs.neo4j import Neo4jGraphDB
from memos.llms.factory import AzureLLM, OllamaLLM, OpenAILLM
from memos.log import get_logger
from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata
from memos.memories.textual.tree_text_memory.organize.reorganizer import (
GraphStructureReorganizer,
QueueMessage,
)
from memos.memories.textual.item import TextualMemoryItem
from memos.memories.textual.tree_text_memory.organize.reorganizer import GraphStructureReorganizer


logger = get_logger(__name__)
Expand Down Expand Up @@ -59,7 +56,6 @@ def __init__(
llm: OpenAILLM | OllamaLLM | AzureLLM,
memory_size: dict | None = None,
threshold: float | None = 0.80,
merged_threshold: float | None = 0.92,
is_reorganize: bool = False,
):
self.graph_store = graph_store
Expand All @@ -84,7 +80,6 @@ def __init__(
self.reorganizer = GraphStructureReorganizer(
graph_store, llm, embedder, is_reorganize=is_reorganize
)
self._merged_threshold = merged_threshold

def add(
self,
Expand Down Expand Up @@ -235,12 +230,6 @@ def _submit_batches(nodes: list[dict], node_kind: str) -> None:
)

_submit_batches(graph_nodes, "graph memory")

if graph_node_ids and self.is_reorganize:
self.reorganizer.add_message(
QueueMessage(op="add", after_node=graph_node_ids, user_name=user_name)
)

return added_ids

def _cleanup_working_memory(self, user_name: str | None = None) -> None:
Expand Down Expand Up @@ -409,107 +398,6 @@ def _add_to_graph_memory(
metadata_dict,
user_name=user_name,
)
self.reorganizer.add_message(
QueueMessage(
op="add",
after_node=[node_id],
user_name=user_name,
)
)
return node_id

def _inherit_edges(self, from_id: str, to_id: str, user_name: str | None = None) -> None:
"""
Migrate all non-lineage edges from `from_id` to `to_id`,
and remove them from `from_id` after copying.
"""
edges = self.graph_store.get_edges(
from_id, type="ANY", direction="ANY", user_name=user_name
)

for edge in edges:
if edge["type"] == "MERGED_TO":
continue # Keep lineage edges

new_from = to_id if edge["from"] == from_id else edge["from"]
new_to = to_id if edge["to"] == from_id else edge["to"]

if new_from == new_to:
continue

# Add edge to merged node if it doesn't already exist
if not self.graph_store.edge_exists(
new_from, new_to, edge["type"], direction="ANY", user_name=user_name
):
self.graph_store.add_edge(new_from, new_to, edge["type"], user_name=user_name)

# Remove original edge if it involved the archived node
self.graph_store.delete_edge(
edge["from"], edge["to"], edge["type"], user_name=user_name
)

def _ensure_structure_path(
self,
memory_type: str,
metadata: TreeNodeTextualMemoryMetadata,
user_name: str | None = None,
) -> str:
"""
Ensure structural path exists (ROOT → ... → final node), return last node ID.

Args:
memory_type: Memory type for the structure node.
metadata: Metadata containing key and other fields.
user_name: Optional user name for multi-tenant isolation.

Returns:
Final node ID of the structure path.
"""
# Step 1: Try to find an existing memory node with content == tag
existing = self.graph_store.get_by_metadata(
[
{"field": "memory", "op": "=", "value": metadata.key},
{"field": "memory_type", "op": "=", "value": memory_type},
],
user_name=user_name,
)
if existing:
node_id = existing[0] # Use the first match
else:
# Step 2: If not found, create a new structure node
new_node = TextualMemoryItem(
memory=metadata.key,
metadata=TreeNodeTextualMemoryMetadata(
user_id=metadata.user_id,
session_id=metadata.session_id,
memory_type=memory_type,
status="activated",
tags=[],
key=metadata.key,
embedding=self.embedder.embed([metadata.key])[0],
usage=[],
sources=[],
confidence=0.99,
background="",
),
)
self.graph_store.add_node(
new_node.id,
new_node.memory,
new_node.metadata.model_dump(exclude_none=True),
user_name=user_name,
)
self.reorganizer.add_message(
QueueMessage(
op="add",
after_node=[new_node.id],
user_name=user_name,
)
)

node_id = new_node.id

# Step 3: Return this structure node ID as the parent_id
return node_id

def remove_and_refresh_memory(self, user_name: str | None = None):
Expand Down Expand Up @@ -537,17 +425,3 @@ def _cleanup_memories_if_needed(self, user_name: str | None = None) -> None:
logger.debug(f"Cleaned up {memory_type}: {current_count} -> {limit}")
except Exception:
logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}")

def wait_reorganizer(self):
"""
Wait for the reorganizer to finish processing all messages.
"""
logger.debug("Waiting for reorganizer to finish processing messages...")
self.reorganizer.wait_until_current_task_done()

def close(self):
self.wait_reorganizer()
self.reorganizer.stop()

def __del__(self):
self.close()
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import traceback

from memos.embedders.factory import OllamaEmbedder
from memos.embedders.base import BaseEmbedder
from memos.graph_dbs.base import BaseGraphDB
from memos.graph_dbs.item import GraphDBNode
from memos.graph_dbs.neo4j import Neo4jGraphDB
from memos.llms.base import BaseLLM
from memos.log import get_logger
from memos.memories.textual.item import TreeNodeTextualMemoryMetadata
Expand All @@ -18,12 +18,18 @@


class RelationAndReasoningDetector:
def __init__(self, graph_store: Neo4jGraphDB, llm: BaseLLM, embedder: OllamaEmbedder):
def __init__(self, graph_store: BaseGraphDB, llm: BaseLLM, embedder: BaseEmbedder):
self.graph_store = graph_store
self.llm = llm
self.embedder = embedder

def process_node(self, node: GraphDBNode, exclude_ids: list[str], top_k: int = 5):
def process_node(
self,
node: GraphDBNode,
exclude_ids: list[str],
top_k: int = 5,
user_name: str | None = None,
):
"""
Unified pipeline for:
1) Pairwise relations (cause, condition, conflict, relate)
Expand Down Expand Up @@ -52,6 +58,7 @@ def process_node(self, node: GraphDBNode, exclude_ids: list[str], top_k: int = 5
exclude_ids=exclude_ids,
top_k=top_k,
min_overlap=2,
user_name=user_name,
)
nearest = [GraphDBNode(**cand_data) for cand_data in nearest]

Expand All @@ -62,7 +69,7 @@ def process_node(self, node: GraphDBNode, exclude_ids: list[str], top_k: int = 5

"""
# 2) Inferred nodes (from causal/condition)
inferred = self._infer_fact_nodes_from_relations(pairwise)
inferred = self._infer_fact_nodes_from_relations(pairwise, user_name=user_name)
results["inferred_nodes"].extend(inferred)
"""

Expand Down Expand Up @@ -115,12 +122,18 @@ def _detect_pairwise_causal_condition_relations(

return results

def _infer_fact_nodes_from_relations(self, pairwise_results: dict):
def _infer_fact_nodes_from_relations(self, pairwise_results: dict, user_name: str):
inferred_nodes = []
for rel in pairwise_results["relations"]:
if rel["relation_type"] in ("CAUSE", "CONDITION"):
src = self.graph_store.get_node(rel["source_id"])
tgt = self.graph_store.get_node(rel["target_id"])
src = self.graph_store.get_node(
rel["source_id"],
user_name=user_name,
)
tgt = self.graph_store.get_node(
rel["target_id"],
user_name=user_name,
)
if not src or not tgt:
continue

Expand Down
Loading
Loading