Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8dc39ac
feat(geth-engine): support EventQL queries
YoEight Jul 19, 2025
584142d
pending lexical analysis implementation
YoEight Jul 19, 2025
d04f728
make some progress on the rename phase
YoEight Jul 19, 2025
a87513c
rename seems complete, time to test
YoEight Jul 20, 2025
9faf1c0
add rename phase tests
YoEight Jul 20, 2025
508e863
start working on inferring field types
YoEight Jul 20, 2025
e9c673d
more type inference implementation
YoEight Jul 21, 2025
3c9a9e2
pending unary op inference implementation
YoEight Jul 21, 2025
6c61c66
infer unary operations
YoEight Jul 24, 2025
667dd4c
inference is almost done
YoEight Jul 25, 2025
d50a02a
inference ready not time for some tests
YoEight Jul 25, 2025
fdc5f1c
fix parser tracking position and binary operation parsing
YoEight Jul 25, 2025
656f4eb
improve type inference when dealing with binary operation
YoEight Jul 25, 2025
978e2f7
add homemade error for the eventql parser
YoEight Jul 26, 2025
4b4021c
eventql parser now uses its own error type, need to migrate tests for
YoEight Jul 26, 2025
71c67e2
formatting
YoEight Jul 26, 2025
f2dee1b
add a binary operation test
YoEight Jul 26, 2025
6c832ac
add test for unhinged unary query
YoEight Jul 26, 2025
41c5333
move to more meaningful error introspection during tests
YoEight Jul 26, 2025
149e06e
pending collection of what the user want to do from the query
YoEight Jul 27, 2025
049761d
just discovered how marvelous subjects are
YoEight Jul 27, 2025
b591b75
Promote subject to a real type
YoEight Jul 27, 2025
d9f3f21
start the predicate evaluation implementation
YoEight Jul 28, 2025
ef96a3a
implement linearization step
YoEight Aug 1, 2025
0aa86e2
working on a stack-based interpreter
YoEight Aug 1, 2025
9f4942c
make some progress with the eval code but far from completion
YoEight Aug 2, 2025
831ef2d
implement proper codegeneration for the query
YoEight Aug 3, 2025
aeaa78c
More eval code done
YoEight Aug 3, 2025
82d4889
eval core logic is done, need to implement function calling
YoEight Aug 4, 2025
7cb0574
stop toying and get a function query operation as soon as possible
YoEight Aug 4, 2025
fc56b65
export some symbol from the eval module
YoEight Aug 4, 2025
6284efd
code cleaning/formating
YoEight Aug 4, 2025
7087a8e
simplify eventql parsing code
YoEight Aug 9, 2025
311cf28
implement DFS post order traversal for expr
YoEight Aug 9, 2025
ce416dc
implement generic query traversal
YoEight Aug 9, 2025
c8a1583
refact the rename phase
YoEight Aug 10, 2025
3c607f8
refactor inference phase
YoEight Aug 10, 2025
c8539c0
fix inference tests
YoEight Aug 10, 2025
f0d2e1c
small renaming
YoEight Aug 10, 2025
48bb10a
refact codegen
YoEight Aug 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions geth-domain/src/index/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl MemTable {
self.entries_count += 1;
}

pub fn scan_forward(&self, key: u64, start: u64, max: usize) -> ScanForward {
pub fn scan_forward(&self, key: u64, start: u64, max: usize) -> ScanForward<'_> {
ScanForward {
key,
start,
Expand All @@ -44,7 +44,7 @@ impl MemTable {
}
}

pub fn scan_backward(&self, key: u64, start: u64, max: usize) -> ScanBackward {
pub fn scan_backward(&self, key: u64, start: u64, max: usize) -> ScanBackward<'_> {
ScanBackward {
key,
start,
Expand Down
4 changes: 2 additions & 2 deletions geth-domain/src/index/ss_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl SsTable {
self.iter().map(|t| (t.key, t.revision, t.position))
}

pub fn scan_forward(&self, key: u64, start: u64, count: usize) -> ScanForward {
pub fn scan_forward(&self, key: u64, start: u64, count: usize) -> ScanForward<'_> {
ScanForward {
key,
revision: start,
Expand All @@ -300,7 +300,7 @@ impl SsTable {
}
}

pub fn scan_backward(&self, key: u64, start: u64, count: usize) -> ScanBackward {
pub fn scan_backward(&self, key: u64, start: u64, count: usize) -> ScanBackward<'_> {
let mut candidates = self.find_best_candidates(key, start);

candidates.rotate_left(candidates.len() - 1);
Expand Down
5 changes: 4 additions & 1 deletion geth-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "geth-engine"
version = "0.1.0"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -17,6 +17,9 @@ path = "../geth-grpc"
[dependencies.geth-domain]
path = "../geth-domain"

[dependencies.geth-eventql]
path = "../geth-eventql"

[dependencies.tokio]
version = "1.20"
features = ["full"]
Expand Down
12 changes: 6 additions & 6 deletions geth-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ mod options;
mod process;

use geth_mikoshi::{
storage::Storage, wal::chunks::ChunkContainer, FileSystemStorage, InMemoryStorage,
FileSystemStorage, InMemoryStorage, storage::Storage, wal::chunks::ChunkContainer,
};
use opentelemetry::{trace::TracerProvider, KeyValue};
use opentelemetry::{KeyValue, trace::TracerProvider};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
logs::SdkLoggerProvider, metrics::PeriodicReader, trace::SdkTracerProvider, Resource,
Resource, logs::SdkLoggerProvider, metrics::PeriodicReader, trace::SdkTracerProvider,
};
pub use process::{
Proc, RequestContext,
indexing::IndexClient,
manager::{start_process_manager_with_catalog, Catalog, CatalogBuilder, ManagerClient},
manager::{Catalog, CatalogBuilder, ManagerClient, start_process_manager_with_catalog},
reading::{self, ReaderClient},
start_process_manager,
writing::WriterClient,
Proc, RequestContext,
};
use tokio::sync::OnceCell;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{EnvFilter, prelude::*};
use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt};
use tracing_subscriber::{prelude::*, EnvFilter};

pub mod built_info {
// The file has been placed there by the build script.
Expand Down
2 changes: 1 addition & 1 deletion geth-engine/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
sync::{mpsc, Arc, RwLock},
sync::{Arc, RwLock, mpsc},
thread,
time::Duration,
};
Expand Down
4 changes: 3 additions & 1 deletion geth-engine/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::time::Instant;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

use crate::process::manager::{start_process_manager_with_catalog, Catalog};
use crate::Options;
use crate::process::manager::{Catalog, start_process_manager_with_catalog};

#[cfg(test)]
mod tests;
Expand All @@ -19,6 +19,7 @@ pub mod manager;
mod messages;
#[cfg(test)]
mod panic;
pub mod query;
pub mod reading;
#[cfg(test)]
mod sink;
Expand Down Expand Up @@ -82,6 +83,7 @@ pub enum Proc {
PubSub,
Grpc,
PyroWorker,
Query,
#[cfg(test)]
Echo,
#[cfg(test)]
Expand Down
11 changes: 6 additions & 5 deletions geth-engine/src/process/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use tokio::select;
use tracing::instrument;

use crate::{
IndexClient, ManagerClient, ReaderClient, RequestContext,
process::subscription::{self, SubscriptionClient},
reading, IndexClient, ManagerClient, ReaderClient, RequestContext,
reading,
};

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -222,10 +223,10 @@ impl Consumer {

State::Live => {
if let Some(event) = self.sub_streaming.next().await? {
if let SubscriptionEvent::EventAppeared(temp) = &event {
if temp.revision < self.end {
continue;
}
if let SubscriptionEvent::EventAppeared(temp) = &event
&& temp.revision < self.end
{
continue;
}

return Ok(Some(event));
Expand Down
2 changes: 1 addition & 1 deletion geth-engine/src/process/echo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::process::{env::Managed, Item, ProcessEnv};
use crate::process::{Item, ProcessEnv, env::Managed};

pub async fn run(mut env: ProcessEnv<Managed>) -> eyre::Result<()> {
while let Some(item) = env.recv().await {
Expand Down
2 changes: 1 addition & 1 deletion geth-engine/src/process/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tokio::{
};

use crate::{
process::{subscription::SubscriptionClient, Item},
IndexClient, ManagerClient, Options, Proc,
process::{Item, subscription::SubscriptionClient},
};

pub struct Managed {
Expand Down
20 changes: 10 additions & 10 deletions geth-engine/src/process/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{pin::Pin, sync::Arc};

use tokio::sync::Notify;
use tonic::{transport::Server, Code, Status};
use tonic::{Code, Status, transport::Server};

use geth_grpc::generated::protocol::protocol_server::ProtocolServer;
use tracing::instrument;

use crate::{
metrics::{get_metrics, Metrics},
process::{manager::ManagerClient, Managed, ProcessEnv},
Options,
metrics::{Metrics, get_metrics},
process::{Managed, ProcessEnv, manager::ManagerClient},
};

mod protocol;
Expand Down Expand Up @@ -116,13 +116,13 @@ where

Box::pin(async move {
let resp = inner.call(req).await?;
if let Some(status) = Status::from_header_map(resp.headers()) {
if status.code() != Code::Ok {
if is_client_error(status.code()) {
metrics.observe_client_error();
} else if is_server_error(status.code()) {
metrics.observe_server_error();
}
if let Some(status) = Status::from_header_map(resp.headers())
&& status.code() != Code::Ok
{
if is_client_error(status.code()) {
metrics.observe_client_error();
} else if is_server_error(status.code()) {
metrics.observe_server_error();
}
}

Expand Down
4 changes: 2 additions & 2 deletions geth-engine/src/process/grpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic::{Request, Response, Status};
use uuid::Uuid;

use crate::metrics::get_metrics;
use crate::process::consumer::{start_consumer, ConsumerResult};
use crate::process::consumer::{ConsumerResult, start_consumer};
use crate::process::reading::ReaderClient;
use crate::process::subscription::SubscriptionClient;
use crate::process::writing::WriterClient;
Expand Down Expand Up @@ -172,7 +172,7 @@ impl Protocol for ProtocolImpl {
Ok(result) => match result {
ConsumerResult::Success(c) => c,
ConsumerResult::StreamDeleted => {
return Err(Status::failed_precondition("stream-deleted"))
return Err(Status::failed_precondition("stream-deleted"));
}
},
};
Expand Down
6 changes: 4 additions & 2 deletions geth-engine/src/process/indexing/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl IndexClient {
return Ok(ReadCompleted::Success(Streaming {
inner,
batch: Some(entries.into_iter()),
}))
}));
}

_ => {
Expand Down Expand Up @@ -134,7 +134,9 @@ impl IndexClient {
}

_ => {
eyre::bail!("unexpected response when fetching the latest revision from the index process");
eyre::bail!(
"unexpected response when fetching the latest revision from the index process"
);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion geth-engine/src/process/indexing/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use geth_common::{Direction, IteratorIO};
use geth_domain::index::BlockEntry;
use geth_domain::{Lsm, LsmSettings};
use geth_mikoshi::hashing::mikoshi_hash;
use geth_mikoshi::wal::chunks::ChunkContainer;
use geth_mikoshi::wal::LogReader;
use geth_mikoshi::wal::chunks::ChunkContainer;
use std::cmp::min;
use std::sync::{Arc, RwLock};
use std::{io, mem};
Expand Down
2 changes: 1 addition & 1 deletion geth-engine/src/process/manager/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tokio::sync::oneshot;
use uuid::Uuid;

use crate::{
process::{messages::Messages, Item, Mail, ProcId, RunningProc, SpawnError, SpawnResult},
Proc, RequestContext,
process::{Item, Mail, ProcId, RunningProc, SpawnError, SpawnResult, messages::Messages},
};

#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions geth-engine/src/process/manager/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ use std::time::{Duration, Instant};

use geth_common::ProgramSummary;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
oneshot,
};
use tracing::instrument;
use uuid::Uuid;

use crate::{
IndexClient, Proc, ReaderClient, RequestContext, WriterClient,
process::{
Item, Mail, ProcId, RunningProc, SpawnResult, Stream,
manager::{
FindParams, ManagerCommand, ProcReadyParams, ProcTerminatedParams, SendParams,
ShutdownNotification, ShutdownParams, TimeoutParams, TimeoutTarget, WaitForParams,
},
messages::Messages,
subscription::SubscriptionClient,
Item, Mail, ProcId, RunningProc, SpawnResult, Stream,
},
IndexClient, Proc, ReaderClient, RequestContext, WriterClient,
};

#[derive(Clone, Debug)]
Expand Down
51 changes: 25 additions & 26 deletions geth-engine/src/process/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
};

use geth_common::ProgramSummary;
use tokio::sync::{oneshot, Notify};
use tokio::sync::{Notify, oneshot};
use uuid::Uuid;

use crate::{
Options, Proc, RequestContext,
process::{
Item, Mail, ProcId, RunningProc, SpawnError, SpawnResult,
manager::{
catalog::ProvisionResult,
proc::process_manager,
spawn::{spawn_process, SpawnParams},
spawn::{SpawnParams, spawn_process},
},
messages::{Messages, Notifications, Responses},
Item, Mail, ProcId, RunningProc, SpawnError, SpawnResult,
},
Options, Proc, RequestContext,
};

mod catalog;
Expand Down Expand Up @@ -187,13 +187,13 @@ impl Manager {
}

Item::Stream(stream) => {
if let Some(proc) = self.catalog.get_process(cmd.dest) {
if !proc.mailbox.send(Item::Stream(stream)) {
self.handle_terminate(ProcTerminatedParams {
id: cmd.dest,
error: None,
});
}
if let Some(proc) = self.catalog.get_process(cmd.dest)
&& !proc.mailbox.send(Item::Stream(stream))
{
self.handle_terminate(ProcTerminatedParams {
id: cmd.dest,
error: None,
});
}
}
}
Expand Down Expand Up @@ -273,20 +273,19 @@ impl Manager {
}

for dependent in running.dependents {
if let Some(running) = self.catalog.get_process(dependent) {
if !self.closing
&& !running.mailbox.send(Item::Mail(Mail {
context: RequestContext::new(),
origin: 0,
correlation: Uuid::nil(),
payload: Notifications::ProcessTerminated(cmd.id).into(),
created: Instant::now(),
}))
{
// I don't want to call `handle_terminate` here because it could end up blowing up the stack.
// I could rewrite `handle_terminate` to avoid recursion.
tracing::warn!(id = dependent, proc = ?running.proc, closing = self.closing, "process seems to be terminated");
}
if let Some(running) = self.catalog.get_process(dependent)
&& !self.closing
&& !running.mailbox.send(Item::Mail(Mail {
context: RequestContext::new(),
origin: 0,
correlation: Uuid::nil(),
payload: Notifications::ProcessTerminated(cmd.id).into(),
created: Instant::now(),
}))
{
// I don't want to call `handle_terminate` here because it could end up blowing up the stack.
// I could rewrite `handle_terminate` to avoid recursion.
tracing::warn!(id = dependent, proc = ?running.proc, closing = self.closing, "process seems to be terminated");
}
}
} else if !self.closing {
Expand Down
Loading
Loading