pyproject.toml (1 change: 1 addition & 0 deletions)
@@ -21,6 +21,7 @@ dependencies = [
     "referencing>=0.36.2",
     "jsonref>=1.1.0",
     "jsonpath-ng>=1.7.0",
+    "more-itertools>=10.8.0",
 ]
 classifiers = [
     "Development Status :: 3 - Alpha",
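The new more-itertools dependency backs the change in src/craft_ls/core.py below, where the hand-rolled tee()-based peek() helper is dropped in favour of more_itertools.peekable. A minimal sketch of the look-ahead behaviour being relied on (the token values are placeholders):

from more_itertools import peekable

tokens = peekable(iter(["a", "b"]))
assert tokens.peek() == "a"       # look ahead without consuming anything
assert next(tokens) == "a"        # iteration still starts at "a"
assert next(tokens) == "b"
assert tokens.peek(None) is None  # the default is returned once exhausted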
snapcraft.yaml (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@ description: |
 base: core24
 confinement: strict
 adopt-info: craft-ls
-license: BSD-3-Clause
+license: BSD-3-Clause
 
 parts:
   craft-ls:
src/craft_ls/core.py (138 changes: 98 additions & 40 deletions)
@@ -4,7 +4,7 @@
 import re
 from collections import deque
 from importlib.resources import files
-from itertools import chain, tee
+from itertools import chain
 from typing import Iterable, cast
 
 import jsonref
@@ -15,6 +15,7 @@
 from jsonschema.protocols import Validator
 from jsonschema.validators import validator_for
 from lsprotocol import types as lsp
+from more_itertools import peekable
 from referencing import Registry, Resource
 from referencing.jsonschema import DRAFT202012
 from yaml.events import (
@@ -31,16 +32,18 @@
     BlockEndToken,
     BlockMappingStartToken,
     BlockSequenceStartToken,
+    KeyToken,
     ScalarToken,
     Token,
+    ValueToken,
 )
 
 from craft_ls.types_ import (
     CompleteParsedResult,
+    DocumentNode,
     IncompleteParsedResult,
     MissingTypeCharmcraftValidator,
     MissingTypeSnapcraftValidator,
-    Node,
     ParsedResult,
     Schema,
     YamlDocument,
Expand Down Expand Up @@ -202,9 +205,11 @@ def robust_load(instance_document: str) -> tuple[YamlDocument, list[Event]]:
return cast(YamlDocument, yaml.safe_load(truncated_file)), truncated_file


def segmentize_nodes(root: yaml.CollectionNode) -> list[tuple[tuple[str, ...], Node]]:
def segmentize_nodes(
root: yaml.CollectionNode,
) -> list[tuple[tuple[str, ...], DocumentNode]]:
"""Flatten graph into path segments."""
segments: list[tuple[tuple[str, ...], Node]] = []
segments: list[tuple[tuple[str, ...], DocumentNode]] = []
nodes = list(root.value)

for node_pair in nodes:
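As a hypothetical illustration of the flattening (DocumentNode's exact fields live in craft_ls.types_, outside this diff), composing a small document with pyyaml and segmentizing it would yield one path tuple per mapping key:

import yaml

root = yaml.compose("parts:\n  craft-ls:\n    plugin: python\n")
# segmentize_nodes(root) should produce segments roughly like:
#   (("parts",), DocumentNode(value="parts", ...))
#   (("parts", "craft-ls"), DocumentNode(value="craft-ls", ...))
#   (("parts", "craft-ls", "plugin"), DocumentNode(value="plugin", ...))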
@@ -217,7 +222,7 @@ def _do_segmentize_nodes(
     first: yaml.CollectionNode,
     second: yaml.CollectionNode,
     prefix: tuple[str, ...] | None = None,
-) -> list[tuple[tuple[str, ...], Node]]:
+) -> list[tuple[tuple[str, ...], DocumentNode]]:
     """Recursive node segmentation.
 
     Craft tools don't usually go over three levels, so we don't have to worry about recursion limits.
@@ -227,7 +232,7 @@
 
     match second:
         case yaml.ScalarNode(end_mark=selection_end):
-            current_node = Node(
+            current_node = DocumentNode(
                 value=first.value,
                 start=first.start_mark,
                 end=first.end_mark,
@@ -236,7 +241,7 @@
             segments.append((prefix + (str(first.value),), current_node))
 
         case yaml.MappingNode(end_mark=selection_end, value=children):
-            current_node = Node(
+            current_node = DocumentNode(
                 value=first.value,
                 start=first.start_mark,
                 end=first.end_mark,
@@ -256,8 +261,9 @@
                 )
             )
 
-        case yaml.CollectionNode(end_mark=selection_end):
-            current_node = Node(
+        case yaml.SequenceNode(end_mark=selection_end):
+            print(selection_end.line)
+            current_node = DocumentNode(
                 value=first.value,
                 start=first.start_mark,
                 end=first.end_mark,
@@ -271,7 +277,9 @@
 
 
 def get_diagnostics(
-    validator: Validator, instance: YamlDocument, segments: dict[tuple[str, ...], Node]
+    validator: Validator,
+    instance: YamlDocument,
+    segments: dict[tuple[str, ...], DocumentNode],
 ) -> list[lsp.Diagnostic]:
     """Validate a document against its schema."""
     diagnostics = []
@@ -351,14 +359,8 @@ def get_diagnostics(
     return diagnostics
 
 
-def peek(tee_iterator: Iterable[Token]) -> Token | None:
-    """Return the next value without moving the input forward."""
-    [forked_iterator] = tee(tee_iterator, 1)
-    return next(forked_iterator, None)
-
-
 def get_diagnostic_range(
-    document_segments: dict[tuple[str, ...], Node], diag_segments: Iterable[str]
+    document_segments: dict[tuple[str, ...], DocumentNode], diag_segments: Iterable[str]
 ) -> lsp.Range:
     """Link the validation error to the position in the original document."""
     if (
@@ -409,43 +411,68 @@ def get_description_from_path(path: Iterable[str | int], schema: Schema) -> str:
     return MISSING_DESC
 
 
-def get_schema_path_from_token_position(
-    position: lsp.Position, tokens: list[Token]
-) -> deque[str] | None:
-    """Parse the document to find the path to the current position."""
+def get_exact_cursor_path(position: lsp.Position, tokens: list[Token]) -> deque[str]:  # noqa: C901
+    """Get the exact path to the cursor position."""
     current_path: deque[str] = deque()
+    iterator = peekable(tokens)
     last_scalar_token: str = ""
     start_mark: yaml.Mark
    end_mark: yaml.Mark
+    previous: Token | None = None
+    token: Token | None = None
+    next_token: Token | None = None
 
+    for token in iterator:
+        next_token = iterator.peek(None)
+        early_stop = (
+            not next_token
+            or next_token.start_mark.line > position.line
+            or (
+                next_token.start_mark.line >= position.line
+                and next_token.start_mark.column >= position.character
+            )
+        )
+        if early_stop:
+            break
+
-    for token in tokens:
         match token:
             case BlockMappingStartToken() | BlockSequenceStartToken():
-                current_path.append(last_scalar_token)
-
+                if last_scalar_token:
+                    current_path.append(last_scalar_token)
             case BlockEndToken():
-                current_path.pop()
+                if current_path:
+                    current_path.pop()
 
-            case ScalarToken(value=value, start_mark=start_mark, end_mark=end_mark):
-                is_line_matching = start_mark.line == position.line
-                is_col_matching = (
-                    start_mark.column <= position.character <= end_mark.column
-                )
-                if is_line_matching and is_col_matching:
-                    current_path.append(value)
-                    current_path.remove("")
-                    return current_path
+            case KeyToken():
+                last_scalar_token = ""
 
-                else:
+            case ScalarToken(value=value):
+                if isinstance(previous, yaml.KeyToken):
                     last_scalar_token = value
 
             case _:
                 continue
+        previous = token
+
+    if (
+        isinstance(token, (ValueToken, ScalarToken))
+        and last_scalar_token
+        and next_token
+    ):
+        current_path.append(last_scalar_token)
+
+    return current_path
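A usage sketch for the new walk, assuming the token list comes from pyyaml's scan() (an assumption; the calling code is not part of this diff):

import yaml
from lsprotocol import types as lsp

tokens = list(yaml.scan("parts:\n  craft-ls:\n    plugin: python\n"))
# With the cursor at the end of "plugin" on line 2, the walk should
# accumulate the enclosing keys, approximately:
#   get_exact_cursor_path(lsp.Position(line=2, character=10), tokens)
#   -> deque(['parts', 'craft-ls', 'plugin'])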
 
 
+def get_node_path_from_token_position(
+    position: lsp.Position, segments: dict[tuple[str, ...], DocumentNode]
+) -> tuple[str, ...] | None:
+    """Find the innermost node path corresponding to the current position."""
+    for segment, node in reversed(segments.items()):
+        if node.contains(position):
+            return segment
+
+    return None
+
+
 def list_symbols(
-    instance: YamlDocument, segments: dict[tuple[str, ...], Node]
+    instance: YamlDocument, segments: dict[tuple[str, ...], DocumentNode]
 ) -> list[lsp.DocumentSymbol]:
     """List document symbols.
 
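Why reversed(segments.items()) in get_node_path_from_token_position yields the innermost match first: assuming _do_segmentize_nodes appends a parent segment before extending with its children (the append itself falls outside this diff's context lines), the flattened segments run outermost to innermost, and dict views iterate in insertion order under reversed() since Python 3.8:

segments = {("parts",): "outer", ("parts", "craft-ls"): "inner"}
assert [key for key, _ in reversed(segments.items())] == [
    ("parts", "craft-ls"),
    ("parts",),
]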
@@ -496,3 +523,34 @@ def list_symbols(
         symbols.append(symbol)
 
     return symbols
+
+
+def get_completion_items_from_path(
+    segments: Iterable[str], schema: Schema, instance: YamlDocument
+) -> list[lsp.CompletionItem]:
+    """Get possible values for children nodes or enum values."""
+    sub_instance = instance
+    sub_schema = schema
+    for segment in segments:
+        sub_schema = sub_schema.get("properties", {}).get(segment, {})
+        sub_instance = sub_instance.get(segment, {})
+
+    already_present = (
+        set(sub_instance.keys()) if isinstance(sub_instance, dict) else set()
+    )
+    items = []
+
+    if "const" in sub_schema.keys():
+        items = [lsp.CompletionItem(label=str(key)) for key in [sub_schema["const"]]]
+
+    elif "enum" in sub_schema.keys():
+        items = [
+            lsp.CompletionItem(label=str(key)) for key in sub_schema["enum"] if key
+        ]
+
+    elif "properties" in sub_schema.keys():
+        items = [
+            lsp.CompletionItem(label=str(key))
+            for key in set(sub_schema["properties"].keys()) - already_present
+        ]
+    return items
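A hypothetical call showing how the three schema branches behave (the schema snippet is illustrative, not taken from a real craft schema):

schema = {
    "properties": {
        "base": {"const": "core24"},
        "confinement": {"enum": ["strict", "classic", "devmode"]},
    }
}
instance = {"confinement": "strict"}

# Under the "confinement" path, the enum branch fires:
get_completion_items_from_path(["confinement"], schema, instance)
# -> items labelled "strict", "classic", "devmode"

# At the top level, keys already present in the document are filtered out:
get_completion_items_from_path([], schema, instance)
# -> one item labelled "base"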