Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions src/connectionHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,25 @@ const handle = (conn) => {
conn.destroy();
});

conn.on("data", function mainListener(data) {
data = data.trim().split(",");
// mainListener accumulates incoming bytes until either a '\n'
// terminator is seen OR a short idle gap suggests a legacy client
// finished sending. TCP does not preserve application-level
// boundaries, so handling the first chunk as if it were a full
// message risked passing a truncated JOB request (e.g. data[3]
// cut mid-mining_key) to miningHandler. Any bytes that followed
// the first '\n' are fed into conn._recvBuf so miningHandler's
// receiveData picks them up as the next message without loss.
//
// Dual-mode framing matches the same logic in mining.js's
// receiveData: prefer '\n', fall back to "no data for 10 ms" for
// legacy clients that don't terminate their messages.
const MAIN_IDLE_FLUSH_MS = 10;
let mainFlushTimer = null;

// processParsedData contains the original mainListener body (the
// ban / JOB / MOTD dispatch) unchanged. It is called exactly once
// per client, with a cleanly-framed message.
const processParsedData = (data) => {

if (!conn.remoteAddress || data.length > 6) {
conn.write("BAD,Incorrect data\n");
Expand Down Expand Up @@ -125,7 +142,43 @@ const handle = (conn) => {
} else if (data[0] === "MOTD") {
conn.write(motd);
}
});
};

const dispatchMain = () => {
if (mainFlushTimer) {
clearTimeout(mainFlushTimer);
mainFlushTimer = null;
}
const idx = (conn._mainBuf || "").indexOf("\n");
if (idx >= 0) {
const line = conn._mainBuf.slice(0, idx);
const leftover = conn._mainBuf.slice(idx + 1);
conn._mainBuf = "";
if (leftover.length > 0) {
conn._recvBuf = (conn._recvBuf || "") + leftover;
}
processParsedData(line.trim().split(","));
return;
}
// No '\n' yet but no more data in flight: legacy client frame.
if ((conn._mainBuf || "").length > 0) {
const line = conn._mainBuf;
conn._mainBuf = "";
processParsedData(line.trim().split(","));
}
};

const mainListener = (chunk) => {
conn._mainBuf = (conn._mainBuf || "") + chunk;
if (conn._mainBuf.indexOf("\n") >= 0) {
dispatchMain();
return;
}
if (mainFlushTimer) clearTimeout(mainFlushTimer);
mainFlushTimer = setTimeout(dispatchMain, MAIN_IDLE_FLUSH_MS);
};

conn.on("data", mainListener);
};

module.exports = handle;
104 changes: 100 additions & 4 deletions src/mining.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,108 @@ const checkWorkers = (ipWorkers, usrWorkers, serverMiners, username) => {
return false;
};

// Read one message from the socket, with safe framing.
//
// The previous implementation attached a one-shot 'data' listener and
// resolved with whatever single chunk Node.js happened to deliver.
// TCP does not preserve application-level message boundaries: under
// event-loop pressure a single client message can span two 'data'
// events, or two messages can arrive in one event. The one-shot
// pattern had no way to recover — answer[0] ended up as fragments of
// a different message, parseInt returned NaN, miner_res !== random,
// and a valid share was rejected as "BAD,Incorrect result".
//
// This version installs a single persistent 'data' listener per
// connection, appends every chunk to a per-conn buffer, and hands
// pending promises one message at a time.
//
// Framing is dual-mode for backward compatibility with legacy miners
// (including the reference AVR_Miner.py / PC_Miner.py which do not
// append '\n' to their outgoing messages):
//
// 1. If the buffer contains '\n', cut the message there. This is
// the robust path; new clients that send '\n' get it.
// 2. Otherwise wait LEGACY_IDLE_FLUSH_MS ms from the last chunk.
// If no more bytes have arrived, treat the buffer as a single
// message (legacy framing). Fragments of a single logical
// message arrive sub-ms apart at receive side, so this idle
// window cleanly distinguishes "still arriving" from "message
// complete".
const LEGACY_IDLE_FLUSH_MS = 10;

const drain = (conn) => {
while (conn._recvPending.length > 0) {
const idx = conn._recvBuf.indexOf("\n");
if (idx >= 0) {
const line = conn._recvBuf.slice(0, idx);
conn._recvBuf = conn._recvBuf.slice(idx + 1);
conn._recvPending.shift()(line.trim());
continue;
}
break;
}
};

const legacyFlush = (conn) => {
conn._flushTimer = null;
if (conn._recvPending.length === 0) return;
if (conn._recvBuf.length === 0) return;
if (conn._recvBuf.indexOf("\n") >= 0) {
drain(conn);
return;
}
// No newline arrived within the idle window — assume a legacy
// client that frames one message per 'data' event. Consume the
// whole buffer as a single message.
const line = conn._recvBuf;
conn._recvBuf = "";
conn._recvPending.shift()(line.trim());
};

const installRecvReader = (conn) => {
if (conn._recvPending !== undefined) return;
// conn._recvBuf may already contain leftover bytes placed there by
// the initial data handler in connectionHandler.js — preserve them.
if (conn._recvBuf === undefined) conn._recvBuf = "";
conn._recvPending = [];
conn._flushTimer = null;
conn.on("data", (chunk) => {
conn._recvBuf += chunk;
drain(conn);
// Re-arm the legacy idle flush on every new chunk. If more
// fragments of the same message are still in flight, this
// timer gets reset before it fires.
if (conn._flushTimer) clearTimeout(conn._flushTimer);
if (conn._recvPending.length > 0 && conn._recvBuf.length > 0) {
conn._flushTimer = setTimeout(() => legacyFlush(conn),
LEGACY_IDLE_FLUSH_MS);
}
});
};

const receiveData = (conn) => {
installRecvReader(conn);
return new Promise((resolve) => {
conn.on("data", function listener(data) {
conn.removeListener("data", listener);
resolve(data.trim());
});
// Fast path: a complete line (or any pre-buffered bytes) is
// already waiting and no other receiver is ahead of us.
if (conn._recvPending.length === 0) {
const idx = conn._recvBuf.indexOf("\n");
if (idx >= 0) {
const line = conn._recvBuf.slice(0, idx);
conn._recvBuf = conn._recvBuf.slice(idx + 1);
resolve(line.trim());
return;
}
}
conn._recvPending.push(resolve);
// If bytes are already pre-buffered without '\n' (possible
// after mainListener leftover hand-off), arm the legacy idle
// flush so we don't hang waiting for a '\n' that a legacy
// client will never send.
if (conn._recvBuf.length > 0 && !conn._flushTimer) {
conn._flushTimer = setTimeout(() => legacyFlush(conn),
LEGACY_IDLE_FLUSH_MS);
}
});
};

Expand Down