diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 78dcfc797..5cb1e24c7 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -608,6 +608,17 @@ class APIADDRequest(BaseRequest): description=("Whether this request represents user feedback. Default: False."), ) + # ==== Upload skill flag ==== + is_upload_skill: bool = Field( + False, + description=( + "Whether this request is an upload skill request. " + "When True, the messages field should contain file items " + "with zip file download URLs for pre-built skill packages. " + "Default: False." + ), + ) + # ==== Backward compatibility fields (will delete later) ==== mem_cube_id: str | None = Field( None, diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index 2edede76d..da3d54730 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -1019,6 +1019,7 @@ def _process_multi_modal_data( return fast_memory_items # Stage: llm_extract — fine mode 4-way parallel LLM + per-source serial + is_upload_skill = kwargs.pop("is_upload_skill", False) non_file_url_fast_items = [ item for item in fast_memory_items if not self._is_file_url_only_item(item) ] @@ -1035,7 +1036,9 @@ def _process_multi_modal_data( ) future_skill = executor.submit( process_skill_memory_fine, - fast_memory_items=non_file_url_fast_items, + fast_memory_items=fast_memory_items + if is_upload_skill + else non_file_url_fast_items, info=info, searcher=self.searcher, graph_db=self.graph_db, @@ -1043,6 +1046,7 @@ def _process_multi_modal_data( embedder=self.embedder, oss_config=self.oss_config, skills_dir_config=self.skills_dir_config, + is_upload_skill=is_upload_skill, **kwargs, ) future_pref = executor.submit( @@ -1065,6 +1069,10 @@ def _process_multi_modal_data( fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: per-source serial processing + if is_upload_skill: + # (skip for upload skill to avoid zip being parsed) + return fine_memory_items + with timed_stage("add", "per_source") as ts_ps: for fast_item in fast_memory_items: sources = fast_item.metadata.sources @@ -1098,6 +1106,8 @@ def _process_transfer_multi_modal_data( logger.warning("[MultiModalStruct] No raw nodes found.") return [] + is_upload_skill = kwargs.pop("is_upload_skill", False) + # Extract info from raw_nodes (same as simple_struct.py) info = { "user_id": raw_nodes[0].metadata.user_id, @@ -1119,7 +1129,7 @@ def _process_transfer_multi_modal_data( ) future_skill = executor.submit( process_skill_memory_fine, - non_file_url_nodes, + raw_nodes if is_upload_skill else non_file_url_nodes, info, searcher=self.searcher, llm=self.general_llm, @@ -1127,6 +1137,7 @@ def _process_transfer_multi_modal_data( graph_db=self.graph_db, oss_config=self.oss_config, skills_dir_config=self.skills_dir_config, + is_upload_skill=is_upload_skill, **kwargs, ) # Add preference memory extraction @@ -1151,6 +1162,9 @@ def _process_transfer_multi_modal_data( fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: get fine multimodal items + if is_upload_skill: + # (skip for upload skill to avoid zip being parsed) + return fine_memory_items for raw_node in raw_nodes: sources = raw_node.metadata.sources for source in sources: diff --git a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py index 0b0c04252..ec27cbae5 100644 --- a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py +++ b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py @@ -252,36 +252,6 @@ def create_task(skill_mem, gen_type, prompt, requirements, context, **kwargs): return [item[0] for item in raw_skills_data] -def add_id_to_mysql(memory_id: str, mem_cube_id: str): - """Add id to mysql, will deprecate this function in the future""" - # TODO: tmp function, deprecate soon - import requests - - skill_mysql_url = os.getenv("SKILLS_MYSQL_URL", "") - skill_mysql_bearer = os.getenv("SKILLS_MYSQL_BEARER", "") - - if not skill_mysql_url or not skill_mysql_bearer: - logger.warning("[PROCESS_SKILLS] SKILLS_MYSQL_URL or SKILLS_MYSQL_BEARER is not set") - return None - headers = {"Authorization": skill_mysql_bearer, "Content-Type": "application/json"} - data = {"memCubeId": mem_cube_id, "skillId": memory_id} - try: - response = requests.post(skill_mysql_url, headers=headers, json=data) - - logger.info(f"[PROCESS_SKILLS] response: \n\n{response.json()}") - logger.info(f"[PROCESS_SKILLS] memory_id: \n\n{memory_id}") - logger.info(f"[PROCESS_SKILLS] mem_cube_id: \n\n{mem_cube_id}") - logger.info(f"[PROCESS_SKILLS] skill_mysql_url: \n\n{skill_mysql_url}") - logger.info(f"[PROCESS_SKILLS] skill_mysql_bearer: \n\n{skill_mysql_bearer}") - logger.info(f"[PROCESS_SKILLS] headers: \n\n{headers}") - logger.info(f"[PROCESS_SKILLS] data: \n\n{data}") - - return response.json() - except Exception as e: - logger.warning(f"[PROCESS_SKILLS] Error adding id to mysql: {e}") - return None - - @require_python_package( import_name="alibabacloud_oss_v2", install_command="pip install alibabacloud-oss-v2", @@ -948,6 +918,7 @@ def create_skill_memory_item( scripts=skill_memory.get("scripts"), others=skill_memory.get("others"), url=skill_memory.get("url", ""), + skill_source=skill_memory.get("skill_source"), manager_user_id=manager_user_id, project_id=project_id, ) @@ -1024,6 +995,21 @@ def process_skill_memory_fine( complete_skill_memory: bool = True, **kwargs, ) -> list[TextualMemoryItem]: + is_upload_skill = kwargs.pop("is_upload_skill", False) + if is_upload_skill: + from memos.mem_reader.read_skill_memory.upload_skill_memory import ( + process_upload_skill_memory, + ) + + return process_upload_skill_memory( + fast_memory_items=fast_memory_items, + info=info, + embedder=embedder, + oss_config=oss_config, + skills_dir_config=skills_dir_config, + **kwargs, + ) + skills_repo_backend = _get_skill_file_storage_location() oss_client, _missing_keys, flag = _skill_init( skills_repo_backend, oss_config, skills_dir_config @@ -1253,6 +1239,7 @@ def _full_extract(): if source: skill_sources.append(source) + skill_memory["skill_source"] = "auto_create" memory_item = create_skill_memory_item( skill_memory, info, embedder, sources=skill_sources, **kwargs ) @@ -1261,12 +1248,4 @@ def _full_extract(): logger.warning(f"[PROCESS_SKILLS] Error creating skill memory item: {e}") continue - # TODO: deprecate this funtion and call - for skill_memory, skill_memory_item in zip(skill_memories, skill_memory_items, strict=False): - if skill_memory.get("update", False) and skill_memory.get("old_memory_id", ""): - continue - add_id_to_mysql( - memory_id=skill_memory_item.id, - mem_cube_id=kwargs.get("user_name", info.get("user_id", "")), - ) return skill_memory_items diff --git a/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py b/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py new file mode 100644 index 000000000..2f8f6b42e --- /dev/null +++ b/src/memos/mem_reader/read_skill_memory/upload_skill_memory.py @@ -0,0 +1,284 @@ +import re +import shutil +import tempfile +import zipfile + +from pathlib import Path +from typing import Any +from uuid import uuid4 + +import requests + +from memos.embedders.base import BaseEmbedder +from memos.log import get_logger +from memos.mem_reader.read_skill_memory.process_skill_memory import ( + create_skill_memory_item, +) +from memos.memories.textual.item import TextualMemoryItem +from memos.utils import timed + + +logger = get_logger(__name__) + +_TEXT_MAX_LEN = 20 + + +def _truncate(text: str) -> str: + """Truncate a string to at most ``_TEXT_MAX_LEN`` characters.""" + return text[:_TEXT_MAX_LEN] + + +def _extract_zip_url_from_items(items: list[TextualMemoryItem]) -> str | None: + """ + Extract the zip download URL from fast-stage memory items. + + FileContentParser.parse_fast stores the URL in source.file_info["file_data"]. + Each upload-skill request contains exactly one zip URL. + """ + for item in items: + for source in getattr(item.metadata, "sources", None) or []: + file_info = getattr(source, "file_info", None) + if not isinstance(file_info, dict): + continue + file_data = file_info.get("file_data", "") + if ( + isinstance(file_data, str) + and file_data.startswith(("http://", "https://")) + and file_data.lower().endswith(".zip") + ): + return file_data + return None + + +def _download_zip(url: str, tmp_dir: Path) -> Path: + """Download a zip file to a local temporary directory.""" + try: + resp = requests.get(url, stream=True, timeout=60) + resp.raise_for_status() + except Exception as e: + raise ValueError(f"Failed to download zip from {url}: {e}") from e + + zip_path = tmp_dir / f"{uuid4()}.zip" + with open(zip_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + f.write(chunk) + + if not zipfile.is_zipfile(zip_path): + raise ValueError(f"Downloaded file is not a valid zip: {url}") + + return zip_path + + +def _extract_and_parse_skill_zip(zip_path: Path) -> dict[str, Any]: + """ + Extract a skill zip and parse SKILL.md + directory contents into a skill_memory dict. + + The SKILL.md format mirrors the output of ``_write_skills_to_file`` in + ``process_skill_memory.py``. Section headings at any level (``#`` through + ``######``) are matched by title text (case-insensitive). + """ + # Step 1: extract & locate SKILL.md + extract_dir = zip_path.parent / zip_path.stem + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(extract_dir) + + skill_md_path = None + for candidate in extract_dir.rglob("SKILL.md"): + skill_md_path = candidate + break + + if skill_md_path is None: + raise FileNotFoundError(f"SKILL.md not found in zip: {zip_path.name}") + + skill_root = skill_md_path.parent + raw_text = skill_md_path.read_text(encoding="utf-8") + + # Step 2: parse frontmatter → name, description + name = "" + description = "" + fm_match = re.match(r"^---\s*\n(.*?)\n---", raw_text, re.DOTALL) + if fm_match: + for line in fm_match.group(1).splitlines(): + if line.startswith("name:"): + name = line[len("name:") :].strip() + elif line.startswith("description:"): + description = line[len("description:") :].strip() + + if not name: + name = zip_path.stem + + # Step 3: split body by any-level heading and parse each section + trigger: str = "" + procedure: str = "" + experience: list[str] = [] + preference: list[str] = [] + examples: list[str] = [] + tool: str | None = None + others_inline: dict[str, str] = {} + + known_sections = { + "trigger", + "procedure", + "experience", + "user preferences", + "examples", + "scripts", + "tool usage", + "additional information", + } + + body = raw_text[fm_match.end() :] if fm_match else raw_text + sections = re.split(r"\n(?=#{1,6}\s)", body) + + for section in sections: + section = section.strip() + if not section: + continue + + heading_match = re.match(r"^(#{1,6})\s+(.*)", section) + if not heading_match: + continue + + title = heading_match.group(2).strip() + content = section[heading_match.end() :].strip() + title_lower = title.lower() + + if title_lower not in known_sections: + logger.warning("[UPLOAD_SKILL] Unknown section '%s' in SKILL.md, skipping", title) + continue + + if title_lower == "trigger": + trigger = content + + elif title_lower == "procedure": + procedure = content + + elif title_lower == "experience": + items = re.findall(r"^\d+\.\s+(.+)$", content, re.MULTILINE) + experience = [item.strip() for item in items] if items else [] + + elif title_lower == "user preferences": + items = re.findall(r"^-\s+(.+)$", content, re.MULTILINE) + preference = [item.strip() for item in items] if items else [] + + elif title_lower == "examples": + blocks = re.findall(r"```markdown\n(.*?)\n```", content, re.DOTALL) + examples = [b.strip() for b in blocks] + + elif title_lower == "scripts": + pass + + elif title_lower == "tool usage": + tool = content.strip() if content.strip() else None + + elif title_lower == "additional information": + sub_sections = re.split(r"\n(?=#{1,6}\s)", content) + for sub in sub_sections: + sub = sub.strip() + if not sub or sub.startswith("See also:"): + continue + sub_heading = re.match(r"^(#{1,6})\s+(.*)", sub) + if not sub_heading: + continue + sub_key = sub_heading.group(2).strip() + sub_val = sub[sub_heading.end() :].strip() + if sub_val: + others_inline[sub_key] = sub_val + + # Step 4: read scripts/ directory + scripts: dict[str, str] | None = None + scripts_dir = skill_root / "scripts" + if scripts_dir.is_dir(): + scripts = {} + for py_file in scripts_dir.glob("*.py"): + scripts[py_file.name] = py_file.read_text(encoding="utf-8") + + # Step 5: read reference/ directory → merge into others + others = dict(others_inline) + reference_dir = skill_root / "reference" + if reference_dir.is_dir(): + for md_file in reference_dir.glob("*.md"): + others[md_file.name] = md_file.read_text(encoding="utf-8") + + # Step 6: truncate text fields & assemble return dict + truncated_trigger = _truncate(trigger) + + result: dict[str, Any] = { + "name": name, + "description": description, + "tags": [truncated_trigger] if truncated_trigger else [], + "procedure": _truncate(procedure), + "experience": [_truncate(e) for e in experience], + "preference": [_truncate(p) for p in preference], + "examples": [_truncate(e) for e in examples], + "tool": _truncate(tool) if tool else None, + "scripts": {k: _truncate(v) for k, v in scripts.items()} if scripts else None, + "others": {k: _truncate(v) for k, v in others.items()} if others else None, + } + # Only include trigger when non-empty; create_skill_memory_item uses + # `skill_memory.get("tags") or skill_memory.get("trigger", [])`, + # an empty-string trigger would override the correct [] fallback. + if truncated_trigger: + result["trigger"] = truncated_trigger + return result + + +@timed +def process_upload_skill_memory( + fast_memory_items: list[TextualMemoryItem], + info: dict[str, Any], + embedder: BaseEmbedder | None = None, + oss_config: dict[str, Any] | None = None, + skills_dir_config: dict[str, Any] | None = None, + **kwargs, +) -> list[TextualMemoryItem]: + """ + Process a user-uploaded skill zip, parse it, and build a SkillMemory node. + + The zip URL is taken from the fast-stage ``TextualMemoryItem`` sources + (``source.file_info["file_data"]``), consistent with both sync-fine and + async-transfer paths. + """ + zip_url = _extract_zip_url_from_items(fast_memory_items) + if not zip_url: + logger.warning("[UPLOAD_SKILL] No zip URL found in fast_memory_items") + return [] + + tmp_dir = Path(tempfile.mkdtemp(prefix="upload_skill_")) + try: + zip_path = _download_zip(zip_url, tmp_dir) + except Exception as e: + logger.warning("[UPLOAD_SKILL] Failed to download zip: %s", e) + shutil.rmtree(tmp_dir, ignore_errors=True) + return [] + + try: + skill_memory = _extract_and_parse_skill_zip(zip_path) + except FileNotFoundError as e: + logger.warning("[UPLOAD_SKILL] %s", e) + shutil.rmtree(tmp_dir, ignore_errors=True) + return [] + except Exception as e: + logger.error("[UPLOAD_SKILL] Failed to parse skill zip: %s", e) + shutil.rmtree(tmp_dir, ignore_errors=True) + return [] + + skill_memory["url"] = zip_url + skill_memory["skill_source"] = "user_upload" + + try: + skill_memory_item = create_skill_memory_item(skill_memory, info, embedder, **kwargs) + except Exception as e: + logger.error("[UPLOAD_SKILL] Failed to create skill memory item: %s", e) + shutil.rmtree(tmp_dir, ignore_errors=True) + return [] + + # Cleanup temp files + shutil.rmtree(tmp_dir, ignore_errors=True) + + logger.info( + "[UPLOAD_SKILL] Successfully created SkillMemory from uploaded zip: name=%s, id=%s", + skill_memory.get("name"), + skill_memory_item.id, + ) + return [skill_memory_item] diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index 20dbb63b2..c7b01a53d 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -163,6 +163,9 @@ def _process_memories_with_reader( logger.info("Processing %s memories with mem_reader", len(memory_items)) + info = dict(info or {}) + is_upload_skill = info.pop("is_upload_skill", False) + try: processed_memories = mem_reader.fine_transfer_simple_mem( memory_items, @@ -171,6 +174,7 @@ def _process_memories_with_reader( user_name=user_name, chat_history=chat_history, user_context=user_context, + is_upload_skill=is_upload_skill, ) except Exception as e: logger.warning("%s: Fail to transfer mem: %s", e, memory_items) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 1a8b7092a..79e3837e4 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -529,7 +529,10 @@ def _schedule_memory_tasks( content=json.dumps(mem_ids), timestamp=datetime.utcnow(), user_name=self.cube_id, - info=add_req.info, + info={ + **(add_req.info or {}), + "is_upload_skill": getattr(add_req, "is_upload_skill", False), + }, chat_history=add_req.chat_history, user_context=user_context, ) @@ -709,6 +712,7 @@ def _process_text_mem( user_name=user_context.mem_cube_id, chat_history=add_req.chat_history, user_context=user_context, + is_upload_skill=getattr(add_req, "is_upload_skill", False), ) get_memory_ms = ts_gm.duration_ms flattened_local = [mm for m in memories_local for mm in m]