diff --git a/admin/admin/admin.html b/admin/admin/admin.html index 5f1ff24..4c06f2e 100644 --- a/admin/admin/admin.html +++ b/admin/admin/admin.html @@ -144,27 +144,22 @@

- -
- +
+ + diff --git a/poetry.lock b/poetry.lock index 9ca3cb3..990abf3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -214,6 +214,20 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "colorful" +version = "0.5.6" +description = "Terminal string styling done right, in Python." +optional = false +python-versions = "*" +files = [ + {file = "colorful-0.5.6-py2.py3-none-any.whl", hash = "sha256:eab8c1c809f5025ad2b5238a50bd691e26850da8cac8f90d660ede6ea1af9f1e"}, + {file = "colorful-0.5.6.tar.gz", hash = "sha256:b56d5c01db1dac4898308ea889edcb113fbee3e6ec5df4bacffd61d5241b5b8d"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + [[package]] name = "dill" version = "0.3.8" @@ -829,4 +843,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "1f384658dbc65f6e964526a839137b9bfbab63bad09ca1540c6c1b21f001d9b6" +content-hash = "ea67d9b8227c4e25c91623c82143faf7e94fbfaf3c6d8169670458e1e2df8dd4" diff --git a/pyproject.toml b/pyproject.toml index a5703ac..41c0cc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ whatismyip = ">=2022.7.10" peewee = "^3.17.6" peewee-migrate = "^1.12.2" +colorful = "^0.5.6" [tool.poetry.group.dev.dependencies] black = "^24.4.2" isort = "^5.13.2" diff --git a/web3pi_proxy/config/conf.py b/web3pi_proxy/config/conf.py index 24f74d6..7b1d62a 100644 --- a/web3pi_proxy/config/conf.py +++ b/web3pi_proxy/config/conf.py @@ -49,6 +49,7 @@ class AppConfig: SSL_ENABLED: bool = False SSL_CERT_FILE: str = "cert.pem" SSL_KEY_FILE: str = "key.pem" + TUNNEL_ESTABLISH_PORT: int = 7634 LISTEN_BACKLOG_PARAM: int = 21 BLOCKING_ACCEPT_TIMEOUT: int = 5 diff --git a/web3pi_proxy/core/proxy.py b/web3pi_proxy/core/proxy.py index 20c191d..dd18baf 100644 --- a/web3pi_proxy/core/proxy.py +++ b/web3pi_proxy/core/proxy.py @@ -166,6 +166,7 @@ def handle_client( except Exception as error: self.__logger.error("%s: %s", error.__class__, error) self.__logger.error("Failed to establish endpoint connection") + self.__logger.exception(error) cs.send_all( ErrorResponses.connection_error(req.id) ) # TODO: detect wether client connection is closed diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/endpoint_connection_pool.py b/web3pi_proxy/core/rpc/node/endpoint_pool/endpoint_connection_pool.py index e4491f9..f12bb44 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/endpoint_connection_pool.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/endpoint_connection_pool.py @@ -1,5 +1,6 @@ from __future__ import annotations +import socket import enum import time from typing import Set @@ -18,6 +19,7 @@ ) from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest +from web3pi_proxy.core.sockets.basesocket import BaseSocket from web3pi_proxy.utils.logger import get_logger @@ -135,7 +137,8 @@ def __run_closing_thread(self): try: connection.close() except Exception as ex: - self.__logger.error(f"Error while closing a connection", ex) + self.__logger.error("Error while closing a connection") + self.__logger.exception(ex) def __run_cleanup_thread(self) -> None: while True: @@ -167,6 +170,12 @@ def __run_cleanup_thread(self) -> None: def __get_connection(self) -> EndpointConnection: return self.connections.get_nowait() + def new_connection(self) -> EndpointConnection: + """Internal function, do not call directly""" + def connection_factory() -> socket: # TODO is it worth to move it to object level and reuse? + return BaseSocket.create_socket(self.endpoint.conn_descr.host, self.endpoint.conn_descr.port) + return EndpointConnection(self.endpoint, connection_factory) + def __update_status(self, status: str): self.status = status self.__logger.debug("Changed %s status to %s", str(self), status) @@ -180,7 +189,7 @@ def get(self) -> EndpointConnectionHandler: self.__lock.release() self.__logger.debug("No existing connections available, establishing new connection") try: - connection = EndpointConnection(self.endpoint) + connection = self.new_connection() except Exception as error: self.stats.register_error_on_connection_creation() raise error @@ -217,6 +226,9 @@ def put(self, connection: EndpointConnection) -> None: def is_active(self): return self.status == self.PoolStatus.ACTIVE + def is_open(self): + return self.status == self.PoolStatus.ACTIVE.value or self.status == self.PoolStatus.DISABLED.value + def disable(self): with self.__lock: if self.status == self.PoolStatus.CLOSED or self.status == self.PoolStatus.CLOSING: diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py b/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py index a5914cc..6cd1511 100644 --- a/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py @@ -1,5 +1,8 @@ from __future__ import annotations +import select +import socket + import time from threading import RLock, Thread from typing import List, Tuple @@ -10,8 +13,9 @@ from web3pi_proxy.core.rpc.node.endpoint_pool.load_balancers import ( LoadBalancer, ) +from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool import TunnelConnectionPool from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import ( - EndpointConnectionDescriptor, + EndpointConnectionDescriptor, ConnectionType, ) from web3pi_proxy.core.rpc.node.rpcendpoint.connection.endpoint_connection_handler import ( EndpointConnectionHandler, @@ -19,6 +23,7 @@ from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest from web3pi_proxy.utils.logger import get_logger +from web3pi_proxy.config.conf import Config class ConnectionPoolError(Exception): @@ -135,6 +140,13 @@ def __get_active_pools(self): if connection_pool.is_active() ] + def __get_open_pools(self): + return [ + connection_pool + for connection_pool in self.pools.values() + if connection_pool.is_open() + ] + def __damage_control(self): while True: self.__logger.debug("Running check on endpoint connections") @@ -153,7 +165,10 @@ def add_pool( f"Creating endpoint {name} with connection {conn_descr}" ) endpoint = RPCEndpoint.create(name, conn_descr) - connection_pool = EndpointConnectionPool(endpoint) + if endpoint.conn_descr.connection_type == ConnectionType.TUNNEL: + connection_pool = TunnelConnectionPool(endpoint) + else: + connection_pool = EndpointConnectionPool(endpoint) self.pools[name] = connection_pool return endpoint diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool.py b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool.py new file mode 100644 index 0000000..03c528e --- /dev/null +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool.py @@ -0,0 +1,60 @@ +import socket + +from web3pi_proxy.config import Config +from web3pi_proxy.core.rpc.node.endpoint_pool.endpoint_connection_pool import EndpointConnectionPool +from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool_intf import TunnelConnectionPoolIntf +from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_service import TunnelService +from web3pi_proxy.core.rpc.node.rpcendpoint.connection.endpointconnection import EndpointConnection +from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint + +from web3pi_proxy.utils.logger import get_logger + + +class TunnelConnectionPool(EndpointConnectionPool, TunnelConnectionPoolIntf): + + def __init__( + self, + endpoint: RPCEndpoint, + ): + super().__init__(endpoint) + self.__logger = get_logger(f"TunnelConnectionPool.{id(self)}") + + self.tunnel_api_key = endpoint.conn_descr.extras["tunnel_service_auth_key"] + self.tunnel_proxy_establish_port: int = endpoint.conn_descr.extras["tunnel_proxy_establish_port"] + + tunnel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tunnel_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + tunnel_socket.bind((Config.PROXY_LISTEN_ADDRESS, self.tunnel_proxy_establish_port)) + tunnel_socket.listen(Config.LISTEN_BACKLOG_PARAM) + self.tunnel_socket = tunnel_socket + + self.tunnel_service_socket = None + + self.status = self.PoolStatus.DISABLED.value + + TunnelService.register(self.tunnel_api_key, self) + + def new_connection(self) -> EndpointConnection: + + def connection_factory() -> socket: # TODO is it worth to move it to object level and reuse? + self.__logger.debug("Creating socket") + self.tunnel_service_socket.sendall(b"NEWCONN") + new_conn_sock, new_conn_addr = self.tunnel_socket.accept() + self.__logger.debug("Finished connecting socket") + return new_conn_sock + + return EndpointConnection(self.endpoint, connection_factory) + + def new_tunnel_service_socket(self, tunnel_service_socket: socket): + with self._EndpointConnectionPool__lock: + self.tunnel_service_socket = tunnel_service_socket + self.status = self.PoolStatus.ACTIVE.value + + def close(self) -> None: + super().close() + self.tunnel_socket.close() + self.tunnel_socket = None + if self.tunnel_service_socket: + self.tunnel_service_socket.close() + self.tunnel_service_socket = None + TunnelService.unregister(self.tunnel_api_key, self) diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool_intf.py b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool_intf.py new file mode 100644 index 0000000..abfbba7 --- /dev/null +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool_intf.py @@ -0,0 +1,9 @@ +import socket + + +class TunnelConnectionPoolIntf: + """Added to resolve the circular dependencies""" + tunnel_proxy_establish_port: int + + def new_tunnel_service_socket(self, tunnel_service_socket: socket): + pass diff --git a/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_service.py b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_service.py new file mode 100644 index 0000000..b71994f --- /dev/null +++ b/web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_service.py @@ -0,0 +1,69 @@ +import socket +import select +from threading import Lock, Thread + +from web3pi_proxy.config import Config +from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool_intf import TunnelConnectionPoolIntf +from web3pi_proxy.utils.logger import get_logger + + +class TunnelServiceImpl: + """The service is lazy initialized at the first pool registration + to avoid resource allocation when there is not pool with a tunnel""" + def __init__(self): + self.__registry = dict() + self.__lock = Lock() + self.__initialized = False + self.__logger = get_logger(f"TunnelServiceImpl") + + def register(self, api_key: str, tunnel_connection_pool: TunnelConnectionPoolIntf): + with self.__lock: + if not self.__initialized: + self.__initialize__() + self.__initialized = True + self.__registry[api_key] = tunnel_connection_pool + + def unregister(self, api_key: str, tunnel_connection_pool: TunnelConnectionPoolIntf): + with self.__lock: + if self.__registry.get(api_key) == tunnel_connection_pool: + del self.__registry[api_key] + + def __initialize__(self): + tunnel_srv_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tunnel_srv_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + tunnel_srv_socket.bind((Config.PROXY_LISTEN_ADDRESS, Config.TUNNEL_ESTABLISH_PORT)) + tunnel_srv_socket.listen(Config.LISTEN_BACKLOG_PARAM) + self.tunnel_srv_socket = tunnel_srv_socket + self.tunnel_thread = Thread( + target=self.__tunnel_service_run, + daemon=True, + ) + self.tunnel_thread.start() + + def __tunnel_service_run(self): + while True: + ready_read, _, _ = select.select([self.tunnel_srv_socket], [], [], Config.BLOCKING_ACCEPT_TIMEOUT) + + if len(ready_read) == 0: + continue + tunnel_sock, cli_addr = self.tunnel_srv_socket.accept() + self.__logger.debug( + f"New tunnel request from {cli_addr}" + ) + + cli_api_key = tunnel_sock.recv(2048).decode("utf-8") # TODO the attack: a tunnel client does not send api key + if not cli_api_key: + tunnel_sock.close() + continue + + with self.__lock: + pool: TunnelConnectionPoolIntf = self.__registry.get(cli_api_key) + if not pool: + tunnel_sock.close() + tunnel_sock.sendall(f'RJCT|invalid_auth_key'.encode("utf-8")) + continue + tunnel_sock.sendall(f'ACPT|{Config.PROXY_LISTEN_ADDRESS}:{pool.tunnel_proxy_establish_port}'.encode("utf-8")) + pool.new_tunnel_service_socket(tunnel_sock) + + +TunnelService = TunnelServiceImpl() diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/connectiondescr.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/connectiondescr.py index b209ac2..d013906 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/connectiondescr.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/connectiondescr.py @@ -1,10 +1,15 @@ from __future__ import annotations from dataclasses import dataclass - +import enum import urllib3.util +class ConnectionType(enum.Enum): + DIRECT = "DIRECT" + TUNNEL = "TUNNEL" + + @dataclass class EndpointConnectionDescriptor: host: str @@ -12,6 +17,8 @@ class EndpointConnectionDescriptor: auth_key: str is_ssl: bool url: str + extras: dict + connection_type: ConnectionType @classmethod def from_url(cls, url) -> EndpointConnectionDescriptor | None: @@ -37,4 +44,24 @@ def from_url(cls, url) -> EndpointConnectionDescriptor | None: else: return None - return EndpointConnectionDescriptor(host, int(port), auth_key, is_ssl, url) + return EndpointConnectionDescriptor(host, int(port), auth_key, is_ssl, url, dict(), ConnectionType.DIRECT) + + @classmethod + def from_dict(cls, conf: dict) -> EndpointConnectionDescriptor | None: + url: str = conf["url"] + conn_descr = cls.from_url(url) + if not conn_descr: + return None + connection_type: str = conf.get("connection_type") + if connection_type: + try: + conn_descr.connection_type = ConnectionType(connection_type.upper()) + except ValueError: + raise Exception(f"Unrecognized connection type {connection_type}") + else: + conn_descr.connection_type = ConnectionType.DIRECT + conn_descr.extras = conf.copy() + del conn_descr.extras["name"] + del conn_descr.extras["url"] + conn_descr.extras.pop("connection_type", None) + return conn_descr diff --git a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py index 30543f4..db69195 100644 --- a/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py +++ b/web3pi_proxy/core/rpc/node/rpcendpoint/connection/endpointconnection.py @@ -1,5 +1,8 @@ from __future__ import annotations + +import socket import time +from typing import Callable from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import ( EndpointConnectionDescriptor, @@ -27,12 +30,13 @@ class EndpointConnection: auth_key: str is_ssl: bool - def __init__(self, endpoint: RPCEndpoint) -> None: + def __init__(self, endpoint: RPCEndpoint, connection_factory: Callable) -> None: self.__logger = get_logger(f"EndpointConnection.{id(self)}") self.endpoint = endpoint + self.connection_factory = connection_factory self.__logger.debug(f"Creating socket for endpoint: {endpoint}") - self.socket = self.__create_socket() + self.socket = self.__create_socket__() self.__logger.debug(f"Socket created for description: {self.conn_descr}") self.req_sender = RequestSender( @@ -51,9 +55,10 @@ def conn_descr(self) -> EndpointConnectionDescriptor: def ip(self) -> str: return self.socket.get_peer_name()[0] - def __create_socket(self) -> BaseSocket: - return BaseSocket.create_socket( - self.conn_descr.host, self.conn_descr.port, self.conn_descr.is_ssl + def __create_socket__(self) -> BaseSocket: + s_dst: socket = self.connection_factory() + return BaseSocket.wrap_socket( + s_dst, self.conn_descr.host, self.conn_descr.is_ssl ) def close(self) -> None: @@ -61,7 +66,7 @@ def close(self) -> None: def reconnect(self) -> None: self.close() - self.socket = self.__create_socket() + self.socket = self.__create_socket__() self.req_sender = RequestSender( self.socket, self.conn_descr.host, self.conn_descr.auth_key ) diff --git a/web3pi_proxy/core/sockets/basesocket.py b/web3pi_proxy/core/sockets/basesocket.py index 91c1c35..bbab2ff 100644 --- a/web3pi_proxy/core/sockets/basesocket.py +++ b/web3pi_proxy/core/sockets/basesocket.py @@ -59,7 +59,7 @@ def clear_mapping(cls, host: str) -> None: # FIXME: this call can fail (wrong address, endpoint no ready -> failed connection) @classmethod - def create_socket(cls, host: str, port: int, is_ssl: bool) -> BaseSocket: + def create_socket(cls, host: str, port: int) -> socket: # This hack allows multiple connections to a single endpoint (using mdns requires waiting some time between # sockets are successfully processed). Connecting with directly specified IP address solves this problem. # FIXME: it may fail for remote endpoints (such as infura), as there is no guarantee that the IP stays @@ -75,10 +75,15 @@ def create_socket(cls, host: str, port: int, is_ssl: bool) -> BaseSocket: s_dst.settimeout(5.0) # TODO parametrize? s_dst.connect((host_ip, port)) - s_dst.settimeout(None) cls.__logger.debug("Finished connecting socket") + return s_dst + + @classmethod + def wrap_socket(cls, s_dst: socket, host: str, is_ssl: bool) -> BaseSocket: + s_dst.settimeout(None) + if is_ssl: context = ssl.create_default_context() s_dst = context.wrap_socket(s_dst, server_hostname=host) @@ -86,3 +91,4 @@ def create_socket(cls, host: str, port: int, is_ssl: bool) -> BaseSocket: res = BaseSocket(s_dst) return res + diff --git a/web3pi_proxy/service/endpoints/endpoint_manager.py b/web3pi_proxy/service/endpoints/endpoint_manager.py index e5c967e..1a68bed 100644 --- a/web3pi_proxy/service/endpoints/endpoint_manager.py +++ b/web3pi_proxy/service/endpoints/endpoint_manager.py @@ -58,7 +58,7 @@ def get_endpoints(self) -> dict: return nodes_data def add_endpoint(self, name: str, url: str) -> RPCEndpoint | dict: - descriptor = EndpointConnectionDescriptor.from_url(url) + descriptor = EndpointConnectionDescriptor.from_url(url) # TODO from_dict try: endpoint = self.endpoint_pool_manager.add_pool(name, descriptor) except PoolAlreadyExistsError as error: @@ -75,7 +75,7 @@ def remove_endpoint(self, name: str) -> RPCEndpoint | dict: return endpoint def update_endpoint(self, name: str, url: str) -> RPCEndpoint | dict: - descriptor = EndpointConnectionDescriptor.from_url(url) + descriptor = EndpointConnectionDescriptor.from_url(url) # TODO from_dict try: self.endpoint_pool_manager.remove_pool(name) except PoolDoesNotExistError as error: diff --git a/web3pi_proxy/service/http/adminserver.py b/web3pi_proxy/service/http/adminserver.py index 732714a..f9e4841 100644 --- a/web3pi_proxy/service/http/adminserver.py +++ b/web3pi_proxy/service/http/adminserver.py @@ -1,3 +1,4 @@ +import colorful import hashlib import json import random @@ -193,9 +194,11 @@ def start(self) -> None: auth_token = self.server.auth.create_auth_token() print("Admin auth token: " f"{auth_token}") print("Use it with 'Authorization' header for POST requests") - print(f"Access admin portal with:") + print(f"\n\nAccess admin portal with:") print( - f"http://{Config.admin_connection_address}:{self.server.server_address[1]}/?token={auth_token}" + colorful.cyan( + f"http://{Config.admin_connection_address}:{self.server.server_address[1]}/?token={auth_token}\n\n" + ) ) super().start() diff --git a/web3pi_proxy/service/providers/serviceprovider.py b/web3pi_proxy/service/providers/serviceprovider.py index 81ffed7..d932354 100644 --- a/web3pi_proxy/service/providers/serviceprovider.py +++ b/web3pi_proxy/service/providers/serviceprovider.py @@ -42,7 +42,7 @@ def create_default_connection_pool(cls, endpoint_config: List[dict], loadbalance descriptors = [ ( entrypoint["name"], - EndpointConnectionDescriptor.from_url(entrypoint["url"]), + EndpointConnectionDescriptor.from_dict(entrypoint), ) for entrypoint in endpoint_config ]