Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush) #176

Open
jjn1056 wants to merge 5 commits into bucardo:master from jjn1056:async-copy-from

jjn1056 commented Mar 18, 2026

Closes #177

Summary

Adds three new methods to enable non-blocking COPY FROM STDIN for async Perl libraries:

  • pg_putcopydata_async — non-blocking version of pg_putcopydata. Returns 1/0/-1 matching PQputCopyData semantics.
  • pg_putcopyend_async — non-blocking version of pg_putcopyend. Polls for server result without blocking.
  • pg_flush — exposes PQflush for manual output buffer management. Returns 0/1/-1 matching PQflush semantics.

Motivation

DBD::Pg has async support for queries (pg_async/PG_ASYNC) and async COPY TO (pg_getcopydata_async), but COPY FROM is unconditionally blocking. pg_putcopydata blocks inside PQputCopyData when the TCP send buffer fills because DBD::Pg never calls PQsetnonblocking.

This prevents async Perl libraries (Future::IO-based, EV::Pg-style, IO::Async) from performing non-blocking bulk data loading. COPY is the standard PostgreSQL mechanism for bulk import and is dramatically faster than multi-row INSERT.

Approach

The implementation follows the pattern already established by pg_getcopydata/pg_getcopydata_async:

  • pg_db_putcopydata gains an async flag parameter (0 for blocking, 1 for async)
  • The blocking path (async=0) is unchanged — existing pg_putcopydata behavior is identical
  • The async path calls PQsetnonblocking(conn, 1) on first use, scoped to COPY state (safe because no other operations are permitted during COPY)
  • PQsetnonblocking is restored to 0 when pg_putcopyend_async completes
  • Return values match libpq conventions exactly (verified against EV::Pg and libpq docs)

Return value conventions

pg_putcopydata_async: 1 = queued, 0 = buffer full (retry), -1 = error
pg_putcopyend_async: 1 = done, 0 = not ready (poll), -1 = error
pg_flush: 0 = flushed, 1 = pending (poll), -1 = error

After pg_putcopydata_async returns 1, the caller calls pg_flush to push data to the server. This matches the standard libpq pattern: PQputCopyData queues, PQflush sends.
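
As a rough illustration of how a caller might combine these return values, here is a minimal sketch. It assumes the three methods behave as described above and that COPY ... FROM STDIN has already been started with $dbh->do. The helper names flush_pending and async_copy_rows and the two wait callbacks are hypothetical, not part of this PR. For brevity it only waits for write-readiness while a flush is pending.

```perl
use strict;
use warnings;

# Sketch only. pg_putcopydata_async, pg_putcopyend_async and pg_flush are the
# methods proposed in this PR; $wait_writable and $wait_readable are
# hypothetical callbacks that return once the socket is ready.

# Call pg_flush until the output buffer is empty (0 = flushed, 1 = pending).
sub flush_pending {
    my ($dbh, $wait_writable) = @_;
    while (1) {
        my $rc = $dbh->pg_flush;
        die "pg_flush failed\n" if !defined $rc || $rc < 0;
        return if $rc == 0;
        $wait_writable->();    # still pending: wait, then flush again
    }
}

# Queue and flush every row, then finish the COPY.
# Returns the number of rows that were queued.
sub async_copy_rows {
    my ($dbh, $rows, $wait_writable, $wait_readable) = @_;
    my $queued = 0;

    for my $row (@$rows) {
        while (1) {
            my $rc = $dbh->pg_putcopydata_async($row);
            die "pg_putcopydata_async failed\n" if !defined $rc || $rc < 0;
            last if $rc == 1;                       # queued
            $wait_writable->();                     # 0: buffer full, wait
            flush_pending($dbh, $wait_writable);    # drain, then retry the row
        }
        $queued++;
        flush_pending($dbh, $wait_writable);        # push queued data out
    }

    # 1 = done, 0 = not ready (poll again), -1 = error
    while (1) {
        my $rc = $dbh->pg_putcopyend_async;
        die "pg_putcopyend_async failed\n" if !defined $rc || $rc < 0;
        last if $rc == 1;
        $wait_readable->();
    }
    return $queued;
}
```

In a real program the callbacks could wrap IO::Select on the descriptor in $dbh->{pg_socket}, or yield to the event loop until the socket is ready. Passing them in keeps the loop independent of any particular async framework.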

Implementation details

  • Pg.h: Added TRACE_PQFLUSH and TRACE_PQSETNONBLOCKING macros
  • dbdimp.h: Added copy_nonblocking field to imp_dbh_t, new function declarations
  • dbdimp.c: Modified pg_db_putcopydata to accept async flag, added pg_db_putcopyend_async (uses copystate=-1 sentinel for "end sent, awaiting result" phase), added pg_db_flush
  • Pg.xs: Added XS bindings following the getcopydata/getcopydata_async pattern
  • Pg.pm: Added install_method calls and POD documentation with examples

Tests

27 new test assertions in t/07copy.t covering:

  • Basic async put/flush/end cycle with data verification
  • pg_flush return values
  • Wrong-state errors (not in COPY, COPY OUT state, no argument)
  • do() rejection during async COPY IN
  • Recovery after interrupted COPY
  • Binary COPY round-trip via async methods
  • Multiple async COPY cycles on the same connection (PQsetnonblocking toggling)
  • Large data set (1000 rows) with buffer-full handling
  • Backward compatibility (blocking methods still work after async used)

Full test suite passes: 3,773 tests, 0 failures, 0 regressions.

Backward compatibility

Zero risk. The blocking pg_putcopydata is unchanged — the XS wrapper passes async=0. No existing behavior is affected. New methods are purely additive.

jjn1056 and others added 5 commits March 17, 2026 14:24
Add pg_putcopydata_async, pg_putcopyend_async, and pg_flush methods to
enable non-blocking COPY FROM STDIN for async Perl libraries.

pg_putcopydata_async enables PQsetnonblocking on the connection (safe
during COPY state since no other operations are permitted), then uses
PQflush to manage the output buffer. Returns 0 when buffer is full or
2 when flush is pending, allowing the caller to poll the socket and
retry without blocking the event loop.

pg_putcopyend_async sends PQputCopyEnd and polls for the server result
using PQconsumeInput/PQisBusy. Uses copystate=-1 as a sentinel to
track the "end sent, awaiting result" phase across retries. Restores
blocking mode automatically on completion.

pg_flush exposes PQflush for callers that need to complete a pending
flush between putcopydata_async or putcopyend_async calls.

The blocking pg_putcopydata is unchanged (passes async=0 internally).
All 3,763 existing tests continue to pass with zero regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Drop the invented return value 2 from pg_putcopydata_async and
pg_putcopyend_async. The interface now matches libpq and EV::Pg:

pg_putcopydata_async: 1 (queued), 0 (buffer full), -1 (error)
pg_putcopyend_async:  1 (done), 0 (not ready), -1 (error)
pg_flush:             0 (flushed), 1 (pending), -1 (error)

After pg_putcopydata_async returns 1, caller calls pg_flush to push
data to the server. This is the standard libpq pattern: PQputCopyData
queues, PQflush sends. No DBD::Pg-specific protocol to learn.

pg_putcopydata_async no longer calls PQflush internally for async
mode; that responsibility moves to pg_flush. The COPY_BOTH path
(logical replication) still auto-flushes as before.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Show simple and robust usage patterns with real data (matching the
existing pg_putcopydata pizza examples), IO::Select for socket
polling, explicit column lists, and the full pg_putcopyend_async
poll loop. Add a note about COPY text format and alternatives.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

- pg_putcopydata_async fails in COPY OUT state
- pg_putcopydata_async fails with no argument
- do() fails during async COPY IN (mirrors blocking test)
- Recovery after rude non-COPY attempt during async COPY
- Binary COPY round-trip via async methods
- Multiple async COPY cycles on the same connection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Make explicit that the connection itself is restricted to COPY
operations, but the non-blocking methods let the event loop service
other connections and tasks between calls.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

esabol (Collaborator) commented Mar 18, 2026

Spelling and perlcritic test failures. Set AUTHOR_TESTING=1 and RELEASE_TESTING=1 to reproduce and revise.

Does this PR completely supersede the functionality (and tests) in PR #163? If so, I'll close that issue.


# Binary COPY with async methods

$dbh->do('CREATE TEMP TABLE binarycopy_async AS SELECT 1::INTEGER AS x');
Collaborator

I know it's just a temp table, but I suggest prefixing the table name with dbd_pg_ anyway.

##

my $async_table = 'dbd_pg_test_async_copy';
$dbh->do(qq{CREATE TABLE $async_table(id integer, name text)});
Collaborator

Test table creation (and teardown) should probably be added to t/dbdpg_test_setup.pl or use one of the existing test tables, if feasible.
