diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index d188ac507048a..ff20a7f6feb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -21,6 +21,10 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.ClusterNodeCollectionMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; @@ -43,6 +47,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; @@ -59,12 +65,16 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; @@ -80,6 +90,10 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-111, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageSerializer()); + factory.register((short)-110, ClusterNodeCollectionMessage::new, new ClusterNodeCollectionMessageSerializer()); + factory.register((short)-109, TcpDiscoveryNodeMessage::new, new TcpDiscoveryNodeMessageSerializer()); + factory.register((short)-108, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer()); factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer()); factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, @@ -110,5 +124,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer()); factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); + factory.register((short)20, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java index edcfefc4f8890..7ab88b3809182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusMetaIO.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; /** * IO routines for B+Tree meta pages. @@ -303,7 +304,7 @@ public IgniteProductVersion createdVersion(long pageAddr) { PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 1), PageUtils.getByte(pageAddr, CREATED_VER_OFFSET + 2), PageUtils.getLong(pageAddr, CREATED_VER_OFFSET + 3), - PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersion.REV_HASH_SIZE)); + PageUtils.getBytes(pageAddr, CREATED_VER_OFFSET + 11, IgniteProductVersionMessage.REV_HASH_SIZE)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index de920fddadf31..5fb2d97a02d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -850,7 +851,7 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, long flags = in.readLong(); - byte[] revHash = new byte[IgniteProductVersion.REV_HASH_SIZE]; + byte[] revHash = new byte[IgniteProductVersionMessage.REV_HASH_SIZE]; byte maj = in.readByte(); byte min = in.readByte(); byte maint = in.readByte(); diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java index 1c78694550bd5..bca9354027f8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.jetbrains.annotations.NotNull; /** @@ -41,39 +42,26 @@ public class IgniteProductVersion implements Comparable, E /** */ private static final long serialVersionUID = 0L; - /** Size of the {@link #revHash }*/ - public static final int REV_HASH_SIZE = 20; - /** Size in bytes of serialized: 3 bytes (maj, min, maintenance version), 8 bytes - timestamp */ - public static final int SIZE_IN_BYTES = 3 + 8 + REV_HASH_SIZE; + public static final int SIZE_IN_BYTES = 3 + 8 + IgniteProductVersionMessage.REV_HASH_SIZE; /** Regexp parse pattern. */ private static final Pattern VER_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?"); - /** Major version number. */ - private byte major; - - /** Minor version number. */ - private byte minor; - - /** Maintenance version number. */ - private byte maintenance; - - /** Stage of development. */ - private String stage; - - /** Revision timestamp. */ - private long revTs; - - /** Revision hash. */ - private byte[] revHash; + /** The values holding message. */ + private final IgniteProductVersionMessage productVerMsg; /** * Empty constructor required by {@link Externalizable}. */ public IgniteProductVersion() { - // No-op. + productVerMsg = new IgniteProductVersionMessage(); + } + + /** @param productVerMsg Product version message. */ + public IgniteProductVersion(IgniteProductVersionMessage productVerMsg) { + this.productVerMsg = productVerMsg; } /** @@ -96,17 +84,17 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, long revTs * @param revHash Revision hash. */ public IgniteProductVersion(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { - if (revHash != null && revHash.length != REV_HASH_SIZE) { + productVerMsg = new IgniteProductVersionMessage(major, minor, maintenance, stage, revTs, revHash); + + if (revHash != null && revHash.length != IgniteProductVersionMessage.REV_HASH_SIZE) { throw new IllegalArgumentException("Invalid length for SHA1 hash (must be " - + REV_HASH_SIZE + "): " + revHash.length); + + IgniteProductVersionMessage.REV_HASH_SIZE + "): " + revHash.length); } + } - this.major = major; - this.minor = minor; - this.maintenance = maintenance; - this.stage = stage; - this.revTs = revTs; - this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + /** @return {@link IgniteProductVersionMessage}. */ + public IgniteProductVersionMessage message() { + return productVerMsg; } /** @@ -115,7 +103,7 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, String sta * @return Major version number. */ public byte major() { - return major; + return productVerMsg.major(); } /** @@ -124,7 +112,7 @@ public byte major() { * @return Minor version number. */ public byte minor() { - return minor; + return productVerMsg.minor(); } /** @@ -133,14 +121,14 @@ public byte minor() { * @return Maintenance version number. */ public byte maintenance() { - return maintenance; + return productVerMsg.maintenance(); } /** * @return Stage of development. */ public String stage() { - return stage; + return productVerMsg.stage(); } /** @@ -149,7 +137,7 @@ public String stage() { * @return Revision timestamp. */ public long revisionTimestamp() { - return revTs; + return productVerMsg.revisionTimestamp(); } /** @@ -158,7 +146,7 @@ public long revisionTimestamp() { * @return Revision hash. */ public byte[] revisionHash() { - return revHash; + return productVerMsg.revisionHash(); } /** @@ -167,7 +155,7 @@ public byte[] revisionHash() { * @return Release date. */ public Date releaseDate() { - return new Date(revTs * 1000); + return new Date(revisionTimestamp() * 1000); } /** @@ -178,31 +166,31 @@ public Date releaseDate() { */ public boolean greaterThanEqual(int major, int minor, int maintenance) { // NOTE: Unknown version is less than any other version. - if (major == this.major) - return minor == this.minor ? this.maintenance >= maintenance : this.minor > minor; + if (major == major()) + return minor == minor() ? maintenance() >= maintenance : minor() > minor; else - return this.major > major; + return major() > major; } /** {@inheritDoc} */ @Override public int compareTo(@NotNull IgniteProductVersion o) { // NOTE: Unknown version is less than any other version. - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - res = Integer.compare(maintenance, o.maintenance); + res = Integer.compare(maintenance(), o.maintenance()); if (res != 0) return res; - return Long.compare(revTs, o.revTs); + return Long.compare(revisionTimestamp(), o.revisionTimestamp()); } /** @@ -210,17 +198,17 @@ public boolean greaterThanEqual(int major, int minor, int maintenance) { * @return Compare result. */ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - return Integer.compare(maintenance, o.maintenance); + return Integer.compare(maintenance(), o.maintenance()); } /** {@inheritDoc} */ @@ -233,47 +221,50 @@ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { IgniteProductVersion that = (IgniteProductVersion)o; - return revTs == that.revTs && maintenance == that.maintenance && minor == that.minor && major == that.major; + return revisionTimestamp() == that.revisionTimestamp() && maintenance() == that.maintenance() + && minor() == that.minor() && major() == that.major(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = major; + int res = major(); - res = 31 * res + minor; - res = 31 * res + maintenance; - res = 31 * res + (int)(revTs ^ (revTs >>> 32)); + res = 31 * res + minor(); + res = 31 * res + maintenance(); + res = 31 * res + (int)(revisionTimestamp() ^ (revisionTimestamp() >>> 32)); return res; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(major); - out.writeByte(minor); - out.writeByte(maintenance); - out.writeLong(revTs); - U.writeByteArray(out, revHash); + out.writeByte(major()); + out.writeByte(minor()); + out.writeByte(maintenance()); + out.writeLong(revisionTimestamp()); + U.writeByteArray(out, revisionHash()); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - major = in.readByte(); - minor = in.readByte(); - maintenance = in.readByte(); - revTs = in.readLong(); - revHash = U.readByteArray(in); + assert productVerMsg != null; + + productVerMsg.major(in.readByte()); + productVerMsg.minor(in.readByte()); + productVerMsg.maintenance(in.readByte()); + productVerMsg.revisionTimestamp(in.readLong()); + productVerMsg.revisionHash(U.readByteArray(in)); } /** {@inheritDoc} */ @Override public String toString() { - String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs * 1000); + String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revisionTimestamp() * 1000); - String hash = U.byteArray2HexString(revHash).toLowerCase(); + String hash = U.byteArray2HexString(revisionHash()).toLowerCase(); hash = hash.length() > 8 ? hash.substring(0, 8) : hash; - return major + "." + minor + "." + maintenance + "#" + revTsStr + "-sha1:" + hash; + return major() + "." + minor() + "." + maintenance() + "#" + revTsStr + "-sha1:" + hash; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2e96845ff0332..27e6fc57e679c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2160,8 +2160,12 @@ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); else if (msg instanceof TcpDiscoveryNodeLeftMessage) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index e6d3052e902a2..827cf4751189e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -113,6 +113,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.IgniteSpiContext; @@ -262,7 +263,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** Metric for max message queue size. */ private MaxValueMetric maxMsgQueueSizeMetric; - /** Failed nodes (but still in topology). */ + /** Failed nodes (but still in topology): Node -> Id of the failure issuer node. */ private final Map failedNodes = new HashMap<>(); /** */ @@ -1845,6 +1846,7 @@ private void printStatistics() { } /** + * Calls {@link TcpDiscoveryNodeAddedMessage#prepareMarshal(Marshaller)}. * @param msg Message to prepare. * @param destNodeId Destination node ID. * @param msgs Messages to include. @@ -1898,6 +1900,8 @@ private void prepareNodeAddedMessage( } nodeAddedMsg.topologyHistory(hist); + + nodeAddedMsg.prepareMarshal(spi.marshaller()); } } } @@ -2483,6 +2487,8 @@ void add(TcpDiscoveryAbstractMessage msg) { // Do not need this data for client reconnect. if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); + + addedMsg.prepareMarshal(spi.marshaller()); } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; @@ -2645,6 +2651,8 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI msg0.topology(addedMsg.clientTopology()); + msg0.prepareMarshal(spi.marshaller()); + return msg0; } } @@ -3121,8 +3129,12 @@ protected void runTasks() { if (locNode.internalOrder() == 0) { boolean proc = false; - if (msg instanceof TcpDiscoveryNodeAddedMessage) + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + proc = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); + } if (!proc) { if (log.isDebugEnabled()) { @@ -3152,8 +3164,12 @@ else if (msg instanceof TcpDiscoveryClientReconnectMessage) { sendMessageAcrossRing(msg); } - else if (msg instanceof TcpDiscoveryNodeAddedMessage) + else if (msg instanceof TcpDiscoveryNodeAddedMessage) { + ((TcpDiscoveryNodeAddedMessage)msg).finishUnmarshal(spi.marshaller(), + U.resolveClassLoader(spi.ignite().configuration())); + processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + } else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); @@ -4891,8 +4907,11 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } if (log.isDebugEnabled()) { log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + @@ -5115,8 +5134,11 @@ else if (spiState == CONNECTING) processMessageFailedNodes(msg); } - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { + msg.prepareMarshal(spi.marshaller()); + sendMessageAcrossRing(msg); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index e6303ee4f0d7b..f9b54349f2cd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -500,6 +500,7 @@ public void clear() { */ @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection excluded) { assert locNode.internalOrder() > 0 : locNode; + // TODO: May fire, https://issues.apache.org/jira/browse/IGNITE-27933 assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded; rwLock.readLock().lock(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java new file mode 100644 index 0000000000000..aed660ba84b3d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/ClusterNodeCollectionMessage.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import java.util.stream.Collectors; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; + +/** A container message for a collection of {@link TcpDiscoveryNodeMessage}. */ +public class ClusterNodeCollectionMessage implements TcpDiscoveryMarshallableMessage { + /** The collection of wrapped {@link TcpDiscoveryNodeMessage}. */ + @Order(value = 0, method = "clusterNodeMessages") + private Collection clusterNodeMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public ClusterNodeCollectionMessage() { + // No-op. + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public ClusterNodeCollectionMessage(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** + * @param clusterNodes Tcp discovery nodes. + * @return {@link ClusterNodeCollectionMessage} + */ + public static ClusterNodeCollectionMessage of(Collection clusterNodes) { + return new ClusterNodeCollectionMessage(clusterNodes.stream().map(TcpDiscoveryNodeMessage::new).collect(Collectors.toList())); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + clusterNodeMsgs.forEach(m -> finishUnmarshal(marsh, clsLdr)); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { + clusterNodeMsgs.forEach(m -> prepareMarshal(marsh)); + } + + /** @return Holder messages of {@link ClusterNode}. */ + public Collection clusterNodeMessages() { + return clusterNodeMsgs; + } + + /** @return Collection of {@link ClusterNode}. */ + public Collection clusterNodes() { + return clusterNodeMsgs.stream().map(msg -> (ClusterNode)msg).collect(Collectors.toList()); + } + + /** @param clusterNodeMsgs Holder messages of {@link ClusterNode}. */ + public void clusterNodeMessages(Collection clusterNodeMsgs) { + this.clusterNodeMsgs = clusterNodeMsgs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -110; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java new file mode 100644 index 0000000000000..0ef5b13290c2c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Message for {@link IgniteProductVersion}.*/ +public class IgniteProductVersionMessage implements Message { + /** Size of the {@link #revHash }*/ + public static final int REV_HASH_SIZE = 20; + + /** Major version number. */ + @Order(value = 0, method = "major") + private byte major; + + /** Minor version number. */ + @Order(value = 1, method = "minor") + private byte minor; + + /** Maintenance version number. */ + @Order(value = 2, method = "maintenance") + private byte maintenance; + + /** Stage of development. */ + @Order(value = 3, method = "stage") + private String stage; + + /** Revision timestamp. */ + @Order(value = 4, method = "revisionTimestamp") + private long revTs; + + /** Revision hash. */ + @Order(value = 5, method = "revisionHash") + private byte[] revHash; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public IgniteProductVersionMessage() { + // No-op. + } + + /** + * @param major Major version. + * @param minor Minor version. + * @param maintenance Maintenance. + * @param stage Stage. + * @param revTs Revision timestamp. + * @param revHash Revision hash. + */ + public IgniteProductVersionMessage(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { + this.major = major; + this.minor = minor; + this.maintenance = maintenance; + this.stage = stage; + this.revTs = revTs; + this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + } + + /** @param ver Product version. */ + public IgniteProductVersionMessage(IgniteProductVersion ver) { + this( + ver.major(), + ver.minor(), + ver.maintenance(), + ver.stage(), + ver.revisionTimestamp(), + ver.revisionHash() + ); + } + + /** @return Maintenance. */ + public byte maintenance() { + return maintenance; + } + + /** @param maintenance Maintenance. */ + public void maintenance(byte maintenance) { + this.maintenance = maintenance; + } + + /** @return Major version. */ + public byte major() { + return major; + } + + /** @param major Major version. */ + public void major(byte major) { + this.major = major; + } + + /** @return Minor version. */ + public byte minor() { + return minor; + } + + /** @param minor Minor version. */ + public void minor(byte minor) { + this.minor = minor; + } + + /** @return Revision hash. */ + public byte[] revisionHash() { + return revHash; + } + + /** @param revHash Revision hash. */ + public void revisionHash(byte[] revHash) { + this.revHash = revHash; + } + + /** @return Revision timestamp. */ + public long revisionTimestamp() { + return revTs; + } + + /** @param revTs Revision timestamp. */ + public void revisionTimestamp(long revTs) { + this.revTs = revTs; + } + + /** @return Statge. */ + public String stage() { + return stage; + } + + /** @param stage Stage. */ + public void stage(String stage) { + this.stage = stage; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -108; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java new file mode 100644 index 0000000000000..02c5b59564670 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + * Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} with the original order. + * Several of them might be a {@link Message}, several may not and require the original marshalling. + */ +public class TcpDiscoveryCollectionMessage implements TcpDiscoveryMarshallableMessage { + /** {@link TcpDiscoveryAbstractMessage} pending messages which are a {@link Message}. */ + @Order(value = 0, method = "writableMessages") + @Nullable private Map writableMsgs; + + /** Marshallable or Java-serializable pending messages which are not a {@link Message}. */ + @Nullable private Map marshallableMsgs; + + /** Marshalled {@link #marshallableMsgs}. */ + @Order(value = 1, method = "marshallableMessagesBytes") + @GridToStringExclude + @Nullable private byte[] marshallableMsgsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryCollectionMessage() { + // No-op. + } + + /** @param msgs Discovery messages to hold. */ + public TcpDiscoveryCollectionMessage(Collection msgs) { + messages(msgs); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { + if (marshallableMsgs != null && marshallableMsgsBytes == null) { + try { + marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal marshallable pending messages.", e); + } + } + + if (F.isEmpty(writableMsgs)) + return; + + writableMsgs.values().forEach(m -> { + if (m instanceof TcpDiscoveryMarshallableMessage) + ((TcpDiscoveryMarshallableMessage)m).prepareMarshal(marsh); + }); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (marshallableMsgsBytes != null && marshallableMsgs == null) { + try { + marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr); + + marshallableMsgsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal marshallable pending messages.", e); + } + } + + if (F.isEmpty(writableMsgs)) + return; + + writableMsgs.values().forEach(m -> { + if (m instanceof TcpDiscoveryMarshallableMessage) + ((TcpDiscoveryMarshallableMessage)m).finishUnmarshal(marsh, clsLdr); + }); + } + + /** + * @return Writable messages by their order. + * @see #messages() + */ + public @Nullable Map writableMessages() { + return writableMsgs; + } + + /** @param msgs Writable messages by their order. */ + public void writableMessages(@Nullable Map msgs) { + writableMsgs = msgs; + } + + /** @return Bytes of {@link #marshallableMsgs}. */ + public @Nullable byte[] marshallableMessagesBytes() { + return marshallableMsgsBytes; + } + + /** @param marshallableMsgsBytes Bytes of {@link #marshallableMsgs}. */ + public void marshallableMessagesBytes(@Nullable byte[] marshallableMsgsBytes) { + this.marshallableMsgsBytes = marshallableMsgsBytes; + } + + /** + * Gets pending messages sent to new node by its previous. + * + * @return Pending messages from previous node. + */ + public Collection messages() { + if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs)) + return Collections.emptyList(); + + int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size()) + + (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size()); + + List res = new ArrayList<>(totalSz); + + for (int i = 0; i < totalSz; ++i) { + Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i); + + if (m == null) { + TcpDiscoveryAbstractMessage sm = marshallableMsgs.get(i); + + assert sm != null; + + res.add(sm); + } + else { + assert marshallableMsgs == null || marshallableMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; + + res.add((TcpDiscoveryAbstractMessage)m); + } + } + + return res; + } + + /** + * Sets pending messages to send to new node. + * + * @param msgs Pending messages to send to new node. + */ + public void messages(@Nullable Collection msgs) { + marshallableMsgsBytes = null; + + if (F.isEmpty(msgs)) { + marshallableMsgs = null; + writableMsgs = null; + + return; + } + + // Keeps the original message order. + int idx = 0; + + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { + if (writableMsgs == null) + writableMsgs = U.newHashMap(msgs.size()); + + writableMsgs.put(idx++, (Message)m); + + continue; + } + + if (marshallableMsgs == null) + marshallableMsgs = U.newHashMap(msgs.size()); + + marshallableMsgs.put(idx++, m); + } + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -111; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java new file mode 100644 index 0000000000000..cf1854275c565 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Base class for TCP Discovery messages which still require external pre- and post- marshalling. + *
+ * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + */ +public interface TcpDiscoveryMarshallableMessage extends Message { + /** + * Should be idempotent. + * + * @param marsh Marshaller. + */ + void prepareMarshal(Marshaller marsh); + + /** + * Should be idempotent. + * + * @param marsh Marshaller. + * @param clsLdr Class loader. + */ + void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr); +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 36540d8b7dfc1..1e6db86b9c41a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -17,49 +17,69 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.AbstractMap; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; /** + * TODO: Revise serialization of the {@link TcpDiscoveryNode} fields after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** Added node. */ - private final TcpDiscoveryNode node; + @Order(6) + TcpDiscoveryNodeMessage nodeMsg; /** */ + @Order(value = 7, method = "gridDiscoveryData") private DiscoveryDataPacket dataPacket; - /** Pending messages from previous node. */ - private Collection msgs; + /** Message to hold collection of pending {@link TcpDiscoveryAbstractMessage}. */ + @Order(8) + TcpDiscoveryCollectionMessage msgs; /** Current topology. Initialized by coordinator. */ @GridToStringInclude - private Collection top; + @Order(9) + @Nullable ClusterNodeCollectionMessage top; /** */ @GridToStringInclude private transient Collection clientTop; /** Topology snapshots history. */ - private Map> topHist; + @Order(10) + @Nullable Map topHistMsgs; /** Start time of the first grid node. */ - private final long gridStartTime; + @Order(value = 11, method = "gridStartTime") + private long gridStartTime; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeAddedMessage() { + // No-op. + } /** * Constructor. @@ -79,7 +99,7 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, assert node != null; assert gridStartTime > 0; - this.node = node; + nodeMsg = new TcpDiscoveryNodeMessage(node); this.dataPacket = dataPacket; this.gridStartTime = gridStartTime; } @@ -90,13 +110,13 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - this.node = msg.node; - this.msgs = msg.msgs; - this.top = msg.top; - this.clientTop = msg.clientTop; - this.topHist = msg.topHist; - this.dataPacket = msg.dataPacket; - this.gridStartTime = msg.gridStartTime; + nodeMsg = msg.nodeMsg; + top = msg.top; + msgs = msg.msgs; + clientTop = msg.clientTop; + topHistMsgs = msg.topHistMsgs; + dataPacket = msg.dataPacket; + gridStartTime = msg.gridStartTime; } /** @@ -105,7 +125,7 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { * @return New node. */ public TcpDiscoveryNode node() { - return node; + return new TcpDiscoveryNode(nodeMsg); } /** @@ -113,19 +133,17 @@ public TcpDiscoveryNode node() { * * @return Pending messages from previous node. */ - @Nullable public Collection messages() { - return msgs; + public Collection messages() { + return msgs == null ? Collections.emptyList() : msgs.messages(); } /** - * Sets pending messages to send to new node. + * Sets pending messages. * - * @param msgs Pending messages to send to new node. + * @param msgs Pending messages. */ - public void messages( - @Nullable Collection msgs - ) { - this.msgs = msgs; + public void messages(@Nullable Collection msgs) { + this.msgs = F.isEmpty(msgs) ? null : new TcpDiscoveryCollectionMessage(msgs); } /** @@ -134,7 +152,7 @@ public void messages( * @return Current topology. */ @Nullable public Collection topology() { - return top; + return top.clusterNodeMessages().stream().map(TcpDiscoveryNode::new).collect(Collectors.toList()); } /** @@ -143,7 +161,7 @@ public void messages( * @param top Current topology. */ public void topology(@Nullable Collection top) { - this.top = top; + this.top = top == null ? null : ClusterNodeCollectionMessage.of(top); } /** @@ -168,7 +186,12 @@ public Collection clientTopology() { * @return Map with topology snapshots history. */ public Map> topologyHistory() { - return topHist; + if (F.isEmpty(topHistMsgs)) + return Collections.emptyMap(); + + return topHistMsgs.entrySet().stream() + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().clusterNodes())) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); } /** @@ -177,7 +200,20 @@ public Map> topologyHistory() { * @param topHist Map with topology snapshots history. */ public void topologyHistory(@Nullable Map> topHist) { - this.topHist = topHist; + if (F.isEmpty(topHist)) { + topHistMsgs = null; + + return; + } + + topHistMsgs = U.newHashMap(topHist.size()); + + topHist.forEach((nodeId, clusterNodes) -> { + Collection clusterNodeImpls = clusterNodes.stream().map(TcpDiscoveryNodeMessage::new) + .collect(Collectors.toList()); + + topHistMsgs.put(nodeId, new ClusterNodeCollectionMessage(clusterNodeImpls)); + }); } /** @@ -187,6 +223,11 @@ public DiscoveryDataPacket gridDiscoveryData() { return dataPacket; } + /** @param dataPacket {@link DiscoveryDataPacket} carried by this message. */ + public void gridDiscoveryData(DiscoveryDataPacket dataPacket) { + this.dataPacket = dataPacket; + } + /** * Clears discovery data to minimize message size. */ @@ -210,6 +251,44 @@ public long gridStartTime() { return gridStartTime; } + /** @param gridStartTime First grid node start time. */ + public void gridStartTime(long gridStartTime) { + this.gridStartTime = gridStartTime; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { + nodeMsg.prepareMarshal(marsh); + + if (msgs != null) + msgs.prepareMarshal(marsh); + + if (top != null) + top.prepareMarshal(marsh); + + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(t -> t.prepareMarshal(marsh)); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + nodeMsg.finishUnmarshal(marsh, clsLdr); + + if (msgs != null) + msgs.finishUnmarshal(marsh, clsLdr); + + if (top != null) + top.finishUnmarshal(marsh, clsLdr); + + if (!F.isEmpty(topHistMsgs)) + topHistMsgs.values().forEach(t -> t.finishUnmarshal(marsh, clsLdr)); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 20; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java new file mode 100644 index 0000000000000..fb8785e8002f2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.jetbrains.annotations.Nullable; + +/** + * Message for {@link TcpDiscoveryNode} and for {@link org.apache.ignite.cluster.ClusterNode}. + */ +public class TcpDiscoveryNodeMessage implements TcpDiscoveryMarshallableMessage, ClusterNode { + /** */ + @Order(0) + UUID id; + + /** */ + Map attrs; + + /** */ + @Order(1) + byte[] attrsBytes; + + /** */ + @Order(2) + Collection addrs; + + /** */ + @Order(3) + Collection hostNames; + + /** Port */ + @Order(4) + int port; + + /** */ + @Order(5) + TcpDiscoveryNodeMetricsMessage metrics; + + /** */ + @Order(6) + long order; + + /** */ + @Order(7) + long intOrder; + + /** */ + @Order(8) + IgniteProductVersionMessage verMsg; + + /** */ + @Order(9) + UUID clientRouterNodeId; + + /** */ + @Nullable Object consistentId; + + /** */ + @Order(10) + @Nullable byte[] consistentIdBytes; + + /** */ + @Order(11) + boolean loc; + + /** */ + @Order(12) + boolean client; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeMessage() { + // No-op. + } + + /** @param tcpDiscoNode {@link TcpDiscoveryNode}. */ + public TcpDiscoveryNodeMessage(TcpDiscoveryNode tcpDiscoNode) { + id = tcpDiscoNode.id(); + attrs = tcpDiscoNode.attributes(); + addrs = tcpDiscoNode.addresses(); + hostNames = tcpDiscoNode.hostNames(); + port = tcpDiscoNode.discoveryPort(); + metrics = new TcpDiscoveryNodeMetricsMessage(tcpDiscoNode.metrics()); + order = tcpDiscoNode.order(); + verMsg = new IgniteProductVersionMessage(tcpDiscoNode.version()); + // Not a ClusterNode. + clientRouterNodeId = tcpDiscoNode.clientRouterNodeId(); + intOrder = tcpDiscoNode.internalOrder(); + } + + /** @param clusterNode {@link ClusterNode}. */ + public TcpDiscoveryNodeMessage(ClusterNode clusterNode) { + id = clusterNode.id(); + attrs = clusterNode.attributes(); + addrs = clusterNode.addresses(); + hostNames = clusterNode.hostNames(); + metrics = new TcpDiscoveryNodeMetricsMessage(clusterNode.metrics()); + order = clusterNode.order(); + verMsg = new IgniteProductVersionMessage(clusterNode.version()); + // Not transfered by TcpDiscoveryNode. + consistentId = clusterNode.consistentId(); + loc = clusterNode.isLocal(); + client = clusterNode.isClient(); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { + if (attrs != null && attrsBytes == null) { + try { + attrsBytes = U.marshal(marsh, attrs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal cluster node attributes.", e); + } + } + + if (consistentId == null && consistentIdBytes == null) { + try { + consistentIdBytes = U.marshal(marsh, consistentId); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal consistent id.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (attrsBytes != null && attrs == null) { + try { + attrs = U.unmarshal(marsh, attrsBytes, clsLdr); + + attrsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal cluster node attributes.", e); + } + } + + if (consistentId != null && consistentIdBytes == null) { + try { + consistentId = U.unmarshal(marsh, consistentIdBytes, clsLdr); + + consistentIdBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal consistent id.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @Override public @Nullable Object consistentId() { + return consistentId; + } + + /** {@inheritDoc} */ + @Override public @Nullable T attribute(String name) { + return (T)attributes().get(name); + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() { + assert metrics != null; + + return new ClusterMetricsSnapshot(metrics); + } + + /** {@inheritDoc} */ + @Override public Map attributes() { + return attrs == null ? Collections.emptyMap() : attrs; + } + + /** {@inheritDoc} */ + @Override public Collection addresses() { + return addrs; + } + + /** {@inheritDoc} */ + @Override public Collection hostNames() { + return hostNames; + } + + /** {@inheritDoc} */ + @Override public long order() { + return order; + } + + /** @return Port. */ + public int port() { + return port; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return new IgniteProductVersion(verMsg); + } + + /** @return Message of {@link IgniteProductVersion}. */ + public IgniteProductVersionMessage versionMessage() { + return verMsg; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return loc; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + return client; + } + + /** @return Internal order. */ + public UUID clientRouterNodeId() { + return clientRouterNodeId; + } + + /** @return Internal order. */ + public long internalOrder() { + return intOrder; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -109; + } +}