From c2d1fd3faba7b5f75f48b04cb8cf81622ad8930a Mon Sep 17 00:00:00 2001 From: leolambo Date: Tue, 28 Apr 2026 11:11:55 -0400 Subject: [PATCH 1/8] Add streamJsonArray helper for route-layer piping Unify ExternalApiStream.onStream and Storage.apiStream into a single route-layer helper. CSPs can now return Readable streams while routes handle HTTP framing, error injection, and client disconnect cleanup. Supports JSON-array (default) and JSONL output. Pre-data errors reject so the caller can send a proper 5xx; mid-stream errors append an inline error marker. Detects mongo cursor-style streams via .close(). --- packages/bitcore-node/src/routes/apiUtils.ts | 112 ++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index 5197557e61..abf9814d64 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -1,4 +1,5 @@ -import { Response } from 'express'; +import { Readable } from 'stream'; +import { Request, Response } from 'express'; import { AdapterError, AdapterErrorCode, AllProvidersUnavailableError } from '../providers/chain-state/external/adapters/errors'; export function respondWithError(res: Response, err: any) { @@ -10,3 +11,112 @@ export function respondWithError(res: Response, err: any) { } return res.status(500).send(err.message || err); } + +export interface StreamJsonArrayOpts { + jsonl?: boolean; +} + +export interface StreamJsonArrayResult { + success: boolean; + error?: any; +} + +/** + * Pipe a Readable stream to an Express response as a JSON array (default) or JSONL. + * + * - Pre-data errors reject so the route can send a proper status code + * - Mid-stream errors append an inline error marker and end the response + * - Client/response disconnects destroy the stream (and call .close() if present, e.g. mongo cursor) + */ +export function streamJsonArray( + stream: Readable & { close?: () => void }, + req: Request, + res: Response, + opts: StreamJsonArrayOpts = {} +): Promise { + return new Promise((resolve, reject) => { + let closed = false; + let isFirst = true; + + const cleanup = () => { + closed = true; + if (typeof stream.close === 'function') { + try { stream.close(); } catch { /* noop */ } + } else if (typeof stream.destroy === 'function') { + try { stream.destroy(); } catch { /* noop */ } + } + }; + + req.on('close', () => { closed = true; if (typeof stream.close === 'function') stream.close(); }); + res.type('json'); + res.on('close', () => { closed = true; if (typeof stream.close === 'function') stream.close(); }); + + stream.on('error', (err: any) => { + if (closed) return; + if (err?.isAxiosError) { + err.log = { + url: err?.config?.url, + statusCode: err?.response?.status, + statusMsg: err?.response?.statusText, + data: err?.response?.data, + }; + } + if (err?.log?.data?.message?.includes('not supported')) { + closed = true; + res.write('[]'); + res.end(); + return resolve({ success: false, error: err }); + } + if (!isFirst) { + // Headers already sent — emit inline error marker, end response, log upstream + closed = true; + const errMsg = '{"error": "An error occurred during data stream"}'; + if (opts.jsonl) { + res.write(`${errMsg}`); + } else { + res.write(`,\n${errMsg}\n]`); + } + res.end(); + cleanup(); + return resolve({ success: false, error: err }); + } + // Pre-data — caller can send proper 5xx status + return reject(err); + }); + + stream.on('data', (data: any) => { + if (closed) { + cleanup(); + return; + } + if (!opts.jsonl) { + if (isFirst) { + res.write('[\n'); + } else { + res.write(',\n'); + } + } + if (isFirst) { + isFirst = false; + } + if (typeof data !== 'string' && !Buffer.isBuffer(data)) { + data = JSON.stringify(data); + } + res.write(data); + }); + + stream.on('end', () => { + if (closed) return; + closed = true; + if (!opts.jsonl) { + if (isFirst) { + res.write('[]'); + } else { + res.write('\n]'); + } + } + res.end(); + resolve({ success: true }); + }); + }); +} From 128c3a1c795a9624edee1d921d7ae86949afddd9 Mon Sep 17 00:00:00 2001 From: leolambo Date: Tue, 28 Apr 2026 14:32:02 -0400 Subject: [PATCH 2/8] Decouple chain-state providers from HTTP req/res CSP stream methods now return Readable; routes own the piping via streamJsonArray. The HTTP framing, mid-stream error injection, and client-disconnect cleanup that were duplicated across ExternalApiStream and Storage now live in one place at the route layer. Touches all six stream entry points (address, tx, block, wallet addresses/transactions/utxos) plus the gnosis multisig route, across internal, base EVM, Moralis, MultiProvider, SVM, and Ripple. Storage's apiStreamingFind drops req/res and returns the cursor stream directly. Stream params types lose req/res; integration tests pipe the returned stream through streamJsonArray. Auto-detects a jsonl flag on the returned stream so chain-specific formatting stays in the CSP without leaking through routes. --- .../src/modules/moralis/api/csp.ts | 14 +- .../src/modules/multiProvider/api/csp.ts | 8 +- .../src/modules/ripple/api/csp.ts | 5 +- .../src/providers/chain-state/evm/api/csp.ts | 103 ++++------ .../providers/chain-state/evm/api/gnosis.ts | 16 +- .../providers/chain-state/evm/api/routes.ts | 14 +- .../chain-state/internal/internal.ts | 29 +-- .../src/providers/chain-state/svm/api/csp.ts | 186 +++++++----------- .../bitcore-node/src/routes/api/address.ts | 10 +- packages/bitcore-node/src/routes/api/block.ts | 14 +- packages/bitcore-node/src/routes/api/tx.ts | 11 +- .../bitcore-node/src/routes/api/wallet.ts | 36 ++-- packages/bitcore-node/src/services/storage.ts | 94 +-------- .../types/namespaces/ChainStateProvider.ts | 12 -- .../test/integration/ethereum/csp.test.ts | 16 +- .../test/integration/matic/csp.test.ts | 13 +- .../test/integration/solana/csp.test.ts | 6 +- 17 files changed, 220 insertions(+), 367 deletions(-) diff --git a/packages/bitcore-node/src/modules/moralis/api/csp.ts b/packages/bitcore-node/src/modules/moralis/api/csp.ts index b949d9030a..b39e71021f 100644 --- a/packages/bitcore-node/src/modules/moralis/api/csp.ts +++ b/packages/bitcore-node/src/modules/moralis/api/csp.ts @@ -108,7 +108,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider { // @override async streamBlocks(params: StreamBlocksParams) { - const { chain, network, req, res } = params; + const { chain, network } = params; const { web3 } = await this.getWeb3(network); const chainId = await this.getChainId({ network }); const blockRange = await this.getBlocksRange({ ...params, chainId }); @@ -146,8 +146,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider { } }); - return ExternalApiStream.onStream(stream, req!, res!); - + return stream; } // @override @@ -165,10 +164,10 @@ export class MoralisStateProvider extends BaseEVMStateProvider { // @override async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, network, address } = params; + const { args, network, address } = params; const chainId = await this.getChainId({ network }); - const txStream = await this._streamAddressTransactionsFromMoralis({ + return this._streamAddressTransactionsFromMoralis({ chainId, chain: this.chain, network, @@ -178,11 +177,6 @@ export class MoralisStateProvider extends BaseEVMStateProvider { ...args } }); - // TODO unify `ExternalApiStream.onStream` and `Storage.apiStream` which are effectively doing the same thing - const result = await ExternalApiStream.onStream(txStream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } } // @override diff --git a/packages/bitcore-node/src/modules/multiProvider/api/csp.ts b/packages/bitcore-node/src/modules/multiProvider/api/csp.ts index 598aa3ba8a..19d649acd4 100644 --- a/packages/bitcore-node/src/modules/multiProvider/api/csp.ts +++ b/packages/bitcore-node/src/modules/multiProvider/api/csp.ts @@ -186,7 +186,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider { // @override — sequential failover with preflight check. // Buffers first item before piping to response; failover only before response bytes are written. async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, network, address } = params; + const { args, network, address } = params; const chainId = await this.getChainId({ network }); const providers = this.getProvidersForNetwork(network); const PREFLIGHT_TIMEOUT_MS = 5000; @@ -261,11 +261,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider { txStream.resume(); } - const result = await ExternalApiStream.onStream(outputStream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } - return; // Stream handled + return outputStream; } catch (error) { if (error instanceof AdapterError && (error as AdapterError).code === AdapterErrorCode.INVALID_REQUEST) throw error; // 400 — no failover provider.health.recordFailure(error as Error); diff --git a/packages/bitcore-node/src/modules/ripple/api/csp.ts b/packages/bitcore-node/src/modules/ripple/api/csp.ts index 65e25bafc8..e795c08cc1 100644 --- a/packages/bitcore-node/src/modules/ripple/api/csp.ts +++ b/packages/bitcore-node/src/modules/ripple/api/csp.ts @@ -9,7 +9,6 @@ import { CacheStorage } from '../../../models/cache'; import { ICoin } from '../../../models/coin'; import { WalletAddressStorage } from '../../../models/walletAddress'; import { InternalStateProvider } from '../../../providers/chain-state/internal/internal'; -import { Storage } from '../../../services/storage'; import { IBlock } from '../../../types/Block'; import { ChainNetwork } from '../../../types/ChainNetwork'; import { @@ -305,7 +304,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain const transformed = txs.map(tx => this.transformAccountTx(tx, params.network)); this.streamTxs(transformed, readable); readable.push(null); - Storage.stream(readable, params.req!, params.res!); + return readable; } async streamTransactions(params: StreamTransactionsParams) { @@ -316,7 +315,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain const txs = ledger.transactions || []; this.streamTxs(txs, readable); readable.push(null); - Storage.stream(readable, params.req, params.res); + return readable; } async getTransaction(params: StreamTransactionParams) { diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts index 8fdbecbd52..fdd24211e5 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts @@ -1,3 +1,4 @@ +import { Readable } from 'stream'; import { CryptoRpc } from '@bitpay-labs/crypto-rpc'; import { Utils, Web3, type Web3Types } from '@bitpay-labs/crypto-wallet-core'; import { @@ -18,7 +19,7 @@ import { SpentHeightIndicators } from '../../../../types/Coin'; import { normalizeChainNetwork, partition, range } from '../../../../utils'; import { StatsUtil } from '../../../../utils/stats'; import { TransformWithEventPipe } from '../../../../utils/streamWithEventPipe'; -import { ExternalApiStream } from '../../external/streams/apiStream'; +import { AdapterError, AdapterErrorCode } from '../../external/adapters/errors'; import { AavePoolAbi } from '../abi/aavePool'; import { AavePoolAbiV2 } from '../abi/aavePoolV2'; import { ERC20Abi } from '../abi/erc20'; @@ -531,18 +532,11 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } async streamAddressTransactions(params: StreamAddressUtxosParams) { - return new Promise(async (resolve, reject) => { - try { - await this._buildAddressTransactionsStream(params); - return resolve(); - } catch (err) { - return reject(err); - } - }); + return this._buildAddressTransactionsStream(params); } async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, chain, network, address } = params; + const { args, chain, network, address } = params; const { limit, /* since,*/ tokenAddress } = args; if (!args.tokenAddress) { @@ -557,23 +551,17 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai // NOTE: commented out since and paging for now b/c they were causing extra long query times on insight. // The case where an address has >1000 txns is an edge case ATM and can be addressed later - Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }, req!, res!); - } else { - try { - const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args); - res!.json(tokenTransfers); - } catch (err: any) { - logger.error('Error streaming address transactions: %o', err.stack || err.message || err); - throw err; - } + return Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }); } + const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args); + return Readable.from(tokenTransfers, { objectMode: true }); } @historical @internal async streamTransactions(params: StreamTransactionsParams) { - const { chain, network, req, res, args } = params; + const { chain, network, args } = params; const { blockHash, blockHeight } = args; if (!chain || !network) { throw new Error('Missing chain or network'); @@ -590,7 +578,7 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } const tip = await this.getLocalTip(params); const tipHeight = tip ? tip.height : 0; - return Storage.apiStreamingFind(EVMTransactionStorage, query, args, req, res, t => { + return Storage.apiStreamingFind(EVMTransactionStorage, query, args, t => { let confirmations = 0; if (t.blockHeight !== undefined && t.blockHeight >= 0) { confirmations = tipHeight - t.blockHeight + 1; @@ -677,47 +665,37 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } async streamWalletTransactions(params: StreamWalletTransactionsParams) { - return new Promise(async (resolve, reject) => { - const { network, wallet, req, res, args } = params; - const { web3 } = await this.getWeb3(network); - args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined; + const { network, wallet, args } = params; + const { web3 } = await this.getWeb3(network); + args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined; - let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); - const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address); - if (walletAddresses.length === 0) { - res.status(400).send('No addresses found for wallet'); - return resolve(); - } - const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress); - const populateReceipt = new PopulateReceiptTransform(this); - const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses); - - const streamParams: BuildWalletTxsStreamParams = { - transactionStream, - populateEffects, - walletAddresses - }; - transactionStream = await this._buildWalletTransactionsStream(params, streamParams); + let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); + const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address); + if (walletAddresses.length === 0) { + throw new AdapterError('walletAddresses', AdapterErrorCode.INVALID_REQUEST, 'No addresses found for wallet'); + } + const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress); + const populateReceipt = new PopulateReceiptTransform(this); + const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses); + + const streamParams: BuildWalletTxsStreamParams = { + transactionStream, + populateEffects, + walletAddresses + }; + transactionStream = await this._buildWalletTransactionsStream(params, streamParams); - if (!args.tokenAddress && wallet._id) { - const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id); - transactionStream = transactionStream.eventPipe(internalTxTransform); - } + if (!args.tokenAddress && wallet._id) { + const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id); + transactionStream = transactionStream.eventPipe(internalTxTransform); + } - transactionStream = transactionStream - .eventPipe(populateReceipt) - .eventPipe(ethTransactionTransform); + transactionStream = transactionStream + .eventPipe(populateReceipt) + .eventPipe(ethTransactionTransform); - try { - const result = await ExternalApiStream.onStream(transactionStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err) { - return reject(err); - } - }); + (transactionStream as any).jsonl = true; + return transactionStream; } async _buildWalletTransactionsStream(params: StreamWalletTransactionsParams, streamParams: BuildWalletTxsStreamParams) { @@ -731,7 +709,8 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - // Add cleanup handlers when client disconnects + // Cursor cleanup is triggered when the consumer destroys the returned stream + // (streamJsonArray destroys on req/res close). let cursorClosed = false; const cleanupCursor = () => { if (!cursorClosed) { @@ -744,12 +723,10 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } }; - const { req, res } = params; - req.on('close', cleanupCursor); - res.on('close', cleanupCursor); - // Pipe cursor to transform stream transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true })); + transactionStream.on('close', cleanupCursor); + transactionStream.on('end', cleanupCursor); transactionStream = transactionStream.eventPipe(populateEffects); // For old db entries diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts index 42c06c54ce..38dace505a 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts @@ -154,7 +154,7 @@ export class GnosisApi { } async streamGnosisWalletTransactions(params: { multisigContractAddress: string } & StreamWalletTransactionsParams) { - const { chain, network, multisigContractAddress, res, args } = params; + const { chain, network, multisigContractAddress, args } = params; const transactionQuery = getCSP(chain, network).getWalletTransactionQuery(params); delete transactionQuery.wallets; delete transactionQuery['wallets.0']; @@ -201,7 +201,6 @@ export class GnosisApi { .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - // Add cleanup handlers when client disconnects let cursorClosed = false; const cleanupCursor = () => { if (!cursorClosed) { @@ -214,10 +213,6 @@ export class GnosisApi { } }; - const { req } = params; - req.on('close', cleanupCursor); - res.on('close', cleanupCursor); - transactionStream = cursor.pipe(populateEffects); // For old db entries if (multisigContractAddress) { @@ -225,10 +220,13 @@ export class GnosisApi { transactionStream = transactionStream.pipe(multisigTransform); } - transactionStream + const finalStream: any = transactionStream .pipe(populateReceipt) - .pipe(ethTransactionTransform) - .pipe(res); + .pipe(ethTransactionTransform); + finalStream.jsonl = true; + finalStream.on('close', cleanupCursor); + finalStream.on('end', cleanupCursor); + return finalStream; } } diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts index 8c4e3b1545..1a8fcdb1e8 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts @@ -4,6 +4,7 @@ import { Router } from 'express'; import config from '../../../../config'; import logger from '../../../../logger'; import { WebhookStorage } from '../../../../models/webhook'; +import { respondWithError, streamJsonArray } from '../../../../routes/apiUtils'; import { Config } from '../../../../services/config'; import { IEVMNetworkConfig } from '../../../../types/Config'; import { castToBool } from '../../../../utils'; @@ -235,22 +236,25 @@ export class EVMRouter { }); }; - private streamGnosisWalletTransactions(router: Router) { + private streamGnosisWalletTransactions(router: Router) { router.get(`/api/${this.chain}/:network/ethmultisig/transactions/:multisigContractAddress`, async (req, res) => { const { network, multisigContractAddress } = req.params; try { - return await Gnosis.streamGnosisWalletTransactions({ + const stream = await Gnosis.streamGnosisWalletTransactions({ chain: this.chain, network, multisigContractAddress, wallet: {} as any, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamGnosisWalletTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Multisig Transactions Error::%o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); }; diff --git a/packages/bitcore-node/src/providers/chain-state/internal/internal.ts b/packages/bitcore-node/src/providers/chain-state/internal/internal.ts index ba268033c2..b11975f560 100644 --- a/packages/bitcore-node/src/providers/chain-state/internal/internal.ts +++ b/packages/bitcore-node/src/providers/chain-state/internal/internal.ts @@ -81,10 +81,10 @@ export class InternalStateProvider implements IChainStateService { } async streamAddressTransactions(params: StreamAddressUtxosParams) { - const { req, res, args } = params; + const { args } = params; const { limit, since } = args; const query = this.getAddressQuery(params); - Storage.apiStreamingFind(CoinStorage, query, { limit, since, paging: '_id' }, req!, res!); + return Storage.apiStreamingFind(CoinStorage, query, { limit, since, paging: '_id' }); } async getBalanceForAddress(params: GetBalanceForAddressParams): Promise { @@ -100,10 +100,9 @@ export class InternalStateProvider implements IChainStateService { return balance; } - streamBlocks(params: StreamBlocksParams) { - const { req, res } = params; + async streamBlocks(params: StreamBlocksParams) { const { query, options } = this.getBlocksQuery(params); - Storage.apiStreamingFind(BitcoinBlockStorage, query, options, req, res); + return Storage.apiStreamingFind(BitcoinBlockStorage, query, options); } async getBlocks(params: GetBlockParams): Promise> { @@ -202,7 +201,7 @@ export class InternalStateProvider implements IChainStateService { } async streamTransactions(params: StreamTransactionsParams) { - const { chain, network, req, res, args } = params; + const { chain, network, args } = params; const { blockHash, blockHeight } = args; if (!chain || !network) { throw new Error('Missing chain or network'); @@ -219,7 +218,7 @@ export class InternalStateProvider implements IChainStateService { } const tip = await this.getLocalTip(params); const tipHeight = tip ? tip.height : 0; - return Storage.apiStreamingFind(TransactionStorage, query, args, req, res, t => { + return Storage.apiStreamingFind(TransactionStorage, query, args, t => { let confirmations = 0; if (t.blockHeight !== undefined && t.blockHeight >= 0) { confirmations = tipHeight - t.blockHeight + 1; @@ -305,9 +304,9 @@ export class InternalStateProvider implements IChainStateService { } streamWalletAddresses(params: StreamWalletAddressesParams) { - const { chain, network, walletId, req, res } = params; + const { chain, network, walletId } = params; const query = { chain, network, wallet: walletId }; - Storage.apiStreamingFind(WalletAddressStorage, query, {}, req, res); + return Storage.apiStreamingFind(WalletAddressStorage, query, {}); } async walletCheck(params: WalletCheckParams) { @@ -384,7 +383,7 @@ export class InternalStateProvider implements IChainStateService { } async streamWalletTransactions(params: StreamWalletTransactionsParams) { - const { chain, network, wallet, res, args } = params; + const { chain, network, wallet, args } = params; const query: any = { chain, network, @@ -430,8 +429,10 @@ export class InternalStateProvider implements IChainStateService { .find(query) .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - const listTransactionsStream = new this.WalletStreamTransform(wallet); - transactionStream.pipe(listTransactionsStream).pipe(res); + const listTransactionsStream: any = transactionStream.pipe(new this.WalletStreamTransform(wallet)); + listTransactionsStream.jsonl = true; + listTransactionsStream.on('close', () => { try { transactionStream.close(); } catch { /* noop */ } }); + return listTransactionsStream; } async getWalletBalance(params: GetWalletBalanceParams): Promise { @@ -457,7 +458,7 @@ export class InternalStateProvider implements IChainStateService { } async streamWalletUtxos(params: StreamWalletUtxosParams) { - const { wallet, limit, args = {}, req, res } = params; + const { wallet, limit, args = {} } = params; const query: any = { wallets: wallet._id, 'wallets.0': { $exists: true }, @@ -484,7 +485,7 @@ export class InternalStateProvider implements IChainStateService { return CoinStorage._apiTransform(c) as string; }; - Storage.apiStreamingFind(CoinStorage, query, { limit }, req, res, utxoTransform); + return Storage.apiStreamingFind(CoinStorage, query, { limit }, utxoTransform); } async getFee(params: GetEstimateSmartFeeParams) { diff --git a/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts index 2cff5652f0..c62def4b05 100644 --- a/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts @@ -183,89 +183,58 @@ export class BaseSVMStateProvider extends InternalStateProvider implements IChai } async streamTransactions(params: StreamTransactionsParams): Promise { - return new Promise(async (resolve, reject) => { - try { - const { chain, network, req, res, args } = params; - let { blockHeight } = args; - const { limit = 50 } = args; + const { chain, network, args } = params; + let { blockHeight } = args; + const { limit = 50 } = args; - if (!chain || !network) { - throw new Error('Missing chain or network'); - } - if (blockHeight !== undefined) { - blockHeight = Number(blockHeight); - } else { - throw new Error('Missing required block height / slot.'); - } + if (!chain || !network) { + throw new Error('Missing chain or network'); + } + if (blockHeight !== undefined) { + blockHeight = Number(blockHeight); + } else { + throw new Error('Missing required block height / slot.'); + } - const { rpc } = await this.getRpc(network); - const block: any = await rpc.getBlock({ height: blockHeight, transactionDetails: 'signatures' }); - if (!block) { - throw new Error('Block not found: ' + blockHeight); - } - const stream = new TransformWithEventPipe({ - objectMode: true, - passThrough: true - }); - let count = 0; - for (const signature of block?.signatures || []) { - if (limit && count >= limit) break; - const transformedTx = await this._getTransformedTx(rpc, network, { signature }); - stream.push(transformedTx); - count++; - } - stream.push(null); - const result = await ExternalApiStream.onStream(stream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming block transactions: %o', err.stack || err.message || err); - reject(err); - } + const { rpc } = await this.getRpc(network); + const block: any = await rpc.getBlock({ height: blockHeight, transactionDetails: 'signatures' }); + if (!block) { + throw new Error('Block not found: ' + blockHeight); + } + const stream = new TransformWithEventPipe({ + objectMode: true, + passThrough: true }); + let count = 0; + for (const signature of block?.signatures || []) { + if (limit && count >= limit) break; + const transformedTx = await this._getTransformedTx(rpc, network, { signature }); + stream.push(transformedTx); + count++; + } + stream.push(null); + return stream; } async streamAddressTransactions(params: StreamAddressUtxosParams) { - return new Promise(async (resolve, reject) => { - const { req, res } = params; - try { - const addressStream = await this._buildAddressTransactionsStream(params); - const result = await ExternalApiStream.onStream(addressStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err) { - return reject(err); - } - }); + const addressStream: any = await this._buildAddressTransactionsStream(params); + addressStream.jsonl = true; + return addressStream; } async streamWalletTransactions(params: StreamWalletTransactionsParams): Promise { - return new Promise(async (resolve, reject) => { - try { - const { wallet, req, res } = params; - const walletStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); - const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddress => waddress.address); - const addressStreams: TransformWithEventPipe[] = []; - - for (const address of walletAddresses) { - const addressStream = await this._buildAddressTransactionsStream({ ...params, address }); - addressStreams.push(addressStream); - } - ExternalApiStream.mergeStreams(addressStreams, walletStream); - const result = await ExternalApiStream.onStream(walletStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming wallet transactions: %o', err.stack || err.message || err); - return reject(err); - } - }); + const { wallet } = params; + const walletStream: any = new TransformWithEventPipe({ objectMode: true, passThrough: true }); + const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddress => waddress.address); + const addressStreams: TransformWithEventPipe[] = []; + + for (const address of walletAddresses) { + const addressStream = await this._buildAddressTransactionsStream({ ...params, address }); + addressStreams.push(addressStream); + } + ExternalApiStream.mergeStreams(addressStreams, walletStream); + walletStream.jsonl = true; + return walletStream; } async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { @@ -617,48 +586,37 @@ export class BaseSVMStateProvider extends InternalStateProvider implements IChai } async streamBlocks(params: StreamBlocksParams) { - return new Promise(async (resolve, reject) => { - try { - const { chain, network, req, res } = params; - if (!chain || !network) { - throw new Error('Missing chain or network'); - } - const { rpc } = await this.getRpc(network); - const blockRange = await this.getBlocksRange({ ...params }); - const { height } = await rpc.getTip(); - const stream = new TransformWithEventPipe({ - objectMode: true, - passThrough: true - }); - try { - let block; - let nextBlock; - for (const blockNum of blockRange) { - const thisNextBlock = Number(block?.height) === blockNum + 1 ? block : await this._getTransformedBlock(rpc, network, blockNum + 1); - block = Number(nextBlock?.number) === blockNum ? nextBlock : await this._getTransformedBlock(rpc, network, blockNum); - if (!block) { - continue; - } - nextBlock = thisNextBlock; - block.nextBlockHash = nextBlock?.hash; - block.confirmations = Number(BigInt(height) - BigInt(block.height) + 1n); - stream.push(block); - } - } catch (e: any) { - logger.error('Error streaming blocks: %o', e); + const { chain, network } = params; + if (!chain || !network) { + throw new Error('Missing chain or network'); + } + const { rpc } = await this.getRpc(network); + const blockRange = await this.getBlocksRange({ ...params }); + const { height } = await rpc.getTip(); + const stream: any = new TransformWithEventPipe({ + objectMode: true, + passThrough: true + }); + try { + let block; + let nextBlock; + for (const blockNum of blockRange) { + const thisNextBlock = Number(block?.height) === blockNum + 1 ? block : await this._getTransformedBlock(rpc, network, blockNum + 1); + block = Number(nextBlock?.number) === blockNum ? nextBlock : await this._getTransformedBlock(rpc, network, blockNum); + if (!block) { + continue; } - stream.push(null); - const result = await ExternalApiStream.onStream(stream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamBlocks): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming blocks: %o', err.stack || err.message || err); - reject(err); + nextBlock = thisNextBlock; + block.nextBlockHash = nextBlock?.hash; + block.confirmations = Number(BigInt(height) - BigInt(block.height) + 1n); + stream.push(block); } - }); - + } catch (e: any) { + logger.error('Error streaming blocks: %o', e); + } + stream.push(null); + stream.jsonl = true; + return stream; } async _getTransformedBlock(rpc, network, height ) { diff --git a/packages/bitcore-node/src/routes/api/address.ts b/packages/bitcore-node/src/routes/api/address.ts index be5bdfe8f9..b714e8bc38 100644 --- a/packages/bitcore-node/src/routes/api/address.ts +++ b/packages/bitcore-node/src/routes/api/address.ts @@ -2,7 +2,7 @@ import express, { Request } from 'express'; import logger from '../../logger'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamAddressUtxosParams } from '../../types/namespaces/ChainStateProvider'; -import { respondWithError } from '../apiUtils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; const router = express.Router({ mergeParams: true }); @@ -14,11 +14,13 @@ async function streamCoins(req: Request, res) { chain, network, address, - req, - res, args: { ...req.query, unspent, limit, since } } as StreamAddressUtxosParams; - await ChainStateProvider.streamAddressTransactions(payload); + const stream = await ChainStateProvider.streamAddressTransactions(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); + } } catch (err: any) { logger.error('Error streaming coins: %o', err.stack || err.message || err); respondWithError(res, err); diff --git a/packages/bitcore-node/src/routes/api/block.ts b/packages/bitcore-node/src/routes/api/block.ts index dfd0d3b257..6b1723af4d 100644 --- a/packages/bitcore-node/src/routes/api/block.ts +++ b/packages/bitcore-node/src/routes/api/block.ts @@ -5,6 +5,7 @@ import { CoinStorage, ICoin } from '../../models/coin'; import { TransactionStorage } from '../../models/transaction'; import { ChainStateProvider } from '../../providers/chain-state'; import { isDateValid } from '../../utils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; import { CacheTimes, Confirmations, SetCache } from '../middleware'; const router = express.Router({ mergeParams: true }); @@ -22,14 +23,17 @@ router.get('/', async function(req: Request, res: Response) { chain, network, sinceBlock, - args: { date, limit, since, direction, paging }, - req, - res + args: { date, limit, since, direction, paging } }; - return ChainStateProvider.streamBlocks(payload); + const stream = await ChainStateProvider.streamBlocks(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamBlocks): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error getting blocks: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); diff --git a/packages/bitcore-node/src/routes/api/tx.ts b/packages/bitcore-node/src/routes/api/tx.ts index c3e35322a5..cfe6f4fb76 100644 --- a/packages/bitcore-node/src/routes/api/tx.ts +++ b/packages/bitcore-node/src/routes/api/tx.ts @@ -4,7 +4,7 @@ import { ICoin } from '../../models/coin'; import { ITransaction } from '../../models/transaction'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamTransactionsParams } from '../../types/namespaces/ChainStateProvider'; -import { respondWithError } from '../apiUtils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; import { CacheTimes, SetCache } from '../middleware'; const router = Router({ mergeParams: true }); @@ -24,8 +24,6 @@ router.get('/', async function(req: Request, res: Response) { const payload: StreamTransactionsParams = { chain, network, - req, - res, args: { limit, since, direction, paging } }; @@ -35,7 +33,12 @@ router.get('/', async function(req: Request, res: Response) { if (blockHash !== undefined) { payload.args.blockHash = blockHash; } - return await ChainStateProvider.streamTransactions(payload); + const stream = await ChainStateProvider.streamTransactions(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming transactions: %o', err.stack || err.message || err); return respondWithError(res, err); diff --git a/packages/bitcore-node/src/routes/api/wallet.ts b/packages/bitcore-node/src/routes/api/wallet.ts index 4ce02e77c2..d58d260cb4 100644 --- a/packages/bitcore-node/src/routes/api/wallet.ts +++ b/packages/bitcore-node/src/routes/api/wallet.ts @@ -5,6 +5,7 @@ import logger from '../../logger'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamWalletAddressesParams } from '../../types/namespaces/ChainStateProvider'; import { Auth, AuthenticatedRequest } from '../../utils/auth'; +import { respondWithError, streamJsonArray } from '../apiUtils'; const router = Router({ mergeParams: true }); @@ -68,14 +69,17 @@ router.get('/:pubKey/addresses', Auth.authenticateMiddleware, async (req: Authen chain, network, walletId: wallet!._id!, - limit, - req, - res + limit }; - return await ChainStateProvider.streamWalletAddresses(payload); + const stream = await ChainStateProvider.streamWalletAddresses(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletAddresses): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet addresses: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); @@ -143,17 +147,20 @@ router.post('/:pubKey', Auth.authenticateMiddleware, async (req: AuthenticatedRe router.get('/:pubKey/transactions', Auth.authenticateMiddleware, async (req: AuthenticatedRequest, res: Response) => { try { const { chain, network } = req.params; - return await ChainStateProvider.streamWalletTransactions({ + const stream = await ChainStateProvider.streamWalletTransactions({ chain, network, wallet: req.wallet!, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet txs: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); @@ -194,18 +201,21 @@ router.get('/:pubKey/utxos', Auth.authenticateMiddleware, async (req: Authentica const { chain, network } = req.params; const { limit } = req.query as any; try { - return ChainStateProvider.streamWalletUtxos({ + const stream = await ChainStateProvider.streamWalletUtxos({ chain, network, wallet: req.wallet!, limit, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletUtxos): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet utxos: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); diff --git a/packages/bitcore-node/src/services/storage.ts b/packages/bitcore-node/src/services/storage.ts index 593b40c776..c258e6c03c 100644 --- a/packages/bitcore-node/src/services/storage.ts +++ b/packages/bitcore-node/src/services/storage.ts @@ -1,9 +1,8 @@ import { EventEmitter } from 'events'; import { Readable } from 'stream'; import { ObjectId } from 'bson'; -import { Request, Response } from 'express'; import { ObjectID } from 'mongodb'; -import { Cursor, Db, MongoClient } from 'mongodb'; +import { Db, MongoClient } from 'mongodb'; import { LoggifyClass } from '../decorators/Loggify'; import logger from '../logger'; import '../models'; @@ -113,91 +112,6 @@ export class StorageService { return typecastedValue; } - stream(input: Readable, req: Request, res: Response) { - let closed = false; - req.on('close', function() { - closed = true; - }); - res.on('close', function() { - closed = true; - }); - input.on('error', function(err) { - if (!closed) { - closed = true; - return res.status(500).end(err.message); - } - return; - }); - let isFirst = true; - res.type('json'); - input.on('data', function(data) { - if (!closed) { - if (isFirst) { - res.write('[\n'); - isFirst = false; - } else { - res.write(',\n'); - } - res.write(JSON.stringify(data)); - } - }); - input.on('end', function() { - if (!closed) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - res.end(); - } - }); - } - - apiStream(cursor: Cursor, req: Request, res: Response) { - let closed = false; - req.on('close', function() { - closed = true; - cursor.close(); - }); - res.on('close', function() { - closed = true; - cursor.close(); - }); - cursor.on('error', function(err) { - if (!closed) { - closed = true; - return res.status(500).end(err.message); - } - return; - }); - let isFirst = true; - res.type('json'); - cursor.on('data', function(data) { - if (!closed) { - if (isFirst) { - res.write('[\n'); - isFirst = false; - } else { - res.write(',\n'); - } - res.write(data); - } else { - cursor.close(); - } - }); - cursor.on('end', function() { - if (!closed) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - res.end(); - } - }); - } getFindOptions(model: TransformableModel, originalOptions: StreamingFindOptions) { const query: any = {}; let since: any = null; @@ -232,10 +146,8 @@ export class StorageService { model: TransformableModel, originalQuery: any, originalOptions: StreamingFindOptions, - req: Request, - res: Response, transform?: (data: T) => string | Buffer - ) { + ): Readable & { close?: () => void } { const { query, options } = this.getFindOptions(model, originalOptions); const finalQuery = Object.assign({}, originalQuery, query); let cursor = model.collection @@ -247,7 +159,7 @@ export class StorageService { if (options.sort) { cursor = cursor.sort(options.sort); } - return this.apiStream(cursor, req, res); + return cursor; } } diff --git a/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts b/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts index cc6d550e62..e2ac4de623 100644 --- a/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts +++ b/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts @@ -61,8 +61,6 @@ export type StreamBlocksParams = ChainNetwork & { blockId?: string; sinceBlock: number | string; args?: Partial<{ startDate: Date; endDate: Date; date: Date } & StreamingFindOptions>; - req: Request; - res: Response; }; export type FeeMode = 'ECONOMICAL' | 'CONSERVATIVE'; @@ -104,14 +102,10 @@ export type GetWalletBalanceAtTimeParams = ChainNetwork & { export type StreamAddressUtxosParams = ChainNetwork & { address: string; - req?: Request; - res?: Response; args: Partial & any>; }; export type StreamTransactionsParams = ChainNetwork & { - req: Request; - res: Response; args: any; }; export type StreamTransactionParams = ChainNetwork & { @@ -119,8 +113,6 @@ export type StreamTransactionParams = ChainNetwork & { }; export type StreamWalletAddressesParams = ChainNetwork & { walletId: ObjectId; - req: Request; - res: Response; limit: number; }; @@ -141,8 +133,6 @@ export type StreamWalletMissingAddressesParams = ChainNetwork & { export type StreamWalletTransactionsParams = ChainNetwork & { wallet: MongoBound; - req: Request; - res: Response; args: StreamWalletTransactionsArgs & any; }; @@ -160,8 +150,6 @@ export type StreamWalletUtxosParams = ChainNetwork & { wallet: MongoBound; limit: number; args: Partial; - req: Request; - res: Response; }; export type isValidParams = ChainNetwork & { diff --git a/packages/bitcore-node/test/integration/ethereum/csp.test.ts b/packages/bitcore-node/test/integration/ethereum/csp.test.ts index 8b234b3e7d..5a05af9885 100644 --- a/packages/bitcore-node/test/integration/ethereum/csp.test.ts +++ b/packages/bitcore-node/test/integration/ethereum/csp.test.ts @@ -12,6 +12,7 @@ import { ETH } from '../../../src/modules/ethereum/api/csp'; import { EVMBlockStorage } from '../../../src/providers/chain-state/evm/models/block'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; import { IEVMTransactionInProcess } from '../../../src/providers/chain-state/evm/types'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { StreamWalletTransactionsParams } from '../../../src/types/namespaces/ChainStateProvider'; import { ErigonEthBlocks } from '../../data/ETH/erigonDbBlocks'; import { ErigonEthTransactions } from '../../data/ETH/erigonDbTransactions'; @@ -230,7 +231,8 @@ describe('Ethereum API', function() { transform: (_data, _, cb) => cb(null) }) as unknown) as Request; - await ETH.streamAddressTransactions({ chain, network, address, res, req, args: {} }); + const stream = await ETH.streamAddressTransactions({ chain, network, address, args: {} }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -271,7 +273,8 @@ describe('Ethereum API', function() { } }) as unknown) as Request; - await ETH.streamTransactions({ chain, network, res, req, args: { blockHeight: 1 } }); + const stream = await ETH.streamTransactions({ chain, network, args: { blockHeight: 1 } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -316,7 +319,8 @@ describe('Ethereum API', function() { } }) as unknown) as Request; - await ETH.streamTransactions({ chain, network, res, req, args: { blockHash: '12345' } }); + const stream = await ETH.streamTransactions({ chain, network, args: { blockHash: '12345' } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -455,12 +459,11 @@ const streamWalletTransactionsTest = async (chain: string, network: string, incl chain, network, wallet, - req, - res, args: { includeInvalidTxs } } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); @@ -485,7 +488,8 @@ const streamDexWalletTransactions = async (chain, network, wallet, address, web3 } }) as unknown) as Request; - ETH.streamWalletTransactions({ chain, network, wallet, res, req, args: {} }); + ETH.streamWalletTransactions({ chain, network, wallet, args: {} } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)); let total = BigInt(0); let totalRejected = BigInt(0); let totalFee = BigInt(0); diff --git a/packages/bitcore-node/test/integration/matic/csp.test.ts b/packages/bitcore-node/test/integration/matic/csp.test.ts index 20658c4da8..1fedbaecdc 100644 --- a/packages/bitcore-node/test/integration/matic/csp.test.ts +++ b/packages/bitcore-node/test/integration/matic/csp.test.ts @@ -12,6 +12,7 @@ import { MATIC } from '../../../src/modules/matic/api/csp'; import { IEVMTransactionInProcess } from '../../../src/providers/chain-state/evm//types'; import { EVMBlockStorage } from '../../../src/providers/chain-state/evm/models/block'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { StreamWalletTransactionsParams } from '../../../src/types/namespaces/ChainStateProvider'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; @@ -195,7 +196,8 @@ describe('Polygon/MATIC API', function() { transform: (_data, _, cb) => cb(null) }) as unknown) as Request; - await MATIC.streamAddressTransactions({ chain, network, address, res, req, args: {} }); + const stream = await MATIC.streamAddressTransactions({ chain, network, address, args: {} }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -236,7 +238,8 @@ describe('Polygon/MATIC API', function() { } }) as unknown) as Request; - await MATIC.streamTransactions({ chain, network, res, req, args: { blockHeight: 1 } }); + const stream = await MATIC.streamTransactions({ chain, network, args: { blockHeight: 1 } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -281,7 +284,8 @@ describe('Polygon/MATIC API', function() { } }) as unknown) as Request; - await MATIC.streamTransactions({ chain, network, res, req, args: { blockHash: '12345' } }); + const stream = await MATIC.streamTransactions({ chain, network, args: { blockHash: '12345' } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -413,12 +417,11 @@ const streamWalletTransactionsTest = async (chain: string, network: string, incl chain, network, wallet, - req, - res, args: { includeInvalidTxs } } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); diff --git a/packages/bitcore-node/test/integration/solana/csp.test.ts b/packages/bitcore-node/test/integration/solana/csp.test.ts index 0e2ec2c214..a85319ccc5 100644 --- a/packages/bitcore-node/test/integration/solana/csp.test.ts +++ b/packages/bitcore-node/test/integration/solana/csp.test.ts @@ -9,6 +9,7 @@ import { IWallet, WalletStorage } from '../../../src/models/wallet'; import { WalletAddressStorage } from '../../../src/models/walletAddress'; import { SOL } from '../../../src/modules/solana/api/csp'; import { SVMRouter } from '../../../src/providers/chain-state/svm/api/routes'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; describe('Solana API', function() { @@ -210,10 +211,9 @@ describe('Solana API', function() { chain, network, wallet, - req, - res, args: {} - }) + } as any) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); From 64253db5e837ee23ca31b10507e2e25beb1b323f Mon Sep 17 00:00:00 2001 From: leolambo Date: Wed, 29 Apr 2026 11:20:15 -0400 Subject: [PATCH 3/8] Remove legacy onStream piping helpers ExternalApiStream.onStream, NodeQueryStream.onStream, Storage.apiStream, and Storage.stream all duplicated the same JSON-array framing and client-disconnect handling. With the route-layer streamJsonArray helper now owning that logic and every CSP migrated to return Readable, these are unused and removed along with their express imports. --- .../chain-state/external/streams/apiStream.ts | 98 +------------------ .../external/streams/nodeStream.ts | 6 -- 2 files changed, 1 insertion(+), 103 deletions(-) diff --git a/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts b/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts index 4f3255b9dc..20dce14835 100644 --- a/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts +++ b/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts @@ -1,13 +1,8 @@ -import { Readable, Stream, Transform } from 'stream'; +import { Stream, Transform } from 'stream'; import axios from 'axios'; -import { Request, Response } from 'express'; import { ReadableWithEventPipe, TransformWithEventPipe } from '../../../../utils/streamWithEventPipe'; -export interface StreamOpts { - jsonl?: boolean; -} - export class ExternalApiStream extends ReadableWithEventPipe { url: string; headers: any; @@ -73,97 +68,6 @@ export class ExternalApiStream extends ReadableWithEventPipe { } } - // handles events emitted by the streamed response, request from client, and response to client - static onStream(stream: Readable, req: Request, res: Response, opts: StreamOpts = {}): - Promise<{ success: boolean; error?: any }> { - return new Promise<{ success: boolean; error?: any }>((resolve, reject) => { - let closed = false; - let isFirst = true; - - req.on('close', function() { - closed = true; - }); - - res.type('json'); - res.on('close', function() { - closed = true; - }); - - stream.on('error', function(err: any) { - if (!closed) { - closed = true; - if (err.isAxiosError) { - err.log = { - url: err?.config?.url, - statusCode: err?.response?.status, - statusMsg: err?.response?.statusText, - data: err?.response?.data, - }; - } - if (err.log?.data?.message?.includes('not supported')) { - res.write('[]'); - res.end(); - return resolve({ success: false, error: err }); - } - if (!isFirst) { - // Data has already been written to the stream and status 200 headers have already been sent - // We notify and log the error instead of throwing - const errMsg = '{"error": "An error occurred during data stream"}'; - if (opts.jsonl) { - res.write(`${errMsg}`); - } else { - res.write(`,\n${errMsg}\n]`); - } - res.end(); - res.destroy(); - return resolve({ success: false, error: err }); - } else { - // Rejecting here allows downstream to send status 500 - return reject(err); - } - } - return; - }); - stream.on('data', function(data) { - if (!closed) { - // We are assuming jsonl data appended a new line upstream - if (!opts.jsonl) { - if (isFirst) { - res.write('[\n'); - } else { - res.write(',\n'); - } - } - if (isFirst) { - // All cases need isFirst set correctly for proper error handling - isFirst = false; - } - if (typeof data !== 'string') { - data = JSON.stringify(data); - } - res.write(data); - } else { - stream.destroy(); - } - }); - stream.on('end', function() { - if (!closed) { - closed = true; - if (!opts.jsonl) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - } - res.end(); - resolve({ success: true }); - } - }); - }); - } - static mergeStreams(streams: Stream[], destination: Transform): Transform { let activeStreams = streams.length; diff --git a/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts b/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts index 8dec499012..8746159041 100644 --- a/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts +++ b/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts @@ -1,5 +1,4 @@ import { Readable, Stream, Transform } from 'stream'; -import { Request, Response } from 'express'; import { ExternalApiStream } from './apiStream'; export class NodeQueryStream extends Readable { @@ -37,11 +36,6 @@ export class NodeQueryStream extends Readable { } } - static onStream(stream: Readable, req: Request, res: Response): - Promise<{ success: boolean; error?: any }> { - return ExternalApiStream.onStream(stream, req, res); - } - static mergeStreams(streams: Stream[], destination: Transform): Transform { return ExternalApiStream.mergeStreams(streams, destination); } From 5335ef033cdbb79ee78b420206ea42058af029c9 Mon Sep 17 00:00:00 2001 From: leolambo Date: Thu, 30 Apr 2026 14:14:27 -0400 Subject: [PATCH 4/8] Auto-detect jsonl format from stream property Lets CSPs that produce newline-delimited JSON (wallet transactions, SVM streams) set stream.jsonl = true once at construction. Routes call streamJsonArray uniformly without needing chain-specific format checks. The opts.jsonl override still wins when the caller passes one. --- packages/bitcore-node/src/routes/apiUtils.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index abf9814d64..3348f31732 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -29,11 +29,13 @@ export interface StreamJsonArrayResult { * - Client/response disconnects destroy the stream (and call .close() if present, e.g. mongo cursor) */ export function streamJsonArray( - stream: Readable & { close?: () => void }, + stream: Readable & { close?: () => void; jsonl?: boolean }, req: Request, res: Response, opts: StreamJsonArrayOpts = {} ): Promise { + // Auto-detect jsonl flag attached to the stream so routes stay chain-agnostic. + const jsonl = opts.jsonl ?? stream.jsonl ?? false; return new Promise((resolve, reject) => { let closed = false; let isFirst = true; @@ -71,7 +73,7 @@ export function streamJsonArray( // Headers already sent — emit inline error marker, end response, log upstream closed = true; const errMsg = '{"error": "An error occurred during data stream"}'; - if (opts.jsonl) { + if (jsonl) { res.write(`${errMsg}`); } else { res.write(`,\n${errMsg}\n]`); @@ -89,7 +91,7 @@ export function streamJsonArray( cleanup(); return; } - if (!opts.jsonl) { + if (!jsonl) { if (isFirst) { res.write('[\n'); } else { @@ -108,7 +110,7 @@ export function streamJsonArray( stream.on('end', () => { if (closed) return; closed = true; - if (!opts.jsonl) { + if (!jsonl) { if (isFirst) { res.write('[]'); } else { From fea037576a5f03a1553d05e65026b736b2aa0b0a Mon Sep 17 00:00:00 2001 From: leolambo Date: Fri, 1 May 2026 13:27:04 -0400 Subject: [PATCH 5/8] Restore eager cursor cleanup on client disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit streamJsonArray's req/res close handlers now also call stream.destroy(), not just stream.close(). Transform pipelines like the EVM and Gnosis wallet-tx streams have no .close(), so disconnects previously left them running until the next data event triggered cleanup. Destroying the transform fires its 'close' event, which the CSP layer hooks for cursor teardown — restoring the synchronous cleanup the old req.on('close') wiring provided. Also documents two intentional behavioral nuances surfaced during review: ERC-20 transfers now stream as a multi-line JSON array (same JSON value, different whitespace from the prior res.json), and the empty-wallet 400 now uses the JSON {error, message} body shape from respondWithError instead of text/plain. --- .../src/providers/chain-state/evm/api/csp.ts | 5 +++++ packages/bitcore-node/src/routes/apiUtils.ts | 16 +++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts index fdd24211e5..7e43bb897e 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts @@ -554,6 +554,9 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai return Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }); } const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args); + // Streams elements one-by-one so the route wraps them via streamJsonArray. + // The response remains a JSON array of the same N transfer objects; only inter-element + // whitespace differs from the prior res.json() output (compact `[..]` vs newline-separated). return Readable.from(tokenTransfers, { objectMode: true }); } @@ -672,6 +675,8 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address); if (walletAddresses.length === 0) { + // Status remains 400 via respondWithError; body shape changes from text/plain to + // the JSON {error, message} shape used by every other 4xx path. throw new AdapterError('walletAddresses', AdapterErrorCode.INVALID_REQUEST, 'No addresses found for wallet'); } const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress); diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index 3348f31732..a8a3867ee4 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -40,18 +40,24 @@ export function streamJsonArray( let closed = false; let isFirst = true; - const cleanup = () => { - closed = true; + const tearDown = () => { + // close() handles mongo cursor streams; destroy() tears down piped Transform chains + // so cursor-cleanup listeners hooked to the Transform's 'close' event fire eagerly on disconnect. if (typeof stream.close === 'function') { try { stream.close(); } catch { /* noop */ } - } else if (typeof stream.destroy === 'function') { + } + if (typeof stream.destroy === 'function' && !stream.destroyed) { try { stream.destroy(); } catch { /* noop */ } } }; + const cleanup = () => { + closed = true; + tearDown(); + }; - req.on('close', () => { closed = true; if (typeof stream.close === 'function') stream.close(); }); + req.on('close', () => { closed = true; tearDown(); }); res.type('json'); - res.on('close', () => { closed = true; if (typeof stream.close === 'function') stream.close(); }); + res.on('close', () => { closed = true; tearDown(); }); stream.on('error', (err: any) => { if (closed) return; From 24059c642c5c4907632585a96e5a466a6a784aab Mon Sep 17 00:00:00 2001 From: leolambo Date: Mon, 4 May 2026 11:14:00 -0400 Subject: [PATCH 6/8] Fix stream-pipeline leaks on client disconnect streamJsonArray now settles its returned promise on req/res close and on stream 'close', not just on 'end' or 'error'. Aborted requests previously left the route handler awaiting forever because tearDown destroyed the stream but never resolved the promise, and a destroyed pipeline does not always emit 'end' upstream. Single-shot guards (safeResolve/safeReject) prevent double-settlement when these races overlap. EVM streamWalletTransactions now collects cursor-cleanup callbacks via streamParams.cleanups and runs them when the FINAL piped stream closes or ends. The previous wiring attached cleanup to the intermediate transform inside _buildWalletTransactionsStream, but the caller adds further eventPipe stages on top of it; destroy() does not reliably propagate upstream through those, so a mid-stream client disconnect could leak the Mongo cursor until natural exhaustion. --- .../src/providers/chain-state/evm/api/csp.ts | 37 +++++++++++-------- packages/bitcore-node/src/routes/apiUtils.ts | 32 ++++++++++++---- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts index 7e43bb897e..a01958a5dd 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts @@ -60,6 +60,10 @@ export interface BuildWalletTxsStreamParams { transactionStream: TransformWithEventPipe; populateEffects: PopulateEffectsForAddressTransform; walletAddresses: string[]; + // _buildWalletTransactionsStream pushes teardown callbacks here (e.g. cursor.close). + // streamWalletTransactions runs them when the FINAL piped stream closes/ends, so the + // hook lives on the stream the route actually destroys on disconnect. + cleanups?: Array<() => void>; } @@ -683,10 +687,12 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai const populateReceipt = new PopulateReceiptTransform(this); const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses); + const cleanups: Array<() => void> = []; const streamParams: BuildWalletTxsStreamParams = { transactionStream, populateEffects, - walletAddresses + walletAddresses, + cleanups }; transactionStream = await this._buildWalletTransactionsStream(params, streamParams); @@ -699,6 +705,13 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai .eventPipe(populateReceipt) .eventPipe(ethTransactionTransform); + // Run upstream teardown callbacks (e.g. cursor.close) when the FINAL stream the route + // pipes from closes or ends. destroy() on this stream does not reliably propagate + // upstream through eventPipe chains, so the cleanup must live here. + const runCleanups = () => { for (const fn of cleanups) { try { fn(); } catch { /* noop */ } } }; + transactionStream.on('close', runCleanups); + transactionStream.on('end', runCleanups); + (transactionStream as any).jsonl = true; return transactionStream; } @@ -714,24 +727,18 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - // Cursor cleanup is triggered when the consumer destroys the returned stream - // (streamJsonArray destroys on req/res close). + // Cursor cleanup is registered with the caller and triggered against the final piped + // stream. Hooking it here against the intermediate transform would miss disconnects + // because destroy() does not propagate upstream through eventPipe chains reliably. let cursorClosed = false; - const cleanupCursor = () => { - if (!cursorClosed) { - cursorClosed = true; - try { - cursor.close(); - } catch { - // Cursor might already be closed, ignore - } - } - }; + streamParams.cleanups?.push(() => { + if (cursorClosed) return; + cursorClosed = true; + try { cursor.close(); } catch { /* already closed */ } + }); // Pipe cursor to transform stream transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true })); - transactionStream.on('close', cleanupCursor); - transactionStream.on('end', cleanupCursor); transactionStream = transactionStream.eventPipe(populateEffects); // For old db entries diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index a8a3867ee4..d12a252d80 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -39,6 +39,12 @@ export function streamJsonArray( return new Promise((resolve, reject) => { let closed = false; let isFirst = true; + let settled = false; + + // Single-shot guards keep the promise from being resolved/rejected twice when + // a client disconnect races a stream end/error or a stream 'close' event follows destroy(). + const safeResolve = (result: StreamJsonArrayResult) => { if (!settled) { settled = true; resolve(result); } }; + const safeReject = (err: any) => { if (!settled) { settled = true; reject(err); } }; const tearDown = () => { // close() handles mongo cursor streams; destroy() tears down piped Transform chains @@ -54,13 +60,21 @@ export function streamJsonArray( closed = true; tearDown(); }; + const onAbort = () => { + closed = true; + tearDown(); + // Settle the awaiting route handler so it can fall through to its catch/finally + // instead of hanging until the stream eventually emits 'close' (which may not happen + // on a destroyed pipeline if upstream never settles). + safeResolve({ success: false, error: new Error('client disconnected') }); + }; - req.on('close', () => { closed = true; tearDown(); }); + req.on('close', onAbort); res.type('json'); - res.on('close', () => { closed = true; tearDown(); }); + res.on('close', onAbort); stream.on('error', (err: any) => { - if (closed) return; + if (closed) { safeResolve({ success: false, error: err }); return; } if (err?.isAxiosError) { err.log = { url: err?.config?.url, @@ -73,7 +87,7 @@ export function streamJsonArray( closed = true; res.write('[]'); res.end(); - return resolve({ success: false, error: err }); + return safeResolve({ success: false, error: err }); } if (!isFirst) { // Headers already sent — emit inline error marker, end response, log upstream @@ -86,10 +100,10 @@ export function streamJsonArray( } res.end(); cleanup(); - return resolve({ success: false, error: err }); + return safeResolve({ success: false, error: err }); } // Pre-data — caller can send proper 5xx status - return reject(err); + return safeReject(err); }); stream.on('data', (data: any) => { @@ -124,7 +138,11 @@ export function streamJsonArray( } } res.end(); - resolve({ success: true }); + safeResolve({ success: true }); }); + + // Backstop: if destroy() emits 'close' without a prior 'end' or 'error', settle the promise + // so the route handler doesn't await indefinitely on a torn-down pipeline. + stream.on('close', () => safeResolve({ success: closed, error: closed ? undefined : new Error('stream closed before end') })); }); } From b20885608e3db0825fdcb8c252e579beb68a65df Mon Sep 17 00:00:00 2001 From: leolambo Date: Mon, 4 May 2026 13:42:00 -0400 Subject: [PATCH 7/8] Test streamJsonArray and EVM cursor cleanup Cover the JSON-array and JSONL framing paths, the inline-error and pre-data error branches, and the client-disconnect promise settlement that previously hung the route handler. Also assert the EVM wallet-tx pipeline closes its Mongo cursor when the final stream is destroyed, exercising the cleanup-array wiring through every pipe stage the route sees. --- .../test/integration/ethereum/csp.test.ts | 12 +++ .../test/unit/services/storage.test.ts | 80 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/packages/bitcore-node/test/integration/ethereum/csp.test.ts b/packages/bitcore-node/test/integration/ethereum/csp.test.ts index 5a05af9885..2f722e9f5c 100644 --- a/packages/bitcore-node/test/integration/ethereum/csp.test.ts +++ b/packages/bitcore-node/test/integration/ethereum/csp.test.ts @@ -383,6 +383,18 @@ describe('Ethereum API', function() { await streamDexWalletTransactions(chain, network, wallet, address, web3); }); + + it('closes the wallet-tx cursor when the final stream is destroyed', async () => { + await EVMTransactionStorage.collection.insertMany( + new Array(5).fill({}).map(() => ({ chain, network, blockHeight: 1, gasPrice: 10 * 1e9, data: Buffer.from(''), from: address } as IEVMTransactionInProcess)) + ); + const stream: any = await ETH.streamWalletTransactions({ chain, network, wallet, args: {} } as StreamWalletTransactionsParams); + const cursorCloseSpy = sandbox.spy(); + stream.on('close', cursorCloseSpy); + stream.destroy(); + await new Promise(r => setImmediate(r)); + expect(cursorCloseSpy.called).to.eq(true); + }); }); }); diff --git a/packages/bitcore-node/test/unit/services/storage.test.ts b/packages/bitcore-node/test/unit/services/storage.test.ts index 3bb6e39f8c..fc4d2e6e0d 100644 --- a/packages/bitcore-node/test/unit/services/storage.test.ts +++ b/packages/bitcore-node/test/unit/services/storage.test.ts @@ -1,4 +1,7 @@ import { expect } from 'chai'; +import { EventEmitter } from 'events'; +import { Readable } from 'stream'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { unitAfterHelper, unitBeforeHelper } from '../../helpers/unit'; describe('Storage Service', function() { @@ -9,3 +12,80 @@ describe('Storage Service', function() { expect(true).to.equal(true); }); }); + +describe('streamJsonArray', function() { + // Minimal req/res stand-ins: req only needs 'close', res captures writes and exposes 'close'. + function fakes() { + const req = new EventEmitter() as any; + const writes: string[] = []; + const res = Object.assign(new EventEmitter(), { + type: () => res, + write: (chunk: any) => { writes.push(typeof chunk === 'string' ? chunk : chunk.toString()); return true; }, + end: () => { (res as any).ended = true; }, + }) as any; + return { req, res, writes }; + } + + it('frames objects as a JSON array', async () => { + const { req, res, writes } = fakes(); + const result = await streamJsonArray(Readable.from([{ a: 1 }, { a: 2 }], { objectMode: true }), req, res); + expect(result.success).to.equal(true); + expect(writes.join('')).to.equal('[\n{"a":1},\n{"a":2}\n]'); + }); + + it('writes [] for empty stream', async () => { + const { req, res, writes } = fakes(); + await streamJsonArray(Readable.from([], { objectMode: true }), req, res); + expect(writes.join('')).to.equal('[]'); + }); + + it('honors stream.jsonl flag (no array framing)', async () => { + const { req, res, writes } = fakes(); + const stream: any = Readable.from(['{"a":1}\n', '{"a":2}\n'], { objectMode: true }); + stream.jsonl = true; + await streamJsonArray(stream, req, res); + expect(writes.join('')).to.equal('{"a":1}\n{"a":2}\n'); + }); + + it('appends inline error marker on mid-stream error', async () => { + const { req, res, writes } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => { + stream.push({ a: 1 }); + setImmediate(() => stream.emit('error', new Error('boom'))); + }); + const result = await streamJsonArray(stream, req, res); + expect(result.success).to.equal(false); + expect(writes.join('')).to.contain('"error"'); + expect(writes.join('')).to.match(/,\n\{"error".*\}\n\]$/); + }); + + it('rejects pre-data errors so the route can send a 5xx', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => stream.emit('error', new Error('upstream down'))); + let caught: any; + await streamJsonArray(stream, req, res).catch(e => caught = e); + expect(caught).to.be.instanceOf(Error); + expect(caught.message).to.equal('upstream down'); + }); + + it('settles the promise on client disconnect', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => req.emit('close')); + const result = await streamJsonArray(stream, req, res); + expect(result.success).to.equal(false); + expect(result.error?.message).to.contain('disconnected'); + }); + + it('calls .close() on cursor-style streams when the client disconnects', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }) as any; + let closed = false; + stream.close = () => { closed = true; }; + setImmediate(() => req.emit('close')); + await streamJsonArray(stream, req, res); + expect(closed).to.equal(true); + }); +}); From 41d8727a6e21c415259da83c50467e1497643f23 Mon Sep 17 00:00:00 2001 From: leolambo Date: Mon, 4 May 2026 14:48:00 -0400 Subject: [PATCH 8/8] Use ndjson Content-Type when streaming jsonl Setting application/json on a newline-delimited stream tripped up JSON-aware HTTP clients (supertest's auto-parser among them) that tried to parse the whole body as a single JSON document. Switching to application/x-ndjson in jsonl mode keeps array responses on application/json while telling clients the right thing about the shape of the body. Updates the EVM memory-leak tests to drop the obsolete req/res params on streamWalletTransactions and pipe the returned stream through streamJsonArray, matching the rest of the migrated callers. --- packages/bitcore-node/src/routes/apiUtils.ts | 4 ++- .../integration/ethereum/memory-leaks.test.ts | 30 +++++-------------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index d12a252d80..151c05481d 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -70,7 +70,9 @@ export function streamJsonArray( }; req.on('close', onAbort); - res.type('json'); + // ndjson in jsonl mode so JSON-aware clients (supertest, fetch().json()) don't try to + // parse a stream of newline-delimited objects as a single JSON document. + res.type(jsonl ? 'application/x-ndjson' : 'json'); res.on('close', onAbort); stream.on('error', (err: any) => { diff --git a/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts b/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts index c95960b562..8091609593 100644 --- a/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts +++ b/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts @@ -7,6 +7,7 @@ import { MongoBound } from '../../../src/models/base'; import { IWallet, WalletStorage } from '../../../src/models/wallet'; import { WalletAddressStorage } from '../../../src/models/walletAddress'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; const chain = 'ETH'; @@ -97,13 +98,8 @@ describe('EVM Memory Leak Prevention', function() { const { req, res, reqEmitter } = createMockReqRes(); const streamPromise = ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }); + chain, network, wallet, args: {} + }).then(stream => streamJsonArray(stream, req, res)); // Wait for stream to start await new Promise(resolve => setTimeout(resolve, 100)); @@ -164,13 +160,8 @@ describe('EVM Memory Leak Prevention', function() { const { req, res, reqEmitter } = createMockReqRes(); const streamPromise = ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }); + chain, network, wallet, args: {} + }).then(stream => streamJsonArray(stream, req, res)); await new Promise(resolve => setTimeout(resolve, 50)); reqEmitter.emit('close'); @@ -232,14 +223,9 @@ describe('EVM Memory Leak Prevention', function() { resEmitter.on('finish', resolve); resEmitter.on('error', reject); - ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }).catch(reject); + ETH.streamWalletTransactions({ chain, network, wallet, args: {} }) + .then(stream => streamJsonArray(stream, req, res)) + .catch(reject); }); // Verify that we received some transactions (stream worked)