Skip to content
266 changes: 266 additions & 0 deletions script/abfile_inspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import re
import sys
from pathlib import Path
from typing import Iterator, List, Optional, Tuple


def base64url_decode_to_bytes(b64url_string: str) -> bytes:
padding = "=" * (-len(b64url_string) % 4)
return base64.urlsafe_b64decode(b64url_string + padding)


def base64url_decode_str(b64url_string: str) -> str:
return base64url_decode_to_bytes(b64url_string).decode("utf-8")


def _decoded_len_b64url(seg: str) -> int:
padding = "=" * (-len(seg) % 4)
return len(base64.urlsafe_b64decode(seg + padding))


def try_decode_filename(name: str) -> Optional[str]:
try:
return base64url_decode_str(name)
except Exception:
return None


_B64URL_RE = re.compile(r"^[A-Za-z0-9_-]*$") # empty allowed (for detached JWS payload)


def _is_b64url(s: str) -> bool:
return bool(_B64URL_RE.fullmatch(s))


def _decode_json_segment(segment: str) -> Optional[dict]:
try:
return json.loads(base64url_decode_to_bytes(segment).decode("utf-8"))
except Exception:
return None


def _classify_compact_jwt(compact: str) -> Tuple[Optional[dict], Optional[str]]:
"""
Returns (parsed_obj, type_label) for compact JOSE forms:
- JWS (3 parts): {"jws": {"header": {...}, "payload": <obj|str>, "sig_len": int, "detached": bool}}
- JWE (5 parts): {"jwe": {"protected": {...}, "parts": 5, "note": "..."}}
If not JWT-like, returns (None, None).
"""
s = compact.strip()
if not s:
return None, None

parts = s.split(".")
if len(parts) not in (3, 5):
return None, None

# All parts must be base64url (payload may be empty for detached JWS)
if not all(_is_b64url(p) for p in parts):
return None, None

# Decode protected header
header = _decode_json_segment(parts[0])
if header is None:
return None, None

typ = header.get("typ")

if len(parts) == 3:
# JWS: header SHOULD have "alg"
if "alg" not in header:
return None, None

detached = parts[1] == ""
payload_obj = None if detached else _decode_json_segment(parts[1])
payload_repr = (
payload_obj
if payload_obj is not None
else ("[detached]" if detached else "[non-JSON payload]")
)

try:
sig_len = _decoded_len_b64url(parts[2])
except Exception:
return None, None

label = (
f"JWT (JWS{', typ='+typ if typ else ''}{', detached' if detached else ''})"
)
return {
"jws": {
"header": header,
"payload": payload_repr,
"sig_len": sig_len,
"detached": detached,
}
}, label

# len(parts) == 5 -> JWE: header SHOULD have "enc"
if "enc" not in header:
return None, None

label = f"JWT (JWE{', typ='+typ if typ else ''})"
return {
"jwe": {
"protected": header,
"parts": 5,
"note": "Compact JWE; content not decrypted or verified.",
}
}, label


def parse_content_text(text: str) -> Tuple[dict, str]:
"""
Decide between JSON vs Compact JWT vs Other. No signature verification.
JSON is attempted first. If the JSON is a string that looks like compact JOSE,
classify that token instead.
Returns (parsed_object, label).
"""
s = text.strip()
# Try full JSON first
try:
obj = json.loads(s)
except json.JSONDecodeError:
obj = None

if obj is not None:
# If content is a JSON *string* holding a compact JOSE value, decode it.
if isinstance(obj, str):
parsed, label = _classify_compact_jwt(obj)
if parsed is not None:
return parsed, f"{label} (wrapped in JSON string)"
return obj, "JSON"

# Not JSON. Try compact JOSE next.
parsed, label = _classify_compact_jwt(s)
if parsed is not None:
return parsed, label

# 3) Fallback
return {"error": "Unrecognized content (not JSON and not compact JWT)."}, "Other"


def iter_entity_files(root: Path) -> Iterator[Tuple[Path, str]]:
"""
Yield (file_path, decoded_label) for files whose basename is base64url-decodable.
Skips files ending in '.lock'.
"""
if root.is_file():
name = root.name
if name.endswith(".lock"):
return
decoded = try_decode_filename(name)
if decoded is not None:
yield root, decoded
return

if root.is_dir():
for dirpath, _, filenames in os.walk(root, followlinks=False):
for fname in filenames:
if fname.endswith(".lock"):
continue
decoded = try_decode_filename(fname)
if decoded is None:
continue
p = Path(dirpath) / fname
if p.is_file():
yield p, decoded


def list_files_only(files: List[Tuple[Path, str]]) -> None:
for p, decoded in sorted(files, key=lambda x: str(x[0])):
print(f"{str(p)} -> {decoded}")


def describe_file(path: Path, decoded_label: str, max_bytes: int) -> None:
print(f"Path: {path}")
print(f" Decoded label: {decoded_label}")

try:
size = path.stat().st_size
except Exception:
size = None
print(f" Size: {size} bytes" if size is not None else " Size: [unknown]")

# Read up to max_bytes for safety
try:
with path.open("rb") as f:
data = f.read(max_bytes)
truncated = size is not None and size > max_bytes
text = data.decode("utf-8", errors="replace")
parsed, fmt = parse_content_text(text)
print(f" Parsed as: {fmt}")
if truncated:
print(f" [NOTE] Content truncated to --max-bytes={max_bytes} for safety.")
print(json.dumps(parsed, indent=2, ensure_ascii=False))
except Exception as e:
print(" Parsed as: Other")
print(json.dumps({"error": str(e)}, indent=2))
print("-" * 80)


def main():
parser = argparse.ArgumentParser(
description=(
"List and inspect AbstractFileSystem files stored on the local filesystem."
"A file is included if its basename is base64url-decodable to UTF-8."
"False positives are expected.To reduce noise, scope the input directory "
"at the shell level or use --list and post-filter with standard tools like grep, awk, or jq."
)
)
parser.add_argument(
"path",
nargs="?",
default=".",
help="Path to scan or inspect (file or directory)",
)
parser.add_argument(
"--list",
action="store_true",
help="Only list base64url-decodable filenames with their decoded labels",
)
parser.add_argument(
"--max-bytes",
type=int,
default=1_000_000,
help="Max bytes to read from a file for inspection (default: 1MB)",
)
args = parser.parse_args()

root = Path(args.path)

if not (root.exists() or root.is_file()):
print(f"Path does not exist or is not accessible: {args.path}", file=sys.stderr)
sys.exit(1)

if root.is_file():
name = root.name
if name.endswith(".lock"):
print(f"Skipping lock file: {str(root)}", file=sys.stderr)
sys.exit(1)
decoded = try_decode_filename(name)
if decoded is None:
print(f"Not a base64url-decodable filename: {str(root)}", file=sys.stderr)
sys.exit(1)
if args.list:
list_files_only([(root, decoded)])
else:
describe_file(root, decoded, args.max_bytes)
return

entries = list(iter_entity_files(root))
if args.list:
list_files_only(entries)
else:
print(f"Scanning for base64url-decodable filenames under: {args.path}\n")
for p, decoded in sorted(entries, key=lambda x: str(x[0])):
describe_file(p, decoded, args.max_bytes)


if __name__ == "__main__":
main()
55 changes: 55 additions & 0 deletions script/create_trust_mark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
import argparse
import os
from pathlib import Path
import contextlib

from idpyoidc.util import load_config_file

from fedservice.utils import \
make_federation_entity # type: ignore[attr-defined]


@contextlib.contextmanager
def pushd(new_dir: Path):
prev = Path.cwd()
os.chdir(new_dir)
try:
yield
finally:
os.chdir(prev)


def main() -> int:
p = argparse.ArgumentParser(
description="Create a Trust Mark with a fedservice Trust Mark Issuer.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p.add_argument("-d", "--dir_name", required=True, help="Directory containing conf.json")
p.add_argument("-e", "--entity_id", required=True, help="Subject entity_id the Trust Mark applies to")
p.add_argument("-m", "--trust_mark_id", required=True, help="Trust Mark type identifier (URI)")
p.add_argument("--base-dir", help="Working directory to resolve relative paths in conf.json. "
"Defaults to the directory that contains conf.json.")
p.add_argument("-o", "--out", help="Write JWT to this file instead of stdout")
args = p.parse_args()

conf_path = Path(args.dir_name, "conf.json").resolve()
base_dir = Path(args.base_dir).resolve() if args.base_dir else Path(args.dir_name).resolve().parent

print(f"DIR: {base_dir}")
with pushd(base_dir):
cnf = load_config_file(str(conf_path))
fe = make_federation_entity(**cnf["entity"])

# Trust Mark Issuer interface (handle both layouts)
tmi = getattr(fe, "trust_mark_entity", None) or getattr(fe, "server").trust_mark_entity
jwt_compact = tmi.create_trust_mark(args.trust_mark_id, args.entity_id)

if args.out:
Path(args.out).write_text(jwt_compact + "\n", encoding="utf-8")
else:
print(jwt_compact)


if __name__ == "__main__":
main()
66 changes: 66 additions & 0 deletions script/entity_configuration_to_abfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
import argparse
import json
import sys

from idpyoidc.storage.abfile import AbstractFileSystem


def parse_args():
parser = argparse.ArgumentParser(description="Load Entity Configuration into an AbstractFileSystem store.")
parser.add_argument("-s", "--source", required=True, help="Path to JSON file or '-' for stdin")
parser.add_argument("-t", "--target", required=True, help="AbstractFilesystem target directory")
parser.add_argument("-r", "--trust_anchor_info", action="store_true", help="Store Trust Anchor")
parser.add_argument("-u", "--subordinate_info", action="store_true", help="Store Subordinate info")

return parser.parse_args()


def read_json(source: str):
if source == "-":
return json.load(sys.stdin)
with open(source, "r", encoding="utf-8") as fp:
return json.load(fp)


def main():
args = parse_args()

entity_configuration = read_json(args.source)

info = None
if args.trust_anchor_info and not args.subordinate_info:
info = {entity_configuration["sub"]: entity_configuration["jwks"]}
elif args.subordinate_info and not args.trust_anchor_info:
_sub_info = {
"entity_types": list(entity_configuration["metadata"].keys()),
"jwks": entity_configuration["jwks"],
}
# Publishing the list endpoint makes this an intermediate
if (
"federation_list_endpoint"
in entity_configuration["metadata"]["federation_entity"]
):
_sub_info["intermediate"] = True
info = {entity_configuration["sub"]: _sub_info}
elif args.subordinate_info and args.trust_anchor_info:
print("You can only do one at the time!!")
else:
print("What do you expect me to do ??")

store = AbstractFileSystem(
fdir=args.target,
key_conv="idpyoidc.util.Base64",
value_conv="idpyoidc.util.JSON",
)

wrote = 0
for key, val in info.items():
store[key] = val
wrote += 1

print(f"Done. {wrote} entr{'y' if wrote == 1 else 'ies'} written to {args.target}.")


if __name__ == "__main__":
main()
Loading
Loading