1212use libproxy \protocol \LoginPacket ;
1313use libproxy \protocol \ProxyPacket ;
1414use libproxy \protocol \ProxyPacketPool ;
15- use libproxy \protocol \ProxyPacketSerializer ;
1615use NetherGames \Quiche \io \QueueWriter ;
1716use NetherGames \Quiche \QuicheConnection ;
1817use NetherGames \Quiche \socket \QuicheServerSocket ;
1918use NetherGames \Quiche \SocketAddress ;
2019use NetherGames \Quiche \stream \BiDirectionalQuicheStream ;
2120use NetherGames \Quiche \stream \QuicheStream ;
21+ use pmmp \encoding \BE ;
22+ use pmmp \encoding \ByteBufferReader ;
23+ use pmmp \encoding \ByteBufferWriter ;
24+ use pmmp \encoding \DataDecodeException ;
25+ use pmmp \encoding \LE ;
2226use pmmp \thread \ThreadSafeArray ;
2327use pocketmine \network \mcpe \compression \DecompressionException ;
2428use pocketmine \network \mcpe \compression \ZlibCompressor ;
3135use pocketmine \network \mcpe \protocol \ProtocolInfo ;
3236use pocketmine \network \mcpe \protocol \RequestNetworkSettingsPacket ;
3337use pocketmine \network \mcpe \protocol \serializer \PacketBatch ;
34- use pocketmine \network \mcpe \protocol \serializer \PacketSerializer ;
3538use pocketmine \network \mcpe \protocol \types \CompressionAlgorithm ;
3639use pocketmine \network \mcpe \raklib \PthreadsChannelReader ;
3740use pocketmine \network \mcpe \raklib \SnoozeAwarePthreadsChannelWriter ;
3841use pocketmine \network \PacketHandlingException ;
3942use pocketmine \snooze \SleeperHandler ;
4043use pocketmine \snooze \SleeperHandlerEntry ;
4144use pocketmine \thread \log \AttachableThreadSafeLogger ;
42- use pocketmine \utils \Binary ;
43- use pocketmine \utils \BinaryDataException ;
44- use pocketmine \utils \BinaryStream ;
4545use Socket ;
4646use function array_keys ;
4747use function base64_encode ;
5656
5757class ProxyServer
5858{
59+ private const INCOMING_PACKET_BATCH_HARD_LIMIT = 300 ;
60+
5961 /** @var PthreadsChannelReader */
6062 private PthreadsChannelReader $ mainToThreadReader ;
6163 /** @var SnoozeAwarePthreadsChannelWriter */
@@ -221,12 +223,12 @@ private function shutdownStream(int $streamIdentifier, string $reason, bool $fro
221223
222224 private function sendToMainBuffer (int $ streamIdentifier , ProxyPacket $ pk ): void
223225 {
224- $ serializer = new ProxyPacketSerializer ();
225- $ serializer-> putLInt ( $ streamIdentifier );
226+ $ serializer = new ByteBufferWriter ();
227+ LE :: writeUnsignedInt ( $ serializer, $ streamIdentifier );
226228
227229 $ pk ->encode ($ serializer );
228230
229- $ this ->threadToMainWriter ->write ($ serializer ->getBuffer ());
231+ $ this ->threadToMainWriter ->write ($ serializer ->getData ());
230232 }
231233
232234 public function tickProcessor (): void
@@ -237,16 +239,16 @@ public function tickProcessor(): void
237239 private function pushSockets (): void
238240 {
239241 while (($ payload = $ this ->mainToThreadReader ->read ()) !== null ) {
240- $ stream = new ProxyPacketSerializer ($ payload );
241- $ streamIdentifier = $ stream-> getLInt ( );
242+ $ stream = new ByteBufferReader ($ payload );
243+ $ streamIdentifier = LE :: readUnsignedInt ( $ stream );
242244
243- if (($ pk = ProxyPacketPool::getInstance ()->getPacket ($ payload, $ stream -> getOffset () )) === null ) {
245+ if (($ pk = ProxyPacketPool::getInstance ()->getPacket ($ payload )) === null ) {
244246 throw new PacketHandlingException ('Packet does not exist ' );
245247 }
246248
247249 try {
248250 $ pk ->decode ($ stream );
249- } catch (BinaryDataException $ e ) {
251+ } catch (DataDecodeException $ e ) {
250252 $ this ->logger ->debug ('Closed stream with id( ' . $ streamIdentifier . ') because server sent invalid packet ' );
251253 $ this ->shutdownStream ($ streamIdentifier , 'invalid packet ' , false );
252254 return ;
@@ -278,7 +280,7 @@ private function sendPayloadWithReceipt(int $streamIdentifier, string $payload,
278280 return ;
279281 }
280282
281- $ writer ->writeWithPromise (Binary:: writeInt (strlen ($ payload )) . $ payload )->onResult (function () use ($ streamIdentifier , $ receiptId ): void {
283+ $ writer ->writeWithPromise (BE :: packSignedInt (strlen ($ payload )) . $ payload )->onResult (function () use ($ streamIdentifier , $ receiptId ): void {
282284 $ pk = new AckPacket ();
283285 $ pk ->receiptId = $ receiptId ;
284286
@@ -296,7 +298,7 @@ private function sendPayload(int $streamIdentifier, string $payload): void
296298 return ;
297299 }
298300
299- $ writer ->write (Binary:: writeInt (strlen ($ payload )) . $ payload );
301+ $ writer ->write (BE :: packSignedInt (strlen ($ payload )) . $ payload );
300302 }
301303
302304 /**
@@ -323,26 +325,26 @@ private function getProtocolId(int $streamIdentifier): int
323325 */
324326 private function sendDataPacket (int $ streamIdentifier , BedrockPacket $ packet ): void
325327 {
326- $ packetSerializer = PacketSerializer:: encoder ( $ protocolId = $ this -> getProtocolId ( $ streamIdentifier ) );
327- $ packet ->encode ($ packetSerializer );
328+ $ packetSerializer = new ByteBufferWriter ( );
329+ $ packet ->encode ($ packetSerializer, $ protocolId = $ this -> getProtocolId ( $ streamIdentifier ) );
328330
329- $ stream = new BinaryStream ();
330- PacketBatch::encodeRaw ($ stream , [$ packetSerializer ->getBuffer ()]);
331- $ payload = ($ protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr (CompressionAlgorithm::ZLIB ) : '' ) . ZlibCompressor::getInstance ()->compress ($ stream ->getBuffer ());
331+ $ stream = new ByteBufferWriter ();
332+ PacketBatch::encodeRaw ($ stream , [$ packetSerializer ->getData ()]);
333+ $ payload = ($ protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr (CompressionAlgorithm::ZLIB ) : '' ) . ZlibCompressor::getInstance ()->compress ($ stream ->getData ());
332334
333335 $ this ->sendPayload ($ streamIdentifier , $ payload );
334336 }
335337
336338 private function decodePacket (int $ streamIdentifier , BedrockPacket $ packet , string $ buffer ): void
337339 {
338- $ stream = PacketSerializer:: decoder ( $ this -> protocolId [ $ streamIdentifier ] ?? ProtocolInfo:: CURRENT_PROTOCOL , $ buffer, 0 );
340+ $ stream = new ByteBufferReader ( $ buffer );
339341 try {
340- $ packet ->decode ($ stream );
342+ $ packet ->decode ($ stream, $ this -> protocolId [ $ streamIdentifier ] ?? ProtocolInfo:: CURRENT_PROTOCOL );
341343 } catch (PacketDecodeException $ e ) {
342344 throw PacketHandlingException::wrap ($ e );
343345 }
344- if (! $ stream ->feof () ) {
345- $ remains = substr ($ stream ->getBuffer (), $ stream ->getOffset ());
346+ if ($ stream ->getUnreadLength () > 0 ) {
347+ $ remains = substr ($ stream ->getData (), $ stream ->getOffset ());
346348 $ this ->logger ->debug ("Still " . strlen ($ remains ) . " bytes unread in " . $ packet ->getName () . ": " . bin2hex ($ remains ));
347349 }
348350 }
@@ -407,14 +409,18 @@ private function onFullDataReceive(int $streamIdentifier, string $payload): void
407409 throw PacketHandlingException::wrap ($ e , "Compressed packet batch decode error " );
408410 }
409411
412+ $ count = 0 ;
410413 try {
411- $ stream = new BinaryStream ($ decompressed );
412- $ count = 0 ;
414+ $ stream = new ByteBufferReader ($ decompressed );
413415 foreach (PacketBatch::decodeRaw ($ stream ) as $ buffer ) {
414- $ this ->getGamePacketLimiter ($ streamIdentifier )->decrement ();
415- if (++$ count > 100 ) {
416- throw new PacketHandlingException ("Too many packets in batch " );
416+ if (++$ count >= self ::INCOMING_PACKET_BATCH_HARD_LIMIT ){
417+ //this should be well more than enough; under normal conditions the game packet rate limiter
418+ //will kick in well before this. This is only here to make sure we can't get huge batches of
419+ //noisy packets to bog down the server, since those aren't counted by the regular limiter.
420+ throw new PacketHandlingException ("Reached hard limit of " . self ::INCOMING_PACKET_BATCH_HARD_LIMIT . " per batch packet " );
417421 }
422+
423+ $ this ->getGamePacketLimiter ($ streamIdentifier )->decrement ();
418424 $ packet = PacketPool::getInstance ()->getPacket ($ buffer );
419425 if ($ packet === null ) {
420426 $ this ->logger ->debug ("Unknown packet: " . base64_encode ($ buffer ));
@@ -429,7 +435,7 @@ private function onFullDataReceive(int $streamIdentifier, string $payload): void
429435 throw PacketHandlingException::wrap ($ e , "Error processing " . $ packet ->getName ());
430436 }
431437 }
432- } catch (PacketDecodeException |BinaryDataException $ e ) {
438+ } catch (PacketDecodeException |DataDecodeException $ e ) {
433439 $ this ->logger ->logException ($ e );
434440 throw PacketHandlingException::wrap ($ e , "Packet batch decode error " );
435441 }
@@ -457,8 +463,8 @@ private function onDataReceive(int $streamIdentifier, string $data): void
457463 return ; // wait for more data
458464 } else {
459465 try {
460- $ packetLength = Binary:: readInt (substr ($ buffer , 0 , 4 ));
461- } catch (BinaryDataException $ exception ) {
466+ $ packetLength = BE :: unpackSignedInt (substr ($ buffer , 0 , 4 ));
467+ } catch (DataDecodeException $ exception ) {
462468 $ this ->shutdownStream ($ streamIdentifier , 'invalid packet ' , false );
463469 return ;
464470 }
0 commit comments