Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/memos/api/product_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,17 @@ class APIADDRequest(BaseRequest):
description=("Whether this request represents user feedback. Default: False."),
)

# ==== Upload skill flag ====
is_upload_skill: bool = Field(
False,
description=(
"Whether this request is an upload skill request. "
"When True, the messages field should contain file items "
"with zip file download URLs for pre-built skill packages. "
"Default: False."
),
)

# ==== Backward compatibility fields (will delete later) ====
mem_cube_id: str | None = Field(
None,
Expand Down
18 changes: 16 additions & 2 deletions src/memos/mem_reader/multi_modal_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ def _process_multi_modal_data(
return fast_memory_items

# Stage: llm_extract — fine mode 4-way parallel LLM + per-source serial
is_upload_skill = kwargs.pop("is_upload_skill", False)
non_file_url_fast_items = [
item for item in fast_memory_items if not self._is_file_url_only_item(item)
]
Expand All @@ -1035,14 +1036,17 @@ def _process_multi_modal_data(
)
future_skill = executor.submit(
process_skill_memory_fine,
fast_memory_items=non_file_url_fast_items,
fast_memory_items=fast_memory_items
if is_upload_skill
else non_file_url_fast_items,
info=info,
searcher=self.searcher,
graph_db=self.graph_db,
llm=self.general_llm,
embedder=self.embedder,
oss_config=self.oss_config,
skills_dir_config=self.skills_dir_config,
is_upload_skill=is_upload_skill,
**kwargs,
)
future_pref = executor.submit(
Expand All @@ -1065,6 +1069,10 @@ def _process_multi_modal_data(
fine_memory_items.extend(fine_memory_items_pref_parser)

# Part B: per-source serial processing
if is_upload_skill:
# (skip for upload skill to avoid zip being parsed)
return fine_memory_items

with timed_stage("add", "per_source") as ts_ps:
for fast_item in fast_memory_items:
sources = fast_item.metadata.sources
Expand Down Expand Up @@ -1098,6 +1106,8 @@ def _process_transfer_multi_modal_data(
logger.warning("[MultiModalStruct] No raw nodes found.")
return []

is_upload_skill = kwargs.pop("is_upload_skill", False)

# Extract info from raw_nodes (same as simple_struct.py)
info = {
"user_id": raw_nodes[0].metadata.user_id,
Expand All @@ -1119,14 +1129,15 @@ def _process_transfer_multi_modal_data(
)
future_skill = executor.submit(
process_skill_memory_fine,
non_file_url_nodes,
raw_nodes if is_upload_skill else non_file_url_nodes,
info,
searcher=self.searcher,
llm=self.general_llm,
embedder=self.embedder,
graph_db=self.graph_db,
oss_config=self.oss_config,
skills_dir_config=self.skills_dir_config,
is_upload_skill=is_upload_skill,
**kwargs,
)
# Add preference memory extraction
Expand All @@ -1151,6 +1162,9 @@ def _process_transfer_multi_modal_data(
fine_memory_items.extend(fine_memory_items_pref_parser)

# Part B: get fine multimodal items
if is_upload_skill:
# (skip for upload skill to avoid zip being parsed)
return fine_memory_items
for raw_node in raw_nodes:
sources = raw_node.metadata.sources
for source in sources:
Expand Down
55 changes: 17 additions & 38 deletions src/memos/mem_reader/read_skill_memory/process_skill_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,36 +252,6 @@ def create_task(skill_mem, gen_type, prompt, requirements, context, **kwargs):
return [item[0] for item in raw_skills_data]


def add_id_to_mysql(memory_id: str, mem_cube_id: str):
"""Add id to mysql, will deprecate this function in the future"""
# TODO: tmp function, deprecate soon
import requests

skill_mysql_url = os.getenv("SKILLS_MYSQL_URL", "")
skill_mysql_bearer = os.getenv("SKILLS_MYSQL_BEARER", "")

if not skill_mysql_url or not skill_mysql_bearer:
logger.warning("[PROCESS_SKILLS] SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set")
return None
headers = {"Authorization": skill_mysql_bearer, "Content-Type": "application/json"}
data = {"memCubeId": mem_cube_id, "skillId": memory_id}
try:
response = requests.post(skill_mysql_url, headers=headers, json=data)

logger.info(f"[PROCESS_SKILLS] response: \n\n{response.json()}")
logger.info(f"[PROCESS_SKILLS] memory_id: \n\n{memory_id}")
logger.info(f"[PROCESS_SKILLS] mem_cube_id: \n\n{mem_cube_id}")
logger.info(f"[PROCESS_SKILLS] skill_mysql_url: \n\n{skill_mysql_url}")
logger.info(f"[PROCESS_SKILLS] skill_mysql_bearer: \n\n{skill_mysql_bearer}")
logger.info(f"[PROCESS_SKILLS] headers: \n\n{headers}")
logger.info(f"[PROCESS_SKILLS] data: \n\n{data}")

return response.json()
except Exception as e:
logger.warning(f"[PROCESS_SKILLS] Error adding id to mysql: {e}")
return None


@require_python_package(
import_name="alibabacloud_oss_v2",
install_command="pip install alibabacloud-oss-v2",
Expand Down Expand Up @@ -948,6 +918,7 @@ def create_skill_memory_item(
scripts=skill_memory.get("scripts"),
others=skill_memory.get("others"),
url=skill_memory.get("url", ""),
skill_source=skill_memory.get("skill_source"),
manager_user_id=manager_user_id,
project_id=project_id,
)
Expand Down Expand Up @@ -1024,6 +995,21 @@ def process_skill_memory_fine(
complete_skill_memory: bool = True,
**kwargs,
) -> list[TextualMemoryItem]:
is_upload_skill = kwargs.pop("is_upload_skill", False)
if is_upload_skill:
from memos.mem_reader.read_skill_memory.upload_skill_memory import (
process_upload_skill_memory,
)

return process_upload_skill_memory(
fast_memory_items=fast_memory_items,
info=info,
embedder=embedder,
oss_config=oss_config,
skills_dir_config=skills_dir_config,
**kwargs,
)

skills_repo_backend = _get_skill_file_storage_location()
oss_client, _missing_keys, flag = _skill_init(
skills_repo_backend, oss_config, skills_dir_config
Expand Down Expand Up @@ -1253,6 +1239,7 @@ def _full_extract():
if source:
skill_sources.append(source)

skill_memory["skill_source"] = "auto_create"
memory_item = create_skill_memory_item(
skill_memory, info, embedder, sources=skill_sources, **kwargs
)
Expand All @@ -1261,12 +1248,4 @@ def _full_extract():
logger.warning(f"[PROCESS_SKILLS] Error creating skill memory item: {e}")
continue

# TODO: deprecate this funtion and call
for skill_memory, skill_memory_item in zip(skill_memories, skill_memory_items, strict=False):
if skill_memory.get("update", False) and skill_memory.get("old_memory_id", ""):
continue
add_id_to_mysql(
memory_id=skill_memory_item.id,
mem_cube_id=kwargs.get("user_name", info.get("user_id", "")),
)
return skill_memory_items
Loading
Loading