-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.py
More file actions
209 lines (161 loc) · 6.45 KB
/
server.py
File metadata and controls
209 lines (161 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import ipaddress
import socket
import time
from urllib.parse import unquote, urljoin, urlparse

import cloudscraper
from flask import Flask, request, Response
# Module-level cloudscraper session shared by every proxied request.
# The browser profile asks cloudscraper to present itself as desktop Chrome on
# Windows, in line with the User-Agent / Sec-Ch-Ua headers set further below.
# NOTE(review): `delay` is presumably the wait (in seconds) applied when
# answering a Cloudflare challenge, and `allow_brotli` presumably enables
# "br" content decoding -- confirm against the cloudscraper documentation.
scraper = cloudscraper.create_scraper(
    browser={
        'browser': 'chrome',
        'platform': 'windows',
        'desktop': True
    },
    delay=1,
    allow_brotli=True
)
def set_user_agent(headers):
    """Set the outgoing ``User-Agent`` to a desktop Chrome 120 string.

    The mapping is modified in place and also returned so calls can be chained.
    """
    # Keep this in sync with the Sec-Ch-Ua value: Cloudflare flags a request as
    # a bot when the two headers advertise different browsers. Change both
    # together.
    chrome_user_agent = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/120.0.0.0 Safari/537.36'
    )
    headers['User-Agent'] = chrome_user_agent
    return headers
def set_security_headers(headers):
    """Add the Chrome client-hint and fetch-metadata headers.

    Existing entries with the same names are overwritten. The mapping is
    modified in place and returned.
    """
    headers.update({
        # Client hints; the brand/version list must agree with the User-Agent.
        'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"',
        # Fetch metadata describing a same-origin CORS request.
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
    })
    return headers
def set_origin_and_ref(headers, origin, ref):
    """Store ``Origin`` and ``Referer`` in *headers* and return the mapping."""
    headers['Origin'] = origin
    headers['Referer'] = ref
    return headers


def generate_origin_and_ref(url, headers):
    """Derive ``Origin``/``Referer`` from the target URL and add them to *headers*.

    The request is made to look as if it came from the target site itself:
    ``Origin`` becomes ``scheme://host[:port]`` (no trailing slash, which is
    how browsers serialise an origin) and ``Referer`` becomes the site root.

    Args:
        url: Absolute URL of the upstream resource.
        headers: Mutable header mapping; updated in place.

    Returns:
        The same *headers* mapping. It is left untouched when *url* has no
        scheme or no host, or cannot be parsed.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # e.g. a malformed IPv6 literal; there is no sensible origin to send.
        return headers

    # Drop any "user:password@" prefix so credentials never end up in headers.
    host = parsed.netloc.rpartition('@')[2]
    if not parsed.scheme or not host:
        return headers

    origin = f"{parsed.scheme}://{host}"
    return set_origin_and_ref(headers, origin, f"{origin}/")
def _is_forbidden_ip(ip):
    """Return True when *ip* must never be reached through the proxy."""
    return (
        ip.is_loopback
        or ip.is_private
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
        or ip.is_link_local
        # Anything that is not globally routable is off limits. This also
        # covers ranges the flags above miss, such as the 100.64.0.0/10
        # shared address space, for which is_private is False.
        or not ip.is_global
    )


def is_safe_url(url):
    """Validate *url* to prevent SSRF by blocking local/private destinations.

    Only ``http`` and ``https`` URLs are accepted. The hostname is resolved
    with ``getaddrinfo`` so that ALL of its addresses (IPv4 and IPv6) are
    checked; a dual-stack name that resolves to both a public and a private
    address is rejected.

    NOTE: the HTTP client resolves the name again when it connects, so this
    check alone does not defend against DNS rebinding.

    Args:
        url: Absolute URL supplied by the client.

    Returns:
        tuple: (is_safe: bool, error_message: str | None)
    """
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False, "Only HTTP/HTTPS protocols are allowed"

        hostname = parsed.hostname
        if not hostname:
            return False, "Invalid hostname"

        try:
            addr_info = socket.getaddrinfo(hostname, None)
            if not addr_info:
                # Nothing to validate means nothing we can vouch for.
                return False, "Could not resolve hostname or invalid IP"

            for _family, _type, _proto, _canonname, sockaddr in addr_info:
                # sockaddr is (address, port) for AF_INET and
                # (address, port, flow info, scope id) for AF_INET6.
                # A zone id ("fe80::1%eth0") is stripped before parsing.
                ip = ipaddress.ip_address(sockaddr[0].split('%', 1)[0])
                if _is_forbidden_ip(ip):
                    return False, "Access to private/local network is forbidden"
        except (socket.gaierror, ValueError):
            # If we can't resolve it, fail closed.
            return False, "Could not resolve hostname or invalid IP"

        return True, None
    except Exception:
        # Deliberately broad: this is a security boundary, so any unexpected
        # failure while inspecting the URL is treated as "not safe".
        return False, "Invalid URL format"
# Flask application object; the proxy route below is registered on it and it
# is the WSGI app handed to the server in the __main__ block.
app = Flask(__name__)
# Hop-by-hop headers describe a single connection and must not be relayed by a
# proxy. Names are lower-case because lookups are done case-insensitively.
# "trailer" is the actual header name; "trailers" is kept for compatibility.
HOP_BY_HOP_HEADERS = {
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'trailers',
    'transfer-encoding',
    'upgrade',
}

# Headers describing the upstream wire encoding. The body relayed to the
# client is the already-decoded ``response.content``, so these would be wrong.
_STALE_BODY_HEADERS = frozenset({'content-encoding', 'content-length'})


def clean_headers(response):
    """Return the upstream headers that are safe to relay to the client.

    Hop-by-hop headers are removed, as are ``Content-Encoding`` and
    ``Content-Length``. All comparisons are case-insensitive, so
    ``Content-Encoding`` is dropped regardless of how upstream spelled it.

    Args:
        response: Upstream response object exposing a ``headers`` mapping.

    Returns:
        dict: Header name -> value, with the original name casing preserved.
    """
    return {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in HOP_BY_HOP_HEADERS
        and name.lower() not in _STALE_BODY_HEADERS
    }
def generate_proxy_response(response) -> Response:
    """Convert the upstream response into the Flask response sent to the client.

    The body is always relayed as raw bytes (``response.content``) so no text
    decoding or re-encoding takes place. How the headers are built depends on
    the upstream content type:

    * JSON: only ``Content-Type: application/json`` is set.
    * HTML: only the upstream ``Content-Type`` is set, so the declared charset
      keeps matching the raw bytes.
    * anything else: the cleaned upstream headers are forwarded.
    """
    upstream_type = response.headers.get('content-type', '')
    body = response.content
    forwarded = clean_headers(response)

    if 'application/json' in upstream_type:
        extra = {'content_type': 'application/json'}
    elif 'text/html' in upstream_type:
        extra = {'content_type': upstream_type}
    else:
        extra = {'headers': forwarded}

    return Response(body, status=response.status_code, **extra)
def get_headers():
    """Build the baseline browser-like headers for an upstream request.

    Starts from generic Accept/Connection headers, then layers the Chrome
    User-Agent and the matching client-hint / fetch-metadata headers on top.
    """
    baseline = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    return set_security_headers(set_user_agent(baseline))
def get_proxy_request_url(req, url):
    """Rebuild the full upstream URL from the route parameter and query string.

    Surrounding double quotes are stripped from *url* and percent-escapes are
    decoded. If the incoming request carried a query string, it is appended
    after a ``?``.
    """
    target = unquote(url.strip('"'))
    raw_query = req.query_string
    if not raw_query:
        return target
    return f"{target}?{raw_query.decode('utf-8')}"
def get_proxy_request_headers(req, url):
    """Build the headers for the upstream request.

    Starts from the browser-like defaults, overlays the headers sent by the
    client, then sets Origin/Referer for the target site.

    Args:
        req: Incoming Flask request whose headers are forwarded.
        url: Target URL used to derive Origin and Referer.

    Returns:
        dict: Headers to send upstream.
    """
    # Headers that must describe THIS hop rather than the client's connection.
    # Accept-Encoding is included so a client cannot request an encoding
    # (e.g. zstd) that this server is unable to decode before relaying.
    not_forwarded = {'host', 'connection', 'content-length', 'accept-encoding'}

    headers = get_headers()
    for key, value in req.headers.items():
        if key.lower() not in not_forwarded:
            headers[key] = value

    # Applied after the merge so the advertised encodings are always ours.
    headers['Accept-Encoding'] = 'gzip, deflate, br'
    return generate_origin_and_ref(url, headers)
# Cloudflare bypassed request
@app.route("/api/proxy/<path:url>", methods=["GET"])
def handle_proxy(url):
    """Fetch the requested URL through the cloudscraper session and relay it.

    The target is validated against the SSRF rules before the first request
    and again for every redirect hop. Redirects are followed manually so that
    a public URL cannot bounce the proxy to an internal address.

    Args:
        url: Target URL captured from the route path (may be quoted/escaped).

    Returns:
        The proxied response on success, a 403 JSON error when the target or
        a redirect destination is not allowed, or a generic 500 JSON error
        when the upstream request fails.
    """
    max_redirects = 5

    full_url = get_proxy_request_url(request, url)  # normalised target URL

    # SSRF protection check
    is_safe, error_msg = is_safe_url(full_url)
    if not is_safe:
        print(f"SSRF blocked: {full_url} - {error_msg}")
        return {'error': error_msg}, 403

    # Origin/Referer are derived from the normalised URL (quotes stripped,
    # escapes decoded) without its query string, not from the raw path value.
    headers = get_proxy_request_headers(request, full_url.split('?', 1)[0])

    try:
        start = time.perf_counter()
        current_url = full_url
        for _ in range(max_redirects + 1):
            response = scraper.get(
                current_url,
                headers=headers,
                timeout=30,
                allow_redirects=False,
            )
            if not response.is_redirect:
                break

            # Location may be relative; resolve it against the current URL
            # and re-run the SSRF check before following it.
            next_url = urljoin(current_url, response.headers['Location'])
            is_safe, error_msg = is_safe_url(next_url)
            if not is_safe:
                print(f"SSRF blocked: {next_url} - {error_msg}")
                return {'error': error_msg}, 403
            current_url = next_url
        else:
            raise RuntimeError(f"Exceeded {max_redirects} redirects")
        elapsed = time.perf_counter() - start

        print(f"Proxied request for {full_url.split('?')[0]} in {elapsed:.6f} seconds")

        response.raise_for_status()
        return generate_proxy_response(response)
    except Exception as e:
        print(f"Proxy Request Error: {str(e)}")
        # Don't leak stack traces or internal details
        return {'error': "Proxy request failed. Check server logs for details."}, 500
if __name__ == "__main__":
    # Script entry point: serve the Flask app with waitress on every interface,
    # port 5000. waitress is imported here so it is only needed when the
    # module is run directly.
    print('Starting cloudflare bypass proxy server')
    import waitress

    waitress.serve(app, host="0.0.0.0", port=5000)