Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions optd/core/src/ir/column/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ impl ColumnSet {
pub fn with_capacity(n: usize) -> Self {
Self(HashSet::with_capacity(n))
}

pub fn iter(&self) -> impl Iterator<Item = &Column> {
self.0.iter()
}

pub fn contains(&self, column: &Column) -> bool {
self.0.contains(column)
}
Expand Down
50 changes: 50 additions & 0 deletions optd/core/src/ir/operator/logical/dependent_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! The dependent join operator joins two input relations where the inner
//! relation may depend on columns from the outer relation, typically for
//! correlated subqueries.

use crate::ir::{
IRCommon, Operator, Scalar,
explain::Explain,
macros::{define_node, impl_operator_conversion},
properties::OperatorProperties,
};
use pretty_xmlish::Pretty;
use std::sync::Arc;

define_node!(
/// Metadata: (none)
/// Scalars:
/// - join_cond: The join conditions to join on
LogicalDependentJoin, LogicalDependentJoinBorrowed {
properties: OperatorProperties,
metadata: LogicalDependentJoinMetadata {},
inputs: {
operators: [outer, inner],
scalars: [join_cond],
}
}
);
impl_operator_conversion!(LogicalDependentJoin, LogicalDependentJoinBorrowed);

impl LogicalDependentJoin {
pub fn new(outer: Arc<Operator>, inner: Arc<Operator>, join_cond: Arc<Scalar>) -> Self {
Self {
meta: LogicalDependentJoinMetadata {},
common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])),
}
}
}

impl Explain for LogicalDependentJoinBorrowed<'_> {
fn explain<'a>(
&self,
ctx: &crate::ir::IRContext,
option: &crate::ir::explain::ExplainOption,
) -> pretty_xmlish::Pretty<'a> {
let mut fields = Vec::with_capacity(2);
fields.push((".join_cond", self.join_cond().explain(ctx, option)));
fields.extend(self.common.explain_operator_properties(ctx, option));
let children = self.common.explain_input_operators(ctx, option);
Pretty::simple_record("LogicalDependentJoin", fields, children)
}
}
58 changes: 58 additions & 0 deletions optd/core/src/ir/operator/logical/mark_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! The mark join operator evaluates a subquery and produces a marker column on
//! the outer relation indicating whether a match exists.

use crate::ir::{
Column, IRCommon, Operator, Scalar,
explain::Explain,
macros::{define_node, impl_operator_conversion},
properties::OperatorProperties,
};
use pretty_xmlish::Pretty;
use std::sync::Arc;

define_node!(
/// Metadata:
/// - mark_column: The column used to store the marker output.
/// Scalars:
/// - join_cond: The join conditions to join on
LogicalMarkJoin, LogicalMarkJoinBorrowed {
properties: OperatorProperties,
metadata: LogicalMarkJoinMetadata {
mark_column: Column,
},
inputs: {
operators: [outer, inner],
scalars: [join_cond],
}
}
);
impl_operator_conversion!(LogicalMarkJoin, LogicalMarkJoinBorrowed);

impl LogicalMarkJoin {
pub fn new(
mark_column: Column,
outer: Arc<Operator>,
inner: Arc<Operator>,
join_cond: Arc<Scalar>,
) -> Self {
Self {
meta: LogicalMarkJoinMetadata { mark_column },
common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])),
}
}
}

impl Explain for LogicalMarkJoinBorrowed<'_> {
fn explain<'a>(
&self,
ctx: &crate::ir::IRContext,
option: &crate::ir::explain::ExplainOption,
) -> pretty_xmlish::Pretty<'a> {
let mut fields = Vec::with_capacity(3);
fields.push((".mark_column", Pretty::debug(self.mark_column())));
fields.push((".join_cond", self.join_cond().explain(ctx, option)));
fields.extend(self.common.explain_operator_properties(ctx, option));
let children = self.common.explain_input_operators(ctx, option);
Pretty::simple_record("LogicalMarkJoin", fields, children)
}
}
4 changes: 4 additions & 0 deletions optd/core/src/ir/operator/logical/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
//! aggregating data.

pub mod aggregate;
pub mod dependent_join;
pub mod get;
pub mod join;
pub mod mark_join;
pub mod order_by;
pub mod project;
pub mod remap;
pub mod select;
pub mod single_join;
pub mod subquery;
49 changes: 49 additions & 0 deletions optd/core/src/ir/operator/logical/single_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! The single join operator joins two relations with the guarantee that the
//! inner side produces at most one matching row per outer row.

use crate::ir::{
IRCommon, Operator, Scalar,
explain::Explain,
macros::{define_node, impl_operator_conversion},
properties::OperatorProperties,
};
use pretty_xmlish::Pretty;
use std::sync::Arc;

define_node!(
/// Metadata: (none)
/// Scalars:
/// - join_cond: The join conditions to join on
LogicalSingleJoin, LogicalSingleJoinBorrowed {
properties: OperatorProperties,
metadata: LogicalSingleJoinMetadata {},
inputs: {
operators: [outer, inner],
scalars: [join_cond],
}
}
);
impl_operator_conversion!(LogicalSingleJoin, LogicalSingleJoinBorrowed);

impl LogicalSingleJoin {
pub fn new(outer: Arc<Operator>, inner: Arc<Operator>, join_cond: Arc<Scalar>) -> Self {
Self {
meta: LogicalSingleJoinMetadata {},
common: IRCommon::new(Arc::new([outer, inner]), Arc::new([join_cond])),
}
}
}

impl Explain for LogicalSingleJoinBorrowed<'_> {
fn explain<'a>(
&self,
ctx: &crate::ir::IRContext,
option: &crate::ir::explain::ExplainOption,
) -> pretty_xmlish::Pretty<'a> {
let mut fields = Vec::with_capacity(2);
fields.push((".join_cond", self.join_cond().explain(ctx, option)));
fields.extend(self.common.explain_operator_properties(ctx, option));
let children = self.common.explain_input_operators(ctx, option);
Pretty::simple_record("LogicalSingleJoin", fields, children)
}
}
46 changes: 46 additions & 0 deletions optd/core/src/ir/operator/logical/subquery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! The subquery operator is used to represent subqueries in logical plans.

use crate::ir::{
IRCommon, Operator,
explain::Explain,
macros::{define_node, impl_operator_conversion},
properties::OperatorProperties,
};
use pretty_xmlish::Pretty;
use std::sync::Arc;

define_node!(
/// Metadata: (none)
/// Scalars: (none)
LogicalSubquery, LogicalSubqueryBorrowed {
properties: OperatorProperties,
metadata: LogicalSubqueryMetadata {},
inputs: {
operators: [input],
scalars: [],
}
}
);
impl_operator_conversion!(LogicalSubquery, LogicalSubqueryBorrowed);

impl LogicalSubquery {
pub fn new(input: Arc<Operator>) -> Self {
Self {
meta: LogicalSubqueryMetadata {},
common: IRCommon::with_input_operators_only(Arc::new([input])),
}
}
}

impl Explain for LogicalSubqueryBorrowed<'_> {
fn explain<'a>(
&self,
ctx: &crate::ir::IRContext,
option: &crate::ir::explain::ExplainOption,
) -> pretty_xmlish::Pretty<'a> {
let mut fields = Vec::new();
fields.extend(self.common.explain_operator_properties(ctx, option));
let children = self.common.explain_input_operators(ctx, option);
Pretty::simple_record("LogicalSubquery", fields, children)
}
}
33 changes: 31 additions & 2 deletions optd/core/src/ir/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ use std::sync::Arc;

pub use enforcer::sort::*;
pub use logical::aggregate::*;
pub use logical::dependent_join::*;
pub use logical::get::*;
pub use logical::join::{LogicalJoin, LogicalJoinBorrowed, LogicalJoinMetadata};
pub use logical::mark_join::*;
pub use logical::order_by::*;
pub use logical::project::*;
pub use logical::remap::*;
pub use logical::select::*;
pub use logical::single_join::*;
pub use logical::subquery::*;
pub use physical::filter::*;
pub use physical::hash_aggregate::*;
pub use physical::hash_join::*;
Expand All @@ -51,11 +55,15 @@ pub enum OperatorKind {
MockScan(MockScanMetadata),
LogicalGet(LogicalGetMetadata),
LogicalJoin(LogicalJoinMetadata),
LogicalDependentJoin(LogicalDependentJoinMetadata),
LogicalMarkJoin(LogicalMarkJoinMetadata),
LogicalSingleJoin(LogicalSingleJoinMetadata),
LogicalSelect(LogicalSelectMetadata),
LogicalProject(LogicalProjectMetadata),
LogicalAggregate(LogicalAggregateMetadata),
LogicalOrderBy(LogicalOrderByMetadata),
LogicalRemap(LogicalRemapMetadata),
LogicalSubquery(LogicalSubqueryMetadata),
EnforcerSort(EnforcerSortMetadata),
PhysicalTableScan(PhysicalTableScanMetadata),
PhysicalNLJoin(PhysicalNLJoinMetadata),
Expand All @@ -79,8 +87,17 @@ impl OperatorKind {
use OperatorKind::*;
match self {
Group(_) => OperatorCategory::Placeholder,
LogicalGet(_) | LogicalJoin(_) | LogicalProject(_) | LogicalAggregate(_)
| LogicalOrderBy(_) | LogicalRemap(_) | LogicalSelect(_) => OperatorCategory::Logical,
LogicalGet(_)
| LogicalJoin(_)
| LogicalDependentJoin(_)
| LogicalMarkJoin(_)
| LogicalSingleJoin(_)
| LogicalProject(_)
| LogicalAggregate(_)
| LogicalOrderBy(_)
| LogicalRemap(_)
| LogicalSelect(_)
| LogicalSubquery(_) => OperatorCategory::Logical,
EnforcerSort(_) => OperatorCategory::Enforcer,
PhysicalFilter(_)
| PhysicalProject(_)
Expand Down Expand Up @@ -179,6 +196,15 @@ impl Explain for Operator {
OperatorKind::LogicalJoin(meta) => {
LogicalJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
OperatorKind::LogicalDependentJoin(meta) => {
LogicalDependentJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
OperatorKind::LogicalMarkJoin(meta) => {
LogicalMarkJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
OperatorKind::LogicalSingleJoin(meta) => {
LogicalSingleJoin::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
OperatorKind::LogicalSelect(meta) => {
LogicalSelect::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
Expand Down Expand Up @@ -215,6 +241,9 @@ impl Explain for Operator {
OperatorKind::LogicalRemap(meta) => {
LogicalRemap::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
OperatorKind::LogicalSubquery(meta) => {
LogicalSubquery::borrow_raw_parts(meta, &self.common).explain(ctx, option)
}
}
}
}
19 changes: 16 additions & 3 deletions optd/core/src/ir/properties/output_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use crate::ir::{
Column, ColumnSet, OperatorKind,
operator::{
LogicalAggregate, LogicalGet, LogicalProject, LogicalRemap, PhysicalHashAggregate,
PhysicalProject, PhysicalTableScan,
LogicalAggregate, LogicalGet, LogicalMarkJoin, LogicalProject, LogicalRemap,
PhysicalHashAggregate, PhysicalProject, PhysicalTableScan,
},
properties::{Derive, GetProperty, PropertyMarker},
scalar::{ColumnAssign, ColumnRef, List},
Expand Down Expand Up @@ -46,12 +46,15 @@ impl Derive<OutputColumns> for crate::ir::Operator {
)
}
OperatorKind::LogicalJoin(_)
| OperatorKind::LogicalDependentJoin(_)
| OperatorKind::LogicalSingleJoin(_)
| OperatorKind::PhysicalNLJoin(_)
| OperatorKind::PhysicalHashJoin(_)
| OperatorKind::LogicalSelect(_)
| OperatorKind::PhysicalFilter(_)
| OperatorKind::LogicalOrderBy(_)
| OperatorKind::EnforcerSort(_) => {
| OperatorKind::EnforcerSort(_)
| OperatorKind::LogicalSubquery(_) => {
let set =
self.input_operators()
.iter()
Expand All @@ -61,6 +64,16 @@ impl Derive<OutputColumns> for crate::ir::Operator {
});
Arc::new(set)
}
OperatorKind::LogicalMarkJoin(meta) => {
let join = LogicalMarkJoin::borrow_raw_parts(meta, &self.common);
let outer_columns = join.outer().output_columns(ctx);
let set = outer_columns
.iter()
.cloned()
.chain(std::iter::once(*join.mark_column()))
.collect();
Arc::new(set)
}
OperatorKind::MockScan(meta) => meta.spec.mocked_output_columns.clone(),
OperatorKind::LogicalProject(meta) => {
let project = LogicalProject::borrow_raw_parts(meta, &self.common);
Expand Down
Loading
Loading