From 8af0f219f99d2101dbe54005494535900223f092 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Tue, 7 Apr 2026 09:47:10 +0300 Subject: [PATCH 1/2] ocean.js p2p --- .github/workflows/ci.yml | 37 ++++++++++++++++-- package-lock.json | 12 ++---- package.json | 2 +- src/commands.ts | 84 ++++++++++++++++++++++++---------------- src/helpers.ts | 3 ++ 5 files changed, 92 insertions(+), 46 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c17d008..24b515c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,10 @@ jobs: test_system: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + transport: [http, p2p] steps: - name: Checkout code @@ -60,8 +64,8 @@ jobs: cache-name: cache-node-modules with: path: ~/.npm - key: ${{ runner.os }}-test-integration-${{ env.cache-name }}-${{ hashFiles('**/package-lock.json') }} - restore-keys: ${{ runner.os }}-test-integration-${{ env.cache-name }}- + key: ${{ runner.os }}-test-${{ matrix.transport }}-${{ env.cache-name }}-${{ hashFiles('**/package-lock.json') }} + restore-keys: ${{ runner.os }}-test-${{ matrix.transport }}-${{ env.cache-name }}- - name: Set ADDRESS_FILE run: echo "ADDRESS_FILE=${HOME}/.ocean/ocean-contracts/artifacts/address.json" >> $GITHUB_ENV @@ -121,16 +125,41 @@ jobs: attempt=$((attempt + 1)) done + - name: Set NODE_URL for HTTP + if: matrix.transport == 'http' + run: echo "NODE_URL=http://127.0.0.1:8001" >> $GITHUB_ENV + + - name: Set NODE_URL for P2P + if: matrix.transport == 'p2p' + run: | + max_attempts=30 + attempt=1 + echo "Waiting for node HTTP API to be ready..." + while [ $attempt -le $max_attempts ]; do + PEER_ID=$(curl -s http://127.0.0.1:8001 | jq -r '.nodeId // empty' 2>/dev/null) + if [ -n "$PEER_ID" ]; then + echo "Got peer ID: $PEER_ID" + echo "NODE_URL=${PEER_ID}" >> $GITHUB_ENV + break + fi + echo "Attempt $attempt/$max_attempts: Node API not ready yet..." 
+ if [ $attempt -eq $max_attempts ]; then + echo "Error: Could not get node peer ID" + exit 1 + fi + sleep 5 + attempt=$((attempt + 1)) + done + - name: docker logs run: docker logs ocean_ocean-contracts_1 && docker logs ocean_typesense_1 if: ${{ failure() }} - - name: Run system tests + - name: Run system tests (${{ matrix.transport }}) run: npm run test:system env: INDEXING_RETRY_INTERVAL: 4000 INDEXING_MAX_RETRIES: 120 - NODE_URL: 'http://127.0.0.1:8001' AVOID_LOOP_RUN: true - name: Print Ocean Node Logs if tests fail if: ${{ failure() }} diff --git a/package-lock.json b/package-lock.json index 9f4bbe5..d7d6652 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@oasisprotocol/sapphire-paratime": "^1.3.2", "@oceanprotocol/contracts": "^2.5.0", "@oceanprotocol/ddo-js": "^0.1.4", - "@oceanprotocol/lib": "^6.1.0", + "@oceanprotocol/lib": "^7.0.0-next.6", "commander": "^13.1.0", "cross-fetch": "^3.1.5", "crypto-js": "^4.1.1", @@ -3246,17 +3246,13 @@ } }, "node_modules/@oceanprotocol/lib": { - "version": "6.1.2", - "resolved": "https://registry.npmjs.org/@oceanprotocol/lib/-/lib-6.1.2.tgz", - "integrity": "sha512-zb7aeldnmbzS2qmjoSVROtRDujEyx8LbvW3PzZcRkDwKTpDM5w8XJdEDdV4v/JPdT5aIBj9PP3qjPPZLITSfsA==", + "version": "7.0.0-next.6", + "resolved": "https://registry.npmjs.org/@oceanprotocol/lib/-/lib-7.0.0-next.6.tgz", + "integrity": "sha512-K11YoKz3RItMjsHt54FDv0Zdyt31wEZHthZKOvghpgL0nYGEU+CC0+1IBBgrVMDuQhOxu/p0+vjf3Ie94f4PwQ==", "license": "Apache-2.0", "dependencies": { "@oasisprotocol/sapphire-paratime": "^1.3.2", - "@oceanprotocol/contracts": "^2.6.0", "@oceanprotocol/ddo-js": "^0.1.4", - "@rdfjs/dataset": "^2.0.2", - "@rdfjs/formats-common": "^3.1.0", - "@zazuko/env-node": "^2.1.4", "bignumber.js": "^9.3.1", "cross-fetch": "^4.0.0", "crypto-js": "^4.1.1", diff --git a/package.json b/package.json index 9848768..00f4134 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "@oasisprotocol/sapphire-paratime": "^1.3.2", 
"@oceanprotocol/contracts": "^2.5.0", "@oceanprotocol/ddo-js": "^0.1.4", - "@oceanprotocol/lib": "^6.1.0", + "@oceanprotocol/lib": "^7.0.0-next.6", "commander": "^13.1.0", "cross-fetch": "^3.1.5", "crypto-js": "^4.1.1", diff --git a/src/commands.ts b/src/commands.ts index 9ab2fb8..d6e2d74 100644 --- a/src/commands.ts +++ b/src/commands.ts @@ -1,4 +1,5 @@ import fs from "fs"; +import path from "path"; import util from "util"; import { createAssetUtil, @@ -226,7 +227,7 @@ export class Commands { const orderTx = await tx.wait(); - const urlDownloadUrl = await ProviderInstance.getDownloadUrl( + const downloadResult = await ProviderInstance.getDownloadUrl( dataDdo.id, dataDdo.services[0].id, 0, @@ -235,9 +236,16 @@ export class Commands { this.signer ); try { - const path = args[2] ? args[2] : "."; - const { filename } = await downloadFile(urlDownloadUrl, path); - console.log("File downloaded successfully:", path + "/" + filename); + const destPath = args[2] ? args[2] : "."; + if (typeof downloadResult === "string") { + const { filename } = await downloadFile(downloadResult, destPath); + console.log("File downloaded successfully:", destPath + "/" + filename); + } else { + const filename = downloadResult.filename || "file.out"; + const filePath = path.join(destPath, filename); + fs.writeFileSync(filePath, Buffer.from(downloadResult.data)); + console.log("File downloaded successfully:", filePath); + } } catch (e) { console.log(`Download url dataset failed: ${e}`); } @@ -451,7 +459,7 @@ export class Commands { paymentToken, supportedMaxJobDuration, providerURI, - this.signer, // V1 was this.signer.getAddress() + await this.signer.getAddress(), parsedResources, Number(chainId) ); @@ -1046,23 +1054,24 @@ export class Commands { this.signer, jobId ); - console.log("response: ", logsResponse); if (!logsResponse) { console.error("Error fetching streamable logs. 
No logs available."); return; + } + + let text: string; + if (logsResponse[Symbol.asyncIterator]) { + const chunks: Uint8Array[] = []; + for await (const chunk of logsResponse) { + chunks.push(chunk); + } + text = Buffer.concat(chunks).toString("utf-8"); } else { - const stream = logsResponse as ReadableStream; - console.log("stream: ", stream); - const text = await new Response(stream).text(); - console.log("Streamable Logs: "); - console.log(text); - // for await (const value of stream) { - // // just print it to the console - // console.log(value); - // } - } - console.log("Exiting computeStreamableLogs: ", logsResponse); + text = await new Response(logsResponse).text(); + } + console.log("Streamable Logs:"); + console.log(text); } public async allowAlgo(args: string[]) { @@ -1238,7 +1247,7 @@ export class Commands { const jobStatus = (await ProviderInstance.computeStatus( this.oceanNodeUrl, - await this.signer.getAddress(), + this.signer, jobId, agreementId )) as ComputeJob; @@ -1246,24 +1255,24 @@ export class Commands { } public async downloadJobResults(args: string[]) { - const jobResult = await ProviderInstance.getComputeResultUrl( - this.oceanNodeUrl, - this.signer, - args[1], - parseInt(args[2]) - ); - console.log("jobResult ", jobResult); - try { - const path = args[3] ? 
args[3] : ".";
- const { filename } = await downloadFile(
- jobResult,
- path,
+ try {
+ const stream = await ProviderInstance.getComputeResult(
+ this.oceanNodeUrl,
+ this.signer,
+ args[1],
 parseInt(args[2])
 );
- console.log("File downloaded successfully:", path + "/" + filename);
+ const chunks: Uint8Array[] = [];
+ for await (const chunk of stream) {
+ chunks.push(chunk);
+ }
+ const destPath = args[3] || ".";
+ const filename = `file_${args[2]}.out`;
+ const filePath = path.join(destPath, filename);
+ fs.writeFileSync(filePath, Buffer.concat(chunks));
+ console.log("File downloaded successfully:", filePath);
 } catch (e) {
- console.log(`Download url dataset failed: ${e}`);
+ console.log(`Download compute result failed: ${e}`);
 }
 }

@@ -1778,7 +1787,16 @@
 maxLogs
 );

- const text = await new Response(response).text();
+ let text: string;
+ if (response[Symbol.asyncIterator]) {
+ const chunks: Uint8Array[] = [];
+ for await (const chunk of response) {
+ chunks.push(chunk);
+ }
+ text = Buffer.concat(chunks).toString("utf-8");
+ } else {
+ text = await new Response(response).text();
+ }
 const outputPath = `${outputLocation}/logs.json`;
 fs.writeFileSync(outputPath, text);
 console.log(chalk.green(`Logs saved to ${outputPath}`));
diff --git a/src/helpers.ts b/src/helpers.ts
index 17280f7..afe59d6 100644
--- a/src/helpers.ts
+++ b/src/helpers.ts
@@ -322,6 +322,9 @@ export async function getPublicIP(): Promise<string> {

 export async function getMetadataURI() {
 const metadataURI = process.env.NODE_URL
+ if (!metadataURI?.startsWith('http')) {
+ return metadataURI
+ }
 const parsed = new URL(metadataURI);
 let ip = metadataURI // by default
 // has port number? 
From 0e5df97421ce019bec41d1352af8988899d59ccb Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Tue, 7 Apr 2026 10:52:39 +0300 Subject: [PATCH 2/2] allow localhost connections --- src/cli.ts | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index 04e2e0b..8c251e4 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -4,7 +4,7 @@ import { JsonRpcProvider, Signer, ethers } from "ethers"; import chalk from "chalk"; import { stdin as input, stdout } from "node:process"; import { createInterface } from "readline/promises"; -import { unitsToAmount } from "@oceanprotocol/lib"; +import { unitsToAmount, ProviderInstance, isP2pUri } from "@oceanprotocol/lib"; import { toBoolean } from "./helpers.js"; async function initializeSigner() { @@ -36,6 +36,74 @@ export async function createCLI() { process.exit(1); } + if (isP2pUri(process.env.NODE_URL)) { + const extra = process.env.BOOTSTRAP_PEERS?.split(",").filter(Boolean) || []; + + // Default Ocean bootstrap nodes (must be included explicitly since passing + // bootstrapPeers to setupP2P replaces the built-in defaults) + const oceanDefaults = [ + "/dns4/bootstrap1.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP", + "/dns4/bootstrap2.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmHwzeVw7RpGopjZe6qNBJbzDDBdqtrSk7Gcx1emYsfgL4", + "/dns4/bootstrap3.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmBKSeEP3v4tYEPsZsZv9VELinyMCsrVTJW9BvQeFXx28U", + "/dns4/bootstrap4.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmSTVTArioKm2wVcyeASHYEsnx2ZNq467Z4GMDU4ErEPom", + ]; + + const nodeUrl = process.env.NODE_URL; + const isFullMultiaddr = + nodeUrl.startsWith("/") && nodeUrl.includes("/p2p/"); + const localPeer = isFullMultiaddr + ? [nodeUrl] + : [`/ip4/127.0.0.1/tcp/9001/ws/p2p/${nodeUrl}`]; + const bootstrapPeers = [...localPeer, ...extra, ...oceanDefaults]; + console.log(chalk.cyan("P2P mode detected. 
Initializing libp2p...")); + console.log(chalk.cyan(`Bootstrap peers: ${bootstrapPeers.length}`)); + + for (const peer of localPeer) { + console.log(chalk.cyan(` Local: ${peer}`)); + } + // Allow localhost connections / local nodes + await ProviderInstance.setupP2P({ + bootstrapPeers, + libp2p: { + connectionGater: { + denyDialMultiaddr: () => false, + }, + }, + } as any); + console.log( + chalk.cyan("libp2p node started. Waiting for peer connections...") + ); + + // Wait for at least one active P2P connection before running commands + const maxWait = 20_000; + const interval = 500; + let waited = 0; + const libp2p = (ProviderInstance as any).p2pProvider?.libp2pNode; + while (waited < maxWait) { + const conns = libp2p?.getConnections()?.length ?? 0; + if (conns > 0) { + console.log( + chalk.green(`Connected to ${conns} peer(s) in ${waited}ms`) + ); + break; + } + await new Promise((r) => setTimeout(r, interval)); + waited += interval; + if (waited % 3000 === 0) { + console.log( + chalk.yellow(` Still waiting for peers... (${waited / 1000}s)`) + ); + } + } + if ((libp2p?.getConnections()?.length ?? 0) === 0) { + console.error( + chalk.red( + `No P2P peers connected after ${maxWait / 1000}s. 
Commands may fail.` + ) + ); + } + } + const program = new Command(); program @@ -707,12 +775,21 @@ export async function createCLI() { ) .argument("[from]", "Start time (epoch ms) to get logs from") .argument("[to]", "End time (epoch ms) to get logs to") - .argument("[maxLogs]", "Maximum number of logs to retrieve (default: 100, max: 1000)") + .argument( + "[maxLogs]", + "Maximum number of logs to retrieve (default: 100, max: 1000)" + ) .option("-o, --output ", "Output directory to save the logs") - .option("-l, --last [last]", "Period of time to get logs from now (in hours)") + .option( + "-l, --last [last]", + "Period of time to get logs from now (in hours)" + ) .option("-f, --from [from]", "Start time (epoch ms) to get logs from") .option("-t, --to [to]", "End time (epoch ms) to get logs to") - .option("-m, --maxLogs [maxLogs]", "Maximum number of logs to retrieve (default: 100, max: 1000)") + .option( + "-m, --maxLogs [maxLogs]", + "Maximum number of logs to retrieve (default: 100, max: 1000)" + ) .action(async (output, last, from, to, options) => { const { signer, chainId } = await initializeSigner(); const commands = new Commands(signer, chainId);