Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
Expand Down Expand Up @@ -59,10 +60,23 @@ public class DirectMessageReader implements MessageReader {
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
public DirectMessageReader(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
this(msgFactory, cacheObjProc, null);
}

/**
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
* @param msgPostSerializer Optional message processor to call after the serialization.
*/
public DirectMessageReader(
final MessageFactory msgFactory,
IgniteCacheObjectProcessor cacheObjProc,
@Nullable Consumer<Message> msgPostSerializer
) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, cacheObjProc);
return new StateItem(msgFactory, cacheObjProc, msgPostSerializer);
}
});
}
Expand Down Expand Up @@ -452,9 +466,14 @@ private static class StateItem implements DirectMessageStateItem {
/**
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
* @param msgPostReader Optional message post-reader.
*/
public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
public StateItem(
MessageFactory msgFactory,
IgniteCacheObjectProcessor cacheObjProc,
@Nullable Consumer<Message> msgPostReader
) {
stream = new DirectByteBufferStream(msgFactory, cacheObjProc, msgPostReader, null);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
Expand Down Expand Up @@ -52,10 +53,18 @@ public class DirectMessageWriter implements MessageWriter {
private ByteBuffer buf;

/** */
public DirectMessageWriter(final MessageFactory msgFactory) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
public DirectMessageWriter(MessageFactory msgFactory) {
this(msgFactory, null);
}

/**
* @param msgFactory Message factory.
* @param msgPreSerializer Optional message pre-writer.
*/
public DirectMessageWriter(MessageFactory msgFactory, @Nullable Consumer<Message> msgPreSerializer) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<>() {
@Override public StateItem apply() {
return new StateItem(msgFactory);
return new StateItem(msgFactory, msgPreSerializer);
}
});
}
Expand Down Expand Up @@ -415,9 +424,12 @@ private static class StateItem implements DirectMessageStateItem {
/** */
private boolean hdrWritten;

/** */
public StateItem(MessageFactory msgFactory) {
stream = new DirectByteBufferStream(msgFactory);
/**
* @param msgFactory Message Factory.
* @param msgPreWriter Optional message pre-writer.
*/
public StateItem(MessageFactory msgFactory, @Nullable Consumer<Message> msgPreWriter) {
stream = new DirectByteBufferStream(msgFactory, null, null, msgPreWriter);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -222,7 +223,13 @@ public class DirectByteBufferStream {

/** Is required to instantiate {@link CacheObject} while reading messages. */
@GridToStringExclude
private final IgniteCacheObjectProcessor cacheObjProc;
@Nullable private final IgniteCacheObjectProcessor cacheObjProc;

/** Optional message post-reader. */
@Nullable private final Consumer<Message> msgPostReader;

/** Optional message pre-writer. */
@Nullable private final Consumer<Message> msgPreWriter;

/** */
@GridToStringExclude
Expand Down Expand Up @@ -345,21 +352,27 @@ public class DirectByteBufferStream {
* @param msgFactory Message factory.
*/
public DirectByteBufferStream(MessageFactory msgFactory) {
this.msgFactory = msgFactory;

// Is not used while writing messages.
cacheObjProc = null;
this(msgFactory, null, null, null);
}

/**
* Constructror for stream used for reading messages.
*
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public DirectByteBufferStream(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
* @param cacheObjProc Optional cache object processor.
* @param msgPostReader Optional message post-reader.
* @param msgPreWriter Optional message pre-writer.
*/
public DirectByteBufferStream(
MessageFactory msgFactory,
@Nullable IgniteCacheObjectProcessor cacheObjProc,
@Nullable Consumer<Message> msgPostReader,
@Nullable Consumer<Message> msgPreWriter
) {
this.msgFactory = msgFactory;
this.cacheObjProc = cacheObjProc;
this.msgPostReader = msgPostReader;
this.msgPreWriter = msgPreWriter;
}

/**
Expand Down Expand Up @@ -886,6 +899,10 @@ public void writeMessage(Message msg, MessageWriter writer) {
try {
writer.beforeInnerMessageWrite();

// Repeatable call. Current limitation.
if (msgPreWriter != null)
msgPreWriter.accept(msg);

lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
}
finally {
Expand Down Expand Up @@ -1553,6 +1570,9 @@ public <T extends Message> T readMessage(MessageReader reader) {
msgTypeDone = false;
msg = null;

if (msgPostReader != null)
msgPostReader.accept(msg0);

return (T)msg0;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
Expand All @@ -59,6 +63,8 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
Expand All @@ -80,6 +86,7 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-108, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageSerializer());
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
Expand Down Expand Up @@ -110,5 +117,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
factory.register((short)20, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());
factory.register((short)21, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,7 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {

nodeAdded = true;

if (msg.topologyHistory() != null)
if (!F.isEmpty(msg.topologyHistory()))
topHist.putAll(msg.topologyHistory());
}
else {
Expand Down Expand Up @@ -2307,8 +2307,6 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
delayDiscoData.clear();
}

msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

locNode.setAttributes(msg.clientNodeAttributes());

clearNodeSensitiveData(locNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Metric for max message queue size. */
private MaxValueMetric maxMsgQueueSizeMetric;

/** Failed nodes (but still in topology). */
/** Failed nodes (but still in topology): Node -> Id of the failure issuer node. */
private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>();

/** */
Expand Down Expand Up @@ -2490,8 +2490,6 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
if (addFinishMsg.clientDiscoData() != null) {
addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);

addFinishMsg.prepareMarshal(spi.marshaller());

msg = addFinishMsg;

DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData();
Expand Down Expand Up @@ -4863,8 +4861,6 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
addFinishMsg.clientDiscoData(msg.gridDiscoveryData());

addFinishMsg.clientNodeAttributes(node.attributes());

addFinishMsg.prepareMarshal(spi.marshaller());
}

addFinishMsg = tracing.messages().branch(addFinishMsg, msg);
Expand Down Expand Up @@ -5080,7 +5076,9 @@ else if (spiState == CONNECTING)
joiningNodesDiscoDataList = new ArrayList<>();

topHist.clear();
topHist.putAll(msg.topologyHistory());

if (!F.isEmpty(msg.topologyHistory()))
topHist.putAll(msg.topologyHistory());

pendingMsgs.reset(msg.messages());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -108,8 +109,16 @@ public class TcpDiscoveryIoSession {

msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);

msgWriter = new DirectMessageWriter(spi.messageFactory());
msgReader = new DirectMessageReader(spi.messageFactory(), null);
msgWriter = new DirectMessageWriter(spi.messageFactory(), msg -> {
if (msg instanceof TcpDiscoveryMarshallableMessage)
((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(spi.marshaller());
});
msgReader = new DirectMessageReader(spi.messageFactory(), null, msg -> {
if (msg instanceof TcpDiscoveryMarshallableMessage) {
((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
}
});

try {
int sendBufSize = sock.getSendBufferSize() > 0 ? sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE;
Expand Down Expand Up @@ -211,6 +220,9 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
}
while (!finished);

if (msg instanceof TcpDiscoveryMarshallableMessage)
((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(spi.marshaller(), clsLdr);

return (T)msg;
}
catch (Exception e) {
Expand Down Expand Up @@ -250,6 +262,9 @@ public Socket socket() {
* @throws IOException If serialization fails.
*/
void serializeMessage(Message m, OutputStream out) throws IOException {
if (m instanceof TcpDiscoveryMarshallableMessage)
((TcpDiscoveryMarshallableMessage)m).prepareMarshal(spi.marshaller());

MessageSerializer msgSer = spi.messageFactory().serializer(m.directType());

msgWriter.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ public void clear() {
*/
@Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
assert locNode.internalOrder() > 0 : locNode;
// TODO: May fire, https://issues.apache.org/jira/browse/IGNITE-27933
assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded;

rwLock.readLock().lock();
Expand Down
Loading