Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions admin/admin/admin.html
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,22 @@
<hr>

<ul class="nav nav-pills nav-stacked">
<li><a href="#" id="list_users"><i class="glyphicon glyphicon-user"></i> List Users</a></li>
<li><a href="#" id="list_activities_basic"><i class="glyphicon glyphicon-th-list"></i> Activities (basic)</a></li>
<li><a href="#" id="list_activities_detailed"><i class="glyphicon glyphicon-list-alt"></i> Activities (detailed)</a></li>
<li><a href="#" id="list_users"><i class="glyphicon glyphicon-user" style="color:darkgreen"></i> List Users</a></li>
<li><a href="#" id="list_activities_basic"><i class="glyphicon glyphicon-th-list" style="color:darkgreen"></i> Activities (basic)</a></li>
<li><a href="#" id="list_activities_detailed"><i class="glyphicon glyphicon-list-alt" style="color:darkgreen"></i> Activities (detailed)</a></li>
</ul>
<hr>

<ul class="nav nav-pills nav-stacked">
<li><a href="#" id="query_user_billing_plan"><i class="glyphicon glyphicon-usd"></i> User Billing Plan</a></li>
<li><a href="#" id="query_user_billing_status"><i class="glyphicon glyphicon-stats"></i> User Billing Stats</a></li>
</ul>
<form class="navbar-form" name="user_api_key_form" onkeydown="return event.key != 'Enter';">
<small class="lab-l">API key</small>
<label for="billing_api_key"></label><input id="billing_api_key" type="text" class="form-control pull-right" placeholder="User API key" style="width: 150px; height: 25px;">
</form>
<hr>

<ul class="nav nav-pills nav-stacked">
<li><a href="#" id="register_user"><i class="glyphicon glyphicon-plus"></i> Register User</a></li>
<li><a href="#" id="remove_user"><i class="glyphicon glyphicon-trash"></i> Remove User</a></li>
<li><a href="#" id="update_user_plan"><i class="glyphicon glyphicon-refresh"></i> Update User</a></li>
<li><a href="#" id="query_user_billing_plan"><i class="glyphicon glyphicon-usd" style="color:darkgreen"></i> User Billing Plan</a></li>
<li><a href="#" id="query_user_billing_status"><i class="glyphicon glyphicon-stats" style="color:darkgreen"></i> User Billing Stats</a></li>
</ul>
<hr>

<form class="navbar-form" name="manage_api_key_form" onkeydown="return event.key != 'Enter';">
<small class="lab-l">API key</small>
<label for="manage_api_key"></label><input id="manage_api_key" type="text" class="form-control pull-right" placeholder="User API key" style="width: 150px; height: 25px;">
Expand All @@ -185,6 +180,11 @@
<small class="lab-l">Constant Pool</small>
<label for="constant_pool"></label><input id="constant_pool" type="text" class="form-control pull-right deselect reset-cursor" placeholder="A pool name, optional" style="width: 150px; height: 25px;">
</form>
<ul class="nav nav-pills nav-stacked">
<li><a href="#" id="register_user"><i class="glyphicon glyphicon-plus" style="color:red"></i> Add User</a></li>
<li><a href="#" id="remove_user"><i class="glyphicon glyphicon-trash" style="color:red"></i> Remove User</a></li>
<li><a href="#" id="update_user_plan"><i class="glyphicon glyphicon-refresh" style="color:red"></i> Update User</a></li>
</ul>

</div><!-- /span-3 -->

Expand Down
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ whatismyip = ">=2022.7.10"
peewee = "^3.17.6"
peewee-migrate = "^1.12.2"

colorful = "^0.5.6"
[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
isort = "^5.13.2"
Expand Down
1 change: 1 addition & 0 deletions web3pi_proxy/config/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class AppConfig:
SSL_ENABLED: bool = False
SSL_CERT_FILE: str = "cert.pem"
SSL_KEY_FILE: str = "key.pem"
TUNNEL_ESTABLISH_PORT: int = 7634

LISTEN_BACKLOG_PARAM: int = 21
BLOCKING_ACCEPT_TIMEOUT: int = 5
Expand Down
1 change: 1 addition & 0 deletions web3pi_proxy/core/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def handle_client(
except Exception as error:
self.__logger.error("%s: %s", error.__class__, error)
self.__logger.error("Failed to establish endpoint connection")
self.__logger.exception(error)
cs.send_all(
ErrorResponses.connection_error(req.id)
) # TODO: detect wether client connection is closed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import socket
import enum
import time
from typing import Set
Expand All @@ -18,6 +19,7 @@
)
from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint
from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest
from web3pi_proxy.core.sockets.basesocket import BaseSocket
from web3pi_proxy.utils.logger import get_logger


Expand Down Expand Up @@ -135,7 +137,8 @@ def __run_closing_thread(self):
try:
connection.close()
except Exception as ex:
self.__logger.error(f"Error while closing a connection", ex)
self.__logger.error("Error while closing a connection")
self.__logger.exception(ex)

def __run_cleanup_thread(self) -> None:
while True:
Expand Down Expand Up @@ -167,6 +170,12 @@ def __run_cleanup_thread(self) -> None:
def __get_connection(self) -> EndpointConnection:
return self.connections.get_nowait()

def new_connection(self) -> EndpointConnection:
"""Internal function, do not call directly"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of naming it like that, please change the name to __new_connection or _new_connection to make it obvious that it's an internal function...

def connection_factory() -> socket: # TODO is it worth to move it to object level and reuse?
return BaseSocket.create_socket(self.endpoint.conn_descr.host, self.endpoint.conn_descr.port)
return EndpointConnection(self.endpoint, connection_factory)

def __update_status(self, status: str):
self.status = status
self.__logger.debug("Changed %s status to %s", str(self), status)
Expand All @@ -180,7 +189,7 @@ def get(self) -> EndpointConnectionHandler:
self.__lock.release()
self.__logger.debug("No existing connections available, establishing new connection")
try:
connection = EndpointConnection(self.endpoint)
connection = self.new_connection()
except Exception as error:
self.stats.register_error_on_connection_creation()
raise error
Expand Down Expand Up @@ -217,6 +226,9 @@ def put(self, connection: EndpointConnection) -> None:
def is_active(self):
return self.status == self.PoolStatus.ACTIVE

def is_open(self):
return self.status == self.PoolStatus.ACTIVE.value or self.status == self.PoolStatus.DISABLED.value

def disable(self):
with self.__lock:
if self.status == self.PoolStatus.CLOSED or self.status == self.PoolStatus.CLOSING:
Expand Down
19 changes: 17 additions & 2 deletions web3pi_proxy/core/rpc/node/endpoint_pool/pool_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import select
import socket

import time
from threading import RLock, Thread
from typing import List, Tuple
Expand All @@ -10,15 +13,17 @@
from web3pi_proxy.core.rpc.node.endpoint_pool.load_balancers import (
LoadBalancer,
)
from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool import TunnelConnectionPool
from web3pi_proxy.core.rpc.node.rpcendpoint.connection.connectiondescr import (
EndpointConnectionDescriptor,
EndpointConnectionDescriptor, ConnectionType,
)
from web3pi_proxy.core.rpc.node.rpcendpoint.connection.endpoint_connection_handler import (
EndpointConnectionHandler,
)
from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint
from web3pi_proxy.core.rpc.request.rpcrequest import RPCRequest
from web3pi_proxy.utils.logger import get_logger
from web3pi_proxy.config.conf import Config


class ConnectionPoolError(Exception):
Expand Down Expand Up @@ -135,6 +140,13 @@ def __get_active_pools(self):
if connection_pool.is_active()
]

def __get_open_pools(self):
return [
connection_pool
for connection_pool in self.pools.values()
if connection_pool.is_open()
]

def __damage_control(self):
while True:
self.__logger.debug("Running check on endpoint connections")
Expand All @@ -153,7 +165,10 @@ def add_pool(
f"Creating endpoint {name} with connection {conn_descr}"
)
endpoint = RPCEndpoint.create(name, conn_descr)
connection_pool = EndpointConnectionPool(endpoint)
if endpoint.conn_descr.connection_type == ConnectionType.TUNNEL:
connection_pool = TunnelConnectionPool(endpoint)
else:
connection_pool = EndpointConnectionPool(endpoint)
self.pools[name] = connection_pool
return endpoint

Expand Down
60 changes: 60 additions & 0 deletions web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_connection_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import socket

from web3pi_proxy.config import Config
from web3pi_proxy.core.rpc.node.endpoint_pool.endpoint_connection_pool import EndpointConnectionPool
from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool_intf import TunnelConnectionPoolIntf
from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_service import TunnelService
from web3pi_proxy.core.rpc.node.rpcendpoint.connection.endpointconnection import EndpointConnection
from web3pi_proxy.core.rpc.node.rpcendpoint.endpointimpl import RPCEndpoint

from web3pi_proxy.utils.logger import get_logger


class TunnelConnectionPool(EndpointConnectionPool, TunnelConnectionPoolIntf):

def __init__(
self,
endpoint: RPCEndpoint,
):
super().__init__(endpoint)
self.__logger = get_logger(f"TunnelConnectionPool.{id(self)}")

self.tunnel_api_key = endpoint.conn_descr.extras["tunnel_service_auth_key"]
self.tunnel_proxy_establish_port: int = endpoint.conn_descr.extras["tunnel_proxy_establish_port"]

tunnel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tunnel_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tunnel_socket.bind((Config.PROXY_LISTEN_ADDRESS, self.tunnel_proxy_establish_port))
tunnel_socket.listen(Config.LISTEN_BACKLOG_PARAM)
self.tunnel_socket = tunnel_socket

self.tunnel_service_socket = None

self.status = self.PoolStatus.DISABLED.value

TunnelService.register(self.tunnel_api_key, self)

def new_connection(self) -> EndpointConnection:

def connection_factory() -> socket: # TODO is it worth to move it to object level and reuse?
self.__logger.debug("Creating socket")
self.tunnel_service_socket.sendall(b"NEWCONN")
new_conn_sock, new_conn_addr = self.tunnel_socket.accept()
self.__logger.debug("Finished connecting socket")
return new_conn_sock

return EndpointConnection(self.endpoint, connection_factory)

def new_tunnel_service_socket(self, tunnel_service_socket: socket):
with self._EndpointConnectionPool__lock:
self.tunnel_service_socket = tunnel_service_socket
self.status = self.PoolStatus.ACTIVE.value

def close(self) -> None:
super().close()
self.tunnel_socket.close()
self.tunnel_socket = None
if self.tunnel_service_socket:
self.tunnel_service_socket.close()
self.tunnel_service_socket = None
TunnelService.unregister(self.tunnel_api_key, self)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import socket


class TunnelConnectionPoolIntf:
"""Added to resolve the circular dependencies"""
tunnel_proxy_establish_port: int

def new_tunnel_service_socket(self, tunnel_service_socket: socket):
pass
69 changes: 69 additions & 0 deletions web3pi_proxy/core/rpc/node/endpoint_pool/tunnel_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import socket
import select
from threading import Lock, Thread

from web3pi_proxy.config import Config
from web3pi_proxy.core.rpc.node.endpoint_pool.tunnel_connection_pool_intf import TunnelConnectionPoolIntf
from web3pi_proxy.utils.logger import get_logger


class TunnelServiceImpl:
"""The service is lazy initialized at the first pool registration
to avoid resource allocation when there is not pool with a tunnel"""
def __init__(self):
self.__registry = dict()
self.__lock = Lock()
self.__initialized = False
self.__logger = get_logger(f"TunnelServiceImpl")

def register(self, api_key: str, tunnel_connection_pool: TunnelConnectionPoolIntf):
with self.__lock:
if not self.__initialized:
self.__initialize__()
self.__initialized = True
self.__registry[api_key] = tunnel_connection_pool

def unregister(self, api_key: str, tunnel_connection_pool: TunnelConnectionPoolIntf):
with self.__lock:
if self.__registry.get(api_key) == tunnel_connection_pool:
del self.__registry[api_key]

def __initialize__(self):
tunnel_srv_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tunnel_srv_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tunnel_srv_socket.bind((Config.PROXY_LISTEN_ADDRESS, Config.TUNNEL_ESTABLISH_PORT))
tunnel_srv_socket.listen(Config.LISTEN_BACKLOG_PARAM)
self.tunnel_srv_socket = tunnel_srv_socket
self.tunnel_thread = Thread(
target=self.__tunnel_service_run,
daemon=True,
)
self.tunnel_thread.start()

def __tunnel_service_run(self):
while True:
ready_read, _, _ = select.select([self.tunnel_srv_socket], [], [], Config.BLOCKING_ACCEPT_TIMEOUT)

if len(ready_read) == 0:
continue
tunnel_sock, cli_addr = self.tunnel_srv_socket.accept()
self.__logger.debug(
f"New tunnel request from {cli_addr}"
)

cli_api_key = tunnel_sock.recv(2048).decode("utf-8") # TODO the attack: a tunnel client does not send api key
if not cli_api_key:
tunnel_sock.close()
continue

with self.__lock:
pool: TunnelConnectionPoolIntf = self.__registry.get(cli_api_key)
if not pool:
tunnel_sock.close()
tunnel_sock.sendall(f'RJCT|invalid_auth_key'.encode("utf-8"))
continue
tunnel_sock.sendall(f'ACPT|{Config.PROXY_LISTEN_ADDRESS}:{pool.tunnel_proxy_establish_port}'.encode("utf-8"))
pool.new_tunnel_service_socket(tunnel_sock)


TunnelService = TunnelServiceImpl()
Loading