Skip to content

Commit 250d982

Browse files
committed
CDAP-17583 Use instance id for app name and store in-memory offset under the appname key.
1 parent 3d052b3 commit 250d982

File tree

4 files changed

+20
-13
lines changed

4 files changed

+20
-13
lines changed

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConstantOffsetBackingStore.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ public class MySqlConstantOffsetBackingStore extends MemoryOffsetBackingStore {
4242
static final String ROW = "row";
4343
static final String EVENT = "event";
4444
static final String GTID_SET = "gtids";
45+
static final String REPLICATION_CONNECTOR_NAME = "replication.connector.name";
4546

4647
private static final Gson GSON = new Gson();
47-
// The key is hardcoded here
48-
private static final ByteBuffer KEY =
49-
StandardCharsets.UTF_8.encode("{\"schema\":null,\"payload\":[\"delta\",{\"server\":\"dummy\"}]}");
5048

5149
@Override
5250
public void configure(WorkerConfig config) {
@@ -57,7 +55,12 @@ public void configure(WorkerConfig config) {
5755
String rowStr = originalConfig.get(ROW);
5856
String eventStr = originalConfig.get(EVENT);
5957
String gtidSetStr = originalConfig.get(GTID_SET);
58+
String replicationConnectorName = originalConfig.get(REPLICATION_CONNECTOR_NAME);
6059

60+
ByteBuffer key =
61+
StandardCharsets.UTF_8.encode("{\"schema\":null,\"payload\":[\""
62+
+ replicationConnectorName
63+
+ "\",{\"server\":\"dummy\"}]}");
6164
Map<String, Object> offset = new HashMap<>();
6265
if (!Strings.isNullOrEmpty(fileStr)) {
6366
offset.put(FILE, fileStr);
@@ -82,6 +85,6 @@ public void configure(WorkerConfig config) {
8285
return;
8386
}
8487

85-
data.put(KEY, StandardCharsets.UTF_8.encode(GSON.toJson(offset)));
88+
data.put(key, StandardCharsets.UTF_8.encode(GSON.toJson(offset)));
8689
}
8790
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.time.temporal.Temporal;
5050
import java.util.Map;
5151
import java.util.Set;
52-
import java.util.UUID;
5352
import java.util.concurrent.ExecutorService;
5453
import java.util.concurrent.Executors;
5554
import java.util.concurrent.TimeUnit;
@@ -99,6 +98,7 @@ public void start(Offset offset) {
9998
Collectors.toMap(t -> config.getDatabase() + "." + t.getTable(), t -> t));
10099
Map<String, String> state = offset.get(); // state map is always not null
101100
String isSnapshot = state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, "");
101+
String replicationConnectorName = "delta" + context.getInstanceId();
102102
Configuration.Builder configBuilder = Configuration.create()
103103
.with("connector.class", MySqlConnector.class.getName())
104104
.with("offset.storage", MySqlConstantOffsetBackingStore.class.getName())
@@ -111,7 +111,7 @@ public void start(Offset offset) {
111111
.with("event", state.getOrDefault(MySqlConstantOffsetBackingStore.EVENT, ""))
112112
.with("gtids", state.getOrDefault(MySqlConstantOffsetBackingStore.GTID_SET, ""))
113113
/* begin connector properties */
114-
.with("name", "delta" + UUID.randomUUID().toString().replace("-", ""))
114+
.with("name", replicationConnectorName)
115115
.with("database.hostname", config.getHost())
116116
.with("database.port", config.getPort())
117117
.with("database.user", config.getUser())
@@ -121,7 +121,8 @@ public void start(Offset offset) {
121121
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
122122
.with("database.serverTimezone", config.getServerTimezone())
123123
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
124-
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only");
124+
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only")
125+
.with(MySqlConstantOffsetBackingStore.REPLICATION_CONNECTOR_NAME, replicationConnectorName);;
125126

126127
if (config.getConsumerID() != null) {
127128
// If not provided debezium will randomly pick integer between 5400 and 6400.

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333
*/
3434
public class SqlServerConstantOffsetBackingStore extends MemoryOffsetBackingStore {
3535
private static final Gson GSON = new Gson();
36-
private static final String KEY = "{\"schema\":null,\"payload\":[\"delta\",{\"server\":\"dummy\"}]}";
36+
3737
static final String SNAPSHOT_COMPLETED = "snapshot_completed";
38+
static final String REPLICATION_CONNECTOR_NAME = "replication.connector.name";
3839

3940
@Override
4041
public void configure(WorkerConfig config) {
@@ -45,7 +46,8 @@ public void configure(WorkerConfig config) {
4546
String commitStr = originalConfig.get(SourceInfo.COMMIT_LSN_KEY);
4647
String snapshot = originalConfig.get(SourceInfo.SNAPSHOT_KEY);
4748
String snapshotCompleted = originalConfig.get(SNAPSHOT_COMPLETED);
48-
49+
String replicationConnectorName = originalConfig.get(REPLICATION_CONNECTOR_NAME);
50+
String key = "{\"schema\":null,\"payload\":[\"" + replicationConnectorName + "\",{\"server\":\"dummy\"}]}";
4951
Map<String, Object> offset = new HashMap<>();
5052
if (!changeStr.isEmpty()) {
5153
offset.put(SourceInfo.CHANGE_LSN_KEY, changeStr);
@@ -67,6 +69,6 @@ public void configure(WorkerConfig config) {
6769
}
6870
byte[] offsetBytes = Bytes.toBytes(GSON.toJson(offset));
6971

70-
data.put(ByteBuffer.wrap(Bytes.toBytes(KEY)), ByteBuffer.wrap(offsetBytes));
72+
data.put(ByteBuffer.wrap(Bytes.toBytes(key)), ByteBuffer.wrap(offsetBytes));
7173
}
7274
}

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.HashSet;
4444
import java.util.Map;
4545
import java.util.Set;
46-
import java.util.UUID;
4746
import java.util.concurrent.ExecutorService;
4847
import java.util.concurrent.Executors;
4948
import java.util.concurrent.TimeUnit;
@@ -107,6 +106,7 @@ public void start(Offset offset) {
107106
Map<String, String> state = offset.get(); // this will never be null
108107
// offset config
109108
String isSnapshotCompleted = state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, "");
109+
String replicationConnectorName = "delta" + context.getInstanceId();
110110
Configuration.Builder configBuilder = Configuration.create()
111111
.with("connector.class", SqlServerConnector.class.getName())
112112
.with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
@@ -117,7 +117,7 @@ public void start(Offset offset) {
117117
.with("snapshot", state.getOrDefault(SourceInfo.SNAPSHOT_KEY, ""))
118118
.with("snapshot_completed", isSnapshotCompleted)
119119
/* begin connector properties */
120-
.with("name", "delta" + UUID.randomUUID().toString().replace("-", ""))
120+
.with("name", replicationConnectorName)
121121
.with("database.hostname", config.getHost())
122122
.with("database.port", config.getPort())
123123
.with("database.user", config.getUser())
@@ -127,7 +127,8 @@ public void start(Offset offset) {
127127
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
128128
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
129129
.with("database.serverTimezone", config.getServerTimezone())
130-
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only");
130+
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only")
131+
.with(SqlServerConstantOffsetBackingStore.REPLICATION_CONNECTOR_NAME, replicationConnectorName);
131132

132133
LOG.info("Overriding sql server connector configs with arguments {}", debeziumConnectorConfigs);
133134
for (Map.Entry<String, String> entry: debeziumConnectorConfigs.entrySet()) {

0 commit comments

Comments
 (0)