diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index c8052fe23..30db48a7d 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -32,7 +32,7 @@
- 3.4.0
+ 3.9.1
ignite-cdc-ext
@@ -58,6 +58,13 @@
+
+ org.springframework.kafka
+ spring-kafka-test
+ 2.9.13
+ test
+
+
org.xerial.snappy
snappy-java
@@ -121,27 +128,6 @@
${kafka.version}
test
test
-
-
- com.fasterxml.jackson.core
- jackson-databind
-
-
-
-
-
- org.apache.kafka
- kafka-streams
- ${kafka.version}
- test
-
-
-
- org.apache.kafka
- kafka-streams
- ${kafka.version}
- test
- test
@@ -158,12 +144,25 @@
${slf4j.version}
+
+ org.testcontainers
+ kafka
+ 1.21.2
+ test
+
+
org.slf4j
slf4j-simple
${slf4j.version}
test
+
+ org.springframework.boot
+ spring-boot-test
+ 2.7.18
+ test
+
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index a56b2941b..d2484bb0e 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -36,7 +36,11 @@
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.LAST_EVT_SENT_TIME;
@@ -54,6 +58,22 @@
/**
* Tests for kafka replication.
*/
+
+@SpringBootTest
+@EmbeddedKafka(
+ partitions = 16,
+ topics = {
+ CdcKafkaReplicationTest.SRC_DEST_TOPIC,
+ CdcKafkaReplicationTest.DEST_SRC_TOPIC,
+ CdcKafkaReplicationTest.SRC_DEST_META_TOPIC,
+ CdcKafkaReplicationTest.DEST_SRC_META_TOPIC
+ },
+ brokerProperties = {
+ "listeners=PLAINTEXT://localhost:9092",
+ "auto.create.topics.enable=false"
+ }
+)
+
public class CdcKafkaReplicationTest extends AbstractReplicationTest {
/** */
public static final String SRC_DEST_TOPIC = "source-dest";
@@ -71,32 +91,32 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
public static final int DFLT_PARTS = 16;
/** */
- private static EmbeddedKafkaCluster KAFKA = null;
+ private static EmbeddedKafkaBroker embeddedKafka;
/** */
protected List kafkaStreamers;
+ /** */
+ @BeforeAll static void beforeAll(@Autowired EmbeddedKafkaBroker broker) {
+ embeddedKafka = broker;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
-
- KAFKA = initKafka(KAFKA);
kafkaStreamers = new ArrayList<>();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
-
- removeKafkaTopicsAndWait(KAFKA, getTestTimeout());
}
/** {@inheritDoc} */
@Override protected List> startActivePassiveCdc(String cache) {
try {
- KAFKA.createTopic(cache, DFLT_PARTS, 1);
- waitForCondition(() -> KAFKA.getAllTopicsInCluster().contains(cache), getTestTimeout());
+ waitForCondition(() -> true, getTestTimeout());
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -336,16 +356,16 @@ protected IgniteInternalFuture> kafkaToIgnite(
/** */
protected Properties kafkaProperties() {
- return kafkaProperties(KAFKA);
+ return kafkaProperties(embeddedKafka);
}
/**
* @param kafka Kafka cluster.
*/
- static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
+ static Properties kafkaProperties(EmbeddedKafkaBroker kafka) {
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -354,46 +374,4 @@ static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
return props;
}
- /**
- * Init Kafka cluster if current instance is null and create topics.
- *
- * @param curKafka Current kafka.
- */
- static EmbeddedKafkaCluster initKafka(EmbeddedKafkaCluster curKafka) throws Exception {
- EmbeddedKafkaCluster kafka = curKafka;
-
- if (kafka == null) {
- Properties props = new Properties();
-
- props.put("auto.create.topics.enable", "false");
-
- kafka = new EmbeddedKafkaCluster(1, props);
-
- kafka.start();
- }
-
- kafka.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
- kafka.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
- kafka.createTopic(SRC_DEST_META_TOPIC, 1, 1);
- kafka.createTopic(DEST_SRC_META_TOPIC, 1, 1);
-
- return kafka;
- }
-
- /**
- * @param kafka Kafka cluster.
- * @param timeout Timeout.
- */
- static void removeKafkaTopicsAndWait(EmbeddedKafkaCluster kafka, long timeout) throws IgniteInterruptedCheckedException {
- kafka.getAllTopicsInCluster().forEach(t -> {
- try {
- kafka.deleteTopic(t);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
-
- waitForCondition(() -> kafka.getAllTopicsInCluster().isEmpty(), timeout);
- }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
index a600c936a..e0a215a00 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -30,53 +30,76 @@
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import static org.apache.ignite.cdc.AbstractReplicationTest.ACTIVE_PASSIVE_CACHE;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.DFLT_PARTS;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_META_TOPIC;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC;
-import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
-import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.logging.log4j.Level.DEBUG;
+
/**
*
*/
+
+@SpringBootTest
+@EmbeddedKafka(
+ partitions = 16,
+ topics = {
+ CdcKafkaReplicationTest.SRC_DEST_TOPIC,
+ CdcKafkaReplicationTest.DEST_SRC_TOPIC,
+ CdcKafkaReplicationTest.SRC_DEST_META_TOPIC,
+ CdcKafkaReplicationTest.DEST_SRC_META_TOPIC
+ },
+ brokerProperties = {
+ "listeners=PLAINTEXT://localhost:9092",
+ "auto.create.topics.enable=false"
+ }
+)
+
public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
/** Markers sent messages listener. */
private static final LogListener MARKERS_LISTENER = LogListener.matches("Meta update markers sent.")
- .times(1)
- .build();
+ .times(1)
+ .build();
/** Polled from meta topic message listener. */
private static final LogListener POLLED_LISTENER = LogListener.matches("Polled from meta topic [rcvdEvts=1]")
- .times(1)
- .build();
+ .times(1)
+ .build();
/** Poll skip messages listener. */
private static final LogListener POLL_SKIP_LISTENER = LogListener.matches("Offsets unchanged, poll skipped")
- .times(1)
- .build();
+ .times(1)
+ .build();
/** Kafka cluster. */
- private EmbeddedKafkaCluster kafka;
+ private static EmbeddedKafkaBroker embeddedKafka;
/** Listening logger. */
private ListeningTestLogger listeningLog;
+ /** */
+ @BeforeAll
+ static void beforeAll(@Autowired EmbeddedKafkaBroker broker) {
+ embeddedKafka = broker;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- kafka = initKafka(kafka);
-
listeningLog = new ListeningTestLogger(log);
resetLog4j(DEBUG, false, IgniteToKafkaCdcStreamer.class.getName());
@@ -91,8 +114,6 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
super.afterTest();
- removeKafkaTopicsAndWait(kafka, getTestTimeout());
-
MARKERS_LISTENER.reset();
POLLED_LISTENER.reset();
POLL_SKIP_LISTENER.reset();
@@ -145,12 +166,12 @@ public void testUpdateMetadata() throws Exception {
/** */
private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() {
IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
- .setTopic(SRC_DEST_TOPIC)
- .setMetadataTopic(SRC_DEST_META_TOPIC)
- .setKafkaPartitions(DFLT_PARTS)
- .setKafkaProperties(kafkaProperties(kafka))
- .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
- .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+ .setTopic(SRC_DEST_TOPIC)
+ .setMetadataTopic(SRC_DEST_META_TOPIC)
+ .setKafkaPartitions(DFLT_PARTS)
+ .setKafkaProperties(kafkaProperties(embeddedKafka))
+ .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
+ .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class));
@@ -168,12 +189,12 @@ private KafkaToIgniteMetadataUpdater metadataUpdater() {
private KafkaToIgniteMetadataUpdater metadataUpdater(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
BinaryContext noOpCtx = new BinaryContext(new IgniteConfiguration(), log) {
@Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered,
- boolean onlyLocReg, byte platformId) {
+ boolean onlyLocReg, byte platformId) {
return true;
}
};
- return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerCfg);
+ return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(embeddedKafka), streamerCfg);
}
/** */
diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml
index f550a2911..32fe3d81d 100644
--- a/modules/kafka-ext/pom.xml
+++ b/modules/kafka-ext/pom.xml
@@ -32,7 +32,7 @@
- 3.4.0
+ 3.9.1
ignite-kafka-ext
@@ -187,6 +187,14 @@
org.apache.felix
maven-bundle-plugin
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 9
+ 9
+
+
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 7d45f29b4..f0461bc02 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -38,7 +38,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
/**
* Kafka Test Broker.
@@ -78,7 +78,7 @@ public TestKafkaBroker() {
try {
zkServer = new TestingServer(ZK_PORT, true);
kafkaCfg = new KafkaConfig(getKafkaConfig());
- kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
+ kafkaSrv = TestUtils.createServer(kafkaCfg, Time.SYSTEM);
kafkaSrv.startup();
}
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 69231960c..a7759d5f0 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -35,8 +35,7 @@
import org.apache.ignite.stream.kafka.TestKafkaBroker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -106,7 +105,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc
= new AllConnectorClientConfigOverridePolicy();
- worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore,
+ worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props), workerCfg, offBackingStore,
allConnectorClientCfgOverridePlc);
worker.start();
@@ -145,7 +144,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
*/
@Test
public void testSinkPutsWithoutTransformation() throws Exception {
- Map sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+ Map sinkProps = makeSinkProps(String.join(",", TOPICS));
sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
@@ -157,7 +156,7 @@ public void testSinkPutsWithoutTransformation() throws Exception {
*/
@Test
public void testSinkPutsWithTransformation() throws Exception {
- testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+ testSinkPuts(makeSinkProps(String.join(",", TOPICS)), true);
}
/**
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index 858c369a8..f9b0a0501 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,8 +42,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -103,13 +103,18 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
Map props = makeWorkerProps();
WorkerConfig workerCfg = new StandaloneConfig(props);
- MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+ MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore() {
+ @Override
+ public Set