Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 177 additions & 19 deletions lib/routes/routeBackbeat.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { constants: { HTTP_STATUS_CONFLICT } } = require('http2');
const url = require('url');
const async = require('async');
const httpProxy = require('http-proxy');
Expand All @@ -7,7 +8,13 @@ const joi = require('@hapi/joi');
const backbeatProxy = httpProxy.createProxyServer({
ignorePath: true,
});
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage } = require('arsenal');
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage, versioning } = require('arsenal');
const { decode, encode } = versioning.VersionID;
const {
VersionIdCollisionException,
StaleMicroVersionIdException,
MicroVersionIdAlreadyStoredException,
} = require('@scality/cloudserverclient');

const { responseJSONBody } = s3routes.routesUtils;
const { getSubPartIds } = s3middleware.azureHelper.mpuUtils;
Expand All @@ -21,6 +28,7 @@ const locationStorageCheck = require('../api/apiUtils/object/locationStorageChec
const { dataStore } = require('../api/apiUtils/object/storeObject');
const prepareRequestContexts = require('../api/apiUtils/authorization/prepareRequestContexts');
const { decodeVersionId } = require('../api/apiUtils/object/versioning');
const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo');
const locationKeysHaveChanged = require('../api/apiUtils/object/locationKeysHaveChanged');
const { standardMetadataValidateBucketAndObj, metadataGetObject } = require('../metadata/metadataUtils');
const { config } = require('../Config');
Expand All @@ -32,6 +40,7 @@ const {
} = require('../api/apiUtils/integrity/validateChecksums');
const { BackendInfo } = models;
const { pushReplicationMetric } = require('./utilities/pushReplicationMetric');
const writeContinue = require('../utilities/writeContinue');
const kms = require('../kms/wrapper');
const { listLifecycleCurrents } = require('../api/backbeat/listLifecycleCurrents');
const { listLifecycleNonCurrents } = require('../api/backbeat/listLifecycleNonCurrents');
Expand Down Expand Up @@ -93,7 +102,7 @@ function _isObjectRequest(req) {
return ['data', 'metadata', 'multiplebackenddata', 'multiplebackendmetadata'].includes(req.resourceType);
}

function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
function _respondWithHeaders(response, payload, extraHeaders, log, callback, statusCode = 200) {
let body = '';
if (typeof payload === 'string') {
body = payload;
Expand All @@ -115,10 +124,10 @@ function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
// eslint-disable-next-line no-param-reassign
response.serverAccessLog.endTurnAroundTime = process.hrtime.bigint();
}
response.writeHead(200, httpHeaders);
response.writeHead(statusCode, httpHeaders);
response.end(body, 'utf8', () => {
log.end().info('responded with payload', {
httpCode: 200,
httpCode: statusCode,
contentLength: Buffer.byteLength(body),
});
callback();
Expand All @@ -129,6 +138,15 @@ function _respond(response, payload, log, callback) {
_respondWithHeaders(response, payload, {}, log, callback);
}

/**
 * Answer a request with a conflict response carrying a micro version id header.
 *
 * Delegates to _respondWithHeaders with the HTTP_STATUS_CONFLICT status code,
 * a `{ code, message }` payload, and an `x-scal-micro-version-id` header.
 *
 * @param {object} response - HTTP response passed through to _respondWithHeaders
 * @param {object} log - request logger passed through to _respondWithHeaders
 * @param {function} callback - passed through to _respondWithHeaders
 * @param {string} code - value placed in the `code` field of the payload
 * @param {string} message - value placed in the `message` field of the payload
 * @param {string} [mvId] - micro version id to encode into the header; when
 *   falsy, the header is sent as an empty string
 * @return {*} whatever _respondWithHeaders returns
 */
function _respondWithHeaderCrrConflict(response, log, callback, code, message, mvId) {
    const conflictPayload = { code, message };
    // A missing micro version id is signalled with an empty header value.
    const encodedMicroVersionId = mvId ? encode(mvId) : '';
    const conflictHeaders = { 'x-scal-micro-version-id': encodedMicroVersionId };
    return _respondWithHeaders(response, conflictPayload, conflictHeaders, log, callback, HTTP_STATUS_CONFLICT);
}

function _getRequestPayload(req, cb) {
const payload = [];
let payloadLen = 0;
Expand Down Expand Up @@ -414,6 +432,38 @@ function putData(request, response, bucketInfo, objMd, log, callback) {
log.error(errMessage);
return callback(errorInstances.BadRequest.customizeDescription(errMessage));
}

const incomingVersionIdEncoded = request.headers['x-scal-version-id'];
if (incomingVersionIdEncoded) {

@francoisferrand francoisferrand Jul 3, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

header may be present but empty, for null versions

Suggested change
if (incomingVersionIdEncoded) {
if (incomingVersionIdEncoded != undefined) {

const incomingVersionIdDecoded = decode(incomingVersionIdEncoded);
if (incomingVersionIdDecoded instanceof Error) {
log.error('crr putData: failed to decode x-scal-version-id header', {
method: 'putData',
error: incomingVersionIdDecoded.message,
});
return callback(errorInstances.BadRequest.customizeDescription(
'bad request: invalid x-scal-version-id header'));
}
if (objMd && objMd.versionId === incomingVersionIdDecoded) {
// Data already at destination for this version; return 409 with the existing
// microVersionId so backbeat can decide if putMetadata is still needed.
log.debug('crr putData: version already at destination', {
method: 'putData',
bucketName: request.bucketName,
objectKey: request.objectKey,
hasMicroVersionId: !!objMd.microVersionId,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
VersionIdCollisionException.name,
'version id already at destination',
objMd.microVersionId,
);
}
}

writeContinue(request, response);
const context = {
bucketName: request.bucketName,
owner: canonicalID,
Expand Down Expand Up @@ -539,6 +589,65 @@ function getCanonicalIdsByAccountId(accountId, log, cb) {
}

function putMetadata(request, response, bucketInfo, objMd, log, callback) {
const { bucketName, objectKey } = request;

const encodedMicroVersionId = request.headers['x-scal-micro-version-id'];
Comment thread
francoisferrand marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the header is set, maybe we should have a sanity check (later) to verify the metadata contains the same microVersionId as the header

const isCascadeWrite = encodedMicroVersionId !== undefined;
let incomingMicroVersionId = null;
if (isCascadeWrite) {
// '' means source has no microVersionId, treated as older revision
incomingMicroVersionId = encodedMicroVersionId === '' ? null : decode(encodedMicroVersionId);
Comment on lines +595 to +599

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloudserver DOES NOT know this is replication: this route is putMetadata, not write replicated metadata, and is used everywhere in backbeat (lifecycle, gc...)

The semantics here is not that of replication, it is a generic conditional-write : "when microVersionId header is set AND the object/version already exists, update metadata only if microVersionId is newer than the one already stored"

Suggested change
const isCascadeWrite = encodedMicroVersionId !== undefined;
let incomingMicroVersionId = null;
if (isCascadeWrite) {
// '' means source has no microVersionId, treated as older revision
incomingMicroVersionId = encodedMicroVersionId === '' ? null : decode(encodedMicroVersionId);
if (encodedMicroVersionId !== undefined) {
const microVersionId = encodedMicroVersionId === '' ? null : decode(encodedMicroVersionId);

if (incomingMicroVersionId instanceof Error) {
log.error('crr putMetadata: failed to decode x-scal-micro-version-id header', {
method: 'putMetadata',
error: incomingMicroVersionId.message,
});
return callback(errorInstances.BadRequest.customizeDescription(
'bad request: invalid x-scal-micro-version-id header'));
}
// Loop/stale detection only applies when the object already exists at destination

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same: this is not loop/stale detection, but really microVersionId conditional update/conflict detection.

loop/stale is the feature of replication we build on top - in backbeat only, where the replication lives.

if (objMd) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to validate microVersionId if obj (i.e. the specified object/version document) was not found?

We could keep things simpler with if (encodedMicroVersionId !== undefined && obj) { at the beginning, what do you think?

// null = oldest revision (original putObject, before any metadata update).
// VersionID strings are in reverse chronological order: a lexicographically
// larger string is an older revision.
const isSourceOlderThanDestination = (sourceMicroVersionId, destinationMicroVersionId) =>
destinationMicroVersionId !== null &&
(sourceMicroVersionId === null || sourceMicroVersionId > destinationMicroVersionId);
Comment on lines +613 to +615

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this not handled already in arsenal's compareMicroVersionId function ?


// Treat '' and undefined as null (no microVersionId set).
const sourceMicroVersionId = incomingMicroVersionId || null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • incomingMicroVersionId is already null if false, c.f. definition above: incomingMicroVersionId = encodedMicroVersionId === '' ? null : decode(encodedMicroVersionId); ?
  • do we really need a second variable?

const destinationMicroVersionId = objMd.microVersionId || null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source and destination are a bit replication-specific : just microVersionId and objMicroVersionId variables should be enough?

if (sourceMicroVersionId === destinationMicroVersionId) {
log.debug('crr cascade putMetadata: loop detected, skipping write', {
method: 'putMetadata',
bucketName,
objectKey,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
MicroVersionIdAlreadyStoredException.name,
'incoming microVersionId already at destination',
);
}
if (isSourceOlderThanDestination(sourceMicroVersionId, destinationMicroVersionId)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid this may not be enough : we are basically

  1. reading metadata from mongo (obj)
  2. comparing the microVersionId → abort if too old
  3. writing metadata

This is not atomic, so 2 such calls could race each-other, both pass the check 2 and write the metadata... To make it safe, we need to make a conditional write in mongodb somehow: i.e. let mongo do the microVersionId comparison (or implement a generic "racing writing" detection logic, probably out of scope here...)

log.debug('crr cascade putMetadata: stale event, rejecting', {
method: 'putMetadata',
bucketName,
objectKey,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
StaleMicroVersionIdException.name,
'incoming revision is older than destination',
objMd?.microVersionId,
);
}
}
}

Comment thread
SylvainSenechal marked this conversation as resolved.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putData correctly calls writeContinue(request, response) before consuming the body (line 466), but putMetadata doesn't call it before _getRequestPayload. The cloudserverclient upgrade from 1.0.7 to 1.0.9 adds @aws-sdk/middleware-expect-continue, so clients will now send Expect: 100-continue for metadata requests. Since the server registers a checkContinue listener (lib/server.js:296), Node.js won't auto-send 100 Continue — the client will stall until its expectContinueTimeout (default 1s) before sending the body, adding latency to every metadata replication.

Suggested change
writeContinue(request, response);

return _getRequestPayload(request, (err, payload) => {
if (err) {
return callback(err);
Expand All @@ -552,15 +661,15 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
return callback(errors.MalformedPOSTRequest);
}

const { headers, bucketName, objectKey } = request;
const { headers } = request;

// Destination-side delete-marker replication.
// We need the REPLICA status to distinguish from
// source-side replication status updates that also carry isDeleteMarker=true.
if (
omVal.isDeleteMarker &&
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
request.serverAccessLog
) {
// eslint-disable-next-line no-param-reassign
Expand All @@ -576,7 +685,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// The REPLICA status excludes source-side replication-status updates.
if (
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
(omVal.originOp === 's3:ObjectTagging:Put' || omVal.originOp === 's3:ObjectTagging:Delete') &&
request.serverAccessLog
) {
Expand All @@ -593,7 +702,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// The REPLICA status excludes source-side replication-status updates.
if (
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
omVal.originOp === 's3:ObjectAcl:Put' &&
request.serverAccessLog
) {
Expand All @@ -607,17 +716,23 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {

if (headers['x-scal-replication-content'] === 'METADATA') {
if (!objMd) {
return callback(errors.ObjNotFound);
// For metadata-only writes, the destination's data location must be
// preserved from the existing object. If there is no existing object
// and the source has data, there is nothing to copy the location from.
// Zero-byte objects have no data location, so this is safe to skip.
Comment on lines +719 to +722

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is safe to skip, but also safe to keep as well : and the previous code was much simpler. If there is no need to change it, best not to change it.

Especially semantically, headers['x-scal-replication-content'] === 'METADATA' means we expect to update the metadata of an object: so that object MUST exist. Removing the check could mask bugs (e.g. backbeat set METADATA instead of DATA+METADATA) or lead to re-creating the object/version (empty) after it was deleted.

→ keep the previous chunk, this should not be changed

if (omVal['content-length'] > 0) {
return callback(errors.ObjNotFound);
}
} else {
[
'location',
'x-amz-server-side-encryption',
'x-amz-server-side-encryption-aws-kms-key-id',
'x-amz-server-side-encryption-customer-algorithm',
].forEach(headerName => {
omVal[headerName] = objMd[headerName];
});
}

[
'location',
'x-amz-server-side-encryption',
'x-amz-server-side-encryption-aws-kms-key-id',
'x-amz-server-side-encryption-customer-algorithm',
].forEach(headerName => {
omVal[headerName] = objMd[headerName];
});
}

let versionId = decodeVersionId(request.query);
Expand Down Expand Up @@ -672,7 +787,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// then we want to create a version for the replica object even though
// none was provided in the object metadata value.
if (omVal.replicationInfo.isNFS) {
const { isReplica } = omVal.replicationInfo;
const isReplica = omVal.replicationInfo.isReplica === true
|| omVal.replicationInfo.status === 'REPLICA';
versioning = isReplica;
omVal.replicationInfo.isNFS = !isReplica;
}
Expand Down Expand Up @@ -724,6 +840,48 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
options.isNull = isNull;
}

// Cascade triggering
// If the bucket receiving this replica has its own CRR rules, set
// status to PENDING so the queue populator here picks it up for the
// next hop. If not, clear the source-side replicationInfo fields
// Always mark isReplica=true.
if (isCascadeWrite) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

microVersionId header semantics is not "try to replicate" but really "do not write older metadata":

  • if another feature (lifecycle) uses putMetadata sets this header → may unexpectedly trigger replication ;
  • conversely, another feature may use putMetadata to make a change which requires replication

so this is not the right condition: microVersionId is about making the call conditional, while this block is really about "being called as part of replication"...

→ For the first point, checking headers['x-scal-replication-content'] is the correct & precise way to identify if this put metadata is part of replication. If it is, then we must (now) try to cascade.
→ Second point is not addressed, and not needed at the moment AFAIK
→ As a safety check, we may here require that microVersionId condition is specified when making a replication's putMetadata : and reject the call otherwise (internal error) instead of just (silently) skipping cascaded replication...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if headers['x-scal-replication-content'] is set, but microVersionID is not:

  • we must still mark the object as isReplica. This must be done as long as this is a replication request
  • what we may skip or reject is only the cascade trigger, i.e. updating replication info to PENDING. Not sure what we should put instead, if we skip

This could be caused by 2 things:

  • bug in code → should reject/fail with an error
  • or may be the case of "older artesca" replicating to "newer artesca" → in that case, skipping cascade may be better indeed to avoid breaking customer workflow, but need to double-think if this could break further... At the very least we would need a
  • maybe a compromise would be to fail only if microVersionId is not set AND we would trigger a cascade (could be risky? or maybe not, since at least we would set it...) and ignore otherwise...
Suggested change
if (isCascadeWrite) {
if (headers['x-scal-replication-content']) {
const replInfo = getReplicationInfo(config, objectKey, bucketInfo, isMDOnly, objSize, null, null, null);
replInfo.isReplica = true
if (!headers['x-scal-micro-version-id'] && replInfo.status == PENDING) {
return respondWithError(...)
}
obj.relicationInfo = replInfo;

→ in any case headers['x-scal-replication-content'] must be part of the condition, this is the real "first level" condition for cascade
→ not sure how best to handle the missing microVersionId in that case: e.g. if ignoring is safe (but still we must set "isReplica"), if it should fail (strictly safe, but may break in upgrade scenarios), if it should replicate anyway (trust the user to deploy cascaded/... only when all systems are upgraded) or if we should fail only when actually triggering replication... what do you think?

const isMDOnly = headers['x-scal-replication-content'] === 'METADATA';
const objSize = omVal['content-length'] || 0;

// These S3-compatible Scality locations are excluded
// as cascade targets because they use the MultiBackend S3 path which
// bypasses the putData/putMetadata routes, so loop detection cannot fire
// on those destinations.
Comment thread
francoisferrand marked this conversation as resolved.
const BLOCKED_LOCATION_TYPES = ['location-scality-ring-s3-v1', 'location-scality-artesca-s3-v1'];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be tested on a real lab ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be honestly, i think i need to double check this together with the design and the code

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we know the "source" (sender) of the request here, and map it to a location?
It would not solve the loop problem, but passing that information would allow skipping extra API call(s) in case of bidirectional CRR, and keep same "load" as the existing code.

→ please create followup, could be a nice optimization?

(kind of flimsy, but we could extract the "replication group" from versionId ; not sure how to map to locations though. Or checking if source IP is in the location's bootstrap list. Other maybe other ways...)


const nextReplInfo = getReplicationInfo(config, objectKey, bucketInfo, isMDOnly, objSize, null, null, null);

if (nextReplInfo) {
nextReplInfo.backends = nextReplInfo.backends.filter(b => {
const loc = config.locationConstraints[b.site];
return !loc || !BLOCKED_LOCATION_TYPES.includes(loc.type);
});
Comment on lines +861 to +864

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic should be in getReplicationInfo, this is the function responsible to build the list of backend/destinations. Doing it there will also avoid create a backend entry only to remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you wanna do this ? It means I have to add some kind of flag to getReplicationInfo to do the current logic in getReplicationInfo 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see, maybe it can be done cleanly if, in getReplicationInfo.js I add a helper function, leave the existing function almost as it is and just call that extra helper at the end of getReplicationInfo if the flag is on ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up, just wanna double check whats the potential issue here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to move it out of here indeed : easier to test (isolated), easier to find (logic in a single place), and makes the current function smaller...

Not sure if we could do with the existing params, extend them, or if we need a new one ; but (esp if we add one) we should also take the chance to refactor the API of getReplicationInfo to take a structure instead of a long series of null...

(may be in followup PR though, but should be done either way I think)

}

if (nextReplInfo && nextReplInfo.backends.length > 0) {
omVal.replicationInfo = nextReplInfo;
} else {
omVal.replicationInfo = {
Comment thread
SylvainSenechal marked this conversation as resolved.
status: '',
backends: [],
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
};
}

omVal.replicationInfo.isReplica = true;
}

return async.series(
[
// Zenko's CRR delegates replacing the account
Expand Down
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
},
"homepage": "https://github.com/scality/S3#readme",
"dependencies": {
"@aws-crypto/crc32": "^5.2.0",
"@aws-crypto/crc32c": "^5.2.0",
"@aws-sdk/client-iam": "^3.930.0",
"@aws-sdk/client-s3": "^3.1013.0",
"@aws-sdk/client-sts": "^3.930.0",
"@aws-sdk/crc64-nvme-crt": "^3.989.0",
Comment thread
SylvainSenechal marked this conversation as resolved.
"@aws-sdk/credential-providers": "^3.864.0",
"@aws-sdk/middleware-retry": "^3.374.0",
"@aws-sdk/protocol-http": "^3.374.0",
"@aws-sdk/s3-request-presigner": "^3.901.0",
"@aws-sdk/signature-v4": "^3.374.0",
"@aws-crypto/crc32": "^5.2.0",
"@aws-crypto/crc32c": "^5.2.0",
"@aws-sdk/crc64-nvme-crt": "^3.989.0",
"@azure/storage-blob": "^12.28.0",
"@hapi/joi": "^17.1.1",
"@opentelemetry/api": "^1.9.0",
Expand Down Expand Up @@ -65,11 +65,11 @@
"vaultclient": "scality/vaultclient#8.5.3",
"werelogs": "scality/werelogs#8.2.2",
"ws": "^8.18.0",
"@scality/cloudserverclient": "1.0.9",
"xml2js": "^0.6.2"
Comment thread
SylvainSenechal marked this conversation as resolved.
},
"devDependencies": {
"@eslint/compat": "^1.2.2",
"@scality/cloudserverclient": "1.0.7",
"@scality/eslint-config-scality": "scality/Guidelines#8.3.1",
"eslint": "^9.14.0",
"eslint-plugin-import": "^2.31.0",
Expand All @@ -88,10 +88,10 @@
"nodemon": "^3.1.10",
"nyc": "^15.1.0",
"pino-pretty": "^13.1.3",
"prettier": "^3.4.2",
"sinon": "^13.0.1",
"ts-morph": "^28.0.0",
"tv4": "^1.3.0",
"prettier": "^3.4.2"
"tv4": "^1.3.0"
},
"resolutions": {
"jsonwebtoken": "^9.0.0",
Expand Down
Loading
Loading