2323import io .netty .channel .ChannelFuture ;
2424import org .checkerframework .checker .nullness .qual .Nullable ;
2525import java .time .Instant ;
26+ import java .util .BitSet ;
2627import java .util .concurrent .CompletableFuture ;
28+ import java .util .concurrent .atomic .AtomicInteger ;
2729import java .util .function .Function ;
2830
2931/**
3234 */
3335public class ChatQueue {
3436
35- private final Object internalLock ;
37+ private final Object internalLock = new Object () ;
3638 private final ConnectedPlayer player ;
37- private CompletableFuture <WrappedPacket > packetFuture ;
39+ private final ChatState chatState = new ChatState ();
40+ private CompletableFuture <Void > head = CompletableFuture .completedFuture (null );
3841
3942 /**
4043 * Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}.
@@ -43,130 +46,135 @@ public class ChatQueue {
4346 */
4447 public ChatQueue (ConnectedPlayer player ) {
4548 this .player = player ;
46- this .packetFuture = CompletableFuture .completedFuture (new WrappedPacket (Instant .EPOCH , null ));
47- this .internalLock = new Object ();
49+ }
50+
51+ private void queueTask (Task task ) {
52+ synchronized (internalLock ) {
53+ MinecraftConnection smc = player .ensureAndGetCurrentServer ().ensureConnected ();
54+ head = head .thenCompose (v -> {
55+ try {
56+ return task .update (chatState , smc ).exceptionally (ignored -> null );
57+ } catch (Throwable ignored ) {
58+ return CompletableFuture .completedFuture (null );
59+ }
60+ });
61+ }
4862 }
4963
5064 /**
5165 * Queues a packet sent from the player - all packets must wait until this processes to send their
5266 * packets. This maintains order on the server-level for the client insertions of commands
5367 * and messages. All entries are locked through an internal object lock.
5468 *
55- * @param nextPacket the {@link CompletableFuture} which will provide the next-processed packet.
56- * @param timestamp the {@link Instant} timestamp of this packet so we can allow piggybacking.
69+ * @param nextPacket a function mapping {@link LastSeenMessages} state to a {@link CompletableFuture} that will
70+ * provide the next-processed packet. This should include the fixed {@link LastSeenMessages}.
71+ * @param timestamp the new {@link Instant} timestamp of this packet to update the internal chat state.
72+ * @param lastSeenMessages the new {@link LastSeenMessages} last seen messages to update the internal chat state.
5773 */
58- public void queuePacket (CompletableFuture <MinecraftPacket > nextPacket , Instant timestamp ) {
59- synchronized (internalLock ) { // wait for the lock to resolve - we don't want to drop packets
60- MinecraftConnection smc = player .ensureAndGetCurrentServer ().ensureConnected ();
61-
62- CompletableFuture <WrappedPacket > nextInLine = WrappedPacket .wrap (timestamp , nextPacket );
63- this .packetFuture = awaitChat (smc , this .packetFuture ,
64- nextInLine ); // we await chat, binding `this.packetFuture` -> `nextInLine`
65- }
74+ public void queuePacket (Function <LastSeenMessages , CompletableFuture <MinecraftPacket >> nextPacket , @ Nullable Instant timestamp , @ Nullable LastSeenMessages lastSeenMessages ) {
75+ queueTask ((chatState , smc ) -> {
76+ LastSeenMessages newLastSeenMessages = chatState .updateFromMessage (timestamp , lastSeenMessages );
77+ return nextPacket .apply (newLastSeenMessages ).thenCompose (packet -> writePacket (packet , smc ));
78+ });
6679 }
6780
6881 /**
69- * Hijacks the latest sent packet's timestamp to provide an in-order packet without polling the
82+ * Hijacks the latest sent packet's chat state to provide an in-order packet without polling the
7083 * physical, or prior packets sent through the stream.
7184 *
72- * @param packet the {@link MinecraftPacket} to send.
73- * @param instantMapper the {@link InstantPacketMapper} which maps the prior timestamp and current
74- * packet to a new packet.
75- * @param <K> the type of base to expect when mapping the packet.
76- * @param <V> the type of packet for instantMapper type-checking.
85+ * @param packetFunction a function that maps the prior {@link ChatState} into a new packet.
86+ * @param <T> the type of packet to send.
7787 */
78- public <K , V extends MinecraftPacket > void hijack (K packet ,
79- InstantPacketMapper <K , V > instantMapper ) {
80- synchronized (internalLock ) {
81- CompletableFuture <K > trueFuture = CompletableFuture .completedFuture (packet );
82- MinecraftConnection smc = player .ensureAndGetCurrentServer ().ensureConnected ();
88+ public <T extends MinecraftPacket > void queuePacket (Function <ChatState , T > packetFunction ) {
89+ queueTask ((chatState , smc ) -> {
90+ T packet = packetFunction .apply (chatState );
91+ return writePacket (packet , smc );
92+ });
93+ }
8394
84- this .packetFuture = hijackCurrentPacket (smc , this .packetFuture , trueFuture , instantMapper );
85- }
95+ public void handleAcknowledgement (int offset ) {
96+ queueTask ((chatState , smc ) -> {
97+ int ackCountToForward = chatState .accumulateAckCount (offset );
98+ if (ackCountToForward > 0 ) {
99+ return writePacket (new ChatAcknowledgementPacket (ackCountToForward ), smc );
100+ }
101+ return CompletableFuture .completedFuture (null );
102+ });
86103 }
87104
88- private static Function < WrappedPacket , WrappedPacket > writePacket (MinecraftConnection connection ) {
89- return wrappedPacket -> {
90- if (!connection .isClosed ()) {
91- ChannelFuture future = wrappedPacket .write (connection );
105+ private static < T extends MinecraftPacket > CompletableFuture < Void > writePacket (T packet , MinecraftConnection smc ) {
106+ return CompletableFuture . runAsync (() -> {
107+ if (!smc .isClosed ()) {
108+ ChannelFuture future = smc .write (packet );
92109 if (future != null ) {
93110 future .awaitUninterruptibly ();
94111 }
95112 }
96-
97- return wrappedPacket ;
98- };
113+ }, smc .eventLoop ());
99114 }
100115
101- private static <T extends MinecraftPacket > CompletableFuture <WrappedPacket > awaitChat (
102- MinecraftConnection connection ,
103- CompletableFuture <WrappedPacket > binder ,
104- CompletableFuture <WrappedPacket > future
105- ) {
106- // the binder will run -> then the future will get the `write packet` caller
107- return binder .thenCompose (ignored -> future .thenApply (writePacket (connection )));
108- }
109-
110- private static <K , V extends MinecraftPacket > CompletableFuture <WrappedPacket > hijackCurrentPacket (
111- MinecraftConnection connection ,
112- CompletableFuture <WrappedPacket > binder ,
113- CompletableFuture <K > future ,
114- InstantPacketMapper <K , V > packetMapper
115- ) {
116- CompletableFuture <WrappedPacket > awaitedFuture = new CompletableFuture <>();
117- // the binder will complete -> then the future will get the `write packet` caller
118- binder .whenComplete ((previous , ignored ) -> {
119- // map the new packet into a better "designed" packet with the hijacked packet's timestamp
120- WrappedPacket .wrap (previous .timestamp ,
121- future .thenApply (item -> packetMapper .map (previous .timestamp , item )))
122- .thenApplyAsync (writePacket (connection ), connection .eventLoop ())
123- .whenComplete (
124- (packet , throwable ) -> awaitedFuture .complete (throwable != null ? null : packet ));
125- });
126- return awaitedFuture ;
116+ private interface Task {
117+ CompletableFuture <Void > update (ChatState chatState , MinecraftConnection smc );
127118 }
128119
129120 /**
130- * Provides an {@link Instant} based timestamp mapper from an existing object to create a packet.
121+ * Tracks the last Secure Chat state that we received from the client. This is important to always have a valid 'last
122+ * seen' state that is consistent with future and past updates from the client (which may be signed). This state is
123+ * used to construct 'spoofed' command packets from the proxy to the server.
124+ * <ul>
125+ * <li>If we last forwarded a chat or command packet from the client, we have a known 'last seen' that we can
126+ * reuse.</li>
127+ * <li>If we last forwarded a {@link ChatAcknowledgementPacket}, the previous 'last seen' cannot be reused. We
128+ * cannot predict an up-to-date 'last seen', as we do not know which messages the client actually saw.</li>
129+ * <li>Therefore, we need to hold back any acknowledgement packets so that we can continue to reuse the last valid
130+ * 'last seen' state.</li>
131+ * <li>However, there is a limit to the number of messages that can remain unacknowledged on the server.</li>
132+ * <li>To address this, we know that if the client has moved its 'last seen' window far enough, we can fill in the
133+ * gap with dummy 'last seen', and it will never be checked.</li>
134+ * </ul>
131135 *
132- * @param <K> The base object type to map.
133- * @param <V> The resulting packet type .
136+ * Note that this is effectively unused for 1.20.5+ clients, as commands without any signature do not send 'last seen'
137+ * updates .
134138 */
135- public interface InstantPacketMapper <K , V extends MinecraftPacket > {
136-
137- /**
138- * Maps a value into a packet with it and a timestamp.
139- *
140- * @param nextInstant the {@link Instant} timestamp to use for tracking.
141- * @param currentObject the current item to map to the packet.
142- * @return The resulting packet from the mapping.
143- */
144- V map (Instant nextInstant , K currentObject );
145- }
146-
147- private static class WrappedPacket {
139+ public static class ChatState {
140+ private static final int MINIMUM_DELAYED_ACK_COUNT = LastSeenMessages .WINDOW_SIZE ;
141+ private static final BitSet DUMMY_LAST_SEEN_MESSAGES = new BitSet ();
148142
149- private final Instant timestamp ;
150- private final MinecraftPacket packet ;
143+ public volatile Instant lastTimestamp = Instant .EPOCH ;
144+ private volatile BitSet lastSeenMessages = new BitSet ();
145+ private final AtomicInteger delayedAckCount = new AtomicInteger ();
151146
152- private WrappedPacket (Instant timestamp , MinecraftPacket packet ) {
153- this .timestamp = timestamp ;
154- this .packet = packet ;
147+ private ChatState () {
155148 }
156149
157150 @ Nullable
158- public ChannelFuture write (MinecraftConnection connection ) {
159- if (packet != null ) {
160- return connection .write (packet );
151+ public LastSeenMessages updateFromMessage (@ Nullable Instant timestamp , @ Nullable LastSeenMessages lastSeenMessages ) {
152+ if (timestamp != null ) {
153+ this .lastTimestamp = timestamp ;
154+ }
155+ if (lastSeenMessages != null ) {
156+ // We held back some acknowledged messages, so flush that out now that we have a known 'last seen' state again
157+ int delayedAckCount = this .delayedAckCount .getAndSet (0 );
158+ this .lastSeenMessages = lastSeenMessages .getAcknowledged ();
159+ return lastSeenMessages .offset (delayedAckCount );
161160 }
162161 return null ;
163162 }
164163
165- private static CompletableFuture <WrappedPacket > wrap (Instant timestamp ,
166- CompletableFuture <MinecraftPacket > nextPacket ) {
167- return nextPacket
168- .thenApply (pkt -> new WrappedPacket (timestamp , pkt ))
169- .exceptionally (ignored -> new WrappedPacket (timestamp , null ));
164+ public int accumulateAckCount (int ackCount ) {
165+ int delayedAckCount = this .delayedAckCount .addAndGet (ackCount );
166+ int ackCountToForward = delayedAckCount - MINIMUM_DELAYED_ACK_COUNT ;
167+ if (ackCountToForward >= LastSeenMessages .WINDOW_SIZE ) {
168+ // Because we only forward acknowledgements above the window size, we don't have to shift the previous 'last seen' state
169+ this .lastSeenMessages = DUMMY_LAST_SEEN_MESSAGES ;
170+ this .delayedAckCount .set (MINIMUM_DELAYED_ACK_COUNT );
171+ return ackCountToForward ;
172+ }
173+ return 0 ;
174+ }
175+
176+ public LastSeenMessages createLastSeen () {
177+ return new LastSeenMessages (0 , lastSeenMessages );
170178 }
171179 }
172180}
0 commit comments