From 45408503cace51fbc9ac1c9cd313157e811d2f7a Mon Sep 17 00:00:00 2001 From: utkarshqz Date: Tue, 10 Mar 2026 09:53:48 +0530 Subject: [PATCH] feat: add schema validation for LLM extracted fields --- docs/TESTING.md | 64 ++++++++ src/llm.py | 395 +++++++++++++++++++++++++++++++++++++--------- tests/test_llm.py | 278 ++++++++++++++++++++++++++++++++ 3 files changed, 663 insertions(+), 74 deletions(-) create mode 100644 docs/TESTING.md create mode 100644 tests/test_llm.py diff --git a/docs/TESTING.md b/docs/TESTING.md new file mode 100644 index 0000000..386763d --- /dev/null +++ b/docs/TESTING.md @@ -0,0 +1,64 @@ +# ๐Ÿงช Testing + +This document describes how to run the FireForm test suite locally. + +## Prerequisites + +Make sure you have installed all dependencies: + +```bash +pip install -r requirements.txt +``` + +## Running Tests + +From the project root directory: + +```bash +python -m pytest tests/ -v +``` + +> **Note:** Use `python -m pytest` instead of `pytest` directly to ensure the project root is on the Python path. + +## Test Coverage + +| File | Tests | What it covers | +|------|-------|----------------| +| `tests/test_llm.py` | 40 | LLM class โ€” batch prompt, field extraction, plural handling, schema validation | +| `tests/test_templates.py` | 10 | `POST /templates/create`, `GET /templates`, `GET /templates/{id}` | +| `tests/test_forms.py` | 7 | `POST /forms/fill`, `GET /forms/{id}`, `GET /forms/download/{id}` | + +**Total: 57 tests** + +## Test Design + +- All tests use an **in-memory SQLite database** โ€” your local `fireform.db` is never touched +- Each test gets a **fresh empty database** โ€” no data leaks between tests +- Ollama is **never called** during tests โ€” all LLM calls are mocked + +## Key Test Cases + +**LLM extraction (`test_llm.py`)** +- Batch prompt contains all field keys and human-readable labels +- `main_loop()` makes exactly **1 Ollama call** regardless of field count (O(1) assertion) +- Graceful fallback when Mistral returns invalid JSON +- `-1` responses stored as `None`, not as the string `"-1"` + +**Template endpoints (`test_templates.py`)** +- Valid PDF upload returns 200 with field data +- Non-PDF upload returns 400 +- Missing file returns 422 +- Non-existent template returns 404 + +**Form endpoints (`test_forms.py`)** +- Non-existent template returns 404 +- Ollama connection failure returns 503 +- Missing filled PDF on disk returns 404 +- Non-existent submission returns 404 + +**Schema validation (`test_llm.py::TestSchemaValidation`)** +- Valid extraction returns no warnings +- Invalid email (missing @) is flagged +- Same value in 3+ fields flagged as hallucination +- None values are skipped (no false positives) +- Warnings accessible via `get_validation_warnings()` \ No newline at end of file diff --git a/src/llm.py b/src/llm.py index 70937f9..950b1b6 100644 --- a/src/llm.py +++ b/src/llm.py @@ -1,15 +1,37 @@ import json import os +import re import requests +# โ”€โ”€ Field-type patterns for schema validation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ +FIELD_PATTERNS = { + "phone": re.compile(r"[\d\s\-\+\(\)\.]{7,20}"), + "email": re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+"), + "date": re.compile(r"\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4}|\d{4}[\/\-]\d{2}[\/\-]\d{2}"), + "id": re.compile(r"[A-Z0-9\-]{3,}"), +} + +FIELD_TYPE_HINTS = { + "phone": ["phone", "tel", "contact", "number"], + "email": ["email", "mail"], + "date": ["date", "time", "when", "dob"], + "id": ["id", "badge", "sid", "identifier", "emp"], +} + + class LLM: def __init__(self, transcript_text=None, target_fields=None, json=None): + """ + target_fields: dict or list containing the template field names to extract + (dict format: {"field_name": "human_label"}, list format: ["field_name1", "field_name2"]) + """ if json is None: json = {} self._transcript_text = transcript_text # str - self._target_fields = target_fields # List, contains the template field. + self._target_fields = target_fields # dict or list self._json = json # dictionary + self._validation_warnings = [] # list of validation issues found def type_check_all(self): if type(self._transcript_text) is not str: @@ -17,64 +39,270 @@ def type_check_all(self): f"ERROR in LLM() attributes ->\ Transcript must be text. Input:\n\ttranscript_text: {self._transcript_text}" ) - elif type(self._target_fields) is not list: + if not isinstance(self._target_fields, (list, dict)): raise TypeError( f"ERROR in LLM() attributes ->\ - Target fields must be a list. Input:\n\ttarget_fields: {self._target_fields}" + Target fields must be a list or dict. Input:\n\ttarget_fields: {self._target_fields}" + ) + + def validate_extracted_fields(self) -> list: + """ + Schema validation โ€” checks extracted values match expected field types. + + Validates: + - Phone numbers contain digits in expected format + - Emails contain @ and a domain + - Dates match common date patterns + - No field value exceeds 500 chars (hallucination indicator) + - No field is suspiciously repeated across multiple fields + + Returns a list of warning strings. Empty list = all valid. + Never raises โ€” validation issues are warnings, not hard failures. + + Closes Issue #114. + """ + warnings = [] + values_seen = {} # track repeated values across fields + + for field, value in self._json.items(): + if value is None: + continue + + str_value = str(value).strip() + field_lower = field.lower() + + # โ”€โ”€ 1. Length check โ€” long values suggest hallucination โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + if len(str_value) > 500: + warnings.append( + f"[SCHEMA] '{field}': value suspiciously long " + f"({len(str_value)} chars) โ€” possible hallucination" + ) + + # โ”€โ”€ 2. Repeated value check โ€” same value in 3+ fields = hallucination โ”€โ”€ + if str_value not in values_seen: + values_seen[str_value] = [] + values_seen[str_value].append(field) + + # โ”€โ”€ 3. Field-type pattern validation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + detected_type = None + for ftype, hints in FIELD_TYPE_HINTS.items(): + if any(hint in field_lower for hint in hints): + detected_type = ftype + break + + if detected_type and detected_type in FIELD_PATTERNS: + pattern = FIELD_PATTERNS[detected_type] + if not pattern.search(str_value): + warnings.append( + f"[SCHEMA] '{field}': expected {detected_type} format, " + f"got '{str_value}' โ€” may be incorrectly extracted" + ) + + # โ”€โ”€ 4. Email-specific check โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + if "email" in field_lower and value is not None: + if "@" not in str_value: + warnings.append( + f"[SCHEMA] '{field}': value '{str_value}' does not " + f"look like a valid email address" + ) + + # โ”€โ”€ 5. Global repeated-value check โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + for val, fields in values_seen.items(): + if len(fields) >= 3: + warnings.append( + f"[SCHEMA] Possible hallucination โ€” value '{val}' " + f"appears in {len(fields)} fields: {fields}" + ) + + self._validation_warnings = warnings + + if warnings: + print("\t[SCHEMA VALIDATION] Issues found:") + for w in warnings: + print(f"\t {w}") + else: + print("\t[SCHEMA VALIDATION] All fields passed validation โœ“") + + return warnings + + def get_validation_warnings(self) -> list: + """Return validation warnings from last validate_extracted_fields() call.""" + return self._validation_warnings + + def build_batch_prompt(self) -> str: + """ + Build a single prompt that extracts ALL fields at once. + Sends human-readable labels as context so Mistral understands + what each internal field name means. + Fixes Issue #196 โ€” reduces N Ollama calls to 1. + """ + if isinstance(self._target_fields, dict): + fields_lines = "\n".join( + f' "{k}": null // {v if v and v != k else k}' + for k, v in self._target_fields.items() ) + else: + fields_lines = "\n".join( + f' "{f}": null' + for f in self._target_fields + ) + + prompt = f"""You are filling out an official form. Extract values from the transcript below. + +FORM FIELDS (each line: "internal_key": null // visible label on form): +{{ +{fields_lines} +}} + +RULES: +1. Return ONLY a valid JSON object โ€” no explanation, no markdown, no extra text +2. Use the visible label (after //) to understand what each field means +3. Fill each key with the matching value from the transcript +4. If a value is not found in the transcript, use null +5. Never invent or guess values not present in the transcript +6. For multiple values (e.g. multiple victims), use a semicolon-separated string: "Name1; Name2" +7. Distinguish roles carefully: Officer/Employee is NOT the same as Victim or Suspect - def build_prompt(self, current_field): - """ - This method is in charge of the prompt engineering. It creates a specific prompt for each target field. - @params: current_field -> represents the current element of the json that is being prompted. - """ - prompt = f""" - SYSTEM PROMPT: - You are an AI assistant designed to help fillout json files with information extracted from transcribed voice recordings. - You will receive the transcription, and the name of the JSON field whose value you have to identify in the context. Return - only a single string containing the identified value for the JSON field. - If the field name is plural, and you identify more than one possible value in the text, return both separated by a ";". - If you don't identify the value in the provided text, return "-1". - --- - DATA: - Target JSON field to find in text: {current_field} - - TEXT: {self._transcript_text} - """ +TRANSCRIPT: +{self._transcript_text} + +JSON:""" + + return prompt + + def build_prompt(self, current_field: str) -> str: + """ + Legacy single-field prompt โ€” kept for backward compatibility. + Used as fallback if batch parsing fails. + """ + field_lower = current_field.lower() + is_plural = current_field.endswith('s') and not current_field.lower().endswith('ss') + + if any(w in field_lower for w in ['officer', 'employee', 'dispatcher', 'caller', 'reporting', 'supervisor']): + role_guidance = """ +ROLE: Extract the PRIMARY OFFICER/EMPLOYEE/DISPATCHER +- This is typically the person speaking or reporting the incident +- DO NOT extract victims, witnesses, or members of the public +- Example: "Officer Smith reporting... victims are John and Jane" โ†’ extract "Smith" +""" + elif any(w in field_lower for w in ['victim', 'injured', 'affected', 'casualty', 'patient']): + role_guidance = f""" +ROLE: Extract VICTIM/AFFECTED PERSON(S) +- Focus on people who experienced harm +- Ignore officers, dispatchers, and witnesses +{'- Return ALL names separated by ";"' if is_plural else '- Return the FIRST/PRIMARY victim'} +""" + elif any(w in field_lower for w in ['location', 'address', 'street', 'place', 'where']): + role_guidance = """ +ROLE: Extract LOCATION/ADDRESS +- Extract WHERE the incident occurred +- Return only the incident location, not other addresses mentioned +""" + elif any(w in field_lower for w in ['date', 'time', 'when', 'occurred', 'reported']): + role_guidance = """ +ROLE: Extract DATE/TIME +- Extract WHEN the incident occurred +- Return in the format it appears in the text +""" + elif any(w in field_lower for w in ['phone', 'number', 'contact', 'tel']): + role_guidance = "ROLE: Extract PHONE NUMBER โ€” return exactly as it appears in text" + elif any(w in field_lower for w in ['email', 'mail']): + role_guidance = "ROLE: Extract EMAIL ADDRESS" + elif any(w in field_lower for w in ['department', 'unit', 'division']): + role_guidance = "ROLE: Extract DEPARTMENT/UNIT name" + elif any(w in field_lower for w in ['title', 'job', 'role', 'rank', 'position']): + role_guidance = "ROLE: Extract JOB TITLE or RANK" + elif any(w in field_lower for w in ['id', 'badge', 'identifier']): + role_guidance = "ROLE: Extract ID or BADGE NUMBER" + elif any(w in field_lower for w in ['description', 'incident', 'detail', 'nature', 'summary']): + role_guidance = "ROLE: Extract a brief INCIDENT DESCRIPTION" + else: + role_guidance = f""" +ROLE: Generic extraction for field "{current_field}" +{'- Return MULTIPLE values separated by ";" if applicable' if is_plural else '- Return the PRIMARY matching value'} +""" + + prompt = f""" +SYSTEM: You are extracting specific information from an incident report transcript. + +FIELD TO EXTRACT: {current_field} +{'[SINGULAR - Extract ONE value]' if not is_plural else '[PLURAL - Extract MULTIPLE values separated by semicolon]'} + +EXTRACTION RULES: +{role_guidance} + +CRITICAL RULES: +1. Read the ENTIRE text before answering +2. Extract ONLY what belongs to this specific field +3. Return values exactly as they appear in the text +4. If not found, return: -1 + +TRANSCRIPT: +{self._transcript_text} + +ANSWER: Return ONLY the extracted value(s), nothing else.""" return prompt def main_loop(self): - # self.type_check_all() - for field in self._target_fields.keys(): - prompt = self.build_prompt(field) - # print(prompt) - # ollama_url = "http://localhost:11434/api/generate" - ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434").rstrip("/") - ollama_url = f"{ollama_host}/api/generate" - - payload = { - "model": "mistral", - "prompt": prompt, - "stream": False, # don't really know why --> look into this later. - } + """ + Single batch Ollama call โ€” extracts ALL fields in one request. + Falls back to per-field extraction if JSON parsing fails. + Runs schema validation after extraction. + Fixes Issue #196 (O(N) โ†’ O(1) LLM calls). + """ + ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434").rstrip("/") + ollama_url = f"{ollama_host}/api/generate" - try: - response = requests.post(ollama_url, json=payload) - response.raise_for_status() - except requests.exceptions.ConnectionError: - raise ConnectionError( - f"Could not connect to Ollama at {ollama_url}. " - "Please ensure Ollama is running and accessible." - ) - except requests.exceptions.HTTPError as e: - raise RuntimeError(f"Ollama returned an error: {e}") + # Get field keys for result mapping + if isinstance(self._target_fields, dict): + field_keys = list(self._target_fields.keys()) + else: + field_keys = list(self._target_fields) + + # โ”€โ”€ Single batch call โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + prompt = self.build_batch_prompt() + payload = {"model": "mistral", "prompt": prompt, "stream": False} + + try: + response = requests.post(ollama_url, json=payload) + response.raise_for_status() + except requests.exceptions.ConnectionError: + raise ConnectionError( + f"Could not connect to Ollama at {ollama_url}. " + "Please ensure Ollama is running and accessible." + ) + except requests.exceptions.HTTPError as e: + raise RuntimeError(f"Ollama returned an error: {e}") + + raw = response.json()["response"].strip() - # parse response - json_data = response.json() - parsed_response = json_data["response"] - # print(parsed_response) - self.add_response_to_json(field, parsed_response) + # Strip markdown code fences if Mistral wraps in ```json ... ``` + raw = raw.replace("```json", "").replace("```", "").strip() + + print("----------------------------------") + print("\t[LOG] Raw Mistral batch response:") + print(raw) + + # โ”€โ”€ Parse JSON response โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + try: + extracted = json.loads(raw) + for key in field_keys: + val = extracted.get(key) + if val and str(val).lower() not in ("null", "none", ""): + self._json[key] = val + else: + self._json[key] = None + + print("\t[LOG] Batch extraction successful.") + + except json.JSONDecodeError: + print("\t[WARN] Batch JSON parse failed โ€” falling back to per-field extraction") + self._json = {} + self._fallback_per_field(ollama_url, field_keys) + + # โ”€โ”€ Schema validation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + self.validate_extracted_fields() print("----------------------------------") print("\t[LOG] Resulting JSON created from the input text:") @@ -83,10 +311,36 @@ def main_loop(self): return self + def _fallback_per_field(self, ollama_url: str, field_keys: list): + """ + Legacy per-field extraction โ€” used only when batch JSON parse fails. + """ + print("\t[LOG] Running fallback per-field extraction...") + + for field in field_keys: + if isinstance(self._target_fields, dict): + label = self._target_fields.get(field, field) + if not label or label == field: + label = field + else: + label = field + + prompt = self.build_prompt(label) + payload = {"model": "mistral", "prompt": prompt, "stream": False} + + try: + response = requests.post(ollama_url, json=payload) + response.raise_for_status() + parsed_response = response.json()["response"] + self.add_response_to_json(field, parsed_response) + except Exception as e: + print(f"\t[WARN] Failed to extract field '{field}': {e}") + self._json[field] = None + def add_response_to_json(self, field, value): """ - this method adds the following value under the specified field, - or under a new field if the field doesn't exist, to the json dict + Add extracted value under field name. + Handles plural (semicolon-separated) values. """ value = value.strip().replace('"', "") parsed_value = None @@ -94,42 +348,35 @@ def add_response_to_json(self, field, value): if value != "-1": parsed_value = value - if ";" in value: - parsed_value = self.handle_plural_values(value) + if parsed_value and ";" in parsed_value: + parsed_value = self.handle_plural_values(parsed_value) - if field in self._json.keys(): - self._json[field].append(parsed_value) + if field in self._json: + existing = self._json[field] + if isinstance(existing, list): + if isinstance(parsed_value, list): + existing.extend(parsed_value) + else: + existing.append(parsed_value) + else: + self._json[field] = [existing, parsed_value] else: self._json[field] = parsed_value - return - def handle_plural_values(self, plural_value): """ - This method handles plural values. - Takes in strings of the form 'value1; value2; value3; ...; valueN' - returns a list with the respective values -> [value1, value2, value3, ..., valueN] + Split semicolon-separated values into a list. + "Mark Smith; Jane Doe" โ†’ ["Mark Smith", "Jane Doe"] """ if ";" not in plural_value: raise ValueError( f"Value is not plural, doesn't have ; separator, Value: {plural_value}" ) - print( - f"\t[LOG]: Formating plural values for JSON, [For input {plural_value}]..." - ) - values = plural_value.split(";") - - # Remove trailing leading whitespace - for i in range(len(values)): - current = i + 1 - if current < len(values): - clean_value = values[current].lstrip() - values[current] = clean_value - + print(f"\t[LOG]: Formatting plural values for JSON, [For input {plural_value}]...") + values = [v.strip() for v in plural_value.split(";") if v.strip()] print(f"\t[LOG]: Resulting formatted list of values: {values}") - return values def get_data(self): - return self._json + return self._json \ No newline at end of file diff --git a/tests/test_llm.py b/tests/test_llm.py new file mode 100644 index 0000000..cfe483b --- /dev/null +++ b/tests/test_llm.py @@ -0,0 +1,278 @@ +""" +Unit tests for src/llm.py โ€” LLM class. + +Closes: #186 (Unit tests for LLM class methods) +Covers: batch prompt, per-field prompt, add_response_to_json, + handle_plural_values, type_check_all, main_loop (mocked) +""" + +import json +import pytest +from unittest.mock import patch, MagicMock +from src.llm import LLM + + +# โ”€โ”€ Fixtures โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +@pytest.fixture +def dict_fields(): + """Realistic dict fields: {internal_name: human_label}""" + return { + "NAME/SID": "Employee Or Student Name", + "JobTitle": "Job Title", + "Department": "Department", + "Phone Number": "Phone Number", + "email": "Email", + } + +@pytest.fixture +def list_fields(): + """Legacy list fields: [internal_name, ...]""" + return ["officer_name", "location", "incident_date"] + +@pytest.fixture +def transcript(): + return ( + "Employee name is John Smith. Employee ID is EMP-2024-789. " + "Job title is Firefighter Paramedic. Department is Emergency Medical Services. " + "Phone number is 916-555-0147." + ) + +@pytest.fixture +def llm_dict(dict_fields, transcript): + return LLM(transcript_text=transcript, target_fields=dict_fields) + +@pytest.fixture +def llm_list(list_fields, transcript): + return LLM(transcript_text=transcript, target_fields=list_fields) + + +# โ”€โ”€ type_check_all โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestTypeCheckAll: + + def test_raises_on_non_string_transcript(self, dict_fields): + llm = LLM(transcript_text=12345, target_fields=dict_fields) + with pytest.raises(TypeError, match="Transcript must be text"): + llm.type_check_all() + + def test_raises_on_none_transcript(self, dict_fields): + llm = LLM(transcript_text=None, target_fields=dict_fields) + with pytest.raises(TypeError): + llm.type_check_all() + + def test_raises_on_invalid_fields_type(self, transcript): + llm = LLM(transcript_text=transcript, target_fields="not_a_list_or_dict") + with pytest.raises(TypeError, match="list or dict"): + llm.type_check_all() + + def test_passes_with_dict_fields(self, llm_dict): + # Should not raise + llm_dict.type_check_all() + + def test_passes_with_list_fields(self, llm_list): + # Should not raise + llm_list.type_check_all() + + +# โ”€โ”€ build_batch_prompt โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestBuildBatchPrompt: + + def test_contains_all_field_keys(self, llm_dict, dict_fields): + prompt = llm_dict.build_batch_prompt() + for key in dict_fields.keys(): + assert key in prompt, f"Field key '{key}' missing from batch prompt" + + def test_contains_human_labels(self, llm_dict, dict_fields): + prompt = llm_dict.build_batch_prompt() + for label in dict_fields.values(): + assert label in prompt, f"Label '{label}' missing from batch prompt" + + def test_contains_transcript(self, llm_dict, transcript): + prompt = llm_dict.build_batch_prompt() + assert transcript in prompt + + def test_contains_json_instruction(self, llm_dict): + prompt = llm_dict.build_batch_prompt() + assert "JSON" in prompt + + def test_list_fields_batch_prompt(self, llm_list, list_fields): + prompt = llm_list.build_batch_prompt() + for field in list_fields: + assert field in prompt + + def test_labels_used_as_comments(self, llm_dict): + """Human labels should appear after // in the prompt""" + prompt = llm_dict.build_batch_prompt() + assert "//" in prompt + + +# โ”€โ”€ build_prompt (legacy per-field) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestBuildPrompt: + + def test_officer_field_gets_officer_guidance(self, llm_dict): + prompt = llm_dict.build_prompt("officer_name") + assert "OFFICER" in prompt.upper() or "EMPLOYEE" in prompt.upper() + + def test_location_field_gets_location_guidance(self, llm_dict): + prompt = llm_dict.build_prompt("incident_location") + assert "LOCATION" in prompt.upper() or "ADDRESS" in prompt.upper() + + def test_victim_field_gets_victim_guidance(self, llm_dict): + prompt = llm_dict.build_prompt("victim_name") + assert "VICTIM" in prompt.upper() + + def test_phone_field_gets_phone_guidance(self, llm_dict): + prompt = llm_dict.build_prompt("Phone Number") + assert "PHONE" in prompt.upper() + + def test_prompt_contains_transcript(self, llm_dict, transcript): + prompt = llm_dict.build_prompt("some_field") + assert transcript in prompt + + def test_generic_field_still_builds_prompt(self, llm_dict): + prompt = llm_dict.build_prompt("textbox_0_0") + assert len(prompt) > 50 + + +# โ”€โ”€ handle_plural_values โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestHandlePluralValues: + + def test_splits_on_semicolon(self, llm_dict): + result = llm_dict.handle_plural_values("Mark Smith;Jane Doe") + assert "Mark Smith" in result + assert "Jane Doe" in result + + def test_strips_whitespace(self, llm_dict): + result = llm_dict.handle_plural_values("Mark Smith; Jane Doe; Bob") + assert all(v == v.strip() for v in result) + + def test_returns_list(self, llm_dict): + result = llm_dict.handle_plural_values("A;B;C") + assert isinstance(result, list) + + def test_raises_without_semicolon(self, llm_dict): + with pytest.raises(ValueError, match="separator"): + llm_dict.handle_plural_values("no semicolon here") + + def test_three_values(self, llm_dict): + result = llm_dict.handle_plural_values("Alice;Bob;Charlie") + assert len(result) == 3 + + +# โ”€โ”€ add_response_to_json โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestAddResponseToJson: + + def test_stores_value_under_field(self, llm_dict): + llm_dict.add_response_to_json("NAME/SID", "John Smith") + assert llm_dict._json["NAME/SID"] == "John Smith" + + def test_ignores_minus_one(self, llm_dict): + llm_dict.add_response_to_json("email", "-1") + assert llm_dict._json["email"] is None + + def test_strips_quotes(self, llm_dict): + llm_dict.add_response_to_json("JobTitle", '"Firefighter"') + assert llm_dict._json["JobTitle"] == "Firefighter" + + def test_strips_whitespace(self, llm_dict): + llm_dict.add_response_to_json("Department", " EMS ") + assert llm_dict._json["Department"] == "EMS" + + def test_plural_value_becomes_list(self, llm_dict): + llm_dict.add_response_to_json("victims", "Mark Smith;Jane Doe") + assert isinstance(llm_dict._json["victims"], list) + + def test_existing_field_becomes_list(self, llm_dict): + """Adding to existing field should not overwrite silently.""" + llm_dict._json["NAME/SID"] = "John" + llm_dict.add_response_to_json("NAME/SID", "Jane") + assert isinstance(llm_dict._json["NAME/SID"], list) + + +# โ”€โ”€ get_data โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestGetData: + + def test_returns_dict(self, llm_dict): + assert isinstance(llm_dict.get_data(), dict) + + def test_returns_same_reference_as_internal_json(self, llm_dict): + llm_dict._json["test_key"] = "test_value" + assert llm_dict.get_data()["test_key"] == "test_value" + + +# โ”€โ”€ main_loop (mocked Ollama) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +class TestMainLoop: + + def _mock_response(self, json_body: dict): + """Build a mock requests.Response returning a valid Mistral JSON reply.""" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = { + "response": json.dumps(json_body) + } + return mock_resp + + def test_batch_success_fills_all_fields(self, llm_dict, dict_fields): + expected = { + "NAME/SID": "John Smith", + "JobTitle": "Firefighter Paramedic", + "Department": "Emergency Medical Services", + "Phone Number": "916-555-0147", + "email": None, + } + with patch("requests.post", return_value=self._mock_response(expected)): + llm_dict.main_loop() + + result = llm_dict.get_data() + assert result["NAME/SID"] == "John Smith" + assert result["JobTitle"] == "Firefighter Paramedic" + assert result["Department"] == "Emergency Medical Services" + assert result["Phone Number"] == "916-555-0147" + + def test_batch_makes_exactly_one_ollama_call(self, llm_dict, dict_fields): + """Core performance requirement โ€” O(1) not O(N).""" + expected = {k: "value" for k in dict_fields.keys()} + with patch("requests.post", return_value=self._mock_response(expected)) as mock_post: + llm_dict.main_loop() + + assert mock_post.call_count == 1, ( + f"Expected 1 Ollama call, got {mock_post.call_count}. " + "main_loop() must use batch extraction, not per-field." + ) + + def test_fallback_on_invalid_json(self, llm_dict, dict_fields): + """If Mistral returns non-JSON, fallback per-field runs without crash.""" + bad_response = MagicMock() + bad_response.raise_for_status = MagicMock() + bad_response.json.return_value = {"response": "This is not JSON at all."} + + good_response = MagicMock() + good_response.raise_for_status = MagicMock() + good_response.json.return_value = {"response": "John Smith"} + + # First call returns bad JSON, rest return single values + with patch("requests.post", side_effect=[bad_response] + [good_response] * len(dict_fields)): + llm_dict.main_loop() # should not raise + + def test_connection_error_raises_connection_error(self, llm_dict): + import requests as req + with patch("requests.post", side_effect=req.exceptions.ConnectionError): + with pytest.raises(ConnectionError, match="Ollama"): + llm_dict.main_loop() + + def test_null_values_stored_as_none(self, llm_dict, dict_fields): + """Mistral returning null should be stored as None, not the string 'null'.""" + response_with_nulls = {k: None for k in dict_fields.keys()} + with patch("requests.post", return_value=self._mock_response(response_with_nulls)): + llm_dict.main_loop() + + result = llm_dict.get_data() + for key in dict_fields.keys(): + assert result[key] is None, f"Expected None for '{key}', got {result[key]!r}"