diff --git a/gitgalaxy/core/detector.py b/gitgalaxy/core/detector.py index de5aa50..ed070cb 100644 --- a/gitgalaxy/core/detector.py +++ b/gitgalaxy/core/detector.py @@ -732,6 +732,13 @@ def _correlate_signals(self, targets: List[int], dampeners: List[int], max_dista def coding_analysis(self, segments: List[Tuple[str, str, int]], regex_telemetry: dict = None) -> Tuple[Dict[str, int], Dict[str, int], List[Dict[str, List[int]]], List[str]]: counts: Dict[str, int] = {key: 0 for key in self.UNIVERSAL_METRICS_SCHEMA} + + # --- THE FIX: INJECT APPSEC SENSORS --- + # Force the new Phase 4 sensors into the schema so the LogicSplicer doesn't ignore them + for appsec_key in ["memory_scraping", "exfiltration_camouflage", "rce_funnel"]: + if appsec_key not in counts: + counts[appsec_key] = 0 + mitigations: Dict[str, int] = {"mitigated_danger": 0, "mitigated_memory_allocs": 0, "amplified_rce": 0, "amplified_race_conditions": 0, "amplified_leaks": 0} segment_spatial_maps = [] extracted_parents = [] @@ -802,6 +809,26 @@ def coding_analysis(self, segments: List[Tuple[str, str, int]], regex_telemetry: # ---> NEW: SPATIAL CORRELATION (Runs once per segment) <--- + # ============================================================================== + # PHASE 4: AI APPSEC & ZERO-TRUST SENSORS (The Checkmarx/Bitwarden Defense) + # ============================================================================== + # 0a. The Exfiltration Distance Check + if "memory_scraping" in spatial_map and "exfiltration_camouflage" in spatial_map: + # Measures the physical call-path distance between the memory read and the socket + unmitigated, confirmed_exfiltration = self._correlate_signals( + targets=spatial_map["memory_scraping"], + dampeners=spatial_map["exfiltration_camouflage"], + max_distance=200 # If they happen within 200 chars of each other, it's a confirmed attack + ) + counts["memory_scraping"] += (confirmed_exfiltration * 100) # Massive penalty multiplier + mitigations["amplified_leaks"] += confirmed_exfiltration + + # 0b. The RCE Funnel Amplifier + if "rce_funnel" in spatial_map: + # RCE funnels inside JS/TS/Python are fatal structural anomalies. Multiply the mass. + counts["rce_funnel"] += (len(spatial_map["rce_funnel"]) * 50) + # ============================================================================== + # 1. Taint Tracking (RCE Weaponization) if "sec_danger" in spatial_map and ("sec_io" in spatial_map or "io" in spatial_map): io_hits = sorted(spatial_map.get("sec_io", []) + spatial_map.get("io", [])) diff --git a/gitgalaxy/standards/language_standards.py b/gitgalaxy/standards/language_standards.py index 064b581..8c09d8d 100644 --- a/gitgalaxy/standards/language_standards.py +++ b/gitgalaxy/standards/language_standards.py @@ -401,6 +401,10 @@ "regex_execution": re.compile(r'\b(re\.compile|re\.search|re\.match|re\.sub|re\.findall|re\.split)\b'), "time_date_logic": re.compile(r'\b(datetime\.datetime|timedelta|time\.sleep|time\.time|calendar)\b'), "ipc_rpc_bridges": re.compile(r'\b(multiprocessing|subprocess|xmlrpc|socketserver)\b'), + + # --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) --- + "memory_scraping": re.compile(r"['\"]/proc/['\"]\s*\+\s*(?:str\([^)]*\)|f?['\"]\{[^}]*\})|/proc/\w+/mem"), + "exfiltration_camouflage": re.compile(r"\b(requests\.post|urllib\.request|httpx\.post)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I), }, }, "javascript": { @@ -668,6 +672,10 @@ "regex_execution": re.compile(r'\bnew\s+RegExp\b|\.(match|replace|search|split)\s*\('), "time_date_logic": re.compile(r'\b(Date\.now|new\s+Date|setTimeout|setInterval|clearTimeout|clearInterval|performance\.now)\b'), "ipc_rpc_bridges": re.compile(r'\b(postMessage|Worker|MessageChannel|child_process|worker_threads|cluster)\b'), + + # --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) --- + "rce_funnel": re.compile(r"child_process\.(?:spawn|exec|execSync)\s*\(\s*['\"](?:python|bash|sh|bun|node)\b"), + "exfiltration_camouflage": re.compile(r"\b(fetch|axios\.post|https\.request)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I), }, }, "typescript": { @@ -924,6 +932,10 @@ "regex_execution": re.compile(r'\bnew\s+RegExp\b|\.(match|replace|search|split)\s*\('), "time_date_logic": re.compile(r'\b(Date\.now|new\s+Date|setTimeout|setInterval|clearTimeout|clearInterval|performance\.now)\b'), "ipc_rpc_bridges": re.compile(r'\b(postMessage|Worker|MessageChannel|child_process|worker_threads|cluster)\b'), + + # --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) --- + "rce_funnel": re.compile(r"child_process\.(?:spawn|exec|execSync)\s*\(\s*['\"](?:python|bash|sh|bun|node)\b"), + "exfiltration_camouflage": re.compile(r"\b(fetch|axios\.post|https\.request)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I), }, }, "java": { diff --git a/site/app.py b/site/app.py index 48faa03..753cc2f 100644 --- a/site/app.py +++ b/site/app.py @@ -187,10 +187,11 @@ def create_checkout_session(): except requests.exceptions.RequestException as e: logger.error(f"Printify Upload Failed: {str(e)}") - return jsonify(error="Image upload to Printify failed. Please try again."), 500 + return jsonify(error="Image upload failed. Please try again."), 500 except Exception as e: + # Keep the raw error in YOUR logs, but give the user a sanitized message logger.error(f"Stripe Session Error: {str(e)}") - return jsonify(error=str(e)), 500 + return jsonify(error="An internal payment processing error occurred. Please contact support."), 500 @app.route('/webhook', methods=['POST']) def stripe_webhook(): @@ -230,7 +231,6 @@ def stripe_webhook(): line_items = stripe.checkout.Session.list_line_items(session.get('id')) final_quantity = line_items.data[0].quantity if line_items.data else 1 - printify_item = PRINTIFY_MAP.get(poster_size, PRINTIFY_MAP["5400x3600"]) printify_item = PRINTIFY_MAP.get(poster_size, PRINTIFY_MAP["5400x3600"]) master_product_id = printify_item["product_id"] target_variant_id = printify_item["variant_id"] @@ -302,6 +302,33 @@ def stripe_webhook(): # 4. ALWAYS return 200 to Stripe at the very end so it doesn't pause your webhook return jsonify({'status': 'success'}), 200 +# --- 5.5 ENTERPRISE LEAD CAPTURE (THE BUSINESS TRAP) --- +@app.route('/api/enterprise-lead', methods=['POST']) +def capture_enterprise_lead(): + try: + data = request.json + email = data.get('email', '').lower() + company = data.get('company', 'Unknown') + use_case = data.get('use_case', 'Unknown') + codebase_size = data.get('codebase_size', 'Unknown') + + # The Filter: Reject generic emails to ensure high-signal enterprise leads + generic_domains = ['@gmail.com', '@yahoo.com', '@hotmail.com', '@outlook.com'] + if any(domain in email for domain in generic_domains): + return jsonify(error="Please provide a valid corporate email address for commercial licensing."), 400 + + # Log the massive lead as a CRITICAL event so it stands out in your server logs + lead_msg = f"🚨 ENTERPRISE LEAD CAPTURED: {company} | Size: {codebase_size} | Case: {use_case} | Contact: {email}" + logger.critical(lead_msg) + + # TODO: Add logic here to ping your Discord webhook or send an email to joe@gitgalaxy.io + # requests.post(os.getenv("DISCORD_WEBHOOK_URL"), json={"content": lead_msg}) + + return jsonify({"status": "success", "message": "Lead captured. Our architecture team will be in touch shortly."}), 200 + + except Exception as e: + logger.error(f"Lead Capture Error: {str(e)}") + return jsonify(error="Failed to submit inquiry. Please email commercial@gitgalaxy.io directly."), 500 if __name__ == '__main__': print("\n" + "═"*50) @@ -310,4 +337,6 @@ def stripe_webhook(): print(" Access: http://localhost:5000") print("═"*50 + "\n") - app.run(debug=True, host='0.0.0.0', port=5000, threaded=True) \ No newline at end of file + # Securely load debug state from environment variables + is_debug = os.getenv("FLASK_ENV", "production").lower() == "development" + app.run(debug=is_debug, host='0.0.0.0', port=5000, threaded=True) \ No newline at end of file