From e5d35f09118738edb5e88dc5c7275bbd07f5c350 Mon Sep 17 00:00:00 2001 From: Federico Bechini Date: Wed, 7 Jan 2026 21:58:52 -0300 Subject: [PATCH] pbl: add flash log download and dehash tools Implement 'pbl flash_logs' to download raw logs from the watch flash and 'tools/dehash_flash_logs.py' to parse and dehash them. Signed-off-by: Federico Bechini --- python_libs/pbl/pbl/__init__.py | 1 + python_libs/pbl/pbl/commands/flash_logs.py | 96 ++++++++++ tools/dehash_flash_logs.py | 200 +++++++++++++++++++++ 3 files changed, 297 insertions(+) create mode 100644 python_libs/pbl/pbl/commands/flash_logs.py create mode 100755 tools/dehash_flash_logs.py diff --git a/python_libs/pbl/pbl/__init__.py b/python_libs/pbl/pbl/__init__.py index f306f806b..65d97bfc5 100644 --- a/python_libs/pbl/pbl/__init__.py +++ b/python_libs/pbl/pbl/__init__.py @@ -7,6 +7,7 @@ from .commands import install_lang from .commands import test from .commands import install_firmware +from .commands import flash_logs # TODO: unopened logging ports cause super noisy logs, fix this in the # pulse package then remove this diff --git a/python_libs/pbl/pbl/commands/flash_logs.py b/python_libs/pbl/pbl/commands/flash_logs.py new file mode 100644 index 000000000..66e703189 --- /dev/null +++ b/python_libs/pbl/pbl/commands/flash_logs.py @@ -0,0 +1,96 @@ +# SPDX-FileCopyrightText: 2025 Federico Bechini +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import absolute_import, print_function + +from libpebble2.services.getbytes import GetBytesService +from libpebble2.exceptions import GetBytesError +from libpebble2.protocol.transfers import GetBytesInfoResponse + +from pebble_tool.commands.base import PebbleCommand +from pebble_tool.exceptions import ToolError + +import os + +class FlashLogsCommand(PebbleCommand): + """Dump flash logs (PBL_LOG) from the watch.""" + command = 'flash_logs' + + @classmethod + def add_parser(cls, parser): + parser = super(FlashLogsCommand, cls).add_parser(parser) + parser.add_argument('--board', required=True, type=str.lower, + help='Board name (e.g., aplite, basalt, asterix)') + return parser + + def __call__(self, args): + super(FlashLogsCommand, self).__call__(args) + get_bytes = GetBytesService(self.pebble) + + # Map board names to (start_address, size) + # Sizes are mostly 128KB (0x20000) + FLASH_LOG_REGIONS = { + # Legacy Platforms + 'aplite': (0x3E0000, 0x20000), + 'tintin': (0x3E0000, 0x20000), + + # Snowy / Spalding (Bottom Boot) + 'basalt': (0x000000, 0x20000), + 'snowy': (0x000000, 0x20000), + 'chalk': (0x000000, 0x20000), + 'spalding': (0x000000, 0x20000), + + # Silk / Diorite + 'diorite': (0x280000, 0x20000), + 'silk': (0x280000, 0x20000), + + # Robert / Calculus + 'robert': (0x480000, 0x20000), + 'calculus': (0x480000, 0x20000), + + # Asterix + 'asterix': (0x1FD0000, 0x20000), + + # Obelix / Getafix + 'obelix': (0x1FCF000, 0x20000), + 'getafix': (0x1FCF000, 0x20000), + } + + # Normalize board name + board = args.board + + region = FLASH_LOG_REGIONS.get(board) + if not region: + # Try simple aliasing or partial matching if needed, but for now strict map + print("Error: Unknown board '{}'.".format(board)) + print("Supported boards: {}".format(", ".join(sorted(FLASH_LOG_REGIONS.keys())))) + return + + flash_log_start, flash_log_size = region + + print("Board: {}".format(board)) + print("Reading flash log region: 0x{:X} - 0x{:X} ({} KB)".format( + flash_log_start, flash_log_start + flash_log_size, flash_log_size // 1024)) + + try: + flash_data = get_bytes.get_flash_region(flash_log_start, flash_log_size) + print("Read {} bytes from flash".format(len(flash_data))) + + # Save to file + import datetime + filename = datetime.datetime.now().strftime("flash_logs_{}_%Y-%m-%d_%H-%M-%S.bin".format(board)) + filepath = os.path.abspath(filename) + with open(filename, "wb") as log_file: + log_file.write(flash_data) + print("Saved flash logs to {}".format(filepath)) + + print("\nTo parse and dehash the logs:") + print(" tools/dehash_flash_logs.py {}".format(filename)) + + except GetBytesError as ex: + if ex.code == GetBytesInfoResponse.ErrorCode.DoesNotExist: + raise ToolError('Could not read flash region. This may require non-release firmware.') + else: + raise + + diff --git a/tools/dehash_flash_logs.py b/tools/dehash_flash_logs.py new file mode 100755 index 000000000..ad5d38fd5 --- /dev/null +++ b/tools/dehash_flash_logs.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: 2025 Federico Bechini +# SPDX-License-Identifier: Apache-2.0 + +""" +Parser for Pebble flash logs (PBL_LOG). +Parses the binary circular buffer format and extracts log messages. +""" + +import argparse +import struct +import sys +import json +import os +from datetime import datetime + +# Setup paths for dehash libraries +PYTHON_LIBS_PATH = os.path.join(os.path.dirname(__file__), '..', 'python_libs') +LOG_HASHING_PATH = os.path.join(os.path.dirname(__file__), '..', 'tools', 'log_hashing') + +if PYTHON_LIBS_PATH not in sys.path: + sys.path.insert(0, PYTHON_LIBS_PATH) +if LOG_HASHING_PATH not in sys.path: + sys.path.insert(0, LOG_HASHING_PATH) + +try: + import logdehash + DEHASH_AVAILABLE = True +except ImportError: + DEHASH_AVAILABLE = False + +# Firmware Constants +LOG_MAGIC = 0x21474F4C # "LOG!" +LOG_VERSION = 0x1 +LOG_FLAGS_VALID = 0x1 +LOG_PAGE_SIZE = 0x2000 # 8KB +MAX_MSG_LEN = 253 + +FLASH_LOGGING_HEADER_SIZE = 4 + 1 + 20 + 1 + 1 + 1 # 28 bytes +LOG_RECORD_HEADER_SIZE = 2 +LOG_BINARY_MESSAGE_BASE_SIZE = 4 + 1 + 1 + 2 + 16 # 24 bytes + +def parse_flash_logging_header(data, offset): + if offset + FLASH_LOGGING_HEADER_SIZE > len(data): + return None + + magic, = struct.unpack_from(' len(data): + return None + + timestamp, = struct.unpack_from('>I', data, offset) + log_level = data[offset + 4] + line_number, = struct.unpack_from('>H', data, offset + 6) + + filename_bytes = data[offset + 8:offset + 8 + 16] + null_pos = filename_bytes.find(b'\x00') + if null_pos >= 0: + filename_bytes = filename_bytes[:null_pos] + filename = filename_bytes.decode('utf-8', errors='ignore') + + message_bytes = data[offset + 24:offset + 24 + msg_length] + message = message_bytes.decode('utf-8', errors='ignore').rstrip('\x00') + + return { + 'timestamp': timestamp, + 'log_level': log_level, + 'line_number': line_number, + 'filename': filename, + 'message': message + } + +def parse_flash_logs(flash_data): + logs = [] + # Find all pages with valid headers + pages = [] + for page_start in range(0, len(flash_data), LOG_PAGE_SIZE): + header = parse_flash_logging_header(flash_data, page_start) + if header: + pages.append((page_start, header)) + + # Sort pages by file_id and chunk_id + pages.sort(key=lambda x: (x[1]['log_file_id'], x[1]['log_chunk_id'])) + + for page_start, header in pages: + page_offset = page_start + FLASH_LOGGING_HEADER_SIZE + while page_offset < page_start + LOG_PAGE_SIZE: + if page_offset + LOG_RECORD_HEADER_SIZE > len(flash_data): + break + + flags = flash_data[page_offset] + length = flash_data[page_offset + 1] + + if length == 0 or length > MAX_MSG_LEN: + break + + if (flags & LOG_FLAGS_VALID) == 0: + msg_offset = page_offset + LOG_RECORD_HEADER_SIZE + if msg_offset + length <= len(flash_data): + msg_length = flash_data[msg_offset + 5] + log_msg = parse_log_binary_message(flash_data, msg_offset, msg_length) + if log_msg: + logs.append(log_msg) + + page_offset += LOG_RECORD_HEADER_SIZE + length + else: + break + return logs + +_dehasher = None + +def get_dehasher(loghash_dict_path): + global _dehasher + if _dehasher is None and DEHASH_AVAILABLE and loghash_dict_path: + try: + _dehasher = logdehash.LogDehash('', monitor_dict_file=False) + with open(loghash_dict_path, 'r') as f: + _dehasher.load_log_strings_from_dict(json.load(f)) + except Exception as e: + print(f"Warning: Failed to load dehash dictionary: {e}") + _dehasher = None + return _dehasher + +def format_log_message(log_msg, dehasher=None): + try: + dt = datetime.fromtimestamp(log_msg['timestamp']) + ts = dt.strftime("%H:%M:%S.%f")[:-3] + except: + ts = f"0x{log_msg['timestamp']:08X}" + + level = {0:'A', 1:'E', 2:'W', 3:'I', 4:'D'}.get(log_msg['log_level'], '?') + msg = log_msg['message'] + + if dehasher and msg.startswith('NL:'): + result = dehasher.dehash(f":0> {msg}") + if result and 'formatted_msg' in result: + msg = result['formatted_msg'] + + filename = log_msg['filename'] or 'unknown' + return f"{level} {ts} {filename}:{log_msg['line_number']}> {msg}" + +def main(): + parser = argparse.ArgumentParser(description="Parse Pebble flash logs") + parser.add_argument('file', help='Binary flash log file') + parser.add_argument('--filter', help='Filter messages containing text') + parser.add_argument('--output', help='Output file') + parser.add_argument('--show', action='store_true', help='Show logs in stdout') + parser.add_argument('--dehash', help='Path to loghash_dict.json') + args = parser.parse_args() + + if not os.path.exists(args.file): + print(f"Error: File {args.file} does not exist") + sys.exit(1) + + with open(args.file, 'rb') as f: + data = f.read() + + # Try to find default dictionary if not provided + dehash_path = args.dehash + if not dehash_path: + default_dict = os.path.join(os.path.dirname(__file__), '..', 'build', 'src', 'fw', 'tintin_fw_loghash_dict.json') + if os.path.exists(default_dict): + dehash_path = default_dict + + logs = parse_flash_logs(data) + dehasher = get_dehasher(dehash_path) + + if args.filter: + logs = [l for l in logs if args.filter in l['message']] + + output_lines = [format_log_message(l, dehasher) for l in logs] + output_text = '\n'.join(output_lines) + + current_ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + out_file = args.output or os.path.splitext(args.file)[0] + f"_parsed_{current_ts}.txt" + with open(out_file, 'w', encoding='utf-8') as f: + f.write(output_text) + + print(f"Successfully parsed {len(logs)} messages. Saved to: {out_file}") + + if args.show: + for line in output_lines: + print(line) + +if __name__ == '__main__': + main()