diff --git a/ssg/build_yaml.py b/ssg/build_yaml.py index ea834512228..2aaf495ffe8 100644 --- a/ssg/build_yaml.py +++ b/ssg/build_yaml.py @@ -45,7 +45,7 @@ from .cce import is_cce_format_valid, is_cce_value_valid from .yaml import DocumentationNotComplete, open_and_expand -from .utils import required_key, mkdir_p +from .utils import required_key, mkdir_p, safe_evaluate_boolean_filter from .xml import ElementTree as ET, register_namespaces, parse_file import ssg.build_stig @@ -1619,9 +1619,7 @@ def rule_filter_from_def(filterdef): return noop_rule_filterfunc def filterfunc(rule): - # Remove globals for security and only expose - # variables relevant to the rule - return eval(filterdef, {"__builtins__": None}, rule.__dict__) + return safe_evaluate_boolean_filter(filterdef, rule.__dict__) return filterfunc diff --git a/ssg/entities/profile_base.py b/ssg/entities/profile_base.py index 9cbb67920b8..fec646f124e 100644 --- a/ssg/entities/profile_base.py +++ b/ssg/entities/profile_base.py @@ -16,6 +16,7 @@ OSCAP_RULE, OSCAP_VALUE, ) +from ..utils import safe_evaluate_boolean_filter def noop_rule_filterfunc(rule): @@ -30,9 +31,7 @@ def filterfunc(rule): c = copy.copy(rule) if c.platform is None: c.platform = '' - # Remove globals for security and only expose - # variables relevant to the rule - return eval(filterdef, {"__builtins__": None}, c.__dict__) + return safe_evaluate_boolean_filter(filterdef, c.__dict__) return filterfunc diff --git a/ssg/utils.py b/ssg/utils.py index 482e51d4c0c..9f8bf03b2dd 100644 --- a/ssg/utils.py +++ b/ssg/utils.py @@ -4,12 +4,13 @@ from __future__ import absolute_import +import ast import multiprocessing import os import re from collections import namedtuple import hashlib -from typing import Dict +from typing import Any, Dict, Mapping from .constants import (FULL_NAME_TO_PRODUCT_MAPPING, MAKEFILE_ID_TO_PRODUCT_MAP, @@ -24,6 +25,72 @@ class SSGError(RuntimeError): PRODUCT_NAME_PARSER = re.compile(r"(.+?)([0-9]+)$") +def _safe_eval_filter_node(node: ast.AST, variables: Mapping[str, Any]) -> Any: + if isinstance(node, ast.Expression): + return _safe_eval_filter_node(node.body, variables) + + if isinstance(node, ast.BoolOp): + values = [_safe_eval_filter_node(value, variables) for value in node.values] + if isinstance(node.op, ast.And): + return all(values) + if isinstance(node.op, ast.Or): + return any(values) + raise ValueError(f"Unsupported boolean operator in filter expression: {ast.dump(node.op)}") + + if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not): + return not bool(_safe_eval_filter_node(node.operand, variables)) + + if isinstance(node, ast.Compare): + left = _safe_eval_filter_node(node.left, variables) + for op, comparator in zip(node.ops, node.comparators): + right = _safe_eval_filter_node(comparator, variables) + if isinstance(op, ast.In): + result = left in right + elif isinstance(op, ast.NotIn): + result = left not in right + elif isinstance(op, ast.Eq): + result = left == right + elif isinstance(op, ast.NotEq): + result = left != right + else: + raise ValueError( + f"Unsupported comparison operator in filter expression: {ast.dump(op)}" + ) + if not result: + return False + left = right + return True + + if isinstance(node, ast.Name): + if node.id not in variables: + raise ValueError(f"Unknown variable '{node.id}' in filter expression") + return variables[node.id] + + if isinstance(node, ast.Constant): + return node.value + + if isinstance(node, ast.List): + return [_safe_eval_filter_node(element, variables) for element in node.elts] + + if isinstance(node, ast.Tuple): + return tuple(_safe_eval_filter_node(element, variables) for element in node.elts) + + if isinstance(node, ast.Set): + return {_safe_eval_filter_node(element, variables) for element in node.elts} + + raise ValueError( + "Unsupported construct in filter expression: {node}".format(node=ast.dump(node)) + ) + + +def safe_evaluate_boolean_filter(filterdef: str, variables: Mapping[str, Any]) -> bool: + try: + expression = ast.parse(filterdef, mode="eval") + except SyntaxError as exc: + raise ValueError(f"Invalid filter expression: {filterdef}") from exc + return bool(_safe_eval_filter_node(expression, variables)) + + class VersionSpecifierSet(set): """ A set-like collection that only accepts VersionSpecifier objects. diff --git a/tests/unit/ssg-module/test_utils.py b/tests/unit/ssg-module/test_utils.py index 8073217aa89..b00007a4734 100644 --- a/tests/unit/ssg-module/test_utils.py +++ b/tests/unit/ssg-module/test_utils.py @@ -180,3 +180,29 @@ def test_ver_specs(): # We support only VersionSpecifier objects as members of VersionSpecifierSet with pytest.raises(ValueError): utils.VersionSpecifierSet([utils.VersionSpecifier('>=', evr123), '1.2.3']) + + +def test_safe_evaluate_boolean_filter_allows_supported_profile_expressions(): + variables = { + "platform": "ocp4-node", + "platforms": {"eks-control-plane", "ocp4-node"}, + } + + assert utils.safe_evaluate_boolean_filter('"ocp4-node" in platform', variables) + assert utils.safe_evaluate_boolean_filter('"eks-node" not in platforms', variables) + assert utils.safe_evaluate_boolean_filter( + '"ocp4-node" in platform or "ocp4-master-node" in platform', variables + ) + assert utils.safe_evaluate_boolean_filter( + 'not ("ocp4-master-node" in platform)', variables + ) + + +def test_safe_evaluate_boolean_filter_rejects_unsafe_constructs(): + variables = {"platform": "ocp4-node", "platforms": {"ocp4-node"}} + + with pytest.raises(ValueError): + utils.safe_evaluate_boolean_filter('__import__("os").system("id")', variables) + + with pytest.raises(ValueError): + utils.safe_evaluate_boolean_filter('platform.startswith("ocp4")', variables) diff --git a/utils/ansible_playbook_to_role.py b/utils/ansible_playbook_to_role.py index 4faeb7ca369..5d859ba9a60 100755 --- a/utils/ansible_playbook_to_role.py +++ b/utils/ansible_playbook_to_role.py @@ -5,6 +5,7 @@ import io import os import os.path +import subprocess import sys import shutil import re @@ -82,6 +83,21 @@ def dict_constructor(loader, node): os.path.dirname(os.path.abspath(__file__)), "ansible_galaxy_readme_template.md" ) +GITHUB_IDENTIFIER_RE = re.compile(r"^[A-Za-z0-9_.-]+$") + + +def _validate_github_identifier(value, field_name): + if not GITHUB_IDENTIFIER_RE.fullmatch(value): + raise ValueError( + "{field_name} contains unsupported characters: {value}".format( + field_name=field_name, value=value + ) + ) + return value + + +def _run_command(args, cwd=None): + subprocess.run(args, cwd=cwd, check=True) def create_empty_repositories(github_new_repos, github_org): @@ -98,20 +114,33 @@ def create_empty_repositories(github_new_repos, github_org): def clone_and_init_repository(parent_dir, organization, repo): + organization = _validate_github_identifier(organization, "organization") + repo = _validate_github_identifier(repo, "repo") + repo_dir = os.path.join(parent_dir, repo) + # 1. Initialize the Ansible role first (creates the directory) - os.system(f"ansible-galaxy init {repo}") + _run_command(["ansible-galaxy", "init", repo], cwd=parent_dir) - # 2. Change directory and initialize git - os.chdir(repo) - try: - os.system("git init --initial-branch=main") - os.system(f"git remote add origin git@github.com:{organization}/{repo}") - os.system('git add .') - os.system('git commit -a -m "Initial commit" --author "%s <%s>"' - % (GIT_COMMIT_AUTHOR_NAME, GIT_COMMIT_AUTHOR_EMAIL)) - os.system('git push origin main') - finally: - os.chdir("..") + # 2. Initialize git in the generated role directory + _run_command(["git", "init", "--initial-branch=main"], cwd=repo_dir) + _run_command( + ["git", "remote", "add", "origin", f"git@github.com:{organization}/{repo}"], + cwd=repo_dir, + ) + _run_command(["git", "add", "."], cwd=repo_dir) + _run_command( + [ + "git", + "commit", + "-a", + "-m", + "Initial commit", + "--author", + f"{GIT_COMMIT_AUTHOR_NAME} <{GIT_COMMIT_AUTHOR_EMAIL}>", + ], + cwd=repo_dir, + ) + _run_command(["git", "push", "origin", "main"], cwd=repo_dir) def update_repo_release(github, repo):