Skip to content

Commit a9ca469

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Add CdcConsumerEx interface
1 parent 149446f commit a9ca469

File tree

13 files changed

+57
-28
lines changed

13 files changed

+57
-28
lines changed

modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22-
import java.nio.file.Path;
2322
import java.util.Arrays;
2423
import java.util.List;
2524
import java.util.concurrent.CountDownLatch;
@@ -196,7 +195,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
196195
CdcConfiguration cfg = new CdcConfiguration();
197196

198197
cfg.setConsumer(new UserCdcConsumer() {
199-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
198+
@Override public void start(MetricRegistry mreg) {
200199
appStarted.countDown();
201200
}
202201
});

modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.util;
1919

20-
import java.nio.file.Path;
2120
import java.util.ArrayList;
2221
import java.util.HashMap;
2322
import java.util.Iterator;
@@ -218,7 +217,7 @@ synchronized List<CdcEvent> events() {
218217
}
219218

220219
/** {@inheritDoc} */
221-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
220+
@Override public void start(MetricRegistry mreg) {
222221
// No-op
223222
}
224223

modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.cdc;
1919

20-
import java.nio.file.Path;
2120
import java.util.Iterator;
2221
import org.apache.ignite.Ignite;
2322
import org.apache.ignite.IgniteBinary;
@@ -34,7 +33,7 @@
3433
* This consumer will receive data change events during ignite-cdc process invocation.
3534
* The lifecycle of the consumer is the following:
3635
* <ul>
37-
* <li>Start of the consumer {@link #start(MetricRegistry, Path)}.</li>
36+
* <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
3837
* <li>Notification of the consumer by the {@link #onEvents(Iterator)} call.</li>
3938
* <li>Stop of the consumer {@link #stop()}.</li>
4039
* </ul>
@@ -67,9 +66,8 @@ public interface CdcConsumer {
6766
/**
6867
* Starts the consumer.
6968
* @param mreg Metric registry for consumer specific metrics.
70-
* @param cdcDir Path to Change Data Capture Directory.
7169
*/
72-
public void start(MetricRegistry mreg, Path cdcDir);
70+
public void start(MetricRegistry mreg);
7371

7472
/**
7573
* Handles entry changes events.
@@ -133,7 +131,7 @@ public interface CdcConsumer {
133131

134132
/**
135133
* Stops the consumer.
136-
* This method can be invoked only after {@link #start(MetricRegistry, Path)}.
134+
* This method can be invoked only after {@link #start(MetricRegistry)}.
137135
*/
138136
public void stop();
139137

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.cdc;
19+
20+
import java.nio.file.Path;
21+
import org.apache.ignite.metric.MetricRegistry;
22+
23+
/**
24+
* Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path)} method
25+
* required for CDC regex filters.
26+
*/
27+
public interface CdcConsumerEx extends CdcConsumer {
28+
/**
29+
* Starts the consumer.
30+
* @param mreg Metric registry for consumer specific metrics.
31+
* @param cdcDir Path to Change Data Capture Directory.
32+
*/
33+
void start(MetricRegistry mreg, Path cdcDir);
34+
}

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public void runX() throws Exception {
340340
committedSegmentOffset.value(walState.get1().fileOffset());
341341
}
342342

343-
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir);
343+
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath());
344344

345345
started = true;
346346

modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.ignite.binary.BinaryType;
2828
import org.apache.ignite.cdc.CdcCacheEvent;
2929
import org.apache.ignite.cdc.CdcConsumer;
30+
import org.apache.ignite.cdc.CdcConsumerEx;
3031
import org.apache.ignite.cdc.CdcEvent;
3132
import org.apache.ignite.cdc.TypeMapping;
3233
import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -187,10 +188,14 @@ public void onCacheDestroyEvents(Iterator<Integer> caches) {
187188
*
188189
* @param cdcReg CDC metric registry.
189190
* @param cdcConsumerReg CDC consumer metric registry.
191+
* @param cdcDir Path to Change Data Capture Directory.
190192
* @throws IgniteCheckedException If failed.
191193
*/
192194
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException {
193-
consumer.start(cdcConsumerReg, cdcDir);
195+
if (consumer instanceof CdcConsumerEx)
196+
((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir);
197+
else
198+
consumer.start(cdcConsumerReg);
194199

195200
evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer");
196201
lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process");

modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public abstract static class TestCdcConsumer<T> implements CdcConsumer {
291291
private volatile boolean stopped;
292292

293293
/** {@inheritDoc} */
294-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
294+
@Override public void start(MetricRegistry mreg) {
295295
stopped = false;
296296
}
297297

@@ -445,7 +445,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer {
445445
}
446446

447447
/** {@inheritDoc} */
448-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
448+
@Override public void start(MetricRegistry mreg) {
449449
// No-op.
450450
}
451451

modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.io.File;
21-
import java.nio.file.Path;
2221
import java.util.concurrent.CountDownLatch;
2322
import org.apache.ignite.IgniteCheckedException;
2423
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -93,8 +92,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception {
9392
CdcConfiguration cdcCfg = new CdcConfiguration();
9493

9594
cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
96-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
97-
super.start(mreg, cdcDir);
95+
@Override public void start(MetricRegistry mreg) {
96+
super.start(mreg);
9897

9998
started.countDown();
10099
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.cdc;
1919

20-
import java.nio.file.Path;
2120
import java.util.Collections;
2221
import java.util.concurrent.CountDownLatch;
2322
import java.util.concurrent.TimeUnit;
@@ -198,10 +197,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) {
198197
}
199198

200199
/** {@inheritDoc} */
201-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
200+
@Override public void start(MetricRegistry mreg) {
202201
Ignite ignite = Ignition.start(destClusterCliCfg);
203202

204-
super.start(mreg, cdcDir);
203+
super.start(mreg);
205204

206205
ignite.log().info("TestIgniteToIgniteConsumer started.");
207206
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22-
import java.nio.file.Path;
2322
import java.util.ArrayList;
2423
import java.util.Arrays;
2524
import java.util.Collection;
@@ -419,7 +418,7 @@ public void testReadOneByOneForBackup() throws Exception {
419418
// No-op.
420419
}
421420

422-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
421+
@Override public void start(MetricRegistry mreg) {
423422
// No-op.
424423
}
425424
};
@@ -513,7 +512,7 @@ public void testReadFromNextEntry() throws Exception {
513512
// No-op.
514513
}
515514

516-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
515+
@Override public void start(MetricRegistry mreg) {
517516
// No-op.
518517
}
519518
}, cfg));

0 commit comments

Comments
 (0)