Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/lance-graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ A builder (`CypherQueryBuilder`) is also available for constructing queries prog
- Node patterns `(:Label)` with optional variables.
- Relationship patterns with fixed direction and type, including multi-hop paths.
- Property comparisons against literal values with `AND`/`OR`/`NOT`/`EXISTS`.
- RETURN lists of property accesses, optional `DISTINCT`, and `LIMIT`.
- RETURN lists of property accesses, optional `DISTINCT`, `ORDER BY`, `SKIP` (offset), and `LIMIT`.
- Positional and named parameters (e.g. `$min_age`).

Features such as ORDER BY, aggregations, optional matches, and subqueries are parsed but not executed yet.
Features such as aggregations, optional matches, and subqueries are parsed but not executed yet.

## Crate Layout

Expand Down
2 changes: 2 additions & 0 deletions rust/lance-graph/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct CypherQuery {
pub limit: Option<u64>,
/// ORDER BY clause (optional)
pub order_by: Option<OrderByClause>,
/// SKIP/OFFSET clause (optional)
pub skip: Option<u64>,
}

impl CypherQuery {
Expand Down
8 changes: 8 additions & 0 deletions rust/lance-graph/src/datafusion_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ impl DataFusionPlanner {
.build()
.unwrap())
}
LogicalOperator::Offset { input, offset } => {
let input_plan = self.plan_operator_with_ctx(input, var_labels)?;
Ok(LogicalPlanBuilder::from(input_plan)
.limit((*offset) as usize, None)
.unwrap()
.build()
.unwrap())
}
LogicalOperator::Expand {
input,
source_variable,
Expand Down
65 changes: 65 additions & 0 deletions rust/lance-graph/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub enum LogicalOperator {
sort_items: Vec<SortItem>,
},

/// Apply SKIP/OFFSET
Offset {
input: Box<LogicalOperator>,
offset: u64,
},

/// Apply LIMIT
Limit {
input: Box<LogicalOperator>,
Expand Down Expand Up @@ -147,6 +153,14 @@ impl LogicalPlanner {
};
}

// Apply SKIP/OFFSET if present
if let Some(skip) = query.skip {
plan = LogicalOperator::Offset {
input: Box::new(plan),
offset: skip,
};
}

// Apply LIMIT if present
if let Some(limit) = query.limit {
plan = LogicalOperator::Limit {
Expand Down Expand Up @@ -338,6 +352,7 @@ impl LogicalPlanner {
LogicalOperator::Project { input, .. } => self.extract_variable_from_plan(input),
LogicalOperator::Distinct { input } => self.extract_variable_from_plan(input),
LogicalOperator::Sort { input, .. } => self.extract_variable_from_plan(input),
LogicalOperator::Offset { input, .. } => self.extract_variable_from_plan(input),
LogicalOperator::Limit { input, .. } => self.extract_variable_from_plan(input),
LogicalOperator::Join { left, right, .. } => {
// Prefer the right branch's tail variable, else fall back to left
Expand Down Expand Up @@ -796,6 +811,56 @@ mod tests {
}
}

#[test]
fn test_order_skip_limit_wrapping() {
// ORDER BY + SKIP + LIMIT should be Limit(Offset(Sort(Project(..))))
let q = "MATCH (n:Person) RETURN n.name ORDER BY n.name SKIP 5 LIMIT 10";
let ast = parse_cypher_query(q).unwrap();
let mut planner = LogicalPlanner::new();
let logical = planner.plan(&ast).unwrap();
match logical {
LogicalOperator::Limit { input, count } => {
assert_eq!(count, 10);
match *input {
LogicalOperator::Offset {
input: inner,
offset,
} => {
assert_eq!(offset, 5);
match *inner {
LogicalOperator::Sort { input: inner2, .. } => match *inner2 {
LogicalOperator::Project { .. } => {}
_ => panic!("Expected Project under Sort"),
},
_ => panic!("Expected Sort under Offset"),
}
}
_ => panic!("Expected Offset under Limit"),
}
}
_ => panic!("Expected Limit at top level"),
}
}

#[test]
fn test_skip_only_wrapping() {
// SKIP only should be Offset(Project(..))
let q = "MATCH (n:Person) RETURN n.name SKIP 3";
let ast = parse_cypher_query(q).unwrap();
let mut planner = LogicalPlanner::new();
let logical = planner.plan(&ast).unwrap();
match logical {
LogicalOperator::Offset { input, offset } => {
assert_eq!(offset, 3);
match *input {
LogicalOperator::Project { .. } => {}
_ => panic!("Expected Project under Offset"),
}
}
_ => panic!("Expected Offset at top level"),
}
}

#[test]
fn test_relationship_properties_pushed_into_expand() {
let q = "MATCH (a)-[:KNOWS {since: 2020}]->(b) RETURN b";
Expand Down
83 changes: 82 additions & 1 deletion rust/lance-graph/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn cypher_query(input: &str) -> IResult<&str, CypherQuery> {
let (input, where_clause) = opt(where_clause)(input)?;
let (input, return_clause) = return_clause(input)?;
let (input, order_by) = opt(order_by_clause)(input)?;
let (input, limit) = opt(limit_clause)(input)?;
let (input, (skip, limit)) = pagination_clauses(input)?;
let (input, _) = multispace0(input)?;

Ok((
Expand All @@ -56,6 +56,7 @@ fn cypher_query(input: &str) -> IResult<&str, CypherQuery> {
return_clause,
limit,
order_by,
skip,
},
))
}
Expand Down Expand Up @@ -389,6 +390,49 @@ fn limit_clause(input: &str) -> IResult<&str, u64> {
Ok((input, limit as u64))
}

// Parse a SKIP clause
fn skip_clause(input: &str) -> IResult<&str, u64> {
let (input, _) = multispace0(input)?;
let (input, _) = tag_no_case("SKIP")(input)?;
let (input, _) = multispace1(input)?;
let (input, skip) = integer_literal(input)?;

Ok((input, skip as u64))
}

// Parse pagination clauses (SKIP and LIMIT)
fn pagination_clauses(input: &str) -> IResult<&str, (Option<u64>, Option<u64>)> {
let (mut remaining, _) = multispace0(input)?;
let mut skip: Option<u64> = None;
let mut limit: Option<u64> = None;

loop {
let before = remaining;

if skip.is_none() {
if let Ok((i, s)) = skip_clause(remaining) {
skip = Some(s);
remaining = i;
continue;
}
}

if limit.is_none() {
if let Ok((i, l)) = limit_clause(remaining) {
limit = Some(l);
remaining = i;
continue;
}
}

if before == remaining {
break;
}
}

Ok((remaining, (skip, limit)))
}

// Helper parsers

// Parse an identifier
Expand Down Expand Up @@ -572,4 +616,41 @@ mod tests {

assert_eq!(result.limit, Some(10));
}

#[test]
fn test_parse_query_with_skip() {
let query = "MATCH (n:Person) RETURN n.name SKIP 5";
let result = parse_cypher_query(query).unwrap();

assert_eq!(result.skip, Some(5));
assert_eq!(result.limit, None);
}

#[test]
fn test_parse_query_with_skip_and_limit() {
let query = "MATCH (n:Person) RETURN n.name SKIP 5 LIMIT 10";
let result = parse_cypher_query(query).unwrap();

assert_eq!(result.skip, Some(5));
assert_eq!(result.limit, Some(10));
}

#[test]
fn test_parse_query_with_skip_and_order_by() {
let query = "MATCH (n:Person) RETURN n.name ORDER BY n.age SKIP 5";
let result = parse_cypher_query(query).unwrap();

assert_eq!(result.skip, Some(5));
assert!(result.order_by.is_some());
}

#[test]
fn test_parse_query_with_skip_order_by_and_limit() {
let query = "MATCH (n:Person) RETURN n.name ORDER BY n.age SKIP 5 LIMIT 10";
let result = parse_cypher_query(query).unwrap();

assert_eq!(result.skip, Some(5));
assert_eq!(result.limit, Some(10));
assert!(result.order_by.is_some());
}
}
Loading
Loading