From 193755ad4d2a54b658a42422e3b4920b6f4e2bea Mon Sep 17 00:00:00 2001 From: Vasilchenko Igor Date: Thu, 2 Jul 2026 16:33:44 +0300 Subject: [PATCH] Add optional anonymous transaction broadcast over Tor (native P2P) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When node.tor.enabled is true, transactions submitted to this node's own broadcast API are validated locally as before but, instead of being advertised to the node's directly-connected sync peers (which exposes the origin node's IP), are relayed via the native TRON P2P protocol through the local Tor SOCKS5 proxy to standard, unmodified nodes learned via the (UDP) discovery layer that are NOT currently used as sync peers. Those nodes re-gossip the transaction as usual, hiding the origin. Block sync and normal P2P traffic are unaffected. The feature is off by default. Reaching a Tor peer and completing the P2P handshake costs several seconds, which is paid per connection, not per transaction. To broadcast quickly even at a high transaction rate, the service keeps a pool of persistent Tor tunnels to verified, at-head relay nodes: each tunnel is handshaked once, kept alive (answering keep-alive pings) and reused for every broadcast. The read timeout of an established tunnel is set well above the peer keep-alive interval so an idle but healthy tunnel is not torn down between pings. Transactions are buffered and flushed as a single TransactionsMessage batch (capped) over broadcastCount tunnels in parallel, so a burst is delivered in about one round-trip; broadcastTransaction enqueues and returns immediately. Expired transactions are dropped from the buffer. Each tunnel advertises a fresh random public IP (never our real one, and a fresh random node id per hello, so tunnels can't be clustered or linked to our clear-net node) and echoes the peer's head block so the peer marks the connection sync-complete before it will fetch a transaction. Relay candidates are discovered nodes minus current sync peers; only nodes at (or near) our head are kept. Resilience: no transaction is lost if Tor or a relay is down or degraded — it stays buffered and is delivered once a tunnel can be (re)built. A flush never blocks the pool on a silently-dead tunnel (writes are bounded; a stalled tunnel is dropped and rebuilt). Pool maintenance refills only the missing tunnels (plus a small margin) and backs off exponentially when Tor is congested, so it does not flood a struggling Tor with circuit requests. If no tunnel can be built for several rounds and node.tor.controlPort is set, the node signals NEWNYM over the Tor control port to rebuild circuits with fresh exit IPs. libp2p is used as a library only; it is not modified. - NodeConfig/Args/CommonParameter: parse and hold node.tor.* config (incl. optional controlPort/controlPassword for NEWNYM recovery) - TronNetService: expose discovery-table nodes (getTableNodes) - TorBroadcastService: persistent Tor tunnel pool + batched native-P2P relay, with bounded writes, adaptive refill/backoff, NEWNYM recovery, idle-safe read timeout, expired-tx eviction, and public-only advertised addresses - Wallet.broadcastTransaction: route through Tor when enabled - config.conf: documented node.tor block - Tests: delivery over a persistent tunnel via a SOCKS5 forwarder; Tor-outage and relay-degradation recovery; NEWNYM on a stuck pool; and routing (Tor path when enabled, normal P2P broadcast when disabled) --- .../common/parameter/CommonParameter.java | 35 +- .../org/tron/core/config/args/NodeConfig.java | 78 ++ .../src/main/java/org/tron/core/Wallet.java | 20 +- .../java/org/tron/core/config/args/Args.java | 11 + .../org/tron/core/net/TronNetService.java | 10 + .../net/service/tor/TorBroadcastService.java | 813 ++++++++++++++++++ framework/src/main/resources/config.conf | 21 + .../java/org/tron/core/WalletMockTest.java | 91 ++ .../service/tor/TorBroadcastServiceTest.java | 566 ++++++++++++ 9 files changed, 1643 insertions(+), 2 deletions(-) create mode 100644 framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java create mode 100644 framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 3fe1e878ffb..13393d96081 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -141,6 +141,37 @@ public class CommonParameter { @Getter @Setter public int minParticipationRate; + // -- Tor anonymous transaction broadcast -- + // When enabled, locally-originated transactions are sent through the Tor SOCKS5 proxy + // to the HTTP broadcast API of standard (unmodified) TRON nodes instead of being advertised + // to P2P peers, hiding the originating node's IP. + @Getter + @Setter + public boolean torBroadcastEnable = false; + @Getter + @Setter + public String torSocksHost = "127.0.0.1"; + @Getter + @Setter + public int torSocksPort = 9050; + @Getter + @Setter + public int torConnectTimeout = 30000; + @Getter + @Setter + public int torReadTimeout = 30000; + @Getter + @Setter + public int torBroadcastCount = 2; + @Getter + @Setter + public boolean torCircuitIsolation = true; + @Getter + @Setter + public int torControlPort = 0; + @Getter + @Setter + public String torControlPassword = ""; @Getter public P2pConfig p2pConfig; @Getter @@ -426,7 +457,9 @@ public class CommonParameter { // from clearParam(), consistent with mainnet.conf public List passiveNodes = new ArrayList<>(); @Getter - public List fastForwardNodes; // clearParam: new ArrayList<>() + // Default to an empty list (never null) so consumers that capture it at class-load time + // (e.g. PeerConnection's static relayNodes) can't capture a null before Args is initialised. + public List fastForwardNodes = new ArrayList<>(); @Getter public int maxFastForwardNum; // clearParam: 4 @Getter diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index ea9f26a06a0..80d31c67ac5 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -117,6 +117,73 @@ public class NodeConfig { private List fastForward = new ArrayList<>(); private List disabledApi = new ArrayList<>(); + // node.tor.* — anonymous outbound transaction broadcast over the Tor SOCKS5 proxy. + // Read manually in fromConfig() so an absent [node.tor] block keeps prior behaviour + // and no reference.conf defaults are required. + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private boolean torBroadcastEnable = false; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private String torSocksHost = "127.0.0.1"; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torSocksPort = 9050; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torConnectTimeout = 30000; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torReadTimeout = 30000; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torBroadcastCount = 2; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private boolean torCircuitIsolation = true; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torControlPort = 0; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private String torControlPassword = ""; + + public boolean isTorBroadcastEnable() { + return torBroadcastEnable; + } + + public String getTorSocksHost() { + return torSocksHost; + } + + public int getTorSocksPort() { + return torSocksPort; + } + + public int getTorConnectTimeout() { + return torConnectTimeout; + } + + public int getTorReadTimeout() { + return torReadTimeout; + } + + public int getTorBroadcastCount() { + return torBroadcastCount; + } + + public boolean isTorCircuitIsolation() { + return torCircuitIsolation; + } + + public int getTorControlPort() { + return torControlPort; + } + + public String getTorControlPassword() { + return torControlPassword; + } + // ---- Sub-object fields ---- private P2pConfig p2p = new P2pConfig(); private HttpConfig http = new HttpConfig(); @@ -394,6 +461,17 @@ public static NodeConfig fromConfig(Config config) { + "Please use [node.allowShieldedTransactionApi] instead."); } + // node.tor.* — read manually so an absent block is a no-op (feature stays off) + nc.torBroadcastEnable = getBool(section, "tor.enabled", false); + nc.torSocksHost = getString(section, "tor.socksHost", "127.0.0.1"); + nc.torSocksPort = getInt(section, "tor.socksPort", 9050); + nc.torConnectTimeout = getInt(section, "tor.connectTimeout", 30000); + nc.torReadTimeout = getInt(section, "tor.readTimeout", 30000); + nc.torBroadcastCount = getInt(section, "tor.broadcastCount", 2); + nc.torCircuitIsolation = getBool(section, "tor.circuitIsolation", true); + nc.torControlPort = getInt(section, "tor.controlPort", 0); + nc.torControlPassword = getString(section, "tor.controlPassword", ""); + // node.shutdown.* — PascalCase keys (BlockTime, BlockHeight), cannot auto-bind nc.shutdownBlockTime = config.hasPath("node.shutdown.BlockTime") ? config.getString("node.shutdown.BlockTime") : ""; diff --git a/framework/src/main/java/org/tron/core/Wallet.java b/framework/src/main/java/org/tron/core/Wallet.java index 0482643d8d0..67852d73b5f 100755 --- a/framework/src/main/java/org/tron/core/Wallet.java +++ b/framework/src/main/java/org/tron/core/Wallet.java @@ -189,6 +189,7 @@ import org.tron.core.net.TronNetDelegate; import org.tron.core.net.TronNetService; import org.tron.core.net.message.adv.TransactionMessage; +import org.tron.core.net.service.tor.TorBroadcastService; import org.tron.core.store.AccountIdIndexStore; import org.tron.core.store.AccountStore; import org.tron.core.store.AccountTraceStore; @@ -277,6 +278,8 @@ public class Wallet { @Autowired private TronNetService tronNetService; @Autowired + private TorBroadcastService torBroadcastService; + @Autowired private TronNetDelegate tronNetDelegate; @Autowired private Manager dbManager; @@ -556,9 +559,24 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { if (trx.getInstance().getRawData().getContractCount() == 0) { throw new ContractValidateException(ActuatorConstant.CONTRACT_NOT_EXIST); } - TransactionMessage message = new TransactionMessage(trx.getInstance().toByteArray()); trx.checkExpiration(chainBaseManager.getNextBlockSlotTime()); dbManager.pushTransaction(trx); + + if (CommonParameter.getInstance().isTorBroadcastEnable()) { + // Anonymous path: the transaction is validated and kept locally, but instead of being + // advertised to P2P peers (which would expose this node's IP as the origin) it is relayed + // through the Tor SOCKS5 proxy to standard TRON nodes, which broadcast it to the network. + int relayed = torBroadcastService.broadcastTransaction(signedTransaction); + if (relayed == 0) { + return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) + .setMessage(ByteString.copyFromUtf8("Tor broadcast failed.")).build(); + } + logger.info("Broadcast transaction {} via Tor to {} relay nodes successfully.", + txID, relayed); + return builder.setResult(true).setCode(response_code.SUCCESS).build(); + } + + TransactionMessage message = new TransactionMessage(trx.getInstance().toByteArray()); int num = tronNetService.fastBroadcastTransaction(message); if (num == 0 && minEffectiveConnection != 0) { return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 2d6660f9a6a..f1a5a9bf0fb 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -640,6 +640,17 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.unsolidifiedBlockCheck = nc.isUnsolidifiedBlockCheck(); PARAMETER.maxUnsolidifiedBlocks = nc.getMaxUnsolidifiedBlocks(); + // ---- Tor anonymous transaction broadcast ---- + PARAMETER.torBroadcastEnable = nc.isTorBroadcastEnable(); + PARAMETER.torSocksHost = nc.getTorSocksHost(); + PARAMETER.torSocksPort = nc.getTorSocksPort(); + PARAMETER.torConnectTimeout = nc.getTorConnectTimeout(); + PARAMETER.torReadTimeout = nc.getTorReadTimeout(); + PARAMETER.torBroadcastCount = nc.getTorBroadcastCount(); + PARAMETER.torCircuitIsolation = nc.isTorCircuitIsolation(); + PARAMETER.torControlPort = nc.getTorControlPort(); + PARAMETER.torControlPassword = nc.getTorControlPassword(); + // disabledApi list — lowercase normalization PARAMETER.disabledApiList = nc.getDisabledApi().isEmpty() ? Collections.emptyList() diff --git a/framework/src/main/java/org/tron/core/net/TronNetService.java b/framework/src/main/java/org/tron/core/net/TronNetService.java index 8b97c8d9f4d..14ec7b053f3 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetService.java +++ b/framework/src/main/java/org/tron/core/net/TronNetService.java @@ -31,6 +31,7 @@ import org.tron.core.net.service.sync.SyncService; import org.tron.p2p.P2pConfig; import org.tron.p2p.P2pService; +import org.tron.p2p.discover.Node; import org.tron.p2p.utils.NetUtil; @Slf4j(topic = "net") @@ -138,6 +139,15 @@ public int fastBroadcastTransaction(TransactionMessage msg) { return advService.fastBroadcastTransaction(msg); } + /** + * Nodes discovered via the (UDP) discovery layer. Only a subset of these is actually connected + * as sync peers; the rest are known-but-unconnected and are used as anonymous Tor broadcast + * targets (see TorBroadcastService). + */ + public List getTableNodes() { + return p2pService.getTableNodes(); + } + public static boolean hasIpv4Stack(Set ipSet) { for (String ip : ipSet) { InetAddress inetAddress; diff --git a/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java b/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java new file mode 100644 index 00000000000..18d82e27972 --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java @@ -0,0 +1,813 @@ +package org.tron.core.net.service.tor; + +import com.google.protobuf.ByteString; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.tron.common.math.StrictMathWrapper; +import org.tron.common.parameter.CommonParameter; +import org.tron.common.utils.Sha256Hash; +import org.tron.core.capsule.TransactionCapsule; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.TronNetService; +import org.tron.core.net.message.MessageTypes; +import org.tron.core.net.message.adv.FetchInvDataMessage; +import org.tron.core.net.message.adv.InventoryMessage; +import org.tron.core.net.message.adv.TransactionsMessage; +import org.tron.core.net.peer.PeerConnection; +import org.tron.p2p.base.Parameter; +import org.tron.p2p.connection.business.upgrade.UpgradeController; +import org.tron.p2p.connection.message.MessageType; +import org.tron.p2p.connection.message.keepalive.PongMessage; +import org.tron.p2p.discover.Node; +import org.tron.p2p.protos.Connect; +import org.tron.p2p.utils.NetUtil; +import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; +import org.tron.protos.Protocol.Transaction; + +/** + * Broadcasts locally-originated transactions anonymously over Tor, using the native TRON P2P + * protocol so the receiving nodes are standard, unmodified TRON full nodes. + * + *

When {@code node.tor.enabled} is true, a transaction submitted to this node's own broadcast + * API is validated locally as usual but, instead of being advertised to the node's directly + * connected sync peers (which would expose its IP as the origin), it is relayed through the local + * Tor SOCKS5 proxy to standard nodes learned via discovery that are NOT current sync peers, hiding + * the origin. Block sync and normal P2P traffic keep running in the clear. + * + *

Reaching a Tor peer and completing the P2P handshake costs several seconds, which is paid per + * connection, not per transaction. To make broadcasting fast even at a high transaction rate the + * service keeps a pool of persistent Tor tunnels to verified, at-head relay nodes: each + * tunnel is handshaked once, kept alive (answering keep-alive pings), reused for every broadcast, + * and rebuilt only when it breaks. Transactions are buffered and flushed as a single + * {@code TransactionsMessage} batch over the tunnels, so a burst of transactions is delivered in + * roughly one round-trip. {@code broadcastTransaction} enqueues and returns immediately. + */ +@Slf4j(topic = "net") +@Component +public class TorBroadcastService { + + private static final byte HELLO_TYPE = MessageType.HANDSHAKE_HELLO.getType(); + private static final byte PING_TYPE = MessageType.KEEP_ALIVE_PING.getType(); + private static final int SOCKS_VERSION = 0x05; + private static final int SOCKS_CMD_CONNECT = 0x01; + private static final int SOCKS_AUTH_NONE = 0x00; + private static final int SOCKS_AUTH_USERPASS = 0x02; + + private static final int POOL_SIZE = 6; + private static final int MAINTAIN_INTERVAL_MS = 3000; + private static final int FLUSH_INTERVAL_MS = 150; + private static final int MAX_BATCH = 300; + // How many connect attempts to launch per maintenance round: just enough to refill the missing + // tunnels plus a small margin for the ones that fail over Tor — NOT a fixed large batch, so we + // don't build (and immediately close) far more circuits than needed. + private static final int CONNECT_MARGIN = 2; + // When a round can't refill the pool (Tor congested/down), skip up to 2^level rounds before the + // next attempt, so we back off instead of flooding a struggling Tor with circuit requests. + private static final int MAX_BACKOFF_LEVEL = 4; + // A relay is only kept if its advertised head is within this many blocks of ours, so it is a + // well-connected node that actually propagates the transaction rather than sitting on it. + private static final long HEAD_TOLERANCE = 200; + + @Autowired + private TronNetService tronNetService; + @Autowired + private TronNetDelegate tronNetDelegate; + + private final ExecutorService connectExecutor = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "tor-relay-connect"); + t.setDaemon(true); + return t; + }); + private volatile ScheduledExecutorService scheduler; + + private final List channels = new CopyOnWriteArrayList<>(); + private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + private final Set buffered = ConcurrentHashMap.newKeySet(); + + // If the pool stays empty for this many maintenance rounds, ask Tor for fresh circuits. + private static final int NEWNYM_AFTER_ROUNDS = 3; + private static final long NEWNYM_MIN_INTERVAL_MS = 15000; + private int emptyMaintainRounds = 0; + private long lastNewnymMs = 0; + // Exponential backoff so a struggling Tor isn't flooded with connect attempts. + private int backoffLevel = 0; + private int skipsRemaining = 0; + + /** + * Enqueue a transaction for anonymous relay over Tor and return immediately. Delivery happens + * asynchronously over the persistent tunnel pool. + * + * @return the number of relays the transaction will be pushed to (0 only if it cannot run) + */ + public int broadcastTransaction(Transaction transaction) { + ensureStarted(); + Sha256Hash txId = new TransactionCapsule(transaction).getTransactionId(); + if (buffered.add(txId)) { + buffer.add(transaction); + } + // Async: report the intended relay count so the caller sees success; the flusher delivers it. + return StrictMathWrapper.max(1, CommonParameter.getInstance().getTorBroadcastCount()); + } + + /** Stop the background maintenance/flush tasks and close all tunnels. */ + @PreDestroy + public void shutdown() { + ScheduledExecutorService svc = scheduler; + if (svc != null) { + svc.shutdownNow(); + } + connectExecutor.shutdownNow(); + for (RelayChannel c : channels) { + c.close(); + } + channels.clear(); + buffer.clear(); + buffered.clear(); + } + + private void ensureStarted() { + if (scheduler != null) { + return; + } + synchronized (this) { + if (scheduler != null) { + return; + } + ScheduledExecutorService svc = Executors.newScheduledThreadPool(2, r -> { + Thread t = new Thread(r, "tor-relay-pool"); + t.setDaemon(true); + return t; + }); + svc.scheduleWithFixedDelay(this::maintainChannels, 0, MAINTAIN_INTERVAL_MS, + TimeUnit.MILLISECONDS); + svc.scheduleWithFixedDelay(this::flush, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, + TimeUnit.MILLISECONDS); + scheduler = svc; + } + } + + // ---- Pool maintenance: keep POOL_SIZE persistent tunnels to at-head, non-sync relays ---------- + + private void maintainChannels() { + try { + CommonParameter parameter = CommonParameter.getInstance(); + if (!parameter.isTorBroadcastEnable()) { + return; + } + channels.removeIf(c -> { + if (!c.alive) { + c.close(); + return true; + } + return false; + }); + int need = POOL_SIZE - channels.size(); + if (need <= 0) { + emptyMaintainRounds = 0; + backoffLevel = 0; + skipsRemaining = 0; + return; + } + // Back off: after failed rounds, skip a growing number of ticks so a congested/down Tor is + // not flooded with connect attempts (which only clog its circuit-build queue further). + if (skipsRemaining > 0) { + skipsRemaining--; + return; + } + Set exclude = activeSyncPeerAddresses(); + for (RelayChannel c : channels) { + exclude.add(c.address); + } + List candidates = new ArrayList<>(); + for (Node node : discoveredNonSyncNodes(activeSyncPeerAddresses())) { + InetSocketAddress a = node.getPreferInetSocketAddress(); + if (a != null && a.getAddress() != null && !exclude.contains(a.getAddress())) { + candidates.add(node); + } + } + Collections.shuffle(candidates, ThreadLocalRandom.current()); + + // Launch only enough attempts to refill the missing tunnels (plus a small margin), and keep + // every one that succeeds — never build extra circuits just to close them. + int attempts = StrictMathWrapper.min(need + CONNECT_MARGIN, candidates.size()); + List> futures = new ArrayList<>(); + for (int i = 0; i < attempts; i++) { + Node node = candidates.get(i); + futures.add(connectExecutor.submit(() -> connectChannel(node, parameter))); + } + int added = 0; + for (Future f : futures) { + try { + RelayChannel c = f.get(parameter.getTorReadTimeout() + 3000L, TimeUnit.MILLISECONDS); + if (c != null) { + channels.add(c); + added++; + } + } catch (Exception ignored) { + // connect failed / timed out + } + } + if (added > 0) { + logger.info("Tor relay pool: {} persistent tunnel(s) open.", channels.size()); + } + if (added >= need) { + // pool refilled: clear the backoff + backoffLevel = 0; + skipsRemaining = 0; + emptyMaintainRounds = 0; + } else { + // couldn't refill: grow the backoff (skip 1, 3, 7, 15 ... of the next rounds) + backoffLevel = StrictMathWrapper.min(backoffLevel + 1, MAX_BACKOFF_LEVEL); + skipsRemaining = (1 << backoffLevel) - 1; + // If no tunnel exists at all for several attempts, Tor's circuits/exits may be throttled; + // ask Tor for a fresh identity (new exit IPs). This is our lever against a systemic outage. + if (channels.isEmpty() && ++emptyMaintainRounds >= NEWNYM_AFTER_ROUNDS) { + emptyMaintainRounds = 0; + signalNewnym(parameter); + } + } + } catch (Throwable t) { + // Never let the scheduled maintainer die: a thrown Error would stop it being rescheduled. + logger.warn("Tor relay pool maintenance failed: {}", t.toString()); + } + } + + // Ask the Tor daemon (via its ControlPort) to build fresh circuits with new exit relays, so a + // node whose current exit IPs are throttled can recover without a manual Tor restart. No-op if + // no ControlPort is configured. Rate-limited, as Tor itself throttles NEWNYM. + private void signalNewnym(CommonParameter parameter) { + int controlPort = parameter.getTorControlPort(); + if (controlPort <= 0) { + return; + } + long now = System.currentTimeMillis(); + if (now - lastNewnymMs < NEWNYM_MIN_INTERVAL_MS) { + return; + } + lastNewnymMs = now; + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(parameter.getTorSocksHost(), controlPort), + parameter.getTorConnectTimeout()); + socket.setSoTimeout(parameter.getTorReadTimeout()); + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + out.write(("AUTHENTICATE \"" + parameter.getTorControlPassword() + "\"\r\n") + .getBytes(StandardCharsets.US_ASCII)); + out.flush(); + String authReply = readControlLine(in); + if (authReply == null || !authReply.startsWith("250")) { + logger.warn("Tor ControlPort authentication failed: {}", authReply); + return; + } + out.write("SIGNAL NEWNYM\r\n".getBytes(StandardCharsets.US_ASCII)); + out.flush(); + String signalReply = readControlLine(in); + if (signalReply != null && signalReply.startsWith("250")) { + logger.info("Tor relay pool empty; signalled NEWNYM to rebuild circuits (new exit IPs)."); + } else { + logger.warn("Tor NEWNYM signal was rejected: {}", signalReply); + } + } catch (Exception e) { + logger.warn("Tor NEWNYM signal failed: {}", e.getMessage()); + } + } + + private static String readControlLine(InputStream in) throws IOException { + StringBuilder sb = new StringBuilder(); + int c; + while ((c = in.read()) != -1) { + if (c == '\n') { + break; + } + if (c != '\r') { + sb.append((char) c); + } + } + return sb.length() == 0 ? null : sb.toString(); + } + + private RelayChannel connectChannel(Node node, CommonParameter parameter) { + InetSocketAddress dest = node.getPreferInetSocketAddress(); + if (dest == null || dest.getAddress() == null) { + return null; + } + Socket socket = null; + try { + socket = openTunnel(dest, "relay-" + ThreadLocalRandom.current().nextLong(), parameter); + RelayChannel channel = new RelayChannel(dest.getAddress(), socket, parameter); + if (!channel.establish()) { + channel.close(); + return null; + } + return channel; + } catch (Exception e) { + closeQuietly(socket); + return null; + } + } + + // ---- Flush: deliver the buffered transactions as one batch over the tunnels ------------------- + + private void flush() { + try { + if (buffer.isEmpty()) { + return; + } + dropExpired(); + if (buffer.isEmpty()) { + return; + } + List alive = new ArrayList<>(); + for (RelayChannel c : channels) { + if (c.alive) { + alive.add(c); + } + } + if (alive.isEmpty()) { + return; // no tunnels yet; keep the transactions buffered until the pool is ready + } + List batch = new ArrayList<>(); + Transaction tx; + while (batch.size() < MAX_BATCH && (tx = buffer.poll()) != null) { + batch.add(tx); + } + if (batch.isEmpty()) { + return; + } + CommonParameter parameter = CommonParameter.getInstance(); + int target = StrictMathWrapper.max(1, + StrictMathWrapper.min(parameter.getTorBroadcastCount(), alive.size())); + List chosen = alive.subList(0, target); + // A socket write has no timeout of its own (setSoTimeout only affects reads), so a tunnel + // whose Tor circuit has died silently would block the advertise() call — and thus the whole + // flusher — forever. Run each advertise on the connect pool and bound it: if it stalls, close + // the tunnel (which unblocks the write) so the maintainer rebuilds it, and keep flushing. + long writeTimeout = StrictMathWrapper.max(2000L, parameter.getTorReadTimeout()); + List> futures = new ArrayList<>(); + for (RelayChannel channel : chosen) { + futures.add(connectExecutor.submit(() -> channel.advertise(batch))); + } + int delivered = 0; + for (int i = 0; i < futures.size(); i++) { + try { + if (Boolean.TRUE.equals(futures.get(i).get(writeTimeout, TimeUnit.MILLISECONDS))) { + delivered++; + } + } catch (Exception e) { + futures.get(i).cancel(true); + chosen.get(i).close(); // stalled/broken tunnel: drop it, the maintainer will replace it + } + } + if (delivered == 0) { + // every chosen tunnel failed to write; re-buffer for the next flush (no transaction lost) + for (Transaction t : batch) { + buffer.add(t); + } + } else { + for (Transaction t : batch) { + buffered.remove(new TransactionCapsule(t).getTransactionId()); + } + logger.info("Tor broadcast: flushed {} transaction(s) to {} relay tunnel(s).", + batch.size(), delivered); + } + } catch (Throwable t) { + // Never let the scheduled flusher die: a thrown Error would stop it being rescheduled. + logger.warn("Tor broadcast flush failed: {}", t.toString()); + } + } + + // Drop transactions whose expiration has passed: they can no longer be included even if relayed, + // so keeping them wastes tunnel writes and, during a long Tor outage, would grow the buffer + // without bound. Removing them here also frees the dedup set. + private void dropExpired() { + long now = System.currentTimeMillis(); + int dropped = 0; + Iterator it = buffer.iterator(); + while (it.hasNext()) { + Transaction tx = it.next(); + long expiration = tx.getRawData().getExpiration(); + if (expiration > 0 && expiration < now) { + it.remove(); + buffered.remove(new TransactionCapsule(tx).getTransactionId()); + dropped++; + } + } + if (dropped > 0) { + logger.warn("Tor broadcast: dropped {} expired transaction(s) not relayed before expiry.", + dropped); + } + } + + private List discoveredNonSyncNodes(Set syncPeers) { + List result = new ArrayList<>(); + for (Node node : tronNetService.getTableNodes()) { + InetSocketAddress address = node.getPreferInetSocketAddress(); + if (address != null && address.getAddress() != null + && !syncPeers.contains(address.getAddress())) { + result.add(node); + } + } + return result; + } + + private Set activeSyncPeerAddresses() { + Set set = new HashSet<>(); + for (PeerConnection peer : tronNetDelegate.getActivePeer()) { + InetAddress address = peer.getInetAddress(); + if (address != null) { + set.add(address); + } + } + return set; + } + + private long ourHeadNum() { + try { + return tronNetDelegate.getHeadBlockId().getNum(); + } catch (Exception e) { + return -1; // unknown (e.g. in tests) -> skip the at-head filter + } + } + + // ---- A persistent, handshaked Tor tunnel to one relay node ----------------------------------- + + private final class RelayChannel { + + private final InetAddress address; + private final Socket socket; + private final InputStream in; + private final OutputStream out; + private final CommonParameter parameter; + private int peerVersion; + private volatile boolean alive = true; + private final Map pendingFetch = new ConcurrentHashMap<>(); + private Thread reader; + + RelayChannel(InetAddress address, Socket socket, CommonParameter parameter) throws IOException { + this.address = address; + this.socket = socket; + this.parameter = parameter; + this.in = socket.getInputStream(); + this.out = socket.getOutputStream(); + } + + // Complete the transport + application handshake and start the keep-alive/serve reader. + boolean establish() throws Exception { + int networkId = parameter.getNodeP2pVersion(); + peerVersion = transportHandshake(in, out, networkId); + Protocol.HelloMessage peerAppHello = readAppHello(in, peerVersion); + if (peerAppHello == null) { + return false; + } + long ourHead = ourHeadNum(); + if (ourHead >= 0 && peerAppHello.getHeadBlockId().getNumber() < ourHead - HEAD_TOLERANCE) { + return false; // lagging relay, poor propagator + } + // Echo the peer's genesis/solid/head so it marks the connection sync-complete (needed before + // it will fetch a transaction inventory from us). + org.tron.protos.Discover.Endpoint appFrom = org.tron.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom(randomAnonIp().getBytes(StandardCharsets.UTF_8))) + .setPort(18888) + .build(); + Protocol.HelloMessage appHello = Protocol.HelloMessage.newBuilder() + .setFrom(appFrom) + .setVersion(networkId) + .setTimestamp(System.currentTimeMillis()) + .setGenesisBlockId(peerAppHello.getGenesisBlockId()) + .setSolidBlockId(peerAppHello.getSolidBlockId()) + .setHeadBlockId(peerAppHello.getHeadBlockId()) + .setNodeType(0) + .setLowestBlockNum(0) + .build(); + sendApp(prependType(MessageTypes.P2P_HELLO.asByte(), appHello.toByteArray())); + // Handshake done. The peer keep-alive-pings only every KEEP_ALIVE_TIMEOUT (20s), so relax the + // read timeout well above that (plus Tor jitter); otherwise an idle but healthy tunnel would + // time out between pings and be dropped, churning the pool and flooding Tor with reconnects. + int minIdle = 3 * Parameter.KEEP_ALIVE_TIMEOUT; + socket.setSoTimeout(StrictMathWrapper.max(parameter.getTorReadTimeout(), minIdle)); + reader = new Thread(this::readLoop, "tor-relay-reader"); + reader.setDaemon(true); + reader.start(); + return true; + } + + private void readLoop() { + try { + while (alive) { + byte[] decoded = UpgradeController.decodeReceiveData(peerVersion, readFrame(in)); + if (decoded.length == 0) { + continue; + } + byte type = decoded[0]; + if (type == PING_TYPE) { + sendApp(new PongMessage().getSendData()); // keep the tunnel alive + } else if (type == MessageTypes.FETCH_INV_DATA.asByte()) { + FetchInvDataMessage fetch = + new FetchInvDataMessage(subarray(decoded, 1, decoded.length)); + List serve = new ArrayList<>(); + for (Sha256Hash hash : fetch.getHashList()) { + Transaction t = pendingFetch.remove(hash); + if (t != null) { + serve.add(t); + } + } + if (!serve.isEmpty()) { + sendApp(new TransactionsMessage(serve).getSendBytes()); + } + } + // ignore everything else (the peer's own INV, status, sync requests, ...) + } + } catch (Exception e) { + // connection closed or broken; the maintainer will rebuild it + } finally { + alive = false; + closeQuietly(socket); + } + } + + // Advertise a batch of transactions; the reader serves them when the peer sends FetchInvData. + boolean advertise(List batch) { + try { + List ids = new ArrayList<>(batch.size()); + for (Transaction t : batch) { + Sha256Hash id = new TransactionCapsule(t).getTransactionId(); + pendingFetch.put(id, t); + ids.add(id); + } + sendApp(new InventoryMessage(ids, InventoryType.TRX).getSendBytes()); + return true; + } catch (Exception e) { + alive = false; + return false; + } + } + + // Serialize writes: the reader thread (pongs, fetch replies) and the flusher (INV) share out. + private synchronized void sendApp(byte[] appMessageBytes) throws IOException { + writeFrame(out, UpgradeController.codeSendData(peerVersion, appMessageBytes)); + } + + void close() { + alive = false; + closeQuietly(socket); + } + } + + // ---- Tor SOCKS5 tunnel + native TRON P2P handshake helpers ----------------------------------- + + private Socket openTunnel(InetSocketAddress dest, String isoUser, CommonParameter parameter) + throws IOException { + Socket socket = new Socket(); + socket.connect( + new InetSocketAddress(parameter.getTorSocksHost(), parameter.getTorSocksPort()), + parameter.getTorConnectTimeout()); + socket.setSoTimeout(parameter.getTorReadTimeout()); + socksHandshake(socket.getInputStream(), socket.getOutputStream(), dest, + parameter.isTorCircuitIsolation(), isoUser); + return socket; + } + + private void socksHandshake(InputStream in, OutputStream out, InetSocketAddress dest, + boolean isolation, String isoUser) throws IOException { + if (isolation) { + out.write(new byte[] {SOCKS_VERSION, 1, SOCKS_AUTH_USERPASS}); + } else { + out.write(new byte[] {SOCKS_VERSION, 1, SOCKS_AUTH_NONE}); + } + out.flush(); + byte[] methodReply = readFully(in, 2); + int method = methodReply[1] & 0xff; + + if (isolation) { + if (method != SOCKS_AUTH_USERPASS) { + throw new IOException("SOCKS proxy rejected user/pass auth (circuit isolation)"); + } + byte[] user = isoUser.getBytes(StandardCharsets.US_ASCII); + byte[] pass = {0}; + out.write(0x01); + out.write(user.length); + out.write(user); + out.write(pass.length); + out.write(pass); + out.flush(); + byte[] authReply = readFully(in, 2); + if ((authReply[1] & 0xff) != 0x00) { + throw new IOException("SOCKS proxy auth failed"); + } + } else if (method != SOCKS_AUTH_NONE) { + throw new IOException("SOCKS proxy requires authentication"); + } + + byte[] addr = dest.getAddress().getAddress(); + int atyp = addr.length == 4 ? 0x01 : 0x04; + int port = dest.getPort(); + out.write(SOCKS_VERSION); + out.write(SOCKS_CMD_CONNECT); + out.write(0x00); + out.write(atyp); + out.write(addr); + out.write((port >> 8) & 0xff); + out.write(port & 0xff); + out.flush(); + + byte[] head = readFully(in, 4); + if ((head[1] & 0xff) != 0x00) { + throw new IOException("SOCKS connect to " + dest + " failed, code=" + (head[1] & 0xff)); + } + int boundLen = (head[3] & 0xff) == 0x01 ? 4 : (head[3] & 0xff) == 0x04 ? 16 : 0; + readFully(in, boundLen + 2); // skip BND.ADDR + BND.PORT + } + + // libp2p transport Hello handshake; returns the peer's protocol version. Advertises a random, + // non-identifying public IP (never our real one) so tunnels can't be clustered by origin address. + private int transportHandshake(InputStream in, OutputStream out, int networkId) throws Exception { + org.tron.p2p.protos.Discover.Endpoint from = org.tron.p2p.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom(randomAnonIp().getBytes(StandardCharsets.UTF_8))) + .setPort(18888) + .build(); + Connect.HelloMessage hello = Connect.HelloMessage.newBuilder() + .setFrom(from) + .setNetworkId(networkId) + .setCode(0) + .setVersion(Parameter.version) + .setTimestamp(System.currentTimeMillis()) + .build(); + writeFrame(out, prependType(HELLO_TYPE, hello.toByteArray())); + + byte[] peerFrame = readFrame(in); + if (peerFrame.length == 0 || peerFrame[0] != HELLO_TYPE) { + throw new IOException("expected Hello from peer"); + } + Connect.HelloMessage peerHello = + Connect.HelloMessage.parseFrom(subarray(peerFrame, 1, peerFrame.length)); + if (peerHello.getCode() != 0 || peerHello.getNetworkId() != networkId) { + throw new IOException("peer rejected handshake (code=" + peerHello.getCode() + + ", networkId=" + peerHello.getNetworkId() + ")"); + } + return peerHello.getVersion(); + } + + // Read frames until the peer's application-level P2P_HELLO arrives, returning it (or null). + private Protocol.HelloMessage readAppHello(InputStream in, int peerVersion) throws Exception { + long deadline = System.currentTimeMillis() + Parameter.KEEP_ALIVE_TIMEOUT; + while (System.currentTimeMillis() < deadline) { + byte[] decoded = UpgradeController.decodeReceiveData(peerVersion, readFrame(in)); + if (decoded.length > 0 && decoded[0] == MessageTypes.P2P_HELLO.asByte()) { + return Protocol.HelloMessage.parseFrom(subarray(decoded, 1, decoded.length)); + } + } + return null; + } + + // ---- framing helpers (protobuf varint32 length prefix, matching libp2p's framing) ------------ + + private static void writeFrame(OutputStream out, byte[] payload) throws IOException { + writeRawVarint32(out, payload.length); + out.write(payload); + out.flush(); + } + + private static byte[] readFrame(InputStream in) throws IOException { + int len = readRawVarint32(in); + if (len < 0 || len > Parameter.MAX_MESSAGE_LENGTH) { + throw new IOException("invalid frame length: " + len); + } + return readFully(in, len); + } + + private static void writeRawVarint32(OutputStream out, int value) throws IOException { + while (true) { + if ((value & ~0x7f) == 0) { + out.write(value); + return; + } + out.write((value & 0x7f) | 0x80); + value >>>= 7; + } + } + + private static int readRawVarint32(InputStream in) throws IOException { + int result = 0; + int shift = 0; + while (shift < 32) { + int b = in.read(); + if (b == -1) { + throw new EOFException("stream closed while reading frame length"); + } + result |= (b & 0x7f) << shift; + if ((b & 0x80) == 0) { + return result; + } + shift += 7; + } + throw new IOException("varint32 too long"); + } + + private static byte[] readFully(InputStream in, int len) throws IOException { + byte[] buf = new byte[len]; + int off = 0; + while (off < len) { + int n = in.read(buf, off, len - off); + if (n == -1) { + throw new EOFException("stream closed while reading " + len + " bytes"); + } + off += n; + } + return buf; + } + + private static byte[] prependType(byte type, byte[] data) { + byte[] out = new byte[data.length + 1]; + out[0] = type; + System.arraycopy(data, 0, out, 1, data.length); + return out; + } + + private static byte[] subarray(byte[] data, int from, int to) { + byte[] out = new byte[to - from]; + System.arraycopy(data, from, out, 0, to - from); + return out; + } + + // A random, well-formed, non-multicast public IPv4 (never our real IP). Passes the peer's + // NetUtil.validNode check and, being fresh per tunnel, is not a stable clustering handle. + private static String randomAnonIp() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + while (true) { + int a = r.nextInt(1, 224); // 1..223: skip 0.x and multicast/reserved 224+ + int b = r.nextInt(0, 256); + int c = r.nextInt(0, 256); + int d = r.nextInt(1, 255); // 1..254: skip .0 network and .255 broadcast + if (isPublicIpv4(a, b, c, d)) { + return a + "." + b + "." + c + "." + d; + } + } + } + + // True only for a globally-routable public IPv4 (excludes private, loopback, link-local, CGNAT, + // documentation, benchmarking and other reserved ranges). We advertise such an address so peers + // accept it (NetUtil.validNode) while it still reveals nothing about us. + private static boolean isPublicIpv4(int a, int b, int c, int d) { + if (a == 10 || a == 127) { + return false; // private 10/8, loopback 127/8 + } + if (a == 100 && b >= 64 && b <= 127) { + return false; // CGNAT 100.64/10 + } + if (a == 169 && b == 254) { + return false; // link-local 169.254/16 + } + if (a == 172 && b >= 16 && b <= 31) { + return false; // private 172.16/12 + } + if (a == 192 && (b == 0 || b == 88 || b == 168)) { + return false; // 192.0.0/24, 192.0.2/24, 192.88.99/24 (6to4), 192.168/16 + } + if (a == 198 && (b == 18 || b == 19 || b == 51)) { + return false; // benchmarking 198.18/15, doc 198.51.100/24 + } + if (a == 203 && b == 0 && c == 113) { + return false; // documentation 203.0.113/24 + } + return true; + } + + private static void closeQuietly(Socket socket) { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + // ignore + } + } + } +} diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index d00f334f4ce..1aac2114e05 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -198,6 +198,27 @@ node { isOpenFullTcpDisconnect = false inactiveThreshold = 600 //seconds + # Anonymous outbound transaction broadcast over Tor. + # When enabled, transactions submitted to THIS node (via its own broadcast API) are validated + # locally and then, instead of being advertised to directly-connected P2P peers (which exposes + # this node's IP as the origin), are pushed via the native P2P protocol (Hello -> INV -> fetch + # -> Transaction) through the local Tor SOCKS5 proxy to a random subset of nodes learned via the + # normal (UDP) discovery layer that are NOT currently used as sync peers. Those nodes are stock, + # unmodified TRON nodes and re-gossip the transaction as usual. Block sync and normal P2P + # traffic are unaffected. The feature is off by default. + tor { + enabled = false + socksHost = "127.0.0.1" + socksPort = 9050 # default Tor SOCKS5 port + connectTimeout = 30000 # ms; Tor circuits are slow + readTimeout = 30000 # ms + broadcastCount = 2 # number of discovered non-sync nodes to push each tx to + circuitIsolation = true # use a distinct Tor circuit per connection (SOCKS auth isolation) + controlPort = 0 # Tor ControlPort; if > 0, the node signals NEWNYM to rebuild + # circuits (fresh exit IPs) when no relay tunnel can be established + controlPassword = "" # password for the Tor ControlPort (HashedControlPassword in torrc) + } + p2p { version = 11111 # Mainnet:11111; Nile:201910292; Shasta:1 } diff --git a/framework/src/test/java/org/tron/core/WalletMockTest.java b/framework/src/test/java/org/tron/core/WalletMockTest.java index 3e0c1a4461d..7b303089dc6 100644 --- a/framework/src/test/java/org/tron/core/WalletMockTest.java +++ b/framework/src/test/java/org/tron/core/WalletMockTest.java @@ -256,6 +256,97 @@ public void testBroadcastTransactionTooManyPending() throws Exception { assertEquals(GrpcAPI.Return.response_code.SERVER_BUSY, ret.getCode()); } + @Test + public void testBroadcastTransactionTorDisabledUsesP2p() throws Exception { + CommonParameter.getInstance().setTorBroadcastEnable(false); + org.tron.core.net.TronNetService netSvc = + mock(org.tron.core.net.TronNetService.class); + org.tron.core.net.service.tor.TorBroadcastService torSvc = + mock(org.tron.core.net.service.tor.TorBroadcastService.class); + when(netSvc.fastBroadcastTransaction(any())).thenReturn(2); + Wallet wallet = routingWallet(netSvc, torSvc); + + GrpcAPI.Return ret = wallet.broadcastTransaction(routingTransaction()); + + assertEquals(GrpcAPI.Return.response_code.SUCCESS, ret.getCode()); + Mockito.verify(netSvc, Mockito.times(1)).fastBroadcastTransaction(any()); + Mockito.verify(torSvc, Mockito.never()).broadcastTransaction(any()); + } + + @Test + public void testBroadcastTransactionTorEnabledUsesTor() throws Exception { + CommonParameter.getInstance().setTorBroadcastEnable(true); + try { + org.tron.core.net.TronNetService netSvc = + mock(org.tron.core.net.TronNetService.class); + org.tron.core.net.service.tor.TorBroadcastService torSvc = + mock(org.tron.core.net.service.tor.TorBroadcastService.class); + when(torSvc.broadcastTransaction(any())).thenReturn(2); + Wallet wallet = routingWallet(netSvc, torSvc); + + GrpcAPI.Return ret = wallet.broadcastTransaction(routingTransaction()); + + assertEquals(GrpcAPI.Return.response_code.SUCCESS, ret.getCode()); + Mockito.verify(torSvc, Mockito.times(1)).broadcastTransaction(any()); + Mockito.verify(netSvc, Mockito.never()).fastBroadcastTransaction(any()); + } finally { + CommonParameter.getInstance().setTorBroadcastEnable(false); + } + } + + // Wires a Wallet whose pre-broadcast checks all pass, so broadcastTransaction reaches the + // Tor-vs-P2P routing branch; the two broadcast services are mocks the caller stubs and verifies. + private Wallet routingWallet(org.tron.core.net.TronNetService netSvc, + org.tron.core.net.service.tor.TorBroadcastService torSvc) throws Exception { + Wallet wallet = new Wallet(); + TronNetDelegate netDelegate = mock(TronNetDelegate.class); + when(netDelegate.isBlockUnsolidified()).thenReturn(false); + Manager dbManager = mock(Manager.class); + when(dbManager.isTooManyPending()).thenReturn(false); + org.tron.core.ChainBaseManager cbm = mock(org.tron.core.ChainBaseManager.class); + DynamicPropertiesStore dps = mock(DynamicPropertiesStore.class); + when(cbm.getDynamicPropertiesStore()).thenReturn(dps); + when(dps.supportVM()).thenReturn(false); + when(dps.getAllowProtoFilterNum()).thenReturn(0L); + when(cbm.getNextBlockSlotTime()).thenReturn(0L); + // The P2P path builds a TransactionMessage, which validates the contract against the static + // DynamicPropertiesStore held by Message; provide it so the message can be constructed. + Field messageStore = org.tron.common.overlay.message.Message.class + .getDeclaredField("dynamicPropertiesStore"); + messageStore.setAccessible(true); + messageStore.set(null, dps); + setWalletField(wallet, "tronNetDelegate", netDelegate); + setWalletField(wallet, "dbManager", dbManager); + setWalletField(wallet, "chainBaseManager", cbm); + setWalletField(wallet, "tronNetService", netSvc); + setWalletField(wallet, "torBroadcastService", torSvc); + setWalletField(wallet, "minEffectiveConnection", 0); + setWalletField(wallet, "trxCacheEnable", false); + return wallet; + } + + private static Protocol.Transaction routingTransaction() { + byte[] addr = new byte[21]; + addr[0] = 0x41; + BalanceContract.TransferContract transfer = BalanceContract.TransferContract.newBuilder() + .setOwnerAddress(ByteString.copyFrom(addr)) + .setToAddress(ByteString.copyFrom(addr)) + .setAmount(1L) + .build(); + return Protocol.Transaction.newBuilder() + .setRawData(Protocol.Transaction.raw.newBuilder() + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(transfer)))) + .build(); + } + + private static void setWalletField(Wallet wallet, String name, Object value) throws Exception { + Field field = Wallet.class.getDeclaredField(name); + field.setAccessible(true); + field.set(wallet, value); + } + @Test public void testBroadcastTransactionAlreadyExists() throws Exception { Wallet wallet = new Wallet(); diff --git a/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java b/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java new file mode 100644 index 00000000000..0bb5c7cd5d7 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java @@ -0,0 +1,566 @@ +package org.tron.core.net.service.tor; + +import com.google.protobuf.ByteString; +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; +import org.tron.common.TestConstants; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.config.args.Args; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.TronNetService; +import org.tron.core.net.message.MessageTypes; +import org.tron.core.net.message.adv.FetchInvDataMessage; +import org.tron.core.net.message.adv.InventoryMessage; +import org.tron.p2p.base.Parameter; +import org.tron.p2p.connection.business.upgrade.UpgradeController; +import org.tron.p2p.discover.Node; +import org.tron.p2p.protos.Connect; +import org.tron.p2p.utils.NetUtil; +import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; +import org.tron.protos.Protocol.Transaction; + +/** + * End-to-end test for {@link TorBroadcastService}: an enqueued transaction must reach a standard + * TRON node over a persistent tunnel using the native P2P protocol (Hello -> P2P_HELLO -> INV + * -> FetchInvData -> TransactionsMessage), tunnelled through a SOCKS5 proxy. A minimal + * in-process SOCKS5 forwarder stands in for Tor, and a stub node speaks just enough of the wire + * protocol to accept the transaction — so the whole routing + framing + compression path runs + * without any external dependency. Delivery is asynchronous (buffer + background flush), so the + * test waits for the stub to receive the transaction. + */ +public class TorBroadcastServiceTest { + + private static final int NETWORK_ID = 11111; + private static final byte HELLO = (byte) 0xfd; + + private StubNode stubNode; + private Socks5Forwarder socksProxy; + private TorBroadcastService service; + private final AtomicBoolean transactionReceived = new AtomicBoolean(false); + + @BeforeClass + public static void initArgs() { + // Initialise CommonParameter as the other net tests do (also sets up global state this test's + // service touches). PeerConnection's static relayNodes is safe regardless of ordering now that + // CommonParameter.fastForwardNodes defaults to an empty list. + Args.setParam(new String[] {}, TestConstants.TEST_CONF); + } + + @Before + public void setUp() throws IOException { + if (Parameter.p2pConfig == null) { + Parameter.p2pConfig = new org.tron.p2p.P2pConfig(); + } + stubNode = new StubNode(transactionReceived); + stubNode.start(); + socksProxy = new Socks5Forwarder(); + socksProxy.start(); + + CommonParameter parameter = CommonParameter.getInstance(); + parameter.setTorSocksHost("127.0.0.1"); + parameter.setTorSocksPort(socksProxy.getPort()); + parameter.setTorConnectTimeout(5000); + parameter.setTorReadTimeout(5000); + parameter.setTorBroadcastEnable(true); + parameter.setTorCircuitIsolation(false); // keep the in-process forwarder auth-free + parameter.setTorBroadcastCount(1); + parameter.setTorControlPort(0); + parameter.setNodeP2pVersion(NETWORK_ID); + } + + @After + public void tearDown() { + if (service != null) { + service.shutdown(); // stop the background threads so they don't leak into other tests + service = null; + } + if (stubNode != null) { + stubNode.close(); + } + if (socksProxy != null) { + socksProxy.close(); + } + } + + @Test + public void transactionDeliveredOverPersistentTunnel() throws Exception { + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + + // Delivery is async (pool maintenance + flush run in the background); wait for the stub. + long deadline = System.currentTimeMillis() + 15000; + while (System.currentTimeMillis() < deadline && !transactionReceived.get()) { + Thread.sleep(100); + } + Assert.assertTrue("stub node never received the transaction over the persistent tunnel", + transactionReceived.get()); + } + + @Test + public void noRelaysNothingDelivered() throws Exception { + newService(Collections.emptyList()); + + service.broadcastTransaction(sampleTransaction()); + + Thread.sleep(2000); + Assert.assertFalse("nothing should be delivered when there are no relay nodes", + transactionReceived.get()); + } + + @Test + public void torProxyOutageThenRecovery() throws Exception { + // Point the SOCKS proxy at a closed port: the Tor daemon is "down". + CommonParameter.getInstance().setTorSocksPort(freePort()); + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + Thread.sleep(2500); + Assert.assertFalse("nothing should be delivered while Tor is down", + transactionReceived.get()); + + // Tor comes back: point the proxy at the working forwarder. No transaction is lost — the + // buffered one must be delivered once tunnels can be built again. + CommonParameter.getInstance().setTorSocksPort(socksProxy.getPort()); + waitUntil(transactionReceived, 30000); + Assert.assertTrue("buffered transaction must be delivered once Tor recovers", + transactionReceived.get()); + } + + @Test + public void relayDegradedThenRecovers() throws Exception { + stubNode.healthy = false; // the only relay drops us during the handshake + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + Thread.sleep(2500); + Assert.assertFalse("nothing should be delivered while the only relay is degraded", + transactionReceived.get()); + + // Relay recovers: the pool must rebuild a tunnel and deliver the buffered transaction. + stubNode.healthy = true; + waitUntil(transactionReceived, 30000); + Assert.assertTrue("buffered transaction must be delivered once the relay recovers", + transactionReceived.get()); + } + + @Test + public void newnymSignalledWhenPoolCannotForm() throws Exception { + TorControlServer control = new TorControlServer(); + control.start(); + CommonParameter.getInstance().setTorControlPort(control.getPort()); + CommonParameter.getInstance().setTorControlPassword(""); + try { + // No relay nodes -> the pool can never form -> after a few empty rounds the node should ask + // Tor for fresh circuits (new exit IPs) via the control port. + newService(Collections.emptyList()); + service.broadcastTransaction(sampleTransaction()); + waitUntil(control.newnymReceived, 40000); + Assert.assertTrue("node should signal NEWNYM when no relay tunnel can be built", + control.newnymReceived.get()); + } finally { + CommonParameter.getInstance().setTorControlPort(0); + control.close(); + } + } + + private static void waitUntil(AtomicBoolean flag, long timeoutMs) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMs; + while (System.currentTimeMillis() < deadline && !flag.get()) { + Thread.sleep(100); + } + } + + private static int freePort() throws IOException { + try (ServerSocket s = new ServerSocket()) { + s.bind(new InetSocketAddress("127.0.0.1", 0)); + return s.getLocalPort(); + } + } + + private TorBroadcastService newService(java.util.List tableNodes) { + TorBroadcastService svc = new TorBroadcastService(); + TronNetService netService = Mockito.mock(TronNetService.class); + TronNetDelegate netDelegate = Mockito.mock(TronNetDelegate.class); + Mockito.when(netService.getTableNodes()).thenReturn(tableNodes); + Mockito.when(netDelegate.getActivePeer()).thenReturn(Collections.emptyList()); + ReflectionTestUtils.setField(svc, "tronNetService", netService); + ReflectionTestUtils.setField(svc, "tronNetDelegate", netDelegate); + this.service = svc; // tracked so tearDown can shut it down + return svc; + } + + private static Transaction sampleTransaction() { + return Transaction.newBuilder() + .setRawData(Transaction.raw.newBuilder().setTimestamp(1L)) + .build(); + } + + // ---- framing helpers (mirror the service / libp2p wire format) ------------------------------ + + private static void writeFrame(OutputStream out, byte[] payload) throws IOException { + int value = payload.length; + while (true) { + if ((value & ~0x7f) == 0) { + out.write(value); + break; + } + out.write((value & 0x7f) | 0x80); + value >>>= 7; + } + out.write(payload); + out.flush(); + } + + private static byte[] readFrame(InputStream in) throws IOException { + int result = 0; + int shift = 0; + while (shift < 32) { + int b = in.read(); + if (b == -1) { + throw new IOException("closed"); + } + result |= (b & 0x7f) << shift; + if ((b & 0x80) == 0) { + break; + } + shift += 7; + } + byte[] buf = new byte[result]; + int off = 0; + while (off < result) { + int n = in.read(buf, off, result - off); + if (n == -1) { + throw new IOException("closed"); + } + off += n; + } + return buf; + } + + private static byte[] prepend(byte type, byte[] data) { + byte[] out = new byte[data.length + 1]; + out[0] = type; + System.arraycopy(data, 0, out, 1, data.length); + return out; + } + + private static byte[] tail(byte[] data) { + byte[] out = new byte[data.length - 1]; + System.arraycopy(data, 1, out, 0, out.length); + return out; + } + + /** A stub standard TRON node: handshakes, keeps the tunnel open, records receipt of the tx. */ + private static final class StubNode implements Closeable { + + private final ServerSocket serverSocket; + private final AtomicBoolean received; + // When false the relay is "degraded": it drops the connection during the handshake. + volatile boolean healthy = true; + + StubNode(AtomicBoolean received) throws IOException { + this.received = received; + this.serverSocket = new ServerSocket(); + this.serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> { + try { + serve(client); + } catch (Exception ignored) { + // connection closed on teardown + } finally { + try { + client.close(); + } catch (IOException ignored) { + // ignore + } + } + }); + worker.setDaemon(true); + worker.start(); + } catch (Exception e) { + return; // server socket closed on teardown + } + } + }, "stub-tron-node"); + t.setDaemon(true); + t.start(); + } + + private void serve(Socket client) throws Exception { + InputStream in = client.getInputStream(); + OutputStream out = client.getOutputStream(); + + // Transport Hello (uncompressed); reject if the client's "from" endpoint is invalid. + Connect.HelloMessage clientHello = Connect.HelloMessage.parseFrom(tail(readFrame(in))); + if (!NetUtil.validNode(NetUtil.getNode(clientHello.getFrom()))) { + return; + } + if (!healthy) { + return; // degraded relay: drop the connection mid-handshake + } + Connect.HelloMessage hello = Connect.HelloMessage.newBuilder() + .setFrom(Connect.HelloMessage.getDefaultInstance().getFrom()) + .setNetworkId(NETWORK_ID).setCode(0).setVersion(1) + .setTimestamp(System.currentTimeMillis()).build(); + // rebuild with a valid endpoint + org.tron.p2p.protos.Discover.Endpoint ep = org.tron.p2p.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom("203.0.113.7".getBytes())).setPort(18888).build(); + hello = hello.toBuilder().setFrom(ep).build(); + writeFrame(out, prepend(HELLO, hello.toByteArray())); + + // Application-level P2P_HELLO advertising a head block (compressed, post-handshake). + Protocol.HelloMessage.BlockId block = Protocol.HelloMessage.BlockId.newBuilder() + .setHash(ByteString.copyFrom(new byte[32])).setNumber(1L).build(); + org.tron.protos.Discover.Endpoint appFrom = org.tron.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom("203.0.113.7".getBytes())).setPort(18888).build(); + Protocol.HelloMessage appHello = Protocol.HelloMessage.newBuilder() + .setFrom(appFrom).setVersion(NETWORK_ID).setTimestamp(System.currentTimeMillis()) + .setGenesisBlockId(block).setSolidBlockId(block).setHeadBlockId(block) + .setNodeType(0).setLowestBlockNum(0).build(); + writeFrame(out, UpgradeController.codeSendData(1, + prepend(MessageTypes.P2P_HELLO.asByte(), appHello.toByteArray()))); + + // Read frames until the client advertises the tx (INV), then request it and read the batch. + long deadline = System.currentTimeMillis() + 15000; + while (System.currentTimeMillis() < deadline) { + byte[] frame = UpgradeController.decodeReceiveData(1, readFrame(in)); + if (frame.length == 0) { + continue; + } + if (frame[0] == MessageTypes.INVENTORY.asByte()) { + InventoryMessage invMessage = new InventoryMessage(tail(frame)); + FetchInvDataMessage fetch = + new FetchInvDataMessage(invMessage.getHashList(), InventoryType.TRX); + writeFrame(out, UpgradeController.codeSendData(1, fetch.getSendBytes())); + } else if (frame[0] == MessageTypes.TRXS.asByte()) { + received.set(true); + return; + } + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } + + /** Minimal Tor ControlPort stub: accepts AUTHENTICATE then records a SIGNAL NEWNYM. */ + private static final class TorControlServer implements Closeable { + + private final ServerSocket serverSocket; + final AtomicBoolean newnymReceived = new AtomicBoolean(false); + + TorControlServer() throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> handle(client)); + worker.setDaemon(true); + worker.start(); + } catch (IOException e) { + return; + } + } + }, "test-tor-control"); + t.setDaemon(true); + t.start(); + } + + private void handle(Socket client) { + try { + BufferedReader in = new BufferedReader(new InputStreamReader( + client.getInputStream(), StandardCharsets.US_ASCII)); + OutputStream out = client.getOutputStream(); + String line; + while ((line = in.readLine()) != null) { + if (line.startsWith("AUTHENTICATE") || line.startsWith("SIGNAL NEWNYM")) { + if (line.startsWith("SIGNAL NEWNYM")) { + newnymReceived.set(true); + } + out.write("250 OK\r\n".getBytes(StandardCharsets.US_ASCII)); + out.flush(); + } + } + } catch (IOException ignored) { + // connection closed + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } + + /** Minimal no-auth SOCKS5 forwarder: CONNECT, then blindly pipe bytes both ways. */ + private static final class Socks5Forwarder implements Closeable { + + private final ServerSocket serverSocket; + + Socks5Forwarder() throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> handle(client)); + worker.setDaemon(true); + worker.start(); + } catch (IOException e) { + return; + } + } + }, "test-socks5"); + t.setDaemon(true); + t.start(); + } + + private void handle(Socket client) { + try { + InputStream in = client.getInputStream(); + OutputStream out = client.getOutputStream(); + readNBytes(in, 1); // version + int nMethods = in.read(); + readNBytes(in, nMethods); + out.write(new byte[] {0x05, 0x00}); + out.flush(); + + readNBytes(in, 3); // ver, cmd, rsv + int atyp = in.read(); + byte[] addr; + if (atyp == 0x01) { + addr = readNBytes(in, 4); + } else if (atyp == 0x03) { + addr = readNBytes(in, in.read()); + } else { + addr = readNBytes(in, 16); + } + int port = ((in.read() & 0xff) << 8) | (in.read() & 0xff); + String host = atyp == 0x03 ? new String(addr, java.nio.charset.StandardCharsets.US_ASCII) + : java.net.InetAddress.getByAddress(addr).getHostAddress(); + + Socket target = new Socket(host, port); + out.write(new byte[] {0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0}); + out.flush(); + pipe(client, target); + pipe(target, client); + } catch (IOException e) { + closeQuietly(client); + } + } + + private void pipe(Socket from, Socket to) { + Thread t = new Thread(() -> { + byte[] buf = new byte[4096]; + try { + InputStream src = from.getInputStream(); + OutputStream dst = to.getOutputStream(); + int n; + while ((n = src.read(buf)) != -1) { + dst.write(buf, 0, n); + dst.flush(); + } + } catch (IOException ignored) { + // closed + } finally { + closeQuietly(from); + closeQuietly(to); + } + }); + t.setDaemon(true); + t.start(); + } + + private static byte[] readNBytes(InputStream in, int n) throws IOException { + byte[] buf = new byte[n]; + int off = 0; + while (off < n) { + int r = in.read(buf, off, n - off); + if (r == -1) { + throw new IOException("closed"); + } + off += r; + } + return buf; + } + + private static void closeQuietly(Socket socket) { + try { + socket.close(); + } catch (IOException ignored) { + // ignore + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } +}