diff --git a/src/connectionHandler.js b/src/connectionHandler.js index 3fd4e07..0301759 100755 --- a/src/connectionHandler.js +++ b/src/connectionHandler.js @@ -73,8 +73,25 @@ const handle = (conn) => { conn.destroy(); }); - conn.on("data", function mainListener(data) { - data = data.trim().split(","); + // mainListener accumulates incoming bytes until either a '\n' + // terminator is seen OR a short idle gap suggests a legacy client + // finished sending. TCP does not preserve application-level + // boundaries, so handling the first chunk as if it were a full + // message risked passing a truncated JOB request (e.g. data[3] + // cut mid-mining_key) to miningHandler. Any bytes that followed + // the first '\n' are fed into conn._recvBuf so miningHandler's + // receiveData picks them up as the next message without loss. + // + // Dual-mode framing matches the same logic in mining.js's + // receiveData: prefer '\n', fall back to "no data for 10 ms" for + // legacy clients that don't terminate their messages. + const MAIN_IDLE_FLUSH_MS = 10; + let mainFlushTimer = null; + + // processParsedData contains the original mainListener body (the + // ban / JOB / MOTD dispatch) unchanged. It is called exactly once + // per client, with a cleanly-framed message. + const processParsedData = (data) => { if (!conn.remoteAddress || data.length > 6) { conn.write("BAD,Incorrect data\n"); @@ -125,7 +142,43 @@ const handle = (conn) => { } else if (data[0] === "MOTD") { conn.write(motd); } - }); + }; + + const dispatchMain = () => { + if (mainFlushTimer) { + clearTimeout(mainFlushTimer); + mainFlushTimer = null; + } + const idx = (conn._mainBuf || "").indexOf("\n"); + if (idx >= 0) { + const line = conn._mainBuf.slice(0, idx); + const leftover = conn._mainBuf.slice(idx + 1); + conn._mainBuf = ""; + if (leftover.length > 0) { + conn._recvBuf = (conn._recvBuf || "") + leftover; + } + processParsedData(line.trim().split(",")); + return; + } + // No '\n' yet but no more data in flight: legacy client frame. + if ((conn._mainBuf || "").length > 0) { + const line = conn._mainBuf; + conn._mainBuf = ""; + processParsedData(line.trim().split(",")); + } + }; + + const mainListener = (chunk) => { + conn._mainBuf = (conn._mainBuf || "") + chunk; + if (conn._mainBuf.indexOf("\n") >= 0) { + dispatchMain(); + return; + } + if (mainFlushTimer) clearTimeout(mainFlushTimer); + mainFlushTimer = setTimeout(dispatchMain, MAIN_IDLE_FLUSH_MS); + }; + + conn.on("data", mainListener); }; module.exports = handle; diff --git a/src/mining.js b/src/mining.js index eaed1ca..8ba03ab 100755 --- a/src/mining.js +++ b/src/mining.js @@ -55,12 +55,108 @@ const checkWorkers = (ipWorkers, usrWorkers, serverMiners, username) => { return false; }; +// Read one message from the socket, with safe framing. +// +// The previous implementation attached a one-shot 'data' listener and +// resolved with whatever single chunk Node.js happened to deliver. +// TCP does not preserve application-level message boundaries: under +// event-loop pressure a single client message can span two 'data' +// events, or two messages can arrive in one event. The one-shot +// pattern had no way to recover — answer[0] ended up as fragments of +// a different message, parseInt returned NaN, miner_res !== random, +// and a valid share was rejected as "BAD,Incorrect result". +// +// This version installs a single persistent 'data' listener per +// connection, appends every chunk to a per-conn buffer, and hands +// pending promises one message at a time. +// +// Framing is dual-mode for backward compatibility with legacy miners +// (including the reference AVR_Miner.py / PC_Miner.py which do not +// append '\n' to their outgoing messages): +// +// 1. If the buffer contains '\n', cut the message there. This is +// the robust path; new clients that send '\n' get it. +// 2. Otherwise wait LEGACY_IDLE_FLUSH_MS ms from the last chunk. +// If no more bytes have arrived, treat the buffer as a single +// message (legacy framing). Fragments of a single logical +// message arrive sub-ms apart at receive side, so this idle +// window cleanly distinguishes "still arriving" from "message +// complete". +const LEGACY_IDLE_FLUSH_MS = 10; + +const drain = (conn) => { + while (conn._recvPending.length > 0) { + const idx = conn._recvBuf.indexOf("\n"); + if (idx >= 0) { + const line = conn._recvBuf.slice(0, idx); + conn._recvBuf = conn._recvBuf.slice(idx + 1); + conn._recvPending.shift()(line.trim()); + continue; + } + break; + } +}; + +const legacyFlush = (conn) => { + conn._flushTimer = null; + if (conn._recvPending.length === 0) return; + if (conn._recvBuf.length === 0) return; + if (conn._recvBuf.indexOf("\n") >= 0) { + drain(conn); + return; + } + // No newline arrived within the idle window — assume a legacy + // client that frames one message per 'data' event. Consume the + // whole buffer as a single message. + const line = conn._recvBuf; + conn._recvBuf = ""; + conn._recvPending.shift()(line.trim()); +}; + +const installRecvReader = (conn) => { + if (conn._recvPending !== undefined) return; + // conn._recvBuf may already contain leftover bytes placed there by + // the initial data handler in connectionHandler.js — preserve them. + if (conn._recvBuf === undefined) conn._recvBuf = ""; + conn._recvPending = []; + conn._flushTimer = null; + conn.on("data", (chunk) => { + conn._recvBuf += chunk; + drain(conn); + // Re-arm the legacy idle flush on every new chunk. If more + // fragments of the same message are still in flight, this + // timer gets reset before it fires. + if (conn._flushTimer) clearTimeout(conn._flushTimer); + if (conn._recvPending.length > 0 && conn._recvBuf.length > 0) { + conn._flushTimer = setTimeout(() => legacyFlush(conn), + LEGACY_IDLE_FLUSH_MS); + } + }); +}; + const receiveData = (conn) => { + installRecvReader(conn); return new Promise((resolve) => { - conn.on("data", function listener(data) { - conn.removeListener("data", listener); - resolve(data.trim()); - }); + // Fast path: a complete line (or any pre-buffered bytes) is + // already waiting and no other receiver is ahead of us. + if (conn._recvPending.length === 0) { + const idx = conn._recvBuf.indexOf("\n"); + if (idx >= 0) { + const line = conn._recvBuf.slice(0, idx); + conn._recvBuf = conn._recvBuf.slice(idx + 1); + resolve(line.trim()); + return; + } + } + conn._recvPending.push(resolve); + // If bytes are already pre-buffered without '\n' (possible + // after mainListener leftover hand-off), arm the legacy idle + // flush so we don't hang waiting for a '\n' that a legacy + // client will never send. + if (conn._recvBuf.length > 0 && !conn._flushTimer) { + conn._flushTimer = setTimeout(() => legacyFlush(conn), + LEGACY_IDLE_FLUSH_MS); + } }); };