Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 190 additions & 46 deletions apps/framework-cli/src/infrastructure/olap/clickhouse/sql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

use crate::infrastructure::olap::clickhouse::model::ClickHouseIndex;
use sqlparser::ast::{
Expr, ObjectName, ObjectNamePart, Query, Select, SelectItem, SetExpr, Statement, TableFactor,
TableWithJoins, ToSql, VisitMut, VisitorMut,
CreateTableOptions, Expr, ObjectName, ObjectNamePart, Query, Select, SelectItem, SetExpr,
SqlOption, Statement, TableFactor, TableWithJoins, ToSql, VisitMut, VisitorMut,
};
use sqlparser::dialect::ClickHouseDialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::{Location, Span, Token, Tokenizer};
use std::collections::HashSet;
use std::ops::ControlFlow;
use std::sync::LazyLock;
Expand Down Expand Up @@ -175,58 +177,184 @@ pub fn extract_table_settings_from_create_table(
}

/// Extracts the ENGINE clause from a ClickHouse `CREATE TABLE` statement.
///
/// The result is the engine name followed by its parenthesized parameters, if
/// any (e.g. `MergeTree` or `Buffer('db', 'table', 16, ...)`).
///
/// Returns `None` when:
/// - `sql` parses into anything other than exactly one `CREATE TABLE` statement,
/// - the parsed statement carries no ENGINE option, or
/// - the token-based extraction cannot locate an engine name.
pub fn extract_engine_from_create_table(sql: &str) -> Option<String> {
    let dialect = ClickHouseDialect {};

    // Prefer parsing with sqlparser (AST-aware) to avoid substring matches.
    // ClickHouse engine parameters can include numeric literals that the parser
    // doesn't fully support, so when parsing fails we fall back to
    // tokenizer-based extraction (still SQL-aware, no naive `find`).
    let Ok(statements) = Parser::parse_sql(&dialect, sql) else {
        return extract_engine_from_tokens(sql, &dialect);
    };

    // Anything other than a single CREATE TABLE statement has no engine to report.
    let [Statement::CreateTable(create_table)] = statements.as_slice() else {
        return None;
    };
    if !create_table_has_engine_option(&create_table.table_options) {
        return None;
    }

    // The AST is only used to confirm that an ENGINE option exists; the text
    // of the clause is taken from the token stream.
    extract_engine_from_tokens(sql, &dialect)
}

// Skip "ENGINE" and any whitespace/equals
let rest = &sql[engine_pos + 6..];
let rest_trimmed = rest.trim_start();
let rest_after_eq = rest_trimmed.strip_prefix('=').map(|s| s.trim_start())?; // ENGINE must be followed by =

// Now extract the engine name and parameters
// Engine name is alphanumeric (including underscore)
let engine_name_end = rest_after_eq
.find(|c: char| !c.is_alphanumeric() && c != '_')
.unwrap_or(rest_after_eq.len());

let engine_name = &rest_after_eq[..engine_name_end];

// Check if there are parameters (starting with '(')
let after_name = &rest_after_eq[engine_name_end..].trim_start();
if after_name.starts_with('(') {
// Find the matching closing parenthesis, handling nested parentheses
let mut paren_count = 0;
let mut in_string = false;
let mut escape_next = false;
let mut end_pos = None;

for (i, ch) in after_name.chars().enumerate() {
if escape_next {
escape_next = false;
continue;
/// Reports whether the options of a parsed CREATE TABLE contain an ENGINE entry.
///
/// Callers use this to skip token scanning for statements that declare no
/// engine. The key comparison is ASCII case-insensitive.
fn create_table_has_engine_option(options: &CreateTableOptions) -> bool {
    // Every non-empty variant wraps a list of options; only the list matters here.
    let option_list = match options {
        CreateTableOptions::With(list)
        | CreateTableOptions::Options(list)
        | CreateTableOptions::Plain(list)
        | CreateTableOptions::TableProperties(list) => list,
        CreateTableOptions::None => return false,
    };

    for option in option_list {
        if let SqlOption::NamedParenthesizedList(named) = option {
            if named.key.value.eq_ignore_ascii_case("ENGINE") {
                return true;
            }
        }
    }
    false
}

/// Extract the ENGINE clause using token spans instead of substring search.
///
/// Token spans preserve the original formatting (quotes, escapes) and avoid
/// false matches such as column names containing "engine".
fn extract_engine_from_tokens(sql: &str, dialect: &ClickHouseDialect) -> Option<String> {
let tokens = Tokenizer::new(dialect, sql).tokenize_with_location().ok()?;

let mut saw_create = false;
let mut saw_table = false;
let mut i = 0usize;
while i < tokens.len() {
let token = &tokens[i].token;
if is_keyword(token, Keyword::CREATE) {
saw_create = true;
i += 1;
continue;
}
if saw_create && is_keyword(token, Keyword::TABLE) {
saw_table = true;
i += 1;
continue;
}

if saw_table && is_keyword(token, Keyword::ENGINE) {
let mut j = i + 1;
skip_whitespace(&tokens, &mut j);

if j < tokens.len() && matches!(tokens[j].token, Token::Eq) {
j += 1;
skip_whitespace(&tokens, &mut j);
}

match ch {
'\\' if in_string => escape_next = true,
'\'' => in_string = !in_string,
'(' if !in_string => paren_count += 1,
')' if !in_string => {
paren_count -= 1;
if paren_count == 0 {
end_pos = Some(i + 1);
break;
}
let engine_tok = tokens.get(j)?;
let engine_name = slice_for_span(sql, engine_tok.span)?;
let engine_name = engine_name.trim();
if engine_name.is_empty() {
return None;
}
j += 1;
skip_whitespace(&tokens, &mut j);

if j < tokens.len() && matches!(tokens[j].token, Token::LParen) {
let (lparen_idx, rparen_idx) = find_matching_paren(&tokens, j)?;
let params_slice = slice_for_span(
sql,
Span::new(tokens[lparen_idx].span.start, tokens[rparen_idx].span.end),
)?;
let params_slice = params_slice.trim();
return Some(format!("{engine_name}{params_slice}"));
}

return Some(engine_name.to_string());
}

i += 1;
}

None
}

/// Returns `true` when `token` is a word token whose keyword equals `keyword`.
///
/// Every non-word token yields `false`.
fn is_keyword(token: &Token, keyword: Keyword) -> bool {
    match token {
        Token::Word(word) => word.keyword == keyword,
        _ => false,
    }
}

/// Moves `idx` forward to the first token at or after it that is not a
/// `Token::Whitespace` (sqlparser's whitespace tokens include comments).
///
/// `idx` is left untouched if it already points at a non-whitespace token or
/// past the end of `tokens`.
fn skip_whitespace(tokens: &[sqlparser::tokenizer::TokenWithSpan], idx: &mut usize) {
    while tokens
        .get(*idx)
        .is_some_and(|tok| matches!(tok.token, Token::Whitespace(_)))
    {
        *idx += 1;
    }
}

/// Finds the matching ')' for the '(' at `start_idx`, tracking nesting.
///
/// Scans forward from `start_idx`, counting `(` as +1 and `)` as -1. Returns
/// `(open_index, close_index)` for the outermost pair once the count returns
/// to zero, or `None` if the tokens run out before the group is closed.
fn find_matching_paren(
    tokens: &[sqlparser::tokenizer::TokenWithSpan],
    start_idx: usize,
) -> Option<(usize, usize)> {
    let mut depth = 0i32;
    // Index of the '(' that opened the outermost group.
    let mut lparen_idx = None;

    for (idx, tok) in tokens.iter().enumerate().skip(start_idx) {
        match tok.token {
            Token::LParen => {
                depth += 1;
                if depth == 1 {
                    lparen_idx = Some(idx);
                }
            }
            Token::RParen => {
                depth -= 1;
                if depth == 0 {
                    return lparen_idx.map(|l| (l, idx));
                }
            }
            _ => {}
        }
    }

    None
}

end_pos.map(|end| format!("{}{}", engine_name, &after_name[..end]))
} else {
// Engine without parameters
Some(engine_name.to_string())
/// Borrows the part of `sql` covered by `span`.
///
/// Returns `None` if either end of the span cannot be located in `sql`, or if
/// the span's end falls before its start.
fn slice_for_span(sql: &str, span: Span) -> Option<&str> {
    let from = location_to_index(sql, span.start)?;
    let to = location_to_index(sql, span.end)?;
    // `str::get` yields `None` for an inverted range, which covers `to < from`.
    sql.get(from..to)
}

/// Translates a sqlparser `Location` (1-based line and column) into a byte
/// offset within `sql`.
///
/// Columns are counted per `char`, and each `'\n'` starts a new line at
/// column 1. The position just past the final character maps to `sql.len()`.
/// Returns `None` for a zero line or column, or for a location that lies
/// outside the text.
fn location_to_index(sql: &str, location: Location) -> Option<usize> {
    if location.line == 0 || location.column == 0 {
        return None;
    }

    let target = (location.line, location.column);
    let mut cursor = (1u64, 1u64);

    // Visit every character start, then one end-of-input sentinel so that a
    // location just past the last character still resolves.
    let positions = sql
        .char_indices()
        .map(|(offset, ch)| (offset, Some(ch)))
        .chain(std::iter::once((sql.len(), None)));

    for (offset, ch) in positions {
        if cursor == target {
            return Some(offset);
        }
        match ch {
            Some('\n') => cursor = (cursor.0 + 1, 1),
            Some(_) => cursor.1 += 1,
            None => {}
        }
    }

    None
}

/// Extract SAMPLE BY expression from a CREATE TABLE statement
Expand Down Expand Up @@ -1125,6 +1253,22 @@ pub mod tests {
assert_eq!(result, Some("MergeTree".to_string()));
}

#[test]
fn test_extract_engine_when_column_name_contains_engine() {
    // The `scripting_engine` column contains the text "engine"; only the real
    // ENGINE clause after the column list may be reported.
    let create_table_sql = r#"CREATE TABLE acme_telemetry.device_script_event_consumer (
_id String,
scripting_engine String
)
ENGINE = Buffer('acme_telemetry', 'device_script_event_consumer_stored', 16, 1, 300, 100, 10000, 10000000, 50000000)"#;
    let expected = "Buffer('acme_telemetry', 'device_script_event_consumer_stored', 16, 1, 300, 100, 10000, 10000000, 50000000)".to_string();

    let extracted = extract_engine_from_create_table(create_table_sql);

    assert_eq!(extracted, Some(expected));
}

#[test]
fn test_extract_no_engine() {
let sql = "CREATE TABLE test (x Int32)";
Expand Down
Loading