diff --git a/optd/core/src/ir/column/set.rs b/optd/core/src/ir/column/set.rs index e964734..174f4ec 100644 --- a/optd/core/src/ir/column/set.rs +++ b/optd/core/src/ir/column/set.rs @@ -24,6 +24,11 @@ impl ColumnSet { pub fn with_capacity(n: usize) -> Self { Self(HashSet::with_capacity(n)) } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } + pub fn contains(&self, column: &Column) -> bool { self.0.contains(column) } diff --git a/optd/core/src/ir/operator/logical/dependent_join.rs b/optd/core/src/ir/operator/logical/dependent_join.rs new file mode 100644 index 0000000..ab7cbd7 --- /dev/null +++ b/optd/core/src/ir/operator/logical/dependent_join.rs @@ -0,0 +1,50 @@ +//! The dependent join operator joins two input relations where the inner +//! relation may depend on columns from the outer relation, typically for +//! correlated subqueries. + +use crate::ir::{ + IRCommon, Operator, Scalar, + explain::Explain, + macros::{define_node, impl_operator_conversion}, + properties::OperatorProperties, +}; +use pretty_xmlish::Pretty; +use std::sync::Arc; + +define_node!( + /// Metadata: (none) + /// Scalars: + /// - join_cond: The join conditions to join on + LogicalDependentJoin, LogicalDependentJoinBorrowed { + properties: OperatorProperties, + metadata: LogicalDependentJoinMetadata {}, + inputs: { + operators: [outer, inner], + scalars: [join_cond], + } + } +); +impl_operator_conversion!(LogicalDependentJoin, LogicalDependentJoinBorrowed); + +impl LogicalDependentJoin { + pub fn new(outer: Arc, inner: Arc, join_cond: Arc) -> Self { + Self { + meta: LogicalDependentJoinMetadata {}, + common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])), + } + } +} + +impl Explain for LogicalDependentJoinBorrowed<'_> { + fn explain<'a>( + &self, + ctx: &crate::ir::IRContext, + option: &crate::ir::explain::ExplainOption, + ) -> pretty_xmlish::Pretty<'a> { + let mut fields = Vec::with_capacity(2); + fields.push((".join_cond", self.join_cond().explain(ctx, option))); + fields.extend(self.common.explain_operator_properties(ctx, option)); + let children = self.common.explain_input_operators(ctx, option); + Pretty::simple_record("LogicalDependentJoin", fields, children) + } +} diff --git a/optd/core/src/ir/operator/logical/mark_join.rs b/optd/core/src/ir/operator/logical/mark_join.rs new file mode 100644 index 0000000..67d53bc --- /dev/null +++ b/optd/core/src/ir/operator/logical/mark_join.rs @@ -0,0 +1,58 @@ +//! The mark join operator evaluates a subquery and produces a marker column on +//! the outer relation indicating whether a match exists. + +use crate::ir::{ + Column, IRCommon, Operator, Scalar, + explain::Explain, + macros::{define_node, impl_operator_conversion}, + properties::OperatorProperties, +}; +use pretty_xmlish::Pretty; +use std::sync::Arc; + +define_node!( + /// Metadata: + /// - mark_column: The column used to store the marker output. + /// Scalars: + /// - join_cond: The join conditions to join on + LogicalMarkJoin, LogicalMarkJoinBorrowed { + properties: OperatorProperties, + metadata: LogicalMarkJoinMetadata { + mark_column: Column, + }, + inputs: { + operators: [outer, inner], + scalars: [join_cond], + } + } +); +impl_operator_conversion!(LogicalMarkJoin, LogicalMarkJoinBorrowed); + +impl LogicalMarkJoin { + pub fn new( + mark_column: Column, + outer: Arc, + inner: Arc, + join_cond: Arc, + ) -> Self { + Self { + meta: LogicalMarkJoinMetadata { mark_column }, + common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])), + } + } +} + +impl Explain for LogicalMarkJoinBorrowed<'_> { + fn explain<'a>( + &self, + ctx: &crate::ir::IRContext, + option: &crate::ir::explain::ExplainOption, + ) -> pretty_xmlish::Pretty<'a> { + let mut fields = Vec::with_capacity(3); + fields.push((".mark_column", Pretty::debug(self.mark_column()))); + fields.push((".join_cond", self.join_cond().explain(ctx, option))); + fields.extend(self.common.explain_operator_properties(ctx, option)); + let children = self.common.explain_input_operators(ctx, option); + Pretty::simple_record("LogicalMarkJoin", fields, children) + } +} diff --git a/optd/core/src/ir/operator/logical/mod.rs b/optd/core/src/ir/operator/logical/mod.rs index feb19aa..3b3dbeb 100644 --- a/optd/core/src/ir/operator/logical/mod.rs +++ b/optd/core/src/ir/operator/logical/mod.rs @@ -4,9 +4,13 @@ //! aggregating data. pub mod aggregate; +pub mod dependent_join; pub mod get; pub mod join; +pub mod mark_join; pub mod order_by; pub mod project; pub mod remap; pub mod select; +pub mod single_join; +pub mod subquery; diff --git a/optd/core/src/ir/operator/logical/single_join.rs b/optd/core/src/ir/operator/logical/single_join.rs new file mode 100644 index 0000000..84fcc6c --- /dev/null +++ b/optd/core/src/ir/operator/logical/single_join.rs @@ -0,0 +1,49 @@ +//! The single join operator joins two relations with the guarantee that the +//! inner side produces at most one matching row per outer row. + +use crate::ir::{ + IRCommon, Operator, Scalar, + explain::Explain, + macros::{define_node, impl_operator_conversion}, + properties::OperatorProperties, +}; +use pretty_xmlish::Pretty; +use std::sync::Arc; + +define_node!( + /// Metadata: (none) + /// Scalars: + /// - join_cond: The join conditions to join on + LogicalSingleJoin, LogicalSingleJoinBorrowed { + properties: OperatorProperties, + metadata: LogicalSingleJoinMetadata {}, + inputs: { + operators: [outer, inner], + scalars: [join_cond], + } + } +); +impl_operator_conversion!(LogicalSingleJoin, LogicalSingleJoinBorrowed); + +impl LogicalSingleJoin { + pub fn new(outer: Arc, inner: Arc, join_cond: Arc) -> Self { + Self { + meta: LogicalSingleJoinMetadata {}, + common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])), + } + } +} + +impl Explain for LogicalSingleJoinBorrowed<'_> { + fn explain<'a>( + &self, + ctx: &crate::ir::IRContext, + option: &crate::ir::explain::ExplainOption, + ) -> pretty_xmlish::Pretty<'a> { + let mut fields = Vec::with_capacity(2); + fields.push((".join_cond", self.join_cond().explain(ctx, option))); + fields.extend(self.common.explain_operator_properties(ctx, option)); + let children = self.common.explain_input_operators(ctx, option); + Pretty::simple_record("LogicalSingleJoin", fields, children) + } +} diff --git a/optd/core/src/ir/operator/logical/subquery.rs b/optd/core/src/ir/operator/logical/subquery.rs new file mode 100644 index 0000000..7f2a6b7 --- /dev/null +++ b/optd/core/src/ir/operator/logical/subquery.rs @@ -0,0 +1,46 @@ +//! The subquery operator is used to represent subqueries in logical plans. + +use crate::ir::{ + IRCommon, Operator, + explain::Explain, + macros::{define_node, impl_operator_conversion}, + properties::OperatorProperties, +}; +use pretty_xmlish::Pretty; +use std::sync::Arc; + +define_node!( + /// Metadata: (none) + /// Scalars: (none) + LogicalSubquery, LogicalSubqueryBorrowed { + properties: OperatorProperties, + metadata: LogicalSubqueryMetadata {}, + inputs: { + operators: [input], + scalars: [], + } + } +); +impl_operator_conversion!(LogicalSubquery, LogicalSubqueryBorrowed); + +impl LogicalSubquery { + pub fn new(input: Arc) -> Self { + Self { + meta: LogicalSubqueryMetadata {}, + common: IRCommon::with_input_operators_only(Arc::new([input])), + } + } +} + +impl Explain for LogicalSubqueryBorrowed<'_> { + fn explain<'a>( + &self, + ctx: &crate::ir::IRContext, + option: &crate::ir::explain::ExplainOption, + ) -> pretty_xmlish::Pretty<'a> { + let mut fields = Vec::new(); + fields.extend(self.common.explain_operator_properties(ctx, option)); + let children = self.common.explain_input_operators(ctx, option); + Pretty::simple_record("LogicalSubquery", fields, children) + } +} diff --git a/optd/core/src/ir/operator/mod.rs b/optd/core/src/ir/operator/mod.rs index 96b61c9..2e14d1a 100644 --- a/optd/core/src/ir/operator/mod.rs +++ b/optd/core/src/ir/operator/mod.rs @@ -21,12 +21,16 @@ use std::sync::Arc; pub use enforcer::sort::*; pub use logical::aggregate::*; +pub use logical::dependent_join::*; pub use logical::get::*; pub use logical::join::{LogicalJoin, LogicalJoinBorrowed, LogicalJoinMetadata}; +pub use logical::mark_join::*; pub use logical::order_by::*; pub use logical::project::*; pub use logical::remap::*; pub use logical::select::*; +pub use logical::single_join::*; +pub use logical::subquery::*; pub use physical::filter::*; pub use physical::hash_aggregate::*; pub use physical::hash_join::*; @@ -51,11 +55,15 @@ pub enum OperatorKind { MockScan(MockScanMetadata), LogicalGet(LogicalGetMetadata), LogicalJoin(LogicalJoinMetadata), + LogicalDependentJoin(LogicalDependentJoinMetadata), + LogicalMarkJoin(LogicalMarkJoinMetadata), + LogicalSingleJoin(LogicalSingleJoinMetadata), LogicalSelect(LogicalSelectMetadata), LogicalProject(LogicalProjectMetadata), LogicalAggregate(LogicalAggregateMetadata), LogicalOrderBy(LogicalOrderByMetadata), LogicalRemap(LogicalRemapMetadata), + LogicalSubquery(LogicalSubqueryMetadata), EnforcerSort(EnforcerSortMetadata), PhysicalTableScan(PhysicalTableScanMetadata), PhysicalNLJoin(PhysicalNLJoinMetadata), @@ -79,8 +87,17 @@ impl OperatorKind { use OperatorKind::*; match self { Group(_) => OperatorCategory::Placeholder, - LogicalGet(_) | LogicalJoin(_) | LogicalProject(_) | LogicalAggregate(_) - | LogicalOrderBy(_) | LogicalRemap(_) | LogicalSelect(_) => OperatorCategory::Logical, + LogicalGet(_) + | LogicalJoin(_) + | LogicalDependentJoin(_) + | LogicalMarkJoin(_) + | LogicalSingleJoin(_) + | LogicalProject(_) + | LogicalAggregate(_) + | LogicalOrderBy(_) + | LogicalRemap(_) + | LogicalSelect(_) + | LogicalSubquery(_) => OperatorCategory::Logical, EnforcerSort(_) => OperatorCategory::Enforcer, PhysicalFilter(_) | PhysicalProject(_) @@ -179,6 +196,15 @@ impl Explain for Operator { OperatorKind::LogicalJoin(meta) => { LogicalJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option) } + OperatorKind::LogicalDependentJoin(meta) => { + LogicalDependentJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option) + } + OperatorKind::LogicalMarkJoin(meta) => { + LogicalMarkJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option) + } + OperatorKind::LogicalSingleJoin(meta) => { + LogicalSingleJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option) + } OperatorKind::LogicalSelect(meta) => { LogicalSelect::borrow_raw_parts(meta, &self.common).explain(ctx, option) } @@ -215,6 +241,9 @@ impl Explain for Operator { OperatorKind::LogicalRemap(meta) => { LogicalRemap::borrow_raw_parts(meta, &self.common).explain(ctx, option) } + OperatorKind::LogicalSubquery(meta) => { + LogicalSubquery::borrow_raw_parts(meta, &self.common).explain(ctx, option) + } } } } diff --git a/optd/core/src/ir/properties/output_columns.rs b/optd/core/src/ir/properties/output_columns.rs index cd01210..c612987 100644 --- a/optd/core/src/ir/properties/output_columns.rs +++ b/optd/core/src/ir/properties/output_columns.rs @@ -4,8 +4,8 @@ use crate::ir::{ Column, ColumnSet, OperatorKind, operator::{ - LogicalAggregate, LogicalGet, LogicalProject, LogicalRemap, PhysicalHashAggregate, - PhysicalProject, PhysicalTableScan, + LogicalAggregate, LogicalGet, LogicalMarkJoin, LogicalProject, LogicalRemap, + PhysicalHashAggregate, PhysicalProject, PhysicalTableScan, }, properties::{Derive, GetProperty, PropertyMarker}, scalar::{ColumnAssign, ColumnRef, List}, @@ -46,12 +46,15 @@ impl Derive for crate::ir::Operator { ) } OperatorKind::LogicalJoin(_) + | OperatorKind::LogicalDependentJoin(_) + | OperatorKind::LogicalSingleJoin(_) | OperatorKind::PhysicalNLJoin(_) | OperatorKind::PhysicalHashJoin(_) | OperatorKind::LogicalSelect(_) | OperatorKind::PhysicalFilter(_) | OperatorKind::LogicalOrderBy(_) - | OperatorKind::EnforcerSort(_) => { + | OperatorKind::EnforcerSort(_) + | OperatorKind::LogicalSubquery(_) => { let set = self.input_operators() .iter() @@ -61,6 +64,16 @@ impl Derive for crate::ir::Operator { }); Arc::new(set) } + OperatorKind::LogicalMarkJoin(meta) => { + let join = LogicalMarkJoin::borrow_raw_parts(meta, &self.common); + let outer_columns = join.outer().output_columns(ctx); + let set = outer_columns + .iter() + .cloned() + .chain(std::iter::once(*join.mark_column())) + .collect(); + Arc::new(set) + } OperatorKind::MockScan(meta) => meta.spec.mocked_output_columns.clone(), OperatorKind::LogicalProject(meta) => { let project = LogicalProject::borrow_raw_parts(meta, &self.common); diff --git a/optd/core/src/ir/properties/output_schema.rs b/optd/core/src/ir/properties/output_schema.rs index 24253ce..ffa3011 100644 --- a/optd/core/src/ir/properties/output_schema.rs +++ b/optd/core/src/ir/properties/output_schema.rs @@ -6,9 +6,10 @@ use crate::ir::{ OperatorKind, catalog::{Field, Schema}, operator::{ - EnforcerSort, LogicalAggregate, LogicalGet, LogicalJoin, LogicalOrderBy, LogicalProject, - LogicalRemap, LogicalSelect, PhysicalFilter, PhysicalHashAggregate, PhysicalHashJoin, - PhysicalNLJoin, PhysicalProject, PhysicalTableScan, + EnforcerSort, LogicalAggregate, LogicalDependentJoin, LogicalGet, LogicalJoin, + LogicalMarkJoin, LogicalOrderBy, LogicalProject, LogicalRemap, LogicalSelect, + LogicalSingleJoin, LogicalSubquery, PhysicalFilter, PhysicalHashAggregate, + PhysicalHashJoin, PhysicalNLJoin, PhysicalProject, PhysicalTableScan, }, properties::{Derive, GetProperty, PropertyMarker}, scalar::{ColumnAssign, List}, @@ -54,6 +55,10 @@ impl Derive for crate::ir::Operator { let select = LogicalSelect::borrow_raw_parts(meta, &self.common); select.input().output_schema(ctx) } + OperatorKind::LogicalSubquery(meta) => { + let subquery = LogicalSubquery::borrow_raw_parts(meta, &self.common); + subquery.input().output_schema(ctx) + } OperatorKind::PhysicalFilter(meta) => { let filter = PhysicalFilter::borrow_raw_parts(meta, &self.common); filter.input().output_schema(ctx) @@ -70,6 +75,47 @@ impl Derive for crate::ir::Operator { .collect_vec(); Some(Schema::new(columns)) } + OperatorKind::LogicalDependentJoin(meta) => { + let join = LogicalDependentJoin::borrow_raw_parts(meta, &self.common); + let columns = join + .outer() + .output_schema(ctx)? + .columns() + .iter() + .chain(join.inner().output_schema(ctx)?.columns().iter()) + .cloned() + .collect_vec(); + Some(Schema::new(columns)) + } + OperatorKind::LogicalMarkJoin(meta) => { + let join = LogicalMarkJoin::borrow_raw_parts(meta, &self.common); + let mut columns = join + .outer() + .output_schema(ctx)? + .columns() + .iter() + .cloned() + .collect_vec(); + let mark_meta = ctx.get_column_meta(join.mark_column()); + columns.push(Arc::new(Field::new( + mark_meta.name.clone(), + mark_meta.data_type, + true, + ))); + Some(Schema::new(columns)) + } + OperatorKind::LogicalSingleJoin(meta) => { + let join = LogicalSingleJoin::borrow_raw_parts(meta, &self.common); + let columns = join + .outer() + .output_schema(ctx)? + .columns() + .iter() + .chain(join.inner().output_schema(ctx)?.columns().iter()) + .cloned() + .collect_vec(); + Some(Schema::new(columns)) + } OperatorKind::PhysicalNLJoin(meta) => { let join = PhysicalNLJoin::borrow_raw_parts(meta, &self.common); let columns = join diff --git a/optd/core/src/ir/properties/tuple_ordering.rs b/optd/core/src/ir/properties/tuple_ordering.rs index f8e3537..ad14686 100644 --- a/optd/core/src/ir/properties/tuple_ordering.rs +++ b/optd/core/src/ir/properties/tuple_ordering.rs @@ -167,10 +167,14 @@ impl crate::ir::properties::TrySatisfy for Operator { OperatorKind::Group(_) => None, OperatorKind::LogicalGet(_) | OperatorKind::LogicalJoin(_) + | OperatorKind::LogicalDependentJoin(_) + | OperatorKind::LogicalMarkJoin(_) + | OperatorKind::LogicalSingleJoin(_) | OperatorKind::LogicalProject(_) | OperatorKind::LogicalSelect(_) | OperatorKind::LogicalOrderBy(_) - | OperatorKind::LogicalAggregate(_) => { + | OperatorKind::LogicalAggregate(_) + | OperatorKind::LogicalSubquery(_) => { assert_eq!(self.kind.category(), OperatorCategory::Logical); None } diff --git a/optd/core/src/magic/card.rs b/optd/core/src/magic/card.rs index e9e8910..be36ef8 100644 --- a/optd/core/src/magic/card.rs +++ b/optd/core/src/magic/card.rs @@ -47,6 +47,41 @@ impl CardinalityEstimator for MagicCardinalityEstimator { let right_card = join.inner().cardinality(ctx); selectivity * left_card * right_card } + OperatorKind::LogicalDependentJoin(meta) => { + let join = LogicalDependentJoin::borrow_raw_parts(meta, &op.common); + let selectivity = if let Ok(literal) = join.join_cond().try_borrow::() { + match literal.value() { + crate::ir::ScalarValue::Boolean(Some(true)) => 1., + crate::ir::ScalarValue::Boolean(_) => 0., + _ => unreachable!("join condition must be boolean"), + } + } else { + Self::MAGIC_JOIN_COND_SELECTIVITY + }; + let left_card = join.outer().cardinality(ctx); + let right_card = join.inner().cardinality(ctx); + selectivity * left_card * right_card + } + OperatorKind::LogicalSingleJoin(meta) => { + let join = LogicalSingleJoin::borrow_raw_parts(meta, &op.common); + let selectivity = if let Ok(literal) = join.join_cond().try_borrow::() { + match literal.value() { + crate::ir::ScalarValue::Boolean(Some(true)) => 1., + crate::ir::ScalarValue::Boolean(_) => 0., + _ => unreachable!("join condition must be boolean"), + } + } else { + Self::MAGIC_JOIN_COND_SELECTIVITY + }; + let left_card = join.outer().cardinality(ctx); + let right_card = join.inner().cardinality(ctx); + (selectivity * left_card * right_card) + .map(|value| value.min(left_card.as_f64())) + } + OperatorKind::LogicalMarkJoin(meta) => { + let join = LogicalMarkJoin::borrow_raw_parts(meta, &op.common); + join.outer().cardinality(ctx) + } OperatorKind::PhysicalNLJoin(meta) => { let join = PhysicalNLJoin::borrow_raw_parts(meta, &op.common); let selectivity = if let Ok(literal) = join.join_cond().try_borrow::() { @@ -101,6 +136,11 @@ impl CardinalityEstimator for MagicCardinalityEstimator { .input() .cardinality(ctx) } + OperatorKind::LogicalSubquery(meta) => { + LogicalSubquery::borrow_raw_parts(meta, &op.common) + .input() + .cardinality(ctx) + } OperatorKind::EnforcerSort(meta) => EnforcerSort::borrow_raw_parts(meta, &op.common) .input() .cardinality(ctx), diff --git a/optd/core/src/magic/cm.rs b/optd/core/src/magic/cm.rs index f32d305..9507777 100644 --- a/optd/core/src/magic/cm.rs +++ b/optd/core/src/magic/cm.rs @@ -21,10 +21,14 @@ impl CostModel for MagicCostModel { OperatorKind::Group(_) => None, OperatorKind::LogicalGet(_) => None, OperatorKind::LogicalJoin(_) => None, + OperatorKind::LogicalDependentJoin(_) => None, + OperatorKind::LogicalMarkJoin(_) => None, + OperatorKind::LogicalSingleJoin(_) => None, OperatorKind::LogicalSelect(_) => None, OperatorKind::LogicalProject(_) => None, OperatorKind::LogicalAggregate(_) => None, OperatorKind::LogicalOrderBy(_) => None, + OperatorKind::LogicalSubquery(_) => None, OperatorKind::LogicalRemap(_) => Some(Cost::UNIT), OperatorKind::EnforcerSort(_) => { let input_card = op.input_operators()[0].cardinality(ctx); diff --git a/tpch.sh b/tpch.sh old mode 100644 new mode 100755