diff --git a/README.md b/README.md index 5fe3307..fe0ef6b 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * Java 8 * Gradlew 5.3.1 * Kafka Connect Framework >= 2.1.1 -* Amazon Kinesis Client 1.9.1 -* DynamoDB Streams Kinesis Adapter 1.5.2 +* Amazon Kinesis Client 1.13.1 +* DynamoDB Streams Kinesis Adapter 1.5.3 ## Documentation * [Getting started](docs/getting-started.md) @@ -44,6 +44,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * However you will only encounter this issue by running lots of tasks on one machine with really high load. * Synced(Source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours. + * `INIT_SYNC` can be skipped with `init.sync.skip=true` configuration * Required AWS roles: ```json diff --git a/build.gradle b/build.gradle index e1c3448..deb9f54 100644 --- a/build.gradle +++ b/build.gradle @@ -20,6 +20,8 @@ task userWrapper(type: Wrapper) { description = "kafka-connect-dynamodb" +shadowJar.archiveName = "${project.name}.jar" + allprojects { group "com.trustpilot.kafka.connect.dynamodb" version = gitVersion() diff --git a/docs/details.md b/docs/details.md index c95aafb..8d95c97 100644 --- a/docs/details.md +++ b/docs/details.md @@ -13,6 +13,8 @@ This connector can sync multiple DynamoDB tables at the same time and it does so `INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only). +Using `init.sync.skip` will skip this process and the connector will only ever read from the LATEST position in the stream. + ### 3. "SYNC" Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes that happen to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time. diff --git a/docs/options.md b/docs/options.md index 423a1c7..4bacf77 100644 --- a/docs/options.md +++ b/docs/options.md @@ -20,6 +20,7 @@ "aws.region": "eu-west-1", "aws.access.key.id": "", "aws.secret.key": "", + "aws.assume.role.arn": "", "dynamodb.table.env.tag.key": "environment", "dynamodb.table.env.tag.value": "dev", @@ -35,9 +36,12 @@ "tasks.max": "1", "init.sync.delay.period": 60, + "init.sync.skip": false, "connect.dynamodb.rediscovery.period": "60000" } ``` +`aws.assume.role.arn` - ARN identifier of an IAM role that the KCL and Dynamo Clients can assume for cross account access + `dynamodb.table.env.tag.key` - tag key used to define environment. Useful if you have `staging` and `production` under same AWS account. Or if you want to use different Kafka Connect clusters to sync different tables. `dynamodb.table.env.tag.value` - defines from which environment to ingest tables. For e.g. 'staging' or 'production'... @@ -50,6 +54,8 @@ `init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress). +`init.sync.skip` - boolean to determine whether to start the connector reading the entire table or from the latest offset. + `connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured. `dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set. diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java index 0e95906..c864881 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java @@ -57,12 +57,14 @@ public void start(Map properties) { AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(), config.getResourceTaggingServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); if (tablesProvider == null) { if (config.getWhitelistTables() != null) { diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 078d6fb..f077930 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay"; public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60; + public static final String SRC_INIT_SYNC_SKIP_CONFIG = "init.sync.skip"; + public static final String SRC_INIT_SYNC_SKIP_DOC = "Define whether to skip INIT_SYNC of table."; + public static final String SRC_INIT_SYNC_SKIP_DISPLAY = "Skip INIT_SYNC"; + public static final boolean SRC_INIT_SYNC_SKIP_DEFAULT = false; + public static final String AWS_REGION_CONFIG = "aws.region"; public static final String AWS_REGION_DOC = "Define AWS region."; public static final String AWS_REGION_DISPLAY = "Region"; @@ -57,6 +62,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode"; public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED"; + public static final String AWS_ASSUME_ROLE_ARN_CONFIG = "aws.assume.role.arn"; + public static final String AWS_ASSUME_ROLE_ARN_DOC = "Define which role arn the KCL/Dynamo Client should assume."; + public static final String AWS_ASSUME_ROLE_ARN_DISPLAY = "Assume Role Arn"; + public static final String AWS_ASSUME_ROLE_ARN_DEFAULT = null; + public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix"; public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table."; public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix"; @@ -181,6 +191,15 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, SRC_KCL_TABLE_BILLING_MODE_DISPLAY) + .define(AWS_ASSUME_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + AWS_ASSUME_ROLE_ARN_DEFAULT, + ConfigDef.Importance.LOW, + AWS_ASSUME_ROLE_ARN_DOC, + AWS_GROUP, 10, + ConfigDef.Width.LONG, + AWS_ASSUME_ROLE_ARN_DISPLAY) + .define(DST_TOPIC_PREFIX_CONFIG, ConfigDef.Type.STRING, DST_TOPIC_PREFIX_DEFAULT, @@ -190,12 +209,21 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, DST_TOPIC_PREFIX_DISPLAY) + .define(SRC_INIT_SYNC_SKIP_CONFIG, + ConfigDef.Type.BOOLEAN, + SRC_INIT_SYNC_SKIP_DEFAULT, + ConfigDef.Importance.LOW, + SRC_INIT_SYNC_SKIP_DOC, + CONNECTOR_GROUP, 2, + ConfigDef.Width.MEDIUM, + SRC_INIT_SYNC_SKIP_DISPLAY) + .define(SRC_INIT_SYNC_DELAY_CONFIG, ConfigDef.Type.INT, SRC_INIT_SYNC_DELAY_DEFAULT, ConfigDef.Importance.LOW, SRC_INIT_SYNC_DELAY_DOC, - CONNECTOR_GROUP, 2, + CONNECTOR_GROUP, 3, ConfigDef.Width.MEDIUM, SRC_INIT_SYNC_DELAY_DISPLAY) @@ -253,6 +281,10 @@ public long getRediscoveryPeriod() { return getLong(REDISCOVERY_PERIOD_CONFIG); } + public boolean getInitSyncSkip() { + return (boolean)get(SRC_INIT_SYNC_SKIP_CONFIG); + } + public int getInitSyncDelay() { return (int)get(SRC_INIT_SYNC_DELAY_CONFIG); } @@ -272,4 +304,8 @@ public List getWhitelistTables() { public BillingMode getKCLTableBillingMode() { return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG)); } + + public String getAwsAssumeRoleArn() { + return getString(AWS_ASSUME_ROLE_ARN_CONFIG); + } } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 1868cc3..c934bb0 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask { private SourceInfo sourceInfo; private TableDescription tableDesc; private int initSyncDelay; + private boolean initSyncSkip; @SuppressWarnings("unused") //Used by Confluent platform to initialize connector @@ -118,11 +119,13 @@ public void start(Map configProperties) { config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); } tableDesc = client.describeTable(config.getTableName()).getTable(); initSyncDelay = config.getInitSyncDelay(); + initSyncSkip = config.getInitSyncSkip(); LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName()); setStateFromOffset(); @@ -142,15 +145,23 @@ public void start(Map configProperties) { config.getAwsRegion(), config.getDynamoDBServiceEndpoint(), config.getAwsAccessKeyIdValue(), - config.getAwsSecretKeyValue()); + config.getAwsSecretKeyValue(), + config.getAwsAssumeRoleArn()); if (kclWorker == null) { kclWorker = new KclWorkerImpl( - AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue()), + AwsClients.getCredentials(config.getAwsAccessKeyIdValue(), config.getAwsSecretKeyValue(), config.getAwsAssumeRoleArn()), eventsQueue, shardRegister); } - kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode()); + kclWorker.start(client, + dynamoDBStreamsClient, + tableDesc.getTableName(), + config.getTaskID(), + config.getDynamoDBServiceEndpoint(), + config.getInitSyncSkip(), + config.getKCLTableBillingMode() + ); shutdown = false; } @@ -160,10 +171,17 @@ private void setStateFromOffset() { .offset(Collections.singletonMap("table_name", tableDesc.getTableName())); if (offset != null) { sourceInfo = SourceInfo.fromOffset(offset, clock); + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } else { + sourceInfo.startInitSync(); + } } } @@ -194,6 +212,9 @@ public List poll() throws InterruptedException { if (sourceInfo.initSyncStatus == InitSyncStatus.FINISHED) { return sync(); } + if (sourceInfo.initSyncStatus == InitSyncStatus.SKIPPED) { + return sync(); + } throw new Exception("Invalid SourceInfo InitSyncStatus state: " + sourceInfo.initSyncStatus); } catch (InterruptedException ex) { LOGGER.error("Failed to handle incoming records. Records dropped!", ex); @@ -236,6 +257,7 @@ private LinkedList initSync() throws Exception { result.add(converter.toSourceRecord(sourceInfo, Envelope.Operation.READ, record, + null, sourceInfo.lastInitSyncStart, null, null)); @@ -254,6 +276,7 @@ private LinkedList initSync() throws Exception { result.add(converter.toSourceRecord(sourceInfo, Envelope.Operation.READ, lastRecord, + null, sourceInfo.lastInitSyncStart, null, null)); @@ -277,6 +300,7 @@ private List sync() throws Exception { LOGGER.debug("Waiting for records from eventsQueue for table: {}", tableDesc.getTableName()); KclRecordsWrapper dynamoDBRecords = eventsQueue.poll(500, TimeUnit.MILLISECONDS); if (dynamoDBRecords == null) { + LOGGER.debug("null dynamoDBRecords"); return null; // returning thread control at regular intervals } @@ -310,12 +334,12 @@ private List sync() throws Exception { } // Received record which is behind "safe" zone. Indicating that "potentially" we lost some records. - // Need to resync... + // Need to resync if sync hasn't been skipped... // This happens if: // * connector was down for some time // * connector is lagging // * connector failed to finish init sync in acceptable time frame - if (recordIsInDangerZone(arrivalTimestamp)) { + if (recordIsInDangerZone(arrivalTimestamp) && sourceInfo.initSyncStatus != InitSyncStatus.SKIPPED) { sourceInfo.startInitSync(); LOGGER.info( @@ -342,9 +366,12 @@ private List sync() throws Exception { attributes = dynamoDbRecord.getDynamodb().getKeys(); } + Map oldImage = dynamoDbRecord.getDynamodb().getOldImage(); + SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo, op, attributes, + oldImage, arrivalTimestamp.toInstant(), dynamoDBRecords.getShardId(), record.getSequenceNumber()); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java index a803ff1..92bd884 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java @@ -56,6 +56,10 @@ public static final class FieldName { * The {@code after} field is used to store the state of a record after an operation. */ public static final String DOCUMENT = "document"; + /** + * The {@code oldDocument} field is used to store the state of a record before an operation. + */ + public static final String OLD_DOCUMENT = "old_document"; /** * The {@code op} field is used to store the kind of operation on a record. */ diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java index 2ab048c..be6791a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java @@ -3,5 +3,6 @@ public enum InitSyncStatus { UNDEFINED, RUNNING, - FINISHED + FINISHED, + SKIPPED } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java index c0f04d4..51c3162 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java @@ -71,6 +71,14 @@ public void endInitSync() { lastInitSyncEnd = Instant.now(clock); } + public void skipInitSync() { + initSyncStatus = InitSyncStatus.SKIPPED; + lastInitSyncStart = Instant.ofEpochSecond(0); + lastInitSyncEnd = Instant.ofEpochSecond(0);; + exclusiveStartKey = null; + initSyncCount = 0L; + } + private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() .name(SchemaNameAdjuster .defaultAdjuster() diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java index 175f6f0..d6cbdfc 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/aws/AwsClients.java @@ -5,6 +5,7 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; @@ -22,44 +23,56 @@ public class AwsClients { public static AmazonDynamoDB buildDynamoDbClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { - + String awsSecretKey, + String awsAssumeRoleArn) { return (AmazonDynamoDB) configureBuilder( AmazonDynamoDBClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } public static AWSResourceGroupsTaggingAPI buildAWSResourceGroupsTaggingAPIClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { return (AWSResourceGroupsTaggingAPI) configureBuilder( AWSResourceGroupsTaggingAPIClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } public static AmazonDynamoDBStreams buildDynamoDbStreamsClient(String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { return (AmazonDynamoDBStreams) configureBuilder( AmazonDynamoDBStreamsClientBuilder.standard(), awsRegion, serviceEndpoint, awsAccessKeyID, - awsSecretKey) + awsSecretKey, + awsAssumeRoleArn) .build(); } - public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, String awsSecretKey) { - if (awsAccessKeyID == null || awsSecretKey == null) { + public static AWSCredentialsProvider getCredentials(String awsAccessKeyID, + String awsSecretKey, + String awsAssumeRoleArn) { + if (awsAssumeRoleArn != null ) { + LOGGER.debug("Using STSAssumeRoleSessionCredentialsProvider"); + AWSCredentialsProvider awsCredentialsProviderChain = DefaultAWSCredentialsProviderChain.getInstance(); + return new STSAssumeRoleSessionCredentialsProvider(awsCredentialsProviderChain, + awsAssumeRoleArn, "kafkaconnect"); + } else if (awsAccessKeyID == null || awsSecretKey == null) { LOGGER.debug("Using DefaultAWSCredentialsProviderChain"); return DefaultAWSCredentialsProviderChain.getInstance(); @@ -75,9 +88,10 @@ private static AwsClientBuilder configureBuilder(AwsClientBuilder builder, String awsRegion, String serviceEndpoint, String awsAccessKeyID, - String awsSecretKey) { + String awsSecretKey, + String awsAssumeRoleArn) { - builder.withCredentials(getCredentials(awsAccessKeyID, awsSecretKey)) + builder.withCredentials(getCredentials(awsAccessKeyID, awsSecretKey, awsAssumeRoleArn)) .withClientConfiguration(new ClientConfiguration().withThrottledRetries(true)); if(serviceEndpoint != null && !serviceEndpoint.isEmpty()) { diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java index 27557bb..98026ba 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java @@ -10,6 +10,7 @@ void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTablebillingMode); void stop(); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java index f6bd839..71dcfe4 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java @@ -30,7 +30,6 @@ public class KclWorkerImpl implements KclWorker { private final AWSCredentialsProvider awsCredentialsProvider; private final ArrayBlockingQueue eventsQueue; private final ConcurrentHashMap recordProcessorsRegister; - private volatile Thread thread; private volatile Worker worker; @@ -49,13 +48,14 @@ public void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue, recordProcessorsRegister); KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName, taskid, - dynamoDBClient, endpoint, kclTableBillingMode); + dynamoDBClient, endpoint, isSkipSync, kclTableBillingMode); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); @@ -123,8 +123,15 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, String taskid, AmazonDynamoDB dynamoDBClient, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { + InitialPositionInStream initialPosition; + if (isSkipSync) { + initialPosition = InitialPositionInStream.LATEST; + } else { + initialPosition = InitialPositionInStream.TRIM_HORIZON; + } String streamArn = dynamoDBClient.describeTable( new DescribeTableRequest() .withTableName(tableName)).getTable().getLatestStreamArn(); @@ -141,7 +148,7 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, // worker will use checkpoint tableName if available, otherwise it is safer // to start at beginning of the stream - .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) + .withInitialPositionInStream(initialPosition) // we want the maximum batch size to avoid network transfer latency overhead .withMaxRecords(Constants.STREAMS_RECORDS_LIMIT) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index dd9137c..d5285ff 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -3,6 +3,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.document.ItemUtils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; @@ -45,12 +46,13 @@ public class RecordConverter { public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { this.tableDesc = tableDesc; - this.topic_name = topicNamePrefix + tableDesc.getTableName(); + this.topic_name = topicNamePrefix; valueSchema = SchemaBuilder.struct() .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) + .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.schema()) .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) @@ -61,6 +63,7 @@ public SourceRecord toSourceRecord( SourceInfo sourceInfo, Envelope.Operation op, Map attributes, + Map oldImage, Instant arrivalTimestamp, String shardId, String sequenceNumber) throws Exception { @@ -74,6 +77,10 @@ public SourceRecord toSourceRecord( LinkedHashMap::new )); + // getUnmarshallItems from Dynamo Document + Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + Map unMarshalledOldItems = ItemUtils.toSimpleMapValue(oldImage); + // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. // This info will only be used to update `shardRegister` and won't be used to reset state after restart Map offsets = SourceInfo.toOffset(sourceInfo); @@ -101,7 +108,8 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) + .put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index c132506..3c2efbe 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -29,7 +29,6 @@ @SuppressWarnings("ConstantConditions") public class DynamoDBSourceTaskTests { private final static String tableName = "testTable1"; - private HashMap configs; @BeforeEach @@ -208,6 +207,7 @@ public void kclWorkerIsStartedOnStart() throws InterruptedException { eq(tableName), eq("testTask1"), eq(null), + eq(false), eq(BillingMode.PROVISIONED) ); } @@ -278,7 +278,7 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document")); + assertEquals(("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}"), ((Struct) response.get(0).value()).getString("document")); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -560,8 +560,8 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Assert assertEquals(3, response.size()); - assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document")); - assertEquals("{\"col1\":{\"s\":\"key2\"}}", ((Struct) response.get(1).value()).getString("document")); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); assertNull(response.get(2).value()); // tombstone } @@ -875,4 +875,197 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void initSyncIsSkippedWithNoOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(null) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + + @Test + public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("1970-01-01T00:00:00Z").toEpochMilli()); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + + @Test + public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws InterruptedException { + // Arrange + configs.put("init.sync.skip", "true"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000001", + "INSERT")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key2")), + null, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000002", + "REMOVE")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000003", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(4, response.size()); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); + assertNull(response.get(2).value()); // tombstone + } + @Test + public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { + // Arrange + configs.put("init.sync.skip", "true"); + configs.put("init.sync.delay.period", "2"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + List> initSyncRecords = Collections.singletonList(row); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:01:00.00Z"), + "1000000001", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-01T01:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, null) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + Instant start = Instant.now(); + List response = task.poll(); + Instant stop = Instant.now(); + + // Assert + assertTrue(Duration.between(start, stop).getSeconds() == 0); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(1, response.size()); + } + + @Test + public void onSyncPollInitSyncSkipReturnsNullAndDoesNotStartInitSyncIfAnyOneRecordEventArrivedTooLate() throws InterruptedException { + // Arrange + configs.put("init.sync.skip", "true"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T15:00:00.00Z"), "s1", "INSERT")); + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T00:00:00.00Z"), "s2", "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-03T20:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(2, response.size()); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(InitSyncStatus.SKIPPED, task.getSourceInfo().initSyncStatus); + } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java index cc0a713..f163528 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/KafkaConnectITBase.java @@ -48,6 +48,7 @@ public class KafkaConnectITBase { protected static final String AWS_REGION_CONFIG = "eu-west-3"; protected static final String AWS_ACCESS_KEY_ID_CONFIG = "ABCD"; protected static final String AWS_SECRET_KEY_CONFIG = "1234"; + protected static final String AWS_ASSUME_ROLE_ARN_CONFIG = null; protected static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "datalake-ingest"; private static Network network; @@ -187,7 +188,8 @@ private AmazonDynamoDB getDynamoDBClient() { AWS_REGION_CONFIG, dynamodb.getEndpoint(), AWS_ACCESS_KEY_ID_CONFIG, - AWS_SECRET_KEY_CONFIG + AWS_SECRET_KEY_CONFIG, + AWS_ASSUME_ROLE_ARN_CONFIG ); } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java new file mode 100644 index 0000000..8dcd9b9 --- /dev/null +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/aws/AwsClientsTests.java @@ -0,0 +1,53 @@ +package com.trustpilot.connector.dynamodb.aws; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AwsClientsTests { + + @Test + public void stsAssumeRoleProviderReturned() { + String testRoleArn = "arn:aws:iam::111111111111:role/unit-test"; + AWSCredentialsProvider provider = AwsClients.getCredentials( + null, + null, + testRoleArn + ); + + DefaultAWSCredentialsProviderChain testChain = Mockito.mock(DefaultAWSCredentialsProviderChain.class); + STSAssumeRoleSessionCredentialsProvider expectedProvider = new STSAssumeRoleSessionCredentialsProvider( + testChain.getInstance(), + testRoleArn, + "kafkaconnect" + ); + assertEquals(provider.getClass(), expectedProvider.getClass()); + } + + @Test + public void defaultProviderReturned() { + AWSCredentialsProvider provider = AwsClients.getCredentials( + null, + null, + null + ); + + assertEquals(provider.getClass(), DefaultAWSCredentialsProviderChain.class); + } + + @Test + public void staticCredentialsReturned() { + AWSCredentialsProvider provider = AwsClients.getCredentials( + "unit-test", + "unit-test", + null + ); + + assertEquals(provider.getClass(), AWSStaticCredentialsProvider.class); + } +} diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java index 12b7bdd..e2336b2 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java @@ -35,6 +35,7 @@ void initializationRegistersNewShardToRegistry() { String tableName = "testTableName1"; String taskId = "task1"; String serviceEndpoint = "http://localhost:8000"; + Boolean isSyncSkip = false; BillingMode kclTableBillingMode = BillingMode.PROVISIONED; AmazonDynamoDB dynamoDBClient = Mockito.mock(AmazonDynamoDB.class); @@ -43,7 +44,7 @@ void initializationRegistersNewShardToRegistry() { when(dynamoDBClient.describeTable(ArgumentMatchers.any())).thenReturn(result); // Act - KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, kclTableBillingMode); + KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, isSyncSkip, kclTableBillingMode); // Assert assertEquals("datalake-KCL-testTableName1", clientLibConfiguration.getApplicationName()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index 3dd2ab9..8ecfc85 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -82,13 +82,14 @@ public void correctTopicNameIsConstructed() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" ); // Assert - assertEquals("TestTopicPrefix-TestTable1", record.topic()); + assertEquals("TestTopicPrefix-", record.topic()); } @Test @@ -101,6 +102,7 @@ public void sourceInfoIsPutToOffset() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -120,6 +122,7 @@ public void shardIdAndSequenceNumberIsPutToOffset() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -143,6 +146,7 @@ public void singleItemKeyIsAddedToRecord() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -168,6 +172,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -193,13 +198,14 @@ public void recordAttributesAreAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" ); // Assert - assertEquals("{\"testKV1\":{\"s\":\"testKV1Value\"},\"testKV2\":{\"s\":\"2\"},\"testV2\":{\"s\":\"testStringValue\"},\"testV1\":{\"n\":\"1\"}}", + assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}", ((Struct) record.value()).getString("document")); } @@ -216,6 +222,7 @@ public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throw getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -241,6 +248,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -266,12 +274,13 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributesWithInvalidAvroCharacters(), + getAttributesWithInvalidAvroCharacters(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" ); - String expected = "{\"test1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"startswithnumber\":{\"s\":\"2\"},\"test\":{\"s\":\"testStringValue\"}}"; + String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}"; // Assert assertEquals(expected, @@ -288,6 +297,7 @@ public void sourceInfoIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -310,6 +320,7 @@ public void operationIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1" @@ -329,6 +340,7 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { getSourceInfo(table), Envelope.Operation.forCode("r"), getAttributes(), + getAttributes(), Instant.parse("2001-01-02T00:00:00.00Z"), "testShardID1", "testSequenceNumberID1"