Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-graph/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ datafusion = { version = "49.0.2", default-features = false, features = [
datafusion-common = "49.0.2"
datafusion-expr = "49.0.2"
datafusion-sql = "49.0.2"
datafusion-functions-aggregate = "49.0.2"
lance-core = "0.37.0"
nom = "7.1"
serde = { version = "1", features = ["derive"] }
Expand Down
217 changes: 172 additions & 45 deletions rust/lance-graph/src/datafusion_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::source_catalog::GraphSourceCatalog;
use datafusion::logical_expr::{
col, lit, BinaryExpr, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion_functions_aggregate::count::count;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -359,7 +360,7 @@ impl DataFusionPlanner {
} => self.build_scan(ctx, variable, label, properties),
LogicalOperator::Filter { input, predicate } => {
let input_plan = self.build_operator(ctx, input)?;
let expr = self.to_df_boolean_expr(predicate);
let expr = Self::to_df_boolean_expr(predicate);
LogicalPlanBuilder::from(input_plan)
.filter(expr)
.map_err(|e| self.plan_error("Failed to build filter", e))?
Expand All @@ -368,25 +369,88 @@ impl DataFusionPlanner {
}
LogicalOperator::Project { input, projections } => {
let input_plan = self.build_operator(ctx, input)?;
let exprs: Vec<Expr> = projections

// Check if any projection contains an aggregate function
let has_aggregates = projections
.iter()
.map(|p| {
let expr = self.to_df_value_expr(&p.expression);
// Apply alias if provided, otherwise use Cypher dot notation
if let Some(alias) = &p.alias {
expr.alias(alias)
.any(|p| Self::contains_aggregate(&p.expression));

if has_aggregates {
// Build aggregate plan
// Separate group expressions (non-aggregates) from aggregate expressions
let mut group_exprs = Vec::new();
let mut agg_exprs = Vec::new();
// Store computed aliases for aggregates to reuse in final projection
let mut agg_aliases = Vec::new();

for p in projections {
let expr = Self::to_df_value_expr(&p.expression);

if Self::contains_aggregate(&p.expression) {
// Aggregate expressions get aliased
let alias = if let Some(alias) = &p.alias {
alias.clone()
} else {
self.to_cypher_column_name(&p.expression)
};
agg_exprs.push(expr.alias(&alias));
agg_aliases.push(alias);
} else {
// Convert to Cypher dot notation (e.g., p__name -> p.name)
let cypher_name = self.to_cypher_column_name(&p.expression);
expr.alias(cypher_name)
// Group expressions: use raw expression for grouping, no alias
group_exprs.push(expr);
}
})
.collect();
LogicalPlanBuilder::from(input_plan)
.project(exprs)
.map_err(|e| self.plan_error("Failed to build projection", e))?
.build()
.map_err(|e| self.plan_error("Failed to build plan", e))
}

// After aggregation, add a projection to apply aliases to group columns
let mut final_projection = Vec::new();
let mut agg_idx = 0;
for p in projections {
if !Self::contains_aggregate(&p.expression) {
// Re-create the expression and apply alias
let expr = Self::to_df_value_expr(&p.expression);
let aliased = if let Some(alias) = &p.alias {
expr.alias(alias)
} else {
let cypher_name = self.to_cypher_column_name(&p.expression);
expr.alias(cypher_name)
};
final_projection.push(aliased);
} else {
// For aggregates, reference the column using the same alias we computed earlier
final_projection.push(col(&agg_aliases[agg_idx]));
agg_idx += 1;
}
}

LogicalPlanBuilder::from(input_plan)
.aggregate(group_exprs, agg_exprs)
.map_err(|e| self.plan_error("Failed to build aggregate", e))?
.project(final_projection)
.map_err(|e| self.plan_error("Failed to project after aggregate", e))?
.build()
.map_err(|e| self.plan_error("Failed to build plan", e))
} else {
// Regular projection
let exprs: Vec<Expr> = projections
.iter()
.map(|p| {
let expr = Self::to_df_value_expr(&p.expression);
// Apply alias if provided, otherwise use Cypher dot notation
if let Some(alias) = &p.alias {
expr.alias(alias)
} else {
// Convert to Cypher dot notation (e.g., p__name -> p.name)
let cypher_name = self.to_cypher_column_name(&p.expression);
expr.alias(cypher_name)
}
})
.collect();
LogicalPlanBuilder::from(input_plan)
.project(exprs)
.map_err(|e| self.plan_error("Failed to build projection", e))?
.build()
.map_err(|e| self.plan_error("Failed to build plan", e))
}
}
LogicalOperator::Distinct { input } => {
let input_plan = self.build_operator(ctx, input)?;
Expand All @@ -405,7 +469,7 @@ impl DataFusionPlanner {
let sort_exprs: Vec<SortExpr> = sort_items
.iter()
.map(|item| {
let expr = self.to_df_value_expr(&item.expression);
let expr = Self::to_df_value_expr(&item.expression);
let asc = matches!(item.direction, crate::ast::SortDirection::Ascending);
SortExpr {
expr,
Expand Down Expand Up @@ -513,8 +577,9 @@ impl DataFusionPlanner {
let filter_exprs: Vec<Expr> = properties
.iter()
.map(|(k, v)| {
let lit_expr = self
.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let lit_expr = Self::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
Expand Down Expand Up @@ -683,7 +748,7 @@ impl DataFusionPlanner {

// Apply relationship property filters (e.g., -[r {since: 2020}]->)
for (k, v) in relationship_properties.iter() {
let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let lit_expr = Self::to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
Expand Down Expand Up @@ -776,7 +841,7 @@ impl DataFusionPlanner {

// Apply target property filters (e.g., (b {age: 30}))
for (k, v) in params.target_properties.iter() {
let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let lit_expr = Self::to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
Expand Down Expand Up @@ -1313,7 +1378,7 @@ impl DataFusionPlanner {

// Apply target property filters
for (k, v) in target_properties.iter() {
let lit_expr = self.to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let lit_expr = Self::to_df_value_expr(&crate::ast::ValueExpression::Literal(v.clone()));
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
Expand Down Expand Up @@ -1382,16 +1447,16 @@ impl DataFusionPlanner {
// Expression Translators
// ============================================================================

fn to_df_boolean_expr(&self, expr: &crate::ast::BooleanExpression) -> Expr {
fn to_df_boolean_expr(expr: &crate::ast::BooleanExpression) -> Expr {
use crate::ast::{BooleanExpression as BE, ComparisonOperator as CO};
match expr {
BE::Comparison {
left,
operator,
right,
} => {
let l = self.to_df_value_expr(left);
let r = self.to_df_value_expr(right);
let l = Self::to_df_value_expr(left);
let r = Self::to_df_value_expr(right);
let op = match operator {
CO::Equal => Operator::Eq,
CO::NotEqual => Operator::NotEq,
Expand All @@ -1408,32 +1473,30 @@ impl DataFusionPlanner {
}
BE::In { expression, list } => {
use datafusion::logical_expr::expr::InList as DFInList;
let expr = self.to_df_value_expr(expression);
let list_exprs = list
.iter()
.map(|item| self.to_df_value_expr(item))
.collect::<Vec<_>>();
let expr = Self::to_df_value_expr(expression);
let list_exprs = list.iter().map(Self::to_df_value_expr).collect::<Vec<_>>();
Expr::InList(DFInList::new(Box::new(expr), list_exprs, false))
}
BE::And(l, r) => Expr::BinaryExpr(BinaryExpr {
left: Box::new(self.to_df_boolean_expr(l)),
left: Box::new(Self::to_df_boolean_expr(l)),
op: Operator::And,
right: Box::new(self.to_df_boolean_expr(r)),
right: Box::new(Self::to_df_boolean_expr(r)),
}),
BE::Or(l, r) => Expr::BinaryExpr(BinaryExpr {
left: Box::new(self.to_df_boolean_expr(l)),
left: Box::new(Self::to_df_boolean_expr(l)),
op: Operator::Or,
right: Box::new(self.to_df_boolean_expr(r)),
right: Box::new(Self::to_df_boolean_expr(r)),
}),
BE::Not(inner) => Expr::Not(Box::new(self.to_df_boolean_expr(inner))),
BE::Exists(prop) => Expr::IsNotNull(Box::new(
self.to_df_value_expr(&crate::ast::ValueExpression::Property(prop.clone())),
)),
BE::Not(inner) => Expr::Not(Box::new(Self::to_df_boolean_expr(inner))),
BE::Exists(prop) => Expr::IsNotNull(Box::new(Self::to_df_value_expr(
&crate::ast::ValueExpression::Property(prop.clone()),
))),
_ => lit(true),
}
}

fn to_df_value_expr(&self, expr: &crate::ast::ValueExpression) -> Expr {
/// Convert ValueExpression to DataFusion Expr
fn to_df_value_expr(expr: &crate::ast::ValueExpression) -> Expr {
use crate::ast::{PropertyValue as PV, ValueExpression as VE};
match expr {
VE::Property(prop) => {
Expand All @@ -1455,14 +1518,64 @@ impl DataFusionPlanner {
let qualified_name = format!("{}__{}", prop.variable, prop.property);
col(&qualified_name)
}
VE::Function { .. } | VE::Arithmetic { .. } => lit(0),
VE::Function { name, args } => {
// Handle aggregation functions
match name.to_lowercase().as_str() {
"count" => {
if args.len() == 1 {
// Check for COUNT(*)
let arg_expr = if let VE::Variable(v) = &args[0] {
if v == "*" {
lit(1)
} else {
Self::to_df_value_expr(&args[0])
}
} else {
Self::to_df_value_expr(&args[0])
};

// Use DataFusion's count helper function
count(arg_expr)
} else {
// Invalid argument count - return placeholder
lit(0)
}
}
_ => {
// Unsupported function - return placeholder for now
lit(0)
}
}
}
VE::Arithmetic { .. } => lit(0),
}
}

/// Check if a ValueExpression contains an aggregate function
fn contains_aggregate(expr: &crate::ast::ValueExpression) -> bool {
use crate::ast::ValueExpression as VE;
match expr {
VE::Function { name, args } => {
// Check if this is an aggregate function
let is_aggregate = matches!(
name.to_lowercase().as_str(),
"count" | "sum" | "avg" | "min" | "max"
);
// Also check arguments recursively
is_aggregate || args.iter().any(Self::contains_aggregate)
}
VE::Arithmetic { left, right, .. } => {
Self::contains_aggregate(left) || Self::contains_aggregate(right)
}
_ => false,
}
}

/// Convert a ValueExpression to Cypher dot notation for column naming
///
/// This generates user-friendly column names following Cypher conventions:
/// - Property references: `p.name` (variable.property)
/// - Functions: `function_name(arg)` with simplified argument representation
/// - Other expressions: Use the expression as-is
///
/// This is used when no explicit alias is provided in RETURN clauses.
Expand All @@ -1478,8 +1591,24 @@ impl DataFusionPlanner {
// Handle nested property references
format!("{}.{}", prop.variable, prop.property)
}
VE::Function { name, args } => {
// Generate descriptive function name: count(*), count(p.name), etc.
if args.len() == 1 {
let arg_repr = match &args[0] {
VE::Variable(v) => v.clone(),
VE::Property(prop) => format!("{}.{}", prop.variable, prop.property),
_ => "expr".to_string(),
};
format!("{}({})", name.to_lowercase(), arg_repr)
} else if args.is_empty() {
format!("{}()", name.to_lowercase())
} else {
// Multiple args - just use function name
name.to_lowercase()
}
}
_ => {
// For other expressions (literals, functions), use a generic name
// For other expressions (literals, arithmetic), use a generic name
// In practice, these should always have explicit aliases
"expr".to_string()
}
Expand Down Expand Up @@ -1522,8 +1651,6 @@ mod tests {

#[test]
fn test_df_boolean_expr_in_list() {
let cfg = crate::config::GraphConfig::builder().build().unwrap();
let planner = DataFusionPlanner::new(cfg);
let expr = BooleanExpression::In {
expression: ValueExpression::Property(PropertyRef {
variable: "rel".into(),
Expand All @@ -1535,7 +1662,7 @@ mod tests {
],
};

if let Expr::InList(in_list) = planner.to_df_boolean_expr(&expr) {
if let Expr::InList(in_list) = DataFusionPlanner::to_df_boolean_expr(&expr) {
assert!(!in_list.negated);
assert_eq!(in_list.list.len(), 2);
match *in_list.expr {
Expand Down
Loading
Loading