Skip to content

Commit f8ef9a2

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 CDC: Add regex filters for cache names
1 parent 83c9c60 commit f8ef9a2

File tree

12 files changed

+31
-20
lines changed

12 files changed

+31
-20
lines changed

modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.Arrays;
2324
import java.util.List;
2425
import java.util.concurrent.CountDownLatch;
@@ -195,7 +196,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
195196
CdcConfiguration cfg = new CdcConfiguration();
196197

197198
cfg.setConsumer(new UserCdcConsumer() {
198-
@Override public void start(MetricRegistry mreg) {
199+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
199200
appStarted.countDown();
200201
}
201202
});

modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.util;
1919

20+
import java.nio.file.Path;
2021
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
@@ -217,7 +218,7 @@ synchronized List<CdcEvent> events() {
217218
}
218219

219220
/** {@inheritDoc} */
220-
@Override public void start(MetricRegistry mreg) {
221+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
221222
// No-op
222223
}
223224

modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import org.apache.ignite.Ignite;
2223
import org.apache.ignite.IgniteBinary;
@@ -33,7 +34,7 @@
3334
* This consumer will receive data change events during ignite-cdc process invocation.
3435
* The lifecycle of the consumer is the following:
3536
* <ul>
36-
* <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
37+
* <li>Start of the consumer {@link #start(MetricRegistry, Path)}.</li>
3738
* <li>Notification of the consumer by the {@link #onEvents(Iterator)} call.</li>
3839
* <li>Stop of the consumer {@link #stop()}.</li>
3940
* </ul>
@@ -66,8 +67,9 @@ public interface CdcConsumer {
6667
/**
6768
* Starts the consumer.
6869
* @param mreg Metric registry for consumer specific metrics.
70+
* @param cdcDir Path to Change Data Capture Directory.
6971
*/
70-
public void start(MetricRegistry mreg);
72+
public void start(MetricRegistry mreg, Path cdcDir);
7173

7274
/**
7375
* Handles entry changes events.
@@ -131,7 +133,7 @@ public interface CdcConsumer {
131133

132134
/**
133135
* Stops the consumer.
134-
* This method can be invoked only after {@link #start(MetricRegistry)}.
136+
* This method can be invoked only after {@link #start(MetricRegistry, Path)}.
135137
*/
136138
public void stop();
137139

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ public void runX() throws Exception {
343343
committedSegmentOffset.value(walState.get1().fileOffset());
344344
}
345345

346-
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
346+
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir);
347347

348348
started = true;
349349

modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.EnumSet;
2122
import java.util.Iterator;
2223
import java.util.NoSuchElementException;
@@ -188,8 +189,8 @@ public void onCacheDestroyEvents(Iterator<Integer> caches) {
188189
* @param cdcConsumerReg CDC consumer metric registry.
189190
* @throws IgniteCheckedException If failed.
190191
*/
191-
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) throws IgniteCheckedException {
192-
consumer.start(cdcConsumerReg);
192+
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException {
193+
consumer.start(cdcConsumerReg, cdcDir);
193194

194195
evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer");
195196
lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process");
@@ -200,7 +201,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg)
200201

201202
/**
202203
* Stops the consumer.
203-
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl)}.
204+
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}.
204205
*/
205206
public void stop() {
206207
consumer.stop();

modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public abstract static class TestCdcConsumer<T> implements CdcConsumer {
291291
private volatile boolean stopped;
292292

293293
/** {@inheritDoc} */
294-
@Override public void start(MetricRegistry mreg) {
294+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
295295
stopped = false;
296296
}
297297

@@ -445,7 +445,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer {
445445
}
446446

447447
/** {@inheritDoc} */
448-
@Override public void start(MetricRegistry mreg) {
448+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
449449
// No-op.
450450
}
451451

modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.io.File;
21+
import java.nio.file.Path;
2122
import java.util.concurrent.CountDownLatch;
2223
import org.apache.ignite.IgniteCheckedException;
2324
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -92,8 +93,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception {
9293
CdcConfiguration cdcCfg = new CdcConfiguration();
9394

9495
cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
95-
@Override public void start(MetricRegistry mreg) {
96-
super.start(mreg);
96+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
97+
super.start(mreg, cdcDir);
9798

9899
started.countDown();
99100
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Collections;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
@@ -197,10 +198,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) {
197198
}
198199

199200
/** {@inheritDoc} */
200-
@Override public void start(MetricRegistry mreg) {
201+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
201202
Ignite ignite = Ignition.start(destClusterCliCfg);
202203

203-
super.start(mreg);
204+
super.start(mreg, cdcDir);
204205

205206
ignite.log().info("TestIgniteToIgniteConsumer started.");
206207
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collection;
@@ -418,7 +419,7 @@ public void testReadOneByOneForBackup() throws Exception {
418419
// No-op.
419420
}
420421

421-
@Override public void start(MetricRegistry mreg) {
422+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
422423
// No-op.
423424
}
424425
};
@@ -512,7 +513,7 @@ public void testReadFromNextEntry() throws Exception {
512513
// No-op.
513514
}
514515

515-
@Override public void start(MetricRegistry mreg) {
516+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
516517
// No-op.
517518
}
518519
}, cfg));

modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.ducktest.tests.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import java.util.concurrent.atomic.AtomicLong;
2223
import org.apache.ignite.IgniteLogger;
@@ -42,7 +43,7 @@ public class CountingCdcConsumer implements CdcConsumer {
4243
private final AtomicLong objectsConsumed = new AtomicLong();
4344

4445
/** {@inheritDoc} */
45-
@Override public void start(MetricRegistry mreg) {
46+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
4647
log.info("CountingCdcConsumer started");
4748
}
4849

0 commit comments

Comments
 (0)