From e5b54cf2578ac245d354053122969947e958aaba Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Mon, 21 Apr 2025 10:12:01 +0300 Subject: [PATCH 1/7] IGNITE-25203 Update kafka dependency --- modules/cdc-ext/pom.xml | 2 +- modules/kafka-ext/pom.xml | 10 +++++++++- .../kafka/connect/IgniteSinkConnectorTest.java | 4 ++-- .../kafka/connect/IgniteSourceConnectorTest.java | 15 ++++++++++++--- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index c8052fe23..dda69c2e1 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -32,7 +32,7 @@ - 3.4.0 + 3.8.1 ignite-cdc-ext diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml index f550a2911..31ff24574 100644 --- a/modules/kafka-ext/pom.xml +++ b/modules/kafka-ext/pom.xml @@ -32,7 +32,7 @@ - 3.4.0 + 3.8.1 ignite-kafka-ext @@ -187,6 +187,14 @@ org.apache.felix maven-bundle-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 9 + 9 + + diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java index 69231960c..0944afea7 100644 --- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java +++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java @@ -145,7 +145,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { */ @Test public void testSinkPutsWithoutTransformation() throws Exception { - Map sinkProps = makeSinkProps(Utils.join(TOPICS, ",")); + Map sinkProps = makeSinkProps(String.join(",", TOPICS)); sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS); @@ -157,7 +157,7 @@ public void testSinkPutsWithoutTransformation() throws Exception { */ @Test public void testSinkPutsWithTransformation() throws Exception { - testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true); + testSinkPuts(makeSinkProps(String.join(",", TOPICS)), true); } /** diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java index 858c369a8..bc0f7418b 100644 --- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java +++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -103,7 +104,12 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { Map props = makeWorkerProps(); WorkerConfig workerCfg = new StandaloneConfig(props); - MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); + MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore() { + @Override + public Set> connectorPartitions(String s) { + return Set.of(); + } + }; offBackingStore.configure(workerCfg); AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc @@ -142,7 +148,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { */ @Test public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception { - Map srcProps = makeSourceProps(Utils.join(TOPICS, ",")); + Map srcProps = makeSourceProps(String.join(",", TOPICS)); srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS); @@ -156,7 +162,10 @@ public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception { */ @Test public void testEventsInjectedIntoKafka() throws Exception { - doTest(makeSourceProps(Utils.join(TOPICS, ",")), true); +// doTest(makeSourceProps(Utils.join(TOPICS, ",")), true); + doTest(makeSourceProps(String.join(",", TOPICS)), true); +// "constructor with %s for %s", Arrays.stream(argTypes).map(Object::toString).collect(Collectors.joining(", ")), className), e); + } /** From 3862c75f255f8a24e1a880fac2f3144f782dba2a Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Thu, 15 May 2025 17:48:07 +0300 Subject: [PATCH 2/7] IGNITE-25203 update kafka version --- modules/cdc-ext/pom.xml | 2 +- modules/kafka-ext/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index dda69c2e1..ee7ec5afc 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -32,7 +32,7 @@ - 3.8.1 + 3.9.0 ignite-cdc-ext diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml index 31ff24574..93a1aea7a 100644 --- a/modules/kafka-ext/pom.xml +++ b/modules/kafka-ext/pom.xml @@ -32,7 +32,7 @@ - 3.8.1 + 3.9.0 ignite-kafka-ext From b32e2cb91a092b21175ec61877838ae31b19fbfa Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Mon, 23 Jun 2025 15:32:05 +0300 Subject: [PATCH 3/7] Fix tests --- .../java/org/apache/ignite/stream/kafka/TestKafkaBroker.java | 4 ++-- .../ignite/stream/kafka/connect/IgniteSinkConnectorTest.java | 5 ++--- .../stream/kafka/connect/IgniteSourceConnectorTest.java | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java index 7d45f29b4..f0461bc02 100644 --- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java +++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java @@ -38,7 +38,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; /** * Kafka Test Broker. @@ -78,7 +78,7 @@ public TestKafkaBroker() { try { zkServer = new TestingServer(ZK_PORT, true); kafkaCfg = new KafkaConfig(getKafkaConfig()); - kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime()); + kafkaSrv = TestUtils.createServer(kafkaCfg, Time.SYSTEM); kafkaSrv.startup(); } diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java index 0944afea7..a7759d5f0 100644 --- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java +++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java @@ -35,8 +35,7 @@ import org.apache.ignite.stream.kafka.TestKafkaBroker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -106,7 +105,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc = new AllConnectorClientConfigOverridePolicy(); - worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore, + worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props), workerCfg, offBackingStore, allConnectorClientCfgOverridePlc); worker.start(); diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java index bc0f7418b..f9b0a0501 100644 --- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java +++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java @@ -42,8 +42,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -115,7 +114,7 @@ public Set> connectorPartitions(String s) { AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc = new AllConnectorClientConfigOverridePolicy(); - worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore, + worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props), workerCfg, offBackingStore, allConnectorClientCfgOverridePlc); worker.start(); From 7b8a1bbbbd40c3845208d0733b8a46ff65a181f2 Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Tue, 24 Jun 2025 10:47:08 +0300 Subject: [PATCH 4/7] update kafka ver + fix test --- modules/cdc-ext/pom.xml | 43 +++++----- .../cdc/kafka/CdcKafkaReplicationTest.java | 84 +++++++------------ .../KafkaToIgniteMetadataUpdaterTest.java | 65 +++++++++----- modules/kafka-ext/pom.xml | 2 +- 4 files changed, 96 insertions(+), 98 deletions(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index ee7ec5afc..2cd3babe4 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -32,7 +32,7 @@ - 3.9.0 + 3.9.1 ignite-cdc-ext @@ -58,6 +58,13 @@ + + org.springframework.kafka + spring-kafka-test + 3.3.7 + test + + org.xerial.snappy snappy-java @@ -121,27 +128,6 @@ ${kafka.version} test test - - - com.fasterxml.jackson.core - jackson-databind - - - - - - org.apache.kafka - kafka-streams - ${kafka.version} - test - - - - org.apache.kafka - kafka-streams - ${kafka.version} - test - test @@ -158,12 +144,25 @@ ${slf4j.version} + + org.testcontainers + kafka + 1.21.2 + test + + org.slf4j slf4j-simple ${slf4j.version} test + + org.springframework.boot + spring-boot-test + 3.2.12 + test + diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java index a56b2941b..d2484bb0e 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java @@ -36,7 +36,11 @@ import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.LAST_EVT_SENT_TIME; @@ -54,6 +58,22 @@ /** * Tests for kafka replication. */ + +@SpringBootTest +@EmbeddedKafka( + partitions = 16, + topics = { + CdcKafkaReplicationTest.SRC_DEST_TOPIC, + CdcKafkaReplicationTest.DEST_SRC_TOPIC, + CdcKafkaReplicationTest.SRC_DEST_META_TOPIC, + CdcKafkaReplicationTest.DEST_SRC_META_TOPIC + }, + brokerProperties = { + "listeners=PLAINTEXT://localhost:9092", + "auto.create.topics.enable=false" + } +) + public class CdcKafkaReplicationTest extends AbstractReplicationTest { /** */ public static final String SRC_DEST_TOPIC = "source-dest"; @@ -71,32 +91,32 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { public static final int DFLT_PARTS = 16; /** */ - private static EmbeddedKafkaCluster KAFKA = null; + private static EmbeddedKafkaBroker embeddedKafka; /** */ protected List kafkaStreamers; + /** */ + @BeforeAll static void beforeAll(@Autowired EmbeddedKafkaBroker broker) { + embeddedKafka = broker; + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - - KAFKA = initKafka(KAFKA); kafkaStreamers = new ArrayList<>(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); - - removeKafkaTopicsAndWait(KAFKA, getTestTimeout()); } /** {@inheritDoc} */ @Override protected List> startActivePassiveCdc(String cache) { try { - KAFKA.createTopic(cache, DFLT_PARTS, 1); - waitForCondition(() -> KAFKA.getAllTopicsInCluster().contains(cache), getTestTimeout()); + waitForCondition(() -> true, getTestTimeout()); } catch (Exception e) { throw new RuntimeException(e); @@ -336,16 +356,16 @@ protected IgniteInternalFuture kafkaToIgnite( /** */ protected Properties kafkaProperties() { - return kafkaProperties(KAFKA); + return kafkaProperties(embeddedKafka); } /** * @param kafka Kafka cluster. */ - static Properties kafkaProperties(EmbeddedKafkaCluster kafka) { + static Properties kafkaProperties(EmbeddedKafkaBroker kafka) { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokersAsString()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); @@ -354,46 +374,4 @@ static Properties kafkaProperties(EmbeddedKafkaCluster kafka) { return props; } - /** - * Init Kafka cluster if current instance is null and create topics. - * - * @param curKafka Current kafka. - */ - static EmbeddedKafkaCluster initKafka(EmbeddedKafkaCluster curKafka) throws Exception { - EmbeddedKafkaCluster kafka = curKafka; - - if (kafka == null) { - Properties props = new Properties(); - - props.put("auto.create.topics.enable", "false"); - - kafka = new EmbeddedKafkaCluster(1, props); - - kafka.start(); - } - - kafka.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1); - kafka.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1); - kafka.createTopic(SRC_DEST_META_TOPIC, 1, 1); - kafka.createTopic(DEST_SRC_META_TOPIC, 1, 1); - - return kafka; - } - - /** - * @param kafka Kafka cluster. - * @param timeout Timeout. - */ - static void removeKafkaTopicsAndWait(EmbeddedKafkaCluster kafka, long timeout) throws IgniteInterruptedCheckedException { - kafka.getAllTopicsInCluster().forEach(t -> { - try { - kafka.deleteTopic(t); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - - waitForCondition(() -> kafka.getAllTopicsInCluster().isEmpty(), timeout); - } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java index a600c936a..e0a215a00 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java @@ -30,53 +30,76 @@ import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.junit.Test; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import static org.apache.ignite.cdc.AbstractReplicationTest.ACTIVE_PASSIVE_CACHE; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.DFLT_PARTS; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_META_TOPIC; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC; -import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties; -import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait; import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT; import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.logging.log4j.Level.DEBUG; + /** * */ + +@SpringBootTest +@EmbeddedKafka( + partitions = 16, + topics = { + CdcKafkaReplicationTest.SRC_DEST_TOPIC, + CdcKafkaReplicationTest.DEST_SRC_TOPIC, + CdcKafkaReplicationTest.SRC_DEST_META_TOPIC, + CdcKafkaReplicationTest.DEST_SRC_META_TOPIC + }, + brokerProperties = { + "listeners=PLAINTEXT://localhost:9092", + "auto.create.topics.enable=false" + } +) + public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest { /** Markers sent messages listener. */ private static final LogListener MARKERS_LISTENER = LogListener.matches("Meta update markers sent.") - .times(1) - .build(); + .times(1) + .build(); /** Polled from meta topic message listener. */ private static final LogListener POLLED_LISTENER = LogListener.matches("Polled from meta topic [rcvdEvts=1]") - .times(1) - .build(); + .times(1) + .build(); /** Poll skip messages listener. */ private static final LogListener POLL_SKIP_LISTENER = LogListener.matches("Offsets unchanged, poll skipped") - .times(1) - .build(); + .times(1) + .build(); /** Kafka cluster. */ - private EmbeddedKafkaCluster kafka; + private static EmbeddedKafkaBroker embeddedKafka; /** Listening logger. */ private ListeningTestLogger listeningLog; + /** */ + @BeforeAll + static void beforeAll(@Autowired EmbeddedKafkaBroker broker) { + embeddedKafka = broker; + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - kafka = initKafka(kafka); - listeningLog = new ListeningTestLogger(log); resetLog4j(DEBUG, false, IgniteToKafkaCdcStreamer.class.getName()); @@ -91,8 +114,6 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); - removeKafkaTopicsAndWait(kafka, getTestTimeout()); - MARKERS_LISTENER.reset(); POLLED_LISTENER.reset(); POLL_SKIP_LISTENER.reset(); @@ -145,12 +166,12 @@ public void testUpdateMetadata() throws Exception { /** */ private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() { IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer() - .setTopic(SRC_DEST_TOPIC) - .setMetadataTopic(SRC_DEST_META_TOPIC) - .setKafkaPartitions(DFLT_PARTS) - .setKafkaProperties(kafkaProperties(kafka)) - .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE)) - .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT); + .setTopic(SRC_DEST_TOPIC) + .setMetadataTopic(SRC_DEST_META_TOPIC) + .setKafkaPartitions(DFLT_PARTS) + .setKafkaProperties(kafkaProperties(embeddedKafka)) + .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE)) + .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT); GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class)); @@ -168,12 +189,12 @@ private KafkaToIgniteMetadataUpdater metadataUpdater() { private KafkaToIgniteMetadataUpdater metadataUpdater(KafkaToIgniteCdcStreamerConfiguration streamerCfg) { BinaryContext noOpCtx = new BinaryContext(new IgniteConfiguration(), log) { @Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered, - boolean onlyLocReg, byte platformId) { + boolean onlyLocReg, byte platformId) { return true; } }; - return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerCfg); + return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(embeddedKafka), streamerCfg); } /** */ diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml index 93a1aea7a..32fe3d81d 100644 --- a/modules/kafka-ext/pom.xml +++ b/modules/kafka-ext/pom.xml @@ -32,7 +32,7 @@ - 3.9.0 + 3.9.1 ignite-kafka-ext From bf1f3062b36bd166662787ae4c08953eb8b06d1b Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Tue, 24 Jun 2025 12:22:26 +0300 Subject: [PATCH 5/7] fix spring-kafka-test version --- modules/cdc-ext/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index 2cd3babe4..9343a813d 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -61,7 +61,7 @@ org.springframework.kafka spring-kafka-test - 3.3.7 + 3.2.10 test From e5f75ce013dc265e1c34d024a217e0a3415fd5ce Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Tue, 24 Jun 2025 12:37:20 +0300 Subject: [PATCH 6/7] fix spring-kafka-test version --- modules/cdc-ext/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index 9343a813d..1d78adb8d 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -61,7 +61,7 @@ org.springframework.kafka spring-kafka-test - 3.2.10 + 2.9.13 test From b89443d85c0cb4e1373b0c629a2dcb19593f75b3 Mon Sep 17 00:00:00 2001 From: Aleksandr Nikolaev Date: Tue, 24 Jun 2025 13:11:52 +0300 Subject: [PATCH 7/7] fix spring-boot-test version --- modules/cdc-ext/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml index 1d78adb8d..30db48a7d 100644 --- a/modules/cdc-ext/pom.xml +++ b/modules/cdc-ext/pom.xml @@ -160,7 +160,7 @@ org.springframework.boot spring-boot-test - 3.2.12 + 2.7.18 test