Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176
Open
jjn1056 wants to merge 5 commits intobucardo:masterfrom
Open
Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176jjn1056 wants to merge 5 commits intobucardo:masterfrom
jjn1056 wants to merge 5 commits intobucardo:masterfrom
Conversation
Add pg_putcopydata_async, pg_putcopyend_async, and pg_flush methods to enable non-blocking COPY FROM STDIN for async Perl libraries. pg_putcopydata_async enables PQsetnonblocking on the connection (safe during COPY state since no other operations are permitted), then uses PQflush to manage the output buffer. Returns 0 when buffer is full or 2 when flush is pending, allowing the caller to poll the socket and retry without blocking the event loop. pg_putcopyend_async sends PQputCopyEnd and polls for the server result using PQconsumeInput/PQisBusy. Uses copystate=-1 as a sentinel to track the "end sent, awaiting result" phase across retries. Restores blocking mode automatically on completion. pg_flush exposes PQflush for callers that need to complete a pending flush between putcopydata_async or putcopyend_async calls. The blocking pg_putcopydata is unchanged (passes async=0 internally). All 3,763 existing tests continue to pass with zero regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Drop the invented return value 2 from pg_putcopydata_async and pg_putcopyend_async. The interface now matches libpq and EV::Pg: pg_putcopydata_async: 1 (queued), 0 (buffer full), -1 (error) pg_putcopyend_async: 1 (done), 0 (not ready), -1 (error) pg_flush: 0 (flushed), 1 (pending), -1 (error) After pg_putcopydata_async returns 1, caller calls pg_flush to push data to the server. This is the standard libpq pattern: PQputCopyData queues, PQflush sends. No DBD::Pg-specific protocol to learn. pg_putcopydata_async no longer calls PQflush internally for async mode; that responsibility moves to pg_flush. The COPY_BOTH path (logical replication) still auto-flushes as before. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Show simple and robust usage patterns with real data (matching the existing pg_putcopydata pizza examples), IO::Select for socket polling, explicit column lists, and the full pg_putcopyend_async poll loop. Add a note about COPY text format and alternatives. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- pg_putcopydata_async fails in COPY OUT state - pg_putcopydata_async fails with no argument - do() fails during async COPY IN (mirrors blocking test) - Recovery after rude non-COPY attempt during async COPY - Binary COPY round-trip via async methods - Multiple async COPY cycles on the same connection Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Make explicit that the connection itself is restricted to COPY operations, but the non-blocking methods let the event loop service other connections and tasks between calls. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Collaborator
|
Spelling and perlcritic test failures. Set AUTHOR_TESTING=1 and RELEASE_TESTING=1 to reproduce and revise. Does this PR completely supersede the functionality (and tests) in PR #163 ? If so, I'll close that issue. |
esabol
reviewed
Mar 18, 2026
|
|
||
| # Binary COPY with async methods | ||
|
|
||
| $dbh->do('CREATE TEMP TABLE binarycopy_async AS SELECT 1::INTEGER AS x'); |
Collaborator
There was a problem hiding this comment.
I know it's just a temp table, but I suggest prefixing the table name with dbd_pg_ anyway.
| ## | ||
|
|
||
| my $async_table = 'dbd_pg_test_async_copy'; | ||
| $dbh->do(qq{CREATE TABLE $async_table(id integer, name text)}); |
Collaborator
There was a problem hiding this comment.
Test table creation (and teardown) should probably be added to t/dbdpg_test_setup.pl or use one of the existing test tables, if feasible.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #177
Summary
Adds three new methods to enable non-blocking COPY FROM STDIN for async Perl libraries:
Motivation
DBD::Pg has async support for queries (pg_async/PG_ASYNC) and async COPY TO (pg_getcopydata_async), but COPY FROM is unconditionally blocking. pg_putcopydata blocks inside PQputCopyData when the TCP send buffer fills because DBD::Pg never calls PQsetnonblocking.
This prevents async Perl libraries (Future::IO-based, EV::Pg-style, IO::Async) from performing non-blocking bulk data loading. COPY is the standard PostgreSQL mechanism for bulk import and is dramatically faster than multi-row INSERT.
Approach
The implementation follows the pattern already established by pg_getcopydata/pg_getcopydata_async:
asyncflag parameter (0 for blocking, 1 for async)Return value conventions
pg_putcopydata_async: 1 = queued, 0 = buffer full (retry), -1 = error
pg_putcopyend_async: 1 = done, 0 = not ready (poll), -1 = error
pg_flush: 0 = flushed, 1 = pending (poll), -1 = error
After pg_putcopydata_async returns 1, the caller calls pg_flush to push data to the server. This matches the standard libpq pattern: PQputCopyData queues, PQflush sends.
Implementation details
copy_nonblockingfield to imp_dbh_t, new function declarationsTests
27 new test assertions in t/07copy.t covering:
Full test suite passes: 3,773 tests, 0 failures, 0 regressions.
Backward compatibility
Zero risk. The blocking pg_putcopydata is unchanged — the XS wrapper passes async=0. No existing behavior is affected. New methods are purely additive.