diff --git a/src/skillspector/nodes/analyzers/__init__.py b/src/skillspector/nodes/analyzers/__init__.py index 58b3e93..c6d6621 100644 --- a/src/skillspector/nodes/analyzers/__init__.py +++ b/src/skillspector/nodes/analyzers/__init__.py @@ -57,6 +57,9 @@ from skillspector.nodes.analyzers.static_patterns_rogue_agent import ( node as static_patterns_rogue_agent_node, ) +from skillspector.nodes.analyzers.static_patterns_ssrf import ( + node as static_patterns_ssrf_node, +) from skillspector.nodes.analyzers.static_patterns_supply_chain import ( node as static_patterns_supply_chain_node, ) @@ -80,6 +83,7 @@ "static_patterns_memory_poisoning", "static_patterns_tool_misuse", "static_patterns_rogue_agent", + "static_patterns_ssrf", "static_yara", "behavioral_ast", "behavioral_taint_tracking", @@ -103,6 +107,7 @@ "static_patterns_memory_poisoning": static_patterns_memory_poisoning_node, "static_patterns_tool_misuse": static_patterns_tool_misuse_node, "static_patterns_rogue_agent": static_patterns_rogue_agent_node, + "static_patterns_ssrf": static_patterns_ssrf_node, "static_yara": static_yara_node, "behavioral_ast": behavioral_ast_node, "behavioral_taint_tracking": behavioral_taint_tracking_node, diff --git a/src/skillspector/nodes/analyzers/pattern_defaults.py b/src/skillspector/nodes/analyzers/pattern_defaults.py index 0d32e17..a2adecf 100644 --- a/src/skillspector/nodes/analyzers/pattern_defaults.py +++ b/src/skillspector/nodes/analyzers/pattern_defaults.py @@ -38,6 +38,7 @@ class PatternCategory(StrEnum): YARA_MATCH = "YARA Match" MCP_LEAST_PRIVILEGE = "MCP Least Privilege" MCP_TOOL_POISONING = "MCP Tool Poisoning" + SERVER_SIDE_REQUEST_FORGERY = "Server-Side Request Forgery" # Pattern-specific explanations (why the finding is dangerous) @@ -119,6 +120,10 @@ class PatternCategory(StrEnum): "TP2": "Unicode deception detected in skill identifiers or descriptions. 
Homoglyphs, RTL overrides, or invisible characters can make malicious content appear benign.", "TP3": "Instruction injection patterns found in parameter descriptions or default values. Parameter metadata is read by LLMs and can override intended behavior.", "TP4": "Skill description does not match actual code behavior. The declared purpose diverges from what the code actually does, indicating possible deception.", + # Server-Side Request Forgery (SSRF) + "SSRF1": "Code accesses a cloud instance metadata endpoint (e.g. 169.254.169.254). A single request can return temporary IAM credentials, making this a high-value SSRF target for credential theft.", + "SSRF2": "Code issues a request to a loopback, link-local, or private-range host. This can reach internal services not meant to be exposed and is a common SSRF pivot.", + "SSRF3": "Request target host is built from a dynamic or untrusted value. If the host is attacker-influenced, this enables SSRF to arbitrary internal or metadata endpoints.", } # Rule ID -> category (for report output) @@ -182,6 +187,10 @@ class PatternCategory(StrEnum): "TP2": PatternCategory.MCP_TOOL_POISONING.value, "TP3": PatternCategory.MCP_TOOL_POISONING.value, "TP4": PatternCategory.MCP_TOOL_POISONING.value, + # Server-Side Request Forgery + "SSRF1": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, + "SSRF2": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, + "SSRF3": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, } # Rule ID -> pattern display name (for report output) @@ -245,6 +254,10 @@ class PatternCategory(StrEnum): "TP2": "Unicode Deception", "TP3": "Parameter Description Injection", "TP4": "Description-Behavior Mismatch", + # Server-Side Request Forgery + "SSRF1": "Cloud Metadata Access", + "SSRF2": "Internal Network Request", + "SSRF3": "Dynamic Request Target", } # Pattern-specific remediations (how to fix the issue) @@ -326,6 +339,10 @@ class PatternCategory(StrEnum): "TP2": "Replace non-ASCII characters in identifiers with 
ASCII equivalents. Remove RTL override and invisible formatting characters.", "TP3": "Remove injection patterns, system tokens, and suspicious content from parameter descriptions and default values.", "TP4": "Update the skill description to accurately reflect all capabilities, or remove undeclared functionality.", + # Server-Side Request Forgery + "SSRF1": "Remove access to cloud metadata endpoints unless strictly required. If metadata is needed, restrict it (e.g. IMDSv2 with hop limit) and never expose returned credentials.", + "SSRF2": "Avoid requests to loopback/link-local/private hosts from skill code. If internal access is intended, document it and validate the target against an allowlist.", + "SSRF3": "Do not build request URLs from untrusted input. Validate the host against an allowlist and reject internal/metadata addresses before issuing the request.", } diff --git a/src/skillspector/nodes/analyzers/static_patterns_ssrf.py b/src/skillspector/nodes/analyzers/static_patterns_ssrf.py new file mode 100644 index 0000000..593c76a --- /dev/null +++ b/src/skillspector/nodes/analyzers/static_patterns_ssrf.py @@ -0,0 +1,102 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Static patterns: server-side request forgery (SSRF1–SSRF3). 
Node and analyze() in one module."""
+
+from __future__ import annotations
+
+import re
+import sys
+
+from skillspector.logging_config import get_logger
+from skillspector.models import AnalyzerFinding, Location, Severity
+from skillspector.state import AnalyzerNodeResponse, SkillspectorState
+
+from . import static_runner
+from .common import get_context, get_line_number
+from .pattern_defaults import PatternCategory
+
+logger = get_logger(__name__)
+
+ANALYZER_ID = "static_patterns_ssrf"
+
+# Call sites that issue HTTP requests (Python and JS); used to anchor SSRF2/SSRF3 matches.
+_REQ = r"(?:requests|httpx|aiohttp|urllib(?:\.request)?|urllib3|session)\s*\.\s*(?:get|post|put|patch|delete|head|request|urlopen)|fetch|axios(?:\.\w+)?|XMLHttpRequest|\bcurl\b|\bwget\b"
+
+# SSRF1: Cloud instance metadata endpoints (credential theft); (regex, confidence) pairs.
+SSRF1_PATTERNS = [
+    (r"169\.254\.169\.254", 0.9),  # AWS / GCP / Azure / OpenStack IMDS
+    (r"metadata\.google\.internal", 0.9),
+    (r"100\.100\.100\.200", 0.85),  # Alibaba Cloud
+    (r"fd00:ec2::254", 0.85),  # AWS IMDS over IPv6
+    (
+        r"(?:read|fetch|get|query)\s+(?:the\s+)?(?:instance\s+)?metadata\s+(?:service|endpoint|server)",
+        0.6,
+    ),
+]
+
+# SSRF2: loopback / link-local / private hosts; 169.254.169.254 is excluded (SSRF1 reports it).
+SSRF2_PATTERNS = [
+    (
+        rf"(?:{_REQ})\s*\(\s*f?['\"]https?://(?:localhost|127\.\d|0\.0\.0\.0|\[::1\]|10\.\d|192\.168\.|172\.(?:1[6-9]|2\d|3[01])\.|169\.254\.(?!169\.254\b)\d)",
+        0.7,
+    ),
+]
+
+# SSRF3: Request URL whose host is built from an untrusted/dynamic value.
+SSRF3_PATTERNS = [
+    (rf"(?:{_REQ})\s*\(\s*f['\"]https?://\{{", 0.6),  # Python f-string, host interpolated
+    (rf"(?:{_REQ})\s*\(\s*`https?://\$\{{", 0.6),  # JS template literal, host interpolated
+]
+
+
+def analyze(content: str, file_path: str, file_type: str) -> list[AnalyzerFinding]:
+    """Scan ``content`` for server-side request forgery patterns (SSRF1–SSRF3).
+
+    Returns one finding per regex match, rule by rule (SSRF1, SSRF2, SSRF3), with
+    ``file_path`` and the ``get_line_number`` result for the match start as its
+    location. ``file_type`` is not used by these rules.
+    """
+    category = PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value
+    rules = (
+        ("SSRF1", "Cloud Metadata Access", Severity.HIGH, SSRF1_PATTERNS),
+        ("SSRF2", "Internal Network Request", Severity.MEDIUM, SSRF2_PATTERNS),
+        ("SSRF3", "Dynamic Request Target", Severity.MEDIUM, SSRF3_PATTERNS),
+    )
+    findings: list[AnalyzerFinding] = []
+    for rule_id, message, severity, patterns in rules:
+        for pattern, confidence in patterns:
+            for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE):
+                line_num = get_line_number(content, match.start())
+                findings.append(
+                    AnalyzerFinding(
+                        rule_id=rule_id,
+                        message=message,
+                        severity=severity,
+                        location=Location(file=file_path, start_line=line_num),
+                        confidence=confidence,
+                        tags=[category],  # fresh list: findings must not alias one another
+                        context=get_context(content, match.start()),
+                        matched_text=match.group(0)[:200],  # keep at most 200 characters
+                    )
+                )
+    return findings
+
+
+def node(state: SkillspectorState) -> AnalyzerNodeResponse:
+    """Run this module's SSRF rules through the static runner and return the findings."""
+    findings = static_runner.run_static_patterns(state, [sys.modules[__name__]])
+    logger.info("%s: %d findings", ANALYZER_ID, len(findings))
+    return {"findings": findings}
diff --git a/tests/nodes/analyzers/test_registry.py b/tests/nodes/analyzers/test_registry.py
index 0459901..757bd58 100644
--- a/tests/nodes/analyzers/test_registry.py
+++ b/tests/nodes/analyzers/test_registry.py
@@ -33,6 +33,7 @@
     "static_patterns_memory_poisoning",
     "static_patterns_tool_misuse",
     "static_patterns_rogue_agent",
+    "static_patterns_ssrf",
     "static_yara",
     "behavioral_ast",
     "behavioral_taint_tracking",
diff --git a/tests/nodes/analyzers/test_static_patterns.py
b/tests/nodes/analyzers/test_static_patterns.py
index fbcac38..503cfd2 100644
--- a/tests/nodes/analyzers/test_static_patterns.py
+++ b/tests/nodes/analyzers/test_static_patterns.py
@@ -26,6 +26,9 @@
 from skillspector.nodes.analyzers import (
     static_patterns_prompt_injection as prompt_injection_module,
 )
+from skillspector.nodes.analyzers import (
+    static_patterns_ssrf as ssrf_module,
+)
 from skillspector.nodes.analyzers import (
     static_patterns_supply_chain as supply_chain_module,
 )
@@ -172,3 +175,56 @@ def test_empty_components_returns_empty(self):
         state = {"components": [], "file_cache": {}}
         findings = static_runner.run_static_patterns(state, [prompt_injection_module])
         assert findings == []
+
+
+class TestRunStaticPatternsSSRF:
+    """SSRF1–SSRF3 rules exercised through run_static_patterns and the node entrypoint."""
+
+    @staticmethod
+    def _state(source):
+        """Return a state holding a single component, ``fetch.py``, with ``source`` as text."""
+        return {"components": ["fetch.py"], "file_cache": {"fetch.py": source}}
+
+    def _scan(self, source):
+        """Run the SSRF module over ``source`` and return the runner's findings."""
+        return static_runner.run_static_patterns(self._state(source), [ssrf_module])
+
+    def test_ssrf1_cloud_metadata_produces_finding(self):
+        """The cloud metadata IP is reported as SSRF1 with HIGH severity and a remediation."""
+        source = (
+            "import requests\n"
+            'requests.get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")\n'
+        )
+        hits = [f for f in self._scan(source) if f.rule_id == "SSRF1"]
+        assert len(hits) >= 1
+        assert hits[0].severity == "HIGH"
+        assert hits[0].remediation is not None
+
+    def test_ssrf2_internal_host_produces_finding(self):
+        """A loopback target is reported as SSRF2 with MEDIUM severity."""
+        source = 'import requests\nrequests.get("http://127.0.0.1:8080/admin")\n'
+        hits = [f for f in self._scan(source) if f.rule_id == "SSRF2"]
+        assert len(hits) >= 1
+        assert hits[0].severity == "MEDIUM"
+
+    def test_ssrf3_dynamic_host_produces_finding(self):
+        """An f-string URL whose host is a variable is reported as SSRF3."""
+        source = 'import requests\nrequests.get(f"http://{user_host}/internal")\n'
+        assert any(f.rule_id == "SSRF3" for f in self._scan(source))
+
+    def test_metadata_ip_not_double_flagged(self):
+        """The metadata IP yields SSRF1 but never SSRF2."""
+        source = 'import requests\nrequests.get("http://169.254.169.254/")\n'
+        rule_ids = {f.rule_id for f in self._scan(source)}
+        assert "SSRF1" in rule_ids and "SSRF2" not in rule_ids
+
+    def test_normal_external_request_not_flagged(self):
+        """A public HTTPS host yields no SSRF* finding."""
+        source = 'import requests\nrequests.get("https://api.github.com/repos/x/y")\n'
+        assert [f for f in self._scan(source) if f.rule_id.startswith("SSRF")] == []
+
+    def test_node_runs_over_state(self):
+        """``node`` returns the findings under the ``findings`` key."""
+        state = self._state('import requests\nrequests.get("http://169.254.169.254/")\n')
+        result = ssrf_module.node(state)
+        assert any(f.rule_id == "SSRF1" for f in result["findings"])