Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# [11.0.1](https://github.com/mapbox/dynamodb-replicator/pull/116) July 21, 2025

**Potential Breaking Change**: Upgrading all AWS SDK v2
usage to AWS SDK v3. AWS SDK v2 reaches end-of-life in September 2025, so
this change is mandatory for standard upkeep.
37 changes: 24 additions & 13 deletions backup.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
var AWS = require('aws-sdk');
var { S3Client } = require('@aws-sdk/client-s3');
var { Upload } = require('@aws-sdk/lib-storage');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you didn't introduce this pattern, but we should be using `const` and not `var` for this.

var Dyno = require('@mapbox/dyno');
var stream = require('stream');
var zlib = require('zlib');

module.exports = function(config, done) {
var primary = Dyno(config);
var s3 = new AWS.S3();
var s3Client = new S3Client({ region: config.region });

var log = config.log || console.log;

Expand Down Expand Up @@ -40,20 +41,30 @@ module.exports = function(config, done) {

log('[segment %s] Starting backup job %s of %s', index, config.backup.jobid, config.region + '/' + config.table);

s3.upload({
Bucket: config.backup.bucket,
Key: key,
Body: data
}, function(err) {
if (err) return next(err);
log('[segment %s] Uploaded dynamo backup to s3://%s/%s', index, config.backup.bucket, key);
log('[segment %s] Wrote %s items to backup', index, count);
next();
}).on('httpUploadProgress', function(progress) {
const upload = new Upload({
client: s3Client,
params: {
Bucket: config.backup.bucket,
Key: key,
Body: data
}
});

upload.on('httpUploadProgress', function(progress) {
log('[segment %s] Uploaded %s bytes', index, progress.loaded);
size = progress.total;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting — it seems `progress.loaded` was available before this change, assuming `total` is not deprecated. Do you know the difference between the two?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In @aws-sdk/lib-storage's Upload class the httpUploadProgress event provides a progress object, but progress.total can be undefined or not reliably set.

I'm not married to this change, but when testing I noticed the console printing 'undefined' for this value. I went down a deep rabbit hole and found a GitHub issue which recommended using `loaded` over `total`.

size = progress.loaded;
});

upload.done()
.then(() => {
log('[segment %s] Uploaded dynamo backup to s3://%s/%s', index, config.backup.bucket, key);
log('[segment %s] Wrote %s items to backup', index, count);
next();
})
.catch(err => {
next(err);
});

function next(err) {
if (err) return done(err);
done(null, { size: size, count: count });
Expand Down
14 changes: 9 additions & 5 deletions bin/backup-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var fastlog = require('../fastlog');
var args = require('minimist')(process.argv.slice(2));
var crypto = require('crypto');
var s3urls = require('s3urls');
var AWS = require('aws-sdk');
var { CloudWatchClient, PutMetricDataCommand } = require('@aws-sdk/client-cloudwatch');

function usage() {
console.error('');
Expand Down Expand Up @@ -64,7 +64,7 @@ backup(config, function(err, details) {
if (err) log.error(err);

if (args.metric) {
var cw = new AWS.CloudWatch({ region: config.region });
var cwClient = new CloudWatchClient({ region: config.region });
var params = {
Namespace: args.metric,
MetricData: []
Expand Down Expand Up @@ -107,8 +107,12 @@ backup(config, function(err, details) {
});
}

cw.putMetricData(params, function(err) {
if (err) log.error(err);
});
cwClient.send(new PutMetricDataCommand(params))
.then(() => {
// Success

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we put anything here?

})
.catch(err => {
log.error(err);
});
}
});
78 changes: 50 additions & 28 deletions bin/incremental-diff-record.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ var minimist = require('minimist');
var s3urls = require('s3urls');
var Dyno = require('@mapbox/dyno');
var crypto = require('crypto');
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
var assert = require('assert');

var args = minimist(process.argv.slice(2));
Expand Down Expand Up @@ -34,6 +33,8 @@ if (!table) {
var region = table.split('/')[0];
table = table.split('/')[1];

var s3Client = new S3Client({ region: region });

var s3url = args._[1];

if (!s3url) {
Expand Down Expand Up @@ -86,30 +87,51 @@ dyno.getItem({ Key: key }, function(err, data) {
if (err) throw err;
var dynamoRecord = data.Item;

s3.getObject(s3url, function(err, data) {
if (err && err.statusCode !== 404) throw err;
var s3data = err ? undefined : Dyno.deserialize(data.Body.toString());

console.log('DynamoDB record');
console.log('--------------');
console.log(dynamoRecord);
console.log('');

console.log('Incremental backup record (%s)', s3url.Key);
console.log('--------------');
console.log(s3data);
console.log('');

try {
assert.deepEqual(s3data, dynamoRecord);
console.log('----------------------------');
console.log('✔ The records are equivalent');
console.log('----------------------------');
}
catch (err) {
console.log('--------------------------------');
console.log('✘ The records are not equivalent');
console.log('--------------------------------');
}
});
s3Client.send(new GetObjectCommand(s3url))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are way too many callbacks being used here — not related to your PR, but it makes everything harder to read. Maybe we can come back later and migrate this to the async/await model.

.then(data => {
var s3data = Dyno.deserialize(data.Body.toString());

console.log('DynamoDB record');
console.log('--------------');
console.log(dynamoRecord);
console.log('');

console.log('Incremental backup record (%s)', s3url.Key);
console.log('--------------');
console.log(s3data);
console.log('');

try {
assert.deepEqual(s3data, dynamoRecord);
console.log('----------------------------');
console.log('✔ The records are equivalent');
console.log('----------------------------');
}
catch (err) {
console.log('--------------------------------');
console.log('✘ The records are not equivalent');
console.log('--------------------------------');
}
})
.catch(err => {
if (err.$metadata && err.$metadata.httpStatusCode === 404) {
var s3data = undefined;

console.log('DynamoDB record');
console.log('--------------');
console.log(dynamoRecord);
console.log('');

console.log('Incremental backup record (%s)', s3url.Key);
console.log('--------------');
console.log(s3data);
console.log('');

console.log('--------------------------------');
console.log('✘ The records are not equivalent');
console.log('--------------------------------');
} else {
throw err;
}
});
});
17 changes: 10 additions & 7 deletions bin/incremental-snapshot.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node

var AWS = require('aws-sdk');
var { CloudWatchClient, PutMetricDataCommand } = require('@aws-sdk/client-cloudwatch');
var args = require('minimist')(process.argv.slice(2));
var s3urls = require('s3urls');
var fastlog = require('../fastlog');
Expand Down Expand Up @@ -59,7 +59,7 @@ snapshot(config, function(err, details) {
var namespace = args.metric.split('/')[1];
var table = args.metric.split('/')[2];

var cw = new AWS.CloudWatch({ region: region });
var cwClient = new CloudWatchClient({ region: region });

var params = {
Namespace: namespace,
Expand Down Expand Up @@ -103,10 +103,13 @@ snapshot(config, function(err, details) {
});
}

cw.putMetricData(params, function(err) {
if (err) return log.error(err);
if (!details) return log.info('Snapshot failed, wrote error metric to %s', args.metric);
log.info('Wrote %s size / %s count metrics to %s', details.size, details.count, args.metric);
});
cwClient.send(new PutMetricDataCommand(params))
.then(() => {
if (!details) return log.info('Snapshot failed, wrote error metric to %s', args.metric);
log.info('Wrote %s size / %s count metrics to %s', details.size, details.count, args.metric);
})
.catch(err => {
log.error(err);
});
}
});
68 changes: 40 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var AWS = require('aws-sdk');
var { S3Client, PutObjectCommand, DeleteObjectCommand } = require('@aws-sdk/client-s3');
var Dyno = require('@mapbox/dyno');
var queue = require('queue-async');
var crypto = require('crypto');
Expand Down Expand Up @@ -118,16 +118,16 @@ function replicate(event, context, callback) {

function incrementalBackup(event, context, callback) {
var params = {
maxRetries: 1000,
httpOptions: {
timeout: 1000,
agent: module.exports.agent
maxAttempts: 1000,
requestHandler: {
connectionTimeout: 1000,
httpAgent: module.exports.agent
}
};

if (process.env.BackupRegion) params.region = process.env.BackupRegion;

var s3 = new AWS.S3(params);
var s3Client = new S3Client(params);

var filterer;
if (process.env.TurnoverRole && process.env.TurnoverAt) {
Expand Down Expand Up @@ -185,28 +185,40 @@ function incrementalBackup(event, context, callback) {
Key: [process.env.BackupPrefix, table, id].join('/')
};

var req = change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject';
if (req === 'putObject') params.Body = JSON.stringify(change.dynamodb.NewImage);

s3[req](params, function(err) {
if (err) console.log(
'[error] %s | %s s3://%s/%s | %s',
JSON.stringify(change.dynamodb.Keys),
req, params.Bucket, params.Key,
err.message
);
next(err);
}).on('retry', function(res) {
if (!res.error || !res.httpResponse || !res.httpResponse.headers) return;
if (res.error.name === 'TimeoutError') res.error.retryable = true;
console.log(
'[failed-request] request-id: %s | id-2: %s | %s s3://%s/%s | %s',
res.httpResponse.headers['x-amz-request-id'],
res.httpResponse.headers['x-amz-id-2'],
req, params.Bucket, params.Key,
res.error
);
});
var command;
if (change.eventName === 'REMOVE') {
command = new DeleteObjectCommand(params);
} else {
params.Body = JSON.stringify(change.dynamodb.NewImage);
command = new PutObjectCommand(params);
}

s3Client.send(command)
.then(() => {
next();
})
.catch(err => {
console.log(
'[error] %s | %s s3://%s/%s | %s',
JSON.stringify(change.dynamodb.Keys),
change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject',
params.Bucket, params.Key,
err.message
);

// Log retry information if available
if (err.$metadata && err.$metadata.requestId) {
console.log(
'[failed-request] request-id: %s | %s s3://%s/%s | %s',
err.$metadata.requestId,
change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject',
params.Bucket, params.Key,
err
);
}

next(err);
});
});
});

Expand Down
Loading