1 change: 1 addition & 0 deletions Cargo.toml
@@ -200,6 +200,7 @@ or_fun_call = "warn"
 unnecessary_lazy_evaluations = "warn"
 uninlined_format_args = "warn"
 inefficient_to_string = "warn"
+clone_on_ref_ptr = "deny"
 # https://github.com/apache/datafusion/issues/18503
 needless_pass_by_value = "warn"
 # https://github.com/apache/datafusion/issues/18881
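For context on what the new lint enforces: `clippy::clone_on_ref_ptr` rejects calling `.clone()` through an `Rc`, `Arc`, or `Weak` receiver, because method syntax hides whether the call is a cheap reference-count bump or a deep copy of the pointee. A minimal standalone sketch of the flagged and preferred forms (not code from this PR):

```rust
use std::sync::Arc;

fn main() {
    let shared = Arc::new(vec![1, 2, 3]);

    // Flagged under `clone_on_ref_ptr`: method syntax obscures that this
    // only bumps the reference count and never copies the Vec.
    // let alias = shared.clone();

    // Preferred: the fully qualified form makes the ref-count bump explicit.
    let alias = Arc::clone(&shared);

    assert_eq!(Arc::strong_count(&shared), 2);
    assert_eq!(*alias, vec![1, 2, 3]);
}
```

The rest of the diff is a mechanical application of this rule (plus opting crates into the workspace lint table), which is why nearly every hunk swaps a postfix `.clone()` for `Arc::clone(..)` or `Weak::clone(..)`.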
3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
@@ -30,6 +30,9 @@ rust-version = { workspace = true }
 [package.metadata.docs.rs]
 all-features = true
 
+[lints]
+workspace = true
+
 [features]
 default = []
 backtrace = ["datafusion/backtrace"]
4 changes: 2 additions & 2 deletions datafusion-cli/examples/cli-session-context.rs
@@ -73,8 +73,8 @@ impl CliSessionContext for MyUnionerContext {
         &self,
         plan: LogicalPlan,
     ) -> Result<DataFrame, DataFusionError> {
-        let new_plan = LogicalPlanBuilder::from(plan.clone())
-            .union(plan.clone())?
+        let new_plan = LogicalPlanBuilder::from(Arc::clone(&plan))
+            .union(Arc::clone(&plan))?
             .build()?;
 
         self.ctx.execute_logical_plan(new_plan).await
8 changes: 4 additions & 4 deletions datafusion-cli/src/catalog.rs
@@ -67,7 +67,7 @@ impl CatalogProviderList for DynamicObjectStoreCatalog {
     }
 
     fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
-        let state = self.state.clone();
+        let state = Weak::clone(&self.state);
         self.inner.catalog(name).map(|catalog| {
             Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _
         })
@@ -100,7 +100,7 @@ impl CatalogProvider for DynamicObjectStoreCatalogProvider {
     }
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
-        let state = self.state.clone();
+        let state = Weak::clone(&self.state);
         self.inner.schema(name).map(|schema| {
             Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
         })
@@ -240,12 +240,12 @@ mod tests {
     fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
         let ctx = SessionContext::new();
         ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
-            ctx.state().catalog_list().clone(),
+            Arc::clone(ctx.state().catalog_list()),
            ctx.state_weak_ref(),
        )));
 
         let provider = &DynamicObjectStoreCatalog::new(
-            ctx.state().catalog_list().clone(),
+            Arc::clone(ctx.state().catalog_list()),
            ctx.state_weak_ref(),
        ) as &dyn CatalogProviderList;
         let catalog = provider
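The catalog changes use `Weak::clone` rather than `Arc::clone` because `self.state` here is a `Weak` handle to the session state (it comes from `ctx.state_weak_ref()`), presumably held weakly to avoid a reference cycle between the catalog and the state that owns it. Cloning a `Weak` duplicates the non-owning handle without touching the strong count; a standalone sketch with an illustrative stand-in type:

```rust
use std::sync::{Arc, Weak};

struct SessionState;

fn main() {
    let strong = Arc::new(SessionState);

    // A non-owning handle to the state.
    let weak: Weak<SessionState> = Arc::downgrade(&strong);

    // Fully qualified clone, as `clone_on_ref_ptr` requires; this copies
    // the handle only and leaves the strong count at 1.
    let weak2 = Weak::clone(&weak);
    assert_eq!(Arc::strong_count(&strong), 1);

    // Either handle can be upgraded while the owner is alive...
    assert!(weak2.upgrade().is_some());

    // ...and upgrading fails gracefully once the owner is dropped.
    drop(strong);
    assert!(weak.upgrade().is_none());
}
```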
5 changes: 3 additions & 2 deletions datafusion-cli/src/exec.rs
@@ -48,6 +48,7 @@ use std::collections::HashMap;
 use std::fs::File;
 use std::io::BufReader;
 use std::io::prelude::*;
+use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -281,14 +282,14 @@ impl StatementExecutor {
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            let stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let stream = execute_stream(physical_plan, Arc::clone(&task_ctx))?;
             print_options
                 .print_stream(stream, now, &options.format)
                 .await?;
         } else {
             // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
-            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut stream = execute_stream(physical_plan, Arc::clone(&task_ctx))?;
             let mut results = vec![];
             let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
24 changes: 12 additions & 12 deletions datafusion-cli/src/functions.rs
@@ -233,8 +233,8 @@ impl TableProvider for ParquetMetadataTable {
         self
     }
 
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
     }
 
     fn table_type(&self) -> datafusion::logical_expr::TableType {
@@ -436,7 +436,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
         }
 
         let rb = RecordBatch::try_new(
-            schema.clone(),
+            Arc::clone(&schema),
             vec![
                 Arc::new(StringArray::from(filename_arr)),
                 Arc::new(Int64Array::from(row_group_id_arr)),
@@ -482,8 +482,8 @@ impl TableProvider for MetadataCacheTable {
         self
     }
 
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
     }
 
     fn table_type(&self) -> datafusion::logical_expr::TableType {
@@ -569,7 +569,7 @@ impl TableFunctionImpl for MetadataCacheFunc {
         }
 
         let batch = RecordBatch::try_new(
-            schema.clone(),
+            Arc::clone(&schema),
             vec![
                 Arc::new(StringArray::from(path_arr)),
                 Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
@@ -600,8 +600,8 @@ impl TableProvider for StatisticsCacheTable {
         self
     }
 
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
     }
 
     fn table_type(&self) -> datafusion::logical_expr::TableType {
@@ -684,7 +684,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {
         }
 
         let batch = RecordBatch::try_new(
-            schema.clone(),
+            Arc::clone(&schema),
             vec![
                 Arc::new(StringArray::from(path_arr)),
                 Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
@@ -735,8 +735,8 @@ impl TableProvider for ListFilesCacheTable {
         self
     }
 
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        self.schema.clone()
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
    }
 
     fn table_type(&self) -> datafusion::logical_expr::TableType {
@@ -862,7 +862,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
             OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
 
         let batch = RecordBatch::try_new(
-            schema.clone(),
+            Arc::clone(&schema),
             vec![
                 Arc::new(StringArray::from(table_arr)),
                 Arc::new(StringArray::from(path_arr)),
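All four `schema()` rewrites above are the same idiom: arrow's `SchemaRef` is an alias for `Arc<Schema>`, so the shortened return type and `Arc::clone(&self.schema)` hand out a shared handle for the price of a ref-count bump. A reduced sketch of the pattern (the `arrow` types are real; `ExampleTable` is illustrative, not a full `TableProvider` impl):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

struct ExampleTable {
    schema: SchemaRef, // SchemaRef = Arc<Schema>
}

impl ExampleTable {
    fn schema(&self) -> SchemaRef {
        // Cheap shared handle; the underlying Schema is never copied.
        Arc::clone(&self.schema)
    }
}

fn main() {
    let table = ExampleTable {
        schema: Arc::new(Schema::new(vec![Field::new(
            "path",
            DataType::Utf8,
            false,
        )])),
    };
    assert_eq!(table.schema().fields().len(), 1);
}
```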
51 changes: 27 additions & 24 deletions datafusion-cli/src/main.rs
@@ -48,6 +48,7 @@ use clap::Parser;
 use datafusion::common::config_err;
 use datafusion::config::ConfigOptions;
 use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
+use datafusion::execution::object_store::ObjectStoreRegistry;
 use mimalloc::MiMalloc;
 
 #[global_allocator]
@@ -221,7 +222,9 @@ async fn main_inner() -> Result<()> {
         InstrumentedObjectStoreRegistry::new()
             .with_profile_mode(args.object_store_profiling),
     );
-    rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
+    rt_builder = rt_builder.with_object_store_registry(
+        Arc::clone(&instrumented_registry) as Arc<dyn ObjectStoreRegistry>,
+    );
 
     let runtime_env = rt_builder.build_arc()?;
 
@@ -231,7 +234,7 @@ async fn main_inner() -> Result<()> {
     ctx.refresh_catalogs().await?;
     // install dynamic catalog provider that can register required object stores
     ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
-        ctx.state().catalog_list().clone(),
+        Arc::clone(ctx.state().catalog_list()),
         ctx.state_weak_ref(),
     )));
     // register `parquet_metadata` table function to get metadata from parquet files
@@ -240,24 +243,24 @@ async fn main_inner() -> Result<()> {
     // register `metadata_cache` table function to get the contents of the file metadata cache
     ctx.register_udtf(
         "metadata_cache",
-        Arc::new(MetadataCacheFunc::new(
-            ctx.task_ctx().runtime_env().cache_manager.clone(),
-        )),
+        Arc::new(MetadataCacheFunc::new(Arc::clone(
+            &ctx.task_ctx().runtime_env().cache_manager,
+        ))),
     );
 
     // register `statistics_cache` table function to get the contents of the file statistics cache
     ctx.register_udtf(
         "statistics_cache",
-        Arc::new(StatisticsCacheFunc::new(
-            ctx.task_ctx().runtime_env().cache_manager.clone(),
-        )),
+        Arc::new(StatisticsCacheFunc::new(Arc::clone(
+            &ctx.task_ctx().runtime_env().cache_manager,
+        ))),
     );
 
     ctx.register_udtf(
         "list_files_cache",
-        Arc::new(ListFilesCacheFunc::new(
-            ctx.task_ctx().runtime_env().cache_manager.clone(),
-        )),
+        Arc::new(ListFilesCacheFunc::new(Arc::clone(
+            &ctx.task_ctx().runtime_env().cache_manager,
+        ))),
     );
 
     let mut print_options = PrintOptions {
@@ -568,9 +571,9 @@ mod tests {
         let ctx = SessionContext::new();
         ctx.register_udtf(
             "metadata_cache",
-            Arc::new(MetadataCacheFunc::new(
-                ctx.task_ctx().runtime_env().cache_manager.clone(),
-            )),
+            Arc::new(MetadataCacheFunc::new(Arc::clone(
+                &ctx.task_ctx().runtime_env().cache_manager,
+            ))),
         );
 
         ctx.register_parquet(
@@ -664,9 +667,9 @@ mod tests {
 
         ctx.register_udtf(
             "statistics_cache",
-            Arc::new(StatisticsCacheFunc::new(
-                ctx.task_ctx().runtime_env().cache_manager.clone(),
-            )),
+            Arc::new(StatisticsCacheFunc::new(Arc::clone(
+                &ctx.task_ctx().runtime_env().cache_manager,
+            ))),
        );
 
         for filename in [
@@ -715,9 +718,9 @@ mod tests {
 
         ctx.register_udtf(
             "statistics_cache",
-            Arc::new(StatisticsCacheFunc::new(
-                ctx.task_ctx().runtime_env().cache_manager.clone(),
-            )),
+            Arc::new(StatisticsCacheFunc::new(Arc::clone(
+                &ctx.task_ctx().runtime_env().cache_manager,
+            ))),
        );
 
         for filename in [
@@ -778,9 +781,9 @@ mod tests {
 
         ctx.register_udtf(
             "list_files_cache",
-            Arc::new(ListFilesCacheFunc::new(
-                ctx.task_ctx().runtime_env().cache_manager.clone(),
-            )),
+            Arc::new(ListFilesCacheFunc::new(Arc::clone(
+                &ctx.task_ctx().runtime_env().cache_manager,
+            ))),
        );
 
         ctx.sql(
@@ -816,7 +819,7 @@ mod tests {
 
         assert_eq!(
             2,
-            df.clone()
+            Arc::clone(&df)
                 .filter(col("expires_in").is_not_null())?
                 .count()
                 .await?
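One subtlety in the second main.rs hunk: `Arc::clone(&instrumented_registry)` yields an `Arc` of the concrete registry type, so the rewrite spells out the conversion to a trait object with `as Arc<dyn ObjectStoreRegistry>` (hence the added import); whether an implicit coercion would also have sufficed depends on the builder's signature. A standalone sketch of that explicit unsized cast, with made-up trait and struct names:

```rust
use std::sync::Arc;

trait Registry: Send + Sync {
    fn name(&self) -> &str;
}

struct Instrumented;

impl Registry for Instrumented {
    fn name(&self) -> &str {
        "instrumented"
    }
}

fn main() {
    let concrete: Arc<Instrumented> = Arc::new(Instrumented);

    // `as Arc<dyn Registry>` performs the unsized coercion explicitly;
    // both handles still point at the same allocation.
    let erased = Arc::clone(&concrete) as Arc<dyn Registry>;

    assert_eq!(erased.name(), "instrumented");
    assert_eq!(Arc::strong_count(&concrete), 2);
}
```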
2 changes: 1 addition & 1 deletion datafusion-cli/src/object_storage.rs
@@ -209,7 +209,7 @@ impl CredentialsFromConfig {
 
 #[derive(Debug)]
 struct S3CredentialProvider {
-    credentials: aws_credential_types::provider::SharedCredentialsProvider,
+    credentials: SharedCredentialsProvider,
 }
 
 #[async_trait]
8 changes: 4 additions & 4 deletions datafusion-cli/src/object_storage/instrumented.rs
@@ -110,7 +110,7 @@ pub enum InstrumentedObjectStoreMode {
 }
 
 impl fmt::Display for InstrumentedObjectStoreMode {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{self:?}")
     }
 }
@@ -383,7 +383,7 @@ impl InstrumentedObjectStore {
 }
 
 impl fmt::Display for InstrumentedObjectStore {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let mode: InstrumentedObjectStoreMode =
             self.instrument_mode.load(Ordering::Relaxed).into();
         write!(
@@ -490,7 +490,7 @@ pub enum Operation {
 }
 
 impl fmt::Display for Operation {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{self:?}")
     }
 }
@@ -508,7 +508,7 @@ pub struct RequestDetails {
 }
 
 impl fmt::Display for RequestDetails {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let mut output_parts = vec![format!(
             "{} operation={:?}",
             self.timestamp.to_rfc3339(),
2 changes: 1 addition & 1 deletion datafusion-cli/src/print_format.rs
@@ -585,7 +585,7 @@ mod tests {
             self.format
                 .print_batches(
                     &mut buffer,
-                    self.schema.clone(),
+                    Arc::clone(&self.schema),
                     &self.batches,
                     self.maxrows,
                     with_header,
4 changes: 2 additions & 2 deletions datafusion-cli/src/print_options.rs
@@ -115,7 +115,7 @@ impl PrintOptions {
         row_count: usize,
         format_options: &FormatOptions,
     ) -> Result<()> {
-        let stdout = std::io::stdout();
+        let stdout = io::stdout();
         let mut writer = stdout.lock();
 
         self.format.print_batches(
@@ -153,7 +153,7 @@ impl PrintOptions {
             ));
         };
 
-        let stdout = std::io::stdout();
+        let stdout = io::stdout();
         let mut writer = stdout.lock();
 
         let mut row_count = 0_usize;
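A side note on the surrounding code in both hunks: `io::stdout()` is locked once and the resulting `StdoutLock` is reused for every subsequent write, rather than re-acquiring the lock per call. A minimal sketch of that pattern (the loop body is illustrative):

```rust
use std::io::{self, Write};

fn main() -> io::Result<()> {
    let stdout = io::stdout();
    // Lock once; each write below reuses the guard instead of
    // taking the stdout mutex per call.
    let mut writer = stdout.lock();

    for i in 0..3 {
        writeln!(writer, "row {i}")?;
    }
    writer.flush()
}
```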
@@ -21,6 +21,9 @@ version = "0.1.0"
 edition = { workspace = true }
 publish = false
 
+[lints]
+workspace = true
+
 [dependencies]
 abi_stable = "0.11.3"
 arrow = { workspace = true }
@@ -21,6 +21,9 @@ version = "0.1.0"
 edition = "2024"
 publish = false
 
+[lints]
+workspace = true
+
 [dependencies]
 abi_stable = "0.11.3"
 datafusion-ffi = { workspace = true }
3 changes: 3 additions & 0 deletions datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml
@@ -21,6 +21,9 @@ version = "0.1.0"
 edition = "2024"
 publish = false
 
+[lints]
+workspace = true
+
 [dependencies]
 abi_stable = "0.11.3"
 datafusion = { workspace = true }