From 6a0c83dcd4fe5930f86f16bf8a53935665909117 Mon Sep 17 00:00:00 2001 From: Kay Robbins <1189050+VisLab@users.noreply.github.com> Date: Tue, 6 Jan 2026 06:26:50 -0600 Subject: [PATCH 1/2] Added the script to extract column information --- docs/requirements.txt | 8 +- hed/scripts/extract_tabular_summary.py | 417 ++++++++++++++ hed/tools/analysis/tabular_summary.py | 30 +- hed/tools/util/io_util.py | 6 +- pyproject.toml | 6 +- requirements-dev.txt | 8 +- tests/scripts/test_extract_tabular_summary.py | 526 ++++++++++++++++++ tests/tools/analysis/test_tabular_summary.py | 215 ++++++- 8 files changed, 1189 insertions(+), 27 deletions(-) create mode 100644 hed/scripts/extract_tabular_summary.py create mode 100644 tests/scripts/test_extract_tabular_summary.py diff --git a/docs/requirements.txt b/docs/requirements.txt index 23ace825..9ce8bd47 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,6 +1,6 @@ -sphinx>=7.0.0 -furo>=2023.9.10 +sphinx>=7.1.0,<8.2.0 +furo>=2024.1.29 sphinx-copybutton>=0.5.2 -myst-parser>=2.0.0 +myst-parser>=3.0.0 sphinx-autodoc-typehints>=1.24.0 -linkify-it-py>=2.0.0 +linkify-it-py>=2.0.3 diff --git a/hed/scripts/extract_tabular_summary.py b/hed/scripts/extract_tabular_summary.py new file mode 100644 index 00000000..b12ceb40 --- /dev/null +++ b/hed/scripts/extract_tabular_summary.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 +""" +Command-line script for extracting tabular summaries from datasets without BIDS organization. + +This script processes TSV (tab-separated values) files and generates summary statistics about +the columns and their values. Unlike hed_extract_bids_sidecar, this script does not assume +BIDS dataset organization and can process any collection of TSV files matching specified +criteria. + +Logging Options: +- Default: WARNING level logs go to stderr (quiet unless there are issues) +- --verbose or --log-level INFO: Show informational messages about progress +- --log-level DEBUG: Show detailed debugging information +- --log-file FILE: Save logs to a file instead of/in addition to stderr +- --log-quiet: When using --log-file, suppress stderr output (file only) + +Examples: + # Extract summary from event TSV files (default suffix='events') + extract_tabular_summary /path/to/data + + # Extract summary from all TSV files using wildcard + extract_tabular_summary /path/to/data --suffix '*' + + # Extract summary with verbose output and save to file + extract_tabular_summary /path/to/data --verbose --output-file summary.json + + # Extract summary with categorical value limit + extract_tabular_summary /path/to/data --categorical-limit 50 + + # Process files with specific suffix and exclude certain directories + extract_tabular_summary /path/to/data --suffix participants --exclude-dirs test backup + + # Filter to only process files containing 'sub-01' in filename + extract_tabular_summary /path/to/data --filter 'sub-01' + + # Filter to only process files from task 'rest' with all TSV files + extract_tabular_summary /path/to/data --suffix '*' --filter 'task-rest' +""" + +import argparse +import json +import logging +import sys +from pathlib import Path +from hed import _version as vr +from hed.tools.util.io_util import get_file_list +from hed.tools.analysis.tabular_summary import TabularSummary + + +def get_parser(): + """Create the argument parser for extract_tabular_summary. + + Returns: + argparse.ArgumentParser: Configured argument parser. 
+ """ + parser = argparse.ArgumentParser( + description="Extract tabular summary from a collection of tabular files.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + + # Required arguments + parser.add_argument("data_path", help="Full path of root directory containing TSV files to process.") + + # File selection arguments + parser.add_argument( + "-p", + "--prefix", + dest="name_prefix", + default=None, + help="Optional prefix for base filename (e.g., 'sub-' to match 'sub-01_events.tsv').", + ) + parser.add_argument( + "-s", + "--suffix", + dest="name_suffix", + default="events", + help="Suffix for base filename (e.g., 'events' to match files ending with '_events.tsv'). " + "Use '*' to match all TSV files regardless of suffix. Default: events", + ) + parser.add_argument( + "-x", "--exclude-dirs", nargs="*", default=[], dest="exclude_dirs", help="Directory names to exclude from file search." + ) + parser.add_argument( + "-fl", + "--filter", + dest="filename_filter", + default=None, + help="Optional string to filter filenames. Only files containing this string in their name will be processed.", + ) + + # Column processing arguments + parser.add_argument( + "-vc", + "--value-columns", + dest="value_columns", + nargs="*", + default=None, + help="List of column names to treat as value columns (numeric/continuous data).", + ) + parser.add_argument( + "-sc", + "--skip-columns", + dest="skip_columns", + nargs="*", + default=None, + help="List of column names to skip in the extraction.", + ) + parser.add_argument( + "-cl", + "--categorical-limit", + dest="categorical_limit", + type=int, + default=None, + help="Maximum number of unique values to store for a categorical column. " + "If a column has more unique values, it will be truncated. Default: None (no limit).", + ) + + # Output arguments + parser.add_argument( + "-o", + "--output-file", + dest="output_file", + default="", + help="Full path of output file for the tabular summary (JSON format). " + "If not specified, output written to standard out.", + ) + parser.add_argument( + "-f", + "--format", + dest="output_format", + choices=["json", "text"], + default="json", + help="Output format: 'json' for JSON structure or 'text' for human-readable summary. Default: json", + ) + + # Logging arguments + parser.add_argument( + "-l", + "--log-level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="WARNING", + help="Log level (case insensitive). Default: WARNING", + ) + parser.add_argument( + "-lf", + "--log-file", + dest="log_file", + default=None, + help="Full path to save log output to file. If not specified, logs go to stderr.", + ) + parser.add_argument( + "-lq", + "--log-quiet", + action="store_true", + dest="log_quiet", + help="If present, suppress log output to stderr (only applies if --log-file is used).", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="If present, output informative messages as computation progresses (equivalent to --log-level INFO).", + ) + + return parser + + +def extract_summary(args): + """Extract tabular summary from files in the specified directory. + + Parameters: + args (argparse.Namespace): Parsed command line arguments. + + Returns: + TabularSummary: The combined summary of all processed files. + + Raises: + FileNotFoundError: If no files matching criteria are found. + Exception: For various file processing errors. 
+ """ + logger = logging.getLogger("extract_tabular_summary") + logger.info(f"Data directory: {args.data_path}") + logger.info(f"HED tools version: {str(vr.get_versions())}") + logger.debug(f"Name prefix: {args.name_prefix}") + logger.debug(f"Name suffix: {args.name_suffix}") + logger.debug(f"Exclude directories: {args.exclude_dirs}") + logger.debug(f"Filename filter: {args.filename_filter}") + logger.debug(f"Value columns: {args.value_columns}") + logger.debug(f"Skip columns: {args.skip_columns}") + logger.debug(f"Categorical limit: {args.categorical_limit}") + + try: + # Handle wildcard suffix - '*' means match all files + suffix_filter = None if args.name_suffix == "*" else args.name_suffix + + # Get list of TSV files matching criteria + logger.info("Searching for TSV files matching criteria...") + if args.name_suffix == "*": + logger.debug("Using wildcard suffix - matching all TSV files") + + file_list = get_file_list( + root_path=args.data_path, + name_prefix=args.name_prefix, + name_suffix=suffix_filter, + extensions=[".tsv"], + exclude_dirs=args.exclude_dirs, + ) + + # Apply filename filter if specified + if args.filename_filter: + original_count = len(file_list) + file_list = [f for f in file_list if args.filename_filter in Path(f).name] + logger.info(f"Filename filter '{args.filename_filter}' reduced files from {original_count} to {len(file_list)}") + + if not file_list: + error_msg = ( + f"No TSV files found matching criteria in {args.data_path}. " + f"Prefix: {args.name_prefix}, " + f"Suffix: {args.name_suffix}, " + f"Filter: {args.filename_filter}" + ) + logger.error(error_msg) + raise FileNotFoundError(error_msg) + + logger.info(f"Found {len(file_list)} files to process") + if logger.isEnabledFor(logging.DEBUG): + for file_path in file_list: + logger.debug(f" - {file_path}") + + # Create the overall TabularSummary + logger.info("Creating overall tabular summary...") + overall_summary = TabularSummary( + value_cols=args.value_columns, + skip_cols=args.skip_columns, + name=f"Summary of {Path(args.data_path).name}", + categorical_limit=args.categorical_limit, + ) + + # Process each file + logger.info("Processing files...") + successful_files = 0 + failed_files = 0 + + for file_path in file_list: + try: + logger.debug(f"Processing: {file_path}") + + # Create a TabularSummary for this individual file + file_summary = TabularSummary( + value_cols=args.value_columns, + skip_cols=args.skip_columns, + name=Path(file_path).name, + categorical_limit=args.categorical_limit, + ) + + # Update the file summary with the file's data + file_summary.update(file_path, name=file_path) + + # Add this file's summary to the overall summary + overall_summary.update_summary(file_summary) + + successful_files += 1 + logger.debug(f"Successfully processed: {file_path}") + + except Exception as e: + failed_files += 1 + logger.warning(f"Failed to process {file_path}: {e}") + logger.debug(f"Full exception for {file_path}:", exc_info=True) + + # Log final statistics + logger.info("Processing complete:") + logger.info(f" Successfully processed: {successful_files} files") + if failed_files > 0: + logger.warning(f" Failed to process: {failed_files} files") + logger.info(f" Total events across all files: {overall_summary.total_events}") + logger.info(f" Categorical columns: {len(overall_summary.categorical_info)}") + logger.info(f" Value columns: {len(overall_summary.value_info)}") + + if successful_files == 0: + raise Exception("No files were successfully processed") + + return overall_summary + + except 
Exception as e: + logger.error(f"Error during summary extraction: {e}") + logger.debug("Full exception details:", exc_info=True) + raise + + +def format_output(summary, args): + """Format the summary for output based on requested format. + + Parameters: + summary (TabularSummary): The tabular summary to format. + args (argparse.Namespace): Parsed command line arguments. + + Returns: + str: Formatted output string. + """ + if args.output_format == "text": + # Return human-readable text format + return str(summary) + else: + # Return JSON format + summary_dict = summary.get_summary(as_json=False) + output_dict = { + "tabular_summary": summary_dict, + "hedtools_version": str(vr.get_versions()), + "parameters": { + "data_path": args.data_path, + "name_prefix": args.name_prefix, + "name_suffix": args.name_suffix, + "exclude_dirs": args.exclude_dirs, + "value_columns": args.value_columns, + "skip_columns": args.skip_columns, + "categorical_limit": args.categorical_limit, + }, + } + return json.dumps(output_dict, indent=4) + + +def setup_logging(args): + """Configure logging based on command line arguments. + + Parameters: + args (argparse.Namespace): Parsed command line arguments. + + Returns: + logging.Logger: Configured logger instance. + """ + # Determine log level + log_level = args.log_level.upper() if args.log_level else "WARNING" + if args.verbose: + log_level = "INFO" + + # Configure logging format + log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + date_format = "%Y-%m-%d %H:%M:%S" + + # Clear any existing handlers from root logger + root_logger = logging.getLogger() + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Set the root logger level + root_logger.setLevel(getattr(logging, log_level)) + + # Create formatter + formatter = logging.Formatter(log_format, datefmt=date_format) + + # File handler if log file specified + if args.log_file: + file_handler = logging.FileHandler(args.log_file, mode="w", encoding="utf-8") + file_handler.setLevel(getattr(logging, log_level)) + file_handler.setFormatter(formatter) + root_logger.addHandler(file_handler) + + # Console handler (stderr) unless explicitly quieted and file logging is used + if not args.log_quiet or not args.log_file: + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setLevel(getattr(logging, log_level)) + console_handler.setFormatter(formatter) + root_logger.addHandler(console_handler) + + logger = logging.getLogger("extract_tabular_summary") + logger.info(f"Starting tabular summary extraction with log level: {log_level}") + if args.log_file: + logger.info(f"Log output will be saved to: {args.log_file}") + + return logger + + +def main(arg_list=None): + """Main entry point for the script. + + Parameters: + arg_list (list, None): Optional list of command line arguments for testing. + If None, uses sys.argv. + + Returns: + int: Exit code (0 for success, non-zero for failure). 
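+
+    Example (illustrative sketch; the path is a placeholder):
+        >>> exit_code = main(["/path/to/data", "-s", "events", "-f", "text"])  # doctest: +SKIP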
+ """ + # Create the argument parser + parser = get_parser() + + # Parse the arguments + args = parser.parse_args(arg_list) + + # Setup logging + logger = setup_logging(args) + + try: + # Extract the summary + summary = extract_summary(args) + + # Format output + output = format_output(summary, args) + + # Write to file or print to stdout + if args.output_file: + logger.info(f"Writing output to: {args.output_file}") + with open(args.output_file, "w", encoding="utf-8") as fp: + fp.write(output) + else: + print(output) + + logger.info("Extraction completed successfully") + return 0 + + except Exception as e: + logger.error(f"Extraction failed with exception: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hed/tools/analysis/tabular_summary.py b/hed/tools/analysis/tabular_summary.py index c40ef6b2..5f3e33f1 100644 --- a/hed/tools/analysis/tabular_summary.py +++ b/hed/tools/analysis/tabular_summary.py @@ -17,7 +17,7 @@ def __init__(self, value_cols=None, skip_cols=None, name="", categorical_limit=N value_cols (list, None): List of columns to be treated as value columns. skip_cols (list, None): List of columns to be skipped. name (str): Name associated with the dictionary. - categorical_limit (int, None): Maximum number of unique values to store for categorical columns. + categorical_limit (int, None): Maximum number of unique values to store for a categorical column. """ @@ -25,6 +25,7 @@ def __init__(self, value_cols=None, skip_cols=None, name="", categorical_limit=N self.categorical_info = {} self.value_info = {} self.categorical_counts = {} + self.overflow_columns = set() self.categorical_limit = categorical_limit if value_cols and skip_cols and set(value_cols).intersection(skip_cols): raise HedFileError( @@ -45,6 +46,9 @@ def __str__(self): """Return a str version of this summary.""" indent = " " summary_list = [f"Summary for column dictionary {self.name}:"] + if self.overflow_columns: + sorted_overflow = sorted(self.overflow_columns) + summary_list.append(f"{indent}Overflow columns ({len(sorted_overflow)}): {', '.join(sorted_overflow)}") sorted_keys = sorted(self.categorical_info.keys()) summary_list.append(f"{indent}Categorical columns ({len(sorted_keys)}):") for key in sorted_keys: @@ -106,6 +110,7 @@ def get_summary(self, as_json=False) -> Union[dict, str]: "Name": self.name, "Total events": self.total_events, "Total files": self.total_files, + "Overflow columns": sorted(self.overflow_columns), "Categorical columns": categorical_cols, "Categorical counts": self.categorical_counts, "Value columns": value_cols, @@ -170,6 +175,7 @@ def update_summary(self, tab_sum): self.total_events = self.total_events + tab_sum.total_events for file, _key in tab_sum.files.items(): self.files[file] = "" + self.overflow_columns.update(tab_sum.overflow_columns) self._update_dict_skip(tab_sum) self._update_dict_value(tab_sum) self._update_dict_categorical(tab_sum) @@ -192,12 +198,20 @@ def _update_categorical(self, tab_name, values, cat_counts): self.categorical_counts[tab_name][1] += cat_counts[1] total_values = self.categorical_info[tab_name] for name, value in values.items(): - if self.categorical_limit is not None and len(total_values) >= self.categorical_limit: - break - value_list = total_values.get(name, [0, 0]) - if not isinstance(value, list): - value = [value, 1] - total_values[name] = [value_list[0] + value[0], value_list[1] + value[1]] + # If value already exists, always update its count + if name in total_values: + value_list = total_values[name] + if not 
isinstance(value, list): + value = [value, 1] + total_values[name] = [value_list[0] + value[0], value_list[1] + value[1]] + # Only add new values if we haven't reached the limit + elif self.categorical_limit is None or len(total_values) < self.categorical_limit: + if not isinstance(value, list): + value = [value, 1] + total_values[name] = [value[0], value[1]] + else: + # Mark this column as having overflowed + self.overflow_columns.add(tab_name) def _update_dataframe(self, data, name): """Update the information based on columnar data. @@ -226,8 +240,6 @@ def _update_dataframe(self, data, name): cat_counts[0] += len(col_values) cat_counts[1] += 1 self.categorical_counts[col_name] = cat_counts - if self.categorical_limit is not None and len(col_values) > self.categorical_limit: - continue col_values = col_values.astype(str) values = col_values.value_counts(ascending=True) self._update_categorical(col_name, values, cat_counts) diff --git a/hed/tools/util/io_util.py b/hed/tools/util/io_util.py index 2345bed2..c2cd0f36 100644 --- a/hed/tools/util/io_util.py +++ b/hed/tools/util/io_util.py @@ -215,9 +215,9 @@ def get_file_list(root_path, name_prefix=None, name_suffix=None, extensions=None """Return paths satisfying various conditions. Parameters: - root_path (str): Full path of the directory tree to be traversed (no ending slash). - name_prefix (list, str, None): An optional prefix for the base filename. - name_suffix (list, str, None): An optional suffix for the base filename. + root_path (str): Full path of the directory tree to be traversed (no ending slash). + name_prefix (list, str, None): An optional prefix for the base filename. + name_suffix (list, str, None): An optional suffix for the base filename. extensions (list, None): A list of extensions to be selected. exclude_dirs (list, None): A list of paths to be excluded. 
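Note on the tabular_summary.py hunks above: once a categorical column reaches
categorical_limit, counts for already-stored values continue to accumulate,
new values are dropped, and the column is recorded in overflow_columns. A
minimal sketch of the resulting behavior (assuming the TabularSummary API as
modified in this patch; the column name and data are made up):

    import pandas as pd
    from hed.tools.analysis.tabular_summary import TabularSummary

    df = pd.DataFrame({"code": ["a", "b", "c", "d", "a"]})
    summary = TabularSummary(categorical_limit=2)
    summary.update(df)

    # Only two unique values are stored for "code" ...
    assert len(summary.categorical_info["code"]) == 2
    # ... and the column is flagged because values had to be dropped.
    assert "code" in summary.overflow_columns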
diff --git a/pyproject.toml b/pyproject.toml index abccc202..419e34b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,12 +75,12 @@ dev = [ "mdformat-tables>=0.4.0", ] docs = [ - "sphinx>=7.0.0", - "furo>=2023.9.10", + "sphinx>=7.1.0,<8.2.0", + "furo>=2024.1.29", "sphinx-copybutton>=0.5.2", "myst-parser>=2.0.0", "sphinx-autodoc-typehints>=1.24.0", - "linkify-it-py>=2.0.0", + "linkify-it-py>=2.0.3", ] test = [ "coverage>=7.0.0", diff --git a/requirements-dev.txt b/requirements-dev.txt index bee8afab..a5db4e31 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,12 +13,12 @@ codespell>=2.2.0 black[jupyter]>=24.0.0 # Documentation requirements -sphinx>=7.0.0 -furo>=2023.9.10 +sphinx>=7.1.0,<8.2.0 +furo>=2024.1.29 sphinx-copybutton>=0.5.2 -myst-parser>=2.0.0 +myst-parser>=3.0.0 sphinx-autodoc-typehints>=1.24.0 -linkify-it-py>=2.0.0 +linkify-it-py>=2.0.3 # Jupyter notebook requirements for examples jupyter>=1.0.0 diff --git a/tests/scripts/test_extract_tabular_summary.py b/tests/scripts/test_extract_tabular_summary.py new file mode 100644 index 00000000..12464014 --- /dev/null +++ b/tests/scripts/test_extract_tabular_summary.py @@ -0,0 +1,526 @@ +import os +import io +import json +import tempfile +import unittest +from unittest.mock import patch +from hed.scripts.extract_tabular_summary import main, get_parser, extract_summary + + +class TestExtractTabularSummary(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.data_root = os.path.realpath(os.path.join(os.path.dirname(__file__), "../data/bids_tests/eeg_ds003645s_hed_demo")) + # Suppress logging for cleaner test output + cls.mock_logger_patch = patch("logging.getLogger") + cls.mock_logger = cls.mock_logger_patch.start() + cls.mock_logger.return_value.info.return_value = None + cls.mock_logger.return_value.debug.return_value = None + cls.mock_logger.return_value.warning.return_value = None + cls.mock_logger.return_value.error.return_value = None + cls.mock_logger.return_value.isEnabledFor.return_value = False + + @classmethod + def tearDownClass(cls): + cls.mock_logger_patch.stop() + + def _get_summary_dict(self, output_dict): + """Helper to extract the tabular_summary from JSON output if wrapped.""" + if "tabular_summary" in output_dict: + return output_dict["tabular_summary"] + return output_dict + + def test_get_parser(self): + """Test that argument parser is created correctly.""" + parser = get_parser() + self.assertIsNotNone(parser) + + # Test parsing valid arguments + args = parser.parse_args([self.data_root]) + self.assertEqual(args.data_path, self.data_root) + self.assertEqual(args.name_suffix, "events") # Default + self.assertEqual(args.output_format, "json") # Default + self.assertIsNone(args.categorical_limit) # Default + self.assertIsNone(args.filename_filter) # Default + + def test_parser_with_all_arguments(self): + """Test parser with all arguments specified.""" + parser = get_parser() + args = parser.parse_args( + [ + self.data_root, + "-p", + "sub-", + "-s", + "participants", + "-x", + "derivatives", + "code", + "-fl", + "sub-002", + "-vc", + "age", + "weight", + "-sc", + "notes", + "comments", + "-cl", + "25", + "-o", + "output.json", + "-f", + "text", + "-l", + "INFO", + "-v", + ] + ) + + self.assertEqual(args.data_path, self.data_root) + self.assertEqual(args.name_prefix, "sub-") + self.assertEqual(args.name_suffix, "participants") + self.assertEqual(args.exclude_dirs, ["derivatives", "code"]) + self.assertEqual(args.filename_filter, "sub-002") + self.assertEqual(args.value_columns, 
["age", "weight"]) + self.assertEqual(args.skip_columns, ["notes", "comments"]) + self.assertEqual(args.categorical_limit, 25) + self.assertEqual(args.output_file, "output.json") + self.assertEqual(args.output_format, "text") + self.assertEqual(args.log_level, "INFO") + self.assertTrue(args.verbose) + + def test_main_default_events_json(self): + """Test basic extraction with default events suffix and JSON output.""" + arg_list = [self.data_root] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should have required summary fields + self.assertIn("Name", summary_dict) + self.assertIn("Total files", summary_dict) + self.assertIn("Total events", summary_dict) + self.assertIn("Files", summary_dict) + self.assertIn("Categorical columns", summary_dict) + self.assertIn("Value columns", summary_dict) + + # Should have processed multiple files + total_files = summary_dict["Total files"] + self.assertGreater(total_files, 0) + + # Should have categorical columns + categorical = summary_dict["Categorical columns"] + self.assertGreater(len(categorical), 0) + + def test_main_text_output_format(self): + """Test extraction with text output format.""" + arg_list = [self.data_root, "-f", "text"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + + # Text output should contain specific markers + self.assertIn("Summary for column dictionary", output) + self.assertIn("Categorical columns (", output) + self.assertIn("Value columns (", output) + + def test_main_with_output_file_json(self): + """Test extraction with JSON output file.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp_file: + output_path = tmp_file.name + + try: + arg_list = [self.data_root, "-o", output_path, "-f", "json"] + + with patch("sys.stderr", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 0) + + # Verify the file was created and contains valid JSON + self.assertTrue(os.path.exists(output_path)) + with open(output_path, "r", encoding="utf-8") as f: + output_dict = json.load(f) + + summary_dict = self._get_summary_dict(output_dict) + self.assertIn("Name", summary_dict) + self.assertIn("Total events", summary_dict) + self.assertGreater(summary_dict["Total files"], 0) + + finally: + if os.path.exists(output_path): + os.remove(output_path) + + def test_main_with_output_file_text(self): + """Test extraction with text output file.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as tmp_file: + output_path = tmp_file.name + + try: + arg_list = [self.data_root, "-o", output_path, "-f", "text"] + + with patch("sys.stderr", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 0) + + # Verify the file was created and contains text + self.assertTrue(os.path.exists(output_path)) + with open(output_path, "r", encoding="utf-8") as f: + output = f.read() + + self.assertIn("Summary for column dictionary", output) + self.assertIn("Categorical columns (", output) + self.assertIn("Value columns (", output) + + finally: + if os.path.exists(output_path): + os.remove(output_path) + + def test_wildcard_suffix(self): + """Test using wildcard suffix to match all TSV files.""" + arg_list = [self.data_root, "-s", "*"] + + with patch("sys.stdout", 
new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should process more files than just events + total_files = summary_dict["Total files"] + self.assertGreater(total_files, 0) + + # Files dict should include various suffixes + files = summary_dict["Files"] + self.assertIsInstance(files, dict) + self.assertGreater(len(files), 0) + + def test_with_skip_columns(self): + """Test that skip columns are excluded from summary.""" + arg_list = [self.data_root, "-s", "events", "-sc", "stim_file", "value"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + categorical = summary_dict["Categorical columns"] + + # stim_file and value should not be in the summary + self.assertNotIn("stim_file", categorical) + self.assertNotIn("value", categorical) + + # But other columns should be there + self.assertGreater(len(categorical), 0) + + def test_with_value_columns(self): + """Test specifying value columns for numeric data.""" + arg_list = [self.data_root, "-s", "events", "-vc", "trial", "rep_lag"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + value_cols = summary_dict["Value columns"] + + # trial and rep_lag should be treated as value columns + self.assertIn("trial", value_cols) + self.assertIn("rep_lag", value_cols) + + # Value columns store [total_values, num_files] + for col in ["trial", "rep_lag"]: + self.assertIsInstance(value_cols[col], list) + self.assertEqual(len(value_cols[col]), 2) + self.assertGreater(value_cols[col][0], 0) # total values > 0 + self.assertGreater(value_cols[col][1], 0) # num files > 0 + + def test_with_categorical_limit(self): + """Test categorical limit parameter.""" + arg_list = [self.data_root, "-s", "events", "-cl", "10"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + self.assertIn("Categorical limit", summary_dict) + self.assertEqual(summary_dict["Categorical limit"], "10") + + # Check if overflow columns are tracked + if "Overflow columns" in summary_dict: + # Some columns should have overflowed with limit of 10 + self.assertIsInstance(summary_dict["Overflow columns"], list) + + def test_with_filename_filter(self): + """Test filename filter parameter.""" + arg_list = [self.data_root, "-s", "*", "-fl", "sub-002"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Check that only sub-002 files were processed + files = summary_dict["Files"] + for file_path in files: + self.assertIn("sub-002", file_path) + + def test_with_filename_filter_and_suffix(self): + """Test combining filename filter with suffix.""" + arg_list = [self.data_root, "-s", "events", "-fl", "run-1"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: 
+ result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should process events files for run-1 only + files = summary_dict["Files"] + for file_path in files: + self.assertIn("events", file_path) + self.assertIn("run-1", file_path) + + # Should have at least one file but not all run files + self.assertGreater(len(files), 0) + + def test_with_prefix(self): + """Test name prefix parameter.""" + arg_list = [self.data_root, "-p", "sub-", "-s", "*"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should process files starting with sub- + total_files = summary_dict["Total files"] + self.assertGreater(total_files, 0) + + def test_with_exclude_dirs(self): + """Test exclude directories parameter.""" + # First get count without exclusions + arg_list1 = [self.data_root, "-s", "*"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list1) + self.assertEqual(result, 0) + output = mock_stdout.getvalue() + output_dict1 = json.loads(output) + summary_dict1 = self._get_summary_dict(output_dict1) + summary_dict1["Total files"] + + # Now exclude a directory (this dataset might not have these, but test the parameter) + arg_list2 = [self.data_root, "-s", "*", "-x", "derivatives", "code"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list2) + self.assertEqual(result, 0) + output = mock_stdout.getvalue() + output_dict2 = json.loads(output) + summary_dict2 = self._get_summary_dict(output_dict2) + total_files2 = summary_dict2["Total files"] + + # Should have at least processed some files + self.assertGreater(total_files2, 0) + + def test_participants_suffix(self): + """Test extraction with participants suffix.""" + arg_list = [self.data_root, "-s", "participants"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should have categorical columns from participants file + categorical = summary_dict["Categorical columns"] + self.assertIn("sex", categorical) + + def test_no_files_found(self): + """Test handling when no files match the criteria.""" + arg_list = [self.data_root, "-s", "nonexistent_suffix"] + + with patch("sys.stderr", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 1) # Should return error code + + def test_no_files_after_filter(self): + """Test handling when filter eliminates all files.""" + arg_list = [self.data_root, "-s", "events", "-fl", "nonexistent_subject"] + + with patch("sys.stderr", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 1) # Should return error code + + def test_categorical_columns_have_counts(self): + """Test that categorical columns include value counts.""" + arg_list = [self.data_root, "-s", "events"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + categorical = summary_dict["Categorical columns"] + + # Pick a known column and verify it has 
values with counts + if "event_type" in categorical: + event_type_data = categorical["event_type"] + self.assertIsInstance(event_type_data, dict) + # Should have some values + self.assertGreater(len(event_type_data), 0) + # Each value should have a count (list with [count, files]) + for _value, count_info in event_type_data.items(): + self.assertIsInstance(count_info, list) + self.assertEqual(len(count_info), 2) # [count, num_files] + + def test_value_columns_have_statistics(self): + """Test that value columns include proper count information.""" + arg_list = [self.data_root, "-s", "events", "-vc", "trial"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + value_cols = summary_dict["Value columns"] + self.assertIn("trial", value_cols) + + # Value columns store [total_values, num_files] + trial_info = value_cols["trial"] + self.assertIsInstance(trial_info, list) + self.assertEqual(len(trial_info), 2) + self.assertGreater(trial_info[0], 0) # total values > 0 + self.assertGreater(trial_info[1], 0) # num files > 0 + + def test_multiple_runs_combination(self): + """Test that data from multiple runs is properly combined.""" + arg_list = [self.data_root, "-s", "events", "-fl", "sub-002"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # Should have processed multiple event files for sub-002 + files = summary_dict["Files"] + events_files = [f for f in files if "events" in f] + self.assertGreater(len(events_files), 1) + + # Total events should be sum across all files + total_events = summary_dict["Total events"] + self.assertGreater(total_events, 0) + + def test_categorical_limit_zero(self): + """Test edge case of categorical limit of 0.""" + arg_list = [self.data_root, "-s", "events", "-cl", "0"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + self.assertEqual(summary_dict["Categorical limit"], "0") + + # Categorical columns should have empty dicts + categorical = summary_dict["Categorical columns"] + for _col_name, col_data in categorical.items(): + self.assertEqual(len(col_data), 0) + + def test_overflow_columns_in_output(self): + """Test that overflow columns are included when limit is exceeded.""" + arg_list = [self.data_root, "-s", "events", "-cl", "5"] + + with patch("sys.stdout", new=io.StringIO()) as mock_stdout: + result = main(arg_list) + self.assertEqual(result, 0) + + output = mock_stdout.getvalue() + output_dict = json.loads(output) + summary_dict = self._get_summary_dict(output_dict) + + # With limit of 5, some columns should overflow + if "Overflow columns" in summary_dict: + overflow = summary_dict["Overflow columns"] + self.assertIsInstance(overflow, list) + # stim_file definitely has > 5 unique values + self.assertIn("stim_file", overflow) + + def test_extract_summary_function_directly(self): + """Test the extract_summary function directly with args object.""" + parser = get_parser() + args = parser.parse_args([self.data_root, "-s", "events"]) + + summary = extract_summary(args) + + # Should 
return a TabularSummary object + from hed.tools.analysis.tabular_summary import TabularSummary + + self.assertIsInstance(summary, TabularSummary) + + # Should have processed files + self.assertGreater(len(summary.files), 0) + + def test_verbose_flag(self): + """Test that verbose flag doesn't cause errors.""" + arg_list = [self.data_root, "-s", "events", "-v"] + + with patch("sys.stdout", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 0) + + def test_log_level_debug(self): + """Test that debug log level doesn't cause errors.""" + arg_list = [self.data_root, "-s", "events", "-l", "DEBUG"] + + with patch("sys.stdout", new=io.StringIO()): + result = main(arg_list) + self.assertEqual(result, 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/tools/analysis/test_tabular_summary.py b/tests/tools/analysis/test_tabular_summary.py index e902cfeb..7afb9bb8 100644 --- a/tests/tools/analysis/test_tabular_summary.py +++ b/tests/tools/analysis/test_tabular_summary.py @@ -80,7 +80,7 @@ def test_get_summary(self): ) summary1 = dict1.get_summary(as_json=False) self.assertIsInstance(summary1, dict) - self.assertEqual(len(summary1), 9) + self.assertEqual(len(summary1), 10) # Now includes overflow_columns field summary2 = dict1.get_summary(as_json=True).replace('"', "") self.assertIsInstance(summary2, str) @@ -272,17 +272,17 @@ def test_categorical_limit_enforced(self): self.assertGreater(dict_with_limit.categorical_counts[col_name][0], 0) def test_categorical_limit_columns_with_many_values(self): - # Test that columns with many values are skipped during initial update + # Test that columns with many values are tracked in counts wh_df = get_new_dataframe(self.wh_events_path) # Set limit to 5 dict1 = TabularSummary(categorical_limit=5) dict1.update(wh_df) - # Columns with more than 5 unique values at collection time should still be tracked in counts + # Columns should be tracked in counts for col_name, counts in dict1.categorical_counts.items(): self.assertGreater(counts[0], 0, f"Column {col_name} should have event count > 0") - self.assertEqual(counts[1], 1, f"Column {col_name} should have been updated once") + self.assertGreaterEqual(counts[1], 1, f"Column {col_name} should have been updated at least once") def test_categorical_limit_in_summary(self): # Test that categorical_limit appears in the summary output @@ -334,6 +334,213 @@ def test_categorical_limit_update_dict(self): f"Column {col_name} should have at most 3 unique values after update_summary", ) + def test_overflow_columns_initialization(self): + # Test that overflow_columns is initialized as an empty set + dict1 = TabularSummary() + self.assertIsInstance(dict1.overflow_columns, set) + self.assertEqual(len(dict1.overflow_columns), 0) + + def test_overflow_columns_tracking(self): + # Test that overflow_columns tracks columns that exceed the limit + wh_df = get_new_dataframe(self.wh_events_path) + + # Set a low limit to ensure some columns overflow + dict1 = TabularSummary(categorical_limit=5) + dict1.update(wh_df) + + # Check that overflow_columns is populated + self.assertIsInstance(dict1.overflow_columns, set) + self.assertGreater(len(dict1.overflow_columns), 0, "Some columns should overflow with limit of 5") + + # Verify that columns in overflow_columns actually have many unique values + for col_name in dict1.overflow_columns: + if col_name in dict1.categorical_info: + # The column is tracked, should have exactly the limit + self.assertEqual(len(dict1.categorical_info[col_name]), 5) + + def 
test_overflow_columns_no_limit(self): + # Test that overflow_columns remains empty when there's no limit + stern_df = get_new_dataframe(self.stern_map_path) + + dict1 = TabularSummary(categorical_limit=None) + dict1.update(stern_df) + + self.assertEqual(len(dict1.overflow_columns), 0, "No columns should overflow without a limit") + + def test_overflow_columns_in_summary(self): + # Test that overflow_columns appears in the summary output + wh_df = get_new_dataframe(self.wh_events_path) + + dict1 = TabularSummary(categorical_limit=3) + dict1.update(wh_df) + + summary = dict1.get_summary(as_json=False) + self.assertIn("Overflow columns", summary) + self.assertIsInstance(summary["Overflow columns"], list) + self.assertGreater(len(summary["Overflow columns"]), 0) + + def test_overflow_columns_in_str(self): + # Test that overflow_columns appears in the string representation + wh_df = get_new_dataframe(self.wh_events_path) + + dict1 = TabularSummary(categorical_limit=3) + dict1.update(wh_df) + + str_output = str(dict1) + self.assertIn("Overflow columns", str_output) + for col_name in dict1.overflow_columns: + self.assertIn(col_name, str_output) + + def test_overflow_columns_update_summary(self): + # Test that overflow_columns are merged correctly with update_summary + stern_df = get_new_dataframe(self.stern_test1_path) + + dict1 = TabularSummary(categorical_limit=2) + dict1.update(stern_df) + overflow1 = dict1.overflow_columns.copy() + + dict2 = TabularSummary(categorical_limit=2) + dict2.update(stern_df) + overflow2 = dict2.overflow_columns.copy() + + # Merge dict2 into dict1 + dict1.update_summary(dict2) + + # Overflow columns should be the union of both + expected_overflow = overflow1.union(overflow2) + self.assertEqual(dict1.overflow_columns, expected_overflow) + + def test_categorical_limit_preserves_existing_values(self): + # Test that categorical_limit continues to update counts for existing values + # even after the limit is reached + stern_df1 = get_new_dataframe(self.stern_test1_path) + stern_df2 = get_new_dataframe(self.stern_test2_path) + + dict1 = TabularSummary(categorical_limit=5) + dict1.update(stern_df1) + + # Get initial counts for a column that exists in both files + initial_counts = {} + for col_name in dict1.categorical_info: + if col_name in dict1.categorical_info: + initial_counts[col_name] = {} + for val, count in dict1.categorical_info[col_name].items(): + initial_counts[col_name][val] = count[0] + + # Update with second dataframe + dict1.update(stern_df2) + + # Verify that counts for existing values have increased + for col_name in initial_counts: + if col_name in dict1.categorical_info: + for val in initial_counts[col_name]: + if val in dict1.categorical_info[col_name]: + # Count should have increased or stayed the same + self.assertGreaterEqual( + dict1.categorical_info[col_name][val][0], + initial_counts[col_name][val], + f"Count for {col_name}[{val}] should not decrease", + ) + + def test_categorical_limit_multiple_files(self): + # Test categorical limit behavior with multiple file updates + bids_demo_dir = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "../../data/bids_tests/eeg_ds003645s_hed_demo" + ) + + if not os.path.exists(bids_demo_dir): + self.skipTest(f"Test data directory not found: {bids_demo_dir}") + + files = get_file_list(bids_demo_dir, extensions=[".tsv"], name_suffix="events") + if not files: + self.skipTest(f"No event files found in {bids_demo_dir}") + + # Create summary with limit + dict1 = TabularSummary(categorical_limit=10, 
skip_cols=["HED", "HED_assembled"]) + + # Process multiple files + for file_path in files[:3]: # Just use first 3 files for speed + dict1.update(file_path) + + # Verify that: + # 1. Some columns have exactly the limit number of unique values + # 2. Those columns are in overflow_columns + # 3. Counts are accurate across files + limited_cols = [col for col in dict1.categorical_info if len(dict1.categorical_info[col]) == 10] + + if limited_cols: + # At least one limited column should be in overflow + overflow_intersection = set(limited_cols).intersection(dict1.overflow_columns) + self.assertGreater(len(overflow_intersection), 0, "At least one limited column should be marked as overflow") + + # Verify file count is correct + self.assertEqual(dict1.total_files, 3, "Should have processed 3 files") + + def test_overflow_columns_preserved_across_updates(self): + # Test that once a column is marked as overflow, it stays marked + wh_df = get_new_dataframe(self.wh_events_path) + + dict1 = TabularSummary(categorical_limit=3) + dict1.update(wh_df) + + initial_overflow = dict1.overflow_columns.copy() + self.assertGreater(len(initial_overflow), 0) + + # Update again with same data + dict1.update(wh_df) + + # Overflow columns should still include the initial ones + self.assertTrue(initial_overflow.issubset(dict1.overflow_columns)) + + def test_categorical_counts_with_limit(self): + # Test that categorical_counts tracks all values even when categorical_info is limited + wh_df = get_new_dataframe(self.wh_events_path) + + dict1 = TabularSummary(categorical_limit=3) + dict1.update(wh_df) + + # For columns that overflowed, categorical_counts should show more values + # than are stored in categorical_info + for col_name in dict1.overflow_columns: + if col_name in dict1.categorical_counts and col_name in dict1.categorical_info: + stored_values = len(dict1.categorical_info[col_name]) + total_values = dict1.categorical_counts[col_name][0] + + # We stored only 3 unique values, but there are more total values + self.assertEqual(stored_values, 3, f"{col_name} should store exactly 3 values") + self.assertGreater(total_values, 3, f"{col_name} should have more than 3 total occurrences") + + def test_categorical_limit_zero(self): + # Test edge case: categorical_limit of 0 + stern_df = get_new_dataframe(self.stern_map_path) + + dict1 = TabularSummary(categorical_limit=0) + dict1.update(stern_df) + + # Should track counts but store no unique values + for col_name in dict1.categorical_info: + self.assertEqual(len(dict1.categorical_info[col_name]), 0, f"{col_name} should have no stored values with limit=0") + + # But categorical_counts should still be populated + self.assertGreater(len(dict1.categorical_counts), 0) + + def test_mixed_value_and_categorical_with_limit(self): + # Test that value columns and categorical columns with limits work together + stern_df = get_new_dataframe(self.stern_test1_path) + + dict1 = TabularSummary(categorical_limit=5, value_cols=["latency"], skip_cols=["event_type"]) + dict1.update(stern_df) + + # Value columns should not be affected by categorical_limit + self.assertIn("latency", dict1.value_info) + self.assertNotIn("latency", dict1.categorical_info) + self.assertNotIn("latency", dict1.overflow_columns) + + # Skip columns should not appear anywhere + self.assertNotIn("event_type", dict1.categorical_info) + self.assertNotIn("event_type", dict1.value_info) + self.assertNotIn("event_type", dict1.overflow_columns) + if __name__ == "__main__": unittest.main() From 
83ec83327f71eeb0b0b57355986caa3d7d15be51 Mon Sep 17 00:00:00 2001 From: Kay Robbins <1189050+VisLab@users.noreply.github.com> Date: Tue, 6 Jan 2026 16:23:41 -0600 Subject: [PATCH 2/2] Updated the myst version requirement --- docs/requirements.txt | 2 +- pyproject.toml | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/requirements.txt b/docs/requirements.txt index 9ce8bd47..7da139d8 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,4 +1,4 @@ -sphinx>=7.1.0,<8.2.0 +sphinx>=7.1.0,<10.0 furo>=2024.1.29 sphinx-copybutton>=0.5.2 myst-parser>=3.0.0 diff --git a/pyproject.toml b/pyproject.toml index 419e34b4..25ffea91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,13 +72,12 @@ dev = [ "black>=24.0.0", "mdformat>=0.7.0", "mdformat-myst>=0.1.5", - "mdformat-tables>=0.4.0", ] docs = [ - "sphinx>=7.1.0,<8.2.0", + "sphinx>=7.1.0,<10.0", "furo>=2024.1.29", "sphinx-copybutton>=0.5.2", - "myst-parser>=2.0.0", + "myst-parser>=3.0.0", "sphinx-autodoc-typehints>=1.24.0", "linkify-it-py>=2.0.3", ]
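Note: a minimal sketch of consuming the script's JSON output (the file name is
a placeholder; the keys follow format_output and TabularSummary.get_summary in
patch 1):

    import json

    with open("summary.json", encoding="utf-8") as fp:
        result = json.load(fp)

    summary = result["tabular_summary"]
    print(summary["Total files"], "files,", summary["Total events"], "events")
    print("Overflow columns:", summary.get("Overflow columns", []))
    print("HED tools version:", result["hedtools_version"])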