Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
227c3a1
raw
Vladsz83 Feb 18, 2026
1e1dd00
fix
Vladsz83 Feb 18, 2026
43f4267
fix
Vladsz83 Feb 18, 2026
d68b352
fix
Vladsz83 Feb 19, 2026
5ac43ab
Merge branch 'master' into Message-serializer-for-TcpDiscoveryNodeAdd…
Vladsz83 Feb 19, 2026
1064805
refactoring. + dedicated if
Vladsz83 Feb 19, 2026
4adf01a
impl
Vladsz83 Feb 19, 2026
330d359
Revert "impl"
Vladsz83 Feb 19, 2026
1baa2f2
Revert "refactoring. + dedicated if"
Vladsz83 Feb 19, 2026
9067b9b
fix the serialization
Vladsz83 Feb 19, 2026
d4e6b2b
fixes
Vladsz83 Feb 20, 2026
fb52e0e
fixes
Vladsz83 Feb 20, 2026
333185b
fixes
Vladsz83 Feb 21, 2026
31a3d29
lost serialization fix
Vladsz83 Feb 21, 2026
0a2b97e
Merge branch 'master' into TcpDiscoveryNodeAddedMessage
Vladsz83 Feb 21, 2026
7272166
+ master
Vladsz83 Feb 21, 2026
31c46d9
minor
Vladsz83 Feb 21, 2026
e3e7ae2
impl
Vladsz83 Feb 21, 2026
629d051
impl
Vladsz83 Feb 22, 2026
7f0cbc3
impl
Vladsz83 Feb 22, 2026
cdedd33
fix
Vladsz83 Feb 22, 2026
4ce7031
fix
Vladsz83 Feb 23, 2026
de7eb5e
minority
Vladsz83 Feb 23, 2026
811144a
Merge branch 'master' into IGNITE-27414-TcpDiscoveryClientReconnectMe…
Vladsz83 Feb 24, 2026
6e066b9
merged master
Vladsz83 Feb 24, 2026
072e46d
Merge branch 'master' into IGNITE-27414-TcpDiscoveryClientReconnectMe…
Vladsz83 Feb 24, 2026
6e2ec6e
+ master
Vladsz83 Feb 24, 2026
bdb8b96
Merge branch 'master' into IGNITE-27414-TcpDiscoveryClientReconnectMe…
Vladsz83 Feb 25, 2026
226df11
- Review fix
Vladsz83 Feb 25, 2026
66a1dc1
Revert refactoring
Vladsz83 Feb 25, 2026
21cf4ac
Revert refactoring
Vladsz83 Feb 25, 2026
e132454
Merge remote-tracking branch 'my/IGNITE-27414-TcpDiscoveryClientRecon…
Vladsz83 Feb 25, 2026
13aa4d4
minority
Vladsz83 Feb 26, 2026
579d257
reimpl
Vladsz83 Feb 26, 2026
f333d9f
+master
Vladsz83 Feb 26, 2026
a186f45
minority
Vladsz83 Feb 26, 2026
32b0858
Merge branch 'master' into IGNITE-27414-TcpDiscoveryClientReconnectMe…
Vladsz83 Feb 27, 2026
e31efee
refactor
Vladsz83 Feb 28, 2026
7ea9e34
minor revert
Vladsz83 Feb 28, 2026
cc60543
revert TcpDiscoveryJoinRequestMessage finishMarsh
Vladsz83 Feb 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@

package org.apache.ignite.internal.managers.discovery;

import java.util.function.Supplier;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
Expand All @@ -45,6 +52,10 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
Expand All @@ -61,6 +72,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
Expand All @@ -83,11 +95,35 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer;
import org.jetbrains.annotations.Nullable;

/** Message factory for discovery messages. */
/**
* Message factory for discovery messages. Allows to create an enhanced {@link MessageFactory} allowing to create
* automated pre- and post- marshalling message serializer for {@link TcpDiscoveryMarshallableMessage}.
*/
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final @Nullable Marshaller cstDataMarshall;

/** Class loader for the custom data marshalling. */
private final @Nullable ClassLoader cstDataMarshallClsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
*/
public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) {
assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null;

this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
}

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory = enhanceMessageFactory(factory);

factory.register((short)-108, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageSerializer());
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
Expand Down Expand Up @@ -123,8 +159,75 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
factory.register((short)23, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
}

/**
* @return Enhanced {@link MessageFactory} allowing to create automated pre- and post- marshalling message serializer
* for {@link TcpDiscoveryMarshallableMessage}.
*/
private MessageFactory enhanceMessageFactory(MessageFactory mf) {
if (cstDataMarshall == null || cstDataMarshallClsLdr == null)
return mf;

return new MessageFactory() {
@Override public void register(
short directType,
Supplier<Message> supplier,
MessageSerializer serializer
) throws IgniteException {
if (supplier.get() instanceof TcpDiscoveryMarshallableMessage) {
final MessageSerializer serializer0 = serializer;

serializer = new MessageSerializer() {
private Message curMarshallableMsg;

@Override public boolean writeTo(Message msg, MessageWriter writer) {
if (msg instanceof TcpDiscoveryMarshallableMessage && curMarshallableMsg == null) {
curMarshallableMsg = msg;

((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(cstDataMarshall);
}

boolean res = serializer0.writeTo(msg, writer);

if (res && curMarshallableMsg != null) {
assert msg instanceof TcpDiscoveryMarshallableMessage;

curMarshallableMsg = null;
}

return res;
}

@Override public boolean readFrom(Message msg, MessageReader reader) {
boolean res = serializer0.readFrom(msg, reader);

if (res && msg instanceof TcpDiscoveryMarshallableMessage)
((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(cstDataMarshall, cstDataMarshallClsLdr);

return res;
}
};
}

mf.register(directType, supplier, serializer);
}

@Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
mf.register(directType, supplier);
}

@Override public Message create(short type) {
return mf.create(type);
}

@Override public MessageSerializer serializer(short type) {
return mf.serializer(type);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,6 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws

TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);

joinReqMsg.prepareMarshal(spi.marshaller());

TcpDiscoveryNode nodef = node;

joinReqMsg.spanContainer().span(
Expand Down Expand Up @@ -2241,7 +2239,7 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {

nodeAdded = true;

if (msg.topologyHistory() != null)
if (!F.isEmpty(msg.topologyHistory()))
topHist.putAll(msg.topologyHistory());
}
else {
Expand Down Expand Up @@ -2296,9 +2294,9 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
}
}

if (getLocalNodeId().equals(msg.nodeId())) {
if (getLocalNodeId().equals(msg.nodeId)) {
if (joining()) {
DiscoveryDataPacket dataContainer = msg.clientDiscoData();
DiscoveryDataPacket dataContainer = msg.clientDiscoData;

if (dataContainer != null)
spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration()));
Expand All @@ -2310,8 +2308,6 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
delayDiscoData.clear();
}

msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

locNode.setAttributes(msg.clientNodeAttributes());

clearNodeSensitiveData(locNode);
Expand Down Expand Up @@ -2355,7 +2351,7 @@ else if (log.isDebugEnabled())
}
else {
if (nodeAdded()) {
TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
TcpDiscoveryNode node = rmtNodes.get(msg.nodeId);

if (node == null) {
if (log.isDebugEnabled())
Expand Down Expand Up @@ -2548,6 +2544,10 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms
return;

if (getLocalNodeId().equals(msg.creatorNodeId())) {
Collection<TcpDiscoveryAbstractMessage> pendingMsgs = msg.pendingMsgsMsg == null
? Collections.emptyList()
: msg.pendingMsgsMsg.messages();

if (reconnector != null) {
assert msg.success() : msg;

Expand All @@ -2558,7 +2558,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms

reconnector = null;

for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
if (log.isDebugEnabled())
log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');

Expand All @@ -2568,7 +2568,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms
else {
if (joinLatch.getCount() > 0) {
if (msg.success()) {
for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
if (log.isDebugEnabled())
log.debug("Process pending message on connect [msg=" + pendingMsg + ']');

Expand Down
Loading