Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/skillspector/nodes/analyzers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
from skillspector.nodes.analyzers.static_patterns_rogue_agent import (
node as static_patterns_rogue_agent_node,
)
from skillspector.nodes.analyzers.static_patterns_ssrf import (
node as static_patterns_ssrf_node,
)
from skillspector.nodes.analyzers.static_patterns_supply_chain import (
node as static_patterns_supply_chain_node,
)
Expand All @@ -80,6 +83,7 @@
"static_patterns_memory_poisoning",
"static_patterns_tool_misuse",
"static_patterns_rogue_agent",
"static_patterns_ssrf",
"static_yara",
"behavioral_ast",
"behavioral_taint_tracking",
Expand All @@ -103,6 +107,7 @@
"static_patterns_memory_poisoning": static_patterns_memory_poisoning_node,
"static_patterns_tool_misuse": static_patterns_tool_misuse_node,
"static_patterns_rogue_agent": static_patterns_rogue_agent_node,
"static_patterns_ssrf": static_patterns_ssrf_node,
"static_yara": static_yara_node,
"behavioral_ast": behavioral_ast_node,
"behavioral_taint_tracking": behavioral_taint_tracking_node,
Expand Down
17 changes: 17 additions & 0 deletions src/skillspector/nodes/analyzers/pattern_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class PatternCategory(StrEnum):
YARA_MATCH = "YARA Match"
MCP_LEAST_PRIVILEGE = "MCP Least Privilege"
MCP_TOOL_POISONING = "MCP Tool Poisoning"
SERVER_SIDE_REQUEST_FORGERY = "Server-Side Request Forgery"


# Pattern-specific explanations (why the finding is dangerous)
Expand Down Expand Up @@ -119,6 +120,10 @@ class PatternCategory(StrEnum):
"TP2": "Unicode deception detected in skill identifiers or descriptions. Homoglyphs, RTL overrides, or invisible characters can make malicious content appear benign.",
"TP3": "Instruction injection patterns found in parameter descriptions or default values. Parameter metadata is read by LLMs and can override intended behavior.",
"TP4": "Skill description does not match actual code behavior. The declared purpose diverges from what the code actually does, indicating possible deception.",
# Server-Side Request Forgery (SSRF)
"SSRF1": "Code accesses a cloud instance metadata endpoint (e.g. 169.254.169.254). A single request can return temporary IAM credentials, making this a high-value SSRF target for credential theft.",
"SSRF2": "Code issues a request to a loopback, link-local, or private-range host. This can reach internal services not meant to be exposed and is a common SSRF pivot.",
"SSRF3": "Request target host is built from a dynamic or untrusted value. If the host is attacker-influenced, this enables SSRF to arbitrary internal or metadata endpoints.",
}

# Rule ID -> category (for report output)
Expand Down Expand Up @@ -182,6 +187,10 @@ class PatternCategory(StrEnum):
"TP2": PatternCategory.MCP_TOOL_POISONING.value,
"TP3": PatternCategory.MCP_TOOL_POISONING.value,
"TP4": PatternCategory.MCP_TOOL_POISONING.value,
# Server-Side Request Forgery
"SSRF1": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value,
"SSRF2": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value,
"SSRF3": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value,
}

# Rule ID -> pattern display name (for report output)
Expand Down Expand Up @@ -245,6 +254,10 @@ class PatternCategory(StrEnum):
"TP2": "Unicode Deception",
"TP3": "Parameter Description Injection",
"TP4": "Description-Behavior Mismatch",
# Server-Side Request Forgery
"SSRF1": "Cloud Metadata Access",
"SSRF2": "Internal Network Request",
"SSRF3": "Dynamic Request Target",
}

# Pattern-specific remediations (how to fix the issue)
Expand Down Expand Up @@ -326,6 +339,10 @@ class PatternCategory(StrEnum):
"TP2": "Replace non-ASCII characters in identifiers with ASCII equivalents. Remove RTL override and invisible formatting characters.",
"TP3": "Remove injection patterns, system tokens, and suspicious content from parameter descriptions and default values.",
"TP4": "Update the skill description to accurately reflect all capabilities, or remove undeclared functionality.",
# Server-Side Request Forgery
"SSRF1": "Remove access to cloud metadata endpoints unless strictly required. If metadata is needed, restrict it (e.g. IMDSv2 with hop limit) and never expose returned credentials.",
"SSRF2": "Avoid requests to loopback/link-local/private hosts from skill code. If internal access is intended, document it and validate the target against an allowlist.",
"SSRF3": "Do not build request URLs from untrusted input. Validate the host against an allowlist and reject internal/metadata addresses before issuing the request.",
}


Expand Down
102 changes: 102 additions & 0 deletions src/skillspector/nodes/analyzers/static_patterns_ssrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Static patterns: server-side request forgery (SSRF1–SSRF3). Node and analyze() in one module."""

from __future__ import annotations

import re
import sys

from skillspector.logging_config import get_logger
from skillspector.models import AnalyzerFinding, Location, Severity
from skillspector.state import AnalyzerNodeResponse, SkillspectorState

from . import static_runner
from .common import get_context, get_line_number
from .pattern_defaults import PatternCategory

logger = get_logger(__name__)

ANALYZER_ID = "static_patterns_ssrf"

# Request-issuing functions across Python and JS, used to anchor SSRF matches.
_REQ = r"(?:requests|httpx|aiohttp|urllib(?:\.request)?|urllib3|session)\s*\.\s*(?:get|post|put|patch|delete|head|request|urlopen)|fetch|axios(?:\.\w+)?|XMLHttpRequest|\bcurl\b|\bwget\b"

# SSRF1: Cloud instance metadata endpoints (credential theft).
SSRF1_PATTERNS = [
(r"169\.254\.169\.254", 0.9), # AWS / GCP / Azure / OpenStack IMDS
(r"metadata\.google\.internal", 0.9),
(r"100\.100\.100\.200", 0.85), # Alibaba Cloud
(r"fd00:ec2::254", 0.85), # AWS IMDS over IPv6
(
r"(?:read|fetch|get|query)\s+(?:the\s+)?(?:instance\s+)?metadata\s+(?:service|endpoint|server)",
0.6,
),
]

# SSRF2: Requests to loopback / link-local / private (internal) hosts.
SSRF2_PATTERNS = [
(
rf"(?:{_REQ})\s*\(\s*f?['\"]https?://(?:localhost|127\.0\.0\.1|0\.0\.0\.0|\[::1\]|10\.\d|192\.168\.|172\.(?:1[6-9]|2\d|3[01])\.)",
0.7,
),
]

# SSRF3: Request URL whose host is built from an untrusted/dynamic value.
SSRF3_PATTERNS = [
(
rf"(?:{_REQ})\s*\(\s*f['\"]https?://\{{",
0.6,
),
(r"fetch\s*\(\s*`https?://\$\{", 0.6),
]


def analyze(content: str, file_path: str, file_type: str) -> list[AnalyzerFinding]:
"""Analyze content for server-side request forgery patterns (SSRF1–SSRF3)."""
findings: list[AnalyzerFinding] = []
tag = [PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value]

def add(
rule_id: str, message: str, severity: Severity, patterns: list[tuple[str, float]]
) -> None:
for pattern, confidence in patterns:
for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE):
line_num = get_line_number(content, match.start())
findings.append(
AnalyzerFinding(
rule_id=rule_id,
message=message,
severity=severity,
location=Location(file=file_path, start_line=line_num),
confidence=confidence,
tags=tag,
context=get_context(content, match.start()),
matched_text=match.group(0)[:200],
)
)

add("SSRF1", "Cloud Metadata Access", Severity.HIGH, SSRF1_PATTERNS)
add("SSRF2", "Internal Network Request", Severity.MEDIUM, SSRF2_PATTERNS)
add("SSRF3", "Dynamic Request Target", Severity.MEDIUM, SSRF3_PATTERNS)
return findings


def node(state: SkillspectorState) -> AnalyzerNodeResponse:
"""Run SSRF patterns and return findings."""
findings = static_runner.run_static_patterns(state, [sys.modules[__name__]])
logger.info("%s: %d findings", ANALYZER_ID, len(findings))
return {"findings": findings}
1 change: 1 addition & 0 deletions tests/nodes/analyzers/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"static_patterns_memory_poisoning",
"static_patterns_tool_misuse",
"static_patterns_rogue_agent",
"static_patterns_ssrf",
"static_yara",
"behavioral_ast",
"behavioral_taint_tracking",
Expand Down
82 changes: 82 additions & 0 deletions tests/nodes/analyzers/test_static_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from skillspector.nodes.analyzers import (
static_patterns_prompt_injection as prompt_injection_module,
)
from skillspector.nodes.analyzers import (
static_patterns_ssrf as ssrf_module,
)
from skillspector.nodes.analyzers import (
static_patterns_supply_chain as supply_chain_module,
)
Expand Down Expand Up @@ -172,3 +175,82 @@ def test_empty_components_returns_empty(self):
state = {"components": [], "file_cache": {}}
findings = static_runner.run_static_patterns(state, [prompt_injection_module])
assert findings == []


class TestRunStaticPatternsSSRF:
"""run_static_patterns with ssrf: SSRF1, SSRF2, SSRF3."""

def test_ssrf1_cloud_metadata_produces_finding(self):
"""A request to the cloud metadata IP yields SSRF1 (HIGH)."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": (
"import requests\n"
'requests.get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")\n'
),
},
}
findings = static_runner.run_static_patterns(state, [ssrf_module])
ssrf1 = [f for f in findings if f.rule_id == "SSRF1"]
assert len(ssrf1) >= 1
assert ssrf1[0].severity == "HIGH"
assert ssrf1[0].remediation is not None

def test_ssrf2_internal_host_produces_finding(self):
"""A request to an internal/loopback host yields SSRF2 (MEDIUM)."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": 'import requests\nrequests.get("http://127.0.0.1:8080/admin")\n',
},
}
findings = static_runner.run_static_patterns(state, [ssrf_module])
ssrf2 = [f for f in findings if f.rule_id == "SSRF2"]
assert len(ssrf2) >= 1
assert ssrf2[0].severity == "MEDIUM"

def test_ssrf3_dynamic_host_produces_finding(self):
"""A request whose host is built from a variable yields SSRF3."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": 'import requests\nrequests.get(f"http://{user_host}/internal")\n',
},
}
findings = static_runner.run_static_patterns(state, [ssrf_module])
assert any(f.rule_id == "SSRF3" for f in findings)

def test_metadata_ip_not_double_flagged(self):
"""The metadata IP is SSRF1 only, not also SSRF2 (no same-line duplicate)."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": 'import requests\nrequests.get("http://169.254.169.254/")\n',
},
}
findings = static_runner.run_static_patterns(state, [ssrf_module])
ids = {f.rule_id for f in findings}
assert "SSRF1" in ids and "SSRF2" not in ids

def test_normal_external_request_not_flagged(self):
"""A request to a normal public HTTPS host produces no SSRF finding."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": 'import requests\nrequests.get("https://api.github.com/repos/x/y")\n',
},
}
findings = static_runner.run_static_patterns(state, [ssrf_module])
assert [f for f in findings if f.rule_id.startswith("SSRF")] == []

def test_node_runs_over_state(self):
"""The node entrypoint runs the analyzer over state and returns findings."""
state = {
"components": ["fetch.py"],
"file_cache": {
"fetch.py": 'import requests\nrequests.get("http://169.254.169.254/")\n'
},
}
result = ssrf_module.node(state)
assert any(f.rule_id == "SSRF1" for f in result["findings"])