Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pull-model:
docker compose exec ollama ollama pull mistral

test:
docker compose exec app python3 -m pytest src/test/
docker compose exec app python3 -m pytest tests/ -v

clean:
docker compose down -v
Expand Down
49 changes: 49 additions & 0 deletions api/db/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from sqlmodel import SQLModel, Field
from sqlalchemy import Column, JSON
from datetime import datetime
import uuid


class Template(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
Expand All @@ -15,4 +17,51 @@ class FormSubmission(SQLModel, table=True):
template_id: int
input_text: str
output_pdf_path: str
created_at: datetime = Field(default_factory=datetime.utcnow)


class FillJob(SQLModel, table=True):
    """
    Tracks an asynchronous form-fill job submitted via POST /forms/fill/async.
    Clients poll GET /forms/jobs/{id} to check status and retrieve results.
    """
    # UUID4 string primary key: unguessable and assignable before commit.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()), primary_key=True)
    status: str = Field(default="pending")  # pending | running | complete | failed
    template_id: int  # id of the Template this job fills
    input_text: str  # raw incident transcript submitted by the client
    output_pdf_path: str | None = None  # presumably set when the job completes — confirm in worker
    # Intermediate per-field values stored as a JSON column while the job runs.
    partial_results: dict | None = Field(default=None, sa_column=Column(JSON))
    # Per-field confidence levels, also a JSON column.
    field_confidence: dict | None = Field(default=None, sa_column=Column(JSON))
    error_message: str | None = None  # presumably populated when status == "failed" — confirm
    # NOTE(review): datetime.utcnow yields naive timestamps and is deprecated
    # in Python 3.12 — consider datetime.now(timezone.utc), applied
    # consistently across all models in this module.
    created_at: datetime = Field(default_factory=datetime.utcnow)


class BatchSubmission(SQLModel, table=True):
    """
    Tracks a multi-template batch fill submitted via POST /forms/fill/batch.

    A single BatchSubmission represents one incident transcript filled into
    N agency forms simultaneously using a single canonical LLM extraction pass.
    The canonical_extraction JSON column stores the full per-field evidence
    record (value + verbatim transcript quote + confidence) for audit purposes.

    submission_ids links to the individual FormSubmission records created for
    each template so clients can retrieve per-template output PDF paths.
    errors stores per-template error messages for partial failure cases.
    """
    # UUID4 string primary key (same scheme as FillJob.id).
    id: str = Field(default_factory=lambda: str(uuid.uuid4()), primary_key=True)
    # Rows are written only after processing finishes, hence "complete" default.
    status: str = Field(default="complete")  # complete | partial | failed
    input_text: str  # raw incident transcript shared by every template in the batch
    # Full canonical extraction: category -> {value, evidence, confidence}
    canonical_extraction: dict | None = Field(default=None, sa_column=Column(JSON))
    # Evidence report: only categories with non-null extracted values
    evidence_report: dict | None = Field(default=None, sa_column=Column(JSON))
    # List of template IDs that were requested (required — no default)
    template_ids: list = Field(sa_column=Column(JSON))
    # List of FormSubmission integer IDs created (one per template)
    submission_ids: list | None = Field(default=None, sa_column=Column(JSON))
    # Per-template output paths: {template_id: output_pdf_path}
    output_paths: dict | None = Field(default=None, sa_column=Column(JSON))
    # Per-template errors: {template_id: error_message} for partial failures
    errors: dict | None = Field(default=None, sa_column=Column(JSON))
    # NOTE(review): naive UTC via deprecated datetime.utcnow — consider
    # timezone-aware datetime.now(timezone.utc) across all models.
    created_at: datetime = Field(default_factory=datetime.utcnow)
35 changes: 33 additions & 2 deletions api/db/repositories.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from sqlmodel import Session, select
from api.db.models import Template, FormSubmission
from api.db.models import Template, FormSubmission, FillJob, BatchSubmission

# Templates
def create_template(session: Session, template: Template) -> Template:
Expand All @@ -16,4 +16,35 @@ def create_form(session: Session, form: FormSubmission) -> FormSubmission:
session.add(form)
session.commit()
session.refresh(form)
return form
return form

# Fill Jobs
def create_job(session: Session, job: FillJob) -> FillJob:
    """Persist a new FillJob and return it refreshed with DB-assigned state."""
    session.add(job)
    session.commit()
    # refresh() re-reads the row so DB-side defaults are visible to the caller.
    session.refresh(job)
    return job

def get_job(session: Session, job_id: str) -> FillJob | None:
    """Look up a FillJob by its primary key; returns None when no row matches."""
    found = session.get(FillJob, job_id)
    return found

def update_job(session: Session, job_id: str, **kwargs) -> FillJob | None:
    """Apply field updates to an existing FillJob and persist them.

    Each keyword argument is written onto the job as an attribute. Returns
    the refreshed job, or None when job_id does not match any row.
    """
    job = session.get(FillJob, job_id)
    if job is None:
        return None
    for field_name, field_value in kwargs.items():
        setattr(job, field_name, field_value)
    session.add(job)
    session.commit()
    session.refresh(job)
    return job

# Batch Submissions
def create_batch(session: Session, batch: BatchSubmission) -> BatchSubmission:
    """Persist a new BatchSubmission and return it refreshed from the DB."""
    session.add(batch)
    session.commit()
    # refresh() re-reads the row so DB-side defaults are visible to the caller.
    session.refresh(batch)
    return batch

def get_batch(session: Session, batch_id: str) -> BatchSubmission | None:
    """Look up a BatchSubmission by primary key; None when the id is unknown."""
    found = session.get(BatchSubmission, batch_id)
    return found
14 changes: 11 additions & 3 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from fastapi import FastAPI
from api.routes import templates, forms
from api.routes import templates, forms, batch
from api.errors.handlers import register_exception_handlers

app = FastAPI()
app = FastAPI(
title="FireForm",
description="Report once, file everywhere — multi-agency incident form filling.",
version="0.2.0",
)

register_exception_handlers(app)

app.include_router(templates.router)
app.include_router(forms.router)
app.include_router(forms.router)
app.include_router(batch.router)
242 changes: 242 additions & 0 deletions api/routes/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
"""
Batch fill endpoint — the "report once, file everywhere" API.

POST /forms/fill/batch

This endpoint is the architectural completion of FireForm's core promise.
A firefighter records one incident transcript. This endpoint:

1. Extracts a canonical incident record from the transcript in a SINGLE
LLM call (all N agency forms share this extraction).

2. Maps the canonical record to each agency template's field schema
CONCURRENTLY via asyncio.gather() — N fast mapping calls in parallel
instead of N * F sequential full-transcript extractions.

3. Fills all N PDFs concurrently in a ThreadPoolExecutor (pdfrw is
synchronous; offloading prevents event loop blocking).

4. Persists a BatchSubmission record with the full canonical extraction
including per-field evidence attribution (verbatim transcript quotes)
alongside individual FormSubmission records per template.

5. Returns everything in a single response: per-template PDF paths,
success/failure per template, and the complete evidence report.

Time complexity improvement:
Sequential per-form extraction: O(T * F) LLM calls
Batch canonical + mapping: O(1 + T) LLM calls
Example (5 agencies, 10 fields): 50 calls → 6 calls

GET /forms/batches/{batch_id} — lightweight status + output paths
GET /forms/batches/{batch_id}/audit — full evidence trail for legal compliance
"""

import asyncio

from fastapi import APIRouter, Depends
from sqlmodel import Session

from api.deps import get_db
from api.schemas.batch import (
BatchFill,
BatchFillResponse,
BatchStatusResponse,
AuditResponse,
TemplateResult,
EvidenceField,
)
from api.db.repositories import get_template, create_form, create_batch, get_batch
from api.db.models import FormSubmission, BatchSubmission
from api.errors.base import AppError
from src.extractor import IncidentExtractor

router = APIRouter(prefix="/forms", tags=["batch"])


@router.post("/fill/batch", response_model=BatchFillResponse)
async def batch_fill(body: BatchFill, db: Session = Depends(get_db)):
    """
    Fill multiple agency-specific PDF forms from a single incident transcript.

    Extraction runs once (canonical pass) then maps to each template concurrently.
    Partial success is tolerated — if one template fails (bad PDF path, mapping
    error), the others still complete and the batch status is reported as "partial".

    Raises AppError (404) before any LLM work if any requested template id
    does not exist.
    """
    # ── Validate all templates exist upfront ──────────────────────────────────
    # NOTE(review): duplicate ids in body.template_ids collapse into one dict
    # entry here, yet total_requested below counts duplicates — confirm intended.
    templates = {}
    for tid in body.template_ids:
        tpl = get_template(db, tid)
        if not tpl:
            raise AppError(f"Template {tid} not found", status_code=404)
        templates[tid] = tpl

    # ── Pass 1: single canonical extraction ───────────────────────────────────
    extractor = IncidentExtractor(body.input_text)
    canonical = await extractor.async_extract_canonical()
    evidence_report = IncidentExtractor.build_evidence_report(canonical)

    # ── Pass 2: concurrent mapping to each template ───────────────────────────
    # Deferred import — presumably kept function-local so the module imports
    # without httpx installed; confirm, otherwise hoist to module level.
    import httpx

    # One shared client for all mapping calls; return_exceptions=True so a
    # single failed mapping does not cancel the siblings.
    async with httpx.AsyncClient(timeout=120.0) as client:
        mapping_tasks = [
            extractor.async_map_to_template(client, canonical, tpl.fields)
            for tpl in templates.values()
        ]
        mappings = await asyncio.gather(*mapping_tasks, return_exceptions=True)

    # mappings[i] corresponds to templates.values()[i] (dict order is
    # insertion order, so these stay aligned with template_id_list below).
    template_list = list(templates.values())
    template_id_list = list(templates.keys())

    # ── Pass 3: concurrent PDF fill in thread pool ────────────────────────────
    loop = asyncio.get_running_loop()

    async def _fill_one(tpl, data: dict) -> str:
        # Filler/pdfrw is synchronous; run_in_executor keeps the event loop free.
        from src.filler import Filler
        filler = Filler()
        return await loop.run_in_executor(
            None,
            lambda: filler.fill_form_with_data(tpl.pdf_path, data),
        )

    fill_tasks = []
    failed_at_mapping: dict[int, str] = {}

    for i, (tpl, mapping) in enumerate(zip(template_list, mappings)):
        if isinstance(mapping, Exception):
            failed_at_mapping[template_id_list[i]] = str(mapping)
            # placeholder — keeps fill_results index-aligned with template_list
            fill_tasks.append(asyncio.sleep(0))
        else:
            fill_tasks.append(_fill_one(tpl, mapping))

    fill_results = await asyncio.gather(*fill_tasks, return_exceptions=True)

    # ── Persist FormSubmission per template + collect results ─────────────────
    results: list[TemplateResult] = []
    submission_ids: list[int] = []
    output_paths: dict[str, str | None] = {}
    errors: dict[str, str] = {}

    for i, tpl in enumerate(template_list):
        tid = template_id_list[i]

        # Mapping failed: record the error, no FormSubmission is created.
        if tid in failed_at_mapping:
            err = failed_at_mapping[tid]
            results.append(TemplateResult(
                template_id=tid, status="failed",
                submission_id=None, output_pdf_path=None, error=err,
            ))
            errors[str(tid)] = err
            output_paths[str(tid)] = None
            continue

        # PDF fill failed: same treatment as a mapping failure.
        pdf_result = fill_results[i]
        if isinstance(pdf_result, Exception):
            err = str(pdf_result)
            results.append(TemplateResult(
                template_id=tid, status="failed",
                submission_id=None, output_pdf_path=None, error=err,
            ))
            errors[str(tid)] = err
            output_paths[str(tid)] = None
            continue

        # Success: persist a per-template FormSubmission (pdf_result is the
        # output PDF path returned by Filler.fill_form_with_data).
        submission = FormSubmission(
            template_id=tid,
            input_text=body.input_text,
            output_pdf_path=pdf_result,
        )
        saved = create_form(db, submission)
        submission_ids.append(saved.id)
        output_paths[str(tid)] = pdf_result
        results.append(TemplateResult(
            template_id=tid, status="complete",
            submission_id=saved.id, output_pdf_path=pdf_result, error=None,
        ))

    # ── Determine overall batch status ────────────────────────────────────────
    total_succeeded = sum(1 for r in results if r.status == "complete")
    total_failed = len(results) - total_succeeded

    if total_failed == 0:
        status = "complete"
    elif total_succeeded == 0:
        status = "failed"
    else:
        status = "partial"

    # ── Persist BatchSubmission ───────────────────────────────────────────────
    # Empty collections are stored as NULL rather than [] / {}.
    batch = BatchSubmission(
        status=status,
        input_text=body.input_text,
        canonical_extraction=canonical,
        evidence_report=evidence_report,
        template_ids=body.template_ids,
        submission_ids=submission_ids if submission_ids else None,
        output_paths=output_paths,
        errors=errors if errors else None,
    )
    saved_batch = create_batch(db, batch)

    # ── Build response ────────────────────────────────────────────────────────
    # Convert evidence_report to EvidenceField instances for schema validation.
    # NOTE(review): assumes each evidence_report value is a dict with
    # value/evidence/confidence keys — confirm against build_evidence_report.
    typed_evidence = {
        k: EvidenceField(
            value=v.get("value"),
            evidence=v.get("evidence"),
            confidence=v.get("confidence", "low"),
        )
        for k, v in evidence_report.items()
    } if evidence_report else None

    return BatchFillResponse(
        batch_id=saved_batch.id,
        status=status,
        input_text=body.input_text,
        template_ids=body.template_ids,
        results=results,
        evidence_report=typed_evidence,
        total_requested=len(body.template_ids),
        total_succeeded=total_succeeded,
        total_failed=total_failed,
        created_at=saved_batch.created_at,
    )


@router.get("/batches/{batch_id}", response_model=BatchStatusResponse)
def get_batch_status(batch_id: str, db: Session = Depends(get_db)):
    """
    Return a lightweight view of one batch submission: status, per-template
    output paths, and any recorded errors. The heavy canonical-extraction
    payload is deliberately excluded — use the /audit endpoint for the full
    evidence trail.
    """
    if (batch := get_batch(db, batch_id)) is None:
        raise AppError("Batch not found", status_code=404)
    return batch


@router.get("/batches/{batch_id}/audit", response_model=AuditResponse)
def get_batch_audit(batch_id: str, db: Session = Depends(get_db)):
    """
    Full chain-of-custody evidence trail for a batch submission.

    Every canonical incident category that was extracted is reported with
    its value, the verbatim transcript quote backing it, and its confidence
    level. Built for legal compliance: supervisors and legal teams can
    verify that each value in each filed form traces back to a specific
    statement in the original incident transcript.
    """
    batch = get_batch(db, batch_id)
    if batch is None:
        raise AppError("Batch not found", status_code=404)
    return AuditResponse(
        batch_id=batch.id,
        input_text=batch.input_text,
        canonical_extraction=batch.canonical_extraction,
        evidence_report=batch.evidence_report,
        created_at=batch.created_at,
    )
Loading