Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions gitgalaxy/core/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ def _correlate_signals(self, targets: List[int], dampeners: List[int], max_dista

def coding_analysis(self, segments: List[Tuple[str, str, int]], regex_telemetry: dict = None) -> Tuple[Dict[str, int], Dict[str, int], List[Dict[str, List[int]]], List[str]]:
counts: Dict[str, int] = {key: 0 for key in self.UNIVERSAL_METRICS_SCHEMA}

# --- THE FIX: INJECT APPSEC SENSORS ---
# Force the new Phase 4 sensors into the schema so the LogicSplicer doesn't ignore them
for appsec_key in ["memory_scraping", "exfiltration_camouflage", "rce_funnel"]:
if appsec_key not in counts:
counts[appsec_key] = 0

mitigations: Dict[str, int] = {"mitigated_danger": 0, "mitigated_memory_allocs": 0, "amplified_rce": 0, "amplified_race_conditions": 0, "amplified_leaks": 0}
segment_spatial_maps = []
extracted_parents = []
Expand Down Expand Up @@ -802,6 +809,26 @@ def coding_analysis(self, segments: List[Tuple[str, str, int]], regex_telemetry:

# ---> NEW: SPATIAL CORRELATION (Runs once per segment) <---

# ==============================================================================
# PHASE 4: AI APPSEC & ZERO-TRUST SENSORS (The Checkmarx/Bitwarden Defense)
# ==============================================================================
# 0a. The Exfiltration Distance Check
if "memory_scraping" in spatial_map and "exfiltration_camouflage" in spatial_map:
# Measures the physical call-path distance between the memory read and the socket
unmitigated, confirmed_exfiltration = self._correlate_signals(
targets=spatial_map["memory_scraping"],
dampeners=spatial_map["exfiltration_camouflage"],
max_distance=200 # If they happen within 200 chars of each other, it's a confirmed attack
)
counts["memory_scraping"] += (confirmed_exfiltration * 100) # Massive penalty multiplier
mitigations["amplified_leaks"] += confirmed_exfiltration

# 0b. The RCE Funnel Amplifier
if "rce_funnel" in spatial_map:
# RCE funnels inside JS/TS/Python are fatal structural anomalies. Multiply the mass.
counts["rce_funnel"] += (len(spatial_map["rce_funnel"]) * 50)
# ==============================================================================

# 1. Taint Tracking (RCE Weaponization)
if "sec_danger" in spatial_map and ("sec_io" in spatial_map or "io" in spatial_map):
io_hits = sorted(spatial_map.get("sec_io", []) + spatial_map.get("io", []))
Expand Down
12 changes: 12 additions & 0 deletions gitgalaxy/standards/language_standards.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@
"regex_execution": re.compile(r'\b(re\.compile|re\.search|re\.match|re\.sub|re\.findall|re\.split)\b'),
"time_date_logic": re.compile(r'\b(datetime\.datetime|timedelta|time\.sleep|time\.time|calendar)\b'),
"ipc_rpc_bridges": re.compile(r'\b(multiprocessing|subprocess|xmlrpc|socketserver)\b'),

# --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) ---
"memory_scraping": re.compile(r"['\"]/proc/['\"]\s*\+\s*(?:str\([^)]*\)|f?['\"]\{[^}]*\})|/proc/\w+/mem"),
"exfiltration_camouflage": re.compile(r"\b(requests\.post|urllib\.request|httpx\.post)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I),
},
},
"javascript": {
Expand Down Expand Up @@ -668,6 +672,10 @@
"regex_execution": re.compile(r'\bnew\s+RegExp\b|\.(match|replace|search|split)\s*\('),
"time_date_logic": re.compile(r'\b(Date\.now|new\s+Date|setTimeout|setInterval|clearTimeout|clearInterval|performance\.now)\b'),
"ipc_rpc_bridges": re.compile(r'\b(postMessage|Worker|MessageChannel|child_process|worker_threads|cluster)\b'),

# --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) ---
"rce_funnel": re.compile(r"child_process\.(?:spawn|exec|execSync)\s*\(\s*['\"](?:python|bash|sh|bun|node)\b"),
"exfiltration_camouflage": re.compile(r"\b(fetch|axios\.post|https\.request)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I),
},
},
"typescript": {
Expand Down Expand Up @@ -924,6 +932,10 @@
"regex_execution": re.compile(r'\bnew\s+RegExp\b|\.(match|replace|search|split)\s*\('),
"time_date_logic": re.compile(r'\b(Date\.now|new\s+Date|setTimeout|setInterval|clearTimeout|clearInterval|performance\.now)\b'),
"ipc_rpc_bridges": re.compile(r'\b(postMessage|Worker|MessageChannel|child_process|worker_threads|cluster)\b'),

# --- PHASE 4: APPSEC & AI SENSORS (Zero-Trust Pipelines) ---
"rce_funnel": re.compile(r"child_process\.(?:spawn|exec|execSync)\s*\(\s*['\"](?:python|bash|sh|bun|node)\b"),
"exfiltration_camouflage": re.compile(r"\b(fetch|axios\.post|https\.request)\s*\([^)]*(?:checkmarx|telemetry|metrics|audit|log)\b", re.I),
},
},
"java": {
Expand Down
37 changes: 33 additions & 4 deletions site/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@

except requests.exceptions.RequestException as e:
logger.error(f"Printify Upload Failed: {str(e)}")
return jsonify(error="Image upload to Printify failed. Please try again."), 500
return jsonify(error="Image upload failed. Please try again."), 500
except Exception as e:
# Keep the raw error in YOUR logs, but give the user a sanitized message
logger.error(f"Stripe Session Error: {str(e)}")
return jsonify(error=str(e)), 500
return jsonify(error="An internal payment processing error occurred. Please contact support."), 500

@app.route('/webhook', methods=['POST'])
def stripe_webhook():
Expand Down Expand Up @@ -230,7 +231,6 @@
line_items = stripe.checkout.Session.list_line_items(session.get('id'))
final_quantity = line_items.data[0].quantity if line_items.data else 1

printify_item = PRINTIFY_MAP.get(poster_size, PRINTIFY_MAP["5400x3600"])
printify_item = PRINTIFY_MAP.get(poster_size, PRINTIFY_MAP["5400x3600"])
master_product_id = printify_item["product_id"]
target_variant_id = printify_item["variant_id"]
Expand Down Expand Up @@ -302,6 +302,33 @@
# 4. ALWAYS return 200 to Stripe at the very end so it doesn't pause your webhook
return jsonify({'status': 'success'}), 200

# --- 5.5 ENTERPRISE LEAD CAPTURE (THE BUSINESS TRAP) ---
@app.route('/api/enterprise-lead', methods=['POST'])
def capture_enterprise_lead():
try:
data = request.json
email = data.get('email', '').lower()
company = data.get('company', 'Unknown')
use_case = data.get('use_case', 'Unknown')
codebase_size = data.get('codebase_size', 'Unknown')

# The Filter: Reject generic emails to ensure high-signal enterprise leads
generic_domains = ['@gmail.com', '@yahoo.com', '@hotmail.com', '@outlook.com']
if any(domain in email for domain in generic_domains):
return jsonify(error="Please provide a valid corporate email address for commercial licensing."), 400

# Log the massive lead as a CRITICAL event so it stands out in your server logs
lead_msg = f"🚨 ENTERPRISE LEAD CAPTURED: {company} | Size: {codebase_size} | Case: {use_case} | Contact: {email}"
logger.critical(lead_msg)

Check warning

Code scanning / CodeQL

Log Injection Medium

This log entry depends on a
user-provided value
.

# TODO: Add logic here to ping your Discord webhook or send an email to joe@gitgalaxy.io
# requests.post(os.getenv("DISCORD_WEBHOOK_URL"), json={"content": lead_msg})

return jsonify({"status": "success", "message": "Lead captured. Our architecture team will be in touch shortly."}), 200

except Exception as e:
logger.error(f"Lead Capture Error: {str(e)}")
return jsonify(error="Failed to submit inquiry. Please email commercial@gitgalaxy.io directly."), 500

if __name__ == '__main__':
print("\n" + "═"*50)
Expand All @@ -310,4 +337,6 @@
print(" Access: http://localhost:5000")
print("═"*50 + "\n")

app.run(debug=True, host='0.0.0.0', port=5000, threaded=True)
# Securely load debug state from environment variables
is_debug = os.getenv("FLASK_ENV", "production").lower() == "development"
app.run(debug=is_debug, host='0.0.0.0', port=5000, threaded=True)
Loading