diff --git a/main.py b/main.py index bf9207c9..cdc32b66 100755 --- a/main.py +++ b/main.py @@ -12,6 +12,7 @@ from torbot.modules.updater import check_version from torbot.modules.info import execute_all, fetch_html from torbot.modules.linktree import LinkTree +from torbot.modules.deep_extract import DeepExtractor def print_tor_ip_address(client: httpx.Client) -> None: @@ -52,6 +53,63 @@ def print_header(version: str) -> None: print(title) +def handle_deep_extraction(tree: LinkTree, client: httpx.Client, export_path: str = None) -> None: + """ + Handle deep content extraction from crawled pages. + + Args: + tree: LinkTree object with crawled URLs + client: HTTP client for making requests + export_path: Optional path to export intelligence data + """ + logging.info("Starting deep content extraction...") + deep_extractor = DeepExtractor() + + # Extract content from each page in the tree + pages_analyzed = 0 + for node_url in tree.nodes: + try: + logging.debug(f"Extracting from: {node_url}") + response = client.get(node_url) + if response.status_code == 200: + deep_extractor.extract_all(response.text, node_url) + pages_analyzed += 1 + except Exception as e: + logging.warning(f"Could not extract from {node_url}: {str(e)}") + + logging.info(f"Deep extraction complete. Analyzed {pages_analyzed} pages.") + + # Print summary + deep_extractor.print_summary() + + # Export to JSON if requested + if export_path: + logging.info(f"Exporting intelligence to {export_path}...") + deep_extractor.export_to_json(export_path) + + # Also create a text report + base_path = export_path.rsplit('.', 1)[0] if '.' in export_path else export_path + text_report_path = f"{base_path}_report.txt" + deep_extractor.export_to_text(text_report_path) + logging.info(f"Text report saved to {text_report_path}") + + +def handle_visualization(tree: LinkTree, visualize_mode: str = None) -> None: + """ + Handle visualization of crawled data. + + Args: + tree: LinkTree object with crawled data + visualize_mode: Visualization mode (table, tree, json) + """ + if visualize_mode == "table" or not visualize_mode: + tree.showTable() + elif visualize_mode == "tree": + print(tree) + elif visualize_mode == "json": + tree.showJSON() + + def run(arg_parser: argparse.ArgumentParser, version: str) -> None: args = arg_parser.parse_args() @@ -66,7 +124,7 @@ def run(arg_parser: argparse.ArgumentParser, version: str) -> None: arg_parser.print_help() sys.exit() - # Print verison then exit + # Print version then exit if args.version: print(f"TorBot Version: {version}") sys.exit() @@ -93,6 +151,10 @@ def run(arg_parser: argparse.ArgumentParser, version: str) -> None: tree = LinkTree(url=args.url, depth=args.depth, client=client) tree.load() + # Deep extraction if requested + if args.deep_extract: + handle_deep_extraction(tree, client, args.export_intel) + # save data if desired if args.save == "tree": tree.save() @@ -105,12 +167,7 @@ def run(arg_parser: argparse.ArgumentParser, version: str) -> None: fetch_html(client, args.url, tree, save_html=True) # always print something, table is the default - if args.visualize == "table" or not args.visualize: - tree.showTable() - elif args.visualize == "tree": - print(tree) - elif args.visualize == "json": - tree.showJSON() + handle_visualization(tree, args.visualize) print("\n\n") @@ -123,10 +180,10 @@ def set_arguments() -> argparse.ArgumentParser: prog="TorBot", usage="Gather and analayze data from Tor sites." 
) parser.add_argument( - "-u", "--url", type=str, required=True, help="Specifiy a website link to crawl" + "-u", "--url", type=str, required=True, help="Specify a website link to crawl" ) parser.add_argument( - "--depth", type=int, help="Specifiy max depth of crawler (default 1)", default=1 + "--depth", type=int, help="Specify max depth of crawler (default 1)", default=1 ) parser.add_argument( "--host", type=str, help="IP address for SOCKS5 proxy", default="127.0.0.1" @@ -162,9 +219,16 @@ def set_arguments() -> argparse.ArgumentParser: help="Executes HTTP requests without using SOCKS5 proxy", ) parser.add_argument( - "--html", - choices=["save", "display"], - help="Saves / Displays the html of the onion link", + + "--deep-extract", + action="store_true", + help="Enable deep content extraction mode for OSINT intelligence gathering", + ) + parser.add_argument( + "--export-intel", + type=str, + metavar="FILENAME", + help="Export extracted intelligence to JSON file (use with --deep-extract)", ) return parser diff --git a/requirements.txt b/requirements.txt index c430132c..8d3ee09a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -256,19 +256,4 @@ urllib3==2.5.0 ; python_version >= "3.9" and python_full_version <= "3.11.4" \ validators==0.20.0 ; python_version >= "3.9" and python_full_version <= "3.11.4" \ --hash=sha256:24148ce4e64100a2d5e267233e23e7afeb55316b47d30faae7eb6e7292bc226a yattag==1.15.1 ; python_version >= "3.9" and python_full_version <= "3.11.4" \ - --hash=sha256:960fa54be1229d96f43178133e0b195c003391fdc49ecdb6b69b7374db6be416 - -numpy~=1.24.4 -beautifulsoup4~=4.11.1 -sklearn~=0.0 -scikit-learn~=1.3.0 -httpx[socks]~=0.25.0 -yattag~=1.15.1 -termcolor~=1.1.0 -python-dotenv~=0.20.0 -Unipath~=1.1 -validators~=0.20.0 -phonenumbers~=8.13.22 -tabulate~=0.9.0 -treelib~=1.7.0 -toml~=0.10.2 \ No newline at end of file + --hash=sha256:960fa54be1229d96f43178133e0b195c003391fdc49ecdb6b69b7374db6be416 \ No newline at end of file diff --git a/src/torbot/modules/deep_extract/__init__.py b/src/torbot/modules/deep_extract/__init__.py new file mode 100644 index 00000000..c25612de --- /dev/null +++ b/src/torbot/modules/deep_extract/__init__.py @@ -0,0 +1,12 @@ +""" +Deep Web Content Extraction Module + +This module provides comprehensive content extraction and intelligence gathering +capabilities for dark web OSINT investigations. +""" + +from .orchestrator import DeepExtractor +from .base import BaseExtractor, ExtractionResult + +__all__ = ['DeepExtractor', 'BaseExtractor', 'ExtractionResult'] + diff --git a/src/torbot/modules/deep_extract/base.py b/src/torbot/modules/deep_extract/base.py new file mode 100644 index 00000000..24b3460f --- /dev/null +++ b/src/torbot/modules/deep_extract/base.py @@ -0,0 +1,190 @@ +""" +Base classes and utilities for deep content extraction +""" + +import re +from typing import List, Dict, Any, Optional +from dataclasses import dataclass, field +from abc import ABC, abstractmethod +from datetime import datetime + + +@dataclass +class ExtractionResult: + """Container for extracted intelligence data""" + + category: str # Type of extraction (credentials, pii, crypto, etc.) + confidence: float # Confidence score (0.0 to 1.0) + risk_level: str # low, medium, high, critical + data: Dict[str, Any] # The actual extracted data + context: Optional[str] = None # Surrounding context + location: Optional[str] = None # Location in page (URL, line number, etc.) 
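+    # timestamp uses field(default_factory=datetime.now) so each result records its own
+    # creation time (a plain datetime.now() default would be evaluated once at class definition).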
+ timestamp: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + return { + 'category': self.category, + 'confidence': self.confidence, + 'risk_level': self.risk_level, + 'data': self.data, + 'context': self.context, + 'location': self.location, + 'timestamp': self.timestamp.isoformat() + } + + +class BaseExtractor(ABC): + """Base class for all content extractors""" + + def __init__(self): + self.results: List[ExtractionResult] = [] + + @abstractmethod + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """ + Extract intelligence from text content + + Args: + text: The text content to analyze + url: The source URL (optional) + + Returns: + List of ExtractionResult objects + """ + pass + + def get_context(self, text: str, match_start: int, match_end: int, + context_chars: int = 100) -> str: + """ + Extract surrounding context for a match + + Args: + text: Full text content + match_start: Start position of match + match_end: End position of match + context_chars: Number of characters to include on each side + + Returns: + Context string + """ + start = max(0, match_start - context_chars) + end = min(len(text), match_end + context_chars) + context = text[start:end] + + # Clean up context + context = context.replace('\n', ' ').replace('\r', ' ') + context = re.sub(r'\s+', ' ', context).strip() + + return context + + def calculate_risk_level(self, data_type: str, confidence: float) -> str: + """ + Calculate risk level based on data type and confidence + + Args: + data_type: Type of sensitive data found + confidence: Confidence score + + Returns: + Risk level string + """ + critical_types = ['password', 'ssn', 'credit_card', 'api_key', 'private_key'] + high_types = ['email', 'phone', 'bitcoin', 'credential_dump'] + medium_types = ['onion_link', 'ip_address', 'hash'] + + if data_type in critical_types and confidence > 0.7: + return 'critical' + elif data_type in high_types and confidence > 0.6: + return 'high' + elif data_type in medium_types and confidence > 0.5: + return 'medium' + else: + return 'low' + + +class RegexPatterns: + """Common regex patterns for extraction""" + + # Email patterns + EMAIL = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' + + # Cryptocurrency addresses + BITCOIN = r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b' + ETHEREUM = r'\b0x[a-fA-F0-9]{40}\b' + MONERO = r'\b4[0-9AB][1-9A-HJ-NP-Za-km-z]{93}\b' + LITECOIN = r'\b[LM3][a-km-zA-HJ-NP-Z1-9]{26,33}\b' + + # Onion links + ONION_V2 = r'\b[a-z2-7]{16}\.onion\b' + ONION_V3 = r'\b[a-z2-7]{56}\.onion\b' + + # Network indicators + IPV4 = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' + IPV6 = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b' + DOMAIN = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' + + # PII + PHONE = r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b' + SSN = r'\b(?!000|666|9\d{2})\d{3}-(?!00)\d{2}-(?!0000)\d{4}\b' + CREDIT_CARD = r'\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13}|3(?:0[0-5]|[68][0-9])[0-9]{11}|6(?:011|5[0-9]{2})[0-9]{12})\b' + + # Credentials + USERNAME_PASSWORD = r'(?i)(?:username|user|login|email)[\s:=]+([^\s:]+)[\s\n\r]*(?:password|pass|pwd)[\s:=]+([^\s\n\r]+)' + API_KEY_AWS = r'\b(?:AKIA|ASIA)[0-9A-Z]{16}\b' + API_KEY_GENERIC = r'\b[a-zA-Z0-9_-]{32,}\b' + JWT_TOKEN = r'\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b' + + # Hashes + MD5 = r'\b[a-fA-F0-9]{32}\b' + 
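+    # Note: the SHA1 pattern below shares its 40-hex-character shape with PGP_FINGERPRINT,
+    # and long hex runs also satisfy API_KEY_GENERIC; the extractors that use these patterns
+    # narrow matches with surrounding context keywords rather than relying on the pattern alone.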
SHA1 = r'\b[a-fA-F0-9]{40}\b' + SHA256 = r'\b[a-fA-F0-9]{64}\b' + + # Communication + PGP_KEY = r'-----BEGIN PGP (?:PUBLIC|PRIVATE) KEY BLOCK-----' + PGP_FINGERPRINT = r'\b[0-9A-F]{40}\b' + JABBER = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:\s|$)' + TELEGRAM = r'(?:@|t\.me/)[a-zA-Z0-9_]{5,32}' + WICKR = r'(?i)wickr(?:\s*:?\s*|me\s*:?\s*)([a-zA-Z0-9_-]{5,20})' + + # CVE + CVE = r'\bCVE-\d{4}-\d{4,7}\b' + + +class LuhnValidator: + """Luhn algorithm for credit card validation""" + + @staticmethod + def validate(number: str) -> bool: + """ + Validate credit card number using Luhn algorithm + + Args: + number: Credit card number string + + Returns: + True if valid, False otherwise + """ + try: + # Remove any spaces or dashes + number = number.replace(' ', '').replace('-', '') + + if not number.isdigit(): + return False + + # Luhn algorithm + total = 0 + reverse_digits = number[::-1] + + for i, digit in enumerate(reverse_digits): + n = int(digit) + if i % 2 == 1: + n *= 2 + if n > 9: + n -= 9 + total += n + + return total % 10 == 0 + except (ValueError, AttributeError): + return False + diff --git a/src/torbot/modules/deep_extract/breach_detector.py b/src/torbot/modules/deep_extract/breach_detector.py new file mode 100644 index 00000000..f1eb5f91 --- /dev/null +++ b/src/torbot/modules/deep_extract/breach_detector.py @@ -0,0 +1,299 @@ +""" +Data Breach Detection Extractor +""" + +import re +from typing import List, Dict +from .base import BaseExtractor, ExtractionResult + + +class BreachDetector(BaseExtractor): + """Detect and analyze data breach dumps and credential leaks""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Detect data breaches and credential dumps""" + results = [] + + # Detect breach announcements + results.extend(self._detect_breach_announcements(text, url)) + + # Detect credential dumps + results.extend(self._detect_credential_dumps(text, url)) + + # Detect database leaks + results.extend(self._detect_database_leaks(text, url)) + + # Detect combo lists + results.extend(self._detect_combo_lists(text, url)) + + # Estimate breach size + results.extend(self._estimate_breach_size(text, url)) + + self.results.extend(results) + return results + + def _detect_breach_announcements(self, text: str, url: str) -> List[ExtractionResult]: + """Detect breach announcements and posts""" + results = [] + + breach_keywords = [ + r'(?i)\b(?:database|db|data)\s+(?:breach|leak|dump|hacked|compromised)\b', + r'(?i)\b(?:breach|leak)\s+(?:of|from)\s+([A-Z][a-zA-Z0-9\s]{2,30})\b', + r'(?i)\bhacked\s+database\b', + r'(?i)\bdata\s+dump\b', + r'(?i)\bstolen\s+(?:database|data|credentials)\b' + ] + + for pattern in breach_keywords: + for match in re.finditer(pattern, text): + context = self.get_context(text, match.start(), match.end(), 250) + + # Try to extract company/target name + target = match.group(1) if match.lastindex else None + + results.append(ExtractionResult( + category='data_breach', + confidence=0.85, + risk_level='critical', + data={ + 'type': 'breach_announcement', + 'target': target, + 'indicator': match.group(0) + }, + context=context, + location=url + )) + + # Specific breach format patterns + breach_format_patterns = [ + r'(?i)(?:database|dump):\s*([^\n]{10,100})', + r'(?i)source:\s*([^\n]{5,100})', + r'(?i)leaked\s+from:\s*([^\n]{5,100})' + ] + + for pattern in breach_format_patterns: + for match in re.finditer(pattern, text): + source_info = match.group(1).strip() + context = self.get_context(text, match.start(), match.end()) + + 
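+                # source_info holds the claimed origin from lines like "leaked from: <name>" (illustrative);
+                # self-reported attributions are recorded at slightly lower confidence (0.8) than the
+                # explicit breach keywords above (0.85).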
results.append(ExtractionResult( + category='data_breach', + confidence=0.8, + risk_level='high', + data={ + 'type': 'breach_source', + 'source': source_info + }, + context=context, + location=url + )) + + return results + + def _detect_credential_dumps(self, text: str, url: str) -> List[ExtractionResult]: + """Detect credential dump patterns""" + results = [] + + # Count email:password patterns + email_pass_pattern = r'(?m)^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}:[^\s:]+$' + matches = list(re.finditer(email_pass_pattern, text)) + + if len(matches) >= 5: # If we find multiple credential pairs + # Get first and last match for context + first_match = matches[0] + last_match = matches[-1] + + context = self.get_context( + text, + first_match.start(), + min(last_match.end(), first_match.start() + 500), + 100 + ) + + # Extract sample credentials (first 3) + samples = [matches[i].group(0) for i in range(min(3, len(matches)))] + + results.append(ExtractionResult( + category='data_breach', + confidence=0.95, + risk_level='critical', + data={ + 'type': 'credential_dump', + 'format': 'email:password', + 'estimated_count': len(matches), + 'samples': samples + }, + context=context, + location=url + )) + + # Count username:password patterns + user_pass_pattern = r'(?m)^[a-zA-Z0-9_-]{3,20}:[^\s:]+$' + matches = list(re.finditer(user_pass_pattern, text)) + + if len(matches) >= 10: # Higher threshold for username:password + first_match = matches[0] + context = self.get_context(text, first_match.start(), first_match.start() + 500) + + samples = [matches[i].group(0) for i in range(min(3, len(matches)))] + + results.append(ExtractionResult( + category='data_breach', + confidence=0.85, + risk_level='critical', + data={ + 'type': 'credential_dump', + 'format': 'username:password', + 'estimated_count': len(matches), + 'samples': samples + }, + context=context, + location=url + )) + + return results + + def _detect_database_leaks(self, text: str, url: str) -> List[ExtractionResult]: + """Detect database dump patterns""" + results = [] + + # SQL dump indicators + sql_patterns = [ + r'(?i)(?:INSERT INTO|CREATE TABLE|DROP TABLE)', + r'(?i)(?:mysql|postgresql|mongodb|mssql)\s+dump', + r'(?i)\.sql\s+(?:file|dump|backup)' + ] + + for pattern in sql_patterns: + matches = list(re.finditer(pattern, text)) + if matches: + first_match = matches[0] + context = self.get_context(text, first_match.start(), first_match.end(), 200) + + results.append(ExtractionResult( + category='data_breach', + confidence=0.9, + risk_level='high', + data={ + 'type': 'database_dump', + 'dump_type': 'SQL', + 'indicator_count': len(matches) + }, + context=context, + location=url + )) + break # Only report once per type + + # JSON database dumps + if text.count('"password"') >= 5 or text.count('"email"') >= 5: + # Look for JSON array of user objects + json_user_pattern = r'\{\s*"(?:email|username|user)"[^}]{10,200}"password"[^}]{5,100}\}' + matches = list(re.finditer(json_user_pattern, text, re.IGNORECASE)) + + if len(matches) >= 3: + first_match = matches[0] + context = self.get_context(text, first_match.start(), first_match.end(), 150) + + results.append(ExtractionResult( + category='data_breach', + confidence=0.85, + risk_level='high', + data={ + 'type': 'database_dump', + 'dump_type': 'JSON', + 'record_count': len(matches) + }, + context=context, + location=url + )) + + return results + + def _detect_combo_lists(self, text: str, url: str) -> List[ExtractionResult]: + """Detect combo lists (credential lists from multiple sources)""" + 
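+        # Illustrative (hypothetical) example of text this targets:
+        #   "Fresh private combos - 4.2 million mixed credentials"
+        # The keyword patterns flag the mention, then the surrounding context is searched for a
+        # size figure to estimate how many records are being advertised.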
results = [] + + combo_keywords = [ + r'(?i)\bcombo\s+list\b', + r'(?i)\bcombos\b.*\b(?:million|thousand|k)\b', + r'(?i)\bmixed\s+(?:credentials|combos)\b', + r'(?i)\b(?:private|fresh)\s+combos\b' + ] + + for pattern in combo_keywords: + for match in re.finditer(pattern, text): + context = self.get_context(text, match.start(), match.end(), 250) + + # Try to extract size + size_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:million|mil|m|thousand|k)', context, re.IGNORECASE) + estimated_size = None + if size_match: + num = float(size_match.group(1)) + unit = size_match.group(2).lower() + if 'm' in unit: + estimated_size = int(num * 1000000) + elif 'k' in unit: + estimated_size = int(num * 1000) + + results.append(ExtractionResult( + category='data_breach', + confidence=0.9, + risk_level='critical', + data={ + 'type': 'combo_list', + 'estimated_size': estimated_size + }, + context=context, + location=url + )) + + return results + + def _estimate_breach_size(self, text: str, url: str) -> List[ExtractionResult]: + """Estimate the size of data breaches mentioned""" + results = [] + + size_patterns = [ + r'(?i)(\d+(?:\.\d+)?)\s*(million|billion|thousand|mil|m|k|gb|mb)\s+(?:records?|users?|accounts?|credentials?|passwords?|emails?)', + r'(?i)(?:contains|includes|total)[:\s]+(\d+(?:\.\d+)?)\s*(million|billion|thousand|mil|m|k)\s+(?:records?|entries?)', + ] + + for pattern in size_patterns: + for match in re.finditer(pattern, text): + number = float(match.group(1)) + unit = match.group(2).lower() + context = self.get_context(text, match.start(), match.end()) + + # Convert to actual count + multiplier = 1 + if 'billion' in unit or 'b' == unit: + multiplier = 1000000000 + elif 'million' in unit or 'm' in unit: + multiplier = 1000000 + elif 'thousand' in unit or 'k' in unit: + multiplier = 1000 + + estimated_count = int(number * multiplier) + + # Determine risk level based on size + risk_level = 'medium' + if estimated_count >= 1000000: + risk_level = 'critical' + elif estimated_count >= 100000: + risk_level = 'high' + + results.append(ExtractionResult( + category='data_breach', + confidence=0.85, + risk_level=risk_level, + data={ + 'type': 'breach_size', + 'estimated_records': estimated_count, + 'original_value': f"{number} {unit}" + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/communication_extractor.py b/src/torbot/modules/deep_extract/communication_extractor.py new file mode 100644 index 00000000..ff2444eb --- /dev/null +++ b/src/torbot/modules/deep_extract/communication_extractor.py @@ -0,0 +1,289 @@ +""" +Communication Methods Extractor +""" + +import re +from typing import List +from .base import BaseExtractor, ExtractionResult, RegexPatterns + + +class CommunicationExtractor(BaseExtractor): + """Extract communication methods and contact information""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract various communication methods""" + results = [] + + # Extract PGP keys + results.extend(self._extract_pgp_keys(text, url)) + + # Extract messaging app IDs + results.extend(self._extract_messaging_ids(text, url)) + + # Extract email addresses (for communication) + results.extend(self._extract_contact_emails(text, url)) + + # Extract IRC channels + results.extend(self._extract_irc_channels(text, url)) + + self.results.extend(results) + return results + + def _extract_pgp_keys(self, text: str, url: str) -> List[ExtractionResult]: + """Extract PGP public keys and fingerprints""" + results = [] + + # PGP key 
blocks + pgp_block_pattern = r'-----BEGIN PGP (?:PUBLIC|PRIVATE) KEY BLOCK-----(.*?)-----END PGP (?:PUBLIC|PRIVATE) KEY BLOCK-----' + for match in re.finditer(pgp_block_pattern, text, re.DOTALL): + key_content = match.group(1).strip() + context = self.get_context(text, match.start(), match.end(), 100) + + key_type = 'PUBLIC' if 'PUBLIC KEY' in match.group(0) else 'PRIVATE' + + results.append(ExtractionResult( + category='communication', + confidence=0.95, + risk_level='high' if key_type == 'PRIVATE' else 'medium', + data={ + 'type': 'pgp_key_block', + 'key_type': key_type, + 'key_preview': key_content[:100] + '...' if len(key_content) > 100 else key_content + }, + context=context, + location=url + )) + + # PGP fingerprints + for match in re.finditer(RegexPatterns.PGP_FINGERPRINT, text): + fingerprint = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests this is a PGP fingerprint + pgp_keywords = ['pgp', 'fingerprint', 'key', 'gpg', 'encryption'] + if any(keyword in context.lower() for keyword in pgp_keywords): + results.append(ExtractionResult( + category='communication', + confidence=0.85, + risk_level='medium', + data={ + 'type': 'pgp_fingerprint', + 'fingerprint': fingerprint + }, + context=context, + location=url + )) + + return results + + def _extract_messaging_ids(self, text: str, url: str) -> List[ExtractionResult]: + """Extract messaging app identifiers""" + results = [] + + # Telegram + for match in re.finditer(RegexPatterns.TELEGRAM, text): + telegram_id = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Clean up the ID + clean_id = telegram_id.replace('@', '').replace('t.me/', '') + + results.append(ExtractionResult( + category='communication', + confidence=0.9, + risk_level='medium', + data={ + 'type': 'telegram', + 'username': clean_id, + 'full_handle': telegram_id + }, + context=context, + location=url + )) + + # Wickr + for match in re.finditer(RegexPatterns.WICKR, text): + wickr_id = match.group(1) if match.lastindex else match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.85, + risk_level='medium', + data={ + 'type': 'wickr', + 'username': wickr_id + }, + context=context, + location=url + )) + + # Signal + signal_pattern = r'(?i)signal[:\s]+([+\d\s()-]{10,20})' + for match in re.finditer(signal_pattern, text): + signal_number = match.group(1).strip() + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.8, + risk_level='medium', + data={ + 'type': 'signal', + 'phone_number': signal_number + }, + context=context, + location=url + )) + + # Session + session_pattern = r'(?i)session\s+id[:\s]+([a-f0-9]{64,66})' + for match in re.finditer(session_pattern, text): + session_id = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.85, + risk_level='medium', + data={ + 'type': 'session', + 'session_id': session_id + }, + context=context, + location=url + )) + + # Jabber/XMPP + jabber_pattern = r'\b([a-zA-Z0-9._%+-]+@(?:[a-zA-Z0-9-]+\.)*xmpp\.[a-zA-Z]{2,}|[a-zA-Z0-9._%+-]+@jabber\.[a-zA-Z0-9.-]+)\b' + for match in re.finditer(jabber_pattern, text): + jabber_id = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + 
category='communication', + confidence=0.85, + risk_level='medium', + data={ + 'type': 'jabber', + 'jabber_id': jabber_id + }, + context=context, + location=url + )) + + # Discord + discord_pattern = r'(?i)discord[:\s]+([a-zA-Z0-9_]{2,32}#\d{4})' + for match in re.finditer(discord_pattern, text): + discord_id = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.9, + risk_level='low', + data={ + 'type': 'discord', + 'username': discord_id + }, + context=context, + location=url + )) + + # Matrix + matrix_pattern = r'@[a-zA-Z0-9._=-]+:[a-zA-Z0-9.-]+' + for match in re.finditer(matrix_pattern, text): + matrix_id = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests Matrix + if 'matrix' in context.lower(): + results.append(ExtractionResult( + category='communication', + confidence=0.8, + risk_level='medium', + data={ + 'type': 'matrix', + 'user_id': matrix_id + }, + context=context, + location=url + )) + + return results + + def _extract_contact_emails(self, text: str, url: str) -> List[ExtractionResult]: + """Extract email addresses used for contact""" + results = [] + + contact_patterns = [ + r'(?i)contact[:\s]+(' + RegexPatterns.EMAIL + r')', + r'(?i)email[:\s]+(' + RegexPatterns.EMAIL + r')', + r'(?i)reach\s+(?:me|us)\s+at[:\s]+(' + RegexPatterns.EMAIL + r')' + ] + + for pattern in contact_patterns: + for match in re.finditer(pattern, text): + email = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.9, + risk_level='medium', + data={ + 'type': 'contact_email', + 'email': email, + 'purpose': 'contact' + }, + context=context, + location=url + )) + + return results + + def _extract_irc_channels(self, text: str, url: str) -> List[ExtractionResult]: + """Extract IRC channels and servers""" + results = [] + + # IRC channel format + irc_channel_pattern = r'#[a-zA-Z0-9_-]{2,50}' + for match in re.finditer(irc_channel_pattern, text): + channel = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests IRC + irc_keywords = ['irc', 'channel', 'chat', 'server'] + if any(keyword in context.lower() for keyword in irc_keywords): + results.append(ExtractionResult( + category='communication', + confidence=0.75, + risk_level='low', + data={ + 'type': 'irc_channel', + 'channel': channel + }, + context=context, + location=url + )) + + # IRC server format + irc_server_pattern = r'(?i)irc[:\s]+([a-zA-Z0-9.-]+(?::\d+)?)' + for match in re.finditer(irc_server_pattern, text): + server = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='communication', + confidence=0.8, + risk_level='low', + data={ + 'type': 'irc_server', + 'server': server + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/credentials_extractor.py b/src/torbot/modules/deep_extract/credentials_extractor.py new file mode 100644 index 00000000..192bb654 --- /dev/null +++ b/src/torbot/modules/deep_extract/credentials_extractor.py @@ -0,0 +1,302 @@ +""" +Credential and Authentication Data Extractor +""" + +import re +from typing import List +from .base import BaseExtractor, ExtractionResult, RegexPatterns + + +class CredentialsExtractor(BaseExtractor): + """Extract credentials and authentication 
data from content""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract various types of credentials""" + results = [] + + # Extract username:password pairs + results.extend(self._extract_username_password_pairs(text, url)) + + # Extract API keys + results.extend(self._extract_api_keys(text, url)) + + # Extract JWT tokens + results.extend(self._extract_jwt_tokens(text, url)) + + # Extract password hashes + results.extend(self._extract_password_hashes(text, url)) + + # Extract session tokens + results.extend(self._extract_session_tokens(text, url)) + + self.results.extend(results) + return results + + def _extract_username_password_pairs(self, text: str, url: str) -> List[ExtractionResult]: + """Extract username:password combinations""" + results = [] + + # Pattern 1: username:password format + pattern1 = r'(?m)^([a-zA-Z0-9._%+-]+):([^\s:]+)$' + for match in re.finditer(pattern1, text): + username, password = match.groups() + + # Skip if it looks like a URL or ratio + if '/' in username or '/' in password: + continue + + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.85, + risk_level='critical', + data={ + 'type': 'username_password', + 'username': username, + 'password': password, + 'format': 'username:password' + }, + context=context, + location=url + )) + + # Pattern 2: email:password format + pattern2 = r'(' + RegexPatterns.EMAIL + r'):([^\s:]+)' + for match in re.finditer(pattern2, text): + email = match.group(1) + password = match.group(2) + + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.9, + risk_level='critical', + data={ + 'type': 'email_password', + 'email': email, + 'password': password, + 'format': 'email:password' + }, + context=context, + location=url + )) + + # Pattern 3: Labeled credentials + pattern3 = r'(?i)(?:username|user|login|email)[\s:=]+([^\s:]+)[\s\n\r]{0,10}(?:password|pass|pwd)[\s:=]+([^\s\n\r]+)' + for match in re.finditer(pattern3, text): + username = match.group(1).strip() + password = match.group(2).strip() + + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.8, + risk_level='critical', + data={ + 'type': 'labeled_credentials', + 'username': username, + 'password': password, + 'format': 'labeled' + }, + context=context, + location=url + )) + + return results + + def _extract_api_keys(self, text: str, url: str) -> List[ExtractionResult]: + """Extract API keys and tokens""" + results = [] + + # AWS Access Keys + for match in re.finditer(RegexPatterns.API_KEY_AWS, text): + key = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.95, + risk_level='critical', + data={ + 'type': 'aws_access_key', + 'key': key, + 'provider': 'AWS' + }, + context=context, + location=url + )) + + # GitHub tokens + github_pattern = r'\bgh[pousr]_[A-Za-z0-9_]{36,}\b' + for match in re.finditer(github_pattern, text): + key = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.95, + risk_level='critical', + data={ + 'type': 'github_token', + 'key': key, + 'provider': 'GitHub' + }, + context=context, + location=url + )) + + # Slack tokens + slack_pattern = 
r'\bxox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,}\b' + for match in re.finditer(slack_pattern, text): + key = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.95, + risk_level='critical', + data={ + 'type': 'slack_token', + 'key': key, + 'provider': 'Slack' + }, + context=context, + location=url + )) + + # Generic API keys (look for common keywords) + api_key_patterns = [ + r'(?i)api[_-]?key[\s:=]+([a-zA-Z0-9_-]{20,})', + r'(?i)apikey[\s:=]+([a-zA-Z0-9_-]{20,})', + r'(?i)access[_-]?token[\s:=]+([a-zA-Z0-9_-]{20,})', + ] + + for pattern in api_key_patterns: + for match in re.finditer(pattern, text): + key = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.7, + risk_level='high', + data={ + 'type': 'generic_api_key', + 'key': key, + 'provider': 'Unknown' + }, + context=context, + location=url + )) + + return results + + def _extract_jwt_tokens(self, text: str, url: str) -> List[ExtractionResult]: + """Extract JWT tokens""" + results = [] + + for match in re.finditer(RegexPatterns.JWT_TOKEN, text): + token = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.9, + risk_level='high', + data={ + 'type': 'jwt_token', + 'token': token[:50] + '...' if len(token) > 50 else token + }, + context=context, + location=url + )) + + return results + + def _extract_password_hashes(self, text: str, url: str) -> List[ExtractionResult]: + """Extract password hashes""" + results = [] + + # Look for hashes in common formats + hash_patterns = [ + (RegexPatterns.MD5, 'MD5', 0.6), + (RegexPatterns.SHA1, 'SHA1', 0.65), + (RegexPatterns.SHA256, 'SHA256', 0.7), + ] + + for pattern, hash_type, confidence in hash_patterns: + # Look for hashes with password-related context + for match in re.finditer(pattern, text): + hash_value = match.group(0) + context = self.get_context(text, match.start(), match.end(), 200) + + # Check if context suggests this is a password hash + password_keywords = ['password', 'passwd', 'pwd', 'hash', 'credential'] + if any(keyword in context.lower() for keyword in password_keywords): + results.append(ExtractionResult( + category='credentials', + confidence=confidence + 0.2, + risk_level='high', + data={ + 'type': 'password_hash', + 'hash_type': hash_type, + 'hash': hash_value + }, + context=context, + location=url + )) + + # Bcrypt hashes + bcrypt_pattern = r'\$2[ayb]\$[0-9]{2}\$[A-Za-z0-9./]{53}' + for match in re.finditer(bcrypt_pattern, text): + hash_value = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.95, + risk_level='high', + data={ + 'type': 'password_hash', + 'hash_type': 'bcrypt', + 'hash': hash_value + }, + context=context, + location=url + )) + + return results + + def _extract_session_tokens(self, text: str, url: str) -> List[ExtractionResult]: + """Extract session identifiers and tokens""" + results = [] + + session_patterns = [ + r'(?i)session[_-]?id[\s:=]+([a-zA-Z0-9_-]{20,})', + r'(?i)phpsessid=([a-zA-Z0-9]{26,})', + r'(?i)jsessionid=([a-zA-Z0-9]{32,})', + r'(?i)asp\.net_sessionid=([a-zA-Z0-9]{24,})', + ] + + for pattern in session_patterns: + for match in re.finditer(pattern, text): + session_id = match.group(1) if match.lastindex else match.group(0) 
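+                # Patterns that capture the identifier expose it via group(1); a pattern without a
+                # capturing group falls back to the full match text.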
+ context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='credentials', + confidence=0.75, + risk_level='medium', + data={ + 'type': 'session_token', + 'session_id': session_id + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/crypto_extractor.py b/src/torbot/modules/deep_extract/crypto_extractor.py new file mode 100644 index 00000000..1e6bbcd2 --- /dev/null +++ b/src/torbot/modules/deep_extract/crypto_extractor.py @@ -0,0 +1,228 @@ +""" +Cryptocurrency Address Extractor and Tracker +""" + +import re +from typing import List +from .base import BaseExtractor, ExtractionResult, RegexPatterns + + +class CryptoExtractor(BaseExtractor): + """Extract cryptocurrency addresses and related information""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract various cryptocurrency addresses""" + results = [] + + # Extract Bitcoin addresses + results.extend(self._extract_bitcoin(text, url)) + + # Extract Ethereum addresses + results.extend(self._extract_ethereum(text, url)) + + # Extract Monero addresses + results.extend(self._extract_monero(text, url)) + + # Extract Litecoin addresses + results.extend(self._extract_litecoin(text, url)) + + # Extract other cryptocurrency mentions + results.extend(self._extract_crypto_keywords(text, url)) + + self.results.extend(results) + return results + + def _extract_bitcoin(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Bitcoin addresses""" + results = [] + + for match in re.finditer(RegexPatterns.BITCOIN, text): + address = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Validate Bitcoin address format more strictly + if self._validate_bitcoin_address(address): + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.9, + risk_level='high', + data={ + 'type': 'bitcoin', + 'address': address, + 'currency': 'BTC', + 'address_type': self._get_bitcoin_type(address) + }, + context=context, + location=url + )) + + return results + + def _validate_bitcoin_address(self, address: str) -> bool: + """Validate Bitcoin address format""" + # Basic validation - starts with 1, 3, or bc1 + if not (address.startswith('1') or address.startswith('3') or address.startswith('bc1')): + return False + + # Length check + if address.startswith('bc1'): # Bech32 + return 42 <= len(address) <= 62 + else: # Base58 + return 26 <= len(address) <= 35 + + return True + + def _get_bitcoin_type(self, address: str) -> str: + """Determine Bitcoin address type""" + if address.startswith('1'): + return 'P2PKH (Legacy)' + elif address.startswith('3'): + return 'P2SH (SegWit)' + elif address.startswith('bc1'): + return 'Bech32 (Native SegWit)' + return 'Unknown' + + def _extract_ethereum(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Ethereum addresses""" + results = [] + + for match in re.finditer(RegexPatterns.ETHEREUM, text): + address = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.9, + risk_level='high', + data={ + 'type': 'ethereum', + 'address': address, + 'currency': 'ETH', + 'checksum_validated': False # Could add EIP-55 validation + }, + context=context, + location=url + )) + + return results + + def _extract_monero(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Monero addresses""" + results = [] + + for match in 
re.finditer(RegexPatterns.MONERO, text): + address = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.85, + risk_level='high', + data={ + 'type': 'monero', + 'address': address, + 'currency': 'XMR', + 'privacy_coin': True + }, + context=context, + location=url + )) + + return results + + def _extract_litecoin(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Litecoin addresses""" + results = [] + + for match in re.finditer(RegexPatterns.LITECOIN, text): + address = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.8, + risk_level='high', + data={ + 'type': 'litecoin', + 'address': address, + 'currency': 'LTC' + }, + context=context, + location=url + )) + + return results + + def _extract_crypto_keywords(self, text: str, url: str) -> List[ExtractionResult]: + """Extract cryptocurrency-related keywords and contexts""" + results = [] + + # Payment request patterns + payment_patterns = [ + (r'(?i)send\s+(\d+(?:\.\d+)?)\s*(btc|bitcoin|eth|ethereum|xmr|monero)', 0.7), + (r'(?i)price[:\s]+(\d+(?:\.\d+)?)\s*(btc|bitcoin|eth|ethereum|xmr|monero)', 0.75), + (r'(?i)payment[:\s]+(\d+(?:\.\d+)?)\s*(btc|bitcoin|eth|ethereum|xmr|monero)', 0.8), + ] + + for pattern, confidence in payment_patterns: + for match in re.finditer(pattern, text): + amount = match.group(1) + currency = match.group(2) + context = self.get_context(text, match.start(), match.end(), 150) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=confidence, + risk_level='medium', + data={ + 'type': 'payment_request', + 'amount': amount, + 'currency': currency.upper(), + }, + context=context, + location=url + )) + + # Wallet mentions + wallet_pattern = r'(?i)wallet[:\s]+([a-zA-Z0-9]{20,})' + for match in re.finditer(wallet_pattern, text): + wallet_id = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.6, + risk_level='medium', + data={ + 'type': 'wallet_mention', + 'wallet_id': wallet_id + }, + context=context, + location=url + )) + + # Exchange mentions + exchanges = [ + 'binance', 'coinbase', 'kraken', 'bitstamp', 'bitfinex', + 'huobi', 'okex', 'kucoin', 'gemini', 'bittrex' + ] + + for exchange in exchanges: + pattern = r'\b' + exchange + r'\b' + for match in re.finditer(pattern, text, re.IGNORECASE): + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='cryptocurrency', + confidence=0.5, + risk_level='low', + data={ + 'type': 'exchange_mention', + 'exchange': exchange.title() + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/hidden_services_extractor.py b/src/torbot/modules/deep_extract/hidden_services_extractor.py new file mode 100644 index 00000000..2e02533f --- /dev/null +++ b/src/torbot/modules/deep_extract/hidden_services_extractor.py @@ -0,0 +1,206 @@ +""" +Hidden Services Intelligence Extractor +""" + +import re +from typing import List, Dict +from .base import BaseExtractor, ExtractionResult, RegexPatterns + + +class HiddenServicesExtractor(BaseExtractor): + """Extract and classify hidden service (.onion) links""" + + # Service type keywords for classification + SERVICE_KEYWORDS = { + 'marketplace': ['market', 'shop', 'store', 'buy', 'sell', 
'vendor', 'product', 'cart', 'price'], + 'forum': ['forum', 'board', 'discussion', 'thread', 'post', 'reply', 'topic', 'community'], + 'hosting': ['host', 'hosting', 'server', 'vps', 'dedicated', 'upload', 'file'], + 'email': ['mail', 'email', 'inbox', 'message', 'webmail'], + 'wiki': ['wiki', 'encyclopedia', 'article', 'knowledge'], + 'blog': ['blog', 'news', 'article', 'post'], + 'paste': ['paste', 'pastebin', 'snippet'], + 'search': ['search', 'index', 'directory', 'engine'], + 'chat': ['chat', 'irc', 'messenger', 'talk'], + 'financial': ['bank', 'bitcoin', 'crypto', 'wallet', 'exchange', 'atm'], + 'social': ['social', 'network', 'profile', 'friend'], + 'darknet': ['darknet', 'deep web', 'anonymous', 'privacy'], + } + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract hidden service links and information""" + results = [] + + # Extract v2 onion links + results.extend(self._extract_onion_links(text, url, RegexPatterns.ONION_V2, 'v2')) + + # Extract v3 onion links + results.extend(self._extract_onion_links(text, url, RegexPatterns.ONION_V3, 'v3')) + + # Extract service descriptions and metadata + results.extend(self._extract_service_metadata(text, url)) + + self.results.extend(results) + return results + + def _extract_onion_links(self, text: str, url: str, pattern: str, version: str) -> List[ExtractionResult]: + """Extract onion links with classification""" + results = [] + + for match in re.finditer(pattern, text): + onion_address = match.group(0) + context = self.get_context(text, match.start(), match.end(), 200) + + # Classify service type based on context + service_type, confidence_adjustment = self._classify_service(context) + + # Calculate trust score (basic heuristic) + trust_score = self._calculate_trust_score(context, onion_address) + + base_confidence = 0.95 if version == 'v3' else 0.9 + + results.append(ExtractionResult( + category='hidden_services', + confidence=base_confidence + confidence_adjustment, + risk_level=self._determine_risk_level(service_type), + data={ + 'type': 'onion_link', + 'address': onion_address, + 'full_url': f'http://{onion_address}', + 'version': version, + 'service_type': service_type, + 'trust_score': trust_score + }, + context=context, + location=url + )) + + return results + + def _classify_service(self, context: str) -> tuple: + """Classify hidden service based on context""" + context_lower = context.lower() + + # Count keyword matches for each category + category_scores = {} + for category, keywords in self.SERVICE_KEYWORDS.items(): + score = sum(1 for keyword in keywords if keyword in context_lower) + if score > 0: + category_scores[category] = score + + if not category_scores: + return 'unknown', 0.0 + + # Get category with highest score + best_category = max(category_scores, key=category_scores.get) + max_score = category_scores[best_category] + + # Confidence adjustment based on match strength + confidence_adjustment = min(0.1 * max_score, 0.3) + + return best_category, confidence_adjustment + + def _calculate_trust_score(self, context: str, address: str) -> float: + """Calculate basic trust score for hidden service""" + score = 0.5 # Base score + + context_lower = context.lower() + + # Positive indicators + positive_keywords = ['verified', 'trusted', 'official', 'secure', 'reputation', 'reviews'] + score += 0.05 * sum(1 for keyword in positive_keywords if keyword in context_lower) + + # Negative indicators + negative_keywords = ['scam', 'fake', 'phishing', 'warning', 'unsafe', 'malware', 'virus'] + score -= 
0.1 * sum(1 for keyword in negative_keywords if keyword in context_lower) + + # Clamp score between 0 and 1 + return max(0.0, min(1.0, score)) + + def _determine_risk_level(self, service_type: str) -> str: + """Determine risk level based on service type""" + high_risk_types = ['marketplace', 'financial', 'darknet'] + medium_risk_types = ['forum', 'chat', 'paste', 'hosting'] + + if service_type in high_risk_types: + return 'high' + elif service_type in medium_risk_types: + return 'medium' + else: + return 'low' + + def _extract_service_metadata(self, text: str, url: str) -> List[ExtractionResult]: + """Extract hidden service metadata and descriptions""" + results = [] + + # Extract service titles + title_patterns = [ + r'([^<]{5,100})', + r'(?i)service name[:\s]+([^\n]{5,50})', + r'(?i)site name[:\s]+([^\n]{5,50})' + ] + + for pattern in title_patterns: + for match in re.finditer(pattern, text): + title = match.group(1).strip() + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='hidden_services', + confidence=0.8, + risk_level='low', + data={ + 'type': 'service_title', + 'title': title + }, + context=context, + location=url + )) + + # Extract service descriptions + desc_patterns = [ + r' List[ExtractionResult]: + """Perform linguistic analysis""" + results = [] + + # Detect dark web slang + results.extend(self._detect_slang(text, url)) + + # Analyze technical sophistication + results.extend(self._analyze_technical_sophistication(text, url)) + + # Detect language patterns + results.extend(self._detect_language_patterns(text, url)) + + # Analyze communication style + results.extend(self._analyze_communication_style(text, url)) + + self.results.extend(results) + return results + + def _detect_slang(self, text: str, url: str) -> List[ExtractionResult]: + """Detect dark web slang and terminology""" + results = [] + text_lower = text.lower() + + found_terms = [] + for term, meaning in self.DARKWEB_SLANG.items(): + # Look for whole word matches + pattern = r'\b' + re.escape(term) + r'\b' + if re.search(pattern, text_lower): + found_terms.append({ + 'term': term, + 'meaning': meaning + }) + + if found_terms: + # Calculate confidence based on number of terms + confidence = min(0.5 + (len(found_terms) * 0.05), 0.95) + + # Get context for first term + first_term = found_terms[0]['term'] + match = re.search(r'\b' + re.escape(first_term) + r'\b', text_lower) + if match: + context = self.get_context(text, match.start(), match.end(), 150) + else: + context = text[:200] + + results.append(ExtractionResult( + category='linguistic_analysis', + confidence=confidence, + risk_level='medium', + data={ + 'type': 'darkweb_slang', + 'terms_found': found_terms, + 'term_count': len(found_terms) + }, + context=context, + location=url + )) + + return results + + def _analyze_technical_sophistication(self, text: str, url: str) -> List[ExtractionResult]: + """Analyze technical sophistication of content""" + results = [] + + # Technical indicators + technical_terms = { + 'high': [ + 'zero-day', '0day', 'exploit', 'vulnerability', 'payload', + 'shellcode', 'buffer overflow', 'sql injection', 'xss', + 'cryptography', 'encryption', 'obfuscation', 'polymorphic', + 'reverse engineering', 'assembly', 'kernel', 'rootkit' + ], + 'medium': [ + 'malware', 'trojan', 'virus', 'phishing', 'social engineering', + 'brute force', 'dictionary attack', 'ddos', 'botnet', + 'proxy', 'vpn', 'anonymous', 'tor', 'bitcoin' + ], + 'low': [ + 'hack', 'password', 'login', 'account', 
'username', + 'free', 'download', 'easy', 'simple', 'tutorial' + ] + } + + text_lower = text.lower() + + scores = {'high': 0, 'medium': 0, 'low': 0} + found_terms = {'high': [], 'medium': [], 'low': []} + + for level, terms in technical_terms.items(): + for term in terms: + pattern = r'\b' + re.escape(term) + r'\b' + count = len(re.findall(pattern, text_lower)) + if count > 0: + scores[level] += count + found_terms[level].append(term) + + # Calculate sophistication level + total_score = scores['high'] * 3 + scores['medium'] * 2 + scores['low'] + + if total_score > 0: + sophistication_level = 'low' + if scores['high'] >= 3: + sophistication_level = 'high' + elif scores['high'] >= 1 or scores['medium'] >= 5: + sophistication_level = 'medium' + + results.append(ExtractionResult( + category='linguistic_analysis', + confidence=0.75, + risk_level='medium', + data={ + 'type': 'technical_sophistication', + 'level': sophistication_level, + 'high_terms': found_terms['high'][:5], + 'medium_terms': found_terms['medium'][:5], + 'score_breakdown': scores + }, + context=text[:300], + location=url + )) + + return results + + def _detect_language_patterns(self, text: str, url: str) -> List[ExtractionResult]: + """Detect language patterns that might indicate geographic origin""" + results = [] + + # British vs American English indicators + british_spellings = ['colour', 'honour', 'favourite', 'centre', 'defence', 'organisation'] + american_spellings = ['color', 'honor', 'favorite', 'center', 'defense', 'organization'] + + british_count = sum(1 for word in british_spellings if word in text.lower()) + american_count = sum(1 for word in american_spellings if word in text.lower()) + + if british_count > 0 or american_count > 0: + variant = 'British English' if british_count > american_count else 'American English' + confidence = 0.6 + (abs(british_count - american_count) * 0.1) + confidence = min(confidence, 0.9) + + results.append(ExtractionResult( + category='linguistic_analysis', + confidence=confidence, + risk_level='low', + data={ + 'type': 'english_variant', + 'variant': variant, + 'indicators': { + 'british': british_count, + 'american': american_count + } + }, + context=text[:200], + location=url + )) + + # Common non-English phrases that appear in English text + foreign_patterns = [ + (r'\b(?:bonjour|merci|oui|non)\b', 'French'), + (r'\b(?:hola|gracias|por favor|sí|no)\b', 'Spanish'), + (r'\b(?:hallo|danke|bitte|ja|nein)\b', 'German'), + (r'\b(?:привет|спасибо|да|нет)\b', 'Russian'), + (r'\b(?:你好|谢谢)\b', 'Chinese'), + ] + + for pattern, language in foreign_patterns: + if re.search(pattern, text, re.IGNORECASE): + match = re.search(pattern, text, re.IGNORECASE) + if match: + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='linguistic_analysis', + confidence=0.7, + risk_level='low', + data={ + 'type': 'foreign_language_indicator', + 'language': language + }, + context=context, + location=url + )) + + return results + + def _analyze_communication_style(self, text: str, url: str) -> List[ExtractionResult]: + """Analyze communication style and professionalism""" + results = [] + + # Count various style indicators + indicators = { + 'professional': 0, + 'casual': 0, + 'aggressive': 0 + } + + # Professional indicators + professional_terms = [ + 'please', 'thank you', 'regards', 'sincerely', 'professional', + 'service', 'quality', 'guarantee', 'support', 'customer' + ] + + # Casual indicators + casual_terms = [ + 'lol', 'btw', 'imo', 'tbh', 'af', 
'gonna', 'wanna', + 'yeah', 'nah', 'dude', 'bro', 'guys' + ] + + # Aggressive indicators + aggressive_terms = [ + 'fuck', 'shit', 'damn', 'idiot', 'stupid', 'scam', + 'warning', 'threat', 'revenge', 'attack', 'destroy' + ] + + text_lower = text.lower() + + indicators['professional'] = sum(1 for term in professional_terms if term in text_lower) + indicators['casual'] = sum(1 for term in casual_terms if term in text_lower) + indicators['aggressive'] = sum(1 for term in aggressive_terms if term in text_lower) + + if sum(indicators.values()) >= 2: + # Determine dominant style + dominant_style = max(indicators, key=indicators.get) + + # Calculate confidence + total = sum(indicators.values()) + confidence = 0.5 + (indicators[dominant_style] / total * 0.4) + + results.append(ExtractionResult( + category='linguistic_analysis', + confidence=confidence, + risk_level='low', + data={ + 'type': 'communication_style', + 'style': dominant_style, + 'indicators': indicators + }, + context=text[:250], + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/marketplace_extractor.py b/src/torbot/modules/deep_extract/marketplace_extractor.py new file mode 100644 index 00000000..9f10d9f6 --- /dev/null +++ b/src/torbot/modules/deep_extract/marketplace_extractor.py @@ -0,0 +1,337 @@ +""" +Dark Web Marketplace Intelligence Extractor +""" + +import re +from typing import List +from .base import BaseExtractor, ExtractionResult + + +class MarketplaceExtractor(BaseExtractor): + """Extract marketplace intelligence including products, vendors, and pricing""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract marketplace-related intelligence""" + results = [] + + # Extract product listings + results.extend(self._extract_product_listings(text, url)) + + # Extract vendor information + results.extend(self._extract_vendor_info(text, url)) + + # Extract pricing information + results.extend(self._extract_pricing(text, url)) + + # Extract shipping information + results.extend(self._extract_shipping_info(text, url)) + + # Extract escrow mentions + results.extend(self._extract_escrow_info(text, url)) + + self.results.extend(results) + return results + + def _extract_product_listings(self, text: str, url: str) -> List[ExtractionResult]: + """Extract product/service listings""" + results = [] + + # Product patterns + product_patterns = [ + r'(?i)(?:product|item|listing)[:\s]+([^\n]{5,100})', + r'(?i)(?:selling|offering|available)[:\s]+([^\n]{5,100})', + ] + + for pattern in product_patterns: + for match in re.finditer(pattern, text): + product_name = match.group(1).strip() + context = self.get_context(text, match.start(), match.end(), 200) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.7, + risk_level='high', + data={ + 'type': 'product_listing', + 'product_name': product_name + }, + context=context, + location=url + )) + + # Category mentions + categories = [ + 'drugs', 'weapons', 'counterfeit', 'fraud', 'hacking', + 'malware', 'exploits', 'data', 'credentials', 'accounts', + 'documents', 'services', 'digital goods' + ] + + for category in categories: + pattern = r'\b' + re.escape(category) + r'\b' + for match in re.finditer(pattern, text, re.IGNORECASE): + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.65, + risk_level='high', + data={ + 'type': 'product_category', + 'category': category.title() + }, + context=context, + location=url + )) + + 
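+        # Bare category keywords such as "data" or "services" also occur on benign pages, so these
+        # hits are emitted at low confidence (0.65) and left to downstream filtering.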
return results + + def _extract_vendor_info(self, text: str, url: str) -> List[ExtractionResult]: + """Extract vendor/seller information""" + results = [] + + # Vendor name patterns + vendor_patterns = [ + r'(?i)vendor[:\s]+([a-zA-Z0-9_-]{3,20})', + r'(?i)seller[:\s]+([a-zA-Z0-9_-]{3,20})', + r'(?i)(?:sold|shipped)\s+by[:\s]+([a-zA-Z0-9_-]{3,20})' + ] + + for pattern in vendor_patterns: + for match in re.finditer(pattern, text): + vendor_name = match.group(1).strip() + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.8, + risk_level='medium', + data={ + 'type': 'vendor_name', + 'vendor': vendor_name + }, + context=context, + location=url + )) + + # Vendor reputation/rating + rating_patterns = [ + r'(?i)(?:rating|reputation|score)[:\s]+(\d+(?:\.\d+)?)\s*(?:/\s*(\d+))?', + r'(?i)(\d+)\s*(?:stars?|⭐)', + r'(?i)(\d+)%\s*positive' + ] + + for pattern in rating_patterns: + for match in re.finditer(pattern, text): + rating = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.75, + risk_level='low', + data={ + 'type': 'vendor_rating', + 'rating': rating + }, + context=context, + location=url + )) + + # Sales/transaction count + sales_patterns = [ + r'(?i)(\d+)\s+(?:sales?|transactions?|orders?)', + r'(?i)sold[:\s]+(\d+)\s+times?' + ] + + for pattern in sales_patterns: + for match in re.finditer(pattern, text): + count = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.7, + risk_level='low', + data={ + 'type': 'sales_count', + 'count': int(count) + }, + context=context, + location=url + )) + + return results + + def _extract_pricing(self, text: str, url: str) -> List[ExtractionResult]: + """Extract pricing information""" + results = [] + + # Cryptocurrency pricing + crypto_price_patterns = [ + r'(?i)price[:\s]+(\d+(?:\.\d+)?)\s*(btc|bitcoin|eth|ethereum|xmr|monero)', + r'(?i)(\d+(?:\.\d+)?)\s*(btc|bitcoin|eth|ethereum|xmr|monero)', + r'(?i)\$(\d+(?:\.\d+)?)\s*(?:usd)?' 
+ ] + + for pattern in crypto_price_patterns: + for match in re.finditer(pattern, text): + amount = match.group(1) + currency = match.group(2) if match.lastindex >= 2 else 'USD' + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests this is a price + price_keywords = ['price', 'cost', 'pay', 'payment', 'buy', 'purchase'] + if any(keyword in context.lower() for keyword in price_keywords): + results.append(ExtractionResult( + category='marketplace', + confidence=0.8, + risk_level='medium', + data={ + 'type': 'pricing', + 'amount': float(amount), + 'currency': currency.upper() + }, + context=context, + location=url + )) + + # Quantity-based pricing + quantity_pattern = r'(?i)(\d+)\s*(?:x|pcs?|pieces?|units?)[:\s]+.*?(\d+(?:\.\d+)?)\s*(btc|eth|usd|\$)' + for match in re.finditer(quantity_pattern, text): + quantity = match.group(1) + price = match.group(2) + currency = match.group(3) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.75, + risk_level='medium', + data={ + 'type': 'quantity_pricing', + 'quantity': int(quantity), + 'price': float(price), + 'currency': currency.upper() + }, + context=context, + location=url + )) + + return results + + def _extract_shipping_info(self, text: str, url: str) -> List[ExtractionResult]: + """Extract shipping and delivery information""" + results = [] + + # Shipping locations + shipping_patterns = [ + r'(?i)(?:ships?|shipping|delivery)\s+(?:from|to)[:\s]+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', + r'(?i)(?:worldwide|international|domestic)\s+shipping', + r'(?i)ships?\s+to[:\s]+([A-Z]{2,3}(?:\s*,\s*[A-Z]{2,3})*)' + ] + + for pattern in shipping_patterns: + for match in re.finditer(pattern, text): + location = match.group(1) if match.lastindex else match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.7, + risk_level='medium', + data={ + 'type': 'shipping_location', + 'location': location + }, + context=context, + location=url + )) + + # Delivery time + delivery_pattern = r'(?i)(?:delivery|shipping)\s+(?:time|period)[:\s]+(\d+[-\s]?\d*)\s*(days?|weeks?|hours?)' + for match in re.finditer(delivery_pattern, text): + time_value = match.group(1) + time_unit = match.group(2) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.75, + risk_level='low', + data={ + 'type': 'delivery_time', + 'time_value': time_value, + 'time_unit': time_unit + }, + context=context, + location=url + )) + + # Stealth/discreet shipping mentions + stealth_pattern = r'(?i)\b(stealth|discreet|hidden|vacuum[- ]sealed)\s+(?:shipping|packaging|delivery)\b' + for match in re.finditer(stealth_pattern, text): + method = match.group(1) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.85, + risk_level='high', + data={ + 'type': 'shipping_method', + 'method': method.lower(), + 'stealth': True + }, + context=context, + location=url + )) + + return results + + def _extract_escrow_info(self, text: str, url: str) -> List[ExtractionResult]: + """Extract escrow and payment protection information""" + results = [] + + escrow_patterns = [ + r'(?i)\b(escrow|multisig|2-of-3|multi-signature)\b', + r'(?i)(?:buyer|purchase)\s+protection', + r'(?i)(?:dispute|refund)\s+(?:process|policy|available)' + ] + + for 
pattern in escrow_patterns: + for match in re.finditer(pattern, text): + escrow_type = match.group(1) if match.lastindex else 'protection' + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.7, + risk_level='low', + data={ + 'type': 'escrow_method', + 'method': escrow_type.lower() + }, + context=context, + location=url + )) + + # FE (Finalize Early) mentions - often a red flag + fe_pattern = r'(?i)\b(FE|finalize\s+early)\b' + for match in re.finditer(fe_pattern, text): + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='marketplace', + confidence=0.9, + risk_level='high', + data={ + 'type': 'payment_risk', + 'risk_indicator': 'finalize_early', + 'warning': 'FE increases risk of fraud' + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/orchestrator.py b/src/torbot/modules/deep_extract/orchestrator.py new file mode 100644 index 00000000..808c803f --- /dev/null +++ b/src/torbot/modules/deep_extract/orchestrator.py @@ -0,0 +1,286 @@ +""" +Deep Content Extraction Orchestrator + +This module coordinates all extractors and provides a unified interface +for deep content analysis. +""" + +import json +import logging +from typing import List, Dict, Any +from datetime import datetime + +from .base import ExtractionResult +from .credentials_extractor import CredentialsExtractor +from .pii_extractor import PIIExtractor +from .crypto_extractor import CryptoExtractor +from .hidden_services_extractor import HiddenServicesExtractor +from .threat_indicators_extractor import ThreatIndicatorsExtractor +from .marketplace_extractor import MarketplaceExtractor +from .communication_extractor import CommunicationExtractor +from .breach_detector import BreachDetector +from .linguistic_analyzer import LinguisticAnalyzer + + +class DeepExtractor: + """ + Orchestrates all deep content extraction modules + """ + + def __init__(self): + """Initialize all extractors""" + self.extractors = { + 'credentials': CredentialsExtractor(), + 'pii': PIIExtractor(), + 'cryptocurrency': CryptoExtractor(), + 'hidden_services': HiddenServicesExtractor(), + 'threat_indicators': ThreatIndicatorsExtractor(), + 'marketplace': MarketplaceExtractor(), + 'communication': CommunicationExtractor(), + 'data_breach': BreachDetector(), + 'linguistic_analysis': LinguisticAnalyzer() + } + + self.all_results: List[ExtractionResult] = [] + self.logger = logging.getLogger(__name__) + + def extract_all(self, text: str, url: str = "") -> Dict[str, List[ExtractionResult]]: + """ + Run all extractors on the given text + + Args: + text: The text content to analyze + url: The source URL (optional) + + Returns: + Dictionary mapping extractor names to their results + """ + results_by_category = {} + + self.logger.info(f"Starting deep extraction for URL: {url}") + + for name, extractor in self.extractors.items(): + try: + self.logger.debug(f"Running {name} extractor...") + extractor_results = extractor.extract(text, url) + results_by_category[name] = extractor_results + self.all_results.extend(extractor_results) + + if extractor_results: + self.logger.info( + f"{name}: Found {len(extractor_results)} items" + ) + except Exception as e: + self.logger.error(f"Error in {name} extractor: {str(e)}") + results_by_category[name] = [] + + return results_by_category + + def get_summary(self) -> Dict[str, Any]: + """ + Get a summary of all extraction results + + 
Returns: + Dictionary containing summary statistics + """ + summary = { + 'total_findings': len(self.all_results), + 'by_category': {}, + 'by_risk_level': { + 'critical': 0, + 'high': 0, + 'medium': 0, + 'low': 0 + }, + 'high_confidence_findings': 0, + 'timestamp': datetime.now().isoformat() + } + + # Count by category + for result in self.all_results: + category = result.category + if category not in summary['by_category']: + summary['by_category'][category] = 0 + summary['by_category'][category] += 1 + + # Count by risk level + risk_level = result.risk_level + if risk_level in summary['by_risk_level']: + summary['by_risk_level'][risk_level] += 1 + + # Count high confidence findings + if result.confidence >= 0.8: + summary['high_confidence_findings'] += 1 + + return summary + + def get_critical_findings(self, min_confidence: float = 0.7) -> List[ExtractionResult]: + """ + Get critical risk findings above a confidence threshold + + Args: + min_confidence: Minimum confidence threshold (0.0 to 1.0) + + Returns: + List of critical findings + """ + return [ + result for result in self.all_results + if result.risk_level == 'critical' and result.confidence >= min_confidence + ] + + def export_to_json(self, filepath: str, include_summary: bool = True) -> None: + """ + Export all results to a JSON file + + Args: + filepath: Path to output JSON file + include_summary: Whether to include summary statistics + """ + try: + export_data = { + 'extraction_results': [ + result.to_dict() for result in self.all_results + ] + } + + if include_summary: + export_data['summary'] = self.get_summary() + + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(export_data, f, indent=2, ensure_ascii=False) + + self.logger.info(f"Exported {len(self.all_results)} findings to {filepath}") + + except Exception as e: + self.logger.error(f"Error exporting to JSON: {str(e)}") + raise + + def export_to_text(self, filepath: str) -> None: + """ + Export results to a human-readable text file + + Args: + filepath: Path to output text file + """ + try: + with open(filepath, 'w', encoding='utf-8') as f: + f.write("="*80 + "\n") + f.write("TORBOT DEEP EXTRACTION INTELLIGENCE REPORT\n") + f.write("="*80 + "\n\n") + + # Write summary + summary = self.get_summary() + f.write("SUMMARY\n") + f.write("-"*80 + "\n") + f.write(f"Total Findings: {summary['total_findings']}\n") + f.write(f"High Confidence Findings: {summary['high_confidence_findings']}\n") + f.write(f"\nFindings by Risk Level:\n") + for risk, count in summary['by_risk_level'].items(): + f.write(f" {risk.upper()}: {count}\n") + f.write(f"\nFindings by Category:\n") + + for category, count in summary['by_category'].items(): + f.write(f" {category}: {count}\n") + f.write("\n\n") + + # Write detailed findings + f.write("DETAILED FINDINGS\n") + f.write("=" * 80 + "\n\n") + + # Group by category + by_category = {} + for result in self.all_results: + if result.category not in by_category: + by_category[result.category] = [] + by_category[result.category].append(result) + + # Write each category + for category, results in sorted(by_category.items()): + f.write(f"\n{category.upper()}\n") + f.write("-" * 80 + "\n") + + for i, result in enumerate(results, 1): + f.write(f"\n[{i}] {result.data.get('type', 'unknown')}\n") + f.write(f" Risk Level: {result.risk_level.upper()}\n") + f.write(f" Confidence: {result.confidence:.2f}\n") + f.write(f" Location: {result.location}\n") + + # Write data + f.write(" Data:\n") + for key, value in result.data.items(): + if key != 'type' and value 
is not None: + # Truncate long values + str_value = str(value) + if len(str_value) > 100: + str_value = str_value[:100] + "..." + f.write(f" {key}: {str_value}\n") + + # Write context if available + if result.context: + f.write(f" Context: {result.context}\n") + + f.write("\n") + + f.write("\n" + "=" * 80 + "\n") + f.write("END OF REPORT\n") + f.write("=" * 80 + "\n") + + self.logger.info(f"Exported report to {filepath}") + + except Exception as e: + self.logger.error(f"Error exporting to text: {str(e)}") + raise + + def print_summary(self) -> None: + """Print a summary of findings to console""" + summary = self.get_summary() + + print("\n" + "="*60) + print("DEEP EXTRACTION SUMMARY") + print("="*60) + print(f"\nTotal Findings: {summary['total_findings']}") + print(f"High Confidence Findings (>80%): {summary['high_confidence_findings']}") + + print("\nFindings by Risk Level:") + for risk in ['critical', 'high', 'medium', 'low']: + count = summary['by_risk_level'][risk] + if count > 0: + print(f" {risk.upper():12} {count:4d}") + + print("\nFindings by Category:") + for category, count in sorted(summary['by_category'].items()): + print(f" {category:25} {count:4d}") + + print("\n" + "="*60 + "\n") + + def get_results_by_category(self, category: str) -> List[ExtractionResult]: + """ + Get all results for a specific category + + Args: + category: Category name + + Returns: + List of results for that category + """ + return [r for r in self.all_results if r.category == category] + + def get_results_by_risk_level(self, risk_level: str) -> List[ExtractionResult]: + """ + Get all results for a specific risk level + + Args: + risk_level: Risk level (critical, high, medium, low) + + Returns: + List of results for that risk level + """ + return [r for r in self.all_results if r.risk_level == risk_level] + + def clear_results(self) -> None: + """Clear all stored results""" + self.all_results.clear() + for extractor in self.extractors.values(): + extractor.results.clear() + diff --git a/src/torbot/modules/deep_extract/pii_extractor.py b/src/torbot/modules/deep_extract/pii_extractor.py new file mode 100644 index 00000000..2f138994 --- /dev/null +++ b/src/torbot/modules/deep_extract/pii_extractor.py @@ -0,0 +1,281 @@ +""" +Personal Identifiable Information (PII) Extractor +""" + +import re +import phonenumbers +from typing import List +from .base import BaseExtractor, ExtractionResult, RegexPatterns, LuhnValidator + + +class PIIExtractor(BaseExtractor): + """Extract Personal Identifiable Information from content""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract various types of PII""" + results = [] + + # Extract email addresses + results.extend(self._extract_emails(text, url)) + + # Extract phone numbers + results.extend(self._extract_phone_numbers(text, url)) + + # Extract SSNs + results.extend(self._extract_ssns(text, url)) + + # Extract credit card numbers + results.extend(self._extract_credit_cards(text, url)) + + # Extract names (basic pattern matching) + results.extend(self._extract_names(text, url)) + + # Extract addresses + results.extend(self._extract_addresses(text, url)) + + self.results.extend(results) + return results + + def _extract_emails(self, text: str, url: str) -> List[ExtractionResult]: + """Extract email addresses with context""" + results = [] + + for match in re.finditer(RegexPatterns.EMAIL, text): + email = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Determine risk level based on context + risk_level 
= 'medium' + sensitive_keywords = ['admin', 'root', 'support', 'contact', 'info'] + if any(keyword in email.lower() for keyword in sensitive_keywords): + risk_level = 'high' + + results.append(ExtractionResult( + category='pii', + confidence=0.9, + risk_level=risk_level, + data={ + 'type': 'email', + 'email': email, + 'domain': email.split('@')[1] if '@' in email else None + }, + context=context, + location=url + )) + + return results + + def _extract_phone_numbers(self, text: str, url: str) -> List[ExtractionResult]: + """Extract phone numbers using phonenumbers library""" + results = [] + + # Try to parse phone numbers from text + try: + for match in phonenumbers.PhoneNumberMatcher(text, None): + phone = match.number + phone_str = phonenumbers.format_number( + phone, phonenumbers.PhoneNumberFormat.INTERNATIONAL + ) + + context = self.get_context(text, match.start, match.end) + + results.append(ExtractionResult( + category='pii', + confidence=0.85, + risk_level='high', + data={ + 'type': 'phone_number', + 'number': phone_str, + 'country_code': phone.country_code, + 'national_number': phone.national_number, + 'is_valid': phonenumbers.is_valid_number(phone) + }, + context=context, + location=url + )) + except Exception as _: + # Fallback to regex if phonenumbers fails + for match in re.finditer(RegexPatterns.PHONE, text): + phone_str = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='pii', + confidence=0.7, + risk_level='high', + data={ + 'type': 'phone_number', + 'number': phone_str, + 'is_valid': None + }, + context=context, + location=url + )) + + return results + + def _extract_ssns(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Social Security Numbers""" + results = [] + + for match in re.finditer(RegexPatterns.SSN, text): + ssn = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests this is actually an SSN + ssn_keywords = ['ssn', 'social security', 'ss#', 'social'] + context_lower = context.lower() + confidence = 0.6 + + if any(keyword in context_lower for keyword in ssn_keywords): + confidence = 0.9 + + results.append(ExtractionResult( + category='pii', + confidence=confidence, + risk_level='critical', + data={ + 'type': 'ssn', + 'ssn': ssn, + 'masked': ssn[:3] + '-XX-' + ssn[-4:] + }, + context=context, + location=url + )) + + return results + + def _extract_credit_cards(self, text: str, url: str) -> List[ExtractionResult]: + """Extract and validate credit card numbers""" + results = [] + + for match in re.finditer(RegexPatterns.CREDIT_CARD, text): + card_number = match.group(0) + + # Validate using Luhn algorithm + if not LuhnValidator.validate(card_number): + continue + + context = self.get_context(text, match.start(), match.end()) + + # Determine card type + card_type = self._identify_card_type(card_number) + + results.append(ExtractionResult( + category='pii', + confidence=0.95, + risk_level='critical', + data={ + 'type': 'credit_card', + 'card_type': card_type, + 'card_number': card_number, + 'masked': card_number[:4] + 'XXXXXXXX' + card_number[-4:], + 'luhn_valid': True + }, + context=context, + location=url + )) + + return results + + def _identify_card_type(self, card_number: str) -> str: + """Identify credit card type from number""" + if card_number[0] == '4': + return 'Visa' + elif card_number[0] == '5': + return 'Mastercard' + elif card_number[:2] in ['34', '37']: + return 'American Express' + elif card_number[:2] in 
['36', '38'] or card_number[:3] in ['300', '301', '302', '303', '304', '305']: + return 'Diners Club' + elif card_number[:4] in ['6011'] or card_number[:2] == '65': + return 'Discover' + else: + return 'Unknown' + + def _extract_names(self, text: str, url: str) -> List[ExtractionResult]: + """Extract potential names (basic pattern matching)""" + results = [] + + # Pattern for names (Capital Letter followed by lowercase, 2-3 words) + name_pattern = r'\b([A-Z][a-z]{2,}\s+[A-Z][a-z]{2,}(?:\s+[A-Z][a-z]{2,})?)\b' + + for match in re.finditer(name_pattern, text): + name = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Skip common false positives + skip_words = ['The', 'And', 'But', 'For', 'Nor', 'Yet', 'So'] + if any(word in name for word in skip_words): + continue + + # Check if context suggests this is a name + name_keywords = ['name', 'contact', 'by', 'author', 'posted by', 'mr', 'ms', 'dr'] + context_lower = context.lower() + confidence = 0.4 + + if any(keyword in context_lower for keyword in name_keywords): + confidence = 0.7 + + # Only include if confidence is reasonable + if confidence >= 0.6: + results.append(ExtractionResult( + category='pii', + confidence=confidence, + risk_level='medium', + data={ + 'type': 'name', + 'name': name + }, + context=context, + location=url + )) + + return results + + def _extract_addresses(self, text: str, url: str) -> List[ExtractionResult]: + """Extract physical addresses (basic pattern matching)""" + results = [] + + # Pattern for street addresses (simplified) + address_pattern = r'\b\d+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Way)\b' + + for match in re.finditer(address_pattern, text): + address = match.group(0) + context = self.get_context(text, match.start(), match.end(), 150) + + results.append(ExtractionResult( + category='pii', + confidence=0.65, + risk_level='high', + data={ + 'type': 'address', + 'address': address + }, + context=context, + location=url + )) + + # Pattern for zip codes with context + zip_pattern = r'\b\d{5}(?:-\d{4})?\b' + for match in re.finditer(zip_pattern, text): + zip_code = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Only include if context suggests address + address_keywords = ['zip', 'address', 'mail', 'shipping', 'location'] + if any(keyword in context.lower() for keyword in address_keywords): + results.append(ExtractionResult( + category='pii', + confidence=0.75, + risk_level='medium', + data={ + 'type': 'zip_code', + 'zip_code': zip_code + }, + context=context, + location=url + )) + + return results + diff --git a/src/torbot/modules/deep_extract/threat_indicators_extractor.py b/src/torbot/modules/deep_extract/threat_indicators_extractor.py new file mode 100644 index 00000000..9cc7a187 --- /dev/null +++ b/src/torbot/modules/deep_extract/threat_indicators_extractor.py @@ -0,0 +1,319 @@ +""" +Threat Indicators (IoCs) Extractor +""" + +import re +from typing import List +from .base import BaseExtractor, ExtractionResult, RegexPatterns + + +class ThreatIndicatorsExtractor(BaseExtractor): + """Extract Indicators of Compromise and threat intelligence""" + + def extract(self, text: str, url: str = "") -> List[ExtractionResult]: + """Extract various threat indicators""" + results = [] + + # Extract IP addresses + results.extend(self._extract_ip_addresses(text, url)) + + # Extract domains + results.extend(self._extract_domains(text, url)) + + # Extract file hashes + 
results.extend(self._extract_file_hashes(text, url)) + + # Extract CVE references + results.extend(self._extract_cves(text, url)) + + # Extract malware indicators + results.extend(self._extract_malware_indicators(text, url)) + + # Extract C2 infrastructure patterns + results.extend(self._extract_c2_patterns(text, url)) + + self.results.extend(results) + return results + + def _extract_ip_addresses(self, text: str, url: str) -> List[ExtractionResult]: + """Extract IPv4 and IPv6 addresses""" + results = [] + + # IPv4 + for match in re.finditer(RegexPatterns.IPV4, text): + ip = match.group(0) + + # Skip common false positives + if ip.startswith('0.') or ip.startswith('255.255.255.'): + continue + + context = self.get_context(text, match.start(), match.end()) + + # Classify IP type + ip_type = self._classify_ip_type(ip) + + results.append(ExtractionResult( + category='threat_indicators', + confidence=0.8, + risk_level=self._get_ip_risk_level(ip, context), + data={ + 'type': 'ipv4', + 'ip_address': ip, + 'ip_type': ip_type + }, + context=context, + location=url + )) + + # IPv6 + for match in re.finditer(RegexPatterns.IPV6, text): + ip = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + results.append(ExtractionResult( + category='threat_indicators', + confidence=0.85, + risk_level='medium', + data={ + 'type': 'ipv6', + 'ip_address': ip + }, + context=context, + location=url + )) + + return results + + def _classify_ip_type(self, ip: str) -> str: + """Classify IP address type""" + octets = ip.split('.') + first_octet = int(octets[0]) + + # Private ranges + if first_octet == 10: + return 'private' + elif first_octet == 172 and 16 <= int(octets[1]) <= 31: + return 'private' + elif first_octet == 192 and int(octets[1]) == 168: + return 'private' + elif first_octet == 127: + return 'loopback' + else: + return 'public' + + def _get_ip_risk_level(self, ip: str, context: str) -> str: + """Determine risk level for IP address""" + context_lower = context.lower() + + high_risk_keywords = ['c2', 'command', 'control', 'malware', 'exploit', 'attack'] + if any(keyword in context_lower for keyword in high_risk_keywords): + return 'high' + + ip_type = self._classify_ip_type(ip) + if ip_type == 'public': + return 'medium' + else: + return 'low' + + def _extract_domains(self, text: str, url: str) -> List[ExtractionResult]: + """Extract domain names (excluding .onion which are handled separately)""" + results = [] + + for match in re.finditer(RegexPatterns.DOMAIN, text): + domain = match.group(0) + + # Skip .onion domains and common false positives + if domain.endswith('.onion') or domain.endswith('.local'): + continue + + # Skip very common domains unless in suspicious context + common_domains = ['google.com', 'facebook.com', 'twitter.com', 'example.com'] + if domain in common_domains: + continue + + context = self.get_context(text, match.start(), match.end()) + + # Check for suspicious indicators + suspicious_keywords = ['phishing', 'fake', 'malicious', 'compromised', 'infected'] + confidence = 0.7 + risk_level = 'medium' + + if any(keyword in context.lower() for keyword in suspicious_keywords): + confidence = 0.9 + risk_level = 'high' + + results.append(ExtractionResult( + category='threat_indicators', + confidence=confidence, + risk_level=risk_level, + data={ + 'type': 'domain', + 'domain': domain, + 'tld': domain.split('.')[-1] + }, + context=context, + location=url + )) + + return results + + def _extract_file_hashes(self, text: str, url: str) -> List[ExtractionResult]: + 
"""Extract file hashes (MD5, SHA1, SHA256)""" + results = [] + + hash_types = [ + (RegexPatterns.MD5, 'MD5', 32), + (RegexPatterns.SHA1, 'SHA1', 40), + (RegexPatterns.SHA256, 'SHA256', 64) + ] + + for pattern, hash_type, length in hash_types: + for match in re.finditer(pattern, text): + hash_value = match.group(0) + context = self.get_context(text, match.start(), match.end(), 200) + + # Check if context suggests this is a file hash + file_keywords = ['hash', 'checksum', 'md5', 'sha1', 'sha256', 'file', 'malware', 'sample'] + confidence = 0.5 + + if any(keyword in context.lower() for keyword in file_keywords): + confidence = 0.85 + + # Only include if confidence is reasonable + if confidence >= 0.7: + results.append(ExtractionResult( + category='threat_indicators', + confidence=confidence, + risk_level='medium', + data={ + 'type': 'file_hash', + 'hash_type': hash_type, + 'hash': hash_value, + 'length': length + }, + context=context, + location=url + )) + + return results + + def _extract_cves(self, text: str, url: str) -> List[ExtractionResult]: + """Extract CVE (Common Vulnerabilities and Exposures) references""" + results = [] + + for match in re.finditer(RegexPatterns.CVE, text): + cve = match.group(0) + context = self.get_context(text, match.start(), match.end(), 250) + + # Extract year from CVE + year = cve.split('-')[1] + + # Check for exploit mentions + exploit_keywords = ['exploit', 'poc', 'proof of concept', '0day', 'zero day'] + risk_level = 'medium' + confidence = 0.95 + + if any(keyword in context.lower() for keyword in exploit_keywords): + risk_level = 'high' + + results.append(ExtractionResult( + category='threat_indicators', + confidence=confidence, + risk_level=risk_level, + data={ + 'type': 'cve', + 'cve_id': cve, + 'year': year + }, + context=context, + location=url + )) + + return results + + def _extract_malware_indicators(self, text: str, url: str) -> List[ExtractionResult]: + """Extract malware-related indicators""" + results = [] + + # Common malware families + malware_families = [ + 'wannacry', 'emotet', 'trickbot', 'ryuk', 'maze', 'conti', + 'ransomware', 'trojan', 'backdoor', 'rootkit', 'keylogger', + 'botnet', 'mirai', 'cobalt strike', 'metasploit' + ] + + for malware in malware_families: + pattern = r'\b' + re.escape(malware) + r'\b' + for match in re.finditer(pattern, text, re.IGNORECASE): + context = self.get_context(text, match.start(), match.end(), 200) + + results.append(ExtractionResult( + category='threat_indicators', + confidence=0.85, + risk_level='high', + data={ + 'type': 'malware_mention', + 'malware_family': malware.title() + }, + context=context, + location=url + )) + + # Suspicious file extensions + suspicious_extensions = [ + r'\b\w+\.(?:exe|dll|sys|bat|ps1|vbs|js|scr|com|pif|msi)\b' + ] + + for pattern in suspicious_extensions: + for match in re.finditer(pattern, text, re.IGNORECASE): + filename = match.group(0) + context = self.get_context(text, match.start(), match.end()) + + # Check if context suggests malware + malware_keywords = ['malware', 'virus', 'trojan', 'payload', 'dropper', 'loader'] + if any(keyword in context.lower() for keyword in malware_keywords): + results.append(ExtractionResult( + category='threat_indicators', + confidence=0.75, + risk_level='high', + data={ + 'type': 'suspicious_file', + 'filename': filename, + 'extension': filename.split('.')[-1] + }, + context=context, + location=url + )) + + return results + + def _extract_c2_patterns(self, text: str, url: str) -> List[ExtractionResult]: + """Extract Command & Control 
infrastructure patterns""" + results = [] + + c2_keywords = [ + r'(?i)c2\s+(?:server|infrastructure|domain|ip)', + r'(?i)command\s+(?:and|&)\s+control', + r'(?i)c&c\s+(?:server|infrastructure)', + r'(?i)callback\s+(?:url|domain|server)', + r'(?i)exfil(?:tration)?\s+(?:server|domain)' + ] + + for pattern in c2_keywords: + for match in re.finditer(pattern, text): + context = self.get_context(text, match.start(), match.end(), 250) + + results.append(ExtractionResult( + category='threat_indicators', + confidence=0.9, + risk_level='high', + data={ + 'type': 'c2_infrastructure', + 'indicator_type': 'keyword_match' + }, + context=context, + location=url + )) + + return results +