Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sigmf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# SPDX-License-Identifier: LGPL-3.0-or-later

# version of this python module
__version__ = "1.2.10"
__version__ = "1.2.11"
# matching version of the SigMF specification
__specification__ = "1.2.5"

Expand Down
1 change: 0 additions & 1 deletion sigmf/sigmffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,6 @@ def validate(self):
"""
Check schema and throw error if issue.
"""
version = self.get_global_field(self.VERSION_KEY)
validate.validate(self._metadata, self.get_schema())

def archive(self, name=None, fileobj=None):
Expand Down
44 changes: 42 additions & 2 deletions sigmf/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import os
import sys
import warnings

# multi-threading library - should work well as I/O will be the primary
# cost for small SigMF files. Swap to ProcessPool if files are large.
Expand All @@ -25,6 +26,34 @@
from . import error, schema, sigmffile


def _get_namespaces_declared(metadata: dict) -> set:
"""Get set of declared extension namespaces."""
extensions = metadata.get("global", {}).get(sigmffile.SigMFFile.EXTENSIONS_KEY, [])
return {ext["name"].split(":")[0] for ext in extensions}


def _get_namespaces_used(metadata: dict) -> set:
"""Get set of used extension namespaces."""
used = set()

def check_dict(ddd: dict):
"""Check keys for non-core namespaces."""
for key in ddd:
if ":" in key:
namespace = key.split(":")[0]
if namespace != "core":
used.add(namespace)

for section in metadata:
if isinstance(metadata[section], dict):
check_dict(metadata[section])
elif isinstance(metadata[section], list):
for item in metadata[section]:
check_dict(item)

return used


def validate(metadata, ref_schema=schema.get_schema()) -> None:
"""
Check that the provided `metadata` dict is valid according to the `ref_schema` dict.
Expand All @@ -46,11 +75,22 @@ def validate(metadata, ref_schema=schema.get_schema()) -> None:
"""
jsonschema.validators.validate(instance=metadata, schema=ref_schema)

# check namespaces
undeclared = _get_namespaces_used(metadata) - _get_namespaces_declared(metadata)
if undeclared:
warnings.warn(
f"Found undeclared extensions in use: {', '.join(sorted(undeclared))}. "
f"All extensions should be declared in {sigmffile.SigMFFile.EXTENSIONS_KEY}. "
"This will raise a ValidationError in future versions.",
DeprecationWarning,
stacklevel=2,
)

# ensure captures and annotations have monotonically increasing sample_start
for key in ["captures", "annotations"]:
count = -1
for item in metadata[key]:
new_count = item["core:sample_start"]
new_count = item[sigmffile.SigMFFile.START_INDEX_KEY]
if new_count < count:
raise jsonschema.exceptions.ValidationError(f"{key} has incorrect sample start ordering.")
count = new_count
Expand Down Expand Up @@ -121,7 +161,7 @@ def main(arg_tuple: Optional[Tuple[str, ...]] = None) -> None:
n_total = len(paths)
# estimate number of CPU cores
# https://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of-cpus-using-python
est_num_workers = len(os.sched_getaffinity(0)) if os.name == 'posix' else os.cpu_count()
est_num_workers = len(os.sched_getaffinity(0)) if os.name == "posix" else os.cpu_count()
# create a thread pool
# https://docs.python.org/3.7/library/concurrent.futures.html#threadpoolexecutor
with ThreadPoolExecutor(max_workers=est_num_workers) as executor:
Expand Down
47 changes: 39 additions & 8 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

"""Tests for Validator"""

import copy
import tempfile
import unittest
from pathlib import Path
Expand All @@ -18,9 +19,12 @@
from .testdata import TEST_FLOAT32_DATA, TEST_METADATA


def test_valid_data():
"""ensure the default metadata is OK"""
SigMFFile(TEST_METADATA).validate()
class NominalCases(unittest.TestCase):
"""Cases where the validator should succeed."""

def test_nominal(self):
"""nominal case should pass"""
SigMFFile(copy.deepcopy(TEST_METADATA)).validate()


class CommandLineValidator(unittest.TestCase):
Expand All @@ -32,7 +36,7 @@ def setUp(self):
self.tmp_path = tmp_path = Path(self.tmp_dir.name)
junk_path = tmp_path / "junk"
TEST_FLOAT32_DATA.tofile(junk_path)
some_meta = SigMFFile(TEST_METADATA, data_file=junk_path)
some_meta = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=junk_path)
some_meta.tofile(tmp_path / "a")
some_meta.tofile(tmp_path / "b")
some_meta.tofile(tmp_path / "c", toarchive=True)
Expand Down Expand Up @@ -75,13 +79,14 @@ class FailingCases(unittest.TestCase):
"""Cases where the validator should raise an exception."""

def setUp(self):
self.metadata = dict(TEST_METADATA)
self.metadata = copy.deepcopy(TEST_METADATA)

def test_no_version(self):
"""core:version must be present"""
del self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.VERSION_KEY]
"""version key must be present"""
meta = SigMFFile(copy.deepcopy(self.metadata))
del meta._metadata[SigMFFile.GLOBAL_KEY][SigMFFile.VERSION_KEY]
with self.assertRaises(ValidationError):
SigMFFile(self.metadata).validate()
meta.validate()

def test_extra_top_level_key(self):
"""no extra keys allowed on the top level"""
Expand Down Expand Up @@ -128,3 +133,29 @@ def test_invalid_hash(self):
self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = "derp"
with self.assertRaises(sigmf.error.SigMFFileError):
SigMFFile(metadata=self.metadata, data_file=temp_file.name)


class CheckNamespace(unittest.TestCase):
"""Cases where namespace issues are involved"""

def setUp(self):
self.metadata = copy.deepcopy(TEST_METADATA)

def test_undeclared_namespace(self):
"""unknown namespace should raise a warning"""
self.metadata[SigMFFile.GLOBAL_KEY]["other_namespace:key"] = 0
with self.assertWarns(Warning):
SigMFFile(self.metadata).validate()

def test_declared_namespace(self):
"""known namespace should not raise a warning"""
self.metadata[SigMFFile.GLOBAL_KEY]["other_namespace:key"] = 0
# define other_namespace
self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.EXTENSIONS_KEY] = [
{
"name": "other_namespace",
"version": "0.0.1",
"optional": False,
}
]
SigMFFile(self.metadata).validate()