Add async send/recv to TcpConn (#2056)
Open
rmahidhar wants to merge 3 commits into meta-pytorch:main
Conversation
Contributor
@rmahidhar has exported this pull request. If you are a Meta employee, you can view the originating Diff in D97788124.
rmahidhar added 2 commits on April 13, 2026
Summary:
Refactor TcpServer from a monolithic class (mixed sync/async via runtime checks)
into a policy-based template `TcpServer<AcceptPolicy>` with compile-time dispatch.
**Motivation:**
The original TcpServer combined sync and async modes in a single class:
- `evb_` was a nullable pointer with null checks scattered throughout
- `asyncAccept()` on a sync server silently returned nullptr (runtime error)
- `accept()` on an async server silently returned nullptr (runtime error)
- `shutdown()` had 3 if/else branches mixing sync and async code paths
**Design: Policy-based (chosen over inheritance and composition)**
- 1 template + N policy structs vs 3 classes for inheritance/composition
- Zero-cost: compiler sees concrete policy type, can inline through policy calls
- Compile-time safety: wrong mode is a compile error, not a runtime nullptr
- Extensible: adding IoUringAccept is one new struct, no existing code modified
**Accept policies:**
- `SyncAccept` — blocks calling thread, returns ready future. sizeof == 1.
- `AsyncAccept(EventBase&)` — non-blocking via fd-watching, returns pending future.
Reference (not pointer) — forgetting EventBase is a compile error.
**Interface change:**
`Server::accept()` returns `std::future<std::unique_ptr<Conn>>` (was `std::unique_ptr<Conn>`).
Both policies return futures; the policy determines how the future is fulfilled:
- SyncAccept: blocks, then returns ready future (`.get()` is instant — 2 atomic ops)
- AsyncAccept: returns immediately, `.get()` blocks until EventBase resolves
**Usage:**
```cpp
// Sync:
TcpServer<SyncAccept> syncServer("host:0");
auto conn = syncServer.accept().get();

// Async:
TcpServer<AsyncAccept> asyncServer("host:0", 5, evb);
auto future = asyncServer.accept();
auto asyncConn = future.get();
```
Detailed design: fbcode/comms/fb/uniflow/controller/PolicyBasedTcpDesign.md
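The policy dispatch described above can be sketched roughly as follows. This is a minimal stand-in, not the actual implementation: `Conn`, the policy body, and the constructor shape are illustrative assumptions; only the names `TcpServer`, `SyncAccept`, and the future-returning `accept()` come from the summary.

```cpp
#include <future>
#include <memory>
#include <string>
#include <utility>

struct Conn {};  // stand-in for the real connection type

// Policy: blocking accept that returns an already-resolved future.
// An empty struct, so sizeof == 1 as the summary notes.
struct SyncAccept {
  std::future<std::unique_ptr<Conn>> accept(int /*listenFd*/) {
    std::promise<std::unique_ptr<Conn>> p;
    p.set_value(std::make_unique<Conn>());  // real code would block in ::accept() here
    return p.get_future();
  }
};

// The server template delegates to whichever policy it was instantiated
// with; calling a method the policy lacks is a compile error, not a
// runtime nullptr.
template <typename AcceptPolicy>
class TcpServer {
 public:
  template <typename... Args>
  explicit TcpServer(std::string addr, Args&&... args)
      : addr_(std::move(addr)), policy_(std::forward<Args>(args)...) {}

  std::future<std::unique_ptr<Conn>> accept() { return policy_.accept(-1); }

 private:
  std::string addr_;
  [[no_unique_address]] AcceptPolicy policy_;  // zero size for empty policies
};
```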
Differential Revision: D96984108
Summary:
Replace monolithic TcpClient with policy-based `TcpClient<ConnectPolicy>` template,
mirroring the `TcpServer<AcceptPolicy>` pattern from D96984108.
## Design
Two connect policies, selected at compile time:
| Policy | Mechanism | Retry | EventBase |
|--------|-----------|-------|-----------|
| `SyncConnect` | Blocking `connect()` with linear backoff | Yes (`numRetries` × `retryTimeout`) | No |
| `AsyncConnect` | Non-blocking `connect()` + EPOLLOUT via EventBase | No (single attempt) | Required |
Both policies produce `std::future<std::unique_ptr<Conn>>` — the caller doesn't need
to know whether the connect was sync or async.
`AsyncConnect` uses `EPOLLONESHOT` — the fd is automatically deregistered after the
connect event fires, so no explicit `unregisterFd` is needed and there are no
deferred-close concerns.
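The one-shot behavior relied on here can be demonstrated with plain epoll; the helper names below are illustrative, not from the patch. After the first EPOLLOUT delivery the fd is disarmed automatically, which is why no explicit `unregisterFd` is needed.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

// Arm fd for a single writability notification; EPOLLONESHOT disarms the
// fd after the first event is delivered.
inline bool armOneShotWrite(int epfd, int fd) {
  epoll_event ev{};
  ev.events = EPOLLOUT | EPOLLONESHOT;
  ev.data.fd = fd;
  return ::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0;
}

// Poll once with a zero timeout; returns the number of ready events.
inline int pollOnce(int epfd) {
  epoll_event ev{};
  return ::epoll_wait(epfd, &ev, 1, 0);
}
```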
## Usage
```cpp
// Sync: blocking connect with retries
TcpClient<SyncConnect> syncClient(/*retries=*/10, /*timeout=*/1000ms);
auto conn = syncClient.connect("host:port").get();

// Async: non-blocking connect via EventBase
TcpClient<AsyncConnect> asyncClient(/*retries=*/10, /*timeout=*/1000ms, evb);
auto future = asyncClient.connect("host:port");
// ... future resolves when connect + handshake complete
auto asyncConn = future.get();
```
## Implementation
**Shared helpers** (anonymous namespace, used by both policies):
- `resolveConnectAddr()` — parses host:port, detects address family, builds sockaddr
- `finishConnect()` — configures socket options + magic handshake (produces `TcpConn<SyncIO>`)
- `completeAsyncConnect()` — restores blocking mode, sets handshake timeout, calls `finishConnect()`
- `FdGuard` — RAII guard that closes fd on destruction unless `release()`d
- `initiateConnect()` — resolves address, creates non-blocking socket, calls `connect()`; returns `{fd, connected}` pair
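The `FdGuard` contract can be sketched as below; this is a minimal version written from the one-line description above, and the real helper may differ in detail. The invariant is: close on destruction unless `release()` hands ownership back.

```cpp
#include <unistd.h>

// RAII guard: closes the owned fd on scope exit, so every early-return
// error path in the connect flow cleans up automatically.
class FdGuard {
 public:
  explicit FdGuard(int fd) : fd_(fd) {}
  ~FdGuard() {
    if (fd_ >= 0) ::close(fd_);
  }
  FdGuard(const FdGuard&) = delete;
  FdGuard& operator=(const FdGuard&) = delete;

  // Relinquish ownership: the destructor becomes a no-op.
  int release() {
    int fd = fd_;
    fd_ = -1;
    return fd;
  }
  int fd() const { return fd_; }

 private:
  int fd_;
};
```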
**AsyncConnect flow:**
1. `initiateConnect()` creates a `SOCK_NONBLOCK` socket and calls `connect()`
2. If `connect()` returns 0 (immediate), calls `completeAsyncConnect()` directly
3. If `EINPROGRESS`, wraps fd in `ConnectState` (shared_ptr for `registerFd` callback copyability), registers `EPOLLOUT | EPOLLONESHOT` on EventBase
4. On EPOLLOUT: checks `SO_ERROR`, releases `FdGuard`, calls `completeAsyncConnect()`
5. On failure: `FdGuard` destructor closes the fd, promise resolves to `nullptr`
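Step 4's `SO_ERROR` check might look like the helper below; `pendingConnectError` is an illustrative name, not a function from the patch. Once EPOLLOUT fires on a non-blocking connect, `SO_ERROR` reports whether the deferred connect actually succeeded.

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <unistd.h>

// Returns 0 if the deferred connect succeeded, otherwise the errno-style
// error code (e.g. ECONNREFUSED).
inline int pendingConnectError(int fd) {
  int err = 0;
  socklen_t len = sizeof(err);
  if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) {
    return errno;  // getsockopt itself failed
  }
  return err;
}
```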
**Interface change:** `Client::connect()` return type changed from `std::unique_ptr<Conn>` to `std::future<std::unique_ptr<Conn>>` to support async connect without blocking the caller.
Differential Revision: D97703353
462e34b to eab1211 (Compare)
rmahidhar pushed a commit to rmahidhar/torchcomms that referenced this pull request on Apr 13, 2026
Summary:
Refactor TcpConn into a policy-based template `TcpConn<IOPolicy>` and add
epoll-driven async send/recv for non-blocking data and control plane
messaging on the EventBase thread.
## Design
**Policy-based TcpConn:**
- `TcpConn<SyncIO>` — sync-only, no EventBase dependency, zero
overhead via `[[no_unique_address]]`
- `TcpConn<AsyncIO>` — sync + async, holds `EventBase&` and all async
state (`SendState`, `RecvState`) in the `AsyncIO` policy struct
- Async methods use C++20 `requires` clauses — calling async on a
`TcpConn<SyncIO>` is a compile error, not a runtime error
- Follows the same policy pattern as `TcpServer<AcceptPolicy>` and
`TcpClient<ConnectPolicy>`
**Asio-style dual overloads:**
Send:
- `asyncSend(std::span<const uint8_t>)` — zero-copy, caller keeps
buffer alive until the future resolves
- `asyncSend(std::vector<uint8_t>)` — TcpConn takes ownership via move
Recv:
- `asyncRecv(std::span<uint8_t>)` — zero-copy into caller's
pre-allocated buffer; error if payload exceeds buffer size
- `asyncRecv()` — allocating recv, returns the payload vector
## Usage
```cpp
// Create async-capable connection
auto conn = TcpConn<AsyncIO>::create(sock, evb);
// Data path: zero-copy send (caller keeps buffer alive)
std::vector<uint8_t> data = ...;
auto sendFut = conn->asyncSend(std::span<const uint8_t>(data));
auto result = sendFut.get();

// Control path: TcpConn takes ownership
auto ctrlFut = conn->asyncSend(std::vector<uint8_t>{'H', 'I'});

// Data path: zero-copy recv into pre-allocated buffer
std::vector<uint8_t> buf(4096);
auto recvFut = conn->asyncRecv(std::span<uint8_t>(buf));
size_t n = recvFut.get().value();

// Control path: allocating recv
auto allocFut = conn->asyncRecv();
std::vector<uint8_t> payload = allocFut.get().value();
```
## Implementation
- Length-prefixed framing (4-byte network-order header + payload)
- `asyncSendImpl` — shared dispatch for both send overloads; span is
re-pointed after the final move into `sendState` to prevent dangling
- `RecvState::setError`/`complete`/`target` — centralize span-vs-alloc
branching, eliminating scattered `isSpanMode` checks
- `trySend`/`tryRecv` — non-blocking I/O with tri-state return
(done / EAGAIN / error)
- Eager send/recv — `onSendReady`/`onRecvReady` called in the dispatch
lambda before registering with epoll, allowing small messages to
complete without an epoll round-trip
- `poisonConnection` — on protocol errors (oversized message, span too
small), shuts down the socket with `SHUT_RDWR` to prevent future I/O
on a desynchronized stream while keeping the fd valid for deferred
`unregisterFd`
- Destructor synchronizes with EventBase via `dispatchAndWait` + drain
to prevent use-after-free from IO callbacks capturing `this`
- One in-flight operation per direction; concurrent attempts return
`ResourceExhausted`
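The framing from the first bullet can be sketched with the hypothetical helpers below (`frame`/`parseHeader` are illustrative names, not functions from the patch): a 4-byte big-endian length header is prepended to each payload.

```cpp
#include <arpa/inet.h>  // htonl / ntohl
#include <cstdint>
#include <cstring>
#include <vector>

// Prepend a 4-byte network-order length header to the payload.
inline std::vector<uint8_t> frame(const std::vector<uint8_t>& payload) {
  std::vector<uint8_t> out(4 + payload.size());
  uint32_t len = htonl(static_cast<uint32_t>(payload.size()));
  std::memcpy(out.data(), &len, 4);
  if (!payload.empty()) {
    std::memcpy(out.data() + 4, payload.data(), payload.size());
  }
  return out;
}

// Read the payload length back out of a received 4-byte header.
inline uint32_t parseHeader(const uint8_t* header) {
  uint32_t len;
  std::memcpy(&len, header, 4);
  return ntohl(len);
}
```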
Differential Revision: D97788124
eab1211 to 9abdc00 (Compare)
rmahidhar pushed a commit to rmahidhar/torchcomms that referenced this pull request on Apr 13, 2026
rmahidhar pushed a commit to rmahidhar/torchcomms that referenced this pull request on Apr 13, 2026
9abdc00 to 805b3e1 (Compare)
Summary: Pull Request resolved: meta-pytorch#2056
Differential Revision: D97788124
805b3e1 to 87be75b (Compare)