Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
data/scans/
scans/
config/customers.yaml
config/google_drive_credentials.json
data/*.json
auto_scan_config.json
results.xml
Expand Down
104 changes: 96 additions & 8 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from flask import Flask, request
from datetime import datetime
from flask_socketio import SocketIO
from flask_cors import CORS
import sys
Expand Down Expand Up @@ -75,8 +76,11 @@
from nmapui.google_drive import (
build_google_drive_auth_status,
build_google_drive_auth_url,
create_google_drive_folder,
disconnect_google_drive,
exchange_google_drive_auth_code,
ensure_google_drive_reports_folder,
save_google_drive_credentials,
upload_files_to_google_drive,
)
from nmapui.runtime import (
Expand All @@ -95,6 +99,7 @@
extract_scan_statistics,
find_latest_saved_scan_for_pdf,
get_most_recent_scan_xml,
build_artifact_downloads,
merge_nmap_xml_files,
parse_scan_xml_for_assets,
save_scan_metadata,
Expand Down Expand Up @@ -295,6 +300,85 @@ def safe_emit(event, data=None):
)
run_traceroute = traceroute_bindings["run_traceroute"]

def _sanitize_drive_component(value: str) -> str:
return re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")


def _format_scan_folder_name(metadata: dict, scan_path: str) -> str:
timestamp = metadata.get("scan_start_time") or metadata.get("timestamp") or ""
scan_time = None
if timestamp:
normalized = str(timestamp).replace("Z", "+00:00")
try:
scan_time = datetime.fromisoformat(normalized)
except ValueError:
scan_time = None
if scan_time is None:
scan_time = datetime.now()
time_label = scan_time.strftime("%Y-%m-%d_%H-%M-%S")
target_label = _sanitize_drive_component(str(metadata.get("target") or ""))
customer_label = _sanitize_drive_component(str(metadata.get("customer_name") or ""))
path_label = _sanitize_drive_component(scan_path)
parts = [time_label]
if customer_label:
parts.append(customer_label)
if target_label:
parts.append(target_label)
if path_label:
parts.append(path_label)
return "_".join(parts)


def _format_scan_basename(metadata: dict, scan_path: str) -> str:
return _format_scan_folder_name(metadata, scan_path)


def _build_drive_upload_names(file_paths, metadata, scan_path) -> dict[str, str]:
base = _format_scan_basename(metadata, scan_path)
downloads = build_artifact_downloads(metadata or {}, customer_fingerprinter=customer_fingerprinter)
names: dict[str, str] = {}
for file_path in file_paths:
name = file_path.name
if name == "scan_report.pdf":
names[str(file_path)] = downloads.get("pdf", f"{base}.pdf")
elif name == "scan.xml":
names[str(file_path)] = downloads.get("xml", f"{base}.xml")
elif name == "scan_web.html":
names[str(file_path)] = downloads.get("html", f"{base}.html")
elif name == "scan_pdf.html":
names[str(file_path)] = f"{base}_pdf.html"
elif name == "scan.nmap":
names[str(file_path)] = f"{base}.nmap"
elif name == "scan.gnmap":
names[str(file_path)] = f"{base}.gnmap"
else:
names[str(file_path)] = f"{base}_{name}"
return names


def upload_report_artifacts_to_google_drive(*, scan_path, file_paths, metadata, settings_state):
base_folder_id = str((((settings_state or {}).get("sync") or {}).get("google_drive") or {}).get("folder_id", "") or "").strip() or None
folder_name = _format_scan_folder_name(metadata or {}, scan_path or "")
folder_result = create_google_drive_folder(
name=folder_name,
parent_id=base_folder_id,
credentials_path=GOOGLE_DRIVE_CREDENTIALS_FILE,
token_path=GOOGLE_DRIVE_TOKEN_FILE,
key_path=GOOGLE_DRIVE_TOKEN_KEY_FILE,
requests_module=requests,
)
if not folder_result.get("success"):
return folder_result
file_name_map = _build_drive_upload_names(file_paths, metadata or {}, scan_path or "")
return upload_files_to_google_drive(
credentials_path=GOOGLE_DRIVE_CREDENTIALS_FILE,
token_path=GOOGLE_DRIVE_TOKEN_FILE,
key_path=GOOGLE_DRIVE_TOKEN_KEY_FILE,
file_paths=file_paths,
folder_id=folder_result.get("folder_id", ""),
file_name_map=file_name_map,
requests_module=requests,
)

register_app_handlers(
app=app,
Expand Down Expand Up @@ -341,14 +425,7 @@ def safe_emit(event, data=None):
startup_state=startup_state,
runtime_store=runtime_store,
get_auto_scan_thread=runtime_bindings["get_auto_scan_thread"],
upload_report_artifacts_to_google_drive=lambda *, scan_path, file_paths, metadata, settings_state: upload_files_to_google_drive(
credentials_path=GOOGLE_DRIVE_CREDENTIALS_FILE,
token_path=GOOGLE_DRIVE_TOKEN_FILE,
key_path=GOOGLE_DRIVE_TOKEN_KEY_FILE,
file_paths=file_paths,
folder_id=str((((settings_state or {}).get("sync") or {}).get("google_drive") or {}).get("folder_id", "") or "").strip(),
requests_module=requests,
),
upload_report_artifacts_to_google_drive=upload_report_artifacts_to_google_drive,
get_customer_fingerprinter=lambda: customer_fingerprinter,
get_current_customer=lambda: get_current_customer_state(request.sid),
set_current_customer=lambda value: set_current_customer_state(
Expand Down Expand Up @@ -395,6 +472,16 @@ def safe_emit(event, data=None):
state=state,
requests_module=requests,
),
ensure_google_drive_reports_folder=lambda: ensure_google_drive_reports_folder(
credentials_path=GOOGLE_DRIVE_CREDENTIALS_FILE,
token_path=GOOGLE_DRIVE_TOKEN_FILE,
key_path=GOOGLE_DRIVE_TOKEN_KEY_FILE,
requests_module=requests,
),
save_google_drive_credentials=lambda credentials: save_google_drive_credentials(
GOOGLE_DRIVE_CREDENTIALS_FILE,
credentials,
),
disconnect_google_drive=lambda: disconnect_google_drive(
token_path=GOOGLE_DRIVE_TOKEN_FILE,
key_path=GOOGLE_DRIVE_TOKEN_KEY_FILE,
Expand Down Expand Up @@ -451,6 +538,7 @@ def safe_emit(event, data=None):
current_customer=current_customer,
extract_scan_statistics=extract_scan_statistics,
customer_fingerprinter=customer_fingerprinter,
upload_report_artifacts_to_google_drive=upload_report_artifacts_to_google_drive,
runtime_store=runtime_store,
find_latest_saved_scan_for_pdf=find_latest_saved_scan_for_pdf,
load_json_document=load_json_document,
Expand Down
6 changes: 6 additions & 0 deletions nmapui/app_composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def build_report_task_deps(
save_scan_metadata,
scans_dir,
settings_state,
upload_report_artifacts_to_google_drive,
socketio_sleep,
split_subnet_into_chunks,
stylesheet,
Expand All @@ -148,6 +149,7 @@ def build_report_task_deps(
"create_scan_folder": create_scan_folder,
"scans_dir": scans_dir,
"settings_state": settings_state,
"upload_report_artifacts_to_google_drive": upload_report_artifacts_to_google_drive,
"sanitize_customer_dir_name": sanitize_customer_dir_name,
"run_nmap_with_xml_output": run_nmap_with_xml_output,
"merge_nmap_xml_files": merge_nmap_xml_files,
Expand Down Expand Up @@ -405,6 +407,8 @@ def build_settings_routes_deps(
get_google_drive_auth_status,
build_google_drive_auth_url,
exchange_google_drive_auth_code,
ensure_google_drive_reports_folder,
save_google_drive_credentials,
disconnect_google_drive,
):
return {
Expand All @@ -416,6 +420,8 @@ def build_settings_routes_deps(
"get_google_drive_auth_status": get_google_drive_auth_status,
"build_google_drive_auth_url": build_google_drive_auth_url,
"exchange_google_drive_auth_code": exchange_google_drive_auth_code,
"ensure_google_drive_reports_folder": ensure_google_drive_reports_folder,
"save_google_drive_credentials": save_google_drive_credentials,
"disconnect_google_drive": disconnect_google_drive,
}

Expand Down
4 changes: 4 additions & 0 deletions nmapui/app_handler_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def register_app_handlers(
get_google_drive_auth_status,
build_google_drive_auth_url,
exchange_google_drive_auth_code,
ensure_google_drive_reports_folder,
save_google_drive_credentials,
disconnect_google_drive,
logger,
):
Expand Down Expand Up @@ -188,6 +190,8 @@ def register_app_handlers(
get_google_drive_auth_status=get_google_drive_auth_status,
build_google_drive_auth_url=build_google_drive_auth_url,
exchange_google_drive_auth_code=exchange_google_drive_auth_code,
ensure_google_drive_reports_folder=ensure_google_drive_reports_folder,
save_google_drive_credentials=save_google_drive_credentials,
disconnect_google_drive=disconnect_google_drive,
),
)
2 changes: 2 additions & 0 deletions nmapui/app_task_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def build_task_bindings(
current_customer,
extract_scan_statistics,
customer_fingerprinter,
upload_report_artifacts_to_google_drive,
runtime_store,
find_latest_saved_scan_for_pdf,
load_json_document,
Expand Down Expand Up @@ -136,6 +137,7 @@ def generate_report_task(sid, data):
create_scan_folder=create_scan_folder,
scans_dir=scans_dir,
settings_state=settings_state,
upload_report_artifacts_to_google_drive=upload_report_artifacts_to_google_drive,
sanitize_customer_dir_name=sanitize_customer_dir_name,
run_nmap_with_xml_output=run_nmap_with_xml_output,
merge_nmap_xml_files=merge_nmap_xml_files,
Expand Down
130 changes: 129 additions & 1 deletion nmapui/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,27 @@
GOOGLE_DRIVE_REVOKE_ENDPOINT = "https://oauth2.googleapis.com/revoke"
GOOGLE_DRIVE_UPLOAD_ENDPOINT = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&fields=id,name,webViewLink"
GOOGLE_DRIVE_SCOPE = "https://www.googleapis.com/auth/drive.file"
GOOGLE_DRIVE_FILES_ENDPOINT = "https://www.googleapis.com/drive/v3/files"
GOOGLE_DRIVE_FOLDER_MIME = "application/vnd.google-apps.folder"


def _format_google_drive_error(payload: dict) -> str:
if not isinstance(payload, dict):
return "Unknown error"
error = payload.get("error")
if isinstance(error, dict):
message = error.get("message") or error.get("status") or "Google Drive API error"
reasons = []
for entry in error.get("errors", []) or []:
if isinstance(entry, dict) and entry.get("reason"):
reasons.append(entry["reason"])
if reasons:
unique = ", ".join(sorted(set(reasons)))
return f"{message} ({unique})"
return message
if isinstance(error, str):
return error
return payload.get("error_description") or payload.get("message") or "Unknown error"
ENCRYPTED_TOKEN_SCHEMA_VERSION = 1


Expand Down Expand Up @@ -82,6 +103,14 @@ def load_google_drive_credentials(credentials_path: Path) -> dict:
return payload if isinstance(payload, dict) else {}


def save_google_drive_credentials(credentials_path: Path, payload: dict) -> dict:
if not isinstance(payload, dict):
return {"success": False, "error": "Invalid credentials payload"}
_save_json_file(credentials_path, payload)
_set_owner_only_permissions(credentials_path)
return {"success": True, "status": "Google Drive credentials saved"}


def load_google_drive_token_state(token_path: Path, key_path: Path | None = None) -> dict:
key_path = key_path or token_path.with_suffix(".key")
try:
Expand Down Expand Up @@ -316,6 +345,7 @@ def upload_files_to_google_drive(
key_path: Path | None = None,
file_paths: list[Path],
folder_id: str,
file_name_map: dict[str, str] | None = None,
requests_module,
) -> dict:
access_token = ensure_google_drive_access_token(
Expand All @@ -326,7 +356,8 @@ def upload_files_to_google_drive(
)
uploaded = []
for file_path in file_paths:
metadata = {"name": file_path.name}
override_name = (file_name_map or {}).get(str(file_path))
metadata = {"name": override_name or file_path.name}
if folder_id:
metadata["parents"] = [folder_id]
with file_path.open("rb") as file_handle:
Expand Down Expand Up @@ -355,3 +386,100 @@ def upload_files_to_google_drive(
"uploaded": uploaded,
"status": f"Uploaded {len(uploaded)} file(s) to Google Drive",
}


def ensure_google_drive_reports_folder(
*,
credentials_path: Path,
token_path: Path,
key_path: Path | None = None,
requests_module,
folder_name: str = "nmapui-reports",
) -> dict:
access_token = ensure_google_drive_access_token(
credentials_path=credentials_path,
token_path=token_path,
key_path=key_path,
requests_module=requests_module,
)
headers = {"Authorization": f"Bearer {access_token}"}
query = (
"mimeType='application/vnd.google-apps.folder' "
f"and name='{folder_name}' and trashed=false"
)
response = requests_module.get(
GOOGLE_DRIVE_FILES_ENDPOINT,
headers=headers,
params={"q": query, "fields": "files(id,name)"},
timeout=10,
)
payload = response.json()
if response.status_code >= 400:
error_detail = _format_google_drive_error(payload)
return {
"success": False,
"error": f"Failed to query Drive folder (HTTP {response.status_code}): {error_detail}",
}

files = payload.get("files") or []
if files:
folder_id = files[0].get("id")
return {"success": True, "folder_id": folder_id, "status": "Drive folder ready"}

create_response = requests_module.post(
GOOGLE_DRIVE_FILES_ENDPOINT,
headers={**headers, "Content-Type": "application/json"},
json={"name": folder_name, "mimeType": GOOGLE_DRIVE_FOLDER_MIME},
timeout=10,
)
create_payload = create_response.json()
if create_response.status_code >= 400:
error_detail = _format_google_drive_error(create_payload)
return {
"success": False,
"error": f"Failed to create Drive folder (HTTP {create_response.status_code}): {error_detail}",
}
return {
"success": True,
"folder_id": create_payload.get("id"),
"status": "Drive folder created",
}


def create_google_drive_folder(
*,
name: str,
parent_id: str | None,
credentials_path: Path,
token_path: Path,
key_path: Path | None = None,
requests_module,
) -> dict:
access_token = ensure_google_drive_access_token(
credentials_path=credentials_path,
token_path=token_path,
key_path=key_path,
requests_module=requests_module,
)
headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
payload = {"name": name, "mimeType": GOOGLE_DRIVE_FOLDER_MIME}
if parent_id:
payload["parents"] = [parent_id]
response = requests_module.post(
GOOGLE_DRIVE_FILES_ENDPOINT,
headers=headers,
json=payload,
timeout=10,
)
create_payload = response.json()
if response.status_code >= 400:
error_detail = _format_google_drive_error(create_payload)
return {
"success": False,
"error": f"Failed to create Drive folder (HTTP {response.status_code}): {error_detail}",
}
return {
"success": True,
"folder_id": create_payload.get("id"),
"status": "Drive folder created",
}
Loading
Loading