
fix(ws)!: backpressure instead of fail when subscribers lag #184

Open
ariofrio wants to merge 4 commits into Polymarket:main from ariofrio:ariofrio/ws-backpressure

Conversation

@ariofrio (Contributor) commented Jan 13, 2026

Problem

Currently, ws::Client and rtds::Client greedily consume messages from the WebSocket connection without backpressure. If any subscriber processes messages too slowly relative to either the fastest subscriber or the WebSocket server, that slow subscriber's stream yields WsError::Lagged.

This is fine for many use cases, but makes initial subscription to thousands of assets unreliable.

On a good Internet connection (e.g. wired and gigabit), more than BROADCAST_CAPACITY messages can be received by the WebSocket client before the first few calls to subscriber_stream.next(), causing it to yield WsError::Lagged even if there is no blocking or slow code in the processing loop. I've worked around this by retrying, but this is not ideal.

Solution

This PR replaces tokio::sync::broadcast::Sender with async-broadcast::Sender, which pauses processing of incoming WS messages once channel capacity is reached, applying backpressure on the WS connection.
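To illustrate the intended behavior in isolation (this is a std-only analogy, not the actual async-broadcast API or this crate's code; `run_pipeline` is a made-up helper), a bounded channel whose `send` blocks when full throttles the producer to the consumer's pace, so no message is ever dropped:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Producer pushes `n` messages through a bounded channel of `capacity`.
/// `send()` blocks once the buffer is full, so the producer can never run
/// more than `capacity` messages ahead of the consumer -- the same
/// backpressure effect async-broadcast applies to the WS read loop.
fn run_pipeline(capacity: usize, n: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).unwrap(); // blocks when the buffer is full
        }
        // tx dropped here, which ends the receiver's iterator
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    // With backpressure, every message is delivered regardless of
    // how small the buffer is relative to the message count.
    assert_eq!(run_pipeline(2, 1000), (0..1000).collect::<Vec<u32>>());
    println!("all messages delivered");
}
```

The trade-off is exactly the one this PR makes: delivery becomes lossless, but a stalled consumer stalls the producer too.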

Drawbacks

Drawbacks of this implementation:

  • This also prevents sending PONGs in response to PINGs, and handling close and error messages from the WebSocket connection, while waiting for subscribers to catch up. I'm not sure how we would work around this, since all of these arrive on the same WsStream. In any case, the old behavior is to drop messages, which is probably worse than the connection timing out.
  • Blocking message processing might make it theoretically possible to create deadlocks by having subscribers wait on each other. I haven't worked out exactly how, and it doesn't seem like a problem likely to come up often, but it's something to keep in mind.

Alternatives Considered

I considered making the channel capacity configurable, but that seemed like a brittle solution. WebSockets over TCP already implement backpressure, so we should use it instead of side-stepping it. Latency-sensitive applications can rely on benchmarking, metrics, and monitoring instead of missing messages when subscribers are too slow.

Backwards Compatibility

The only change to the API is to remove WsError::Lagged, since it cannot occur anymore.

The old behavior of dropping messages when subscribers lag can be replicated by library users by merging the streams, putting a bounded mpsc channel in between, and splitting them again if needed.
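As a rough sketch of that migration path (std-only, single consumer; `lossy_forward` is a hypothetical helper, not an API in this crate), a bounded buffer written to with `try_send` drops messages instead of blocking when the consumer lags, which is the essence of the old lossy behavior:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Forward messages through a bounded buffer, dropping (rather than
/// blocking) when the buffer is full -- mimicking the old
/// broadcast-with-Lagged semantics. Returns (delivered, dropped count).
fn lossy_forward(messages: &[u32], capacity: usize) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for &m in messages {
        match tx.try_send(m) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1, // consumer lagging: drop
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the receiver's iterator ends
    let delivered: Vec<u32> = rx.iter().collect();
    (delivered, dropped)
}

fn main() {
    // The consumer reads nothing until the producer is done, so only
    // `capacity` messages survive and the rest are dropped.
    let (delivered, dropped) = lossy_forward(&[1, 2, 3, 4, 5], 2);
    assert_eq!(delivered, vec![1, 2]);
    assert_eq!(dropped, 3);
    println!("delivered {delivered:?}, dropped {dropped}");
}
```

In the real multi-subscriber case, the merging/splitting mentioned above would sit around a stage like this, with an async bounded channel in place of the std one.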

when subscribed to thousands of assets, lag failure can occur even on the first next() called immediately after subscribe_*()

use async-broadcast to block WS incoming message processing once channel capacity is reached. drawbacks:

- sending PONG in response to PING, and handling close and error messages, is also blocked when subscribers lag. not sure how we could work around this. in any case, the old behavior is to drop messages, which is probably not better
- it becomes possible to create deadlocks by having subscribers wait on each other

the old behavior can be replicated by users by putting an mpsc::bounded in front of the Stream
codecov bot commented Jan 13, 2026

Codecov Report

❌ Patch coverage is 85.71429% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 85.86%. Comparing base (f63cc98) to head (5e882cf).

Files with missing lines Patch % Lines
src/rtds/subscription.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #184      +/-   ##
==========================================
+ Coverage   85.81%   85.86%   +0.04%     
==========================================
  Files          32       32              
  Lines        4887     4882       -5     
==========================================
- Hits         4194     4192       -2     
+ Misses        693      690       -3     
Flag Coverage Δ
rust 85.86% <85.71%> (+0.04%) ⬆️


  #[cfg(feature = "tracing")]
  tracing::trace!(?message, "Parsed WebSocket message");
- _ = broadcast_tx.send(message);
+ _ = broadcast_tx.broadcast(message).await;
A Collaborator commented:

I might be missing something but doesn't setting set_await_active(false) (line 125 RHS) mean that the broadcast will error here (be silently dropped) and then the message will also be dropped?

A Collaborator commented:

@ariofrio thoughts?
