diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 355072e0b305f..41d93c4e26054 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -400,10 +401,7 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception { return; } - imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - - returnFalseIfWriteFailed(write, field, "writer.writeObjectArray", getExpr, - "MessageCollectionItemType." + messageCollectionItemType(componentType)); + returnFalseIfWriteFailed(write, field, "writer.writeObjectArray", getExpr, messageCollectionItemTypes(type)); return; } @@ -425,16 +423,10 @@ else if (sameType(type, "org.apache.ignite.internal.processors.affinity.Affinity returnFalseIfWriteFailed(write, field, "writer.writeAffinityTopologyVersion", getExpr); else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { - List typeArgs = ((DeclaredType)type).getTypeArguments(); - - assert typeArgs.size() == 2; - - imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - List args = new ArrayList<>(); + args.add(getExpr); - args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); - args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); + args.add(messageCollectionItemTypes(type)); if (compress) args.add("true"); // the value of the compress argument in the MessageWriter#writeMap method @@ -461,20 +453,8 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { returnFalseIfWriteFailed(write, field, "writer.writeMessage", getExpr); } - else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { - List typeArgs = ((DeclaredType)type).getTypeArguments(); - - assert typeArgs.size() == 1; - - imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); - - String collectionWriter = assignableFrom(erasedType(type), type(Set.class.getName())) - ? "writer.writeSet" - : "writer.writeCollection"; - - returnFalseIfWriteFailed(write, field, collectionWriter, getExpr, - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); - } + else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) + returnFalseIfWriteFailed(write, field, "writer.writeCollection", getExpr, messageCollectionItemTypes(type)); else if (enumType(env, type)) { Element element = env.getTypeUtils().asElement(type); @@ -641,13 +621,7 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { } if (componentType.getKind() == TypeKind.ARRAY) { - TypeMirror ctype = ((ArrayType)componentType).getComponentType(); - - assert ctype.getKind().isPrimitive(); - - returnFalseIfReadFailed(field, "reader.readObjectArray", - "MessageCollectionItemType." + messageCollectionItemType(ctype), - ctype.getKind().name().toLowerCase() + "[].class"); + returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(type)); return; } @@ -655,11 +629,7 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { if (componentType.getKind() == TypeKind.DECLARED) { Element componentElement = ((DeclaredType)componentType).asElement(); - String cls = componentElement.getSimpleName().toString(); - - returnFalseIfReadFailed(field, "reader.readObjectArray", - "MessageCollectionItemType." + messageCollectionItemType(componentType), - cls + ".class"); + returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(type)); if (!"java.lang".equals(env.getElementUtils().getPackageOf(componentElement).getQualifiedName().toString())) { String importCls = ((QualifiedNameable)componentElement).getQualifiedName().toString(); @@ -693,9 +663,8 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { assert typeArgs.size() == 2; List args = new ArrayList<>(); - args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); - args.add("MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(1))); - args.add("false"); // the value of the linked argument in the MessageReader#readMap method + + args.add(messageCollectionItemTypes(type)); if (compress) args.add("true"); // the value of the compress argument in the MessageReader#readMap method @@ -723,16 +692,7 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { - List typeArgs = ((DeclaredType)type).getTypeArguments(); - - assert typeArgs.size() == 1; - - String collectionReader = assignableFrom(erasedType(type), type(Set.class.getName())) - ? "reader.readSet" - : "reader.readCollection"; - - returnFalseIfReadFailed(field, collectionReader, - "MessageCollectionItemType." + messageCollectionItemType(typeArgs.get(0))); + returnFalseIfReadFailed(field, "reader.readCollection", messageCollectionItemTypes(type)); } else if (enumType(env, type)) { String fieldPrefix = typeNameToFieldName(env.getTypeUtils().asElement(type).getSimpleName().toString()); @@ -754,6 +714,68 @@ else if (enumType(env, type)) { throw new IllegalArgumentException("Unsupported type kind: " + type.getKind()); } + /** */ + private String messageCollectionItemTypes(TypeMirror type) throws Exception { + imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType"); + + if (type.getKind() == TypeKind.ARRAY) { + ArrayType arrType = (ArrayType)type; + TypeMirror componentType = arrType.getComponentType(); + + String clazz; + + if (componentType.getKind() == TypeKind.ARRAY) { + TypeMirror ctype = ((ArrayType)componentType).getComponentType(); + + clazz = ctype.getKind().name().toLowerCase() + "[].class"; + } + else if (componentType.getKind() == TypeKind.DECLARED) { + Element componentElement = ((DeclaredType)componentType).asElement(); + + clazz = componentElement.getSimpleName() + ".class"; + } + else { + assert componentType.getKind().isPrimitive(); + + imports.add("org.apache.ignite.plugin.extensions.communication.MessageItemType"); + + return "new MessageItemType(MessageCollectionItemType." + messageCollectionItemType(componentType) + "_ARR)"; + } + + imports.add("org.apache.ignite.plugin.extensions.communication.MessageArrayType"); + + return "new MessageArrayType(" + messageCollectionItemTypes(componentType) + ", " + clazz + ")"; + } + else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { + imports.add("org.apache.ignite.plugin.extensions.communication.MessageMapType"); + + List typeArgs = ((DeclaredType)type).getTypeArguments(); + + assert typeArgs.size() == 2; + + return "new MessageMapType(" + + messageCollectionItemTypes(typeArgs.get(0)) + ", " + + messageCollectionItemTypes(typeArgs.get(1)) + ", " + + assignableFrom(erasedType(type), type(LinkedHashMap.class.getName())) + ")"; + } + else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { + imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionType"); + + List typeArgs = ((DeclaredType)type).getTypeArguments(); + + assert typeArgs.size() == 1; + + return "new MessageCollectionType(" + + messageCollectionItemTypes(typeArgs.get(0)) + ", " + + assignableFrom(erasedType(type), type(Set.class.getName())) + ")"; + } + else { + imports.add("org.apache.ignite.plugin.extensions.communication.MessageItemType"); + + return "new MessageItemType(MessageCollectionItemType." + messageCollectionItemType(type) + ")"; + } + } + /** * Find MessageCollectionItemType for a given type. *

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 3217d7630a0e4..cce73aee5b1ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -21,7 +21,6 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.function.Function; import org.apache.ignite.internal.direct.state.DirectMessageState; @@ -38,8 +37,10 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageArrayType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.jetbrains.annotations.Nullable; @@ -379,10 +380,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls) { + @Override public T[] readObjectArray(MessageArrayType type) { DirectByteBufferStream stream = state.item().stream; - T[] msg = stream.readObjectArray(itemType, itemCls, this); + T[] msg = stream.readObjectArray(type, this); lastRead = stream.lastFinished(); @@ -390,10 +391,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public > C readCollection(MessageCollectionItemType itemType) { + @Override public > C readCollection(MessageCollectionType type) { DirectByteBufferStream stream = state.item().stream; - C col = stream.readList(itemType, this); + C col = stream.readCollection(type, this); lastRead = stream.lastFinished(); @@ -401,19 +402,7 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public > SET readSet(MessageCollectionItemType itemType) { - DirectByteBufferStream stream = state.item().stream; - - SET set = stream.readSet(itemType, this); - - lastRead = stream.lastFinished(); - - return set; - } - - /** {@inheritDoc} */ - @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked, boolean compress) { + @Override public > M readMap(MessageMapType type, boolean compress) { DirectByteBufferStream stream = state.item().stream; M map; @@ -421,10 +410,10 @@ public ByteBuffer getBuffer() { if (compress) map = readCompressedMessageAndDeserialize( stream, - tmpReader -> tmpReader.state.item().stream.readMap(keyType, valType, linked, tmpReader) + tmpReader -> tmpReader.state.item().stream.readMap(type, tmpReader) ); else { - map = stream.readMap(keyType, valType, linked, this); + map = stream.readMap(type, this); lastRead = stream.lastFinished(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 2cb363f1bd41a..db35c93e1651b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -21,7 +21,6 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import org.apache.ignite.internal.direct.state.DirectMessageState; @@ -37,8 +36,10 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageArrayType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -360,41 +361,35 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType) { + @Override public boolean writeObjectArray(T[] arr, MessageArrayType type) { DirectByteBufferStream stream = state.item().stream; - stream.writeObjectArray(arr, itemType, this); + stream.writeObjectArray(arr, type, this); return stream.lastFinished(); } /** {@inheritDoc} */ - @Override public boolean writeCollection(Collection col, MessageCollectionItemType itemType) { + @Override public boolean writeCollection(Collection col, MessageCollectionType type) { DirectByteBufferStream stream = state.item().stream; - stream.writeCollection(col, itemType, this); + stream.writeCollection(col, type, this); return stream.lastFinished(); } /** {@inheritDoc} */ - @Override public boolean writeSet(Set set, MessageCollectionItemType itemType) { - return writeCollection(set, itemType); - } - - /** {@inheritDoc} */ - @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean compress) { + @Override public boolean writeMap(Map map, MessageMapType type, boolean compress) { DirectByteBufferStream stream = state.item().stream; if (compress) writeCompressedMessage( - tmpWriter -> tmpWriter.state.item().stream.writeMap(map, keyType, valType, tmpWriter), + tmpWriter -> tmpWriter.state.item().stream.writeMap(map, type, tmpWriter), map == null, stream ); else - stream.writeMap(map, keyType, valType, this); + stream.writeMap(map, type, this); return stream.lastFinished(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index ca3ecfc7fbc3d..6890200d5f754 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.RandomAccess; -import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -43,9 +42,12 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageArrayType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageType; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -938,10 +940,10 @@ public void writeMessage(Message msg, MessageWriter writer) { /** * @param arr Array. - * @param itemType Component type. + * @param type Type. * @param writer Writer. */ - public void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) { + public void writeObjectArray(T[] arr, MessageArrayType type, MessageWriter writer) { if (arr != null) { int len = arr.length; @@ -958,7 +960,7 @@ public void writeObjectArray(T[] arr, MessageCollectionItemType itemType, Me if (arrCur == NULL) arrCur = arr[arrPos++]; - write(itemType, arrCur, writer); + write(type.valueType(), arrCur, writer); if (!lastFinished) return; @@ -974,13 +976,13 @@ public void writeObjectArray(T[] arr, MessageCollectionItemType itemType, Me /** * @param col Collection. - * @param itemType Component type. + * @param type Type. * @param writer Writer. */ - public void writeCollection(Collection col, MessageCollectionItemType itemType, MessageWriter writer) { + public void writeCollection(Collection col, MessageCollectionType type, MessageWriter writer) { if (col != null) { if (col instanceof List && col instanceof RandomAccess) - writeRandomAccessList((List)col, itemType, writer); + writeRandomAccessList((List)col, type, writer); else { if (it == null) { writeInt(col.size()); @@ -995,7 +997,7 @@ public void writeCollection(Collection col, MessageCollectionItemType ite if (cur == NULL) cur = it.next(); - write(itemType, cur, writer); + write(type.valueType(), cur, writer); if (!lastFinished) return; @@ -1012,10 +1014,10 @@ public void writeCollection(Collection col, MessageCollectionItemType ite /** * @param list List. - * @param itemType Component type. + * @param type Type. * @param writer Writer. */ - private void writeRandomAccessList(List list, MessageCollectionItemType itemType, MessageWriter writer) { + private void writeRandomAccessList(List list, MessageCollectionType type, MessageWriter writer) { assert list instanceof RandomAccess; int size = list.size(); @@ -1033,7 +1035,7 @@ private void writeRandomAccessList(List list, MessageCollectionItemType i if (arrCur == NULL) arrCur = list.get(arrPos++); - write(itemType, arrCur, writer); + write(type.valueType(), arrCur, writer); if (!lastFinished) return; @@ -1046,11 +1048,10 @@ private void writeRandomAccessList(List list, MessageCollectionItemType i /** * @param map Map. - * @param keyType Key type. - * @param valType Value type. + * @param type Type. * @param writer Writer. */ - public void writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, MessageWriter writer) { + public void writeMap(Map map, MessageMapType type, MessageWriter writer) { if (map != null) { if (mapIt == null) { writeInt(map.size()); @@ -1070,7 +1071,7 @@ public void writeMap(Map map, MessageCollectionItemType keyType, Me e = (Map.Entry)mapCur; if (!keyDone) { - write(keyType, e.getKey(), writer); + write(type.keyType(), e.getKey(), writer); if (!lastFinished) return; @@ -1078,7 +1079,7 @@ public void writeMap(Map map, MessageCollectionItemType keyType, Me keyDone = true; } - write(valType, e.getValue(), writer); + write(type.valueType(), e.getValue(), writer); if (!lastFinished) return; @@ -1597,12 +1598,11 @@ public T readMessage(MessageReader reader) { } /** - * @param itemType Item type. - * @param itemCls Item class. + * @param type Item type. * @param reader Reader. * @return Array. */ - public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls, MessageReader reader) { + public T[] readObjectArray(MessageArrayType type, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1614,10 +1614,10 @@ public T[] readObjectArray(MessageCollectionItemType itemType, Class item if (readSize >= 0) { if (objArr == null) - objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize]; + objArr = type.clazz() != null ? (Object[])Array.newInstance(type.clazz(), readSize) : new Object[readSize]; for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); + Object item = read(type.valueType(), reader); if (!lastFinished) return null; @@ -1639,37 +1639,14 @@ public T[] readObjectArray(MessageCollectionItemType itemType, Class item return objArr0; } - /** - * Reads collection as an {@link ArrayList}. - * - * @param itemType Item type. - * @param reader Reader. - * @return {@link ArrayList}. - */ - public > L readList(MessageCollectionItemType itemType, MessageReader reader) { - return readCollection(itemType, reader, false); - } - - /** - * Reads collection as a {@link HashSet}. - * - * @param itemType Item type. - * @param reader Reader. - * @return {@link HashSet}. - */ - public > SET readSet(MessageCollectionItemType itemType, MessageReader reader) { - return readCollection(itemType, reader, true); - } - /** * Reads collection eather as a {@link ArrayList} or a {@link HashSet}. * - * @param itemType Item type. + * @param type Item type. * @param reader Reader. - * @param set Read-as-Set flag. * @return {@link ArrayList} or a {@link HashSet}. */ - private > C readCollection(MessageCollectionItemType itemType, MessageReader reader, boolean set) { + public > C readCollection(MessageCollectionType type, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1681,10 +1658,10 @@ private > C readCollection(MessageCollectionItemType ite if (readSize >= 0) { if (col == null) - col = set ? U.newHashSet(readSize) : new ArrayList<>(readSize); + col = type.set() ? U.newHashSet(readSize) : new ArrayList<>(readSize); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); + Object item = read(type.valueType(), reader); if (!lastFinished) return null; @@ -1707,14 +1684,11 @@ private > C readCollection(MessageCollectionItemType ite } /** - * @param keyType Key type. - * @param valType Value type. - * @param linked Whether linked map should be created. + * @param type Value type. * @param reader Reader. * @return Map. */ - public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, - boolean linked, MessageReader reader) { + public > M readMap(MessageMapType type, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1726,11 +1700,11 @@ private > C readCollection(MessageCollectionItemType ite if (readSize >= 0) { if (map == null) - map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); + map = type.linked() ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); for (int i = readItems; i < readSize; i++) { if (!keyDone) { - Object key = read(keyType, reader); + Object key = read(type.keyType(), reader); if (!lastFinished) return null; @@ -1739,7 +1713,7 @@ private > C readCollection(MessageCollectionItemType ite keyDone = true; } - Object val = read(valType, reader); + Object val = read(type.valueType(), reader); if (!lastFinished) return null; @@ -2020,8 +1994,8 @@ T readArrayLE(ArrayCreator creator, int typeSize, int lenShift, long off) * @param val Value. * @param writer Writer. */ - protected void write(MessageCollectionItemType type, Object val, MessageWriter writer) { - switch (type) { + protected void write(MessageType type, Object val, MessageWriter writer) { + switch (type.type()) { case BYTE: writeByte((Byte)val); @@ -2142,6 +2116,21 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w break; + case MAP: + writeMap((Map)val, (MessageMapType)type, writer); + + break; + + case COLLECTION: + writeCollection((Collection)val, (MessageCollectionType)type, writer); + + break; + + case ARRAY: + writeObjectArray((V[])val, (MessageArrayType)type, writer); + + break; + case MSG: try { if (val != null) @@ -2166,8 +2155,8 @@ protected void write(MessageCollectionItemType type, Object val, MessageWriter w * @param reader Reader. * @return Value. */ - protected Object read(MessageCollectionItemType type, MessageReader reader) { - switch (type) { + protected Object read(MessageType type, MessageReader reader) { + switch (type.type()) { case BYTE: return readByte(); @@ -2240,6 +2229,15 @@ protected Object read(MessageCollectionItemType type, MessageReader reader) { case GRID_LONG_LIST: return readGridLongList(); + case MAP: + return readMap((MessageMapType)type, reader); + + case COLLECTION: + return readCollection((MessageCollectionType)type, reader); + + case ARRAY: + return readObjectArray((MessageArrayType)type, reader); + case MSG: return readMessage(reader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 14272ac0f0ea3..83a6271af1cae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.authentication.UserProposedMessage; import org.apache.ignite.internal.processors.authentication.UserProposedMessageSerializer; import org.apache.ignite.internal.processors.authentication.UserSerializer; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessageSerializer; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer; import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage; @@ -217,5 +219,6 @@ public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable C factory.register((short)512, ChangeGlobalStateFinishMessage::new, new ChangeGlobalStateFinishMessageSerializer()); factory.register((short)513, StopRoutineAckDiscoveryMessage::new, new StopRoutineAckDiscoveryMessageSerializer()); factory.register((short)514, StopRoutineDiscoveryMessage::new, new StopRoutineDiscoveryMessageSerializer()); + factory.register((short)515, CacheAffinityChangeMessage::new, new CacheAffinityChangeMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java index 748693473b44a..6026449475c0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -28,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.jetbrains.annotations.Nullable; @@ -35,30 +37,37 @@ * CacheAffinityChangeMessage represent a message that switches to a new affinity assignmentafter rebalance is finished. * This message should not be mutated in any way outside the "disco-notifier-worker" thread. */ -public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { +public class CacheAffinityChangeMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** */ - private AffinityTopologyVersion topVer; + @Order(1) + AffinityTopologyVersion topVer; /** */ - private GridDhtPartitionExchangeId exchId; + @Order(2) + GridDhtPartitionExchangeId exchId; /** */ - private Map>> assignmentChange; + @Order(3) + Map>> assignmentChange; /** */ - private Map cacheDeploymentIds; + @Order(4) + Map cacheDeploymentIds; /** */ - private GridDhtPartitionsFullMessage partsMsg; + @Order(5) + GridDhtPartitionsFullMessage partsMsg; /** If this flag is {@code true} then this message should lead to partition map exchnage. */ - private boolean exchangeNeeded; + @Order(6) + boolean exchangeNeeded; /** * This flag indicates that this message should not be passed to other nodes except the coordinator. @@ -68,7 +77,10 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { * This flag is used when discovery SPI does not support mutable custom messages. * See {@link DiscoverySpiMutableCustomMessageSupport}. */ - private transient boolean stopProc; + private boolean stopProc; + + /** */ + public CacheAffinityChangeMessage() {} /** * Constructor used when message is created after cache rebalance finished. @@ -77,6 +89,7 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { * @param cacheDeploymentIds Cache deployment ID. */ public CacheAffinityChangeMessage(AffinityTopologyVersion topVer, Map cacheDeploymentIds) { + id = IgniteUuid.randomUuid(); this.topVer = topVer; this.cacheDeploymentIds = cacheDeploymentIds; } @@ -92,6 +105,7 @@ public CacheAffinityChangeMessage( GridDhtPartitionExchangeId exchId, GridDhtPartitionsFullMessage partsMsg, Map>> assignmentChange) { + id = IgniteUuid.randomUuid(); this.exchId = exchId; this.partsMsg = partsMsg; this.assignmentChange = assignmentChange; @@ -194,6 +208,11 @@ public void stopProcess(boolean stopProc) { return discoCache.copy(topVer, null); } + /** {@inheritDoc} */ + @Override public short directType() { + return 515; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheAffinityChangeMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageArrayType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageArrayType.java new file mode 100644 index 0000000000000..1fa3fe3d21619 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageArrayType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public class MessageArrayType implements MessageType { + /** */ + private final MessageType valType; + + /** */ + private final Class clazz; + + /** + * @param valType Value type. + * @param clazz Class. + */ + public MessageArrayType(MessageType valType, Class clazz) { + this.valType = valType; + this.clazz = clazz; + } + + /** @return Value type. */ + public MessageType valueType() { + return valType; + } + + /** {@inheritDoc} */ + @Override public MessageCollectionItemType type() { + return MessageCollectionItemType.ARRAY; + } + + /** @return Class. */ + public Class clazz() { + return clazz; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java index b994809b77d23..b28e67297ce16 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java @@ -95,8 +95,17 @@ public enum MessageCollectionItemType { /** Cache object. */ CACHE_OBJECT, - /** GridLongList */ - GRID_LONG_LIST; + /** GridLongList. */ + GRID_LONG_LIST, + + /** Map. */ + MAP, + + /** Collection. */ + COLLECTION, + + /** Array. */ + ARRAY; /** Enum values. */ private static final MessageCollectionItemType[] VALS = values(); diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionType.java new file mode 100644 index 0000000000000..f6d982de72a87 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public class MessageCollectionType implements MessageType { + /** */ + private final MessageType valType; + + /** */ + private final boolean set; + + /** + * @param valType Value type. + * @param set Is set. + */ + public MessageCollectionType(MessageType valType, boolean set) { + this.valType = valType; + this.set = set; + } + + /** @return Value type. */ + public MessageType valueType() { + return valType; + } + + /** {@inheritDoc} */ + @Override public MessageCollectionItemType type() { + return MessageCollectionItemType.COLLECTION; + } + + /** @return Is set. */ + public boolean set() { + return set; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageItemType.java new file mode 100644 index 0000000000000..77664ebba9559 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageItemType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public class MessageItemType implements MessageType { + /** */ + private final MessageCollectionItemType type; + + /** + @param type Type. + */ + public MessageItemType(MessageCollectionItemType type) { + this.type = type; + } + + /** {@inheritDoc} */ + @Override public MessageCollectionItemType type() { + return type; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageMapType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageMapType.java new file mode 100644 index 0000000000000..978a101e60b39 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageMapType.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public class MessageMapType implements MessageType { + /** */ + private final MessageType keyType; + + /** */ + private final MessageType valType; + + /** */ + private final boolean linked; + + /** + * @param keyType Key type. + * @param valType Value type. + * @param linked Is linked hash map. + */ + public MessageMapType(MessageType keyType, MessageType valType, boolean linked) { + this.keyType = keyType; + this.valType = valType; + this.linked = linked; + } + + /** @return Key type. */ + public MessageType keyType() { + return keyType; + } + + /** @return Value type. */ + public MessageType valueType() { + return valType; + } + + /** {@inheritDoc} */ + @Override public MessageCollectionItemType type() { + return MessageCollectionItemType.MAP; + } + + /** @return Is linked hash map. */ + public boolean linked() { + return linked; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index eda96716facc3..48cf3355ce6c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -20,9 +20,7 @@ import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -236,59 +234,41 @@ public default T readMessage() { /** * Reads array of objects. * - * @param itemType Array component type. - * @param itemCls Array component class. + * @param type Array component type. * @param Type of the read object. * @return Array of objects. */ - public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls); + public T[] readObjectArray(MessageArrayType type); /** * Reads any collection. * - * @param itemType Collection item type. + * @param type Collection item type. * @param Type of the read collection. * @return Collection. */ - public > C readCollection(MessageCollectionItemType itemType); - - /** - * Reads any collection and provides it as a set. - * - * @param itemType Set item type. - * @param Type of the read set. - * @return Set. - */ - public > S readSet(MessageCollectionItemType itemType); + public > C readCollection(MessageCollectionType type); /** * Reads map. * - * @param keyType Map key type. - * @param valType Map value type. - * @param linked Whether {@link LinkedHashMap} should be created. + * @param type Map type. * @param Type of the read map. * @return Map. */ - // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter - public default > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked) { - return readMap(keyType, valType, linked, false); + public default > M readMap(MessageMapType type) { + return readMap(type, false); } /** * Reads map. * - * @param keyType Map key type. - * @param valType Map value type. - * @param linked Whether {@link LinkedHashMap} should be created. + * @param type Map type. * @param compress Whether map should be compressed. * @param Type of the read map. * @return Map. */ - // TODO: IGNITE-26329 — switch to the new readMap method without the linked flag parameter - public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked, boolean compress); + public > M readMap(MessageMapType type, boolean compress); /** * Tells whether last invocation of any of {@code readXXX(...)} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageType.java new file mode 100644 index 0000000000000..c9f241f6e384a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public interface MessageType { + /** + * @return Type. + */ + public MessageCollectionItemType type(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index e073ca8313423..a189a195b8e7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -21,7 +21,6 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -288,60 +287,46 @@ public default boolean writeMessage(Message val) { * Writes array of objects. * * @param arr Array of objects. - * @param itemType Array component type. + * @param type Array component type. * @param Type of the objects that array contains. * @return Whether array was fully written. */ - public boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType); + public boolean writeObjectArray(T[] arr, MessageArrayType type); /** * Writes collection with its elements order. * * @param col Collection. - * @param itemType Collection item type. + * @param type Collection item type. * @param Type of the objects that collection contains. * @return Whether value was fully written. */ - public boolean writeCollection(Collection col, MessageCollectionItemType itemType); - - /** - * Writes set with its elements order. - * - * @param set Set. - * @param itemType Set item type. - * @param Type of the objects that set contains. - * @return Whether value was fully written. - */ - public boolean writeSet(Set set, MessageCollectionItemType itemType); + public boolean writeCollection(Collection col, MessageCollectionType type); /** * Writes map. * * @param map Map. - * @param keyType Map key type. - * @param valType Map value type. + * @param type Map type. * @param Initial key types of the map to write. * @param Initial value types of the map to write. * @return Whether value was fully written. */ - public default boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType) { - return writeMap(map, keyType, valType, false); + public default boolean writeMap(Map map, MessageMapType type) { + return writeMap(map, type, false); } /** * Writes map. * * @param map Map. - * @param keyType Map key type. - * @param valType Map value type. + * @param type Map type. * @param compress Whether map should be compressed. * @param Initial key types of the map to write. * @param Initial value types of the map to write. * @return Whether value was fully written. */ - public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean compress); + public boolean writeMap(Map map, MessageMapType type, boolean compress); /** * @return Whether header of current message is already written. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index 2f28cfc4146b0..1104b04e2176b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -29,9 +31,11 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageArrayType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; @@ -291,24 +295,18 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeObjectArray(T[] arr, MessageCollectionItemType itemType) { + @Override public boolean writeObjectArray(T[] arr, MessageArrayType type) { return writeField(Object[].class); } /** {@inheritDoc} */ - @Override public boolean writeCollection(Collection col, MessageCollectionItemType itemType) { - return writeField(Collection.class); + @Override public boolean writeCollection(Collection col, MessageCollectionType type) { + return writeField(type.set() ? Set.class : Collection.class); } /** {@inheritDoc} */ - @Override public boolean writeSet(Set set, MessageCollectionItemType itemType) { - return writeField(Set.class); - } - - /** {@inheritDoc} */ - @Override public boolean writeMap(Map map, MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean compress) { - return writeField(Map.class); + @Override public boolean writeMap(Map map, MessageMapType type, boolean compress) { + return writeField(type.linked() ? LinkedHashMap.class : HashMap.class); } /** {@inheritDoc} */ @@ -552,30 +550,22 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls) { + @Override public T[] readObjectArray(MessageArrayType type) { readField(Object[].class); return null; } /** {@inheritDoc} */ - @Override public > C readCollection(MessageCollectionItemType itemType) { - readField(Collection.class); + @Override public > C readCollection(MessageCollectionType type) { + readField(type.set() ? Set.class : Collection.class); return null; } - - /** {@inheritDoc} */ - @Override public > S readSet(MessageCollectionItemType itemType) { - readField(Set.class); - - return null; - } - + /** {@inheritDoc} */ - @Override public > M readMap(MessageCollectionItemType keyType, - MessageCollectionItemType valType, boolean linked, boolean compress) { - readField(Map.class); + @Override public > M readMap(MessageMapType type, boolean compress) { + readField(type.linked() ? LinkedHashMap.class : HashMap.class); return null; } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index 3ad36b8f8f3ee..6acfc4c101ca5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -42,8 +42,10 @@ import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -268,7 +270,8 @@ public Collection entries() { switch (writer.state()) { case 3: - if (!writer.writeCollection(entries, MessageCollectionItemType.MSG)) + if (!writer.writeCollection(entries, + new MessageCollectionType(new MessageItemType(MessageCollectionItemType.MSG), false))) return false; writer.incrementState(); @@ -287,7 +290,7 @@ public Collection entries() { switch (reader.state()) { case 3: - entries = reader.readCollection(MessageCollectionItemType.MSG); + entries = reader.readCollection(new MessageCollectionType(new MessageItemType(MessageCollectionItemType.MSG), false)); if (!reader.isLastRead()) return false; diff --git a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java index 2f4f365cc830a..d27220d6b25fe 100644 --- a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java +++ b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java @@ -19,6 +19,8 @@ import org.apache.ignite.internal.TestCollectionsMessage; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; +import org.apache.ignite.plugin.extensions.communication.MessageItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -40,151 +42,151 @@ public class TestCollectionsMessageSerializer implements MessageSerializer gridLongListIntegerMap; + @Order(24) + Map>> gridlistDoubleMapUuidMap; + public short directType() { return 0; } diff --git a/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java b/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java index 42d0773b7e224..afa07ce5787f2 100644 --- a/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java +++ b/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java @@ -19,6 +19,9 @@ import org.apache.ignite.internal.TestMapMessage; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; +import org.apache.ignite.plugin.extensions.communication.MessageItemType; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -40,145 +43,151 @@ public class TestMapMessageSerializer implements MessageSerializer { writer.incrementState(); case 3: - if (!writer.writeObjectArray(msg.strArr, MessageCollectionItemType.STRING)) + if (!writer.writeObjectArray(msg.strArr, new MessageArrayType(new MessageItemType(MessageCollectionItemType.STRING), String.class))) return false; writer.incrementState(); case 4: - if (!writer.writeObjectArray(msg.intMatrix, MessageCollectionItemType.INT_ARR)) + if (!writer.writeObjectArray(msg.intMatrix, new MessageArrayType(new MessageItemType(MessageCollectionItemType.INT_ARR), int[].class))) return false; writer.incrementState(); @@ -77,7 +79,7 @@ public class TestMessageSerializer implements MessageSerializer { writer.incrementState(); case 6: - if (!writer.writeObjectArray(msg.verArr, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray(msg.verArr, new MessageArrayType(new MessageItemType(MessageCollectionItemType.MSG), GridCacheVersion.class))) return false; writer.incrementState(); @@ -162,7 +164,7 @@ public class TestMessageSerializer implements MessageSerializer { reader.incrementState(); case 3: - msg.strArr = reader.readObjectArray(MessageCollectionItemType.STRING, String.class); + msg.strArr = reader.readObjectArray(new MessageArrayType(new MessageItemType(MessageCollectionItemType.STRING), String.class)); if (!reader.isLastRead()) return false; @@ -170,7 +172,7 @@ public class TestMessageSerializer implements MessageSerializer { reader.incrementState(); case 4: - msg.intMatrix = reader.readObjectArray(MessageCollectionItemType.INT, int[].class); + msg.intMatrix = reader.readObjectArray(new MessageArrayType(new MessageItemType(MessageCollectionItemType.INT_ARR), int[].class)); if (!reader.isLastRead()) return false; @@ -186,7 +188,7 @@ public class TestMessageSerializer implements MessageSerializer { reader.incrementState(); case 6: - msg.verArr = reader.readObjectArray(MessageCollectionItemType.MSG, GridCacheVersion.class); + msg.verArr = reader.readObjectArray(new MessageArrayType(new MessageItemType(MessageCollectionItemType.MSG), GridCacheVersion.class)); if (!reader.isLastRead()) return false;