
Add async send/recv to TcpConn #2056

Open

rmahidhar wants to merge 3 commits into meta-pytorch:main from rmahidhar:export-D97788124

Conversation

rmahidhar commented Apr 13, 2026

Summary:

Refactor TcpConn into a policy-based template `TcpConn<IOPolicy>` and add
epoll-driven async send/recv for non-blocking data and control plane
messaging on the EventBase thread.

## Design

**Policy-based TcpConn:**
- `TcpConn<SyncIO>` — sync-only, no EventBase dependency, zero
  overhead via `[[no_unique_address]]`
- `TcpConn<AsyncIO>` — sync + async, holds `EventBase&` and all async
  state (`SendState`, `RecvState`) in the `AsyncIO` policy struct
- Async methods use C++20 `requires` clauses — calling async on a
  `TcpConn<SyncIO>` is a compile error, not a runtime error
- Follows the same policy pattern as `TcpServer<AcceptPolicy>` and
  `TcpClient<ConnectPolicy>`

**Asio-style dual overloads:**

Send:
- `asyncSend(std::span<const uint8_t>)` — zero-copy, caller keeps
  buffer alive until the future resolves
- `asyncSend(std::vector<uint8_t>)` — TcpConn takes ownership via move

Recv:
- `asyncRecv(std::span<uint8_t>)` — zero-copy into caller's
  pre-allocated buffer; error if payload exceeds buffer size
- `asyncRecv()` — allocating recv, returns the payload vector

## Usage

```cpp
// Create async-capable connection
auto conn = TcpConn<AsyncIO>::create(sock, evb);

// Data path: zero-copy send (caller keeps buffer alive)
std::vector<uint8_t> data = ...;
auto sendF = conn->asyncSend(std::span<const uint8_t>(data));
auto result = sendF.get();

// Control path: TcpConn takes ownership
auto ctrlSendF = conn->asyncSend(std::vector<uint8_t>{'H', 'I'});

// Data path: zero-copy recv into pre-allocated buffer
std::vector<uint8_t> buf(4096);
auto recvF = conn->asyncRecv(std::span<uint8_t>(buf));
size_t n = recvF.get().value();

// Control path: allocating recv
auto allocRecvF = conn->asyncRecv();
std::vector<uint8_t> payload = allocRecvF.get().value();
```

## Implementation

- Length-prefixed framing (4-byte network-order header + payload)
- `asyncSendImpl` — shared dispatch for both send overloads; span is
  re-pointed after the final move into `sendState` to prevent dangling
- `RecvState::setError`/`complete`/`target` — centralize span-vs-alloc
  branching, eliminating scattered `isSpanMode` checks
- `trySend`/`tryRecv` — non-blocking I/O with tri-state return
  (done / EAGAIN / error)
- Eager send/recv — `onSendReady`/`onRecvReady` called in the dispatch
  lambda before registering with epoll, allowing small messages to
  complete without an epoll round-trip
- `poisonConnection` — on protocol errors (oversized message, span too
  small), shuts down the socket with `SHUT_RDWR` to prevent future I/O
  on a desynchronized stream while keeping the fd valid for deferred
  `unregisterFd`
- Destructor synchronizes with EventBase via `dispatchAndWait` + drain
  to prevent use-after-free from IO callbacks capturing `this`
- One in-flight operation per direction; concurrent attempts return
  `ResourceExhausted`
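The framing in the first bullet can be sketched as standalone encode/decode helpers. Names are hypothetical; the real code works incrementally against a non-blocking socket rather than on whole buffers:

```cpp
#include <arpa/inet.h>  // htonl / ntohl
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// Encode: 4-byte network-order length header followed by the payload.
std::vector<uint8_t> frame(const std::vector<uint8_t>& payload) {
  std::vector<uint8_t> out(4 + payload.size());
  const uint32_t len = htonl(static_cast<uint32_t>(payload.size()));
  std::memcpy(out.data(), &len, 4);
  if (!payload.empty()) {
    std::memcpy(out.data() + 4, payload.data(), payload.size());
  }
  return out;
}

// Decode: returns the payload once `buf` holds a complete frame,
// std::nullopt while the header or body is still partial (the EAGAIN
// case in the tri-state trySend/tryRecv scheme).
std::optional<std::vector<uint8_t>> unframe(const std::vector<uint8_t>& buf) {
  if (buf.size() < 4) return std::nullopt;  // header incomplete
  uint32_t len = 0;
  std::memcpy(&len, buf.data(), 4);
  len = ntohl(len);
  if (buf.size() < 4 + static_cast<size_t>(len)) return std::nullopt;
  return std::vector<uint8_t>(buf.begin() + 4, buf.begin() + 4 + len);
}
```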

Differential Revision: D97788124

@meta-cla bot added the **CLA Signed** label (managed by the Meta Open Source bot) Apr 13, 2026
meta-codesync bot commented Apr 13, 2026

@rmahidhar has exported this pull request. If you are a Meta employee, you can view the originating Diff in D97788124.

Mahidhar Ramesh Rajala added 2 commits April 13, 2026 12:00

Summary:
Refactor TcpServer from a monolithic class (mixed sync/async via runtime checks)
into a policy-based template `TcpServer<AcceptPolicy>` with compile-time dispatch.

**Motivation:**
The original TcpServer combined sync and async modes in a single class:
- `evb_` was a nullable pointer with null checks scattered throughout
- `asyncAccept()` on a sync server silently returned nullptr (runtime error)
- `accept()` on an async server silently returned nullptr (runtime error)
- `shutdown()` had 3 if/else branches mixing sync and async code paths

**Design: Policy-based (chosen over inheritance and composition)**
- 1 template + N policy structs vs 3 classes for inheritance/composition
- Zero-cost: compiler sees concrete policy type, can inline through policy calls
- Compile-time safety: wrong mode is a compile error, not a runtime nullptr
- Extensible: adding IoUringAccept is one new struct, no existing code modified

**Accept policies:**
- `SyncAccept` — blocks calling thread, returns ready future. sizeof == 1.
- `AsyncAccept(EventBase&)` — non-blocking via fd-watching, returns pending future.
  Reference (not pointer) — forgetting EventBase is a compile error.

**Interface change:**
`Server::accept()` returns `std::future<std::unique_ptr<Conn>>` (was `std::unique_ptr<Conn>`).
Both policies return futures; the policy determines how the future is fulfilled:
- SyncAccept: blocks, then returns ready future (`.get()` is instant — 2 atomic ops)
- AsyncAccept: returns immediately, `.get()` blocks until EventBase resolves
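The ready-vs-pending distinction can be illustrated with plain `std::future` (names here are illustrative, not the actual Server API):

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <thread>

// Sync-style: do the work inline, hand back an already-fulfilled
// future. .get() returns immediately.
std::future<std::string> syncStyleAccept() {
  std::promise<std::string> p;
  p.set_value("conn");
  return p.get_future();
}

// Async-style: return a pending future and fulfill it from another
// thread, the way an EventBase callback would.
std::future<std::string> asyncStyleAccept(std::thread& worker) {
  auto p = std::make_shared<std::promise<std::string>>();
  auto fut = p->get_future();
  worker = std::thread([p] { p->set_value("conn"); });
  return fut;  // returns immediately; .get() blocks until resolved
}
```

Either way the caller writes the same `accept().get()` code; only the policy decides when the promise is fulfilled.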

**Usage:**
```
// Sync:
TcpServer<SyncAccept> server("host:0");
auto conn = server.accept().get();

// Async:
TcpServer<AsyncAccept> server("host:0", 5, evb);
auto future = server.accept();
auto conn = future.get();
```

Detailed design: fbcode/comms/fb/uniflow/controller/PolicyBasedTcpDesign.md

Differential Revision: D96984108

Summary:
Replace monolithic TcpClient with policy-based `TcpClient<ConnectPolicy>` template,
mirroring the `TcpServer<AcceptPolicy>` pattern from D96984108.

## Design

Two connect policies, selected at compile time:

| Policy | Mechanism | Retry | EventBase |
|--------|-----------|-------|-----------|
| `SyncConnect` | Blocking `connect()` with linear backoff | Yes (`numRetries` × `retryTimeout`) | No |
| `AsyncConnect` | Non-blocking `connect()` + EPOLLOUT via EventBase | No (single attempt) | Required |

Both policies produce `std::future<std::unique_ptr<Conn>>` — the caller doesn't need
to know whether the connect was sync or async.

`AsyncConnect` uses `EPOLLONESHOT` — the fd is automatically deregistered after the
connect event fires, so no explicit `unregisterFd` is needed and there are no
deferred-close concerns.

## Usage

```cpp
// Sync: blocking connect with retries
TcpClient<SyncConnect> client(/*retries=*/10, /*timeout=*/1000ms);
auto conn = client.connect("host:port").get();

// Async: non-blocking connect via EventBase
TcpClient<AsyncConnect> client(/*retries=*/10, /*timeout=*/1000ms, evb);
auto future = client.connect("host:port");
// ... future resolves when connect + handshake complete
auto conn = future.get();
```

## Implementation

**Shared helpers** (anonymous namespace, used by both policies):
- `resolveConnectAddr()` — parses host:port, detects address family, builds sockaddr
- `finishConnect()` — configures socket options + magic handshake (produces `TcpConn<SyncIO>`)
- `completeAsyncConnect()` — restores blocking mode, sets handshake timeout, calls `finishConnect()`
- `FdGuard` — RAII guard that closes fd on destruction unless `release()`d
- `initiateConnect()` — resolves address, creates non-blocking socket, calls `connect()`; returns `{fd, connected}` pair
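A minimal version of the described `FdGuard` behavior (a sketch, not the actual code) looks like:

```cpp
#include <unistd.h>
#include <cassert>

// RAII fd guard: closes the fd on destruction unless release() was
// called, at which point ownership passes back to the caller.
class FdGuard {
 public:
  explicit FdGuard(int fd) : fd_(fd) {}
  ~FdGuard() {
    if (fd_ >= 0) {
      ::close(fd_);
    }
  }
  FdGuard(const FdGuard&) = delete;
  FdGuard& operator=(const FdGuard&) = delete;

  // Give up ownership; the destructor becomes a no-op.
  int release() {
    int fd = fd_;
    fd_ = -1;
    return fd;
  }

 private:
  int fd_;
};
```

This is what makes the failure path below leak-free: if `completeAsyncConnect()` is never reached, the guard's destructor closes the fd.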

**AsyncConnect flow:**
1. `initiateConnect()` creates a `SOCK_NONBLOCK` socket and calls `connect()`
2. If `connect()` returns 0 (immediate), calls `completeAsyncConnect()` directly
3. If `EINPROGRESS`, wraps fd in `ConnectState` (shared_ptr for `registerFd` callback copyability), registers `EPOLLOUT | EPOLLONESHOT` on EventBase
4. On EPOLLOUT: checks `SO_ERROR`, releases `FdGuard`, calls `completeAsyncConnect()`
5. On failure: `FdGuard` destructor closes the fd, promise resolves to `nullptr`
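Steps 1–4 of that flow can be sketched end-to-end against a loopback listener, using `poll` where the real code registers `EPOLLOUT | EPOLLONESHOT` on the EventBase (function name and structure are illustrative):

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstring>

bool nonBlockingConnectDemo() {
  // A listener on an ephemeral loopback port to connect to.
  int lfd = ::socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;
  if (::bind(lfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::listen(lfd, 1) != 0) {
    ::close(lfd);
    return false;
  }
  socklen_t len = sizeof(addr);
  ::getsockname(lfd, reinterpret_cast<sockaddr*>(&addr), &len);

  // Step 1: non-blocking socket + connect().
  int cfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  int rc = ::connect(cfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  if (rc != 0 && errno != EINPROGRESS) {  // step 3: EINPROGRESS is expected
    ::close(cfd);
    ::close(lfd);
    return false;
  }

  // Step 4: wait for writability, then check SO_ERROR to learn
  // whether the connect actually succeeded.
  if (rc != 0) {
    pollfd pfd{cfd, POLLOUT, 0};
    if (::poll(&pfd, 1, 1000) != 1) {
      ::close(cfd);
      ::close(lfd);
      return false;
    }
  }
  int err = 0;
  socklen_t elen = sizeof(err);
  ::getsockopt(cfd, SOL_SOCKET, SO_ERROR, &err, &elen);
  ::close(cfd);
  ::close(lfd);
  return err == 0;  // 0 means the connection completed
}
```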

**Interface change:** `Client::connect()` return type changed from `std::unique_ptr<Conn>` to `std::future<std::unique_ptr<Conn>>` to support async connect without blocking the caller.

Differential Revision: D97703353
@meta-codesync meta-codesync bot changed the title Add async send/recv to TcpConn Add async send/recv to TcpConn (#2056) Apr 13, 2026
rmahidhar pushed a commit to rmahidhar/torchcomms that referenced this pull request Apr 13, 2026
Summary:
Pull Request resolved: meta-pytorch#2056

Differential Revision: D97788124
Labels: CLA Signed, fb-exported, meta-exported