Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ jobs:

test_system:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
transport: [http, p2p]

steps:
- name: Checkout code
Expand All @@ -60,8 +64,8 @@ jobs:
cache-name: cache-node-modules
with:
path: ~/.npm
key: ${{ runner.os }}-test-integration-${{ env.cache-name }}-${{ hashFiles('**/package-lock.json') }}
restore-keys: ${{ runner.os }}-test-integration-${{ env.cache-name }}-
key: ${{ runner.os }}-test-${{ matrix.transport }}-${{ env.cache-name }}-${{ hashFiles('**/package-lock.json') }}
restore-keys: ${{ runner.os }}-test-${{ matrix.transport }}-${{ env.cache-name }}-

- name: Set ADDRESS_FILE
run: echo "ADDRESS_FILE=${HOME}/.ocean/ocean-contracts/artifacts/address.json" >> $GITHUB_ENV
Expand Down Expand Up @@ -121,16 +125,41 @@ jobs:
attempt=$((attempt + 1))
done

- name: Set NODE_URL for HTTP
if: matrix.transport == 'http'
run: echo "NODE_URL=http://127.0.0.1:8001" >> $GITHUB_ENV

- name: Set NODE_URL for P2P
if: matrix.transport == 'p2p'
run: |
max_attempts=30
attempt=1
echo "Waiting for node HTTP API to be ready..."
while [ $attempt -le $max_attempts ]; do
PEER_ID=$(curl -s http://127.0.0.1:8001 | jq -r '.nodeId // empty' 2>/dev/null)
if [ -n "$PEER_ID" ]; then
echo "Got peer ID: $PEER_ID"
echo "NODE_URL=${PEER_ID}" >> $GITHUB_ENV
break
fi
echo "Attempt $attempt/$max_attempts: Node API not ready yet..."
if [ $attempt -eq $max_attempts ]; then
echo "Error: Could not get node peer ID"
exit 1
fi
sleep 5
attempt=$((attempt + 1))
done

- name: docker logs
run: docker logs ocean_ocean-contracts_1 && docker logs ocean_typesense_1
if: ${{ failure() }}

- name: Run system tests
- name: Run system tests (${{ matrix.transport }})
run: npm run test:system
env:
INDEXING_RETRY_INTERVAL: 4000
INDEXING_MAX_RETRIES: 120
NODE_URL: 'http://127.0.0.1:8001'
AVOID_LOOP_RUN: true
- name: Print Ocean Node Logs if tests fail
if: ${{ failure() }}
Expand Down
12 changes: 4 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"@oasisprotocol/sapphire-paratime": "^1.3.2",
"@oceanprotocol/contracts": "^2.5.0",
"@oceanprotocol/ddo-js": "^0.1.4",
"@oceanprotocol/lib": "^6.1.0",
"@oceanprotocol/lib": "^7.0.0-next.6",
"commander": "^13.1.0",
"cross-fetch": "^3.1.5",
"crypto-js": "^4.1.1",
Expand Down
85 changes: 81 additions & 4 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import chalk from "chalk";
import { stdin as input, stdout } from "node:process";
import { createInterface } from "readline/promises";
import { unitsToAmount } from "@oceanprotocol/lib";
import { unitsToAmount, ProviderInstance, isP2pUri } from "@oceanprotocol/lib";
import { toBoolean } from "./helpers.js";

async function initializeSigner() {
Expand Down Expand Up @@ -36,6 +36,74 @@
process.exit(1);
}

if (isP2pUri(process.env.NODE_URL)) {
const extra = process.env.BOOTSTRAP_PEERS?.split(",").filter(Boolean) || [];

// Default Ocean bootstrap nodes (must be included explicitly since passing
// bootstrapPeers to setupP2P replaces the built-in defaults)
const oceanDefaults = [
"/dns4/bootstrap1.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP",
"/dns4/bootstrap2.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmHwzeVw7RpGopjZe6qNBJbzDDBdqtrSk7Gcx1emYsfgL4",
"/dns4/bootstrap3.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmBKSeEP3v4tYEPsZsZv9VELinyMCsrVTJW9BvQeFXx28U",
"/dns4/bootstrap4.oncompute.ai/tcp/9001/ws/p2p/16Uiu2HAmSTVTArioKm2wVcyeASHYEsnx2ZNq467Z4GMDU4ErEPom",
];

const nodeUrl = process.env.NODE_URL;
const isFullMultiaddr =
nodeUrl.startsWith("/") && nodeUrl.includes("/p2p/");
const localPeer = isFullMultiaddr
? [nodeUrl]
: [`/ip4/127.0.0.1/tcp/9001/ws/p2p/${nodeUrl}`];
const bootstrapPeers = [...localPeer, ...extra, ...oceanDefaults];
console.log(chalk.cyan("P2P mode detected. Initializing libp2p..."));
console.log(chalk.cyan(`Bootstrap peers: ${bootstrapPeers.length}`));

for (const peer of localPeer) {
console.log(chalk.cyan(` Local: ${peer}`));
}
// Allow localhost connections / local nodes
await ProviderInstance.setupP2P({
bootstrapPeers,
libp2p: {
connectionGater: {
denyDialMultiaddr: () => false,
},
},
} as any);

Check warning on line 72 in src/cli.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
console.log(
chalk.cyan("libp2p node started. Waiting for peer connections...")
);

// Wait for at least one active P2P connection before running commands
const maxWait = 20_000;
const interval = 500;
let waited = 0;
const libp2p = (ProviderInstance as any).p2pProvider?.libp2pNode;

Check warning on line 81 in src/cli.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
while (waited < maxWait) {
const conns = libp2p?.getConnections()?.length ?? 0;
if (conns > 0) {
console.log(
chalk.green(`Connected to ${conns} peer(s) in ${waited}ms`)
);
break;
}
await new Promise((r) => setTimeout(r, interval));
waited += interval;
if (waited % 3000 === 0) {
console.log(
chalk.yellow(` Still waiting for peers... (${waited / 1000}s)`)
);
}
}
if ((libp2p?.getConnections()?.length ?? 0) === 0) {
console.error(
chalk.red(
`No P2P peers connected after ${maxWait / 1000}s. Commands may fail.`
)
);
}
}

const program = new Command();

program
Expand Down Expand Up @@ -707,12 +775,21 @@
)
.argument("[from]", "Start time (epoch ms) to get logs from")
.argument("[to]", "End time (epoch ms) to get logs to")
.argument("[maxLogs]", "Maximum number of logs to retrieve (default: 100, max: 1000)")
.argument(
"[maxLogs]",
"Maximum number of logs to retrieve (default: 100, max: 1000)"
)
.option("-o, --output <output>", "Output directory to save the logs")
.option("-l, --last [last]", "Period of time to get logs from now (in hours)")
.option(
"-l, --last [last]",
"Period of time to get logs from now (in hours)"
)
.option("-f, --from [from]", "Start time (epoch ms) to get logs from")
.option("-t, --to [to]", "End time (epoch ms) to get logs to")
.option("-m, --maxLogs [maxLogs]", "Maximum number of logs to retrieve (default: 100, max: 1000)")
.option(
"-m, --maxLogs [maxLogs]",
"Maximum number of logs to retrieve (default: 100, max: 1000)"
)
.action(async (output, last, from, to, options) => {
const { signer, chainId } = await initializeSigner();
const commands = new Commands(signer, chainId);
Expand Down
84 changes: 51 additions & 33 deletions src/commands.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from "fs";
import path from "path";
import util from "util";
import {
createAssetUtil,
Expand Down Expand Up @@ -226,7 +227,7 @@

const orderTx = await tx.wait();

const urlDownloadUrl = await ProviderInstance.getDownloadUrl(
const downloadResult = await ProviderInstance.getDownloadUrl(
dataDdo.id,
dataDdo.services[0].id,
0,
Expand All @@ -235,9 +236,16 @@
this.signer
);
try {
const path = args[2] ? args[2] : ".";
const { filename } = await downloadFile(urlDownloadUrl, path);
console.log("File downloaded successfully:", path + "/" + filename);
const destPath = args[2] ? args[2] : ".";
if (typeof downloadResult === "string") {
const { filename } = await downloadFile(downloadResult, destPath);
console.log("File downloaded successfully:", destPath + "/" + filename);
} else {
const filename = downloadResult.filename || "file.out";
const filePath = path.join(destPath, filename);
fs.writeFileSync(filePath, Buffer.from(downloadResult.data));
console.log("File downloaded successfully:", filePath);
}
} catch (e) {
console.log(`Download url dataset failed: ${e}`);
}
Expand Down Expand Up @@ -451,7 +459,7 @@
paymentToken,
supportedMaxJobDuration,
providerURI,
this.signer, // V1 was this.signer.getAddress()
await this.signer.getAddress(),
parsedResources,
Number(chainId)
);
Expand Down Expand Up @@ -1046,23 +1054,24 @@
this.signer,
jobId
);
console.log("response: ", logsResponse);

if (!logsResponse) {
console.error("Error fetching streamable logs. No logs available.");
return;
}

let text: string;
if (logsResponse[Symbol.asyncIterator]) {
const chunks: Uint8Array[] = [];
for await (const chunk of logsResponse) {
chunks.push(chunk);
}
text = Buffer.concat(chunks).toString("utf-8");
} else {
const stream = logsResponse as ReadableStream;
console.log("stream: ", stream);
const text = await new Response(stream).text();
console.log("Streamable Logs: ");
console.log(text);
// for await (const value of stream) {
// // just print it to the console
// console.log(value);
// }
}
console.log("Exiting computeStreamableLogs: ", logsResponse);
text = await new Response(logsResponse).text();
}
console.log("Streamable Logs:");
console.log(text);
}

public async allowAlgo(args: string[]) {
Expand Down Expand Up @@ -1238,32 +1247,32 @@

const jobStatus = (await ProviderInstance.computeStatus(
this.oceanNodeUrl,
await this.signer.getAddress(),
this.signer,
jobId,
agreementId
)) as ComputeJob;
console.log(util.inspect(jobStatus, false, null, true));
}

public async downloadJobResults(args: string[]) {
const jobResult = await ProviderInstance.getComputeResultUrl(
this.oceanNodeUrl,
this.signer,
args[1],
parseInt(args[2])
);
console.log("jobResult ", jobResult);

try {
const path = args[3] ? args[3] : ".";
const { filename } = await downloadFile(
jobResult,
path,
const stream = await ProviderInstance.getComputeResult(
this.oceanNodeUrl,
this.signer,
args[1],
parseInt(args[2])
);
console.log("File downloaded successfully:", path + "/" + filename);
const chunks: Uint8Array[] = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
const destPath = args[3] || ".";
const filename = `file_${args[2]}.out`;
const filePath = path.join(destPath, filename);
fs.writeFileSync(filePath, Buffer.concat(chunks));
console.log("File downloaded successfully:", filePath);
} catch (e) {
console.log(`Download url dataset failed: ${e}`);
console.log(`Download compute result failed: ${e}`);
}
}

Expand Down Expand Up @@ -1698,7 +1707,7 @@
)
);
removedCount++;
} catch (e: any) {

Check warning on line 1710 in src/commands.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
console.log(
chalk.yellow(
`⚠ Could not remove token at index ${index} for user ${user}: ${e.message}`
Expand Down Expand Up @@ -1778,7 +1787,16 @@
maxLogs
);

const text = await new Response(response).text();
let text: string;
if (response[Symbol.asyncIterator]) {
const chunks: Uint8Array[] = [];
for await (const chunk of response) {
chunks.push(chunk);
}
text = Buffer.concat(chunks).toString("utf-8");
} else {
text = await new Response(response).text();
}
const outputPath = `${outputLocation}/logs.json`;
fs.writeFileSync(outputPath, text);
console.log(chalk.green(`Logs saved to ${outputPath}`));
Expand Down
3 changes: 3 additions & 0 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
} from "@oceanprotocol/lib";
import { homedir } from "os";

const ERC20Template = readFileSync('./node_modules/@oceanprotocol/contracts/artifacts/contracts/templates/ERC20Template.sol/ERC20Template.json', 'utf8') as any;

Check warning on line 28 in src/helpers.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

export async function downloadFile(
url: string,
Expand Down Expand Up @@ -107,7 +107,7 @@
name: string,
symbol: string,
owner: Signer,
assetUrl: any,

Check warning on line 110 in src/helpers.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
ddo: DDO,
oceanNodeUrl: string,
config: Config,
Expand Down Expand Up @@ -154,7 +154,7 @@
oceanNodeUrl: string,
aquariusInstance: Aquarius,
encryptDDO: boolean = true
): Promise<any> {

Check warning on line 157 in src/helpers.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
const nft = new Nft(owner, Number((await owner.provider.getNetwork()).chainId));
let flags;
let metadata;
Expand Down Expand Up @@ -322,6 +322,9 @@

export async function getMetadataURI() {
const metadataURI = process.env.NODE_URL
if (!metadataURI.startsWith('http')) {
return metadataURI
}
const parsed = new URL(metadataURI);
let ip = metadataURI // by default
// has port number?
Expand Down Expand Up @@ -411,7 +414,7 @@
const addressFile = await fs.readFile(addressFilePath, 'utf8');

const data = JSON.parse(addressFile);
const chainConfig = Object.values(data).find((network: any) => network.chainId === chainId) as any;

Check warning on line 417 in src/helpers.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

Check warning on line 417 in src/helpers.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

if (!chainConfig) {
throw new Error(`Chain ${chainId} not found in address file`);
Expand Down
Loading