From 97c754e6cfdc33a4084d713e339725e9f75f03fd Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Tue, 7 Apr 2026 13:56:22 +0300 Subject: [PATCH 1/8] persistentStorage --- src/@types/AccessList.ts | 6 + src/@types/C2D/C2D.ts | 3 +- src/@types/OceanNode.ts | 2 + src/@types/PersistentStorage.ts | 24 ++ src/@types/commands.ts | 44 +- src/OceanNode.ts | 16 + src/components/P2P/handleProtocolCommands.ts | 42 +- src/components/P2P/index.ts | 52 ++- src/components/core/compute/startCompute.ts | 38 +- .../core/handler/coreHandlersRegistry.ts | 27 ++ .../core/handler/persistentStorage.ts | 327 +++++++++++++++ src/components/httpRoutes/index.ts | 3 + .../httpRoutes/persistentStorage.ts | 179 ++++++++ src/components/httpRoutes/validateCommands.ts | 19 +- .../PersistentStorageFactory.ts | 209 ++++++++++ .../PersistentStorageLocalFS.ts | 162 ++++++++ .../persistentStorage/PersistentStorageS3.ts | 74 ++++ .../createPersistentStorage.ts | 23 ++ src/components/persistentStorage/index.ts | 4 + src/test/integration/accessLists.test.ts | 14 +- src/test/integration/compute.test.ts | 5 +- .../integration/persistentStorage.test.ts | 389 ++++++++++++++++++ src/test/utils/contracts.ts | 8 +- src/test/utils/utils.ts | 4 +- src/utils/accessList.ts | 50 ++- src/utils/address.ts | 2 +- src/utils/config/builder.ts | 3 +- src/utils/config/schemas.ts | 59 +++ src/utils/constants.ts | 14 +- src/utils/credentials.ts | 6 +- 30 files changed, 1740 insertions(+), 68 deletions(-) create mode 100644 src/@types/AccessList.ts create mode 100644 src/@types/PersistentStorage.ts create mode 100644 src/components/core/handler/persistentStorage.ts create mode 100644 src/components/httpRoutes/persistentStorage.ts create mode 100644 src/components/persistentStorage/PersistentStorageFactory.ts create mode 100644 src/components/persistentStorage/PersistentStorageLocalFS.ts create mode 100644 src/components/persistentStorage/PersistentStorageS3.ts create mode 100644 
src/components/persistentStorage/createPersistentStorage.ts create mode 100644 src/components/persistentStorage/index.ts create mode 100644 src/test/integration/persistentStorage.test.ts diff --git a/src/@types/AccessList.ts b/src/@types/AccessList.ts new file mode 100644 index 000000000..242b991d1 --- /dev/null +++ b/src/@types/AccessList.ts @@ -0,0 +1,6 @@ +/** + * Mapping of `chainId` -> list of smart contract addresses on that chain. + */ +export interface AccessList { + [chainId: string]: string[] +} diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 1252e7c99..4c4a67b42 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -1,5 +1,6 @@ import { MetadataAlgorithm, ConsumerParameter } from '@oceanprotocol/ddo-js' import type { BaseFileObject, StorageObject, EncryptMethod } from '../fileObject.js' +import type { AccessList } from '../AccessList.js' export enum C2DClusterType { // eslint-disable-next-line no-unused-vars OPF_K8 = 0, @@ -95,7 +96,7 @@ export interface RunningPlatform { export interface ComputeAccessList { addresses: string[] - accessLists: { [chainId: string]: string[] }[] | null + accessLists: AccessList[] | null } export interface ComputeEnvironmentFreeOptions { diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index d007a7f64..c2dff8f43 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -4,6 +4,7 @@ import { C2DClusterInfo, C2DDockerConfig } from './C2D/C2D' import { FeeStrategy } from './Fees' import { Schema } from '../components/database' import { KeyProviderType } from './KeyManager' +import type { PersistentStorageConfig } from './PersistentStorage.js' export interface OceanNodeDBConfig { url: string | null @@ -139,6 +140,7 @@ export interface OceanNodeConfig { jwtSecret?: string httpCertPath?: string httpKeyPath?: string + persistentStorage?: PersistentStorageConfig } export interface P2PStatusResponse { diff --git a/src/@types/PersistentStorage.ts b/src/@types/PersistentStorage.ts 
new file mode 100644 index 000000000..52b751691 --- /dev/null +++ b/src/@types/PersistentStorage.ts @@ -0,0 +1,24 @@ +import type { AccessList } from './AccessList' + +export type PersistentStorageType = 'localfs' | 's3' + +export interface PersistentStorageLocalFSOptions { + folder: string +} + +export interface PersistentStorageS3Options { + endpoint: string + region?: string + objectKey: string + accessKeyId: string + secretAccessKey: string + /** If true, use path-style addressing (e.g. endpoint/bucket/key). Required for some S3-compatible services (e.g. MinIO). Default false (virtual-host style, e.g. bucket.endpoint/key). */ + forcePathStyle?: boolean +} + +export interface PersistentStorageConfig { + enabled: boolean + type: PersistentStorageType + accessLists: AccessList[] + options: PersistentStorageLocalFSOptions | PersistentStorageS3Options +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index ba35502b9..58d81fbea 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -1,3 +1,4 @@ +import { Readable } from 'stream' import { ValidateParams } from '../components/httpRoutes/validateCommands.js' import { P2PCommandResponse } from './OceanNode' import { DDO } from '@oceanprotocol/ddo-js' @@ -8,12 +9,13 @@ import type { DBComputeJobMetadata } from './C2D/C2D.js' import { FileObjectType, StorageObject, EncryptMethod } from './fileObject' - +import type { AccessList } from './AccessList.js' export interface Command { command: string // command name node?: string // if not present it means current node authorization?: string caller?: string | string[] // added by our node for rate limiting + stream?: Readable | null // commands may have an extra stream, after body. 
IE: Encrypt file } export interface GetP2PPeerCommand extends Command { @@ -314,3 +316,43 @@ export interface GetJobsCommand extends Command { consumerAddrs?: string[] runningJobs?: boolean } + +export interface PersistentStorageCreateBucketCommand extends Command { + consumerAddress: string + signature: string + nonce: string + accessLists: AccessList[] +} + +export interface PersistentStorageGetBucketsCommand extends Command { + consumerAddress: string + signature: string + nonce: string + chainId: number + /** Ethereum address; only buckets with this stored owner are returned (then filtered by ACL). */ + owner: string +} + +export interface PersistentStorageListFilesCommand extends Command { + consumerAddress: string + signature: string + nonce: string + bucketId: string +} + +export interface PersistentStorageUploadFileCommand extends Command { + consumerAddress: string + signature: string + nonce: string + bucketId: string + fileName: string +} + +export interface PersistentStorageDeleteFileCommand extends Command { + consumerAddress: string + signature: string + nonce: string + chainId: number + bucketId: string + fileName: string +} diff --git a/src/OceanNode.ts b/src/OceanNode.ts index f32cf7d87..dfaf5d6a1 100644 --- a/src/OceanNode.ts +++ b/src/OceanNode.ts @@ -13,6 +13,8 @@ import { Auth } from './components/Auth/index.js' import { KeyManager } from './components/KeyManager/index.js' import { BlockchainRegistry } from './components/BlockchainRegistry/index.js' import { Blockchain } from './utils/blockchain.js' +import { createPersistentStorage } from './components/persistentStorage/createPersistentStorage.js' +import { PersistentStorageFactory } from './components/persistentStorage/PersistentStorageFactory.js' export interface RequestLimiter { requester: string | string[] // IP address or peer ID @@ -37,6 +39,7 @@ export class OceanNode { private remoteCaller: string | string[] private requestMap: Map private auth: Auth + private persistentStorage: 
PersistentStorageFactory // eslint-disable-next-line no-useless-constructor private constructor( @@ -73,6 +76,11 @@ export class OceanNode { this.config.claimDurationTimeout, this.blockchainRegistry ) + if (this.config.persistentStorage?.enabled) { + this.persistentStorage = createPersistentStorage(this) + } else { + this.persistentStorage = null + } } } @@ -181,6 +189,10 @@ export class OceanNode { return this.blockchainRegistry } + public getPersistentStorage(): PersistentStorageFactory | null { + return this.persistentStorage + } + /** * Get a Blockchain instance for the given chainId. * Delegates to BlockchainRegistry. @@ -200,6 +212,10 @@ export class OceanNode { } } + public getConfig(): OceanNodeConfig { + return this.config + } + /** * v3: Direct protocol command handler - no P2P, just call handler directly * Returns {status, stream} without buffering diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 0750dec33..f94f6a2f5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -123,7 +123,39 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true) + const taskRecord = task as unknown as Record + if (taskRecord.p2pStreamBody === true) { + delete taskRecord.p2pStreamBody + + // True streaming: expose an async Readable that reads LP frames lazily + // as the handler consumes it. Frames are terminated by an empty chunk. + taskRecord.stream = Readable.from( + (async function* () { + while (true) { + const frame = await lp.read({ signal: handshakeSignal() }) + const buf = Buffer.from( + (frame as unknown as { subarray: () => Uint8Array }).subarray() + ) + + if (buf.length === 0) { + break + } + + yield buf + } + })() + ) + } + + const logPayload = { ...taskRecord } + // Avoid JSON-stringifying the request stream itself. 
+ if (logPayload.stream) { + logPayload.stream = '[request stream]' + } + if (Buffer.isBuffer(logPayload.rawData)) { + logPayload.rawData = `[${logPayload.rawData.length} bytes]` + } + P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(logPayload), true) // Get and execute handler const handler: BaseHandler = this.getCoreHandlers().getHandler(task.command) @@ -152,11 +184,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + + (err instanceof Error ? err.message : String(err)), true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + const httpStatus = + typeof (err as any)?.status === 'number' ? (err as any).status : 500 + const msg = err instanceof Error ? err.message : String(err) + await sendErrorAndClose(httpStatus, msg) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 17519b5bf..844255bf5 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -33,7 +33,7 @@ import { } from '@libp2p/kad-dht' import { EVENTS, cidFromRawString } from '../../utils/index.js' -import { Transform } from 'stream' +import { Transform, Readable } from 'stream' import { Database } from '../database' import { OceanNodeConfig, @@ -70,6 +70,35 @@ type DDOCache = { let index = 0 +/** Optional request payload sent as LP frames after the command JSON; ends with an empty LP frame. 
*/ +export type P2PRequestBodyStream = AsyncIterable | Readable + +function toUint8ArrayChunk(chunk: unknown): Uint8Array { + if (chunk instanceof Uint8Array) return chunk + if (Buffer.isBuffer(chunk)) return new Uint8Array(chunk) + if (typeof chunk === 'string') return uint8ArrayFromString(chunk) + if ( + chunk && + typeof chunk === 'object' && + ArrayBuffer.isView(chunk as ArrayBufferView) + ) { + const v = chunk as ArrayBufferView + return new Uint8Array(v.buffer, v.byteOffset, v.byteLength) + } + throw new Error('Unsupported chunk type for P2P request body') +} + +async function writeP2pRequestBodyLp( + lp: LengthPrefixedStream, + body: P2PRequestBodyStream, + signal: AbortSignal +): Promise { + for await (const chunk of body as AsyncIterable) { + await lp.write(toUint8ArrayChunk(chunk), { signal }) + } + await lp.write(new Uint8Array(0), { signal }) +} + export class OceanP2P extends EventEmitter { _libp2p: Libp2p _topic: string @@ -725,9 +754,19 @@ export class OceanP2P extends EventEmitter { async send( lp: LengthPrefixedStream, message: string, - options: { signal: AbortSignal } + options: { signal: AbortSignal }, + requestBody?: P2PRequestBodyStream ) { - await lp.write(uint8ArrayFromString(message), { signal: options.signal }) + let outbound = message + if (requestBody) { + const cmd = JSON.parse(message) as Record + cmd.p2pStreamBody = true + outbound = JSON.stringify(cmd) + } + await lp.write(uint8ArrayFromString(outbound), { signal: options.signal }) + if (requestBody) { + await writeP2pRequestBodyLp(lp, requestBody, options.signal) + } const statusBytes = await lp.read({ signal: options.signal }) return { status: JSON.parse(uint8ArrayToString(statusBytes.subarray())), @@ -747,7 +786,8 @@ export class OceanP2P extends EventEmitter { async sendTo( peerName: string, message: string, - multiAddrs?: string[] + multiAddrs?: string[], + requestBody?: P2PRequestBodyStream ): Promise<{ status: any; stream?: AsyncIterable }> { const options = { signal: 
AbortSignal.timeout(10_000), @@ -799,7 +839,7 @@ export class OceanP2P extends EventEmitter { let streamErr: Error | null = null try { - return await this.send(lpStream(stream), message, options) + return await this.send(lpStream(stream), message, options, requestBody) } catch (err) { try { stream.abort(err as Error) @@ -823,7 +863,7 @@ export class OceanP2P extends EventEmitter { stream = await connection.newStream(this._protocol, options) try { - return await this.send(lpStream(stream), message, options) + return await this.send(lpStream(stream), message, options, requestBody) } catch (retryErr) { try { stream.abort(retryErr as Error) diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index e43c668fa..0d42608d6 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -1025,41 +1025,5 @@ async function validateAccess( if (access.addresses.includes(consumerAddress)) { return true } - - const config = await getConfiguration() - const { supportedNetworks } = config - for (const accessListMap of access.accessLists) { - if (!accessListMap) continue - for (const chain of Object.keys(accessListMap)) { - const { chainId } = supportedNetworks[chain] - try { - const blockchain = oceanNode.getBlockchain(chainId) - if (!blockchain) { - CORE_LOGGER.logMessage( - `Blockchain instance not available for chain ${chainId}, skipping access list check`, - true - ) - continue - } - const signer = await blockchain.getSigner() - for (const accessListAddress of accessListMap[chain]) { - const hasAccess = await checkAddressOnAccessList( - accessListAddress, - consumerAddress, - signer - ) - if (hasAccess) { - return true - } - } - } catch (error) { - CORE_LOGGER.logMessage( - `Failed to check access lists on chain ${chain}: ${error.message}`, - true - ) - } - } - } - - return false + return await checkAddressOnAccessList(consumerAddress, access.accessLists, oceanNode) } diff --git 
a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index de3464580..985282192 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -47,6 +47,13 @@ import { } from './p2p.js' import { CreateAuthTokenHandler, InvalidateAuthTokenHandler } from './authHandler.js' import { GetJobsHandler } from './getJobs.js' +import { + PersistentStorageCreateBucketHandler, + PersistentStorageDeleteFileHandler, + PersistentStorageGetBucketsHandler, + PersistentStorageListFilesHandler, + PersistentStorageUploadFileHandler +} from './persistentStorage.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -167,6 +174,26 @@ export class CoreHandlersRegistry { this.registerCoreHandler(PROTOCOL_COMMANDS.PUSH_CONFIG, new PushConfigHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.GET_LOGS, new GetLogsHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.JOBS, new GetJobsHandler(node)) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + new PersistentStorageCreateBucketHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + new PersistentStorageGetBucketsHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + new PersistentStorageListFilesHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + new PersistentStorageUploadFileHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + new PersistentStorageDeleteFileHandler(node) + ) } public static getInstance( diff --git a/src/components/core/handler/persistentStorage.ts b/src/components/core/handler/persistentStorage.ts new file mode 100644 index 000000000..b94ca529a --- /dev/null +++ b/src/components/core/handler/persistentStorage.ts @@ -0,0 +1,327 @@ +import { Readable } from 
'stream' +import type { + PersistentStorageCreateBucketCommand, + PersistentStorageDeleteFileCommand, + PersistentStorageGetBucketsCommand, + PersistentStorageListFilesCommand, + PersistentStorageUploadFileCommand +} from '../../../@types/commands.js' +import { + PersistentStorageAccessDeniedError, + type PersistentStorageFactory +} from '../../persistentStorage/PersistentStorageFactory.js' +import type { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { getAddress } from 'ethers' +import { checkAddressOnAccessList } from '../../../utils/accessList.js' + +import { CORE_LOGGER } from '../../../utils/logging/common.js' +import { + buildInvalidRequestMessage, + validateCommandParameters, + type ValidateParams +} from '../../httpRoutes/validateCommands.js' +import { CommandHandler } from './handler.js' + +function requirePersistentStorage( + handler: CommandHandler +): Promise { + const node = handler.getOceanNode() as any + if (!node.getPersistentStorage) { + throw new Error('Persistent storage is not available on this node') + } + const storage = node.getPersistentStorage() + if (!storage) { + throw new Error('Persistent storage is not configured or disabled') + } + return storage +} + +export class PersistentStorageCreateBucketHandler extends CommandHandler { + validate(command: PersistentStorageCreateBucketCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'accessLists' + ]) + if (!base.valid) return base + if (!Array.isArray(command.accessLists)) { + return buildInvalidRequestMessage( + 'Invalid parameter: "accessLists" must be an array of objects' + ) + } + return { valid: true } + } + + async handle(task: PersistentStorageCreateBucketCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( 
+ task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + const node = this.getOceanNode() + const config = node.getConfig() + const isAllowedCreate = await checkAddressOnAccessList( + task.consumerAddress, + config.persistentStorage?.accessLists, + node + ) + if (!isAllowedCreate) { + return { + stream: null, + status: { httpStatus: 403, error: 'You are not allowed to create new buckets' } + } + } + + let ownerNormalized: string + try { + ownerNormalized = getAddress(task.consumerAddress) + } catch { + return { + stream: null, + status: { httpStatus: 400, error: 'Invalid parameter: "consumerAddress"' } + } + } + + const result = await storage.createNewBucket(task.accessLists, ownerNormalized) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageCreateBucketHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageGetBucketsHandler extends CommandHandler { + validate(command: PersistentStorageGetBucketsCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'chainId', + 'owner' + ]) + if (!base.valid) return base + if (typeof command.chainId !== 'number') { + return buildInvalidRequestMessage('Invalid parameter: "chainId" must be a number') + } + if (!command.owner || typeof command.owner !== 'string') { + return buildInvalidRequestMessage( + 'Invalid parameter: "owner" must be a non-empty string' + ) + } + return { valid: true } + } + + async handle(task: PersistentStorageGetBucketsCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + let ownerNormalized: string + // let consumerNormalized: string + try { + ownerNormalized = getAddress(task.owner) + // consumerNormalized = getAddress(task.consumerAddress) + } catch { + return { + stream: null, + status: { + httpStatus: 400, + error: 'Invalid parameter: "owner" or "consumerAddress"' + } + } + } + + try { + const storage = await requirePersistentStorage(this) + // const node = this.getOceanNode() + const rows = await storage.listBuckets(ownerNormalized) + + return { + stream: Readable.from(JSON.stringify(rows)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageGetBucketsHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageListFilesHandler extends CommandHandler { + validate(command: PersistentStorageListFilesCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'bucketId' + ]) + if (!base.valid) return base + if (!command.bucketId || typeof command.bucketId !== 'string') { + return buildInvalidRequestMessage('Invalid parameter: "bucketId" must be a string') + } + return { valid: true } + } + + async handle(task: PersistentStorageListFilesCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + const result = await storage.listFiles(task.bucketId, task.consumerAddress) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageListFilesHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageUploadFileHandler extends CommandHandler { + validate(command: PersistentStorageUploadFileCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'bucketId', + 'fileName' + ]) + if (!base.valid) return base + return { valid: true } + } + + async handle(task: PersistentStorageUploadFileCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + if (!task.stream) { + return { + stream: null, + status: { httpStatus: 403, error: 'Upload stream error' } + } + } + const result = await storage.uploadFile( + task.bucketId, + task.fileName, + task.stream, + task.consumerAddress + ) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageUploadFileHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageDeleteFileHandler extends CommandHandler { + validate(command: PersistentStorageDeleteFileCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'chainId', + 'bucketId', + 'fileName' + ]) + if (!base.valid) return base + return { valid: true } + } + + async handle(task: PersistentStorageDeleteFileCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + await storage.deleteFile(task.bucketId, task.fileName, task.consumerAddress) + return { + stream: Readable.from(JSON.stringify({ success: true })), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageDeleteFileHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index cf5530c5f..184608f80 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -15,6 +15,7 @@ import { addMapping, allRoutesMapping, findPathName } from './routeUtils.js' import { PolicyServerPassthroughRoute } from './policyServer.js' import { authRoutes } from './auth.js' import { adminConfigRoutes } from './adminConfig.js' +import { persistentStorageRoutes } from './persistentStorage.js' export * from './getOceanPeers.js' export * from './auth.js' @@ -62,6 +63,8 @@ httpRoutes.use(PolicyServerPassthroughRoute) httpRoutes.use(authRoutes) // admin config routes httpRoutes.use(adminConfigRoutes) +// persistent storage routes +httpRoutes.use(persistentStorageRoutes) export function getAllServiceEndpoints() { httpRoutes.stack.forEach(addMapping.bind(null, [])) diff --git a/src/components/httpRoutes/persistentStorage.ts b/src/components/httpRoutes/persistentStorage.ts new file mode 100644 index 000000000..db7e977d9 --- /dev/null +++ b/src/components/httpRoutes/persistentStorage.ts @@ -0,0 +1,179 @@ +import express from 'express' +import { Readable } from 'stream' + +import { SERVICES_API_BASE_PATH, PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { HTTP_LOGGER } from '../../utils/logging/common.js' +import { streamToObject, streamToString } from '../../utils/util.js' + +import { + PersistentStorageCreateBucketHandler, + PersistentStorageDeleteFileHandler, + PersistentStorageGetBucketsHandler, + PersistentStorageListFilesHandler, + PersistentStorageUploadFileHandler +} from '../core/handler/persistentStorage.js' + +export const persistentStorageRoutes = express.Router() + +function readRawBody(req: any): Promise { + return new Promise((resolve, reject) => { + 
const chunks: Buffer[] = [] + req.on('data', (chunk: any) => chunks.push(Buffer.from(chunk))) + req.on('end', () => resolve(Buffer.concat(chunks))) + req.on('error', reject) + }) +} + +// Create bucket +persistentStorageRoutes.post( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets`, + express.json(), + async (req, res) => { + try { + const response = await new PersistentStorageCreateBucketHandler( + req.oceanNode + ).handle({ + ...req.body, + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + authorization: req.headers?.authorization, + caller: req.caller + }) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage create bucket error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// List buckets for an owner (then filtered by ACL in handler) +persistentStorageRoutes.get( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets`, + async (req, res) => { + try { + const response = await new PersistentStorageGetBucketsHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + chainId: parseInt(req.query.chainId as string) || null, + owner: req.query.owner as string, + authorization: req.headers?.authorization, + caller: req.caller + } as any + ) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage get buckets error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// List files in bucket 
+persistentStorageRoutes.get( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files`, + async (req, res) => { + try { + const response = await new PersistentStorageListFilesHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + bucketId: req.params.bucketId, + authorization: req.headers?.authorization, + caller: req.caller + } as any) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage list files error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// Upload file to bucket. Body is treated as raw bytes. +persistentStorageRoutes.post( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`, + async (req, res) => { + try { + const raw = await readRawBody(req) + const response = await new PersistentStorageUploadFileHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + bucketId: req.params.bucketId, + fileName: req.params.fileName, + stream: Readable.from(raw), + authorization: req.headers?.authorization, + caller: req.caller + } as any + ) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage upload error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// Delete file from bucket 
+persistentStorageRoutes.delete( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`, + async (req, res) => { + try { + const response = await new PersistentStorageDeleteFileHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + chainId: parseInt(req.query.chainId as string) || null, + bucketId: req.params.bucketId, + fileName: req.params.fileName, + authorization: req.headers?.authorization, + caller: req.caller + } as any + ) + + if (response.status.httpStatus !== 200) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + + if (!response.stream) { + res.status(200).json({ success: true }) + return + } + + const payload = JSON.parse(await streamToString(response.stream as Readable)) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage delete error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) diff --git a/src/components/httpRoutes/validateCommands.ts b/src/components/httpRoutes/validateCommands.ts index 49850f496..f8bb365cb 100644 --- a/src/components/httpRoutes/validateCommands.ts +++ b/src/components/httpRoutes/validateCommands.ts @@ -32,8 +32,23 @@ export function validateCommandParameters( return buildInvalidRequestMessage(`Invalid or unrecognized command: "${commandStr}"`) } - // deep copy - const logCommandData = structuredClone(commandData) + // deep copy for logging (must not throw for non-cloneable payloads like streams) + let logCommandData: any + try { + // For some commands, the task contains non-cloneable fields (e.g. Node streams). + // We redact those before cloning to avoid DataCloneError. + const sanitized = { ...(commandData ?? 
{}) } + if ('stream' in sanitized) { + sanitized.stream = '[STREAM]' + } + logCommandData = structuredClone(sanitized) + } catch { + // Last resort: shallow clone; avoid crashing validation because of logging. + logCommandData = { ...(commandData ?? {}) } + if ('stream' in logCommandData) { + logCommandData.stream = '[STREAM]' + } + } if (commandStr === PROTOCOL_COMMANDS.ENCRYPT) { logCommandData.files = [] // hide files data (sensitive) + rawData (long buffer) from logging diff --git a/src/components/persistentStorage/PersistentStorageFactory.ts b/src/components/persistentStorage/PersistentStorageFactory.ts new file mode 100644 index 000000000..8d88f13fb --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageFactory.ts @@ -0,0 +1,209 @@ +import type { AccessList } from '../../@types/AccessList.js' +import type { BaseFileObject } from '../../@types/fileObject.js' +import sqlite3, { RunResult } from 'sqlite3' +import path from 'path' +import fs from 'fs' +import { OceanNode } from '../../OceanNode.js' +import { checkAddressOnAccessList } from '../../utils/accessList.js' + +export class PersistentStorageAccessDeniedError extends Error { + constructor(message = 'You are not allowed to access this bucket') { + super(message) + this.name = 'PersistentStorageAccessDeniedError' + } +} + +function parseBucketAccessListsJson(accessListJson: string): AccessList[] { + try { + const parsed = JSON.parse(accessListJson || '[]') + return Array.isArray(parsed) ? (parsed as AccessList[]) : [] + } catch { + return [] + } +} + +export type BucketRow = { + bucketId: string + owner: string + accessListJson: string + createdAt: number +} + +export interface PersistentStorageFileInfo { + bucketId: string + name: string + size: number + lastModified: number +} + +export type CreateBucketResult = { + bucketId: string + owner: string + accessList: AccessList[] +} + +/** Bucket metadata from registry (list APIs and internal filtering). 
*/ +export type PersistentStorageBucketRecord = { + bucketId: string + owner: string + createdAt: number + accessLists: AccessList[] +} + +export abstract class PersistentStorageFactory { + private db: sqlite3.Database + private node: OceanNode + + constructor(node: OceanNode) { + this.node = node + const dbDir = path.dirname('databases/') + if (!fs.existsSync(dbDir)) { + fs.mkdirSync(dbDir, { recursive: true }) + } + this.db = new sqlite3.Database(dbDir + 'persistentStorage.sqlite') + } + + public abstract createNewBucket( + accessList: AccessList[], + owner: string + ): Promise + + public abstract listFiles( + bucketId: string, + consumerAddress: string + ): Promise + + public abstract uploadFile( + bucketId: string, + fileName: string, + content: NodeJS.ReadableStream, + consumerAddress: string + ): Promise + + public abstract deleteFile( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise + + /** + * Returns a file object that can be attached to compute jobs. + * The concrete shape depends on the backend implementation. + */ + public abstract getFileObject( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise + + // common functions + async getBucketAccessList(bucketId: string): Promise { + await this.dbCreateTables() + try { + const row = await this.dbGetBucket(bucketId) + if (!row) { + return [] + } + return parseBucketAccessListsJson(row.accessListJson) + } catch { + return [] + } + } + + /** + * Lists buckets for a given owner from the SQLite registry (metadata only). + * `owner` must already be normalized (e.g. checksummed `getAddress`). + * Backends that need setup (e.g. localfs init) should override and call `super.listBuckets(owner)`. 
+ */ + async listBuckets(owner: string): Promise { + await this.dbCreateTables() + const rows = await this.dbListBucketsByOwner(owner) + return rows.map((row) => ({ + bucketId: row.bucketId, + owner: row.owner, + createdAt: row.createdAt, + accessLists: parseBucketAccessListsJson(row.accessListJson) + })) + } + + dbCreateTables(): Promise { + const createBucketsSQL = ` + CREATE TABLE IF NOT EXISTS persistent_storage_buckets ( + bucketId TEXT PRIMARY KEY, + owner TEXT NOT NULL, + accessListJson TEXT NOT NULL, + createdAt INTEGER NOT NULL + ); + ` + return new Promise((resolve, reject) => { + this.db.run(createBucketsSQL, (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + + dbUpsertBucket( + bucketId: string, + owner: string, + accessListJson: string, + createdAt: number + ): Promise { + const sql = ` + INSERT INTO persistent_storage_buckets (bucketId, owner, accessListJson, createdAt) + VALUES (?, ?, ?, ?) + ON CONFLICT(bucketId) DO UPDATE SET accessListJson=excluded.accessListJson; + ` + return new Promise((resolve, reject) => { + this.db.run(sql, [bucketId, owner, accessListJson, createdAt], (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + + dbGetBucket(bucketId: string): Promise { + const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE bucketId = ?` + return new Promise((resolve, reject) => { + this.db.get(sql, [bucketId], (err, row: BucketRow | undefined) => { + if (err) reject(err) + else resolve(row ?? null) + }) + }) + } + + dbListBucketsByOwner(owner: string): Promise { + const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE owner = ? ORDER BY createdAt ASC` + return new Promise((resolve, reject) => { + this.db.all(sql, [owner], (err, rows: BucketRow[]) => { + if (err) reject(err) + else resolve(rows ?? 
[]) + }) + }) + } + + dbDeleteBucket(bucketId: string): Promise { + const sql = `DELETE FROM persistent_storage_buckets WHERE bucketId = ?` + return new Promise((resolve, reject) => { + this.db.run(sql, [bucketId], function (this: RunResult, err) { + if (err) reject(err) + else resolve(this.changes === 1) + }) + }) + } + + isAllowed(consumerAddress: string, accessLists: AccessList[]): Promise { + return checkAddressOnAccessList(consumerAddress, accessLists, this.node) + } + + /** Throws {@link PersistentStorageAccessDeniedError} if the consumer is not on the bucket access list. */ + protected async assertConsumerAllowedForBucket( + consumerAddress: string, + bucketId: string + ): Promise { + const accessLists = await this.getBucketAccessList(bucketId) + if (!(await this.isAllowed(consumerAddress, accessLists))) { + throw new PersistentStorageAccessDeniedError() + } + } +} diff --git a/src/components/persistentStorage/PersistentStorageLocalFS.ts b/src/components/persistentStorage/PersistentStorageLocalFS.ts new file mode 100644 index 000000000..fa02c34f7 --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageLocalFS.ts @@ -0,0 +1,162 @@ +import fs from 'fs' +import fsp from 'fs/promises' +import path from 'path' +import { pipeline } from 'stream/promises' +import { randomUUID } from 'crypto' + +import type { AccessList } from '../../@types/AccessList.js' +import type { PersistentStorageLocalFSOptions } from '../../@types/PersistentStorage.js' +import type { BaseFileObject } from '../../@types/fileObject.js' + +import { + CreateBucketResult, + PersistentStorageBucketRecord, + PersistentStorageFactory, + PersistentStorageFileInfo +} from './PersistentStorageFactory.js' +import { OceanNode } from '../../OceanNode.js' + +type LocalFileObject = BaseFileObject & { + type: 'localfs' + bucketId: string + fileName: string +} + +export class PersistentStorageLocalFS extends PersistentStorageFactory { + private baseFolder: string + + constructor(node: OceanNode) 
{ + super(node) + const options = node.getConfig().persistentStorage + .options as PersistentStorageLocalFSOptions + + this.baseFolder = options.folder + } + + async init(): Promise { + await fsp.mkdir(this.baseFolder, { recursive: true }) + await super.dbCreateTables() + } + + private bucketPath(bucketId: string): string { + return path.join(this.baseFolder, 'buckets', bucketId) + } + + private async ensureBucketExists(bucketId: string): Promise { + const row = await this.dbGetBucket(bucketId) + if (!row) { + throw new Error(`Bucket not found: ${bucketId}`) + } + } + + async listBuckets(owner: string): Promise { + await this.init() + return super.listBuckets(owner) + } + + async createNewBucket( + accessList: AccessList[], + owner: string + ): Promise { + await this.init() + + const bucketId = randomUUID() + const createdAt = Math.floor(Date.now() / 1000) + await fsp.mkdir(this.bucketPath(bucketId), { recursive: true }) + await super.dbUpsertBucket( + bucketId, + owner, + JSON.stringify(accessList ?? 
[]), + createdAt + ) + + return { bucketId, owner, accessList } + } + + async listFiles( + bucketId: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + const dir = this.bucketPath(bucketId) + const entries = await fsp.readdir(dir, { withFileTypes: true }) + const out: PersistentStorageFileInfo[] = [] + + for (const ent of entries) { + if (!ent.isFile()) continue + const filePath = path.join(dir, ent.name) + const st = await fsp.stat(filePath) + out.push({ + bucketId, + name: ent.name, + size: st.size, + lastModified: Math.floor(st.mtimeMs) + }) + } + + return out + } + + async uploadFile( + bucketId: string, + fileName: string, + content: NodeJS.ReadableStream, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + if (!fileName || fileName.includes('/') || fileName.includes('\\')) { + throw new Error('Invalid fileName') + } + + const targetDir = this.bucketPath(bucketId) + await fsp.mkdir(targetDir, { recursive: true }) + const targetPath = path.join(targetDir, fileName) + + await pipeline(content, fs.createWriteStream(targetPath)) + + const st = await fsp.stat(targetPath) + return { + bucketId, + name: fileName, + size: st.size, + lastModified: Math.floor(st.mtimeMs) + } + } + + async deleteFile( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + const targetPath = path.join(this.bucketPath(bucketId), fileName) + await fsp.rm(targetPath, { force: true }) + } + + async getFileObject( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await 
this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + // This is intentionally not a downloadable URL; compute backends can interpret this object. + const obj: LocalFileObject = { + type: 'localfs', + bucketId, + fileName + } + return obj + } +} diff --git a/src/components/persistentStorage/PersistentStorageS3.ts b/src/components/persistentStorage/PersistentStorageS3.ts new file mode 100644 index 000000000..4be68c9dd --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageS3.ts @@ -0,0 +1,74 @@ +import { + CreateBucketResult, + PersistentStorageBucketRecord, + PersistentStorageFactory, + PersistentStorageFileInfo +} from './PersistentStorageFactory.js' + +import type { AccessList } from '../../@types/AccessList.js' +import type { PersistentStorageS3Options } from '../../@types/PersistentStorage.js' +import type { BaseFileObject } from '../../@types/fileObject.js' +import { OceanNode } from '../../OceanNode.js' + +export class PersistentStorageS3 extends PersistentStorageFactory { + private options: PersistentStorageS3Options + constructor(node: OceanNode) { + super(node) + this.options = node.getConfig().persistentStorage + .options as PersistentStorageS3Options + } + + // eslint-disable-next-line require-await + async init(): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + async listBuckets(owner: string): Promise { + await this.init() + return super.listBuckets(owner) + } + + // eslint-disable-next-line require-await + async createNewBucket( + accessList: AccessList[], + _owner: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async listFiles( + _bucketId: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async uploadFile( + _bucketId: string, + _fileName: string, + _content: Buffer | 
NodeJS.ReadableStream, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async deleteFile( + _bucketId: string, + _fileName: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async getFileObject( + _bucketId: string, + _fileName: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } +} diff --git a/src/components/persistentStorage/createPersistentStorage.ts b/src/components/persistentStorage/createPersistentStorage.ts new file mode 100644 index 000000000..c8588cde4 --- /dev/null +++ b/src/components/persistentStorage/createPersistentStorage.ts @@ -0,0 +1,23 @@ +import { OceanNode } from '../../OceanNode.js' + +import type { PersistentStorageFactory } from './PersistentStorageFactory.js' +import { PersistentStorageLocalFS } from './PersistentStorageLocalFS.js' +import { PersistentStorageS3 } from './PersistentStorageS3.js' + +export function createPersistentStorage(node: OceanNode): PersistentStorageFactory { + const config = node.getConfig().persistentStorage + if (!config?.enabled) { + throw new Error('Persistent storage is disabled') + } + + switch (config.type) { + case 'localfs': + return new PersistentStorageLocalFS(node) + case 's3': + return new PersistentStorageS3(node) + default: + throw new Error( + `Unsupported persistent storage type: ${(config as { type?: string })?.type}` + ) + } +} diff --git a/src/components/persistentStorage/index.ts b/src/components/persistentStorage/index.ts new file mode 100644 index 000000000..00ad60b6a --- /dev/null +++ b/src/components/persistentStorage/index.ts @@ -0,0 +1,4 @@ +export * from './PersistentStorageFactory.js' +export { createPersistentStorage } from './createPersistentStorage.js' +export * from './PersistentStorageLocalFS.js' 
+export * from './PersistentStorageS3.js' diff --git a/src/test/integration/accessLists.test.ts b/src/test/integration/accessLists.test.ts index fe901f65e..18e62fd7d 100644 --- a/src/test/integration/accessLists.test.ts +++ b/src/test/integration/accessLists.test.ts @@ -17,7 +17,7 @@ import { AccessListContract, OceanNodeConfig } from '../../@types/OceanNode.js' import { homedir } from 'os' import { getConfiguration } from '../../utils/config.js' import { assert, expect } from 'chai' -import { checkAddressOnAccessList } from '../../utils/accessList.js' +import { checkAddressOnAccessListWithSigner } from '../../utils/accessList.js' import { KeyManager } from '../../components/KeyManager/index.js' describe('Should deploy some accessLists before all other tests.', () => { @@ -174,7 +174,11 @@ describe('Should deploy some accessLists before all other tests.', () => { for (let i = 0; i < wallets.length; i++) { const account = await wallets[i].getAddress() expect( - (await checkAddressOnAccessList(accessListAddress, account, owner)) === true, + (await checkAddressOnAccessListWithSigner( + accessListAddress, + account, + owner + )) === true, `Address ${account} has no balance on Access List ${accessListAddress}, so its not Authorized` ) } @@ -187,7 +191,11 @@ describe('Should deploy some accessLists before all other tests.', () => { for (let i = wallets.length; i < 4; i++) { const account = await (await provider.getSigner(i)).getAddress() expect( - (await checkAddressOnAccessList(accessListAddress, account, owner)) === false, + (await checkAddressOnAccessListWithSigner( + accessListAddress, + account, + owner + )) === false, `Address ${account} should not be part Access List ${accessListAddress}, therefore its not Authorized` ) } diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 77ca097dc..c5ff1e258 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -58,7 +58,8 @@ import { 
buildEnvOverrideConfig, getMockSupportedNetworks, setupEnvironment, - tearDownEnvironment + tearDownEnvironment, + sleep } from '../utils/utils.js' import { ProviderFees, ProviderComputeInitializeResults } from '../../@types/Fees.js' @@ -83,8 +84,6 @@ import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' import { createHashForSignature, safeSign } from '../utils/signature.js' import { create256Hash } from '../../utils/crypt.js' -const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) - /** * Polls getComputeEnvironments until every environment's resources (and free.resources) * have inUse === 0. Use with the same pattern as the compute tests: pass a callback that diff --git a/src/test/integration/persistentStorage.test.ts b/src/test/integration/persistentStorage.test.ts new file mode 100644 index 000000000..ddd1bb10a --- /dev/null +++ b/src/test/integration/persistentStorage.test.ts @@ -0,0 +1,389 @@ +import { expect } from 'chai' +import fsp from 'fs/promises' +import os from 'os' +import path from 'path' +import { Readable } from 'stream' +import { getAddress, JsonRpcProvider, Signer } from 'ethers' + +import { Database } from '../../components/database/index.js' +import { + PersistentStorageCreateBucketHandler, + PersistentStorageDeleteFileHandler, + PersistentStorageGetBucketsHandler, + PersistentStorageListFilesHandler, + PersistentStorageUploadFileHandler +} from '../../components/core/handler/persistentStorage.js' +import { OceanNode } from '../../OceanNode.js' +import type { AccessList } from '../../@types/AccessList.js' +import { ENVIRONMENT_VARIABLES, PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { getConfiguration } from '../../utils/config.js' +import { streamToObject } from '../../utils/util.js' +import { + DEFAULT_TEST_TIMEOUT, + OverrideEnvConfig, + TEST_ENV_CONFIG_FILE, + buildEnvOverrideConfig, + setupEnvironment, + tearDownEnvironment, + sleep +} from '../utils/utils.js' +import { 
createHashForSignature, safeSign } from '../utils/signature.js' + +import { BlockchainRegistry } from '../../components/BlockchainRegistry/index.js' +import { Blockchain } from '../../utils/blockchain.js' +import { RPCS, SupportedNetwork } from '../../@types/blockchain.js' +import { DEVELOPMENT_CHAIN_ID } from '../../utils/address.js' +import { deployAndGetAccessListConfig } from '../utils/contracts.js' +import { OceanNodeConfig } from '../../@types/OceanNode.js' +import { KeyManager } from '../../components/KeyManager/index.js' + +describe('Persistent storage handlers (integration)', function () { + this.timeout(DEFAULT_TEST_TIMEOUT) + + let previousConfiguration: OverrideEnvConfig[] + let config: OceanNodeConfig + let database: Database + let oceanNode: OceanNode + let consumer: Signer + let psRoot: string + + let provider: JsonRpcProvider + let blockchain: Blockchain + let owner: Signer + let wallets: Signer[] = [] + let forbiddenConsumer: Signer + + before(async () => { + provider = new JsonRpcProvider('http://127.0.0.1:8545') + config = await getConfiguration() // Force reload the configuration + + wallets = [ + (await provider.getSigner(0)) as Signer, + (await provider.getSigner(1)) as Signer, + (await provider.getSigner(2)) as Signer, + (await provider.getSigner(3)) as Signer + ] + forbiddenConsumer = (await provider.getSigner(4)) as Signer + + const rpcs: RPCS = config.supportedNetworks + const chain: SupportedNetwork = rpcs[String(DEVELOPMENT_CHAIN_ID)] + const keyManager = new KeyManager(config) + const blockchains = new BlockchainRegistry(keyManager, config) + blockchain = blockchains.getBlockchain(chain.chainId) + + owner = await blockchain.getSigner() + + // ENVIRONMENT_VARIABLES.AUTHORIZED_PUBLISHERS_LIST + const accessListPublishers = await deployAndGetAccessListConfig( + owner, + provider, + wallets + ) + previousConfiguration = await setupEnvironment( + TEST_ENV_CONFIG_FILE, + buildEnvOverrideConfig( + [ENVIRONMENT_VARIABLES.PRIVATE_KEY], + 
['0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58'] + ) + ) + + config = await getConfiguration(true) + psRoot = await fsp.mkdtemp(path.join(os.tmpdir(), 'ocean-ps-it-')) + config.persistentStorage = { + enabled: true, + type: 'localfs', + accessLists: [accessListPublishers], + options: { folder: psRoot } + } + + database = await Database.init(config.dbConfig) + oceanNode = await OceanNode.getInstance( + config, + database, + undefined, + undefined, + undefined, + undefined, + undefined, + true + ) + + consumer = (await provider.getSigner(1)) as Signer + }) + + after(async () => { + await tearDownEnvironment(previousConfiguration) + // await fsp.rm(psRoot, { recursive: true, force: true }) + }) + + it('create bucket → upload → list → delete (happy path)', async () => { + const consumerAddress = await consumer.getAddress() + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + + expect(createRes.status.httpStatus).to.equal(200) + expect(createRes.stream).to.be.instanceOf(Readable) + const created = await streamToObject(createRes.stream as Readable) + expect(created.bucketId).to.be.a('string') + expect(getAddress(created.owner)).to.equal(getAddress(consumerAddress)) + const bucketId = created.bucketId as string + + const fileName = 'hello.txt' + const body = Buffer.from('persistent-storage-it') + + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(consumer, messageHashBytes) + const uploadRes 
= await new PersistentStorageUploadFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress, + signature, + nonce, + bucketId, + fileName, + stream: Readable.from(body) + } as any) + + expect(uploadRes.status.httpStatus).to.equal(200) + const uploaded = await streamToObject(uploadRes.stream as Readable) + expect(uploaded.name).to.equal(fileName) + expect(uploaded.size).to.equal(body.length) + await sleep(1000) + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES + ) + signature = await safeSign(consumer, messageHashBytes) + const listRes = await new PersistentStorageListFilesHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + consumerAddress, + signature, + nonce, + bucketId, + authorization: undefined + } as any) + + expect(listRes.status.httpStatus).to.equal(200) + const listed = await streamToObject(listRes.stream as Readable) + expect(listed).to.be.an('array') + expect(listed.some((f: { name: string }) => f.name === fileName)).to.equal(true) + await sleep(1000) + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE + ) + signature = await safeSign(consumer, messageHashBytes) + const delRes = await new PersistentStorageDeleteFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + consumerAddress, + signature, + nonce, + chainId: 8996, + bucketId, + fileName, + authorization: undefined + } as any) + + expect(delRes.status.httpStatus).to.equal(200) + await sleep(1000) + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES + ) + signature = await safeSign(consumer, messageHashBytes) + const listAfterDel = await new 
PersistentStorageListFilesHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + consumerAddress, + signature, + nonce, + bucketId, + authorization: undefined + } as any) + expect(listAfterDel.status.httpStatus).to.equal(200) + const listedAfter = await streamToObject(listAfterDel.stream as Readable) + expect(listedAfter.some((f: { name: string }) => f.name === fileName)).to.equal(false) + }) + + it('should not create bucket when consumer is not on allow list', async () => { + const forbiddenConsumerAddress = await forbiddenConsumer.getAddress() + const nonce = Date.now().toString() + const messageHashBytes = createHashForSignature( + forbiddenConsumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + const signature = await safeSign(forbiddenConsumer, messageHashBytes) + + const res = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress: forbiddenConsumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + + expect(res.status.httpStatus).to.equal(403) + expect(res.status.error).to.contain('not allowed') + }) + + it('getBuckets returns buckets the consumer can access', async () => { + const consumerAddress = await consumer.getAddress() + await sleep(1000) + let nonce = Date.now() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS + ) + let signature = await safeSign(consumer, messageHashBytes) + const beforeCreate = await new PersistentStorageGetBucketsHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + consumerAddress, + signature, + nonce, + chainId: 8996, + owner: consumerAddress, + authorization: undefined + } as any) + expect(beforeCreate.status.httpStatus).to.equal(200) + const beforeList = await streamToObject(beforeCreate.stream as Readable) + 
expect(beforeList).to.be.an('array') + await sleep(1000) + nonce = Date.now() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + signature = await safeSign(consumer, messageHashBytes) + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const newBucketId = created.bucketId as string + await sleep(1000) + nonce = Date.now() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS + ) + signature = await safeSign(consumer, messageHashBytes) + const afterCreate = await new PersistentStorageGetBucketsHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + consumerAddress, + signature, + nonce, + chainId: 8996, + owner: consumerAddress, + authorization: undefined + } as any) + expect(afterCreate.status.httpStatus).to.equal(200) + const afterList = await streamToObject(afterCreate.stream as Readable) + expect(afterList).to.be.an('array') + const found = afterList.find((b: { bucketId: string }) => b.bucketId === newBucketId) + expect(found).to.be.an('object') + expect(found.createdAt).to.be.a('number') + expect(getAddress(found.owner)).to.equal(getAddress(consumerAddress)) + expect(found.accessLists).to.be.an('array') + expect(afterList.length).to.be.at.least(beforeList.length + 1) + }) + + it('create bucket validate fails when accessLists is missing', async () => { + const consumerAddress = await consumer.getAddress() + await sleep(1000) + const nonce = Date.now().toString() + const messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + 
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + const signature = await safeSign(consumer, messageHashBytes) + const validation = await new PersistentStorageCreateBucketHandler(oceanNode).validate( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce + } as any + ) + + expect(validation.valid).to.equal(false) + expect(validation.reason).to.contain('accessLists') + }) + + it('returns error when persistent storage is disabled', async () => { + const disabledConfig = { + ...config, + persistentStorage: { + enabled: false, + type: 'localfs' as const, + accessLists: [] as AccessList[], + options: { folder: psRoot } + } + } + const nodeDisabled = await OceanNode.getInstance( + disabledConfig, + database, + undefined, + undefined, + undefined, + undefined, + undefined, + true + ) + + const consumerAddress = await consumer.getAddress() + await sleep(1000) + const nonce = Date.now().toString() + const messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + const signature = await safeSign(consumer, messageHashBytes) + + const res = await new PersistentStorageCreateBucketHandler(nodeDisabled).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + + expect(res.status.httpStatus).to.equal(500) + expect(res.status.error).to.match(/not configured|disabled/i) + }) +}) diff --git a/src/test/utils/contracts.ts b/src/test/utils/contracts.ts index ba1c4112b..4dc7bbfa9 100644 --- a/src/test/utils/contracts.ts +++ b/src/test/utils/contracts.ts @@ -55,7 +55,6 @@ export async function deployAccessListContract( if (!nameAccessList || !symbolAccessList) { throw new Error(`Access list symbol and name are required`) } - const contract = getContract(contractFactoryAddress, contractFactoryAbi, signer) try { @@ -125,7 +124,12 @@ export async 
function deployAndGetAccessListConfig( await wallets[2].getAddress(), await wallets[3].getAddress() ], - ['https://oceanprotocol.com/nft/'] + [ + 'https://oceanprotocol.com/nft/', + 'https://oceanprotocol.com/nft/', + 'https://oceanprotocol.com/nft/', + 'https://oceanprotocol.com/nft/' + ] ) if (!txAddress) { diff --git a/src/test/utils/utils.ts b/src/test/utils/utils.ts index 8376d6ae1..b3bf0917b 100644 --- a/src/test/utils/utils.ts +++ b/src/test/utils/utils.ts @@ -4,7 +4,7 @@ import { fileURLToPath } from 'url' import { DB_TYPES, ENVIRONMENT_VARIABLES, EnvVariable } from '../../utils/constants.js' import { CONFIG_LOGGER } from '../../utils/logging/common.js' import { RPCS } from '../../@types/blockchain.js' -import { getConfiguration } from '../../utils/config.js' +import { getConfiguration } from '../../utils/config/builder.js' export const DEFAULT_TEST_TIMEOUT = 20000 // 20 secs MAX // __dirname and __filename are not defined in ES module scope @@ -164,3 +164,5 @@ export function isRunningContinousIntegrationEnv(): boolean { export const SELECTED_RUN_DATABASE = new Date().getTime() % 2 === 0 ? 
DB_TYPES.ELASTIC_SEARCH : DB_TYPES.TYPESENSE CONFIG_LOGGER.debug(`SELECTED_RUN_DATABASE: ${SELECTED_RUN_DATABASE}`) + +export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) diff --git a/src/utils/accessList.ts b/src/utils/accessList.ts index fbecbe462..0a995fc95 100644 --- a/src/utils/accessList.ts +++ b/src/utils/accessList.ts @@ -1,6 +1,8 @@ import AccessListJson from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } import { ethers, Signer } from 'ethers' import { CORE_LOGGER } from './logging/common.js' +import { AccessList } from '../@types/AccessList.js' +import { OceanNode } from '../OceanNode.js' /** * @param accessList the access list contract address @@ -9,7 +11,7 @@ import { CORE_LOGGER } from './logging/common.js' * @param signer signer for the contract part * @returns true if the account has balanceOf > 0 OR if the accessList is empty OR does not contain info for this chain, false otherwise */ -export async function checkAddressOnAccessList( +export async function checkAddressOnAccessListWithSigner( accessListContractAddress: string, addressToCheck: string, signer: Signer @@ -40,3 +42,49 @@ export async function checkAddressOnAccessList( return false } } + +export async function checkAddressOnAccessList( + consumerAddress: string, + access: AccessList[], + oceanNode: OceanNode +): Promise { + if (!access || access.length === 0) { + return true + } + const config = oceanNode.getConfig() + const { supportedNetworks } = config + for (const accessListMap of access) { + if (!accessListMap) continue + for (const chain of Object.keys(accessListMap)) { + const { chainId } = supportedNetworks[chain] + try { + const blockchain = oceanNode.getBlockchain(chainId) + if (!blockchain) { + CORE_LOGGER.logMessage( + `Blockchain instance not available for chain ${chainId}, skipping access list check`, + true + ) + continue + } + const signer = await blockchain.getSigner() + 
for (const accessListAddress of accessListMap[chain]) { + const hasAccess = await checkAddressOnAccessListWithSigner( + accessListAddress, + consumerAddress, + signer + ) + if (hasAccess) { + return true + } + } + } catch (error) { + CORE_LOGGER.logMessage( + `Failed to check access lists on chain ${chain}: ${error.message}`, + true + ) + } + } + } + + return false +} diff --git a/src/utils/address.ts b/src/utils/address.ts index 9760ed3b5..624438f28 100644 --- a/src/utils/address.ts +++ b/src/utils/address.ts @@ -1,7 +1,7 @@ import fs from 'fs' import addresses from '@oceanprotocol/contracts/addresses/address.json' with { type: 'json' } import { CORE_LOGGER } from './logging/common.js' -import { isDefined } from './index.js' +import { isDefined } from './util.js' /** * Get the artifacts address from the address.json file diff --git a/src/utils/config/builder.ts b/src/utils/config/builder.ts index 7a14e8cee..f55906e81 100644 --- a/src/utils/config/builder.ts +++ b/src/utils/config/builder.ts @@ -9,8 +9,7 @@ import { C2DClusterType } from '../../@types/C2D/C2D.js' import fs from 'fs' import os from 'os' import path from 'path' -// import { hexStringToByteArray, computeCodebaseHash } from '../index.js' -import { computeCodebaseHash } from '../index.js' +import { computeCodebaseHash } from '../attestation.js' import { getOceanArtifactsAdresses, diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index e18df6de8..47ff86f78 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -84,6 +84,64 @@ export const OceanNodeDBConfigSchema = z.object({ dbType: z.string().nullable() }) +export const PersistentStorageConfigSchema = z + .object({ + enabled: z.boolean().optional().default(false), + type: z.enum(['localfs', 's3']).optional().default('localfs'), + accessLists: jsonFromString(z.array(z.record(z.string(), z.array(z.string())))) + .optional() + .default([]), + options: z.any().optional() + }) + .superRefine((data, ctx) => { + if 
(!data.enabled) return + + if (data.type === 'localfs') { + if (!data.options || typeof data.options !== 'object') { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'persistentStorage.options must be an object for localfs', + path: ['options'] + }) + return + } + if ( + typeof (data.options as any).folder !== 'string' || + !(data.options as any).folder + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'persistentStorage.options.folder is required for localfs', + path: ['options', 'folder'] + }) + } + } + + if (data.type === 's3') { + if (!data.options || typeof data.options !== 'object') { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'persistentStorage.options must be an object for s3', + path: ['options'] + }) + return + } + const required = ['endpoint', 'objectKey', 'accessKeyId', 'secretAccessKey'] + for (const key of required) { + if ( + typeof (data.options as any)[key] !== 'string' || + !(data.options as any)[key] + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `persistentStorage.options.${key} is required for s3`, + path: ['options', key] + }) + } + } + } + }) + export const DockerRegistryAuthSchema = z .object({ username: z.string().optional(), @@ -329,6 +387,7 @@ export const OceanNodeConfigSchema = z DB_PASSWORD: z.string().optional(), DB_TYPE: z.string().optional(), dbConfig: OceanNodeDBConfigSchema.optional(), + persistentStorage: PersistentStorageConfigSchema.optional(), FEE_AMOUNT: z.string().optional(), FEE_TOKENS: z.string().optional(), diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 756e11b1e..63fb34a4c 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -38,7 +38,12 @@ export const PROTOCOL_COMMANDS = { FETCH_CONFIG: 'fetchConfig', PUSH_CONFIG: 'pushConfig', GET_LOGS: 'getLogs', - JOBS: 'jobs' + JOBS: 'jobs', + PERSISTENT_STORAGE_CREATE_BUCKET: 'persistentStorageCreateBucket', + PERSISTENT_STORAGE_GET_BUCKETS: 'persistentStorageGetBuckets', + 
PERSISTENT_STORAGE_LIST_FILES: 'persistentStorageListFiles', + PERSISTENT_STORAGE_UPLOAD_FILE: 'persistentStorageUploadFile', + PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile' } // more visible, keep then close to make sure we always update both export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ @@ -78,7 +83,12 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.FETCH_CONFIG, PROTOCOL_COMMANDS.PUSH_CONFIG, PROTOCOL_COMMANDS.GET_LOGS, - PROTOCOL_COMMANDS.JOBS + PROTOCOL_COMMANDS.JOBS, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE ] export const MetadataStates = { diff --git a/src/utils/credentials.ts b/src/utils/credentials.ts index d54692b4f..b21c25f21 100644 --- a/src/utils/credentials.ts +++ b/src/utils/credentials.ts @@ -3,7 +3,7 @@ import { AccessListContract } from '../@types/OceanNode.js' import { CORE_LOGGER } from './logging/common.js' import { Credential, Credentials, MATCH_RULES } from '@oceanprotocol/ddo-js' import { CREDENTIALS_TYPES } from '../@types/DDO/Credentials.js' -import { checkAddressOnAccessList } from './accessList.js' +import { checkAddressOnAccessListWithSigner } from './accessList.js' import { isDefined } from './util.js' /** @@ -203,7 +203,7 @@ export async function checkSingleCredential( try { // Check if the consumer address has tokens in the access list contract - const hasAccess = await checkAddressOnAccessList( + const hasAccess = await checkAddressOnAccessListWithSigner( accessListCredential.accessList, consumerAddress, signer @@ -252,7 +252,7 @@ export async function checkCredentialOnAccessList( if (chainsListed.length > 0 && chainsListed.includes(chainId)) { let isAuthorized = false for (const accessListAddress of accessList[chainId]) { - const result = await 
checkAddressOnAccessList( + const result = await checkAddressOnAccessListWithSigner( accessListAddress, addressToCheck, signer From 2b7ee29d76c4ad71cce549a62568b84ffd3a5ea2 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Tue, 7 Apr 2026 14:06:34 +0300 Subject: [PATCH 2/8] add docs --- README.md | 3 +- docs/API.md | 151 +++++++++++++++++++++++++++++++++ docs/persistentStorage.md | 172 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 325 insertions(+), 1 deletion(-) create mode 100644 docs/persistentStorage.md diff --git a/README.md b/README.md index f6b404ac0..c3d1d33ba 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,8 @@ Your node is now running. To start additional nodes, repeat these steps in a new - [API Endpoints](docs/API.md) - [Environmental Variables](docs/env.md) - [Database Guide](docs/database.md) -- [Storage Types](docs/Storage.md) +- [Asset Storage Types](docs/Storage.md) +- [Persistent storage for c2d jobs](docs/persistentStorage.md) - [Testing Guide](docs/testing.md) - [Network Configuration](docs/networking.md) - [Logging & accessing logs](docs/networking.md) diff --git a/docs/API.md b/docs/API.md index b141a61d9..b1b08174c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1586,3 +1586,154 @@ returns job result #### Response File content + +--- + +## Persistent Storage + +### `HTTP` POST /api/services/persistentStorage/buckets + +#### Description + +Create a new persistent storage bucket. Bucket ownership is set to the request `consumerAddress`. 
+ +#### Request Headers + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| Authorization | string | | auth token (optional; depends on node auth configuration) | + +#### Request Body + +```json +{ + "consumerAddress": "0x...", + "signature": "0x...", + "nonce": "123", + "accessLists": [] +} +``` + +#### Response (200) + +```json +{ + "bucketId": "uuid", + "owner": "0x...", + "accessList": [] +} +``` + +--- + +### `HTTP` GET /api/services/persistentStorage/buckets + +#### Description + +List buckets for a given `owner`. Results are filtered by bucket access lists for the calling consumer. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | +| chainId | number | v | chain id (used by auth/signature checks) | +| owner | string | v | bucket owner to filter by | + +#### Response (200) + +```json +[ + { + "bucketId": "uuid", + "owner": "0x...", + "createdAt": 1710000000, + "accessLists": [] + } +] +``` + +--- + +### `HTTP` GET /api/services/persistentStorage/buckets/:bucketId/files + +#### Description + +List files in a bucket. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | + +#### Response (200) + +```json +[ + { + "bucketId": "uuid", + "name": "hello.txt", + "size": 123, + "lastModified": 1710000000 + } +] +``` + +--- + +### `HTTP` POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName + +#### Description + +Upload a file to a bucket. The request body is treated as raw bytes. 
+ +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | + +#### Request Body + +Raw bytes (any content-type). + +#### Response (200) + +```json +{ + "bucketId": "uuid", + "name": "hello.txt", + "size": 123, + "lastModified": 1710000000 +} +``` + +--- + +### `HTTP` DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName + +#### Description + +Delete a file from a bucket. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | +| chainId | number | v | chain id (used by auth/signature checks) | + +#### Response (200) + +```json +{ "success": true } +``` diff --git a/docs/persistentStorage.md b/docs/persistentStorage.md new file mode 100644 index 000000000..db1396c6b --- /dev/null +++ b/docs/persistentStorage.md @@ -0,0 +1,172 @@ +# Persistent Storage + +This document describes Ocean Node **Persistent Storage** at a high level: what it is, how it is structured, how access control works, and how to use it via **P2P commands** and **HTTP endpoints**. + +--- + +## What it is + +Persistent Storage is a simple bucket + file store intended for **long-lived artifacts** that Ocean Node needs to keep across requests (and potentially across restarts), and to reference later (e.g. as file objects for compute). + +Key primitives: +- **Bucket**: a logical container for files. +- **File**: binary content stored inside a bucket. +- **Bucket registry**: a local SQLite table that stores bucket metadata (owner, access lists, createdAt). 
+ +--- + +## Architecture (high level) + +### Components + +- **Handlers (protocol layer)**: `src/components/core/handler/persistentStorage.ts` + - Implements protocol commands such as create bucket, list files, upload, delete, and get buckets. + - Validates auth (token or signature) and applies high-level authorization checks. + +- **Persistent storage backends (storage layer)**: `src/components/persistentStorage/*` + - `PersistentStorageFactory`: shared functionality (SQLite bucket registry, access list checks). + - `PersistentStorageLocalFS`: local filesystem backend. + - `PersistentStorageS3`: stub for future S3-compatible backend. + +- **HTTP routes (HTTP interface)**: `src/components/httpRoutes/persistentStorage.ts` + - Exposes REST-ish endpoints under `/api/services/persistentStorage/...` that call the same handlers. + +### Data storage + +Persistent Storage uses two stores: + +1) **Bucket registry (SQLite)** +- File: `databases/persistentStorage.sqlite` +- Table: `persistent_storage_buckets` +- Columns: + - `bucketId` (primary key) + - `owner` (address, stored as a string) + - `accessListJson` (JSON-encoded access list array) + - `createdAt` (unix timestamp) + +2) **Backend data** +- `localfs`: writes file bytes to the configured folder under `buckets//`. +- `s3`: not implemented yet. + +--- + +## Ownership and access control + +### Ownership + +Every bucket has a single **owner** address, stored in the bucket registry. + +- When a bucket is created, the node sets: + - `owner = consumerAddress` (normalized via `ethers.getAddress`) + +### Bucket access list + +Each bucket stores an **AccessList[]** (per-chain list(s) of access list contract addresses): + +```ts +export interface AccessList { + [chainId: string]: string[] +} +``` + +This access list is used to decide whether a given `consumerAddress` is allowed to interact with a bucket. 
+ +### Where checks happen + +Access checks happen at two levels: + +1) **Backend enforcement** (required) +- Backend operations `listFiles`, `uploadFile`, `deleteFile`, and `getFileObject` all require `consumerAddress`. +- The base class helper `assertConsumerAllowedForBucket(consumerAddress, bucketId)` loads the bucket ACL and throws `PersistentStorageAccessDeniedError` if the consumer is not allowed. + +2) **Handler enforcement** (command-specific) +- `createBucket`: additionally checks the node-level allow list `config.persistentStorage.accessLists` (who can create buckets at all). +- `getBuckets`: queries registry rows filtered by `owner` and then: + - if `consumerAddress === owner`: returns all buckets for that owner + - else: filters buckets by the bucket ACL + +### Error behavior + +- Backends throw `PersistentStorageAccessDeniedError` when forbidden. +- Handlers translate that into **HTTP 403** / `status.httpStatus = 403`. + +--- + +## Features + +### Supported today + +- **Create bucket** + - Creates a bucket id (UUID), persists it in SQLite with `owner` and `accessListJson`, and creates a local directory (localfs). + +- **List buckets (by owner)** + - Returns buckets from the registry filtered by `owner` (mandatory arg). + - Applies ACL filtering for non-owners. + +- **Upload file** + - Writes a stream to the backend. + - Enforces bucket ACL. + +- **List files** + - Returns file metadata (`name`, `size`, `lastModified`) for a bucket. + - Enforces bucket ACL. + +- **Delete file** + - Deletes the named file from the bucket. + - Enforces bucket ACL. + +### Not implemented yet + +- **S3 backend** + - `PersistentStorageS3` exists as a placeholder and currently throws “not implemented”. + +--- + +## Configuration + +Persistent storage is controlled by `persistentStorage` in node config. 
+ +Key fields: +- `enabled`: boolean +- `type`: `"localfs"` or `"s3"` +- `accessLists`: AccessList[] — node-level allow list to create buckets +- `options`: + - localfs: `{ "folder": "/path/to/storage" }` + - s3: `{ endpoint, objectKey, accessKeyId, secretAccessKey, ... }` (future) + +--- + +## Usage + +### P2P commands + +All persistent storage operations are implemented as protocol commands in the handler: +- `persistentStorageCreateBucket` +- `persistentStorageGetBuckets` +- `persistentStorageListFiles` +- `persistentStorageUploadFile` +- `persistentStorageDeleteFile` + +Each command requires authentication (token or signature) based on Ocean Node’s auth configuration. + +### HTTP endpoints + +HTTP routes are available under `/api/services/persistentStorage/...` and call the same handlers. See `docs/API.md` for the full parameter lists and examples. + +At a glance: +- `POST /api/services/persistentStorage/buckets` +- `GET /api/services/persistentStorage/buckets` +- `GET /api/services/persistentStorage/buckets/:bucketId/files` +- `POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName` +- `DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName` + +Upload uses the raw request body as bytes and forwards it to the handler as a stream. + +--- + +## Limitations and notes + +- The bucket registry is local to the node (SQLite file). If you run multiple nodes, each node’s registry is independent unless you externalize/replicate it. +- `listBuckets(owner)` requires `owner` and only returns buckets that were created with that owner recorded. +- Filenames in `localfs` are constrained (no path separators) to avoid path traversal. 
+ From e4403d5d90996d85ea35bd24e15ce80e76ec1cfe Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Tue, 7 Apr 2026 14:45:25 +0300 Subject: [PATCH 3/8] fix typo in accesslist --- src/utils/accessList.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/accessList.ts b/src/utils/accessList.ts index 0a995fc95..21d7671f3 100644 --- a/src/utils/accessList.ts +++ b/src/utils/accessList.ts @@ -49,7 +49,7 @@ export async function checkAddressOnAccessList( oceanNode: OceanNode ): Promise { if (!access || access.length === 0) { - return true + return false } const config = oceanNode.getConfig() const { supportedNetworks } = config From 5901f4cb8c8f3f671b9da054bc7a80396968bb42 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Tue, 7 Apr 2026 15:20:52 +0300 Subject: [PATCH 4/8] add some logs --- src/OceanNode.ts | 4 ++++ src/test/integration/persistentStorage.test.ts | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/OceanNode.ts b/src/OceanNode.ts index dfaf5d6a1..a8c9c0aac 100644 --- a/src/OceanNode.ts +++ b/src/OceanNode.ts @@ -77,8 +77,12 @@ export class OceanNode { this.blockchainRegistry ) if (this.config.persistentStorage?.enabled) { + OCEAN_NODE_LOGGER.info( + `Starting PersistenStorage with type ${this.config.persistentStorage.type}` + ) this.persistentStorage = createPersistentStorage(this) } else { + OCEAN_NODE_LOGGER.info(`Starting without PersistenStorage`) this.persistentStorage = null } } diff --git a/src/test/integration/persistentStorage.test.ts b/src/test/integration/persistentStorage.test.ts index ddd1bb10a..be0d01b20 100644 --- a/src/test/integration/persistentStorage.test.ts +++ b/src/test/integration/persistentStorage.test.ts @@ -161,7 +161,7 @@ describe('Persistent storage handlers (integration)', function () { fileName, stream: Readable.from(body) } as any) - + console.log(uploadRes) expect(uploadRes.status.httpStatus).to.equal(200) const uploaded = await streamToObject(uploadRes.stream as Readable) 
expect(uploaded.name).to.equal(fileName) From 8856c6be7593b8d9fd023c7d5d863cbad7f72480 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Tue, 7 Apr 2026 16:02:17 +0300 Subject: [PATCH 5/8] add more tests --- .../PersistentStorageFactory.ts | 27 ++++++- .../integration/persistentStorage.test.ts | 71 ++++++++++++++++++- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/src/components/persistentStorage/PersistentStorageFactory.ts b/src/components/persistentStorage/PersistentStorageFactory.ts index 8d88f13fb..6ceee168d 100644 --- a/src/components/persistentStorage/PersistentStorageFactory.ts +++ b/src/components/persistentStorage/PersistentStorageFactory.ts @@ -3,6 +3,7 @@ import type { BaseFileObject } from '../../@types/fileObject.js' import sqlite3, { RunResult } from 'sqlite3' import path from 'path' import fs from 'fs' +import { getAddress } from 'ethers' import { OceanNode } from '../../OceanNode.js' import { checkAddressOnAccessList } from '../../utils/accessList.js' @@ -13,6 +14,14 @@ export class PersistentStorageAccessDeniedError extends Error { } } +function normalizeWeb3Address(addr: string): string { + try { + return getAddress(addr) + } catch { + return (addr ?? '').toLowerCase() + } +} + function parseBucketAccessListsJson(accessListJson: string): AccessList[] { try { const parsed = JSON.parse(accessListJson || '[]') @@ -100,7 +109,7 @@ export abstract class PersistentStorageFactory { async getBucketAccessList(bucketId: string): Promise { await this.dbCreateTables() try { - const row = await this.dbGetBucket(bucketId) + const row = await this.getBucket(bucketId) if (!row) { return [] } @@ -110,6 +119,16 @@ export abstract class PersistentStorageFactory { } } + async getBucket(bucketId: string): Promise { + await this.dbCreateTables() + try { + const row = await this.dbGetBucket(bucketId) + return row + } catch { + return null + } + } + /** * Lists buckets for a given owner from the SQLite registry (metadata only). 
* `owner` must already be normalized (e.g. checksummed `getAddress`). @@ -201,7 +220,11 @@ export abstract class PersistentStorageFactory { consumerAddress: string, bucketId: string ): Promise { - const accessLists = await this.getBucketAccessList(bucketId) + const bucket = await this.getBucket(bucketId) + const accessLists = parseBucketAccessListsJson(bucket.accessListJson) + if (normalizeWeb3Address(consumerAddress) === normalizeWeb3Address(bucket.owner)) { + return + } if (!(await this.isAllowed(consumerAddress, accessLists))) { throw new PersistentStorageAccessDeniedError() } diff --git a/src/test/integration/persistentStorage.test.ts b/src/test/integration/persistentStorage.test.ts index be0d01b20..f8ddf72a0 100644 --- a/src/test/integration/persistentStorage.test.ts +++ b/src/test/integration/persistentStorage.test.ts @@ -52,6 +52,7 @@ describe('Persistent storage handlers (integration)', function () { let owner: Signer let wallets: Signer[] = [] let forbiddenConsumer: Signer + let bucketAllowList: any before(async () => { provider = new JsonRpcProvider('http://127.0.0.1:8545') @@ -79,6 +80,7 @@ describe('Persistent storage handlers (integration)', function () { provider, wallets ) + bucketAllowList = accessListPublishers previousConfiguration = await setupEnvironment( TEST_ENV_CONFIG_FILE, buildEnvOverrideConfig( @@ -92,7 +94,7 @@ describe('Persistent storage handlers (integration)', function () { config.persistentStorage = { enabled: true, type: 'localfs', - accessLists: [accessListPublishers], + accessLists: [bucketAllowList], options: { folder: psRoot } } @@ -161,7 +163,6 @@ describe('Persistent storage handlers (integration)', function () { fileName, stream: Readable.from(body) } as any) - console.log(uploadRes) expect(uploadRes.status.httpStatus).to.equal(200) const uploaded = await streamToObject(uploadRes.stream as Readable) expect(uploaded.name).to.equal(fileName) @@ -251,6 +252,72 @@ describe('Persistent storage handlers (integration)', function () 
{ expect(res.status.error).to.contain('not allowed') }) + it('should deny forbiddenConsumer for bucket operations when bucket has accessList', async () => { + // Create a bucket whose ACL allows only wallets[0..3] + const consumerAddress = await consumer.getAddress() + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [bucketAllowList], + authorization: undefined + } as any) + + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + // Forbidden consumer tries to list files -> should fail + const forbiddenConsumerAddress = await forbiddenConsumer.getAddress() + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + forbiddenConsumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES + ) + signature = await safeSign(forbiddenConsumer, messageHashBytes) + const listRes = await new PersistentStorageListFilesHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + consumerAddress: forbiddenConsumerAddress, + signature, + nonce, + bucketId, + authorization: undefined + } as any) + expect(listRes.status.httpStatus).to.equal(403) + expect(listRes.status.error).to.contain('not allowed') + + // Forbidden consumer tries to upload -> should fail + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + forbiddenConsumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(forbiddenConsumer, messageHashBytes) + const uploadRes = await new 
PersistentStorageUploadFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress: forbiddenConsumerAddress, + signature, + nonce, + bucketId, + fileName: 'forbidden.txt', + stream: Readable.from(Buffer.from('nope')), + authorization: undefined + } as any) + expect(uploadRes.status.httpStatus).to.equal(403) + expect(uploadRes.status.error).to.contain('not allowed') + }) + it('getBuckets returns buckets the consumer can access', async () => { const consumerAddress = await consumer.getAddress() await sleep(1000) From ac34d48cc68a1a1acc25567f5e067ad46efb85b5 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Thu, 9 Apr 2026 10:20:03 +0300 Subject: [PATCH 6/8] add docker mounting --- docs/API.md | 26 ++ docs/persistentStorage.md | 2 + src/@types/PersistentStorage.ts | 19 +- src/@types/commands.ts | 8 + src/components/c2d/compute_engine_docker.ts | 62 +++ src/components/core/compute/initialize.ts | 20 + src/components/core/compute/startCompute.ts | 38 +- .../core/handler/coreHandlersRegistry.ts | 5 + .../core/handler/persistentStorage.ts | 55 +++ .../httpRoutes/persistentStorage.ts | 31 ++ .../PersistentStorageFactory.ts | 122 +++++- .../PersistentStorageLocalFS.ts | 72 +++- .../persistentStorage/PersistentStorageS3.ts | 18 +- src/test/integration/compute.test.ts | 388 +++++++++++++++++- .../integration/persistentStorage.test.ts | 223 ++++++++++ src/utils/constants.ts | 2 + 16 files changed, 1070 insertions(+), 21 deletions(-) diff --git a/docs/API.md b/docs/API.md index b1b08174c..4abcc55d4 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1686,6 +1686,32 @@ List files in a bucket. --- +### `HTTP` GET /api/services/persistentStorage/buckets/:bucketId/files/:fileName/object + +#### Description + +Return the `fileObject` for a specific file in a bucket (useful for passing references to other subsystems like compute). 
+ +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | + +#### Response (200) + +```json +{ + "type": "nodePersistentStorage", + "bucketId": "uuid", + "fileName": "hello.txt" +} +``` + +--- + ### `HTTP` POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName #### Description diff --git a/docs/persistentStorage.md b/docs/persistentStorage.md index db1396c6b..7f1144366 100644 --- a/docs/persistentStorage.md +++ b/docs/persistentStorage.md @@ -144,6 +144,7 @@ All persistent storage operations are implemented as protocol commands in the ha - `persistentStorageCreateBucket` - `persistentStorageGetBuckets` - `persistentStorageListFiles` +- `persistentStorageGetFileObject` - `persistentStorageUploadFile` - `persistentStorageDeleteFile` @@ -157,6 +158,7 @@ At a glance: - `POST /api/services/persistentStorage/buckets` - `GET /api/services/persistentStorage/buckets` - `GET /api/services/persistentStorage/buckets/:bucketId/files` +- `GET /api/services/persistentStorage/buckets/:bucketId/files/:fileName/object` - `POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName` - `DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName` diff --git a/src/@types/PersistentStorage.ts b/src/@types/PersistentStorage.ts index 52b751691..67b0448a2 100644 --- a/src/@types/PersistentStorage.ts +++ b/src/@types/PersistentStorage.ts @@ -1,5 +1,5 @@ import type { AccessList } from './AccessList' - +import type { BaseFileObject } from './fileObject.js' export type PersistentStorageType = 'localfs' | 's3' export interface PersistentStorageLocalFSOptions { @@ -22,3 +22,20 @@ export interface PersistentStorageConfig { accessLists: AccessList[] options: PersistentStorageLocalFSOptions | PersistentStorageS3Options } + 
+/** + * Docker mount descriptor used by the Docker C2D engine. + * Mirrors Dockerode `HostConfig.Mounts[]` item shape. + */ +export interface DockerMountObject { + Type: 'bind' + Source: string + Target: string + ReadOnly: boolean +} + +export interface PersistentStorageObject extends BaseFileObject { + type: 'nodePersistentStorage' + bucketId: string + fileName: string +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 58d81fbea..999ddb1b1 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -348,6 +348,14 @@ export interface PersistentStorageUploadFileCommand extends Command { fileName: string } +export interface PersistentStorageGetFileObjectCommand extends Command { + consumerAddress: string + signature: string + nonce: string + bucketId: string + fileName: string +} + export interface PersistentStorageDeleteFileCommand extends Command { consumerAddress: string signature: string diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 91070735a..3c1c4075e 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1820,6 +1820,64 @@ export class C2DEngineDocker extends C2DEngine { } containerInfo.Env = envVars } + // persistent Storage: bind-mount bucket files into the job container (localfs backend) + for (const i in job.assets) { + const asset = job.assets[i] + if (!asset.fileObject || asset.fileObject.type !== 'nodePersistentStorage') { + continue + } + const fo = asset.fileObject as { bucketId?: string; fileName?: string } + if (!fo.bucketId || !fo.fileName) { + CORE_LOGGER.error( + `Job ${job.jobId} asset ${i}: nodePersistentStorage requires bucketId and fileName` + ) + job.status = C2DStatusNumber.DataProvisioningFailed + job.statusText = C2DStatusText.DataProvisioningFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + return + } + 
const ps = OceanNode.getInstance().getPersistentStorage() + if (!ps) { + CORE_LOGGER.error( + `Job ${job.jobId} asset ${i}: persistent storage is not configured on this node` + ) + job.status = C2DStatusNumber.DataProvisioningFailed + job.statusText = C2DStatusText.DataProvisioningFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + return + } + try { + const bindMount = await ps.getDockerMountObject( + fo.bucketId, + fo.fileName, + job.owner + ) + CORE_LOGGER.debug( + `Mounting bucket ${fo.bucketId} to folder ${bindMount.Target}` + ) + hostConfig.Mounts.push(bindMount) + mountVols[bindMount.Target] = {} + } catch (e) { + const errMsg = e instanceof Error ? e.message : String(e) + CORE_LOGGER.error( + `Job ${job.jobId} asset ${i}: failed to resolve persistent storage bind: ${errMsg}` + ) + job.status = C2DStatusNumber.DataProvisioningFailed + job.statusText = C2DStatusText.DataProvisioningFailed + job.isRunning = false + job.dateFinished = String(Date.now() / 1000) + await this.db.updateJob(job) + await this.cleanupJob(job) + return + } + } + const container = await this.createDockerContainer(containerInfo, true) if (container) { job.status = C2DStatusNumber.Provisioning @@ -2767,6 +2825,10 @@ export class C2DEngineDocker extends C2DEngine { if (asset.fileObject) { try { if (asset.fileObject.type) { + if (asset.fileObject.type === 'nodePersistentStorage') { + // local storage is handled later, when we start the container and create the binds + continue + } storage = Storage.getStorageClass(asset.fileObject, config) } else { CORE_LOGGER.info('asset file object seems to be encrypted, checking it...') diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 6b81bf8ff..73603c8a8 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -39,6 +39,10 @@ import { validateAlgoForDataset, 
validateOutput } from './utils.js' +import { + ensureConsumerAllowedForPersistentStorageLocalfsFileObject, + rejectPersistentStorageFileObjectOnAlgorithm +} from '../../persistentStorage/PersistentStorageFactory.js' export class ComputeInitializeHandler extends CommandHandler { validate(command: ComputeInitializeCommand): ValidateParams { @@ -220,6 +224,22 @@ export class ComputeInitializeHandler extends CommandHandler { if (isValidOutput.status.httpStatus !== 200) { return isValidOutput } + const algoPersistentStorageBan = rejectPersistentStorageFileObjectOnAlgorithm( + task.algorithm.fileObject + ) + if (algoPersistentStorageBan) { + return algoPersistentStorageBan + } + for (const dataset of task.datasets) { + const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject( + node, + task.consumerAddress, + dataset.fileObject + ) + if (psAccess) { + return psAccess + } + } // check algo let index = 0 const policyServer = new PolicyServer() diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 0d42608d6..cb56b9454 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -43,6 +43,10 @@ import { getNonceAsNumber } from '../utils/nonceHandler.js' import { PolicyServer } from '../../policyServer/index.js' import { checkCredentials } from '../../../utils/credentials.js' import { checkAddressOnAccessList } from '../../../utils/accessList.js' +import { + ensureConsumerAllowedForPersistentStorageLocalfsFileObject, + rejectPersistentStorageFileObjectOnAlgorithm +} from '../../persistentStorage/PersistentStorageFactory.js' export class CommonComputeHandler extends CommandHandler { validate(command: PaidComputeStartCommand): ValidateParams { @@ -225,7 +229,23 @@ export class PaidComputeStartHandler extends CommonComputeHandler { } } const policyServer = new PolicyServer() - // check algo + const algoPersistentStorageBan = 
rejectPersistentStorageFileObjectOnAlgorithm( + task.algorithm.fileObject + ) + if (algoPersistentStorageBan) { + return algoPersistentStorageBan + } + for (const dataset of task.datasets) { + const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject( + node, + task.consumerAddress, + dataset.fileObject + ) + if (psAccess) { + return psAccess + } + } + // check algo and datasets (orders, credentials, etc.) for (const elem of [...[task.algorithm], ...task.datasets]) { const result: any = { validOrder: false } if ('documentId' in elem && elem.documentId) { @@ -747,6 +767,22 @@ export class FreeComputeStartHandler extends CommonComputeHandler { return isValidOutput } const policyServer = new PolicyServer() + const algoPersistentStorageBanFree = rejectPersistentStorageFileObjectOnAlgorithm( + task.algorithm.fileObject + ) + if (algoPersistentStorageBanFree) { + return algoPersistentStorageBanFree + } + for (const dataset of task.datasets) { + const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject( + thisNode, + task.consumerAddress, + dataset.fileObject + ) + if (psAccess) { + return psAccess + } + } for (const elem of [...[task.algorithm], ...task.datasets]) { if (!('documentId' in elem)) { continue diff --git a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index 985282192..531f7f1a9 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -51,6 +51,7 @@ import { PersistentStorageCreateBucketHandler, PersistentStorageDeleteFileHandler, PersistentStorageGetBucketsHandler, + PersistentStorageGetFileObjectHandler, PersistentStorageListFilesHandler, PersistentStorageUploadFileHandler } from './persistentStorage.js' @@ -190,6 +191,10 @@ export class CoreHandlersRegistry { PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, new PersistentStorageUploadFileHandler(node) ) + this.registerCoreHandler( + 
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, + new PersistentStorageGetFileObjectHandler(node) + ) this.registerCoreHandler( PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, new PersistentStorageDeleteFileHandler(node) diff --git a/src/components/core/handler/persistentStorage.ts b/src/components/core/handler/persistentStorage.ts index b94ca529a..e4394f3d8 100644 --- a/src/components/core/handler/persistentStorage.ts +++ b/src/components/core/handler/persistentStorage.ts @@ -3,6 +3,7 @@ import type { PersistentStorageCreateBucketCommand, PersistentStorageDeleteFileCommand, PersistentStorageGetBucketsCommand, + PersistentStorageGetFileObjectCommand, PersistentStorageListFilesCommand, PersistentStorageUploadFileCommand } from '../../../@types/commands.js' @@ -220,6 +221,60 @@ export class PersistentStorageListFilesHandler extends CommandHandler { } } +export class PersistentStorageGetFileObjectHandler extends CommandHandler { + validate(command: PersistentStorageGetFileObjectCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'bucketId', + 'fileName' + ]) + if (!base.valid) return base + return { valid: true } + } + + async handle(task: PersistentStorageGetFileObjectCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + const obj = await storage.getFileObject( + task.bucketId, + task.fileName, + task.consumerAddress + ) + return { + stream: Readable.from(JSON.stringify(obj)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof 
PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? e.message : String(e) + if (message.toLowerCase().includes('file not found')) { + return { stream: null, status: { httpStatus: 404, error: message } } + } + CORE_LOGGER.error(`PersistentStorageGetFileObjectHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + export class PersistentStorageUploadFileHandler extends CommandHandler { validate(command: PersistentStorageUploadFileCommand): ValidateParams { const base = validateCommandParameters(command, [ diff --git a/src/components/httpRoutes/persistentStorage.ts b/src/components/httpRoutes/persistentStorage.ts index db7e977d9..574fe5e7d 100644 --- a/src/components/httpRoutes/persistentStorage.ts +++ b/src/components/httpRoutes/persistentStorage.ts @@ -9,6 +9,7 @@ import { PersistentStorageCreateBucketHandler, PersistentStorageDeleteFileHandler, PersistentStorageGetBucketsHandler, + PersistentStorageGetFileObjectHandler, PersistentStorageListFilesHandler, PersistentStorageUploadFileHandler } from '../core/handler/persistentStorage.js' @@ -108,6 +109,36 @@ persistentStorageRoutes.get( } ) +// Get file object for a file in a bucket +persistentStorageRoutes.get( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName/object`, + async (req, res) => { + try { + const response = await new PersistentStorageGetFileObjectHandler( + req.oceanNode + ).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + bucketId: req.params.bucketId, + fileName: req.params.fileName, + authorization: req.headers?.authorization, + caller: req.caller + } as any) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) 
+ return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage get file object error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + // Upload file to bucket. Body is treated as raw bytes. persistentStorageRoutes.post( `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`, diff --git a/src/components/persistentStorage/PersistentStorageFactory.ts b/src/components/persistentStorage/PersistentStorageFactory.ts index 6ceee168d..8b3599792 100644 --- a/src/components/persistentStorage/PersistentStorageFactory.ts +++ b/src/components/persistentStorage/PersistentStorageFactory.ts @@ -1,5 +1,10 @@ +import { P2PCommandResponse } from '../../@types/index.js' import type { AccessList } from '../../@types/AccessList.js' -import type { BaseFileObject } from '../../@types/fileObject.js' +import type { + DockerMountObject, + PersistentStorageObject +} from '../../@types/PersistentStorage.js' + import sqlite3, { RunResult } from 'sqlite3' import path from 'path' import fs from 'fs' @@ -72,6 +77,19 @@ export abstract class PersistentStorageFactory { this.db = new sqlite3.Database(dbDir + 'persistentStorage.sqlite') } + /** + * Validate a bucket id. Today localfs uses UUIDs, so enforce UUIDv4. + * This is a security boundary because bucketId participates in filesystem paths. 
+ */ + public validateBucket(bucketId: string): void { + // UUID v4: xxxxxxxx-xxxx-4xxx-[89ab]xxx-xxxxxxxxxxxx + const uuidV4 = + /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i + if (typeof bucketId !== 'string' || !uuidV4.test(bucketId)) { + throw new Error('Invalid bucketId') + } + } + public abstract createNewBucket( accessList: AccessList[], owner: string @@ -103,7 +121,17 @@ export abstract class PersistentStorageFactory { bucketId: string, fileName: string, consumerAddress: string - ): Promise + ): Promise + + /** + * Returns a Docker mount descriptor for a specific bucket file. + * This is used by the Docker C2D engine to mount the file into the job container. + */ + public abstract getDockerMountObject( + bucketId: string, + fileName: string, + consumerAddress?: string + ): Promise // common functions async getBucketAccessList(bucketId: string): Promise { @@ -216,11 +244,14 @@ export abstract class PersistentStorageFactory { } /** Throws {@link PersistentStorageAccessDeniedError} if the consumer is not on the bucket access list. */ - protected async assertConsumerAllowedForBucket( + public async assertConsumerAllowedForBucket( consumerAddress: string, bucketId: string ): Promise { const bucket = await this.getBucket(bucketId) + if (!bucket) { + throw new PersistentStorageAccessDeniedError() + } const accessLists = parseBucketAccessListsJson(bucket.accessListJson) if (normalizeWeb3Address(consumerAddress) === normalizeWeb3Address(bucket.owner)) { return @@ -230,3 +261,88 @@ export abstract class PersistentStorageFactory { } } } + +/** + * Algorithms must not reference node persistent storage; only datasets may use + * `nodePersistentStorage` / `localfs` file objects. 
+ */ +export function rejectPersistentStorageFileObjectOnAlgorithm( + fileObject: unknown +): P2PCommandResponse | null { + if (fileObject === null || fileObject === undefined || typeof fileObject !== 'object') { + return null + } + const fo = fileObject as { type?: string } + if (fo.type === 'nodePersistentStorage' || fo.type === 'localfs') { + return { + stream: null, + status: { + httpStatus: 400, + error: + 'Algorithms cannot use node persistent storage file objects; only datasets may reference persistent storage.' + } + } + } + return null +} + +/** + * When a compute dataset uses a node persistent-storage file (localfs backend), + * ensure the consumer is on the bucket ACL before proceeding. + */ +export async function ensureConsumerAllowedForPersistentStorageLocalfsFileObject( + node: OceanNode, + consumerAddress: string, + fileObject: unknown +): Promise { + if (fileObject === null || fileObject === undefined || typeof fileObject !== 'object') { + return null + } + const fo = fileObject as { type?: string; bucketId?: unknown } + if (fo.type !== 'nodePersistentStorage') { + return null + } + if (typeof fo.bucketId !== 'string' || fo.bucketId.length === 0) { + return { + stream: null, + status: { + httpStatus: 400, + error: 'Persistent storage file object is missing a valid bucketId' + } + } + } + const cfg = node.getConfig().persistentStorage + if (!cfg?.enabled || cfg.type !== 'localfs') { + return { + stream: null, + status: { + httpStatus: 400, + error: + 'This compute job references node persistent storage (localfs), which is not enabled or not configured as localfs on this node' + } + } + } + const storage = node.getPersistentStorage() + if (!storage) { + return { + stream: null, + status: { + httpStatus: 400, + error: + 'This compute job references node persistent storage but persistent storage is not available on this node' + } + } + } + try { + await storage.assertConsumerAllowedForBucket(consumerAddress, fo.bucketId) + } catch (e) { + if (e 
instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + throw e + } + return null +} diff --git a/src/components/persistentStorage/PersistentStorageLocalFS.ts b/src/components/persistentStorage/PersistentStorageLocalFS.ts index fa02c34f7..e37f9be51 100644 --- a/src/components/persistentStorage/PersistentStorageLocalFS.ts +++ b/src/components/persistentStorage/PersistentStorageLocalFS.ts @@ -5,8 +5,11 @@ import { pipeline } from 'stream/promises' import { randomUUID } from 'crypto' import type { AccessList } from '../../@types/AccessList.js' -import type { PersistentStorageLocalFSOptions } from '../../@types/PersistentStorage.js' -import type { BaseFileObject } from '../../@types/fileObject.js' +import type { + DockerMountObject, + PersistentStorageLocalFSOptions, + PersistentStorageObject +} from '../../@types/PersistentStorage.js' import { CreateBucketResult, @@ -16,13 +19,8 @@ import { } from './PersistentStorageFactory.js' import { OceanNode } from '../../OceanNode.js' -type LocalFileObject = BaseFileObject & { - type: 'localfs' - bucketId: string - fileName: string -} - export class PersistentStorageLocalFS extends PersistentStorageFactory { + /* eslint-disable security/detect-non-literal-fs-filename -- localfs backend operates on filesystem paths */ private baseFolder: string constructor(node: OceanNode) { @@ -43,12 +41,36 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { } private async ensureBucketExists(bucketId: string): Promise { + this.validateBucket(bucketId) + const bucketsRoot = path.resolve(this.baseFolder, 'buckets') + const resolvedBucketPath = path.resolve(this.bucketPath(bucketId)) + if ( + resolvedBucketPath !== bucketsRoot && + !resolvedBucketPath.startsWith(bucketsRoot + path.sep) + ) { + throw new Error('Invalid bucketId') + } const row = await this.dbGetBucket(bucketId) if (!row) { throw new Error(`Bucket not found: ${bucketId}`) } } + 
private async ensureFileExists(bucketId: string, fileName: string): Promise { + if (!fileName || fileName.includes('/') || fileName.includes('\\')) { + throw new Error('Invalid fileName') + } + const targetPath = path.join(this.bucketPath(bucketId), fileName) + try { + const st = await fsp.stat(targetPath) + if (!st.isFile()) { + throw new Error(`File not found: ${fileName}`) + } + } catch { + throw new Error(`File not found: ${fileName}`) + } + } + async listBuckets(owner: string): Promise { await this.init() return super.listBuckets(owner) @@ -137,26 +159,52 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + await this.ensureFileExists(bucketId, fileName) const targetPath = path.join(this.bucketPath(bucketId), fileName) - await fsp.rm(targetPath, { force: true }) + await fsp.rm(targetPath) } async getFileObject( bucketId: string, fileName: string, consumerAddress: string - ): Promise { + ): Promise { await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + await this.ensureFileExists(bucketId, fileName) // This is intentionally not a downloadable URL; compute backends can interpret this object. 
- const obj: LocalFileObject = { - type: 'localfs', + const obj: PersistentStorageObject = { + type: 'nodePersistentStorage', bucketId, fileName } return obj } + + async getDockerMountObject( + bucketId: string, + fileName: string, + consumerAddress?: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + if (consumerAddress) { + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + } + await this.ensureFileExists(bucketId, fileName) + + const source = path.join(this.bucketPath(bucketId), fileName) + const target = path.posix.join('/data', 'persistentStorage', bucketId, fileName) + + return { + Type: 'bind', + Source: source, + Target: target, + ReadOnly: true + } + } } +/* eslint-enable security/detect-non-literal-fs-filename */ diff --git a/src/components/persistentStorage/PersistentStorageS3.ts b/src/components/persistentStorage/PersistentStorageS3.ts index 4be68c9dd..bd4cac5ee 100644 --- a/src/components/persistentStorage/PersistentStorageS3.ts +++ b/src/components/persistentStorage/PersistentStorageS3.ts @@ -6,8 +6,11 @@ import { } from './PersistentStorageFactory.js' import type { AccessList } from '../../@types/AccessList.js' -import type { PersistentStorageS3Options } from '../../@types/PersistentStorage.js' -import type { BaseFileObject } from '../../@types/fileObject.js' +import type { + DockerMountObject, + PersistentStorageS3Options, + PersistentStorageObject +} from '../../@types/PersistentStorage.js' import { OceanNode } from '../../OceanNode.js' export class PersistentStorageS3 extends PersistentStorageFactory { @@ -68,7 +71,16 @@ export class PersistentStorageS3 extends PersistentStorageFactory { _bucketId: string, _fileName: string, _consumerAddress: string - ): Promise { + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async getDockerMountObject( + _bucketId: string, + _fileName: string, + _consumerAddress?: string + ): 
Promise { throw new Error('PersistentStorageS3 is not implemented yet') } } diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 0f9c66d06..cb74eb97f 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -63,7 +63,7 @@ import { } from '../utils/utils.js' import { ProviderFees, ProviderComputeInitializeResults } from '../../@types/Fees.js' -import { homedir } from 'os' +import { homedir, tmpdir } from 'os' import { publishAlgoDDO, publishDatasetDDO } from '../data/ddo.js' import { DEVELOPMENT_CHAIN_ID, getOceanArtifactsAdresses } from '../../utils/address.js' import ERC721Factory from '@oceanprotocol/contracts/artifacts/contracts/ERC721Factory.sol/ERC721Factory.json' with { type: 'json' } @@ -83,6 +83,15 @@ import Dockerode from 'dockerode' import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' import { createHashForSignature, safeSign } from '../utils/signature.js' import { create256Hash } from '../../utils/crypt.js' +import fsp from 'fs/promises' +import path from 'path' +import { existsSync } from 'fs' +import { + PersistentStorageCreateBucketHandler, + PersistentStorageUploadFileHandler +} from '../../components/core/handler/persistentStorage.js' +import { deployAndGetAccessListConfig } from '../utils/contracts.js' +import * as tar from 'tar' /** * Polls getComputeEnvironments until every environment's resources (and free.resources) @@ -2213,6 +2222,383 @@ describe('Compute', () => { ) }) + describe('Compute with persistent storage (localfs)', function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 4) + + let psRoot: string + let psDockerEngine: C2DEngineDocker | undefined + let psSuiteActive = false + + const jobReachedSuccessfulTerminalStatus = (status: number) => + status === C2DStatusNumber.JobFinished || status === C2DStatusNumber.JobSettle + + const waitForComputeJobFinished = async ( + node: OceanNode, + fullJobId: string, + timeoutMs: number + ) => { 
+ const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + const r = await new ComputeGetStatusHandler(node).handle({ + command: PROTOCOL_COMMANDS.COMPUTE_GET_STATUS, + consumerAddress: null, + agreementId: null, + jobId: fullJobId + }) + assert.equal(r.status.httpStatus, 200) + const jobs = await streamToObject(r.stream as Readable) + const j = jobs[0] + if (!j) { + await sleep(2000) + continue + } + if (jobReachedSuccessfulTerminalStatus(j.status)) { + return j + } + if (j.dateFinished && !jobReachedSuccessfulTerminalStatus(j.status)) { + assert.fail( + `Job ended with status ${j.status} (${j.statusText}) instead of JobFinished or JobSettle` + ) + } + await sleep(3000) + } + assert.fail( + `Job ${fullJobId} did not reach JobFinished or JobSettle within ${timeoutMs}ms` + ) + } + + before(async function () { + try { + const d = new Dockerode() + await d.info() + } catch { + this.skip() + } + + psRoot = await fsp.mkdtemp(path.join(tmpdir(), 'ocean-compute-ps-')) + const bucketAllowList = await deployAndGetAccessListConfig( + publisherAccount, + provider, + [ + publisherAccount, + consumerAccount, + (await provider.getSigner(2)) as Signer, + (await provider.getSigner(3)) as Signer + ] + ) + assert(bucketAllowList, 'access list deploy failed for persistent storage') + + const cfg = await getConfiguration(true) + cfg.persistentStorage = { + enabled: true, + type: 'localfs', + accessLists: [bucketAllowList], + options: { folder: psRoot } + } + + const enginesOld = oceanNode.getC2DEngines() + if (enginesOld) await enginesOld.stopAllEngines() + const km = oceanNode.getKeyManager() + const br = oceanNode.blockchainRegistry + oceanNode = OceanNode.getInstance(cfg, dbconn, null, null, indexer, km, br, true) + oceanNode.addIndexer(indexer) + await oceanNode.addC2DEngines() + + const c2dEngines = oceanNode.getC2DEngines() + const engines = (c2dEngines as any).engines as C2DEngineDocker[] + psDockerEngine = engines.find((e) => e instanceof C2DEngineDocker) + 
if (!psDockerEngine) { + this.skip() + } + + await waitForAllJobsToFinish(oceanNode) + psSuiteActive = true + }) + + after(async () => { + if (!psSuiteActive) return + try { + const enginesOld = oceanNode.getC2DEngines() + if (enginesOld) await enginesOld.stopAllEngines() + const cfg = await getConfiguration(true) + cfg.persistentStorage = { + enabled: false, + type: 'localfs', + accessLists: [], + options: { folder: '/tmp' } + } + const km = oceanNode.getKeyManager() + const br = oceanNode.blockchainRegistry + oceanNode = OceanNode.getInstance(cfg, dbconn, null, null, indexer, km, br, true) + oceanNode.addIndexer(indexer) + await oceanNode.addC2DEngines() + } catch (e) { + console.error('Compute persistent-storage suite teardown failed:', e) + } + }) + + it('happy path: bind-mounted persistent storage file is readable inside the container', async function () { + const consumerAddress = await consumerAccount.getAddress() + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumerAccount, messageHashBytes) + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + assert.equal(createRes.status.httpStatus, 200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const fileName = 'ps-data.txt' + const secret = 'PS_COMPUTE_INTEGRATION_OK\n' + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(consumerAccount, messageHashBytes) + const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({ + command: 
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress, + signature, + nonce, + bucketId, + fileName, + stream: Readable.from(Buffer.from(secret)) + } as any) + assert.equal(uploadRes.status.httpStatus, 200) + + const rawcode = [ + "const fs = require('fs');", + `const p = '/data/persistentStorage/${bucketId}/${fileName}';`, + "const out = '/data/outputs/ps-result.txt';", + "fs.mkdirSync('/data/outputs', { recursive: true });", + "const c = fs.readFileSync(p, 'utf8');", + "fs.writeFileSync(out, c, 'utf8');" + ].join('\n') + + const algoMeta = publishedAlgoDataset.ddo.metadata.algorithm + + const initResp = await new ComputeInitializeHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + consumerAddress, + datasets: [ + { + fileObject: { + type: 'nodePersistentStorage', + bucketId, + fileName + } as any + } + ], + algorithm: { + meta: { + ...algoMeta, + rawcode + } + }, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: 60 + } as any) + assert.equal(initResp.status.httpStatus, 200, String(initResp.status.error)) + + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.FREE_COMPUTE_START + ) + signature = await safeSign(consumerAccount, messageHashBytes) + + const startTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress, + signature, + nonce, + environment: firstEnv.id, + queueMaxWaitTime: 0, + datasets: [ + { + fileObject: { + type: 'nodePersistentStorage', + bucketId, + fileName + } as any + } + ], + algorithm: { + meta: { + ...algoMeta, + rawcode + } + }, + output: null + } + + const startRes = await new FreeComputeStartHandler(oceanNode).handle(startTask) + assert.equal(startRes.status.httpStatus, 200, String(startRes.status.error)) + const started = await streamToObject(startRes.stream as Readable) + const fullJobId = started[0].jobId as 
string + const innerJobId = fullJobId.slice(fullJobId.indexOf('-') + 1) + + await waitForComputeJobFinished(oceanNode, fullJobId, 180_000) + + const base = (psDockerEngine as any).getStoragePath() as string + const outputsTarPath = path.join(base, innerJobId, 'data/outputs/outputs.tar') + /* eslint-disable security/detect-non-literal-fs-filename -- job paths from C2D engine */ + assert( + existsSync(outputsTarPath), + `expected outputs archive at ${outputsTarPath} (algorithm should write into /data/outputs before tar)` + ) + const extractDir = await fsp.mkdtemp(path.join(tmpdir(), 'ocean-ps-tar-')) + try { + await tar.x( + { + file: outputsTarPath, + cwd: extractDir + }, + ['outputs/ps-result.txt'] + ) + const extractedFile = path.join(extractDir, 'outputs/ps-result.txt') + assert( + existsSync(extractedFile), + 'expected outputs/ps-result.txt inside outputs.tar' + ) + const written = await fsp.readFile(extractedFile, 'utf8') + assert.equal(written, secret) + } finally { + await fsp.rm(extractDir, { recursive: true, force: true }) + } + /* eslint-enable security/detect-non-literal-fs-filename */ + }) + + it('denies free compute start when consumer is not on the bucket access list', async function () { + const ownerAddress = await consumerAccount.getAddress() + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + ownerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumerAccount, messageHashBytes) + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress: ownerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + assert.equal(createRes.status.httpStatus, 200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const fileName = 'private.txt' + nonce = 
Date.now().toString() + messageHashBytes = createHashForSignature( + ownerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(consumerAccount, messageHashBytes) + const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress: ownerAddress, + signature, + nonce, + bucketId, + fileName, + stream: Readable.from(Buffer.from('secret')) + } as any) + assert.equal(uploadRes.status.httpStatus, 200) + + const intruderAddress = await nonAllowedAccount.getAddress() + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + intruderAddress, + nonce, + PROTOCOL_COMMANDS.FREE_COMPUTE_START + ) + signature = await safeSign(nonAllowedAccount, messageHashBytes) + + const algoMeta = publishedAlgoDataset.ddo.metadata.algorithm + + const initResp = await new ComputeInitializeHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + consumerAddress: intruderAddress, + datasets: [ + { + fileObject: { + type: 'nodePersistentStorage', + bucketId, + fileName + } as any + } + ], + algorithm: { + meta: { + ...algoMeta, + rawcode: "console.log('noop');" + } + }, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: 60 + } as any) + assert.equal(initResp.status.httpStatus, 403, String(initResp.status.error)) + + const startTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress: intruderAddress, + signature, + nonce, + environment: firstEnv.id, + queueMaxWaitTime: 0, + datasets: [ + { + fileObject: { + type: 'nodePersistentStorage', + bucketId, + fileName + } as any + } + ], + algorithm: { + meta: { + ...algoMeta, + rawcode: "console.log('noop');" + } + }, + output: null + } + + const startRes = await new FreeComputeStartHandler(oceanNode).handle(startTask) + 
assert.equal(startRes.status.httpStatus, 403, String(startRes.status.error)) + assert.include( + (startRes.status.error || '').toLowerCase(), + 'allow', + 'expected access-denied style message' + ) + }) + }) + after(async () => { await tearDownEnvironment(previousConfiguration) await indexer.stopAllChainIndexers() diff --git a/src/test/integration/persistentStorage.test.ts b/src/test/integration/persistentStorage.test.ts index f8ddf72a0..d9d74e9b0 100644 --- a/src/test/integration/persistentStorage.test.ts +++ b/src/test/integration/persistentStorage.test.ts @@ -10,6 +10,7 @@ import { PersistentStorageCreateBucketHandler, PersistentStorageDeleteFileHandler, PersistentStorageGetBucketsHandler, + PersistentStorageGetFileObjectHandler, PersistentStorageListFilesHandler, PersistentStorageUploadFileHandler } from '../../components/core/handler/persistentStorage.js' @@ -229,6 +230,72 @@ describe('Persistent storage handlers (integration)', function () { expect(listedAfter.some((f: { name: string }) => f.name === fileName)).to.equal(false) }) + it('getFileObject returns a file object for an allowed consumer', async () => { + const consumerAddress = await consumer.getAddress() + + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const fileName = 'obj.txt' + const body = Buffer.from('file-object') + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + 
consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(consumer, messageHashBytes) + const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress, + signature, + nonce, + bucketId, + fileName, + stream: Readable.from(body) + } as any) + expect(uploadRes.status.httpStatus).to.equal(200) + + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT + ) + signature = await safeSign(consumer, messageHashBytes) + const objRes = await new PersistentStorageGetFileObjectHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, + consumerAddress, + signature, + nonce, + bucketId, + fileName, + authorization: undefined + } as any) + expect(objRes.status.httpStatus).to.equal(200) + const obj = await streamToObject(objRes.stream as Readable) + expect(obj).to.be.an('object') + expect(obj.bucketId).to.equal(bucketId) + expect(obj.fileName).to.equal(fileName) + }) + it('should not create bucket when consumer is not on allow list', async () => { const forbiddenConsumerAddress = await forbiddenConsumer.getAddress() const nonce = Date.now().toString() @@ -318,6 +385,162 @@ describe('Persistent storage handlers (integration)', function () { expect(uploadRes.status.error).to.contain('not allowed') }) + it('getFileObject should fail for forbiddenConsumer when bucket has accessList', async () => { + // Create a bucket whose ACL allows only wallets[0..3] + const consumerAddress = await consumer.getAddress() + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new 
PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [bucketAllowList], + authorization: undefined + } as any) + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const fileName = 'forbidden-obj.txt' + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE + ) + signature = await safeSign(consumer, messageHashBytes) + const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress, + signature, + nonce, + bucketId, + fileName, + stream: Readable.from(Buffer.from('secret')) + } as any) + expect(uploadRes.status.httpStatus).to.equal(200) + + const forbiddenConsumerAddress = await forbiddenConsumer.getAddress() + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + forbiddenConsumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT + ) + signature = await safeSign(forbiddenConsumer, messageHashBytes) + + const objRes = await new PersistentStorageGetFileObjectHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, + consumerAddress: forbiddenConsumerAddress, + signature, + nonce, + bucketId, + fileName, + authorization: undefined + } as any) + + expect(objRes.status.httpStatus).to.equal(403) + expect(objRes.status.error).to.contain('not allowed') + }) + + it('getFileObject should fail when file does not exist', async () => { + const consumerAddress = await consumer.getAddress() + + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let 
signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const missingFileName = 'missing.txt' + nonce = Date.now().toString() + messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT + ) + signature = await safeSign(consumer, messageHashBytes) + + const objRes = await new PersistentStorageGetFileObjectHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, + consumerAddress, + signature, + nonce, + bucketId, + fileName: missingFileName, + authorization: undefined + } as any) + expect(objRes.status.httpStatus).to.equal(404) + expect(objRes.status.error?.toLowerCase()).to.contain('file not found') + }) + + it('deleteFile should fail when file does not exist', async () => { + const consumerAddress = await consumer.getAddress() + + let nonce = Date.now().toString() + let messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET + ) + let signature = await safeSign(consumer, messageHashBytes) + + const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + consumerAddress, + signature, + nonce, + accessLists: [], + authorization: undefined + } as any) + expect(createRes.status.httpStatus).to.equal(200) + const created = await streamToObject(createRes.stream as Readable) + const bucketId = created.bucketId as string + + const missingFileName = 'missing-delete.txt' + nonce = Date.now().toString() + 
messageHashBytes = createHashForSignature( + consumerAddress, + nonce, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE + ) + signature = await safeSign(consumer, messageHashBytes) + + const delRes = await new PersistentStorageDeleteFileHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + consumerAddress, + signature, + nonce, + chainId: 8996, + bucketId, + fileName: missingFileName, + authorization: undefined + } as any) + expect(delRes.status.httpStatus).to.equal(500) + expect(delRes.status.error?.toLowerCase()).to.contain('file not found') + }) + it('getBuckets returns buckets the consumer can access', async () => { const consumerAddress = await consumer.getAddress() await sleep(1000) diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 63fb34a4c..408878fea 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -43,6 +43,7 @@ export const PROTOCOL_COMMANDS = { PERSISTENT_STORAGE_GET_BUCKETS: 'persistentStorageGetBuckets', PERSISTENT_STORAGE_LIST_FILES: 'persistentStorageListFiles', PERSISTENT_STORAGE_UPLOAD_FILE: 'persistentStorageUploadFile', + PERSISTENT_STORAGE_GET_FILE_OBJECT: 'persistentStorageGetFileObject', PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile' } // more visible, keep then close to make sure we always update both @@ -88,6 +89,7 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE ] From f1570911f47090a19f84f7136688ee4150766b80 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Thu, 9 Apr 2026 10:26:36 +0300 Subject: [PATCH 7/8] improve docs --- docs/persistentStorage.md | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/docs/persistentStorage.md 
b/docs/persistentStorage.md index 7f1144366..0b78c0f63 100644 --- a/docs/persistentStorage.md +++ b/docs/persistentStorage.md @@ -6,9 +6,10 @@ This document describes Ocean Node **Persistent Storage** at a high level: what ## What it is -Persistent Storage is a simple bucket + file store intended for **long-lived artifacts** that Ocean Node needs to keep across requests (and potentially across restarts), and to reference later (e.g. as file objects for compute). +Persistent Storage is a simple bucket + file store intended for **long-lived artifacts** that Ocean Node needs to keep across requests and across restarts, and to reference later (e.g. as file objects for compute). Key primitives: + - **Bucket**: a logical container for files. - **File**: binary content stored inside a bucket. - **Bucket registry**: a local SQLite table that stores bucket metadata (owner, access lists, createdAt). @@ -35,7 +36,8 @@ Key primitives: Persistent Storage uses two stores: -1) **Bucket registry (SQLite)** +1. **Bucket registry (SQLite)** + - File: `databases/persistentStorage.sqlite` - Table: `persistent_storage_buckets` - Columns: @@ -44,7 +46,8 @@ Persistent Storage uses two stores: - `accessListJson` (JSON-encoded access list array) - `createdAt` (unix timestamp) -2) **Backend data** +2. **Backend data** + - `localfs`: writes file bytes to the configured folder under `buckets/<bucketId>/`. - `s3`: not implemented yet. @@ -75,11 +78,13 @@ This access list is used to decide whether a given `consumerAddress` is allowed Access checks happen at two levels: -1) **Backend enforcement** (required) +1. **Backend enforcement** (required) + - Backend operations `listFiles`, `uploadFile`, `deleteFile`, and `getFileObject` all require `consumerAddress`. - The base class helper `assertConsumerAllowedForBucket(consumerAddress, bucketId)` loads the bucket ACL and throws `PersistentStorageAccessDeniedError` if the consumer is not allowed. -2) **Handler enforcement** (command-specific) +2.
**Handler enforcement** (command-specific) + - `createBucket`: additionally checks the node-level allow list `config.persistentStorage.accessLists` (who can create buckets at all). - `getBuckets`: queries registry rows filtered by `owner` and then: - if `consumerAddress === owner`: returns all buckets for that owner @@ -115,6 +120,10 @@ Access checks happen at two levels: - Deletes the named file from the bucket. - Enforces bucket ACL. +- **getFileObject** + - Returns the file object for the named file, for use as a C2D dataset reference. + - Enforces bucket ACL. + ### Not implemented yet - **S3 backend** @@ -127,6 +136,7 @@ Access checks happen at two levels: Persistent storage is controlled by `persistentStorage` in node config. Key fields: + - `enabled`: boolean - `type`: `"localfs"` or `"s3"` - `accessLists`: AccessList[] — node-level allow list to create buckets @@ -138,9 +148,18 @@ Key fields: ## Usage +The typical flow is: + +- create a bucket (or use an existing bucket) +- list files +- upload a file if needed +- call `getFileObject` to obtain the file object that references the file for C2D +- start a C2D job, passing that file object in `datasets` + ### P2P commands All persistent storage operations are implemented as protocol commands in the handler: + - `persistentStorageCreateBucket` - `persistentStorageGetBuckets` - `persistentStorageListFiles` @@ -155,6 +174,7 @@ Each command requires authentication (token or signature) based on Ocean Node’ HTTP routes are available under `/api/services/persistentStorage/...` and call the same handlers. See `docs/API.md` for the full parameter lists and examples. At a glance: + - `POST /api/services/persistentStorage/buckets` - `GET /api/services/persistentStorage/buckets` - `GET /api/services/persistentStorage/buckets/:bucketId/files` @@ -171,4 +191,3 @@ Upload uses the raw request body as bytes and forwards it to the handler as a st - The bucket registry is local to the node (SQLite file). If you run multiple nodes, each node’s registry is independent unless you externalize/replicate it.
- `listBuckets(owner)` requires `owner` and only returns buckets that were created with that owner recorded. - Filenames in `localfs` are constrained (no path separators) to avoid path traversal. - From e55ff185103cbab905bccb280405185699c343e9 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Thu, 9 Apr 2026 17:38:15 +0300 Subject: [PATCH 8/8] resolve comments --- .../core/handler/persistentStorage.ts | 16 ++- .../PersistentStorageFactory.ts | 118 +++++++++++------- .../PersistentStorageLocalFS.ts | 15 +-- 3 files changed, 82 insertions(+), 67 deletions(-) diff --git a/src/components/core/handler/persistentStorage.ts b/src/components/core/handler/persistentStorage.ts index e4394f3d8..0eea49171 100644 --- a/src/components/core/handler/persistentStorage.ts +++ b/src/components/core/handler/persistentStorage.ts @@ -23,9 +23,7 @@ import { } from '../../httpRoutes/validateCommands.js' import { CommandHandler } from './handler.js' -function requirePersistentStorage( - handler: CommandHandler -): Promise { +function requirePersistentStorage(handler: CommandHandler): PersistentStorageFactory { const node = handler.getOceanNode() as any if (!node.getPersistentStorage) { throw new Error('Persistent storage is not available on this node') @@ -68,7 +66,7 @@ export class PersistentStorageCreateBucketHandler extends CommandHandler { if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) const node = this.getOceanNode() const config = node.getConfig() const isAllowedCreate = await checkAddressOnAccessList( @@ -156,7 +154,7 @@ export class PersistentStorageGetBucketsHandler extends CommandHandler { } try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) // const node = this.getOceanNode() const rows = await storage.listBuckets(ownerNormalized) @@ -201,7 +199,7 @@ export class 
PersistentStorageListFilesHandler extends CommandHandler { if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) const result = await storage.listFiles(task.bucketId, task.consumerAddress) return { stream: Readable.from(JSON.stringify(result)), @@ -248,7 +246,7 @@ export class PersistentStorageGetFileObjectHandler extends CommandHandler { if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) const obj = await storage.getFileObject( task.bucketId, task.fileName, @@ -302,7 +300,7 @@ export class PersistentStorageUploadFileHandler extends CommandHandler { if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) if (!task.stream) { return { stream: null, @@ -361,7 +359,7 @@ export class PersistentStorageDeleteFileHandler extends CommandHandler { if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid try { - const storage = await requirePersistentStorage(this) + const storage = requirePersistentStorage(this) await storage.deleteFile(task.bucketId, task.fileName, task.consumerAddress) return { stream: Readable.from(JSON.stringify({ success: true })), diff --git a/src/components/persistentStorage/PersistentStorageFactory.ts b/src/components/persistentStorage/PersistentStorageFactory.ts index 8b3599792..d0ee58d5c 100644 --- a/src/components/persistentStorage/PersistentStorageFactory.ts +++ b/src/components/persistentStorage/PersistentStorageFactory.ts @@ -67,14 +67,45 @@ export type PersistentStorageBucketRecord = { export abstract class PersistentStorageFactory { private db: sqlite3.Database private node: OceanNode + private dbReady = false + private dbReadyPromise: 
Promise constructor(node: OceanNode) { this.node = node - const dbDir = path.dirname('databases/') + const dbDir = path.dirname('databases/persistentStorage.sqlite') if (!fs.existsSync(dbDir)) { fs.mkdirSync(dbDir, { recursive: true }) } - this.db = new sqlite3.Database(dbDir + 'persistentStorage.sqlite') + this.db = new sqlite3.Database('databases/persistentStorage.sqlite') + const createBucketsSQL = ` + CREATE TABLE IF NOT EXISTS persistent_storage_buckets ( + bucketId TEXT PRIMARY KEY, + owner TEXT NOT NULL, + accessListJson TEXT NOT NULL, + createdAt INTEGER NOT NULL + ); + ` + this.dbReadyPromise = new Promise((resolve, reject) => { + this.db.run(createBucketsSQL, (err) => { + if (err) { + reject(err) + return + } + this.dbReady = true + resolve() + }) + }) + } + + public isDbReady(): boolean { + return this.dbReady + } + + private async ensureDbReady(): Promise { + if (this.dbReady) { + return + } + await this.dbReadyPromise } /** @@ -135,7 +166,6 @@ export abstract class PersistentStorageFactory { // common functions async getBucketAccessList(bucketId: string): Promise { - await this.dbCreateTables() try { const row = await this.getBucket(bucketId) if (!row) { @@ -148,7 +178,6 @@ export abstract class PersistentStorageFactory { } async getBucket(bucketId: string): Promise { - await this.dbCreateTables() try { const row = await this.dbGetBucket(bucketId) return row @@ -163,7 +192,6 @@ export abstract class PersistentStorageFactory { * Backends that need setup (e.g. localfs init) should override and call `super.listBuckets(owner)`. 
*/ async listBuckets(owner: string): Promise { - await this.dbCreateTables() const rows = await this.dbListBucketsByOwner(owner) return rows.map((row) => ({ bucketId: row.bucketId, @@ -173,22 +201,10 @@ export abstract class PersistentStorageFactory { })) } - dbCreateTables(): Promise { - const createBucketsSQL = ` - CREATE TABLE IF NOT EXISTS persistent_storage_buckets ( - bucketId TEXT PRIMARY KEY, - owner TEXT NOT NULL, - accessListJson TEXT NOT NULL, - createdAt INTEGER NOT NULL - ); - ` - return new Promise((resolve, reject) => { - this.db.run(createBucketsSQL, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } + /* + * NOTE: db* methods are intentionally gated on ensureDbReady() to avoid races + * with constructor-time schema creation. + */ dbUpsertBucket( bucketId: string, @@ -201,42 +217,54 @@ export abstract class PersistentStorageFactory { VALUES (?, ?, ?, ?) ON CONFLICT(bucketId) DO UPDATE SET accessListJson=excluded.accessListJson; ` - return new Promise((resolve, reject) => { - this.db.run(sql, [bucketId, owner, accessListJson, createdAt], (err) => { - if (err) reject(err) - else resolve() - }) - }) + return this.ensureDbReady().then( + () => + new Promise((resolve, reject) => { + this.db.run(sql, [bucketId, owner, accessListJson, createdAt], (err) => { + if (err) reject(err) + else resolve() + }) + }) + ) } dbGetBucket(bucketId: string): Promise { const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE bucketId = ?` - return new Promise((resolve, reject) => { - this.db.get(sql, [bucketId], (err, row: BucketRow | undefined) => { - if (err) reject(err) - else resolve(row ?? null) - }) - }) + return this.ensureDbReady().then( + () => + new Promise((resolve, reject) => { + this.db.get(sql, [bucketId], (err, row: BucketRow | undefined) => { + if (err) reject(err) + else resolve(row ?? 
null) + }) + }) + ) } dbListBucketsByOwner(owner: string): Promise { const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE owner = ? ORDER BY createdAt ASC` - return new Promise((resolve, reject) => { - this.db.all(sql, [owner], (err, rows: BucketRow[]) => { - if (err) reject(err) - else resolve(rows ?? []) - }) - }) + return this.ensureDbReady().then( + () => + new Promise((resolve, reject) => { + this.db.all(sql, [owner], (err, rows: BucketRow[]) => { + if (err) reject(err) + else resolve(rows ?? []) + }) + }) + ) } dbDeleteBucket(bucketId: string): Promise { const sql = `DELETE FROM persistent_storage_buckets WHERE bucketId = ?` - return new Promise((resolve, reject) => { - this.db.run(sql, [bucketId], function (this: RunResult, err) { - if (err) reject(err) - else resolve(this.changes === 1) - }) - }) + return this.ensureDbReady().then( + () => + new Promise((resolve, reject) => { + this.db.run(sql, [bucketId], function (this: RunResult, err) { + if (err) reject(err) + else resolve(this.changes === 1) + }) + }) + ) } isAllowed(consumerAddress: string, accessLists: AccessList[]): Promise { diff --git a/src/components/persistentStorage/PersistentStorageLocalFS.ts b/src/components/persistentStorage/PersistentStorageLocalFS.ts index e37f9be51..76eee0dc2 100644 --- a/src/components/persistentStorage/PersistentStorageLocalFS.ts +++ b/src/components/persistentStorage/PersistentStorageLocalFS.ts @@ -29,11 +29,7 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { .options as PersistentStorageLocalFSOptions this.baseFolder = options.folder - } - - async init(): Promise { - await fsp.mkdir(this.baseFolder, { recursive: true }) - await super.dbCreateTables() + fsp.mkdir(this.baseFolder, { recursive: true }) } private bucketPath(bucketId: string): string { @@ -71,8 +67,8 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { } } + // eslint-disable-next-line require-await async 
listBuckets(owner: string): Promise { - await this.init() return super.listBuckets(owner) } @@ -80,8 +76,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { accessList: AccessList[], owner: string ): Promise { - await this.init() - const bucketId = randomUUID() const createdAt = Math.floor(Date.now() / 1000) await fsp.mkdir(this.bucketPath(bucketId), { recursive: true }) @@ -99,7 +93,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { bucketId: string, consumerAddress: string ): Promise { - await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) @@ -128,7 +121,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { content: NodeJS.ReadableStream, consumerAddress: string ): Promise { - await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) @@ -156,7 +148,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { fileName: string, consumerAddress: string ): Promise { - await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) await this.ensureFileExists(bucketId, fileName) @@ -170,7 +161,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { fileName: string, consumerAddress: string ): Promise { - await this.init() await this.ensureBucketExists(bucketId) await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) await this.ensureFileExists(bucketId, fileName) @@ -189,7 +179,6 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory { fileName: string, consumerAddress?: string ): Promise { - await this.init() await this.ensureBucketExists(bucketId) if (consumerAddress) { await this.assertConsumerAllowedForBucket(consumerAddress, bucketId)