Skip to content

Commit acb90d6

Browse files
authored
feat: support gcs and s3 as storage (#71)
* feat: support gcs and s3 as storage
* fix lint
1 parent 304fe37 commit acb90d6

File tree

7 files changed

+1634
-45
lines changed

7 files changed

+1634
-45
lines changed

python/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ dependencies = [
99
"pydantic>=2.0.0",
1010
"openai>=1.52.0",
1111
"pylance",
12+
"fsspec",
13+
"gcsfs",
1214
]
1315
description = "Python bindings for the lance-graph Cypher engine"
1416
authors = [{ name = "Lance Devs", email = "[email protected]" }]

python/python/knowledge_graph/config.py

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import Any, Mapping, Optional
88

9+
import pyarrow.fs
910
import yaml
1011
from lance_graph import GraphConfig
1112

@@ -14,49 +15,74 @@
1415
class KnowledgeGraphConfig:
1516
"""Root configuration for the Lance-backed knowledge graph."""
1617

17-
storage_path: Path
18-
schema_path: Optional[Path] = None
18+
storage_path: Path | str
19+
schema_path: Optional[Path | str] = None
1920
default_dataset: Optional[str] = None
2021
entity_types: tuple[str, ...] = field(default_factory=tuple)
2122
relationship_types: tuple[str, ...] = field(default_factory=tuple)
23+
storage_options: Optional[dict[str, str]] = None
2224

2325
@classmethod
2426
def from_root(
2527
cls,
26-
root: Path,
28+
root: Path | str,
2729
*,
2830
default_dataset: Optional[str] = None,
31+
storage_options: Optional[dict[str, str]] = None,
2932
) -> KnowledgeGraphConfig:
3033
"""Create a configuration anchored at ``root``."""
31-
schema_path = root / "graph.yaml"
34+
if isinstance(root, str) and "://" in root:
35+
schema_path = f"{root.rstrip('/')}/graph.yaml"
36+
else:
37+
if isinstance(root, str):
38+
root = Path(root)
39+
schema_path = root / "graph.yaml"
3240
return cls(
3341
storage_path=root,
3442
schema_path=schema_path,
3543
default_dataset=default_dataset,
44+
storage_options=storage_options,
3645
)
3746

3847
@classmethod
3948
def default(cls) -> KnowledgeGraphConfig:
4049
"""Use a storage folder relative to the current working directory."""
4150
return cls.from_root(Path.cwd() / "knowledge_graph_data")
4251

43-
def resolved_schema_path(self) -> Path:
52+
def resolved_schema_path(self) -> Path | str:
4453
"""Return the expected path to the graph schema definition."""
45-
return self.schema_path or (self.storage_path / "graph.yaml")
54+
if self.schema_path:
55+
return self.schema_path
56+
if isinstance(self.storage_path, str) and "://" in self.storage_path:
57+
return f"{self.storage_path.rstrip('/')}/graph.yaml"
58+
if isinstance(self.storage_path, str):
59+
return Path(self.storage_path) / "graph.yaml"
60+
return self.storage_path / "graph.yaml"
4661

4762
def ensure_directories(self) -> None:
4863
"""Create required directories for persistent storage."""
49-
self.storage_path.mkdir(parents=True, exist_ok=True)
50-
self.resolved_schema_path().parent.mkdir(parents=True, exist_ok=True)
64+
if isinstance(self.storage_path, str) and "://" in self.storage_path:
65+
return
66+
67+
path = (
68+
self.storage_path
69+
if isinstance(self.storage_path, Path)
70+
else Path(self.storage_path)
71+
)
72+
path.mkdir(parents=True, exist_ok=True)
73+
schema = self.resolved_schema_path()
74+
if isinstance(schema, Path):
75+
schema.parent.mkdir(parents=True, exist_ok=True)
5176

52-
def with_schema(self, schema_path: Path) -> KnowledgeGraphConfig:
77+
def with_schema(self, schema_path: Path | str) -> KnowledgeGraphConfig:
5378
"""Return a copy of the config with an explicit schema path."""
5479
return KnowledgeGraphConfig(
5580
storage_path=self.storage_path,
5681
schema_path=schema_path,
5782
default_dataset=self.default_dataset,
5883
entity_types=self.entity_types,
5984
relationship_types=self.relationship_types,
85+
storage_options=self.storage_options,
6086
)
6187

6288
def load_graph_config(self) -> GraphConfig:
@@ -84,12 +110,26 @@ def type_hints(self) -> dict[str, tuple[str, ...]]:
84110

85111
def _load_schema_payload(self) -> Mapping[str, Any]:
86112
schema_path = self.resolved_schema_path()
87-
if not schema_path.exists():
88-
raise FileNotFoundError(
89-
f"Graph schema configuration not found at {schema_path}"
90-
)
91-
with schema_path.open("r", encoding="utf-8") as handle:
92-
payload = yaml.safe_load(handle) or {}
113+
if isinstance(schema_path, str) and "://" in schema_path:
114+
fs, path = pyarrow.fs.FileSystem.from_uri(schema_path)
115+
try:
116+
with fs.open_input_stream(path) as f:
117+
payload = yaml.safe_load(f.read()) or {}
118+
except FileNotFoundError:
119+
raise FileNotFoundError(
120+
f"Graph schema configuration not found at {schema_path}"
121+
)
122+
else:
123+
if isinstance(schema_path, str):
124+
schema_path = Path(schema_path)
125+
126+
if not schema_path.exists():
127+
raise FileNotFoundError(
128+
f"Graph schema configuration not found at {schema_path}"
129+
)
130+
with schema_path.open("r", encoding="utf-8") as handle:
131+
payload = yaml.safe_load(handle) or {}
132+
93133
if not isinstance(payload, Mapping):
94134
raise ValueError("Graph schema configuration must be a mapping")
95135
return payload # type: ignore[return-value]

python/python/knowledge_graph/main.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,8 @@ def init_graph(config: "KnowledgeGraphConfig") -> None:
3030
config.ensure_directories()
3131
LanceGraphStore(config).ensure_layout()
3232
schema_path = config.resolved_schema_path()
33-
if schema_path.exists():
34-
print(f"Schema already present at {schema_path}")
35-
return
3633

37-
schema_stub = """\
38-
# Lance knowledge graph schema
34+
schema_stub = """# Lance knowledge graph schema
3935
#
4036
# Define node labels and relationship mappings. Example:
4137
# nodes:
@@ -56,6 +52,28 @@ def init_graph(config: "KnowledgeGraphConfig") -> None:
5652
entity_types: []
5753
relationship_types: []
5854
"""
55+
56+
if isinstance(schema_path, str):
57+
import pyarrow.fs
58+
59+
try:
60+
fs, path = pyarrow.fs.FileSystem.from_uri(schema_path)
61+
info = fs.get_file_info(path)
62+
if info.type != pyarrow.fs.FileType.NotFound:
63+
print(f"Schema already present at {schema_path}")
64+
return
65+
with fs.open_output_stream(path) as f:
66+
f.write(schema_stub.encode("utf-8"))
67+
print(f"Created schema template at {schema_path}")
68+
return
69+
except Exception as e:
70+
print(f"Failed to initialize schema at {schema_path}: {e}", file=sys.stderr)
71+
return
72+
73+
if schema_path.exists():
74+
print(f"Schema already present at {schema_path}")
75+
return
76+
5977
schema_path.write_text(schema_stub, encoding="utf-8")
6078
print(f"Created schema template at {schema_path}")
6179

python/python/knowledge_graph/store.py

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44

55
import logging
66
from importlib import import_module
7+
from pathlib import Path
78
from typing import TYPE_CHECKING, Dict, Iterable, Mapping, Optional
89

10+
import fsspec
11+
import pyarrow as pa
12+
913
if TYPE_CHECKING:
10-
from pathlib import Path
1114
from types import ModuleType
1215

13-
import pyarrow as pa
14-
1516
from .config import KnowledgeGraphConfig
1617

1718

@@ -23,38 +24,80 @@ class LanceGraphStore:
2324

2425
def __init__(self, config: "KnowledgeGraphConfig"):
2526
self._config = config
26-
self._root: "Path" = config.storage_path
27+
self._root: Path | str = config.storage_path
2728
self._lance: Optional[ModuleType] = None
2829
self._lance_attempted = False
2930

31+
# Initialize filesystem interface
32+
# We convert to string to ensure compatibility with fsspec, but we'll
33+
# use self._root (the original type) when reconstructing return values.
34+
try:
35+
self._fs, self._fs_path = fsspec.core.url_to_fs(
36+
str(self._root), **(self.config.storage_options or {})
37+
)
38+
except ImportError:
39+
# Re-raise explicit ImportError if protocol driver (e.g. gcsfs, s3fs)
40+
# is missing
41+
raise
42+
3043
@property
3144
def config(self) -> "KnowledgeGraphConfig":
3245
"""Return the configuration backing this store."""
3346
return self._config
3447

3548
@property
36-
def root(self) -> "Path":
49+
def root(self) -> Path | str:
3750
"""Return the root path for persisted datasets."""
3851
return self._root
3952

4053
def ensure_layout(self) -> None:
4154
"""Create the storage layout if it does not already exist."""
42-
self._root.mkdir(parents=True, exist_ok=True)
43-
44-
def list_datasets(self) -> Dict[str, "Path"]:
55+
try:
56+
self._fs.makedirs(self._fs_path, exist_ok=True)
57+
except Exception:
58+
# S3/GCS might not support directory creation or it might be implicit.
59+
# We treat failure here as non-fatal if the path is actually accessible
60+
# later,
61+
# but usually makedirs is safe on object stores (no-op).
62+
pass
63+
64+
def list_datasets(self) -> Dict[str, Path | str]:
4565
"""Enumerate known Lance datasets."""
46-
datasets: Dict[str, Path] = {}
47-
if not self._root.exists():
48-
return datasets
49-
for child in self._root.iterdir():
50-
if child.is_dir() and child.suffix == ".lance":
51-
datasets[child.stem] = child
66+
datasets: Dict[str, Path | str] = {}
67+
68+
try:
69+
if not self._fs.exists(self._fs_path):
70+
return datasets
71+
infos = self._fs.ls(self._fs_path, detail=True)
72+
except Exception as e:
73+
# We want to swallow "not found" errors but raise others (like Auth errors)
74+
if isinstance(e, FileNotFoundError):
75+
return datasets
76+
77+
msg = str(e).lower()
78+
if "not found" in msg or "no such file" in msg or "does not exist" in msg:
79+
return datasets
80+
raise
81+
82+
root_str = str(self._root)
83+
for info in infos:
84+
name = info["name"].rstrip("/")
85+
base_name = name.split("/")[-1]
86+
if info["type"] == "directory" and base_name.endswith(".lance"):
87+
dataset_name = base_name[:-6]
88+
full_path = f"{root_str.rstrip('/')}/{base_name}"
89+
if isinstance(self._root, Path):
90+
datasets[dataset_name] = Path(full_path)
91+
else:
92+
datasets[dataset_name] = full_path
5293
return datasets
5394

54-
def _dataset_path(self, name: str) -> "Path":
95+
def _dataset_path(self, name: str) -> Path | str:
5596
"""Create the canonical path for a dataset."""
5697
safe_name = name.replace("/", "_")
57-
return self._root / f"{safe_name}.lance"
98+
if isinstance(self._root, Path):
99+
return self._root / f"{safe_name}.lance"
100+
return f"{self._root.rstrip('/')}/{safe_name}.lance"
58101

59102
def _get_lance(self) -> ModuleType:
60103
if not self._lance_attempted:
@@ -77,6 +120,20 @@ def _get_lance(self) -> ModuleType:
77120
raise ImportError("Lance module failed to load")
78121
return self._lance
79122

123+
def _path_exists(self, path: Path | str) -> bool:
124+
if isinstance(path, Path):
125+
return path.exists()
126+
try:
127+
fs, p = fsspec.core.url_to_fs(path)
128+
except Exception:
129+
# If we cannot resolve the filesystem (e.g. missing gcsfs), we should raise
130+
# rather than assuming the path does not exist.
131+
raise
132+
try:
133+
return fs.exists(p)
134+
except Exception:
135+
return False
136+
80137
def load_tables(
81138
self,
82139
names: Optional[Iterable[str]] = None,
@@ -91,17 +148,18 @@ def load_tables(
91148
tables: Dict[str, "pa.Table"] = {}
92149
for name in requested:
93150
path = available.get(name, self._dataset_path(name))
94-
if not path.exists():
151+
if not self._path_exists(path):
95152
raise FileNotFoundError(f"Dataset '{name}' not found at {path}")
96-
dataset = lance.dataset(str(path))
153+
dataset = lance.dataset(
154+
str(path), storage_options=self.config.storage_options
155+
)
97156
table = dataset.scanner().to_table()
98157
tables[name] = table
99158
return tables
100159

101160
def write_tables(self, tables: Mapping[str, "pa.Table"]) -> None:
102161
"""Persist PyArrow tables as Lance datasets."""
103162
lance = self._get_lance()
104-
import pyarrow as pa # Local import; optional dependency
105163

106164
self.ensure_layout()
107165
for name, table in tables.items():
@@ -110,5 +168,7 @@ def write_tables(self, tables: Mapping[str, "pa.Table"]) -> None:
110168
f"Dataset '{name}' must be a pyarrow.Table (got {type(table)!r})"
111169
)
112170
path = self._dataset_path(name)
113-
mode = "overwrite" if path.exists() else "create"
114-
lance.write_dataset(table, str(path), mode=mode)
171+
mode = "overwrite" if self._path_exists(path) else "create"
172+
lance.write_dataset(
173+
table, str(path), mode=mode, storage_options=self.config.storage_options
174+
)

0 commit comments

Comments
 (0)