Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions conf/default/web.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ vnc_host = localhost
# You might need to add your server IP to ALLOWED_HOSTS in web/web/settings.py if it not ["*""]
# vnc or rdp
guest_protocol = vnc
# TIP: For KVM/QEMU, using 'qxl' or 'virtio' video drivers in your VM XML
# TIP: For KVM/QEMU, using 'qxl' or 'virtio' video drivers in your VM XML
# definition provides much better VNC performance than 'vga' or 'cirrus'.
guacd_recording_path = /opt/CAPEv2/storage/guacrecordings
guest_width = 1280
Expand All @@ -215,7 +215,7 @@ rdp_enable_menu_animations = no
# VNC Performance Optimizations
# Color depth: 8, 16, 24, 32. 16 is a great balance for performance.
vnc_color_depth = 16
# Cursor: 'local' renders the mouse on your browser (feels instant).
# Cursor: 'local' renders the mouse on your browser (feels instant).
# 'remote' waits for the server (feels laggy).
vnc_cursor = local
# Audio (enable only if needed, consumes bandwidth)
Expand Down Expand Up @@ -248,4 +248,4 @@ enabled = no
enabled = no

[audit_framework]
enabled = no
enabled = no
14 changes: 7 additions & 7 deletions docs/book/src/usage/mcp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Scenario B: Remote / Shared Server (SSE)

In this mode, a single MCP server instance runs continuously and accepts connections from multiple clients over the network.

0. **Execution:** Start the server using ``python3 web/mcp_server.py --transport sse``.
0. **Execution:** Start the server using ``poetry run python mcp/server.py --transport sse``.
1. **Configuration:** Start the server **without** a ``CAPE_API_TOKEN`` environment variable.
2. **Strict Mode:** Ensure ``token_auth_enabled = yes`` is set in ``conf/api.conf``.
3. **Usage:** Users **must** provide their API token in the ``token`` argument for every tool call (e.g., ``submit_file(..., token="MyKey")``).
Expand All @@ -142,7 +142,7 @@ Standard execution (Stdio)

.. code-block:: bash

CAPE_API_URL=http://your-cape-ip:8000/apiv2 CAPE_API_TOKEN=your_token python3 web/mcp_server.py
CAPE_API_URL=http://your-cape-ip:8000/apiv2 CAPE_API_TOKEN=your_token poetry run python mcp/server.py

Remote / SSE execution
~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -151,7 +151,7 @@ To run the server as a persistent service accessible over the network:

.. code-block:: bash

python3 web/mcp_server.py --transport sse --port 9004
poetry run python mcp/server.py --transport sse --port 9004

Deployment behind Nginx
~~~~~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -192,7 +192,7 @@ Add the following to your ``claude_desktop_config.json``:
"mcpServers": {
"cape": {
"command": "poetry",
"args": ["run", "python", "/opt/CAPEv2/web/mcp_server.py"],
"args": ["run", "python", "/opt/CAPEv2/mcp/server.py"],
"env": {
"CAPE_API_URL": "http://127.0.0.1:8000/apiv2",
"CAPE_API_TOKEN": "YOUR_API_TOKEN_HERE",
Expand All @@ -209,7 +209,7 @@ You can add the server using the CLI command:

.. code-block:: bash

gemini mcp add cape poetry run python /opt/CAPEv2/web/mcp_server.py \
gemini mcp add cape poetry run python /opt/CAPEv2/mcp/server.py \
-e CAPE_API_URL=http://127.0.0.1:8000/apiv2 \
-e CAPE_API_TOKEN=YOUR_API_TOKEN_HERE \
-e CAPE_ALLOWED_SUBMISSION_DIR=/home/user/samples
Expand All @@ -222,7 +222,7 @@ Or manually add it to your ``~/.gemini/settings.json``:
"mcpServers": {
"cape": {
"command": "poetry",
"args": ["run", "python", "/opt/CAPEv2/web/mcp_server.py"],
"args": ["run", "python", "/opt/CAPEv2/mcp/server.py"],
"env": {
"CAPE_API_URL": "http://127.0.0.1:8000/apiv2",
"CAPE_API_TOKEN": "YOUR_API_TOKEN_HERE",
Expand All @@ -243,7 +243,7 @@ Open **Agent Panel** -> **...** -> **MCP Servers** -> **Manage MCP Servers** ->
"mcpServers": {
"cape": {
"command": "poetry",
"args": ["run", "python", "/opt/CAPEv2/web/mcp_server.py"],
"args": ["run", "python", "/opt/CAPEv2/mcp/server.py"],
"env": {
"CAPE_API_URL": "http://127.0.0.1:8000/apiv2",
"CAPE_API_TOKEN": "YOUR_API_TOKEN_HERE",
Expand Down
2 changes: 1 addition & 1 deletion lib/cuckoo/common/web_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ def perform_search(
# Stage 8: Make the task doc the new root
{"$replaceRoot": {"newRoot": "$task_doc"}},
# Stage 9: Add your custom projection
{"$project": perform_search_filters},
{"$project": projection or perform_search_filters},
]
retval = list(mongo_aggregate(FILES_COLL, pipeline))
if not retval:
Expand Down
Empty file added mcp/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions mcp/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Configuration for MCP server search filters
# You can modify this dictionary to include or exclude specific fields in the lean report
# Injested by Agents to give a quick overview

lean_search_filters = {
"info": 1,
"virustotal_summary": 1,
"detections.family": 1,
"malfamily": 1,
"malfamily_tag": 1,
"malscore": 1,
"network.pcap_sha256": 1,
"network.domains.domain": 1,
"network.http.uri": 1,
"signatures.name": 1,
"signatures.description": 1,
"signatures.severity": 1,
"CAPE": 1,
"behavior.summary.mutexes": 1,
"behavior.summary.executed_commands": 1,
"mlist_cnt": 1,
"f_mlist_cnt": 1,
"target.file.clamav": 1,
"target.file.sha256": 1,
"suri_tls_cnt": 1,
"suri_alert_cnt": 1,
"suri_http_cnt": 1,
"suri_file_cnt": 1,
"trid": 1,
"_id": 0,
}
73 changes: 69 additions & 4 deletions web/mcp_server.py → mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
api_config = Config("api")

# Configuration from Environment or Config File
# Run with: CAPE_API_URL=http://127.0.0.1:8000/apiv2 CAPE_API_TOKEN=your_token python3 web/mcp_server.py
# Run with: CAPE_API_URL=http://127.0.0.1:8000/apiv2 CAPE_API_TOKEN=your_token poetry run python mcp/server.py
API_URL = os.environ.get("CAPE_API_URL")
if not API_URL:
# Try to get from api.conf [api] url
Expand Down Expand Up @@ -344,26 +344,69 @@ async def submit_static(

# --- Task Management & Search ---

def get_lean_cape_report(raw_cape_json):
"""Filters a 50MB CAPE report down to a 500-token LLM payload."""
return {
"score": raw_cape_json.get("info", {}).get("score", 0),
"family": raw_cape_json.get("malfamily") or raw_cape_json.get("detections", {}).get("family") or "Unknown",
"extracted_configs": raw_cape_json.get("CAPE", []),
"high_severity_signatures": [
{"name": sig["name"], "desc": sig["description"]}
for sig in raw_cape_json.get("signatures", [])
if isinstance(sig, dict) and sig.get("severity", 0) >= 3
],
"network": {
"domains": [d["domain"] for d in raw_cape_json.get("network", {}).get("domains", [])] if isinstance(raw_cape_json.get("network", {}).get("domains"), list) else [],
"http_uris": [h["uri"] for h in raw_cape_json.get("network", {}).get("http", [])] if isinstance(raw_cape_json.get("network", {}).get("http"), list) else [],
},
"indicators": {
"mutexes": raw_cape_json.get("behavior", {}).get("summary", {}).get("mutexes", []) if isinstance(raw_cape_json.get("behavior", {}).get("summary"), dict) else [],
"commands": raw_cape_json.get("behavior", {}).get("summary", {}).get("executed_commands", []) if isinstance(raw_cape_json.get("behavior", {}).get("summary"), dict) else []
}
}

def _apply_lean_report(result):
if isinstance(result, dict):
if result.get("error") is False and "data" in result:
if isinstance(result["data"], list):
result["data"] = [get_lean_cape_report(item) for item in result["data"]]
elif isinstance(result["data"], dict):
result["data"] = get_lean_cape_report(result["data"])
elif "info" in result:
return get_lean_cape_report(result)
elif isinstance(result, list):
return [get_lean_cape_report(item) for item in result]
return result

@mcp_tool("tasksearch")
async def search_task(hash_value: str, token: str = "") -> str:
async def search_task(hash_value: str, lean: bool = True, token: str = "") -> str:
"""Search for tasks by MD5, SHA1, or SHA256."""
if not re.match(r"^[a-fA-F0-9]+$", hash_value):
return json.dumps({"error": True, "message": "Invalid hash value provided. Only hexadecimal characters are allowed."}, indent=2)

algo = "md5"
if len(hash_value) == 40:
algo = "sha1"
elif len(hash_value) == 64:
algo = "sha256"

result = await _request("GET", f"tasks/search/{algo}/{hash_value}/", token=token)
if lean:
result = _apply_lean_report(result)
return json.dumps(result, indent=2)

@mcp_tool("extendedtasksearch")
async def extended_search(option: str, argument: str, token: str = "") -> str:
async def extended_search(option: str, argument: str, lean: bool = True, token: str = "") -> str:
"""
Search tasks using extended options.
Options include: id, name, type, string, ssdeep, crc32, file, command, resolvedapi, key, mutex, domain, ip, signature, signame, etc.
"""
data = {"option": option, "argument": argument}
if lean:
data["lean"] = True
result = await _request("POST", "tasks/extendedsearch/", token=token, data=data)
if lean:
result = _apply_lean_report(result)
return json.dumps(result, indent=2)

@mcp_tool("extendedtasksearch")
Expand Down Expand Up @@ -430,7 +473,25 @@ async def get_statistics(days: int = 7, token: str = "") -> str:

@mcp_tool("taskreport")
async def get_task_report(task_id: int, format: str = "json", token: str = "") -> str:
"""Get the analysis report for a task (json, lite, maec, metadata)."""
"""Get the analysis report for a task (json, lite, maec, metadata, lean)."""
allowed_formats = {"json", "lite", "maec", "metadata", "lean"}
if format not in allowed_formats:
return json.dumps({"error": True, "message": f"Invalid format provided. Allowed formats: {', '.join(allowed_formats)}"}, indent=2)

if format == "lean":
data = {"option": "id", "argument": str(task_id), "lean": True}
result = await _request("POST", "tasks/extendedsearch/", token=token, data=data)

# Extract the single task report from the search results
if isinstance(result, dict) and not result.get("error") and isinstance(result.get("data"), list):
if len(result["data"]) > 0:
result["data"] = result["data"][0]
else:
result = {"error": True, "message": "Task report not found via lean search."}

result = _apply_lean_report(result)
return json.dumps(result, indent=2)

result = await _request("GET", f"tasks/get/report/{task_id}/{format}/", token=token)
return json.dumps(result, indent=2)

Expand Down Expand Up @@ -516,11 +577,15 @@ async def download_task_fullmemory(task_id: int, destination: str, token: str =
@mcp_tool("fileview")
async def view_file(hash_value: str, hash_type: str = "sha256", token: str = "") -> str:
"""View information about a file in the database."""
if not re.match(r"^[a-fA-F0-9]+$", hash_value):
return json.dumps({"error": True, "message": "Invalid hash value provided. Only hexadecimal characters are allowed."}, indent=2)
return await _request("GET", f"files/view/{hash_type}/{hash_value}/", token=token)

@mcp_tool("sampledl")
async def download_sample(hash_value: str, destination: str, hash_type: str = "sha256", token: str = "") -> str:
"""Download a sample from the database."""
if not re.match(r"^[a-fA-F0-9]+$", hash_value):
return json.dumps({"error": True, "message": "Invalid hash value provided. Only hexadecimal characters are allowed."}, indent=2)
return await _download_file(f"files/get/{hash_type}/{hash_value}/", destination, f"{hash_value}.bin", token=token)

@mcp_tool("machinelist")
Expand Down
6 changes: 5 additions & 1 deletion web/apiv2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
statistics,
validate_task,
)

from lib.cuckoo.core.database import Database, _Database
from lib.cuckoo.core.data.task import (
TASK_RECOVERED,
Expand All @@ -61,6 +62,8 @@
)
from lib.cuckoo.core.rooter import _load_socks5_operational, vpns

# from mcp.filters import lean_search_filters
lean_search_filters = {}
try:
import psutil

Expand Down Expand Up @@ -756,7 +759,8 @@ def ext_tasks_search(request):
value = tmp_value
del tmp_value
try:
records = perform_search(term, value, user_id=request.user.id, privs=request.user.is_staff, web=False)
projection = lean_search_filters if request.data.get("lean") else None
records = perform_search(term, value, user_id=request.user.id, privs=request.user.is_staff, web=False, projection=projection)
except ValueError:
if not term:
resp = {"error": True, "error_value": "No option provided."}
Expand Down
Loading