fix(ws)!: backpressure instead of fail when subscribers lag #184
Open
ariofrio wants to merge 4 commits into Polymarket:main from
Conversation
When subscribed to thousands of assets, lag failure can occur even on the first next() called immediately after subscribe_*(). Use async-broadcast to block WS incoming message processing once channel capacity is reached.

Drawbacks:

- Sending PONG in response to PING, close, and error messages is also blocked when subscribers lag. Not sure how we could work around this. In any case, the old behavior is to drop messages, which is probably not better.
- It becomes possible to create deadlocks by having subscribers wait on each other.

The old behavior can be replicated by users by putting an mpsc::bounded in front of the Stream.
Codecov Report

❌ Patch coverage is

Additional details and impacted files

    @@            Coverage Diff             @@
    ##             main     #184      +/-   ##
    ==========================================
    + Coverage   85.81%   85.86%   +0.04%
    ==========================================
      Files          32       32
      Lines        4887     4882       -5
    ==========================================
    - Hits         4194     4192       -2
    + Misses        693      690       -3
      #[cfg(feature = "tracing")]
      tracing::trace!(?message, "Parsed WebSocket message");
    - _ = broadcast_tx.send(message);
    + _ = broadcast_tx.broadcast(message).await;
Collaborator
I might be missing something but doesn't setting set_await_active(false) (line 125 RHS) mean that the broadcast will error here (be silently dropped) and then the message will also be dropped?
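For reference, here is a small standalone sketch of the behavior being asked about, assuming async-broadcast's documented set_await_active and Receiver::deactivate APIs (this is not code from this PR): with await_active set to false and no active receivers, broadcast() is expected to resolve to an error instead of waiting, and the `_ =` pattern on the changed line would discard that error together with the message.

```rust
use async_broadcast::broadcast;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = broadcast::<&str>(8);
    tx.set_await_active(false);

    // Keep the channel open, but leave it with no *active* receivers.
    let _inactive = rx.deactivate();

    // With await_active == false, broadcast() should not wait for a receiver
    // to become active; an Err here means the message is simply lost if the
    // caller ignores the result with `_ =`.
    match tx.broadcast("book update").await {
        Ok(_) => println!("accepted despite no active receivers"),
        Err(_) => println!("broadcast errored; message dropped"),
    }
}
```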
Problem

Currently, ws::Client and rtds::Client greedily consume messages from the WebSocket connection without backpressure. If any subscriber is too slow to process those messages compared to either the fastest subscriber or the WebSocket server, the slow subscriber yields WsError::Lagged. This is fine for many use cases, but it makes the initial subscription to thousands of assets unreliable.

On a good Internet connection (e.g. wired and gigabit), more than BROADCAST_CAPACITY messages can be received by the WebSocket client before the first few calls to subscriber_stream.next(), causing it to yield WsError::Lagged even if there is no blocking or slow code in the processing loop. I've worked around this by retrying, but that is not ideal.
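For context, here is a minimal standalone sketch (not this crate's code) of the underlying tokio::sync::broadcast behavior: if a burst larger than the channel capacity arrives before the first recv(), the oldest messages are discarded and the receiver observes RecvError::Lagged, which is presumably what surfaces here as WsError::Lagged.

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(8);

    // Simulate a burst of messages arriving right after subscribe_*():
    for i in 0..20 {
        let _ = tx.send(i); // send() never blocks; old messages are overwritten
    }

    // The very first recv() already observes the overflow.
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(n)) => {
            println!("lagged by {n} messages"); // the condition behind WsError::Lagged
        }
        other => println!("{other:?}"),
    }
}
```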
Solution

This PR replaces tokio::sync::broadcast::Sender with async-broadcast::Sender to pause WS incoming message processing once channel capacity is reached, applying backpressure on the WS connection.
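A minimal sketch of the async-broadcast behavior this relies on (standalone, not the PR's actual code): broadcast() is async and suspends once the channel is full, so the producer, standing in here for the WS read loop, pauses instead of overwriting unread messages.

```rust
use async_broadcast::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::<u32>(8);

    let producer = tokio::spawn(async move {
        for i in 0..20u32 {
            // Unlike tokio's send(), this await suspends while the channel is
            // full: this is the backpressure the PR relies on.
            tx.broadcast(i).await.unwrap();
        }
    });

    // The consumer drains the channel; no message is ever dropped.
    while let Ok(msg) = rx.recv().await {
        println!("got {msg}");
        if msg == 19 {
            break;
        }
    }
    producer.await.unwrap();
}
```

Because the producer only resumes once every receiver has consumed the oldest message, the slowest subscriber now sets the pace, which is exactly the backpressure described above.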
Drawbacks

Drawbacks of this implementation:

- Sending PONG in response to PING, close, and error messages is also blocked when subscribers lag; I'm not sure how we could work around this in the current WsStream. In any case, the old behavior is to drop messages, which is probably worse than the connection timing out.
- It becomes possible to create deadlocks by having subscribers wait on each other (see the sketch below).
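To make the second drawback concrete, here is a contrived, self-contained sketch (not code from this crate) of how backpressure plus subscribers that wait on each other can deadlock: the producer blocks on a full channel, subscriber B refuses to drain until subscriber A signals it, and A is waiting for a message the producer can no longer send. The timeout exists only so the demo terminates.

```rust
use async_broadcast::broadcast;
use std::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, mut rx_a) = broadcast::<u32>(1);
    let mut rx_b = rx_a.clone();
    let (signal_tx, signal_rx) = oneshot::channel::<()>();

    // Producer: fills the 1-slot channel, then blocks on the second broadcast
    // because rx_b never consumes message 1.
    let producer = tokio::spawn(async move {
        tx.broadcast(1).await.unwrap();
        tx.broadcast(2).await.unwrap(); // suspends: the slot is still occupied
    });

    // Subscriber A: drains normally and signals B after seeing message 2,
    // which can never arrive.
    let sub_a = tokio::spawn(async move {
        while let Ok(msg) = rx_a.recv().await {
            if msg == 2 {
                let _ = signal_tx.send(());
            }
        }
    });

    // Subscriber B: waits for A's signal before reading anything, so the
    // channel stays full and the producer stays blocked.
    let sub_b = tokio::spawn(async move {
        let _ = signal_rx.await;
        let _ = rx_b.recv().await;
    });

    let all = async {
        let _ = tokio::join!(producer, sub_a, sub_b);
    };
    if tokio::time::timeout(Duration::from_secs(1), all).await.is_err() {
        println!("deadlocked: producer and subscribers are all waiting on each other");
    }
}
```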
Alternatives Considered

I considered making the channel capacity configurable, but that seemed like a brittle solution. WebSockets over TCP already implement backpressure, so we should use it instead of side-stepping it. Latency-sensitive applications can rely on benchmarking, metrics, and monitoring instead of missing messages when subscribers are too slow.
Backwards Compatibility
The only change to the API is the removal of WsError::Lagged, since it cannot occur anymore.

The old behavior of dropping messages when subscribers lag can be replicated by library users by merging the streams, putting a bounded mpsc channel in between, and splitting them again if needed.
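As a rough sketch of that workaround (subscriber_stream, Message, and spawn_lossy_forwarder are placeholder names, not part of this crate's API), a user could forward the now-backpressured stream into a bounded tokio mpsc channel and drop messages with try_send() whenever the consumer falls behind, approximating the old lossy behavior:

```rust
use futures_util::StreamExt;
use tokio::sync::mpsc;

#[derive(Clone, Debug)]
struct Message; // placeholder for the crate's WS message type

fn spawn_lossy_forwarder(
    mut subscriber_stream: impl futures_util::Stream<Item = Message> + Unpin + Send + 'static,
    capacity: usize,
) -> mpsc::Receiver<Message> {
    let (tx, rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        while let Some(msg) = subscriber_stream.next().await {
            // try_send() drops the message when the buffer is full, mimicking
            // the pre-PR lagging semantics (minus the explicit error).
            let _ = tx.try_send(msg);
        }
    });
    rx
}
```

If a Stream interface is needed again on the consumer side, the returned receiver can be wrapped, e.g. with tokio_stream::wrappers::ReceiverStream.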