diff --git a/postgres-types/src/type_gen.rs b/postgres-types/src/type_gen.rs index a1bc3f85c..104c56d18 100644 --- a/postgres-types/src/type_gen.rs +++ b/postgres-types/src/type_gen.rs @@ -13,6 +13,7 @@ pub struct Other { #[derive(PartialEq, Eq, Clone, Debug, Hash)] pub enum Inner { + Unspecified, Bool, Bytea, Char, @@ -204,6 +205,7 @@ pub enum Inner { impl Inner { pub fn from_oid(oid: Oid) -> Option { match oid { + 0 => Some(Inner::Unspecified), 16 => Some(Inner::Bool), 17 => Some(Inner::Bytea), 18 => Some(Inner::Char), @@ -395,6 +397,7 @@ impl Inner { pub fn oid(&self) -> Oid { match *self { + Inner::Unspecified => 0, Inner::Bool => 16, Inner::Bytea => 17, Inner::Char => 18, @@ -586,6 +589,7 @@ impl Inner { pub fn kind(&self) -> &Kind { match *self { + Inner::Unspecified => &Kind::Simple, Inner::Bool => &Kind::Simple, Inner::Bytea => &Kind::Simple, Inner::Char => &Kind::Simple, @@ -777,6 +781,7 @@ impl Inner { pub fn name(&self) -> &str { match *self { + Inner::Unspecified => "unspecified", Inner::Bool => "bool", Inner::Bytea => "bytea", Inner::Char => "char", @@ -967,6 +972,9 @@ impl Inner { } } impl Type { + /// Unspecified + pub const UNSPECIFIED: Type = Type(Inner::Unspecified); + /// BOOL - boolean, 'true'/'false' pub const BOOL: Type = Type(Inner::Bool); diff --git a/postgres/src/client.rs b/postgres/src/client.rs index c8e14cf81..29ba87161 100644 --- a/postgres/src/client.rs +++ b/postgres/src/client.rs @@ -7,7 +7,7 @@ use std::task::Poll; use std::time::Duration; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use tokio_postgres::types::{BorrowToSql, ToSql, Type}; -use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket}; +use tokio_postgres::{Error, FormatCode, Row, SimpleQueryMessage, Socket}; /// A synchronous PostgreSQL client. pub struct Client { @@ -251,9 +251,9 @@ impl Client { I: IntoIterator, I::IntoIter: ExactSizeIterator, { - let stream = self - .connection - .block_on(self.client.query_raw(query, params))?; + let stream = + self.connection + .block_on(self.client.query_raw(query, params, FormatCode::Binary))?; Ok(RowIter::new(self.connection.as_ref(), stream)) } diff --git a/tokio-postgres/src/bind.rs b/tokio-postgres/src/bind.rs index 9c5c49218..a7aae016e 100644 --- a/tokio-postgres/src/bind.rs +++ b/tokio-postgres/src/bind.rs @@ -2,7 +2,7 @@ use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::BorrowToSql; -use crate::{query, Error, Portal, Statement}; +use crate::{query, Error, FormatCode, Portal, Statement}; use postgres_protocol::message::backend::Message; use postgres_protocol::message::frontend; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -22,7 +22,7 @@ where { let name = format!("p{}", NEXT_ID.fetch_add(1, Ordering::SeqCst)); let buf = client.with_buf(|buf| { - query::encode_bind(&statement, params, &name, buf)?; + query::encode_bind(&statement, params, &name, FormatCode::Binary, buf)?; frontend::sync(buf); Ok(buf.split().freeze()) })?; diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index 427a05049..65713b6ac 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -35,6 +35,13 @@ use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; +pub enum FormatCode { + /// Use the stable text representation. + Text = 0, + /// Use the less-stable binary representation. + Binary = 1, +} + pub struct Responses { receiver: mpsc::Receiver, cur: BackendMessages, @@ -87,6 +94,8 @@ pub struct InnerClient { /// A buffer to use when writing out postgres commands. buffer: Mutex, + + result_format: FormatCode, } impl InnerClient { @@ -196,6 +205,7 @@ impl Client { sender, cached_typeinfo: Default::default(), buffer: Default::default(), + result_format: FormatCode::Binary, }), #[cfg(feature = "runtime")] socket_config: None, @@ -250,7 +260,30 @@ impl Client { where T: ?Sized + ToStatement, { - self.query_raw(statement, slice_iter(params)) + self.query_raw(statement, slice_iter(params), FormatCode::Binary) + .await? + .try_collect() + .await + } + + /// Executes a statement, returning a vector of the resulting rows in the specified Format. + /// + /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list + /// provided, 1-indexed. + /// + /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be + /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front + /// with the `prepare` method. + pub async fn query_with_format( + &self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + result_format: FormatCode, + ) -> Result, Error> + where + T: ?Sized + ToStatement, + { + self.query_raw(statement, slice_iter(params), result_format) .await? .try_collect() .await @@ -274,7 +307,9 @@ impl Client { where T: ?Sized + ToStatement, { - let stream = self.query_raw(statement, slice_iter(params)).await?; + let stream = self + .query_raw(statement, slice_iter(params), FormatCode::Binary) + .await?; pin_mut!(stream); let row = match stream.try_next().await? { @@ -307,7 +342,9 @@ impl Client { where T: ?Sized + ToStatement, { - let stream = self.query_raw(statement, slice_iter(params)).await?; + let stream = self + .query_raw(statement, slice_iter(params), FormatCode::Binary) + .await?; pin_mut!(stream); let row = match stream.try_next().await? { @@ -357,7 +394,12 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub async fn query_raw(&self, statement: &T, params: I) -> Result + pub async fn query_raw( + &self, + statement: &T, + params: I, + result_format: FormatCode, + ) -> Result where T: ?Sized + ToStatement, P: BorrowToSql, @@ -365,7 +407,7 @@ impl Client { I::IntoIter: ExactSizeIterator, { let statement = statement.__convert().into_statement(self).await?; - query::query(&self.inner, statement, params).await + query::query(&self.inner, statement, params, result_format).await } /// Executes a statement, returning the number of rows modified. diff --git a/tokio-postgres/src/copy_in.rs b/tokio-postgres/src/copy_in.rs index 59e31fea6..dcccb1f97 100644 --- a/tokio-postgres/src/copy_in.rs +++ b/tokio-postgres/src/copy_in.rs @@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::query::extract_row_affected; -use crate::{query, slice_iter, Error, Statement}; +use crate::{query, slice_iter, Error, FormatCode, Statement}; use bytes::{Buf, BufMut, BytesMut}; use futures_channel::mpsc; use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt}; @@ -194,7 +194,7 @@ where { debug!("executing copy in statement {}", statement.name()); - let buf = query::encode(client, &statement, slice_iter(&[]))?; + let buf = query::encode(client, &statement, slice_iter(&[]), FormatCode::Binary)?; let (mut sender, receiver) = mpsc::channel(1); let receiver = CopyInReceiver::new(receiver); diff --git a/tokio-postgres/src/copy_out.rs b/tokio-postgres/src/copy_out.rs index 1e6949252..43aad2a1d 100644 --- a/tokio-postgres/src/copy_out.rs +++ b/tokio-postgres/src/copy_out.rs @@ -1,7 +1,7 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; -use crate::{query, slice_iter, Error, Statement}; +use crate::{query, slice_iter, Error, FormatCode, Statement}; use bytes::Bytes; use futures_util::{ready, Stream}; use log::debug; @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result { debug!("executing copy out statement {}", statement.name()); - let buf = query::encode(client, &statement, slice_iter(&[]))?; + let buf = query::encode(client, &statement, slice_iter(&[]), FormatCode::Binary)?; let responses = start(client, buf).await?; Ok(CopyOutStream { responses, diff --git a/tokio-postgres/src/generic_client.rs b/tokio-postgres/src/generic_client.rs index 50cff9712..ffe96176e 100644 --- a/tokio-postgres/src/generic_client.rs +++ b/tokio-postgres/src/generic_client.rs @@ -1,6 +1,6 @@ use crate::query::RowStream; use crate::types::{BorrowToSql, ToSql, Type}; -use crate::{Client, Error, Row, Statement, ToStatement, Transaction}; +use crate::{Client, Error, FormatCode, Row, Statement, ToStatement, Transaction}; use async_trait::async_trait; mod private { @@ -133,7 +133,7 @@ impl GenericClient for Client { I: IntoIterator + Sync + Send, I::IntoIter: ExactSizeIterator, { - self.query_raw(statement, params).await + self.query_raw(statement, params, FormatCode::Binary).await } async fn prepare(&self, query: &str) -> Result { diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index 2973d33b0..722a50fcd 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -119,7 +119,7 @@ #![warn(rust_2018_idioms, clippy::all, missing_docs)] pub use crate::cancel_token::CancelToken; -pub use crate::client::Client; +pub use crate::client::{Client, FormatCode}; pub use crate::config::Config; pub use crate::connection::Connection; pub use crate::copy_in::CopyInSink; diff --git a/tokio-postgres/src/prepare.rs b/tokio-postgres/src/prepare.rs index e3f09a7c2..8516d41cc 100644 --- a/tokio-postgres/src/prepare.rs +++ b/tokio-postgres/src/prepare.rs @@ -3,7 +3,7 @@ use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::error::SqlState; use crate::types::{Field, Kind, Oid, Type}; -use crate::{query, slice_iter}; +use crate::{query, slice_iter, FormatCode}; use crate::{Column, Error, Statement}; use bytes::Bytes; use fallible_iterator::FallibleIterator; @@ -137,7 +137,7 @@ async fn get_type(client: &Arc, oid: Oid) -> Result { let stmt = typeinfo_statement(client).await?; - let rows = query::query(client, stmt, slice_iter(&[&oid])).await?; + let rows = query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary).await?; pin_mut!(rows); let row = match rows.try_next().await? { @@ -207,7 +207,7 @@ async fn typeinfo_statement(client: &Arc) -> Result, oid: Oid) -> Result, Error> { let stmt = typeinfo_enum_statement(client).await?; - query::query(client, stmt, slice_iter(&[&oid])) + query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary) .await? .and_then(|row| async move { row.try_get(0) }) .try_collect() @@ -234,7 +234,7 @@ async fn typeinfo_enum_statement(client: &Arc) -> Result, oid: Oid) -> Result, Error> { let stmt = typeinfo_composite_statement(client).await?; - let rows = query::query(client, stmt, slice_iter(&[&oid])) + let rows = query::query(client, stmt, slice_iter(&[&oid]), FormatCode::Binary) .await? .try_collect::>() .await?; diff --git a/tokio-postgres/src/query.rs b/tokio-postgres/src/query.rs index e6e1d00a8..e5758ac94 100644 --- a/tokio-postgres/src/query.rs +++ b/tokio-postgres/src/query.rs @@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::{BorrowToSql, IsNull}; -use crate::{Error, Portal, Row, Statement}; +use crate::{Error, FormatCode, Portal, Row, Statement}; use bytes::{Bytes, BytesMut}; use futures_util::{ready, Stream}; use log::{debug, log_enabled, Level}; @@ -31,6 +31,7 @@ pub async fn query( client: &InnerClient, statement: Statement, params: I, + result_format: FormatCode, ) -> Result where P: BorrowToSql, @@ -44,9 +45,9 @@ where statement.name(), BorrowToSqlParamsDebug(params.as_slice()), ); - encode(client, &statement, params)? + encode(client, &statement, params, result_format)? } else { - encode(client, &statement, params)? + encode(client, &statement, params, result_format)? }; let responses = start(client, buf).await?; Ok(RowStream { @@ -108,9 +109,9 @@ where statement.name(), BorrowToSqlParamsDebug(params.as_slice()), ); - encode(client, &statement, params)? + encode(client, &statement, params, FormatCode::Binary)? } else { - encode(client, &statement, params)? + encode(client, &statement, params, FormatCode::Binary)? }; let mut responses = start(client, buf).await?; @@ -139,14 +140,19 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result { Ok(responses) } -pub fn encode(client: &InnerClient, statement: &Statement, params: I) -> Result +pub fn encode( + client: &InnerClient, + statement: &Statement, + params: I, + result_format: FormatCode, +) -> Result where P: BorrowToSql, I: IntoIterator, I::IntoIter: ExactSizeIterator, { client.with_buf(|buf| { - encode_bind(statement, params, "", buf)?; + encode_bind(statement, params, "", result_format, buf)?; frontend::execute("", 0, buf).map_err(Error::encode)?; frontend::sync(buf); Ok(buf.split().freeze()) @@ -157,6 +163,7 @@ pub fn encode_bind( statement: &Statement, params: I, portal: &str, + format_code: FormatCode, buf: &mut BytesMut, ) -> Result<(), Error> where @@ -177,6 +184,7 @@ where .unzip(); let params = params.into_iter(); + let format_code = format_code as i16; let mut error_idx = 0; let r = frontend::bind( @@ -192,7 +200,7 @@ where Err(e) } }, - Some(1), + Some(format_code), buf, ); match r { diff --git a/tokio-postgres/src/transaction.rs b/tokio-postgres/src/transaction.rs index 96a324652..552311ce4 100644 --- a/tokio-postgres/src/transaction.rs +++ b/tokio-postgres/src/transaction.rs @@ -9,7 +9,7 @@ use crate::types::{BorrowToSql, ToSql, Type}; #[cfg(feature = "runtime")] use crate::Socket; use crate::{ - bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row, + bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, FormatCode, Portal, Row, SimpleQueryMessage, Statement, ToStatement, }; use bytes::Buf; @@ -146,7 +146,9 @@ impl<'a> Transaction<'a> { I: IntoIterator, I::IntoIter: ExactSizeIterator, { - self.client.query_raw(statement, params).await + self.client + .query_raw(statement, params, FormatCode::Binary) + .await } /// Like `Client::execute`.