Add Crr Cascade capabilities to backbeat crr replication #2747
SylvainSenechal wants to merge 1 commit
I think we can add functional tests instead of just these,
But waiting for Arsenal/cloudserver to be merged, as it will be easier to make these tests (functional tests in backbeat rely on an image of cloudserver)
There was a problem hiding this comment.
keeping unit test is good, functional test should just be an addition?
Codecov Report
✅ All modified and coverable lines are covered by tests.
... and 3 files with indirect coverage changes

@@            Coverage Diff             @@
##    development/9.5    #2747    +/-   ##
===================================================
+ Coverage     75.23%   75.30%   +0.07%
===================================================
  Files           200      200
  Lines         13764    13799     +35
===================================================
+ Hits          10355    10391     +36
+ Misses         3399     3398      -1
  Partials         10       10
4c64ed6 to 3237f9e
  "@smithy/node-http-handler": "^3.3.3",
  "JSONStream": "^1.3.5",
- "arsenal": "git+https://github.com/scality/arsenal#8.3.9",
+ "arsenal": "git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a",
There was a problem hiding this comment.
Arsenal is pinned to a raw commit hash (2c429ab...) instead of a semantic version tag. Per project conventions, git-based deps (arsenal, vaultclient, etc.) should pin to tags (e.g. #8.x.y). Commit hashes are opaque — it's unclear which features/fixes are included, and there's no semver contract. This also makes it harder for reviewers and operators to reason about what changed.
— Claude Code
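A convention like this can be checked mechanically. The sketch below is illustrative only: `findCommitPinnedDeps` is a hypothetical helper, not an existing project script, and it assumes that a `#fragment` made of 7 to 40 hex characters is a commit hash (a tag that happens to look like hex would be flagged too).

```javascript
// Hypothetical helper (not part of backbeat): flags git dependencies whose
// "#fragment" looks like a commit hash rather than a tag such as "8.3.9".
function findCommitPinnedDeps(dependencies) {
    const looksLikeCommitHash = /^[0-9a-f]{7,40}$/i;
    return Object.entries(dependencies)
        .filter(([, spec]) => spec.startsWith('git+') && spec.includes('#'))
        .filter(([, spec]) => looksLikeCommitHash.test(spec.split('#').pop()))
        .map(([name]) => name);
}

// Usage with the two entries quoted in the diff above.
const offenders = findCommitPinnedDeps({
    'JSONStream': '^1.3.5',
    'arsenal': 'git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a',
});
console.log(offenders);
```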
f9df514 to ab2ea02
ab2ea02 to 13dafae
13dafae to 7dae570
7dae570 to 5780ef4
    VersioningRequired: true,
    RequestUids: log.getSerializedUids(),
});
const putCommand = attachExpectContinueMiddleware(
There was a problem hiding this comment.
Can you create the follow-up ticket @SylvainSenechal ?
        err, sourceEntry, destEntry, kafkaEntry, log, done));
}

_handleReplicationOutcome(err, sourceEntry, destEntry, kafkaEntry,
There was a problem hiding this comment.
This section has quite a bit of nested conditional logic and duplicate checks that make it hard to read and maintain.
Could we flatten this using guard clauses (early returns) and abstract the err.XYZ || err.name === 'XYZ' checks into a helper function? It would drastically reduce the cognitive load of this function. Let me know if you want to pair on it!
There was a problem hiding this comment.
I agree it's trash code, and the diff is hard to read with the last else.
I just changed it and tried something I didn't want to do at first, but I think it's fine: each condition now publishes and returns directly, instead of deferring to the end of the function. I believe it's quite readable this way.
There was a problem hiding this comment.
It could maybe still use some refactoring, but not all the errors have the same form, so I think the diff is reasonable here.
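For reference, the helper-plus-guard-clause shape suggested in this thread could look roughly like the sketch below. `isErrorOfType` and `classifyOutcome` are hypothetical names, and the returned strings are placeholders for the real publish/return calls, not actual replication statuses.

```javascript
// Hypothetical helper: the code under review checks both `err.XYZ` and
// `err.name === 'XYZ'`, so fold the two checks into one place.
function isErrorOfType(err, type) {
    return Boolean(err && (err[type] || err.name === type));
}

// Guard-clause shape: each condition decides and returns immediately,
// so there is no trailing else to follow. Return values are placeholders.
function classifyOutcome(err) {
    if (!err) {
        return 'completed';
    }
    if (isErrorOfType(err, 'ObjNotFound')) {
        return 'source-object-missing';
    }
    return 'failed';
}
```

With this shape, adding a new error case means adding one more guard rather than another nesting level.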
      this._getAndPutPart(sourceEntry, destEntry, part, log, done);
- }, (err, destLocations) => {
+ }, (err, partResults) => {
+     const destLocations = (partResults || [])
There was a problem hiding this comment.
Could you extract this change into its own commit so you can explain the 'why' in the commit description ?
There was a problem hiding this comment.
For now I added some comments to clarify this a bit
fd87ff8 to d1bf122
    // If all parts were already at destination, destLocations is [].
    // Sending [] without metadata-only mode would wipe the object's
    // location field at the destination, so force metadata-only.
There was a problem hiding this comment.
Shouldn't we pass partResults anyway? For deleteOrphans we do indeed need to skip the parts already at destination; but for continuation we need to pass all parts from destination, so that we can construct the metadata of the MPU object?
The issue I am thinking of is a race to create the object: 2 sources each replicate half of the object, and eventually one of them must still write the metadata after filling it with the actual part "ids".
→ if all parts are at destination, maybe we can indeed skip metadata creation (or best to try it anyway? this is cheap vs the upload of all parts anyway....)
→ if some parts are missing, we must still try to write the metadata I guess (contrary to simple objects, the case of a "partial" MPU is not the same!)
There was a problem hiding this comment.
Thinking about this.
DeleteOrphans is fine as it is.
I think the big problem we have (although maybe it's not an expected product scenario) is
4 locations with a mix of cascade and multi-destination, using MPU:
a -> b
a -> c
b -> d
c -> d
b and c will write to d at the same time, so let's say for 10 parts, half of them will be written from b and the other half from c.
Then for the parts already written by the other location, the current location will get a list of partResults in which some elements are partAlreadyAtDest errors and the others are proper locations.
If we pass the whole unfiltered partResults down the call chain, we will get corrupted location data because of the errors in partResults.
I feel like a correct way to handle this would be:
When doing an MPU, as soon as I see a partAlreadyAtDest error, I stop, because it means another location is already doing its replication, so I will let it do it fully.
But it seems like this could get racy, when both sources detect a partAlreadyAtDest -_-
There was a problem hiding this comment.
And thinking some more, I think what we would want is:
When doing a putData, even when there is a 409, return the existing location for that part to backbeat.
Then, regardless of whether we had a collision on putData or not, at the end we have the data locations for all parts, and backbeat can send putMetadata with all part locations. So in our scenario, both locations b and c will call putMetadata, and one of the calls will fail, but that's all right.
cc @maeldonn
There was a problem hiding this comment.
> When doing a putData, even when there is a 409, return the existing location for that part to backbeat
409 is not "by part": the 409 means "there is already an object" (i.e. metadata), it is not related to a single part.
Each part upload is completely transparent.
> When doing an mpu, as soon as I see a partAlreadyAtDest errors, I stop because it means another location is already doing its replication, so I will let it do it fully.
Agreed (though to be precise: once we detect it, it means the metadata was written, so the other location has already done the replication).
We also need to delete all uploaded parts, since each replication source will upload its own parts independently until either success or conflict. On conflict all parts must be removed.
b and c may both write parts (putData) in parallel, but they will do so completely independently: the only moment when they interact (i.e. when a conflict is detectable) is once the metadata has been written by either → from that point the "remaining" putData will fail, and the loser must remove its "own" parts (all the successful putData it made) and skip the putMetadata...
This also means there is likely a follow-up here, to redesign MPU upload both for safety (avoid orphans) and optimization (detect conflict earlier and/or allow reusing parts already uploaded).
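The ordering described in this comment could be sketched as follows. Everything here is a hypothetical stand-in, not a backbeat or cloudserver API: `replicateMpu`, the injected `putPart` / `deleteParts` / `putMetadata` functions, and the `ObjectAlreadyAtDest` error name are assumptions made for illustration. Parts are uploaded sequentially to keep the sketch short.

```javascript
// Hypothetical sketch: each source uploads its own parts independently.
// A conflict can only appear once another source has written the metadata;
// the loser then removes the parts it uploaded and skips putMetadata.
async function replicateMpu(parts, { putPart, deleteParts, putMetadata }) {
    const uploaded = [];
    for (const part of parts) {
        try {
            uploaded.push(await putPart(part));
        } catch (err) {
            // Whatever the failure, the parts uploaded so far are orphans.
            await deleteParts(uploaded);
            if (err.name === 'ObjectAlreadyAtDest') {
                return { wroteMetadata: false, deletedParts: uploaded.length };
            }
            throw err;
        }
    }
    // All parts were written by this source, so it may create the metadata.
    await putMetadata(uploaded);
    return { wroteMetadata: true, deletedParts: 0 };
}
```

The winner's metadata only ever references parts the winner uploaded itself, which is why the loser's parts can be removed without affecting the destination object.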
There was a problem hiding this comment.
Oh ok, I didn't understand that the 409 would only happen after the putMetadata has been done, but yeah it makes sense: that's when the data gets linked to the metadata.
In that case, our problem might become much easier, since what happens now is:
- One of the 2 locations finishes its replication first, with multiple putData and 1 putMetadata.
- The second one is slower, and as soon as the first one has done the putMetadata, this one starts receiving "partAlreadyAtDestination". That second one eventually also tries to do a putMetadata, but because the metadata is already there, it will get a MicroVersionIdAlreadyStoredException, and the error will trigger a deleteOrphans.
So I think the only follow-up we may want to do, which is not even fully related to this feature, is stopping the code execution as soon as we receive one "partAlreadyAtDest".
There was a problem hiding this comment.
    if (!collisionErr.microVersionId) {
        log.info('cascade putData: data at destination, ' +
            'no microVersionId, proceeding with putMetadata', logMeta);
        return { err: null, result: partAlreadyAtDest };
There was a problem hiding this comment.
can this really happen?
→ if we get a collision, it means we have the "newer" cloudserver code, which returns the microVersionId : the field should always be set
→ so this case should either be an error (like decoding), or just the usual path of compareMicroVersionId() (since microVersionId is not set when "creating" object)
There was a problem hiding this comment.
Here microVersionId is the microVersionId at the destination, so it could be undefined.
I think it's unlikely, but consider a scenario where replication was already set up before cascade: we already have a replica, but it has no microVersionId.
This is quite defensive, but decode can crash if we don't check this.
Keeping this open because in cloudserver there is still a discussion about microVersionId being set when creating the object.
    if (err.ObjNotFound || err.name === 'ObjNotFound') {
        return cbOnce(err);
    }
    if (err instanceof MicroVersionIdAlreadyStoredException) {
There was a problem hiding this comment.
don't we have a single exception in the latest API, leaving the caller to check microVersionID to identify if this is a "loop" or "stale" ?
There was a problem hiding this comment.
I built it this way; I prefer to have backbeat just check the error instead of rerunning the whole microVersionId comparison that cloudserver already did.
d1bf122 to 3958e74
3958e74 to f6f31c8
francoisferrand
left a comment
There was a problem hiding this comment.
Multiple topics are still open in scality/cloudserver#6179, regarding "empty" versionId, shape of errors returned, ... → best to settle these before reviewing
83c6e9f to 9da41bb
9da41bb to 5c4ed70
@@ -685,14 +744,14 @@ class ReplicateObject extends BackbeatTask {
        return cbOnce(err);
    }
    log.error('an error occurred when putting metadata to S3',
There was a problem hiding this comment.
MicroVersionIdAlreadyStoredException and StaleMicroVersionIdException from putMetadata are expected cascade signals, not errors. They fall through to this log.error because the catch block only early-returns for ObjNotFound. In production cascade scenarios, every loop/stale detection at the metadata level will emit a misleading error log.
Add early returns before the error log, mirroring the ObjNotFound pattern:
    if (err instanceof MicroVersionIdAlreadyStoredException ||
        err instanceof StaleMicroVersionIdException) {
        return cbOnce(err);
    }

      this._getAndPutPart(sourceEntry, destEntry, part, log, done);
- }, (err, destLocations) => {
+ }, (err, partResults) => {
+     // partAlreadyAtDest signals data already at dest (cascade putData 409);
There was a problem hiding this comment.
The partAlreadyAtDest name is not correct: we cannot detect that a "part" is already at destination, only that there is already an object (i.e. a document in mongo with the specified key and versionId).
→ should really be objectAlreadyAtDest
....and it changes the logic a bit: when it happens (even on a single part), all successful parts must be deleted.
    // partAlreadyAtDest signals data already at dest (cascade putData 409);
    const destLocations = (partResults || []).filter(result => result && result !== partAlreadyAtDest);
    if (err) {
        return this._deleteOrphans(destEntry, destLocations, log, () => cb(err));
    }
    if (destLocations.length !== partResults.length) {
        // object already exists: release all parts, then check if metadata needs to be updated
        return this._deleteOrphans(destEntry, destLocations, log, () =>
            cb(null, destLocations, true));
    }
    return cb(null, destLocations, false);
i.e. we cannot make a partial write of object data. To create the metadata, all parts must be known: so all of them must have been written successfully. If there is a "conflict" on any single part, it means another replicant has finished uploading parts and created the metadata document : so we must drop all the parts we wrote, and just update the object metadata if needed....
There was a problem hiding this comment.
also please create followups to
- ignore (other) error if a single part has a conflict
- abort other parts upload as soon as we identify a conflict (i.e. imagine a 1000 parts object: if we have a conflict on first part, no point trying the other parts)
- consider changing the putPartData protocol to create actual MPU parts (which can be garbage-collected by lifecycle after transfer is aborted), for extra safety (not strictly related to CRR, but best to track it)
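The second follow-up (stop uploading as soon as a conflict is seen) could take a shape like the sketch below. It is only an illustration under assumptions: `uploadPartsUntilConflict`, the injected `putPart` function, and the `ObjectAlreadyAtDest` error name are hypothetical, and the first follow-up (ignoring other errors once a conflict is known) is not handled here.

```javascript
// Hypothetical sketch: upload parts with limited concurrency and stop
// scheduling new parts once any upload reports a conflict.
async function uploadPartsUntilConflict(parts, putPart, concurrency = 2) {
    const uploaded = [];
    let conflict = false;
    let next = 0;

    async function worker() {
        // Each worker pulls the next unclaimed part until none are left
        // or a conflict has been seen by any worker.
        while (!conflict && next < parts.length) {
            const part = parts[next++];
            try {
                uploaded.push(await putPart(part));
            } catch (err) {
                if (err.name !== 'ObjectAlreadyAtDest') {
                    throw err;
                }
                conflict = true; // remaining parts are never attempted
            }
        }
    }

    await Promise.all(Array.from({ length: concurrency }, () => worker()));
    // On conflict, `uploaded` lists the parts the caller still has to delete.
    return { conflict, uploaded };
}
```

Uploads already in flight when the conflict is detected still complete, so the caller has to clean up everything listed in `uploaded`.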
    });
}

_resolveVersionIdCollision(collisionErr, sourceEntry, destEntry, log) {
There was a problem hiding this comment.
This whole function is not correct in the case of putData(Parts): if we receive VersionIdCollision, it means there is already an object with this versionId.
- We must not create or replace the metadata with the new data (data is immutable)
- We must immediately delete whatever we already uploaded
- The microVersionId comparison can be used only after this, to decide if we need to proceed with the metadata update (e.g. not the location, but the other fields: tags, ...) or can skip it
→ here the function is used to compute partAlreadyAtDest, which would make _getAndPutData silently "hide" the error if there is already some data at the destination BUT the metadata we are trying to replicate is newer.
→ _getAndPutData() should probably return the max/last microVersionId it received (in case of conflict), or nothing if it wrote all data successfully (i.e. no conflict)
(the logic here is what should happen on a microVersionId conflict on metadata, not here on VersionIdConflict)
There was a problem hiding this comment.
also in that case the x-scal-replication-content sent to cloudserver MUST not be DATA+METADATA anymore, but be "downgraded" to METADATA (to keep the existing data)
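As a rough illustration of the "downgrade" rule, assuming `_getAndPutData()` reports a microVersionId only when it hit a conflict: `replicationContentFor` is a hypothetical helper, and the strings are the labels used in this thread, not necessarily the exact header values cloudserver expects.

```javascript
// Hypothetical helper: choose the replication content to announce when
// sending the metadata. conflictMicroVersionId is what the data step
// reported on a collision, or undefined when all data was written.
function replicationContentFor(conflictMicroVersionId) {
    const dataWasWritten = conflictMicroVersionId === undefined;
    // On conflict the destination keeps its existing data, so only
    // metadata may be sent.
    return dataWasWritten ? 'DATA+METADATA' : 'METADATA';
}
```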
  if (err instanceof MicroVersionIdAlreadyStoredException) {
      log.info('replication completed via cascade loop: ' +
          'object already at destination with the same revision',
          { entry: sourceEntry.getLogInfo() });
      this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
      return done(null, { committable: false });
  }
  if (err instanceof StaleMicroVersionIdException) {
      log.info('replication completed: destination already holds ' +
          'this version with a newer revision',
          { entry: sourceEntry.getLogInfo() });
      this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
      return done(null, { committable: false });
  }
  if (!err) {
      log.debug('replication succeeded for object, publishing ' +
          'replication status as COMPLETED',
          { entry: sourceEntry.getLogInfo() });
-     this._publishReplicationStatus(
-         sourceEntry, 'COMPLETED', { kafkaEntry, log });
+     this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
There was a problem hiding this comment.
we do exactly the same in all 3 branches : do we need 2 errors and different logs?
if (!err || err instanceof ...) {
log.debug('replication succeeded for object, publishing ' +
'replication status as COMPLETED',
{ entry: sourceEntry.getLogInfo(), err });
this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
}
francoisferrand
left a comment
There was a problem hiding this comment.
Handling of MPU is not correct: each "replicant" must write its own object fully; there is no situation where 2 "sources" each replicate part of the data (nor a way for these to identify the data they have written). Each source creates the metadata document with the parts it uploaded, so it MUST successfully upload all parts data and MUST NOT have a conflict on any part.
  if (err instanceof MicroVersionIdAlreadyStoredException) {
      log.info('replication completed via cascade loop: ' +
          'object already at destination with the same revision',
          { entry: sourceEntry.getLogInfo() });
      this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
      return done(null, { committable: false });
  }
  if (err instanceof StaleMicroVersionIdException) {
      log.info('replication completed: destination already holds ' +
          'this version with a newer revision',
          { entry: sourceEntry.getLogInfo() });
      this._publishReplicationStatus(sourceEntry, 'COMPLETED', { kafkaEntry, log });
      return done(null, { committable: false });
  }
There was a problem hiding this comment.
There is a risk of creating orphans here, please create a follow-up:
1. new object was created → try to replicate DATA+META
2. putDataParts succeeds (no conflict)
3. before we could put metadata, another site was able to putData (same as us, not metadata → impossible to detect conflict) and putMetadata
4. putMetadata will thus fail with MicroVersionIdAlreadyStoredException/StaleMicroVersionIdException
→ in that case, data created in 2. must be deleted, i.e. call _deleteOrphans. _deleteOrphans MUST NOT be called if this was a META-only update / if we did not call putDataParts
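The rule in the last line could be captured by a small predicate, sketched below under assumptions: the two classes are local stand-ins for the exceptions named in this review (the real ones come from the client library and may be shaped differently), and `shouldDeleteOrphans` is a hypothetical helper.

```javascript
// Local stand-ins for the exception classes named in the review.
class MicroVersionIdAlreadyStoredException extends Error {}
class StaleMicroVersionIdException extends Error {}

// Hypothetical helper: data written by this task is orphaned only when
// putMetadata lost the race AND this task actually wrote data parts.
function shouldDeleteOrphans(putMetadataErr, wroteDataParts) {
    const lostRace =
        putMetadataErr instanceof MicroVersionIdAlreadyStoredException ||
        putMetadataErr instanceof StaleMicroVersionIdException;
    return lostRace && wroteDataParts;
}
```

For a metadata-only update, `wroteDataParts` is false, so the existing data at the destination is never touched.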
Issue: BB-767
Related PRs :
Arsenal : scality/Arsenal#2628
Cloudserver : scality/cloudserver#6179
CloudserverClient : scality/cloudserverclient#24
S3utils : scality/s3utils#395