Skip to content

Commit 7dbfee5

Browse files
committed
unify the execute api with an execution strategy
1 parent 2e0bcae commit 7dbfee5

File tree

4 files changed

+134
-31
lines changed

4 files changed

+134
-31
lines changed

python/src/graph.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use arrow::ffi_stream::ArrowArrayStreamReader;
2222
use arrow_array::{RecordBatch, RecordBatchReader};
2323
use arrow_schema::Schema;
2424
use lance_graph::{
25-
CypherQuery as RustCypherQuery, GraphConfig as RustGraphConfig, GraphError as RustGraphError,
25+
ExecutionStrategy as RustExecutionStrategy, CypherQuery as RustCypherQuery,
26+
GraphConfig as RustGraphConfig, GraphError as RustGraphError,
2627
};
2728
use pyo3::{
2829
exceptions::{PyNotImplementedError, PyRuntimeError, PyValueError},
@@ -34,6 +35,28 @@ use serde_json::Value as JsonValue;
3435

3536
use crate::RT;
3637

38+
/// Execution strategy for Cypher queries
39+
#[pyclass(name = "ExecutionStrategy", module = "lance.graph")]
40+
#[derive(Clone, Copy)]
41+
pub enum ExecutionStrategy {
42+
/// Use DataFusion query planner (default, full feature support)
43+
DataFusion,
44+
/// Use simple single-table executor (legacy, limited features)
45+
Simple,
46+
/// Use Lance native executor (not yet implemented)
47+
LanceNative,
48+
}
49+
50+
impl From<ExecutionStrategy> for RustExecutionStrategy {
51+
fn from(strategy: ExecutionStrategy) -> Self {
52+
match strategy {
53+
ExecutionStrategy::DataFusion => RustExecutionStrategy::DataFusion,
54+
ExecutionStrategy::Simple => RustExecutionStrategy::Simple,
55+
ExecutionStrategy::LanceNative => RustExecutionStrategy::LanceNative,
56+
}
57+
}
58+
}
59+
3760
/// Convert GraphError to PyErr
3861
fn graph_error_to_pyerr(err: RustGraphError) -> PyErr {
3962
match &err {
@@ -267,6 +290,8 @@ impl CypherQuery {
267290
/// ----------
268291
/// datasets : dict
269292
/// Dictionary mapping table names to Lance datasets
293+
/// strategy : ExecutionStrategy, optional
294+
/// Execution strategy to use (defaults to DataFusion)
270295
///
271296
/// Returns
272297
/// -------
@@ -277,16 +302,34 @@ impl CypherQuery {
277302
/// ------
278303
/// RuntimeError
279304
/// If query execution fails
280-
fn execute(&self, py: Python, datasets: &Bound<'_, PyDict>) -> PyResult<PyObject> {
281-
// Convert datasets to Arrow batches while holding the GIL - same as before
305+
///
306+
/// Examples
307+
/// --------
308+
/// >>> # Default strategy (DataFusion)
309+
/// >>> result = query.execute(datasets)
310+
///
311+
/// >>> # Explicit strategy
312+
/// >>> from lance.graph import ExecutionStrategy
313+
/// >>> result = query.execute(datasets, strategy=ExecutionStrategy.Simple)
314+
#[pyo3(signature = (datasets, strategy=None))]
315+
fn execute(
316+
&self,
317+
py: Python,
318+
datasets: &Bound<'_, PyDict>,
319+
strategy: Option<ExecutionStrategy>,
320+
) -> PyResult<PyObject> {
321+
// Convert datasets to Arrow batches while holding the GIL
282322
let arrow_datasets = python_datasets_to_batches(datasets)?;
283323

324+
// Convert Python strategy to Rust strategy
325+
let rust_strategy = strategy.map(|s| s.into());
326+
284327
// Clone the inner query for use in the async block
285328
let inner_query = self.inner.clone();
286329

287330
// Use RT.block_on with Some(py) like the scanner to_pyarrow method
288331
let result_batch = RT
289-
.block_on(Some(py), inner_query.execute(arrow_datasets))?
332+
.block_on(Some(py), inner_query.execute(arrow_datasets, rust_strategy))?
290333
.map_err(graph_error_to_pyerr)?;
291334

292335
record_batch_to_python_table(py, &result_batch)
@@ -562,6 +605,7 @@ fn record_batch_to_python_table(
562605
pub fn register_graph_module(py: Python, parent_module: &Bound<'_, PyModule>) -> PyResult<()> {
563606
let graph_module = PyModule::new(py, "graph")?;
564607

608+
graph_module.add_class::<ExecutionStrategy>()?;
565609
graph_module.add_class::<GraphConfig>()?;
566610
graph_module.add_class::<GraphConfigBuilder>()?;
567611
graph_module.add_class::<CypherQuery>()?;

rust/lance-graph/benches/graph_execution.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow_schema::{DataType, Field, Schema as ArrowSchema};
2222
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
2323
use futures::TryStreamExt;
2424
use lance::dataset::{Dataset, WriteMode, WriteParams};
25-
use lance_graph::{CypherQuery, GraphConfig};
25+
use lance_graph::{CypherQuery, ExecutionStrategy, GraphConfig};
2626
use tempfile::TempDir;
2727

2828
fn create_people_batch() -> RecordBatch {
@@ -71,7 +71,11 @@ fn execute_cypher_query(
7171
q: &CypherQuery,
7272
datasets: HashMap<String, RecordBatch>,
7373
) -> RecordBatch {
74-
rt.block_on(async move { q.execute(datasets).await.unwrap() })
74+
rt.block_on(async move {
75+
q.execute(datasets, Some(ExecutionStrategy::Simple))
76+
.await
77+
.unwrap()
78+
})
7579
}
7680

7781
fn make_people_batch(n: usize) -> RecordBatch {

rust/lance-graph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@ pub const MAX_VARIABLE_LENGTH_HOPS: u32 = 20;
5353

5454
pub use config::{GraphConfig, NodeMapping, RelationshipMapping};
5555
pub use error::{GraphError, Result};
56-
pub use query::CypherQuery;
56+
pub use query::{CypherQuery, ExecutionStrategy};

rust/lance-graph/src/query.rs

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,23 @@ mod clauses;
1717
mod expr;
1818
mod simple_executor;
1919

20+
/// Execution strategy for Cypher queries
21+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22+
pub enum ExecutionStrategy {
23+
/// Use DataFusion query planner (default, full feature support)
24+
DataFusion,
25+
/// Use simple single-table executor (legacy, limited features)
26+
Simple,
27+
/// Use Lance native executor (not yet implemented)
28+
LanceNative,
29+
}
30+
31+
impl Default for ExecutionStrategy {
32+
fn default() -> Self {
33+
Self::DataFusion
34+
}
35+
}
36+
2037
/// A Cypher query that can be executed against Lance datasets
2138
#[derive(Debug, Clone)]
2239
pub struct CypherQuery {
@@ -92,6 +109,68 @@ impl CypherQuery {
92109
})
93110
}
94111

112+
/// Execute the query against provided in-memory datasets
113+
///
114+
/// This method uses the DataFusion planner by default for comprehensive query support
115+
/// including joins, aggregations, and complex patterns. You can optionally specify
116+
/// a different execution strategy.
117+
///
118+
/// # Arguments
119+
/// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships)
120+
/// * `strategy` - Optional execution strategy (defaults to DataFusion)
121+
///
122+
/// # Returns
123+
/// A single RecordBatch containing the query results
124+
///
125+
/// # Errors
126+
/// Returns error if query parsing, planning, or execution fails
127+
///
128+
/// # Example
129+
/// ```ignore
130+
/// use std::collections::HashMap;
131+
/// use arrow::record_batch::RecordBatch;
132+
/// use lance_graph::query::CypherQuery;
133+
///
134+
/// // Create in-memory datasets
135+
/// let mut datasets = HashMap::new();
136+
/// datasets.insert("Person".to_string(), person_batch);
137+
/// datasets.insert("KNOWS".to_string(), knows_batch);
138+
///
139+
/// // Parse and execute query
140+
/// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name, f.name")?
141+
/// .with_config(config);
142+
/// // Use the default DataFusion strategy
143+
/// let result = query.execute(datasets, None).await?;
144+
/// // Use the Simple strategy explicitly
145+
/// let result = query.execute(datasets, Some(ExecutionStrategy::Simple)).await?;
146+
/// ```
147+
pub async fn execute(
148+
&self,
149+
datasets: HashMap<String, arrow::record_batch::RecordBatch>,
150+
strategy: Option<ExecutionStrategy>,
151+
) -> Result<arrow::record_batch::RecordBatch> {
152+
let strategy = strategy.unwrap_or_default();
153+
match strategy {
154+
ExecutionStrategy::DataFusion => self.execute_datafusion(datasets).await,
155+
ExecutionStrategy::Simple => self.execute_simple(datasets).await,
156+
ExecutionStrategy::LanceNative => Err(GraphError::UnsupportedFeature {
157+
feature: "Lance native execution strategy is not yet implemented".to_string(),
158+
location: snafu::Location::new(file!(), line!(), column!()),
159+
}),
160+
}
161+
}
162+
163+
/// Explain the query execution plan using the DataFusion planner
164+
///
165+
/// This method provides a high-level overview of the query execution plan
166+
/// using the DataFusion planner, which is useful for debugging and optimization.
167+
pub async fn explain(
168+
&self,
169+
datasets: HashMap<String, arrow::record_batch::RecordBatch>,
170+
) -> Result<String> {
171+
self.explain_datafusion(datasets).await
172+
}
173+
95174
/// Execute using the DataFusion planner with in-memory datasets
96175
///
97176
/// # Overview
@@ -601,30 +680,6 @@ impl CypherQuery {
601680
Ok(output)
602681
}
603682

604-
/// Execute the query against provided in-memory datasets using the DataFusion planner
605-
///
606-
/// This is the primary execution method that uses the full DataFusion-based planner
607-
/// for comprehensive query support including joins, aggregations, and complex patterns.
608-
///
609-
/// For legacy single-table queries, use `execute_simple()` instead.
610-
pub async fn execute(
611-
&self,
612-
datasets: HashMap<String, arrow::record_batch::RecordBatch>,
613-
) -> Result<arrow::record_batch::RecordBatch> {
614-
self.execute_datafusion(datasets).await
615-
}
616-
617-
/// Explain the query execution plan using the DataFusion planner
618-
///
619-
/// This method provides a high-level overview of the query execution plan
620-
/// using the DataFusion planner, which is useful for debugging and optimization.
621-
pub async fn explain(
622-
&self,
623-
datasets: HashMap<String, arrow::record_batch::RecordBatch>,
624-
) -> Result<String> {
625-
self.explain_datafusion(datasets).await
626-
}
627-
628683
/// Execute simple single-table queries (legacy implementation)
629684
///
630685
/// This method supports basic projection/filter/limit workflows on a single table.

0 commit comments

Comments
 (0)