Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions postgres-types/src/type_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Other {

#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum Inner {
Unspecified,
Bool,
Bytea,
Char,
Expand Down Expand Up @@ -204,6 +205,7 @@ pub enum Inner {
impl Inner {
pub fn from_oid(oid: Oid) -> Option<Inner> {
match oid {
0 => Some(Inner::Unspecified),
16 => Some(Inner::Bool),
17 => Some(Inner::Bytea),
18 => Some(Inner::Char),
Expand Down Expand Up @@ -395,6 +397,7 @@ impl Inner {

pub fn oid(&self) -> Oid {
match *self {
Inner::Unspecified => 0,
Inner::Bool => 16,
Inner::Bytea => 17,
Inner::Char => 18,
Expand Down Expand Up @@ -586,6 +589,7 @@ impl Inner {

pub fn kind(&self) -> &Kind {
match *self {
Inner::Unspecified => &Kind::Simple,
Inner::Bool => &Kind::Simple,
Inner::Bytea => &Kind::Simple,
Inner::Char => &Kind::Simple,
Expand Down Expand Up @@ -777,6 +781,7 @@ impl Inner {

pub fn name(&self) -> &str {
match *self {
Inner::Unspecified => "unspecified",
Inner::Bool => "bool",
Inner::Bytea => "bytea",
Inner::Char => "char",
Expand Down Expand Up @@ -967,6 +972,9 @@ impl Inner {
}
}
impl Type {
/// Unspecified
pub const UNSPECIFIED: Type = Type(Inner::Unspecified);

/// BOOL - boolean, &#39;true&#39;/&#39;false&#39;
pub const BOOL: Type = Type(Inner::Bool);

Expand Down
8 changes: 4 additions & 4 deletions postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::Poll;
use std::time::Duration;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::types::{BorrowToSql, ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
use tokio_postgres::{Error, FormatCode, Row, SimpleQueryMessage, Socket};

/// A synchronous PostgreSQL client.
pub struct Client {
Expand Down Expand Up @@ -251,9 +251,9 @@ impl Client {
I: IntoIterator<Item = P>,
I::IntoIter: ExactSizeIterator,
{
let stream = self
.connection
.block_on(self.client.query_raw(query, params))?;
let stream =
self.connection
.block_on(self.client.query_raw(query, params, FormatCode::Binary))?;
Ok(RowIter::new(self.connection.as_ref(), stream))
}

Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::BorrowToSql;
use crate::{query, Error, Portal, Statement};
use crate::{query, Error, FormatCode, Portal, Statement};
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -22,7 +22,7 @@ where
{
let name = format!("p{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
let buf = client.with_buf(|buf| {
query::encode_bind(&statement, params, &name, buf)?;
query::encode_bind(&statement, params, &name, FormatCode::Binary, buf)?;
frontend::sync(buf);
Ok(buf.split().freeze())
})?;
Expand Down
52 changes: 47 additions & 5 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};

pub enum FormatCode {
/// Use the stable text representation.
Text = 0,
/// Use the less-stable binary representation.
Binary = 1,
}

pub struct Responses {
receiver: mpsc::Receiver<BackendMessages>,
cur: BackendMessages,
Expand Down Expand Up @@ -87,6 +94,8 @@ pub struct InnerClient {

/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,

result_format: FormatCode,
}

impl InnerClient {
Expand Down Expand Up @@ -196,6 +205,7 @@ impl Client {
sender,
cached_typeinfo: Default::default(),
buffer: Default::default(),
result_format: FormatCode::Binary,
}),
#[cfg(feature = "runtime")]
socket_config: None,
Expand Down Expand Up @@ -250,7 +260,30 @@ impl Client {
where
T: ?Sized + ToStatement,
{
self.query_raw(statement, slice_iter(params))
self.query_raw(statement, slice_iter(params), FormatCode::Binary)
.await?
.try_collect()
.await
}

/// Executes a statement, returning a vector of the resulting rows in the specified Format.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
pub async fn query_with_format<T>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
result_format: FormatCode,
) -> Result<Vec<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.query_raw(statement, slice_iter(params), result_format)
.await?
.try_collect()
.await
Expand All @@ -274,7 +307,9 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.query_raw(statement, slice_iter(params)).await?;
let stream = self
.query_raw(statement, slice_iter(params), FormatCode::Binary)
.await?;
pin_mut!(stream);

let row = match stream.try_next().await? {
Expand Down Expand Up @@ -307,7 +342,9 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.query_raw(statement, slice_iter(params)).await?;
let stream = self
.query_raw(statement, slice_iter(params), FormatCode::Binary)
.await?;
pin_mut!(stream);

let row = match stream.try_next().await? {
Expand Down Expand Up @@ -357,15 +394,20 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
pub async fn query_raw<T, P, I>(
&self,
statement: &T,
params: I,
result_format: FormatCode,
) -> Result<RowStream, Error>
where
T: ?Sized + ToStatement,
P: BorrowToSql,
I: IntoIterator<Item = P>,
I::IntoIter: ExactSizeIterator,
{
let statement = statement.__convert().into_statement(self).await?;
query::query(&self.inner, statement, params).await
query::query(&self.inner, statement, params, result_format).await
}

/// Executes a statement, returning the number of rows modified.
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::query::extract_row_affected;
use crate::{query, slice_iter, Error, Statement};
use crate::{query, slice_iter, Error, FormatCode, Statement};
use bytes::{Buf, BufMut, BytesMut};
use futures_channel::mpsc;
use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -194,7 +194,7 @@ where
{
debug!("executing copy in statement {}", statement.name());

let buf = query::encode(client, &statement, slice_iter(&[]))?;
let buf = query::encode(client, &statement, slice_iter(&[]), FormatCode::Binary)?;

let (mut sender, receiver) = mpsc::channel(1);
let receiver = CopyInReceiver::new(receiver);
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/copy_out.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use crate::{query, slice_iter, Error, FormatCode, Statement};
use bytes::Bytes;
use futures_util::{ready, Stream};
use log::debug;
Expand All @@ -14,7 +14,7 @@ use std::task::{Context, Poll};
pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
debug!("executing copy out statement {}", statement.name());

let buf = query::encode(client, &statement, slice_iter(&[]))?;
let buf = query::encode(client, &statement, slice_iter(&[]), FormatCode::Binary)?;
let responses = start(client, buf).await?;
Ok(CopyOutStream {
responses,
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/generic_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::query::RowStream;
use crate::types::{BorrowToSql, ToSql, Type};
use crate::{Client, Error, Row, Statement, ToStatement, Transaction};
use crate::{Client, Error, FormatCode, Row, Statement, ToStatement, Transaction};
use async_trait::async_trait;

mod private {
Expand Down Expand Up @@ -133,7 +133,7 @@ impl GenericClient for Client {
I: IntoIterator<Item = P> + Sync + Send,
I::IntoIter: ExactSizeIterator,
{
self.query_raw(statement, params).await
self.query_raw(statement, params, FormatCode::Binary).await
}

async fn prepare(&self, query: &str) -> Result<Statement, Error> {
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
#![warn(rust_2018_idioms, clippy::all, missing_docs)]

pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
pub use crate::client::{Client, FormatCode};
pub use crate::config::Config;
pub use crate::connection::Connection;
pub use crate::copy_in::CopyInSink;
Expand Down
8 changes: 4 additions & 4 deletions tokio-postgres/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::error::SqlState;
use crate::types::{Field, Kind, Oid, Type};
use crate::{query, slice_iter};
use crate::{query, slice_iter, FormatCode};
use crate::{Column, Error, Statement};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {

let stmt = typeinfo_statement(client).await?;

let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
let rows = query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary).await?;
pin_mut!(rows);

let row = match rows.try_next().await? {
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Erro
async fn get_enum_variants(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<String>, Error> {
let stmt = typeinfo_enum_statement(client).await?;

query::query(client, stmt, slice_iter(&[&oid]))
query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary)
.await?
.and_then(|row| async move { row.try_get(0) })
.try_collect()
Expand All @@ -234,7 +234,7 @@ async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement,
async fn get_composite_fields(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<Field>, Error> {
let stmt = typeinfo_composite_statement(client).await?;

let rows = query::query(client, stmt, slice_iter(&[&oid]))
let rows = query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down
24 changes: 16 additions & 8 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{BorrowToSql, IsNull};
use crate::{Error, Portal, Row, Statement};
use crate::{Error, FormatCode, Portal, Row, Statement};
use bytes::{Bytes, BytesMut};
use futures_util::{ready, Stream};
use log::{debug, log_enabled, Level};
Expand Down Expand Up @@ -31,6 +31,7 @@ pub async fn query<P, I>(
client: &InnerClient,
statement: Statement,
params: I,
result_format: FormatCode,
) -> Result<RowStream, Error>
where
P: BorrowToSql,
Expand All @@ -44,9 +45,9 @@ where
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
encode(client, &statement, params, result_format)?
} else {
encode(client, &statement, params)?
encode(client, &statement, params, result_format)?
};
let responses = start(client, buf).await?;
Ok(RowStream {
Expand Down Expand Up @@ -108,9 +109,9 @@ where
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
encode(client, &statement, params, FormatCode::Binary)?
} else {
encode(client, &statement, params)?
encode(client, &statement, params, FormatCode::Binary)?
};
let mut responses = start(client, buf).await?;

Expand Down Expand Up @@ -139,14 +140,19 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
Ok(responses)
}

pub fn encode<P, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
pub fn encode<P, I>(
client: &InnerClient,
statement: &Statement,
params: I,
result_format: FormatCode,
) -> Result<Bytes, Error>
where
P: BorrowToSql,
I: IntoIterator<Item = P>,
I::IntoIter: ExactSizeIterator,
{
client.with_buf(|buf| {
encode_bind(statement, params, "", buf)?;
encode_bind(statement, params, "", result_format, buf)?;
frontend::execute("", 0, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.split().freeze())
Expand All @@ -157,6 +163,7 @@ pub fn encode_bind<P, I>(
statement: &Statement,
params: I,
portal: &str,
format_code: FormatCode,
buf: &mut BytesMut,
) -> Result<(), Error>
where
Expand All @@ -177,6 +184,7 @@ where
.unzip();

let params = params.into_iter();
let format_code = format_code as i16;

let mut error_idx = 0;
let r = frontend::bind(
Expand All @@ -192,7 +200,7 @@ where
Err(e)
}
},
Some(1),
Some(format_code),
buf,
);
match r {
Expand Down
Loading