Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dependencies = [
"pydantic>=2.0.0",
"openai>=1.52.0",
"pylance",
"fsspec",
"gcsfs",
]
description = "Python bindings for the lance-graph Cypher engine"
authors = [{ name = "Lance Devs", email = "[email protected]" }]
Expand Down
70 changes: 55 additions & 15 deletions python/python/knowledge_graph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import Any, Mapping, Optional

import pyarrow.fs
import yaml
from lance_graph import GraphConfig

Expand All @@ -14,49 +15,74 @@
class KnowledgeGraphConfig:
"""Root configuration for the Lance-backed knowledge graph."""

storage_path: Path
schema_path: Optional[Path] = None
storage_path: Path | str
schema_path: Optional[Path | str] = None
default_dataset: Optional[str] = None
entity_types: tuple[str, ...] = field(default_factory=tuple)
relationship_types: tuple[str, ...] = field(default_factory=tuple)
storage_options: Optional[dict[str, str]] = None

@classmethod
def from_root(
    cls,
    root: Path | str,
    *,
    default_dataset: Optional[str] = None,
    storage_options: Optional[dict[str, str]] = None,
) -> KnowledgeGraphConfig:
    """Create a configuration anchored at ``root``.

    Args:
        root: Local directory (``Path`` or plain string) or a remote URI
            such as ``gs://bucket/prefix``.
        default_dataset: Optional dataset name used when none is given.
        storage_options: Extra options forwarded to the storage backend
            (e.g. credentials for object stores).

    Returns:
        A config whose schema path is ``<root>/graph.yaml``.
    """
    if isinstance(root, str) and "://" in root:
        # Remote URI: join with string operations, since pathlib would
        # mangle the "://" scheme separator.
        schema_path = f"{root.rstrip('/')}/graph.yaml"
    else:
        if isinstance(root, str):
            root = Path(root)
        schema_path = root / "graph.yaml"
    return cls(
        storage_path=root,
        schema_path=schema_path,
        default_dataset=default_dataset,
        storage_options=storage_options,
    )

@classmethod
def default(cls) -> KnowledgeGraphConfig:
    """Build a config rooted at ``./knowledge_graph_data`` in the CWD."""
    default_root = Path.cwd() / "knowledge_graph_data"
    return cls.from_root(default_root)

def resolved_schema_path(self) -> Path | str:
    """Return the expected path to the graph schema definition.

    An explicit ``schema_path`` wins; otherwise the schema is assumed to
    live at ``<storage_path>/graph.yaml``. Remote roots (URIs containing
    ``://``) are joined with string operations because pathlib would
    corrupt the scheme separator.
    """
    if self.schema_path:
        return self.schema_path
    if isinstance(self.storage_path, str) and "://" in self.storage_path:
        return f"{self.storage_path.rstrip('/')}/graph.yaml"
    if isinstance(self.storage_path, str):
        return Path(self.storage_path) / "graph.yaml"
    return self.storage_path / "graph.yaml"

def ensure_directories(self) -> None:
    """Create required directories for persistent storage.

    Remote roots (URIs containing ``://``) are skipped entirely:
    object stores have no real directories, so nothing needs creating.
    For local roots, both the storage directory and the schema file's
    parent directory are created idempotently.
    """
    if isinstance(self.storage_path, str) and "://" in self.storage_path:
        return

    path = (
        self.storage_path
        if isinstance(self.storage_path, Path)
        else Path(self.storage_path)
    )
    path.mkdir(parents=True, exist_ok=True)
    schema = self.resolved_schema_path()
    # Remote schema paths come back as strings; only local Paths need a
    # parent directory created.
    if isinstance(schema, Path):
        schema.parent.mkdir(parents=True, exist_ok=True)

def with_schema(self, schema_path: Path | str) -> KnowledgeGraphConfig:
    """Return a copy of the config with an explicit schema path.

    All other fields (storage path, dataset defaults, type hints, and
    storage options) are carried over unchanged.
    """
    return KnowledgeGraphConfig(
        storage_path=self.storage_path,
        schema_path=schema_path,
        default_dataset=self.default_dataset,
        entity_types=self.entity_types,
        relationship_types=self.relationship_types,
        storage_options=self.storage_options,
    )

def load_graph_config(self) -> GraphConfig:
Expand Down Expand Up @@ -84,12 +110,26 @@ def type_hints(self) -> dict[str, tuple[str, ...]]:

def _load_schema_payload(self) -> Mapping[str, Any]:
    """Load and validate the YAML schema document.

    Returns:
        The parsed YAML payload as a mapping (empty mapping when the
        file exists but is empty).

    Raises:
        FileNotFoundError: when the schema file is absent, locally or
            remotely.
        ValueError: when the document parses to something other than a
            mapping.
    """
    schema_path = self.resolved_schema_path()
    if isinstance(schema_path, str) and "://" in schema_path:
        # Remote URI: read through pyarrow's filesystem abstraction so
        # any supported object store (gs://, s3://, ...) works.
        fs, path = pyarrow.fs.FileSystem.from_uri(schema_path)
        try:
            with fs.open_input_stream(path) as stream:
                payload = yaml.safe_load(stream.read()) or {}
        except FileNotFoundError as exc:
            # Re-raise with the full URI so the error is actionable;
            # chain the original for debugging.
            raise FileNotFoundError(
                f"Graph schema configuration not found at {schema_path}"
            ) from exc
    else:
        if isinstance(schema_path, str):
            schema_path = Path(schema_path)

        if not schema_path.exists():
            raise FileNotFoundError(
                f"Graph schema configuration not found at {schema_path}"
            )
        with schema_path.open("r", encoding="utf-8") as handle:
            payload = yaml.safe_load(handle) or {}

    if not isinstance(payload, Mapping):
        raise ValueError("Graph schema configuration must be a mapping")
    return payload  # type: ignore[return-value]
Expand Down
28 changes: 23 additions & 5 deletions python/python/knowledge_graph/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ def init_graph(config: "KnowledgeGraphConfig") -> None:
config.ensure_directories()
LanceGraphStore(config).ensure_layout()
schema_path = config.resolved_schema_path()
if schema_path.exists():
print(f"Schema already present at {schema_path}")
return

schema_stub = """\
# Lance knowledge graph schema
schema_stub = """# Lance knowledge graph schema
#
# Define node labels and relationship mappings. Example:
# nodes:
Expand All @@ -56,6 +52,28 @@ def init_graph(config: "KnowledgeGraphConfig") -> None:
entity_types: []
relationship_types: []
"""

if isinstance(schema_path, str):
import pyarrow.fs

try:
fs, path = pyarrow.fs.FileSystem.from_uri(schema_path)
info = fs.get_file_info(path)
if info.type != pyarrow.fs.FileType.NotFound:
print(f"Schema already present at {schema_path}")
return
with fs.open_output_stream(path) as f:
f.write(schema_stub.encode("utf-8"))
print(f"Created schema template at {schema_path}")
return
except Exception as e:
print(f"Failed to initialize schema at {schema_path}: {e}", file=sys.stderr)
return

if schema_path.exists():
print(f"Schema already present at {schema_path}")
return

schema_path.write_text(schema_stub, encoding="utf-8")
print(f"Created schema template at {schema_path}")

Expand Down
102 changes: 81 additions & 21 deletions python/python/knowledge_graph/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

import logging
from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Iterable, Mapping, Optional

import fsspec
import pyarrow as pa

if TYPE_CHECKING:
from pathlib import Path
from types import ModuleType

import pyarrow as pa

from .config import KnowledgeGraphConfig


Expand All @@ -23,38 +24,80 @@ class LanceGraphStore:

def __init__(self, config: "KnowledgeGraphConfig"):
    """Initialize the store and eagerly resolve its fsspec filesystem.

    Args:
        config: Configuration providing the storage root and optional
            backend ``storage_options``.

    Raises:
        ImportError: when the protocol driver for the root's scheme
            (e.g. ``gcsfs`` for ``gs://``, ``s3fs`` for ``s3://``) is
            not installed — surfaced at construction time rather than
            on first use.
    """
    self._config = config
    self._root: Path | str = config.storage_path
    self._lance: Optional[ModuleType] = None
    self._lance_attempted = False

    # fsspec wants a string; we keep self._root in its original type so
    # return values can be reconstructed as Path or URI as appropriate.
    self._fs, self._fs_path = fsspec.core.url_to_fs(
        str(self._root), **(config.storage_options or {})
    )

@property
def config(self) -> "KnowledgeGraphConfig":
    """The KnowledgeGraphConfig instance backing this store."""
    backing = self._config
    return backing

@property
def root(self) -> Path | str:
    """Return the root path for persisted datasets.

    A ``Path`` for local storage, or a URI string for remote roots.
    """
    return self._root

def ensure_layout(self) -> None:
    """Create the storage layout if it does not already exist."""
    try:
        self._fs.makedirs(self._fs_path, exist_ok=True)
    except Exception:
        # Object stores may not support (or need) explicit directory
        # creation — directories are implicit there, and makedirs is
        # normally a safe no-op. Treat failure as non-fatal; genuine
        # access problems surface on first real read/write.
        pass

def list_datasets(self) -> Dict[str, Path | str]:
    """Enumerate known Lance datasets under the storage root.

    Returns:
        Mapping of dataset name to its path: ``Path`` objects when the
        root is local, URI strings when it is remote.

    Raises:
        Exception: filesystem errors other than "not found" (e.g.
            authentication failures) are propagated.
    """
    datasets: Dict[str, Path | str] = {}

    try:
        if not self._fs.exists(self._fs_path):
            return datasets
        infos = self._fs.ls(self._fs_path, detail=True)
    except Exception as exc:
        # A missing root just means an empty store; anything else
        # (auth, network) must propagate. Some backends raise custom
        # error types, so fall back to message sniffing.
        if isinstance(exc, FileNotFoundError):
            return datasets
        msg = str(exc).lower()
        if "not found" in msg or "no such file" in msg or "does not exist" in msg:
            return datasets
        raise

    root_str = str(self._root)
    for info in infos:
        # Listings may include trailing slashes on directory entries.
        entry = info["name"].rstrip("/")
        base_name = entry.split("/")[-1]
        if info["type"] == "directory" and base_name.endswith(".lance"):
            dataset_name = base_name[: -len(".lance")]
            full_path = f"{root_str.rstrip('/')}/{base_name}"
            datasets[dataset_name] = (
                Path(full_path) if isinstance(self._root, Path) else full_path
            )
    return datasets

def _dataset_path(self, name: str) -> "Path":
def _dataset_path(self, name: str) -> Path | str:
"""Create the canonical path for a dataset."""
safe_name = name.replace("/", "_")
return self._root / f"{safe_name}.lance"
if isinstance(self._root, Path):
return self._root / f"{safe_name}.lance"
return f"{self._root.rstrip('/')}/{safe_name}.lance"

def _get_lance(self) -> ModuleType:
if not self._lance_attempted:
Expand All @@ -77,6 +120,20 @@ def _get_lance(self) -> ModuleType:
raise ImportError("Lance module failed to load")
return self._lance

def _path_exists(self, path: Path | str) -> bool:
if isinstance(path, Path):
return path.exists()
try:
fs, p = fsspec.core.url_to_fs(path)
except Exception:
# If we cannot resolve the filesystem (e.g. missing gcsfs), we should raise
# rather than assuming the path does not exist.
raise
try:
return fs.exists(p)
except Exception:
return False

def load_tables(
self,
names: Optional[Iterable[str]] = None,
Expand All @@ -91,17 +148,18 @@ def load_tables(
tables: Dict[str, "pa.Table"] = {}
for name in requested:
path = available.get(name, self._dataset_path(name))
if not path.exists():
if not self._path_exists(path):
raise FileNotFoundError(f"Dataset '{name}' not found at {path}")
dataset = lance.dataset(str(path))
dataset = lance.dataset(
str(path), storage_options=self.config.storage_options
)
table = dataset.scanner().to_table()
tables[name] = table
return tables

def write_tables(self, tables: Mapping[str, "pa.Table"]) -> None:
"""Persist PyArrow tables as Lance datasets."""
lance = self._get_lance()
import pyarrow as pa # Local import; optional dependency

self.ensure_layout()
for name, table in tables.items():
Expand All @@ -110,5 +168,7 @@ def write_tables(self, tables: Mapping[str, "pa.Table"]) -> None:
f"Dataset '{name}' must be a pyarrow.Table (got {type(table)!r})"
)
path = self._dataset_path(name)
mode = "overwrite" if path.exists() else "create"
lance.write_dataset(table, str(path), mode=mode)
mode = "overwrite" if self._path_exists(path) else "create"
lance.write_dataset(
table, str(path), mode=mode, storage_options=self.config.storage_options
)
Loading
Loading