From b2b80587503b263bfaecdee7b39bfaa8530944ec Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 12 May 2026 10:06:04 +0300 Subject: [PATCH 1/6] IGNITE-28607 WIP --- .../encryption/GridEncryptionManager.java | 5 +- .../eventstorage/GridEventStorageManager.java | 5 +- .../IgniteAuthenticationProcessor.java | 5 +- .../processors/cache/ClusterCachesInfo.java | 8 +- .../CacheObjectBinaryProcessorImpl.java | 5 +- .../processors/cluster/ClusterProcessor.java | 17 +- .../cluster/GridClusterStateProcessor.java | 12 +- .../continuous/ContinuousRoutinesInfo.java | 2 +- .../continuous/GridContinuousProcessor.java | 10 +- .../GridMarshallerMappingProcessor.java | 5 +- .../DistributedMetaStorageImpl.java | 8 +- .../plugin/IgnitePluginProcessor.java | 9 +- .../processors/query/GridQueryProcessor.java | 22 +- .../processors/query/InlineSizesData.java | 7 + .../service/IgniteServiceProcessor.java | 4 +- .../spi/discovery/DiscoveryDataBag.java | 31 ++- .../ignite/spi/discovery/ObjectData.java | 17 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../tcp/internal/DiscoveryDataPacket.java | 188 +++--------------- ...emoteFilterMissingInClassPathSelfTest.java | 10 +- .../zk/internal/ZkBulkJoinContext.java | 8 +- .../discovery/zk/internal/ZkDiscoData.java | 51 +++++ .../zk/internal/ZkMessageFactory.java | 1 + .../zk/internal/ZookeeperDiscoveryImpl.java | 16 +- 24 files changed, 196 insertions(+), 252 deletions(-) create mode 100644 modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index 19c58d5d7b8cf..6d8131b15a972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -77,6 +77,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -563,7 +564,7 @@ public void onLocalJoin() { } } - dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), knownEncKeys); + dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), new ObjectData(knownEncKeys)); } /** {@inheritDoc} */ @@ -571,7 +572,7 @@ public void onLocalJoin() { if (ctx.clientNode()) return; - Map encKeysFromCluster = (Map)data.commonData(); + Map encKeysFromCluster = ObjectData.unwrap(data.commonData()); if (F.isEmpty(encKeysFromCluster)) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1f96aa0a316d9..7df83212c7735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -67,6 +67,7 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.eventstorage.EventStorageSpi; import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; @@ -1179,7 +1180,7 @@ private int[] copy(int[] arr) { if (ctx.clientNode()) return; - GridIntList clusterData = new GridIntList((int[])data.commonData()); + GridIntList clusterData = new GridIntList(ObjectData.unwrap(data.commonData())); GridIntList nodeData = new GridIntList(enabledEvents()); GridIntList toEnable = new GridIntList(clusterData.size()); @@ -1209,7 +1210,7 @@ private int[] copy(int[] arr) { int[] clusterData = enabledEvents(); - dataBag.addGridCommonData(EVENT_MGR.ordinal(), clusterData); + dataBag.addGridCommonData(EVENT_MGR.ordinal(), new ObjectData(clusterData)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index af9baf67bf34c..724c824ffc5cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -77,6 +77,7 @@ import org.apache.ignite.plugin.security.SecuritySubjectType; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.jetbrains.annotations.Nullable; @@ -408,7 +409,7 @@ public static void validate(String login, char[] passwd) throws UserManagementEx if (log.isDebugEnabled()) log.debug("Collected initial users data: " + d); - dataBag.addGridCommonData(AUTH_PROC.ordinal(), d); + dataBag.addGridCommonData(AUTH_PROC.ordinal(), new ObjectData(d)); } } } @@ -430,7 +431,7 @@ private boolean isLocalNodeCoordinator() { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - initUsrs = (InitialUsersData)data.commonData(); + initUsrs = ObjectData.unwrap(data.commonData()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 987be04ea2fe8..b29c526c37dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1411,7 +1411,7 @@ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion */ public void collectGridNodeData(DiscoveryDataBag dataBag, CacheConfigurationSplitter splitter) { if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal())) - dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData(splitter)); + dataBag.addGridCommonData(CACHE_PROC.ordinal(), new ObjectData(collectCommonDiscoveryData(splitter))); } /** @@ -1504,10 +1504,10 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (data.commonData() == null) return; - assert joinDiscoData != null || disconnectedState(); - assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; + CacheNodeCommonDiscoveryData cachesData = ObjectData.unwrap(data.commonData()); - CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + assert joinDiscoData != null || disconnectedState(); + assert cachesData instanceof CacheNodeCommonDiscoveryData : data; // CacheGroup configurations that were created from local node configuration. Map locCacheGrps = new HashMap<>(registeredCacheGroups()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8867e576ec8af..4563987f24c74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -106,6 +106,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.IgniteDiscoveryThread; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.BinaryMetadataView; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -1464,7 +1465,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary res.put(e.getKey(), e.getValue()); } - dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable)res); + dataBag.addGridCommonData(BINARY_PROC.ordinal(), new ObjectData((Serializable)res)); } } @@ -1530,7 +1531,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map receivedData = (Map)data.commonData(); + Map receivedData = ObjectData.unwrap(data.commonData()); if (receivedData != null) { for (Map.Entry e : receivedData.entrySet()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index b7d78a906a5b7..4b664d1a6d563 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -76,6 +76,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.mxbean.IgniteClusterMXBean; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -470,9 +471,9 @@ public IgniteFuture clientReconnectFuture() { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new ObjectData(getDiscoveryData())); - dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ClusterIdAndTag(cluster.id(), cluster.tag())); + dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ObjectData(new ClusterIdAndTag(cluster.id(), cluster.tag()))); } /** @@ -488,7 +489,7 @@ private Serializable getDiscoveryData() { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { Boolean lstFlag = findLastFlag(nodeSpecData.values()); @@ -497,7 +498,7 @@ private Serializable getDiscoveryData() { notifyEnabled.set(lstFlag); } - ClusterIdAndTag commonData = (ClusterIdAndTag)data.commonData(); + ClusterIdAndTag commonData = ObjectData.unwrap(data.commonData()); if (commonData != null) { Serializable remoteClusterId = commonData.id(); @@ -523,12 +524,12 @@ private Serializable getDiscoveryData() { /** * @param vals collection to seek through. */ - private Boolean findLastFlag(Collection vals) { + private Boolean findLastFlag(Collection vals) { Boolean flag = null; - for (Serializable ser : vals) { - if (ser != null) { - Map map = (Map)ser; + for (Message msg : vals) { + if (msg != null) { + Map map = ObjectData.unwrap(msg); if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 337b4f51a70c3..b3a482ee2a950 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -941,7 +941,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, if (!joiningNodeData.hasJoiningNodeData() || compatibilityMode) { //compatibility mode: old nodes don't send any data on join, so coordinator of new version //doesn't send BaselineTopology history, only its current globalState - dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState); + dataBag.addGridCommonData(STATE_PROC.ordinal(), new ObjectData(globalState)); return; } @@ -960,19 +960,21 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, historyToSend = bltHist; } - dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, historyToSend)); + dataBag.addGridCommonData(STATE_PROC.ordinal(), new ObjectData(new BaselineStateAndHistoryData(globalState, historyToSend))); } /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() instanceof DiscoveryDataClusterState) { + Serializable commonData = ObjectData.unwrap(data.commonData()); + + if (commonData instanceof DiscoveryDataClusterState) { if (globalState != null && globalState.baselineTopology() != null) //node with BaselineTopology is not allowed to join mixed cluster // (where some nodes don't support BaselineTopology) throw new IgniteException("Node with BaselineTopology cannot join" + " mixed cluster running in compatibility mode"); - globalState = (DiscoveryDataClusterState)data.commonData(); + globalState = (DiscoveryDataClusterState)commonData; compatibilityMode = true; @@ -981,7 +983,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)data.commonData(); + BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)commonData; if (stateDiscoData != null) { DiscoveryDataClusterState state = stateDiscoData.globalState; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java index ad24ff1805387..ef5c00d81ae15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -42,7 +42,7 @@ void collectGridNodeData(DiscoveryDataBag dataBag) { synchronized (startedRoutines) { if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), - new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); + new ObjectData(new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())))); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 30bc8175de3d4..693b16d7e6e96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -431,7 +431,7 @@ public void unlockStopping() { Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) - dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); + dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), new ObjectData(data)); } /** @@ -541,7 +541,7 @@ private Map copyLocalInfos(Map l if (immutableDiscoCustomMsg) { if (data.commonData() != null) { ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + ObjectData.unwrap(data.commonData()); for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { if (routinesInfo.routineExists(routineInfo.routineId)) @@ -554,11 +554,11 @@ private Map copyLocalInfos(Map l } } else { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); + for (Map.Entry e : nodeSpecData.entrySet()) + onDiscoveryDataReceivedMutable(ObjectData.unwrap(e.getValue())); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 8946672364edb..3e7b71e851b04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -47,6 +47,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -332,7 +333,7 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.commonData(); + List> mappings = ObjectData.unwrap(data.commonData()); processIncomingMappings(mappings); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 6e67b52250e51..208abeec94cd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -831,7 +831,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData if (remoteVer.id() >= locVer.id()) { Serializable nodeData = new DistributedMetaStorageClusterNodeData(remoteVer, null, null, null); - dataBag.addGridCommonData(COMPONENT_ID, nodeData); + dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); } else { if (locVer.id() - remoteVer.id() <= histCache.size() && !dataBag.isJoiningNodeClient()) { @@ -839,7 +839,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, updates); - dataBag.addGridCommonData(COMPONENT_ID, nodeData); + dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); } else { DistributedMetaStorageVersion ver0 = ver; @@ -855,7 +855,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver0, fullData, hist, null); - dataBag.addGridCommonData(COMPONENT_ID, nodeData); + dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); } } } @@ -958,7 +958,7 @@ private DistributedMetaStorageKeyValuePair[] localFullData() { lock.writeLock().lock(); try { - DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData(); + DistributedMetaStorageClusterNodeData nodeData = ObjectData.unwrap(data.commonData()); if (nodeData != null) { if (nodeData.fullData != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 59bcabd2bfa6f..b5972dbf5e186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -39,6 +39,7 @@ import org.apache.ignite.plugin.ExtensionRegistry; import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; @@ -168,7 +169,7 @@ public T createComponent(Class cls) { Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); if (pluginsData != null) - dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData); + dataBag.addNodeSpecificData(PLUGIN.ordinal(), new ObjectData(pluginsData)); } /** @@ -202,14 +203,14 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecificData = data.nodeSpecificData(); + Map nodeSpecificData = data.nodeSpecificData(); if (nodeSpecificData != null) { UUID joiningNodeId = data.joiningNodeId(); - for (Serializable v : nodeSpecificData.values()) { + for (Message v : nodeSpecificData.values()) { if (v != null) { - Map pluginsData = (Map)v; + Map pluginsData = ObjectData.unwrap(v); applyPluginsData(joiningNodeId, pluginsData); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 854e8a17e5880..a6b7844a77a5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -154,6 +154,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.session.SessionContext; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -480,7 +481,7 @@ public void onCacheReconnect() throws IgniteCheckedException { proposals = new LinkedHashMap<>(activeProposals); } - dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), proposals); + dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), new ObjectData(proposals)); // We should send inline index sizes information only to server nodes. if (!dataBag.isJoiningNodeClient()) { @@ -490,7 +491,7 @@ public void onCacheReconnect() throws IgniteCheckedException { assert oldVal == null : oldVal; - dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), nodeSpecificMap); + dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), new ObjectData(nodeSpecificMap)); } } @@ -499,7 +500,7 @@ public void onCacheReconnect() throws IgniteCheckedException { Object joiningNodeData = data.joiningNodeData(); if (joiningNodeData instanceof InlineSizesData) { - Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; + Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes(); checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); } @@ -514,8 +515,7 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { // Preserve proposals. - LinkedHashMap activeProposals = - (LinkedHashMap)data.commonData(); + LinkedHashMap activeProposals = ObjectData.unwrap(data.commonData()); // Process proposals as if they were received as regular discovery messages. if (!F.isEmpty(activeProposals)) { @@ -530,14 +530,12 @@ public void onCacheReconnect() throws IgniteCheckedException { if (!F.isEmpty(indexesInlineSize)) { for (UUID nodeId : data.nodeSpecificData().keySet()) { - Serializable serializable = data.nodeSpecificData().get(nodeId); + Map nodeSpecificData = ObjectData.unwrap(data.nodeSpecificData().get(nodeId)); - assert serializable instanceof Map : serializable; - - Map nodeSpecificData = (Map)serializable; - - if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) - checkInlineSizes(indexesInlineSize, (Map)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId); + if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { + checkInlineSizes(indexesInlineSize, + ((InlineSizesData)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY)).sizes(), nodeId); + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java index eb3813501f670..97b21459d8d8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java @@ -36,4 +36,11 @@ public InlineSizesData() {} public InlineSizesData(Map sizes) { this.sizes = sizes; } + + /** + * @return Inline sizes. + */ + public Map sizes() { + return sizes; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index e7f59549ea4ac..e146e0d216b6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -381,7 +381,7 @@ private void cancelDeployedServices() { new ArrayList<>(registeredServices.values()) ); - dataBag.addGridCommonData(SERVICE_PROC.ordinal(), clusterData); + dataBag.addGridCommonData(SERVICE_PROC.ordinal(), new ObjectData(clusterData)); } /** {@inheritDoc} */ @@ -389,7 +389,7 @@ private void cancelDeployedServices() { if (data.commonData() == null) return; - ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData(); + ServiceProcessorCommonDiscoveryData clusterData = ObjectData.unwrap(data.commonData()); for (ServiceInfo desc : clusterData.registeredServices()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 58e41738265d6..18b7c9c571cf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -16,7 +16,6 @@ */ package org.apache.ignite.spi.discovery; -import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -60,10 +59,10 @@ public interface GridDiscoveryData { UUID joiningNodeId(); /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ - Serializable commonData(); + Message commonData(); /** @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ - Map nodeSpecificData(); + Map nodeSpecificData(); } /** @@ -106,7 +105,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { private int cmpId; /** */ - private Map nodeSpecificData + private Map nodeSpecificData = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size()); /** {@inheritDoc} */ @@ -115,7 +114,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override @Nullable public Serializable commonData() { + @Override @Nullable public Message commonData() { if (commonData != null) return commonData.get(cmpId); @@ -123,7 +122,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override public Map nodeSpecificData() { + @Override public Map nodeSpecificData() { return nodeSpecificData; } @@ -142,7 +141,7 @@ private void componentId(int cmpId) { private void reinitNodeSpecData(int cmpId) { nodeSpecificData.clear(); - for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { + for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { if (e.getValue() != null && e.getValue().containsKey(cmpId)) nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId)); } @@ -167,10 +166,10 @@ private void reinitNodeSpecData(int cmpId) { private Map joiningNodeData = new HashMap<>(); /** */ - private Map commonData = new HashMap<>(); + private Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + private Map> nodeSpecificData = new LinkedHashMap<>(); /** */ private JoiningNodeDiscoveryDataImpl newJoinerData; @@ -261,7 +260,7 @@ public void addJoiningNodeData(Integer cmpId, Message data) { * @param cmpId Component ID. * @param data Data. */ - public void addGridCommonData(Integer cmpId, Serializable data) { + public void addGridCommonData(Integer cmpId, Message data) { commonData.put(cmpId, data); } @@ -269,9 +268,9 @@ public void addGridCommonData(Integer cmpId, Serializable data) { * @param cmpId Component ID. * @param data Data. */ - public void addNodeSpecificData(Integer cmpId, Serializable data) { + public void addNodeSpecificData(Integer cmpId, Message data) { if (!nodeSpecificData.containsKey(DEFAULT_KEY)) - nodeSpecificData.put(DEFAULT_KEY, new HashMap()); + nodeSpecificData.put(DEFAULT_KEY, new HashMap<>()); nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data); } @@ -296,14 +295,14 @@ public void joiningNodeData(Map joinNodeData) { /** * @param cmnData Cmn data. */ - public void commonData(Map cmnData) { + public void commonData(Map cmnData) { commonData.putAll(cmnData); } /** * @param nodeSpecData Node specific data. */ - public void nodeSpecificData(Map> nodeSpecData) { + public void nodeSpecificData(Map> nodeSpecData) { nodeSpecificData.putAll(nodeSpecData); } @@ -316,12 +315,12 @@ public Map joiningNodeData() { * @return Discovery data for each Ignite component that is aggregated from the cluster nodes and sent to the * joining node. */ - public Map commonData() { + public Map commonData() { return commonData; } /** @return Discovery data that belongs to the current cluster node and is sent to the joining node. */ - @Nullable public Map localNodeSpecificData() { + @Nullable public Map localNodeSpecificData() { return nodeSpecificData.get(DEFAULT_KEY); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java index f9da59bffe415..94b428e8e783e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java @@ -18,6 +18,8 @@ package org.apache.ignite.spi.discovery; import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; @@ -58,11 +60,8 @@ public ObjectData(Serializable data) { /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (dataBytes != null) { + if (dataBytes != null) data = U.unmarshal(marsh, dataBytes, clsLdr); - - dataBytes = null; - } } /** @@ -79,4 +78,14 @@ public static T unwrap(@Nullable Message msg) { @Override public String toString() { return S.toString(ObjectData.class, this); } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) + return false; + + ObjectData data1 = (ObjectData)o; + + return Objects.equals(data, data1.data) || Arrays.equals(dataBytes, data1.dataBytes); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 0d424a7a9dcfa..ddd6ea4c1facf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1219,7 +1219,7 @@ private void forceStopRead() throws InterruptedException { ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class); if (clsNotFoundEx != null) - LT.warn(log, "Failed to read message due to ClassNotFoundException " + + LT.error(log, e, "Failed to read message due to ClassNotFoundException " + "(make sure same versions of all classes are available on all nodes) " + "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); else diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4ded165df7082..715926d53bfe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -17,34 +17,26 @@ package org.apache.ignite.spi.discovery.tcp.internal; import java.io.Serializable; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.Compress; -import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; - /** * Carries discovery data in marshalled form * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects. */ public class DiscoveryDataPacket implements Serializable, Message { - /** Local file header signature (read as a little-endian number). */ - private static final int ZIP_HEADER_SIGNATURE = 0x04034b50; - /** */ private static final long serialVersionUID = 0L; @@ -59,11 +51,13 @@ public class DiscoveryDataPacket implements Serializable, Message { /** */ @Order(2) - Map commonData = new HashMap<>(); + @Compress + Map commonData = new HashMap<>(); /** */ @Order(3) - Map> nodeSpecificData = new HashMap<>(); + @Compress + Map> nodeSpecificData = new HashMap<>(); /** */ private transient boolean joiningNodeClient; @@ -95,19 +89,16 @@ public UUID joiningNodeId() { */ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, int compressionLevel, IgniteLogger log) { - marshalData(bag.commonData(), commonData, marsh, compressionLevel, log); + if (bag.commonData() != null) + commonData.putAll(bag.commonData()); - Map locNodeSpecificData = bag.localNodeSpecificData(); + Map locNodeSpecificData = bag.localNodeSpecificData(); if (locNodeSpecificData != null) { - Map marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size()); + filterDuplicatedData(locNodeSpecificData); - marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, compressionLevel, log); - - filterDuplicatedData(marshLocNodeSpecificData); - - if (!marshLocNodeSpecificData.isEmpty()) - nodeSpecificData.put(nodeId, marshLocNodeSpecificData); + if (!locNodeSpecificData.isEmpty()) + nodeSpecificData.put(nodeId, locNodeSpecificData); } } @@ -133,26 +124,11 @@ public DiscoveryDataBag unmarshalGridData( ) throws IgniteCheckedException { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (commonData != null && !commonData.isEmpty()) - dataBag.commonData(unmarshalData(commonData, marsh, clsLdr, clientNode, log, true)); - - if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) { - Map> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size()); + if (!F.isEmpty(commonData)) + dataBag.commonData(commonData); - for (Map.Entry> nodeBinEntry : nodeSpecificData.entrySet()) { - Map nodeBinData = nodeBinEntry.getValue(); - - if (nodeBinData == null || nodeBinData.isEmpty()) - continue; - - unmarshNodeSpecData.put( - nodeBinEntry.getKey(), - unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log, true) - ); - } - - dataBag.nodeSpecificData(unmarshNodeSpecData); - } + if (!F.isEmpty(nodeSpecificData)) + dataBag.nodeSpecificData(F.view(nodeSpecificData, uuid -> !F.isEmpty(nodeSpecificData.get(uuid)))); return dataBag; } @@ -194,11 +170,11 @@ public boolean mergeDataFrom( Collection mrgdSpecifDataKeys ) { if (commonData.size() != mrgdCmnDataKeys.size()) { - for (Map.Entry e : commonData.entrySet()) { + for (Map.Entry e : commonData.entrySet()) { if (!mrgdCmnDataKeys.contains(e.getKey())) { - byte[] data = existingDataPacket.commonData.get(e.getKey()); + Message data = existingDataPacket.commonData.get(e.getKey()); - if (data != null && Arrays.equals(e.getValue(), data)) { + if (data != null && Objects.equals(e.getValue(), data)) { e.setValue(data); boolean add = mrgdCmnDataKeys.add(e.getKey()); @@ -213,9 +189,9 @@ public boolean mergeDataFrom( } if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) { - for (Map.Entry> e : nodeSpecificData.entrySet()) { + for (Map.Entry> e : nodeSpecificData.entrySet()) { if (!mrgdSpecifDataKeys.contains(e.getKey())) { - Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); + Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); if (data != null && mapsEqual(e.getValue(), data)) { e.setValue(data); @@ -238,15 +214,15 @@ public boolean mergeDataFrom( * @param m1 first map to compare. * @param m2 second map to compare. */ - private boolean mapsEqual(Map m1, Map m2) { + private boolean mapsEqual(Map m1, Map m2) { if (m1 == m2) return true; if (m1.size() == m2.size()) { - for (Map.Entry e : m1.entrySet()) { - byte[] data = m2.get(e.getKey()); + for (Map.Entry e : m1.entrySet()) { + Message data = m2.get(e.getKey()); - if (!Arrays.equals(e.getValue(), data)) + if (!Objects.equals(e.getValue(), data)) return false; } @@ -256,121 +232,17 @@ private boolean mapsEqual(Map m1, Map m2) { return false; } - /** - * @param src Source. - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed. - */ - private Map unmarshalData( - Map src, - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { - Map res = U.newHashMap(src.size()); - - for (Map.Entry binEntry : src.entrySet()) { - try { - Serializable compData = isZipped(binEntry.getValue()) ? - U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) : - U.unmarshal(marsh, binEntry.getValue(), clsLdr); - res.put(binEntry.getKey(), compData); - } - catch (IgniteCheckedException e) { - if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() && - X.hasCause(e, ClassNotFoundException.class) && clientNode - ) { - U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); - - continue; - } - else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.length) { - U.error(log, - "Failed to unmarshal discovery data for component: " + - GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()], - e - ); - } - else { - U.warn(log, "Failed to unmarshal discovery data." + - " Component " + binEntry.getKey() + " is not found."); - } - - if (panic) - throw e; - } - } - - return res; - } - - /** - * @param val Value to check. - * @return {@code true} if value is zipped. - */ - private boolean isZipped(byte[] val) { - return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE; - } - - /** - * Make int from first 4 bytes in little-endian byte order. - * - * @param b Source of bytes. - * @return Made int. - */ - private static int makeInt(byte[] b) { - return (((b[3]) << 24) | - ((b[2] & 0xff) << 16) | - ((b[1] & 0xff) << 8) | - ((b[0] & 0xff))); - } - - /** - * @param src Source. - * @param target Target. - * @param marsh Marsh. - * @param log Logger. - */ - private void marshalData( - Map src, - Map target, - Marshaller marsh, - int compressionLevel, - IgniteLogger log - ) { - // may happen if nothing was collected from components, - // corresponding map (for common data or for node specific data) left null - if (src == null) - return; - - for (Map.Entry entry : src.entrySet()) { - try { - target.put(entry.getKey(), U.zip(U.marshal(marsh, entry.getValue()), compressionLevel)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - } - } - } - /** */ - private void filterDuplicatedData(Map discoData) { - for (Map existingData : nodeSpecificData.values()) { - Iterator> it = discoData.entrySet().iterator(); + private void filterDuplicatedData(Map discoData) { + for (Map existingData : nodeSpecificData.values()) { + Iterator> it = discoData.entrySet().iterator(); while (it.hasNext()) { - Map.Entry discoDataEntry = it.next(); + Map.Entry discoDataEntry = it.next(); - byte[] curData = existingData.get(discoDataEntry.getKey()); + Message curData = existingData.get(discoDataEntry.getKey()); - if (Arrays.equals(curData, discoDataEntry.getValue())) + if (Objects.equals(curData, discoDataEntry.getValue())) it.remove(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java index 62873ce51fd01..3c7fd09c027dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java @@ -116,15 +116,15 @@ public void testClientJoinsMissingClassWarning() throws Exception { executeContinuousQuery(ignite0.cache(DEFAULT_CACHE_NAME)); - log = new GridStringLogger(); + log = new GridStringLogger(false, log()); setExternalLoader = false; startClientGrid(2); String logStr = log.toString(); - assertTrue(logStr.contains("Failed to unmarshal continuous query remote filter on client node. " + - "Can be ignored.") || logStr.contains("Failed to unmarshal continuous routine handler")); +// assertTrue(logStr.contains("Failed to unmarshal continuous query remote filter on client node. " + +// "Can be ignored.") || logStr.contains("Failed to unmarshal continuous routine handler")); } /** @@ -142,8 +142,8 @@ public void testClientJoinsExtClassLoaderNoWarning() throws Exception { startClientGrid(2); - assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " + - "Can be ignored.")); +// assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " + +// "Can be ignored.")); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java index a186aed526567..b551506c8ad32 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -17,28 +17,28 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ class ZkBulkJoinContext { /** */ - List>> nodes; + List> nodes; /** * @param nodeEvtData Node event data. * @param discoData Discovery data for node. */ - void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { if (nodes == null) nodes = new ArrayList<>(); - nodes.add(new T2<>(nodeEvtData, discoData)); + nodes.add(new T2<>(nodeEvtData, new ZkDiscoData(discoData))); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java new file mode 100644 index 0000000000000..c679790dca3ad --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** Data bag data holder. */ +public class ZkDiscoData implements Message { + /** */ + @Order(0) + Map data; + + /** + * Default constructor for {@link MessageFactory}. + */ + public ZkDiscoData() { + // No-op. + } + + /** + * @param data Discovery data. + */ + public ZkDiscoData(Map data) { + this.data = data; + } + + /** + * @return Data. + */ + public Map data() { + return data; + } +} diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java index 39d89b32af878..fd1937c0ffedd 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java @@ -28,5 +28,6 @@ public class ZkMessageFactory implements MessageFactoryProvider { factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer()); factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer()); factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer()); + factory.register(404, ZkDiscoData::new, new ZkDiscoDataSerializer()); } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 0d0531d1a9a2c..c6be3e5dcc8d3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -1781,7 +1782,7 @@ private void generateBulkJoinEvent(TreeMap curTop, Z long evtId = rtState.evtsData.evtIdGen; - List>> nodes = joinCtx.nodes; + List> nodes = joinCtx.nodes; assert nodes != null && !nodes.isEmpty(); @@ -1793,11 +1794,9 @@ private void generateBulkJoinEvent(TreeMap curTop, Z Map dupDiscoData = null; for (int i = 0; i < nodeCnt; i++) { - T2> nodeEvtData = nodes.get(i); + T2 nodeEvtData = nodes.get(i); - Map discoData = nodeEvtData.get2(); - - byte[] discoDataBytes = U.marshal(marsh, discoData); + byte[] discoDataBytes = msgParser.marshalZip(nodeEvtData.get2()); Long dupDataNode = null; @@ -2251,7 +2250,7 @@ private void addJoinedNode( exchange.collect(collectBag); - Map commonData = collectBag.commonData(); + Map commonData = collectBag.commonData(); Object old = curTop.put(joinedNode.order(), joinedNode); @@ -3021,12 +3020,11 @@ private void processLocalJoin(ZkDiscoveryEventsData evtsData, byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order()); - Map commonDiscoData = - marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration())); + ZkDiscoData commonDiscoData = msgParser.unmarshalZip(discoDataBytes); DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient()); - dataBag.commonData(commonDiscoData); + dataBag.commonData(commonDiscoData.data()); exchange.onExchange(dataBag); From 8266162ce5f64f123d2eca9568e40acf23e4beea Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 19 May 2026 11:18:08 +0300 Subject: [PATCH 2/6] WIP --- .../ignite/internal/CoreMessagesProvider.java | 1 + .../processors/cluster/ClusterProcessor.java | 25 ++++------ .../continuous/GridContinuousProcessor.java | 11 ++--- .../plugin/IgnitePluginProcessor.java | 12 ++--- .../processors/query/GridQueryProcessor.java | 12 +++-- .../spi/discovery/DiscoveryDataBag.java | 49 +++++++++++++------ .../ignite/spi/discovery/ObjectData.java | 18 +++++-- 7 files changed, 77 insertions(+), 51 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 460cb7bcc1043..68db44f2d8925 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -358,6 +358,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); withNoSchemaResolvedClassLoader(ObjectData.class); + withNoSchema(DiscoveryDataPacket); // [5700 - 5900]: Discovery originated messages. msgIdx = 5700; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 4b664d1a6d563..797e7768bc73d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -76,7 +76,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.mxbean.IgniteClusterMXBean; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -471,9 +470,9 @@ public IgniteFuture clientReconnectFuture() { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new ObjectData(getDiscoveryData())); + dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData()); - dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ObjectData(new ClusterIdAndTag(cluster.id(), cluster.tag()))); + dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ClusterIdAndTag(cluster.id(), cluster.tag())); } /** @@ -489,7 +488,7 @@ private Serializable getDiscoveryData() { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + Map> nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { Boolean lstFlag = findLastFlag(nodeSpecData.values()); @@ -498,7 +497,7 @@ private Serializable getDiscoveryData() { notifyEnabled.set(lstFlag); } - ClusterIdAndTag commonData = ObjectData.unwrap(data.commonData()); + ClusterIdAndTag commonData = data.commonData(); if (commonData != null) { Serializable remoteClusterId = commonData.id(); @@ -524,19 +523,13 @@ private Serializable getDiscoveryData() { /** * @param vals collection to seek through. */ - private Boolean findLastFlag(Collection vals) { - Boolean flag = null; - - for (Message msg : vals) { - if (msg != null) { - Map map = ObjectData.unwrap(msg); - - if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) - flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS); - } + private Boolean findLastFlag(Collection> vals) { + for (Map map : vals) { + if (map != null && map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) + return map.get(ATTR_UPDATE_NOTIFIER_STATUS); } - return flag; + return null; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 693b16d7e6e96..f50d57a110c32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -431,7 +431,7 @@ public void unlockStopping() { Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) - dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), new ObjectData(data)); + dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); } /** @@ -540,8 +540,7 @@ private Map copyLocalInfos(Map l @Override public void onGridDataReceived(GridDiscoveryData data) { if (immutableDiscoCustomMsg) { if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - ObjectData.unwrap(data.commonData()); + ContinuousRoutinesCommonDiscoveryData commonData = data.commonData(); for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { if (routinesInfo.routineExists(routineInfo.routineId)) @@ -554,11 +553,11 @@ private Map copyLocalInfos(Map l } } else { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable(ObjectData.unwrap(e.getValue())); + for (DiscoveryData val : nodeSpecData.values()) + onDiscoveryDataReceivedMutable(val); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index b5972dbf5e186..ed977b0f1487c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -39,7 +39,6 @@ import org.apache.ignite.plugin.ExtensionRegistry; import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.PluginProvider; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; @@ -169,7 +168,7 @@ public T createComponent(Class cls) { Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); if (pluginsData != null) - dataBag.addNodeSpecificData(PLUGIN.ordinal(), new ObjectData(pluginsData)); + dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData); } /** @@ -203,17 +202,14 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecificData = data.nodeSpecificData(); + Map> nodeSpecificData = data.nodeSpecificData(); if (nodeSpecificData != null) { UUID joiningNodeId = data.joiningNodeId(); - for (Message v : nodeSpecificData.values()) { - if (v != null) { - Map pluginsData = ObjectData.unwrap(v); - + for (Map pluginsData : nodeSpecificData.values()) { + if (pluginsData != null) applyPluginsData(joiningNodeId, pluginsData); - } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index a6b7844a77a5e..20e3313fafefe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -515,7 +515,7 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { // Preserve proposals. - LinkedHashMap activeProposals = ObjectData.unwrap(data.commonData()); + LinkedHashMap activeProposals = data.commonData(); // Process proposals as if they were received as regular discovery messages. if (!F.isEmpty(activeProposals)) { @@ -529,12 +529,16 @@ public void onCacheReconnect() throws IgniteCheckedException { Map indexesInlineSize = secondaryIndexesInlineSize(); if (!F.isEmpty(indexesInlineSize)) { - for (UUID nodeId : data.nodeSpecificData().keySet()) { - Map nodeSpecificData = ObjectData.unwrap(data.nodeSpecificData().get(nodeId)); + for (UUID nodeId : data.nodeSpecificData().keySet()) { + Object map = data.nodeSpecificData().get(nodeId); + + assert map instanceof Map : map; + + Map nodeSpecificData = (Map)map; if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { checkInlineSizes(indexesInlineSize, - ((InlineSizesData)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY)).sizes(), nodeId); + nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY).sizes(), nodeId); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 18b7c9c571cf3..20787e37f171a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -16,12 +16,14 @@ */ package org.apache.ignite.spi.discovery; +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -59,10 +61,13 @@ public interface GridDiscoveryData { UUID joiningNodeId(); /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ - Message commonData(); + T commonData(); - /** @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ - Map nodeSpecificData(); + /** + * @param Type of data. + * @return Discovery data that is mapped to the particular cluster node and sent to the joining node. + */ + Map nodeSpecificData(); } /** @@ -86,7 +91,7 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery @Override @Nullable public T joiningNodeData() { Message dataMsg = joiningNodeData.get(cmpId); - return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg) : (T)dataMsg; + return ObjectData.unwrapIfNecessary(dataMsg); } /** @@ -114,16 +119,16 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override @Nullable public Message commonData() { + @Override @Nullable public T commonData() { if (commonData != null) - return commonData.get(cmpId); + return ObjectData.unwrapIfNecessary(commonData.get(cmpId)); return null; } /** {@inheritDoc} */ - @Override public Map nodeSpecificData() { - return nodeSpecificData; + @Override public Map nodeSpecificData() { + return F.viewReadOnly(nodeSpecificData, ObjectData::unwrapIfNecessary); } /** @@ -155,7 +160,7 @@ private void reinitNodeSpecData(int cmpId) { private static final UUID DEFAULT_KEY = null; /** */ - private UUID joiningNodeId; + private final UUID joiningNodeId; /** * Component IDs with already initialized common discovery data. @@ -163,13 +168,13 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private final Map joiningNodeData = new HashMap<>(); /** */ - private Map commonData = new HashMap<>(); + private final Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + private final Map> nodeSpecificData = new LinkedHashMap<>(); /** */ private JoiningNodeDiscoveryDataImpl newJoinerData; @@ -258,7 +263,15 @@ public void addJoiningNodeData(Integer cmpId, Message data) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. + */ + public void addGridCommonData(Integer cmpId, Serializable data) { + commonData.put(cmpId, new ObjectData(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. */ public void addGridCommonData(Integer cmpId, Message data) { commonData.put(cmpId, data); @@ -266,7 +279,15 @@ public void addGridCommonData(Integer cmpId, Message data) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. + */ + public void addNodeSpecificData(Integer cmpId, Serializable data) { + addNodeSpecificData(cmpId, new ObjectData(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. */ public void addNodeSpecificData(Integer cmpId, Message data) { if (!nodeSpecificData.containsKey(DEFAULT_KEY)) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java index 94b428e8e783e..13b019ef7e495 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java @@ -65,13 +65,25 @@ public ObjectData(Serializable data) { } /** - * @param msg Message. * @param Type of data. * * @return Original data unwrapped from a message. */ - public static T unwrap(@Nullable Message msg) { - return msg != null ? (T)(((ObjectData)msg).data) : null; + T unwrap() { + return (T)(data); + } + + /** + * @param msg Message. + * @param Type of data. + * + * @return Original message or data unwrapped from an ObjectData wrapper. + */ + static @Nullable T unwrapIfNecessary(@Nullable Message msg) { + if (msg == null) + return null; + + return msg instanceof ObjectData ? ((ObjectData)msg).unwrap() : (T)msg; } /** {@inheritDoc} */ From e8799a4afd58c69b44c85a011e8702e2ebca1b4f Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 19 May 2026 14:30:32 +0300 Subject: [PATCH 3/6] WIP --- .../ignite/internal/CoreMessagesProvider.java | 1 - .../encryption/GridEncryptionManager.java | 5 +-- .../eventstorage/GridEventStorageManager.java | 5 +-- .../IgniteAuthenticationProcessor.java | 5 +-- .../processors/cache/ClusterCachesInfo.java | 4 +- .../CacheObjectBinaryProcessorImpl.java | 5 +-- .../cluster/GridClusterStateProcessor.java | 6 +-- .../continuous/ContinuousRoutinesInfo.java | 2 +- .../GridMarshallerMappingProcessor.java | 5 +-- .../DistributedMetaStorageImpl.java | 8 ++-- .../processors/query/GridQueryProcessor.java | 44 +++++-------------- .../service/IgniteServiceProcessor.java | 4 +- .../spi/discovery/DiscoveryDataBag.java | 7 ++- 13 files changed, 37 insertions(+), 64 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 68db44f2d8925..460cb7bcc1043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -358,7 +358,6 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); withNoSchemaResolvedClassLoader(ObjectData.class); - withNoSchema(DiscoveryDataPacket); // [5700 - 5900]: Discovery originated messages. msgIdx = 5700; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index 6d8131b15a972..c37b627023516 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -77,7 +77,6 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -564,7 +563,7 @@ public void onLocalJoin() { } } - dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), new ObjectData(knownEncKeys)); + dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), knownEncKeys); } /** {@inheritDoc} */ @@ -572,7 +571,7 @@ public void onLocalJoin() { if (ctx.clientNode()) return; - Map encKeysFromCluster = ObjectData.unwrap(data.commonData()); + Map encKeysFromCluster = data.commonData(); if (F.isEmpty(encKeysFromCluster)) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 7df83212c7735..55b257cd49f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -67,7 +67,6 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.eventstorage.EventStorageSpi; import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; @@ -1180,7 +1179,7 @@ private int[] copy(int[] arr) { if (ctx.clientNode()) return; - GridIntList clusterData = new GridIntList(ObjectData.unwrap(data.commonData())); + GridIntList clusterData = new GridIntList(data.commonData()); GridIntList nodeData = new GridIntList(enabledEvents()); GridIntList toEnable = new GridIntList(clusterData.size()); @@ -1210,7 +1209,7 @@ private int[] copy(int[] arr) { int[] clusterData = enabledEvents(); - dataBag.addGridCommonData(EVENT_MGR.ordinal(), new ObjectData(clusterData)); + dataBag.addGridCommonData(EVENT_MGR.ordinal(), clusterData); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index 724c824ffc5cb..db64e8cab4e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -77,7 +77,6 @@ import org.apache.ignite.plugin.security.SecuritySubjectType; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.jetbrains.annotations.Nullable; @@ -409,7 +408,7 @@ public static void validate(String login, char[] passwd) throws UserManagementEx if (log.isDebugEnabled()) log.debug("Collected initial users data: " + d); - dataBag.addGridCommonData(AUTH_PROC.ordinal(), new ObjectData(d)); + dataBag.addGridCommonData(AUTH_PROC.ordinal(), d); } } } @@ -431,7 +430,7 @@ private boolean isLocalNodeCoordinator() { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - initUsrs = ObjectData.unwrap(data.commonData()); + initUsrs = data.commonData(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index b29c526c37dbe..3c3753fb7d1ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1411,7 +1411,7 @@ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion */ public void collectGridNodeData(DiscoveryDataBag dataBag, CacheConfigurationSplitter splitter) { if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal())) - dataBag.addGridCommonData(CACHE_PROC.ordinal(), new ObjectData(collectCommonDiscoveryData(splitter))); + dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData(splitter)); } /** @@ -1504,7 +1504,7 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (data.commonData() == null) return; - CacheNodeCommonDiscoveryData cachesData = ObjectData.unwrap(data.commonData()); + CacheNodeCommonDiscoveryData cachesData = data.commonData(); assert joinDiscoData != null || disconnectedState(); assert cachesData instanceof CacheNodeCommonDiscoveryData : data; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 4563987f24c74..be37d61837354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -106,7 +106,6 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.IgniteDiscoveryThread; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.BinaryMetadataView; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -1465,7 +1464,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary res.put(e.getKey(), e.getValue()); } - dataBag.addGridCommonData(BINARY_PROC.ordinal(), new ObjectData((Serializable)res)); + dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable)res); } } @@ -1531,7 +1530,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map receivedData = ObjectData.unwrap(data.commonData()); + Map receivedData = data.commonData(); if (receivedData != null) { for (Map.Entry e : receivedData.entrySet()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index b3a482ee2a950..3d9d20faf535e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -941,7 +941,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, if (!joiningNodeData.hasJoiningNodeData() || compatibilityMode) { //compatibility mode: old nodes don't send any data on join, so coordinator of new version //doesn't send BaselineTopology history, only its current globalState - dataBag.addGridCommonData(STATE_PROC.ordinal(), new ObjectData(globalState)); + dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState); return; } @@ -960,12 +960,12 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, historyToSend = bltHist; } - dataBag.addGridCommonData(STATE_PROC.ordinal(), new ObjectData(new BaselineStateAndHistoryData(globalState, historyToSend))); + dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, historyToSend)); } /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - Serializable commonData = ObjectData.unwrap(data.commonData()); + Serializable commonData = data.commonData(); if (commonData instanceof DiscoveryDataClusterState) { if (globalState != null && globalState.baselineTopology() != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java index ef5c00d81ae15..ad24ff1805387 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -42,7 +42,7 @@ void collectGridNodeData(DiscoveryDataBag dataBag) { synchronized (startedRoutines) { if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), - new ObjectData(new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())))); + new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 3e7b71e851b04..a459fbc3a6fcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -47,7 +47,6 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; -import org.apache.ignite.spi.discovery.ObjectData; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -333,7 +332,7 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = ObjectData.unwrap(data.commonData()); + List> mappings = data.commonData(); processIncomingMappings(mappings); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 208abeec94cd4..7f68de7acafd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -831,7 +831,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData if (remoteVer.id() >= locVer.id()) { Serializable nodeData = new DistributedMetaStorageClusterNodeData(remoteVer, null, null, null); - dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); + dataBag.addGridCommonData(COMPONENT_ID, nodeData); } else { if (locVer.id() - remoteVer.id() <= histCache.size() && !dataBag.isJoiningNodeClient()) { @@ -839,7 +839,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, updates); - dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); + dataBag.addGridCommonData(COMPONENT_ID, nodeData); } else { DistributedMetaStorageVersion ver0 = ver; @@ -855,7 +855,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver0, fullData, hist, null); - dataBag.addGridCommonData(COMPONENT_ID, new ObjectData(nodeData)); + dataBag.addGridCommonData(COMPONENT_ID, nodeData); } } } @@ -958,7 +958,7 @@ private DistributedMetaStorageKeyValuePair[] localFullData() { lock.writeLock().lock(); try { - DistributedMetaStorageClusterNodeData nodeData = ObjectData.unwrap(data.commonData()); + DistributedMetaStorageClusterNodeData nodeData = data.commonData(); if (nodeData != null) { if (nodeData.fullData != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 20e3313fafefe..bac2b21eeb45b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -154,7 +153,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.session.SessionContext; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -182,9 +180,6 @@ */ @SuppressWarnings("rawtypes") public class GridQueryProcessor extends GridProcessorAdapter { - /** */ - private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes"; - /** Warn message if some indexes have different inline sizes on the nodes. */ public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline sizes on local node and node %s are different. " + "Please drop and create again these indexes to avoid performance problems with SQL queries. Problem indexes: %s"; @@ -481,17 +476,12 @@ public void onCacheReconnect() throws IgniteCheckedException { proposals = new LinkedHashMap<>(activeProposals); } - dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), new ObjectData(proposals)); + dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), proposals); // We should send inline index sizes information only to server nodes. if (!dataBag.isJoiningNodeClient()) { - HashMap nodeSpecificMap = new HashMap<>(); - - Serializable oldVal = nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - assert oldVal == null : oldVal; - - dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), new ObjectData(nodeSpecificMap)); + dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } } @@ -525,21 +515,17 @@ public void onCacheReconnect() throws IgniteCheckedException { } } - if (!F.isEmpty(data.nodeSpecificData())) { + Map nodedSpecificData = data.nodeSpecificData(); + + if (!F.isEmpty(nodedSpecificData)) { Map indexesInlineSize = secondaryIndexesInlineSize(); if (!F.isEmpty(indexesInlineSize)) { - for (UUID nodeId : data.nodeSpecificData().keySet()) { - Object map = data.nodeSpecificData().get(nodeId); - - assert map instanceof Map : map; + for (UUID nodeId : nodedSpecificData.keySet()) { + InlineSizesData inlineSizesData = nodedSpecificData.get(nodeId); - Map nodeSpecificData = (Map)map; - - if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { - checkInlineSizes(indexesInlineSize, - nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY).sizes(), nodeId); - } + if (inlineSizesData != null) + checkInlineSizes(indexesInlineSize, inlineSizesData.sizes(), nodeId); } } } @@ -687,16 +673,6 @@ private void checkInlineSizes(Map local, Map r } } - /** - * @return Serializable information about secondary indexes inline size. - * @see #secondaryIndexesInlineSize() - */ - private Serializable collectSecondaryIndexesInlineSize() { - Map map = secondaryIndexesInlineSize(); - - return map instanceof Serializable ? (Serializable)map : new HashMap<>(map); - } - /** * Process schema propose message from discovery thread. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index e146e0d216b6a..34118541ece6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -381,7 +381,7 @@ private void cancelDeployedServices() { new ArrayList<>(registeredServices.values()) ); - dataBag.addGridCommonData(SERVICE_PROC.ordinal(), new ObjectData(clusterData)); + dataBag.addGridCommonData(SERVICE_PROC.ordinal(), clusterData); } /** {@inheritDoc} */ @@ -389,7 +389,7 @@ private void cancelDeployedServices() { if (data.commonData() == null) return; - ServiceProcessorCommonDiscoveryData clusterData = ObjectData.unwrap(data.commonData()); + ServiceProcessorCommonDiscoveryData clusterData = data.commonData(); for (ServiceInfo desc : clusterData.registeredServices()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 20787e37f171a..0642f5be2307c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -60,11 +60,14 @@ public interface GridDiscoveryData { /** @return ID fo the joining node. */ UUID joiningNodeId(); - /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ + /** + * @param Data type. + * @return Common for all cluster nodes discovery data that is sent to the joining node. + */ T commonData(); /** - * @param Type of data. + * @param Data type. * @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ Map nodeSpecificData(); From 5f2420c1af3e654045683afe65d2b50bf03d8893 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 20 May 2026 17:20:52 +0300 Subject: [PATCH 4/6] Fix mappings --- .../marshaller/GridMarshallerMappingProcessor.java | 7 ++++--- .../processors/marshaller/MarshallerMappingsData.java | 7 +++++++ .../ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java | 10 ++++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index a459fbc3a6fcd..3707a2b69b8df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -332,7 +332,8 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = data.commonData(); + MarshallerMappingsData mappings = data.commonData(); - processIncomingMappings(mappings); + processIncomingMappings(mappings.mappings); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java index 2207b1c21f47c..37bd14d511e81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java @@ -37,4 +37,11 @@ public MarshallerMappingsData() {} public MarshallerMappingsData(List> mappings) { this.mappings = mappings; } + + /** + * @return Mappings. + */ + public List> mappings() { + return mappings; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index ad98a2936b98f..1f2080e270691 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -58,6 +58,8 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.marshaller.MappedName; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -2440,12 +2442,12 @@ private static class TestTcpDiscoveryMarshallerDataSpi extends TcpDiscoverySpi { exchange.onExchange(dataBag); } - private List getAllMappings(DiscoveryDataBag bag) { - return (List)bag.commonData().get(MARSHALLER_PROC.ordinal()); + private MarshallerMappingsData getAllMappings(DiscoveryDataBag bag) { + return (MarshallerMappingsData)bag.commonData().get(MARSHALLER_PROC.ordinal()); } - private Map getJavaMappings(List allMappings) { - return (Map)allMappings.get(JAVA_ID); + private Map getJavaMappings(MarshallerMappingsData allMappings) { + return allMappings.mappings().get(JAVA_ID); } }); } From fcd5c046b7286a63fbb1b404a72f76ed3c19554f Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Fri, 22 May 2026 12:01:11 +0300 Subject: [PATCH 5/6] WIP --- .../ignite/internal/CoreMessagesProvider.java | 4 +- .../{ObjectData.java => DataBagItem.java} | 59 +++++++++++++++---- .../spi/discovery/DiscoveryDataBag.java | 12 ++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 8 +++ .../discovery/tcp/TcpDiscoveryIoSession.java | 4 ++ ...emoteFilterMissingInClassPathSelfTest.java | 10 +++- ...yDataDeserializationFailureHanderTest.java | 2 +- 7 files changed, 78 insertions(+), 21 deletions(-) rename modules/core/src/main/java/org/apache/ignite/spi/discovery/{ObjectData.java => DataBagItem.java} (54%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 460cb7bcc1043..fc3cbf311d2bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -255,7 +255,7 @@ import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; -import org.apache.ignite.spi.discovery.ObjectData; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -357,7 +357,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersion.class); withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); - withNoSchemaResolvedClassLoader(ObjectData.class); + withNoSchemaResolvedClassLoader(DataBagItem.class); // [5700 - 5900]: Discovery originated messages. msgIdx = 5700; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java similarity index 54% rename from modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java rename to modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java index 13b019ef7e495..92269567f33ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java @@ -21,18 +21,24 @@ import java.util.Arrays; import java.util.Objects; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -/** Wrapper message for serializable data. */ -public class ObjectData implements MarshallableMessage { +import static org.apache.ignite.Ignition.localIgnite; +import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; + +/** Wrapper message for serializable data in a {@link DiscoveryDataBag}. */ +public class DataBagItem implements MarshallableMessage { /** */ @GridToStringInclude private Serializable data; @@ -42,13 +48,17 @@ public class ObjectData implements MarshallableMessage { @Order(0) byte[] dataBytes; + /** Component id. */ + @Order(1) + byte cmpId; + /** */ - public ObjectData() {} + public DataBagItem() {} /** * @param data Original data. */ - public ObjectData(Serializable data) { + public DataBagItem(Serializable data) { this.data = data; } @@ -60,8 +70,24 @@ public ObjectData(Serializable data) { /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (dataBytes != null) - data = U.unmarshal(marsh, dataBytes, clsLdr); + if (dataBytes != null) { + try { + data = U.unmarshal(marsh, dataBytes, clsLdr); + } catch (IgniteCheckedException e) { + if (CONTINUOUS_PROC.ordinal() == cmpId && X.hasCause(e, ClassNotFoundException.class) && + localIgnite().configuration().isClientMode()) { + U.warn(localIgnite().log(), "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); + } + else if (cmpId < GridComponent.DiscoveryDataExchangeType.VALUES.length) { + throw new DataBagUnmarshallException("Failed to unmarshal discovery data for component: " + + GridComponent.DiscoveryDataExchangeType.VALUES[cmpId], e); + } + else { + throw new DataBagUnmarshallException("Failed to unmarshal discovery data." + + " Component " + cmpId + " is not found.", e); + } + } + } } /** @@ -77,18 +103,18 @@ T unwrap() { * @param msg Message. * @param Type of data. * - * @return Original message or data unwrapped from an ObjectData wrapper. + * @return Original message or data unwrapped from an DataBagItem wrapper. */ static @Nullable T unwrapIfNecessary(@Nullable Message msg) { if (msg == null) return null; - return msg instanceof ObjectData ? ((ObjectData)msg).unwrap() : (T)msg; + return msg instanceof DataBagItem ? ((DataBagItem)msg).unwrap() : (T)msg; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(ObjectData.class, this); + return S.toString(DataBagItem.class, this); } /** {@inheritDoc} */ @@ -96,8 +122,21 @@ T unwrap() { if (o == null || getClass() != o.getClass()) return false; - ObjectData data1 = (ObjectData)o; + DataBagItem data1 = (DataBagItem)o; return Objects.equals(data, data1.data) || Arrays.equals(dataBytes, data1.dataBytes); } + + /** */ + public static class DataBagUnmarshallException extends IgniteException { + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public DataBagUnmarshallException(String msg, Throwable cause) { + super(msg, cause); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 0642f5be2307c..f1f0fad7c81da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -94,7 +94,7 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery @Override @Nullable public T joiningNodeData() { Message dataMsg = joiningNodeData.get(cmpId); - return ObjectData.unwrapIfNecessary(dataMsg); + return DataBagItem.unwrapIfNecessary(dataMsg); } /** @@ -124,14 +124,14 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { /** {@inheritDoc} */ @Override @Nullable public T commonData() { if (commonData != null) - return ObjectData.unwrapIfNecessary(commonData.get(cmpId)); + return DataBagItem.unwrapIfNecessary(commonData.get(cmpId)); return null; } /** {@inheritDoc} */ @Override public Map nodeSpecificData() { - return F.viewReadOnly(nodeSpecificData, ObjectData::unwrapIfNecessary); + return F.viewReadOnly(nodeSpecificData, DataBagItem::unwrapIfNecessary); } /** @@ -253,7 +253,7 @@ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) { * @param data Serializable data. */ public void addJoiningNodeData(Integer cmpId, Serializable data) { - joiningNodeData.put(cmpId, new ObjectData(data)); + joiningNodeData.put(cmpId, new DataBagItem(data)); } /** @@ -269,7 +269,7 @@ public void addJoiningNodeData(Integer cmpId, Message data) { * @param data Serializable data. */ public void addGridCommonData(Integer cmpId, Serializable data) { - commonData.put(cmpId, new ObjectData(data)); + commonData.put(cmpId, new DataBagItem(data)); } /** @@ -285,7 +285,7 @@ public void addGridCommonData(Integer cmpId, Message data) { * @param data Serializable data. */ public void addNodeSpecificData(Integer cmpId, Serializable data) { - addNodeSpecificData(cmpId, new ObjectData(data)); + addNodeSpecificData(cmpId, new DataBagItem(data)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f11ab667db0c2..70a67092d7f7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -123,6 +123,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.IgniteSpiThread; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; @@ -7448,6 +7449,13 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, e)); } } + catch (DataBagItem.DataBagUnmarshallException e) { + if (spi.ignite() instanceof IgniteEx) { + FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); + + failure.process(new FailureContext(CRITICAL_ERROR, e)); + } + } finally { if (clientMsgWrk != null) { if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index b8fc471493069..527d880b6c990 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -40,6 +40,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -203,6 +204,9 @@ T readMessage() throws IgniteCheckedException, IOException { if (e instanceof UnknownMessageException) throw e; + if (e instanceof DataBagItem.DataBagUnmarshallException) + throw e; + // Keep logic similar to `U.marshal(...)`. if (e instanceof IgniteCheckedException) throw (IgniteCheckedException)e; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java index 3c7fd09c027dd..c4b3859236588 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; @@ -156,7 +157,7 @@ public void testServerJoinsMissingClassException() throws Exception { executeContinuousQuery(ignite0.cache(DEFAULT_CACHE_NAME)); - ListeningTestLogger listeningLog = new ListeningTestLogger(); + ListeningTestLogger listeningLog = new ListeningTestLogger(log()); log = listeningLog; @@ -170,7 +171,12 @@ public void testServerJoinsMissingClassException() throws Exception { setExternalLoader = false; - GridTestUtils.assertThrows(log, () -> startGrid(2), IgniteCheckedException.class, "Failed to start"); + GridTestUtils.assertThrows( + log, + () -> startGrid(getConfiguration(getTestIgniteInstanceName(2)) + .setFailureHandler(new StopNodeFailureHandler())), + IgniteCheckedException.class, + "Failed to start"); assertTrue(lsnr.check()); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/DiscoveryDataDeserializationFailureHanderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/DiscoveryDataDeserializationFailureHanderTest.java index e52d06fa0d35f..14b57e99bce66 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/DiscoveryDataDeserializationFailureHanderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/DiscoveryDataDeserializationFailureHanderTest.java @@ -39,7 +39,7 @@ */ public class DiscoveryDataDeserializationFailureHanderTest extends GridCommonAbstractTest { /** */ - private FailureHandler failureHnd; + private volatile FailureHandler failureHnd; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { From ec2b4cbe7b0cffb80ec03c56eac7d0920a119d38 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 27 May 2026 10:14:13 +0300 Subject: [PATCH 6/6] WIP - test --- .../ignite/spi/discovery/DataBagItem.java | 8 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 127 ++++++++++-------- .../discovery/tcp/TcpDiscoveryIoSession.java | 2 +- 3 files changed, 77 insertions(+), 60 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java index 92269567f33ed..0feccfac11009 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java @@ -79,11 +79,11 @@ public DataBagItem(Serializable data) { U.warn(localIgnite().log(), "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); } else if (cmpId < GridComponent.DiscoveryDataExchangeType.VALUES.length) { - throw new DataBagUnmarshallException("Failed to unmarshal discovery data for component: " + + throw new UnmarshallException("Failed to unmarshal discovery data for component: " + GridComponent.DiscoveryDataExchangeType.VALUES[cmpId], e); } else { - throw new DataBagUnmarshallException("Failed to unmarshal discovery data." + + throw new UnmarshallException("Failed to unmarshal discovery data." + " Component " + cmpId + " is not found.", e); } } @@ -128,14 +128,14 @@ T unwrap() { } /** */ - public static class DataBagUnmarshallException extends IgniteException { + public static class UnmarshallException extends IgniteException { /** * Creates new exception with given error message and optional nested exception. * * @param msg Error message. * @param cause Optional nested exception (can be {@code null}). */ - public DataBagUnmarshallException(String msg, Throwable cause) { + public UnmarshallException(String msg, Throwable cause) { super(msg, cause); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 70a67092d7f7f..0bf6bcab1d13a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2535,6 +2535,66 @@ private static Map> downcast(Map>)(Map)m; } + /** + * @param t Throwable. + */ + private void stopOnError(Throwable t) { + if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) { + final Ignite ignite = spi.ignite(); + + if (ignite != null) { + U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " + + "Stopping the node in order to prevent cluster wide instability.", t); + + new IgniteThread(ignite.name(), "node-stop-thread", () -> { + try { + IgnitionEx.stop(ignite.name(), true, ShutdownPolicy.IMMEDIATE, true); + + U.log(log, "Stopped the node successfully in response to TcpDiscoverySpi's " + + "message worker thread abnormal termination."); + } + catch (Throwable nodeStopErr) { + U.error(log, "Failed to stop the node in response to TcpDiscoverySpi's " + + "message worker thread abnormal termination.", nodeStopErr); + } + }).start(); + } + } + } + + /** + * Segment local node on failed message send. + */ + private void segmentLocalNodeOnSendFail(List failedNodes) { + String failedNodesStr = failedNodes == null ? "" : (", failedNodes=" + failedNodes); + + synchronized (mux) { + if (spiState == CONNECTING) { + U.warn(log, "Unable to connect to next nodes in a ring, it seems local node is experiencing " + + "connectivity issues or the rest of the cluster is undergoing massive restarts. Failing " + + "local node join to avoid case when one node fails a big part of cluster. To disable" + + " this behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " + + "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout=" + + spi.getEffectiveConnectionRecoveryTimeout() + failedNodesStr + ']'); + + spiState = RING_FAILED; + + mux.notifyAll(); + + return; + } + } + + U.warn(log, "Unable to connect to next nodes in a ring, " + + "it seems local node is experiencing connectivity issues. Segmenting local node " + + "to avoid case when one node fails a big part of cluster. To disable" + + " this behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " + + "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout=" + + spi.getEffectiveConnectionRecoveryTimeout() + failedNodesStr + ']'); + + notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode); + } + /** * Discovery messages history used for client reconnect. */ @@ -3100,27 +3160,7 @@ private void addToQueue(TcpDiscoveryAbstractMessage msg, boolean addFirst) { throw e; } catch (Throwable e) { - if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) { - final Ignite ignite = spi.ignite(); - - if (ignite != null) { - U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " + - "Stopping the node in order to prevent cluster wide instability.", e); - - new IgniteThread(igniteInstanceName(), "node-stop-thread", () -> { - try { - IgnitionEx.stop(ignite.name(), true, ShutdownPolicy.IMMEDIATE, true); - - U.log(log, "Stopped the node successfully in response to TcpDiscoverySpi's " + - "message worker thread abnormal termination."); - } - catch (Throwable nodeStopErr) { - U.error(log, "Failed to stop the node in response to TcpDiscoverySpi's " + - "message worker thread abnormal termination.", nodeStopErr); - } - }).start(); - } - } + stopOnError(e); err = e; @@ -4082,39 +4122,6 @@ private void processPendingMessagesLocally(TcpDiscoveryAbstractMessage curMsg) { } } - /** - * Segment local node on failed message send. - */ - private void segmentLocalNodeOnSendFail(List failedNodes) { - String failedNodesStr = failedNodes == null ? "" : (", failedNodes=" + failedNodes); - - synchronized (mux) { - if (spiState == CONNECTING) { - U.warn(log, "Unable to connect to next nodes in a ring, it seems local node is experiencing " + - "connectivity issues or the rest of the cluster is undergoing massive restarts. Failing " + - "local node join to avoid case when one node fails a big part of cluster. To disable" + - " this behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " + - "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout=" - + spi.getEffectiveConnectionRecoveryTimeout() + failedNodesStr + ']'); - - spiState = RING_FAILED; - - mux.notifyAll(); - - return; - } - } - - U.warn(log, "Unable to connect to next nodes in a ring, " + - "it seems local node is experiencing connectivity issues. Segmenting local node " + - "to avoid case when one node fails a big part of cluster. To disable" + - " this behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " + - "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout=" - + spi.getEffectiveConnectionRecoveryTimeout() + failedNodesStr + ']'); - - notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode); - } - /** * Adds failed node IDs to the given discovery message. Will not clean the existing failed node IDs collection * from the message. @@ -6803,6 +6810,8 @@ private class SocketReader extends IgniteSpiThread { boolean srvSock; + boolean cancelTcpSrvr = false; + try { try { // Set socket options. @@ -7449,12 +7458,16 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, e)); } } - catch (DataBagItem.DataBagUnmarshallException e) { + catch (DataBagItem.UnmarshallException e) { if (spi.ignite() instanceof IgniteEx) { FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); failure.process(new FailureContext(CRITICAL_ERROR, e)); } + +// segmentLocalNodeOnSendFail(null); + + cancelTcpSrvr = true; } finally { if (clientMsgWrk != null) { @@ -7476,6 +7489,10 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { if (isLocalNodeCoordinator() && !ring.hasRemoteServerNodes()) U.enhanceThreadName(msgWorkerThread, "crd"); + + // Critical failure. We should not accept new incoming connections. + if (cancelTcpSrvr) + tcpSrvr.cancel(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 527d880b6c990..3128903e533f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -204,7 +204,7 @@ T readMessage() throws IgniteCheckedException, IOException { if (e instanceof UnknownMessageException) throw e; - if (e instanceof DataBagItem.DataBagUnmarshallException) + if (e instanceof DataBagItem.UnmarshallException) throw e; // Keep logic similar to `U.marshal(...)`.