Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- MCP entry validation hardened (security): names must match `^[a-zA-Z0-9@_][a-zA-Z0-9._@/:=-]{0,127}$`, URLs must use `http` or `https` schemes, headers reject CR/LF in keys and values, self-defined stdio commands rejected if they contain path-traversal sequences. Migration: most existing `apm.yml` files unaffected; if you hit `Invalid MCP name`, the error message now includes a valid positive example (e.g. `io.github.acme/cool-server` or `my-server`) to pattern against. (#807)
- Strict-by-default transport selection: explicit `ssh://`/`https://` URLs no longer silently fall back to the other protocol; shorthand consults `git config url.<base>.insteadOf` and otherwise defaults to HTTPS. Set `APM_ALLOW_PROTOCOL_FALLBACK=1` (or pass `--allow-protocol-fallback`) to restore the legacy permissive chain; cross-protocol retries then emit a `[!]` warning. Closes #328 (#778)
- Self-defined stdio MCP entries with `command` containing whitespace and no `args` are now rejected at parse time with a fix-it error pointing at the canonical `command: <binary>, args: [<token>, ...]` shape. Previously silently accepted; APM never split `command` on whitespace, so the loose shape mis-executed downstream. Closes #806, refs #122 (#809)

### Changed

Expand Down
3 changes: 2 additions & 1 deletion docs/src/content/docs/reference/manifest-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ A plain registry reference: `io.github.github/github-mcp-server`
| `headers` | `map<string, string>` | OPTIONAL | | Custom HTTP headers for remote endpoints. Values may contain `${input:<id>}` references (VS Code only — see §4.2.4). |
| `tools` | `list<string>` | OPTIONAL | Default: `["*"]` | Restrict which tools are exposed. |
| `url` | `string` | Conditional | | Endpoint URL. REQUIRED when `registry: false` and `transport` is `http`, `sse`, or `streamable-http`. |
| `command` | `string` | Conditional | | Binary path. REQUIRED when `registry: false` and `transport` is `stdio`. |
| `command` | `string` | Conditional | Single binary path; no embedded whitespace unless `args` is also present | Binary path. REQUIRED when `registry: false` and `transport` is `stdio`. |

#### 4.2.3. Validation Rules for Self-Defined Servers

Expand All @@ -318,6 +318,7 @@ When `registry` is `false`, the following constraints apply:
1. `transport` MUST be present.
2. If `transport` is `stdio`, `command` MUST be present.
3. If `transport` is `http`, `sse`, or `streamable-http`, `url` MUST be present.
4. If `transport` is `stdio`, `command` MUST be a single binary path with no embedded whitespace. APM does not split `command` on whitespace; use `args` for additional arguments. A path that legitimately contains spaces (e.g. `/opt/My App/server`) is allowed when `args` is also provided (including an explicit empty list `args: []`), signaling the author has taken responsibility for the shape.

```yaml
dependencies:
Expand Down
99 changes: 11 additions & 88 deletions src/apm_cli/commands/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,16 @@ def _validate_and_add_packages_to_apm_yml(packages, dry_run=False, dev=False, lo
# MCP CLI helpers (W3 --mcp flag)
# ---------------------------------------------------------------------------

# F7 shell-expansion residue scan: tokens that would be evaluated by a real
# shell but are NOT shell-evaluated when an MCP stdio server runs through
# ``execve``-style spawning. Warn so users do not assume shell semantics.
_SHELL_METACHAR_TOKENS = ("$(", "`", ";", "&&", "||", "|", ">>", ">", "<")

# F5 SSRF: well-known cloud metadata endpoints surfaced as constants for
# explicit allow/deny review.
_METADATA_HOSTS = {
"169.254.169.254", # AWS / Azure / GCP IMDS
"100.100.100.200", # Alibaba Cloud
"fd00:ec2::254", # AWS IPv6 IMDS
}
# F7 / F5 install-time MCP warnings live in apm_cli/install/mcp_warnings.py
# per LOC budget. Re-bind module-level names for back-compat with tests
# that still patch ``apm_cli.commands.install._warn_*``.
from ..install.mcp_warnings import (
warn_ssrf_url as _warn_ssrf_url,
warn_shell_metachars as _warn_shell_metachars,
_SHELL_METACHAR_TOKENS,
_METADATA_HOSTS,
_is_internal_or_metadata_host,
)

# --registry helpers live in apm_cli/install/mcp_registry.py per LOC budget.
from ..install.mcp_registry import (
Expand Down Expand Up @@ -610,81 +608,6 @@ def _add_mcp_to_apm_yml(name, entry, *, dev=False, force=False, project_root=Non
return status, diff


def _is_internal_or_metadata_host(host: str) -> bool:
"""Return True when ``host`` resolves/parses to an internal IP.

Covers cloud metadata IPs, loopback, link-local, and RFC1918 ranges.
Defensive against ``ValueError``/``OSError`` from name resolution.
"""
import ipaddress
import socket

if not host:
return False
if host in _METADATA_HOSTS:
return True
candidates: builtins.list = [host]
# Strip brackets from IPv6 literals.
bare = host.strip("[]")
if bare != host:
candidates.append(bare)
# Resolve hostname when it is not already an IP literal.
try:
ipaddress.ip_address(bare)
except ValueError:
try:
resolved = socket.gethostbyname(bare)
candidates.append(resolved)
except (OSError, UnicodeError):
pass
for c in candidates:
try:
ip = ipaddress.ip_address(c)
except ValueError:
continue
if ip.is_loopback or ip.is_link_local or ip.is_private:
return True
if c in _METADATA_HOSTS:
return True
return False


def _warn_ssrf_url(url, logger):
"""F5: warn (do not block) when URL points at an internal/metadata host."""
if not url:
return
try:
from urllib.parse import urlparse
host = urlparse(url).hostname or ""
except (ValueError, TypeError):
return
if _is_internal_or_metadata_host(host):
logger.warning(
f"URL '{url}' points to an internal or metadata address; "
f"verify intent before installing."
)


def _warn_shell_metachars(env, logger):
"""F7: warn (do not block) when env values contain shell metacharacters.

MCP stdio servers spawn via ``execve``-style calls with no shell, so
these characters are passed literally rather than evaluated. Users
who think they are setting ``FOO=$(secret)`` will be surprised.
"""
if not env:
return
for key, value in env.items():
sval = "" if value is None else str(value)
for tok in _SHELL_METACHAR_TOKENS:
if tok in sval:
logger.warning(
f"Env value for '{key}' contains shell metacharacter "
f"'{tok}'; reminder these are NOT shell-evaluated."
)
break


# Mapping for E10: which flags require --mcp. Keyed by attribute-style
# name so we can read directly from the Click handler locals.
_MCP_REQUIRED_FLAGS = (
Expand Down Expand Up @@ -845,7 +768,7 @@ def _run_mcp_install(

# F5 + F7 warnings -- do not block.
_warn_ssrf_url(url, logger)
_warn_shell_metachars(env, logger)
_warn_shell_metachars(env, logger, command=entry.get("command"))

# Write to apm.yml.
status, _diff = _add_mcp_to_apm_yml(
Expand Down
34 changes: 28 additions & 6 deletions src/apm_cli/deps/plugin_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,29 @@
import yaml

from ..utils.path_security import ensure_path_within, PathTraversalError
from ..utils.console import _rich_warning


_logger = logging.getLogger(__name__)


def _surface_warning(message: str, logger: logging.Logger) -> None:
"""Emit a warning to both the stdlib logger and the rich console.

The ``apm`` stdlib logger has no handlers configured by default, so
``logger.warning`` calls are silently dropped in non-debug runs. For
user-visible plugin-parse issues (skipped MCP servers, validation
failures), also route through ``_rich_warning`` so the user sees them
even without ``--verbose``. Falls back gracefully if Rich is unavailable.
"""
logger.warning(message)
try:
_rich_warning(message, symbol="warning")
except Exception:
# Console output is best-effort; never mask the underlying warning.
pass


def _is_within_plugin(candidate: Path, plugin_root: Path, *, component: str) -> bool:
"""Return True iff *candidate* resolves inside *plugin_root*.

Expand Down Expand Up @@ -330,9 +348,10 @@ def _mcp_servers_to_apm_deps(
if "headers" in cfg:
dep["headers"] = cfg["headers"]
else:
logger.warning(
"Skipping MCP server '%s' from plugin '%s': no 'command' or 'url'",
name, plugin_path.name,
_surface_warning(
f"Skipping MCP server '{name}' from plugin "
f"'{plugin_path.name}': no 'command' or 'url'",
logger,
)
continue

Expand All @@ -344,12 +363,15 @@ def _mcp_servers_to_apm_deps(
# Route through the validation chokepoint. Plugins are an ingress
# path: a malicious plugin could otherwise smuggle path traversal,
# CRLF, or unsafe URL schemes that bypass MCPDependency.validate().
# PR #809 follow-up: surface validation errors to the user via the
# rich console (stdlib logger has no handlers configured).
try:
MCPDependency.from_dict(dep)
except (ValueError, Exception) as exc:
logger.warning(
"Skipping invalid MCP server '%s' from plugin '%s': %s",
name, plugin_path.name, exc,
_surface_warning(
f"Skipping invalid MCP server '{name}' from plugin "
f"'{plugin_path.name}': {exc}",
logger,
)
continue

Expand Down
122 changes: 122 additions & 0 deletions src/apm_cli/install/mcp_warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""MCP install-time, non-blocking safety warnings (F5 SSRF + F7 shell metachars).

Extracted from ``commands/install.py`` per the architecture-invariants
LOC budget. These checks fire during ``apm install --mcp`` to surface
likely-misconfiguration to the user without blocking the operation.

Categories:

- **F5 (SSRF)**: warn when a self-defined remote MCP URL points at
internal/metadata addresses (loopback, link-local, RFC1918, cloud IMDS).
- **F7 (Shell metachars)**: warn when env values OR the stdio ``command``
field contain shell metacharacters. MCP stdio servers spawn via
``execve``-style calls (no shell), so metacharacters are passed
literally rather than evaluated -- a confused user expecting shell
semantics will get surprising behavior.

These are deliberately *warnings*, not errors, because legitimate paths
exist (e.g. a private homelab MCP server bound to a loopback address).
"""

from __future__ import annotations

import ipaddress
import socket
from typing import Iterable, Optional


# F7: tokens that would be evaluated by a real shell but are NOT evaluated
# when an MCP stdio server runs through ``execve``-style spawning.
_SHELL_METACHAR_TOKENS = ("$(", "`", ";", "&&", "||", "|", ">>", ">", "<")

# F5: well-known cloud metadata endpoints surfaced as constants for
# explicit allow/deny review.
_METADATA_HOSTS = {
"169.254.169.254", # AWS / Azure / GCP IMDS
"100.100.100.200", # Alibaba Cloud
"fd00:ec2::254", # AWS IPv6 IMDS
}


def _is_internal_or_metadata_host(host: str) -> bool:
"""Return True when ``host`` resolves/parses to an internal IP.

Covers cloud metadata IPs, loopback, link-local, and RFC1918 ranges.
Defensive against ``ValueError``/``OSError`` from name resolution.
"""
if not host:
return False
if host in _METADATA_HOSTS:
return True
candidates: list = [host]
# Strip brackets from IPv6 literals.
bare = host.strip("[]")
if bare != host:
candidates.append(bare)
# Resolve hostname when it is not already an IP literal.
try:
ipaddress.ip_address(bare)
except ValueError:
try:
resolved = socket.gethostbyname(bare)
candidates.append(resolved)
except (OSError, UnicodeError):
pass
for c in candidates:
try:
ip = ipaddress.ip_address(c)
except ValueError:
continue
if ip.is_loopback or ip.is_link_local or ip.is_private:
return True
if c in _METADATA_HOSTS:
return True
return False


def warn_ssrf_url(url: Optional[str], logger) -> None:
"""F5: warn (do not block) when URL points at an internal/metadata host."""
if not url:
return
try:
from urllib.parse import urlparse
host = urlparse(url).hostname or ""
except (ValueError, TypeError):
return
if _is_internal_or_metadata_host(host):
logger.warning(
f"URL '{url}' points to an internal or metadata address; "
f"verify intent before installing."
)


def warn_shell_metachars(env, logger, command: Optional[str] = None) -> None:
"""F7: warn (do not block) on shell metacharacters in env values or stdio command.

MCP stdio servers spawn via ``execve``-style calls with no shell, so
these characters are passed literally rather than evaluated. Users who
think they are setting ``FOO=$(secret)`` will be surprised.

Also covers ``command`` itself (e.g. ``command: "npx|curl evil.com"``)
which would otherwise pass the whitespace-rejection guard but still
indicate a confused user expecting shell evaluation.
"""
if env:
for key, value in env.items():
sval = "" if value is None else str(value)
for tok in _SHELL_METACHAR_TOKENS:
if tok in sval:
logger.warning(
f"Env value for '{key}' contains shell metacharacter "
f"'{tok}'; reminder these are NOT shell-evaluated."
)
break
if command and isinstance(command, str):
for tok in _SHELL_METACHAR_TOKENS:
if tok in command:
logger.warning(
f"'command' contains shell metacharacter '{tok}'; "
f"reminder MCP stdio servers run via execve (no shell). "
f"This will be passed literally."
)
break
49 changes: 48 additions & 1 deletion src/apm_cli/models/dependency/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,16 @@ def __repr__(self) -> str:
if self.url:
parts.append(f"url={self.url!r}")
if self.command:
parts.append(f"command={self.command!r}")
# Redact: show only the first whitespace-separated token to avoid
# leaking embedded credentials (e.g. `--token=...`) in repr output
# via debug logs or tracebacks. Mirrors the env/headers redaction
# above and the M1 fix in the validation error message.
if isinstance(self.command, str):
first_tok = self.command.strip().split(maxsplit=1)
preview = first_tok[0] if first_tok else ''
parts.append(f"command={preview!r}")
else:
parts.append(f"command=<{type(self.command).__name__}>")
return f"MCPDependency({', '.join(parts)})"

def validate(self, strict: bool = True) -> None:
Expand Down Expand Up @@ -168,6 +177,12 @@ def validate(self, strict: bool = True) -> None:
f"(CR/LF) not allowed in keys or values"
)
if self.command is not None:
if not isinstance(self.command, str):
raise ValueError(
f"MCP dependency '{self.name}': 'command' must be a string, "
f"got {type(self.command).__name__}. "
f"Use 'args' for the argument list."
)
try:
validate_path_segments(
self.command,
Expand Down Expand Up @@ -205,3 +220,35 @@ def validate(self, strict: bool = True) -> None:
f"Self-defined MCP dependency '{self.name}' with transport "
f"'stdio' requires 'command'"
)
if (
self.transport == 'stdio'
and isinstance(self.command, str)
and any(ch.isspace() for ch in self.command)
and self.args is None
):
# Split on any whitespace (incl. tabs / multiple spaces) so the
# fix-it suggestion matches the validation trigger condition
# (any character.isspace()), not just literal U+0020.
# Note: `args is None` (not `not self.args`) so that an explicit
# `args: []` (e.g., paired with a path like '/opt/My App/server')
# is treated as a deliberate "no extra args" signal and accepted.
command_parts = self.command.strip().split(maxsplit=1)
if not command_parts:
raise ValueError(
f"Self-defined MCP dependency '{self.name}': "
f"'command' is empty or whitespace-only. "
f"Set 'command' to a binary path, e.g. command: npx"
)
first = command_parts[0]
rest_tokens = command_parts[1].split() if len(command_parts) > 1 else []
suggested_args = '[' + ', '.join(f'"{tok}"' for tok in rest_tokens) + ']'
raise ValueError(
"\n".join([
f"'command' contains whitespace in MCP dependency '{self.name}'.",
f" Rule: 'command' must be a single binary path -- APM does not split on whitespace. Use 'args' for additional arguments.",
f" Got: command={first!r} ({len(rest_tokens)} additional args)",
f" Fix: command: {first}",
f" args: {suggested_args}",
f" See: https://microsoft.github.io/apm/guides/mcp-servers/",
])
)
Loading
Loading