1 change: 1 addition & 0 deletions python_libs/pbl/pbl/__init__.py
@@ -7,6 +7,7 @@
from .commands import install_lang
from .commands import test
from .commands import install_firmware
from .commands import flash_logs

# TODO: unopened logging ports cause super noisy logs, fix this in the
# pulse package then remove this
96 changes: 96 additions & 0 deletions python_libs/pbl/pbl/commands/flash_logs.py
@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2025 Federico Bechini
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import, print_function

import datetime
import os

from libpebble2.services.getbytes import GetBytesService
from libpebble2.exceptions import GetBytesError
from libpebble2.protocol.transfers import GetBytesInfoResponse

from pebble_tool.commands.base import PebbleCommand
from pebble_tool.exceptions import ToolError

class FlashLogsCommand(PebbleCommand):
"""Dump flash logs (PBL_LOG) from the watch."""
command = 'flash_logs'

@classmethod
def add_parser(cls, parser):
parser = super(FlashLogsCommand, cls).add_parser(parser)
parser.add_argument('--board', required=True, type=str.lower,
help='Board name (e.g., aplite, basalt, asterix)')
return parser

def __call__(self, args):
super(FlashLogsCommand, self).__call__(args)
get_bytes = GetBytesService(self.pebble)

        # Map board names to (start_address, size) of the flash log region.
        # All regions listed here are 128 KB (0x20000).
FLASH_LOG_REGIONS = {
# Legacy Platforms
'aplite': (0x3E0000, 0x20000),
'tintin': (0x3E0000, 0x20000),

# Snowy / Spalding (Bottom Boot)
'basalt': (0x000000, 0x20000),
'snowy': (0x000000, 0x20000),
'chalk': (0x000000, 0x20000),
'spalding': (0x000000, 0x20000),

# Silk / Diorite
'diorite': (0x280000, 0x20000),
'silk': (0x280000, 0x20000),

# Robert / Calculus
'robert': (0x480000, 0x20000),
'calculus': (0x480000, 0x20000),

# Asterix
'asterix': (0x1FD0000, 0x20000),

# Obelix / Getafix
'obelix': (0x1FCF000, 0x20000),
'getafix': (0x1FCF000, 0x20000),
}

        # argparse has already lower-cased the board name (type=str.lower)
        board = args.board

region = FLASH_LOG_REGIONS.get(board)
        if not region:
            # Strict lookup for now; aliasing or fuzzy matching could be added later.
            raise ToolError("Unknown board '{}'. Supported boards: {}".format(
                board, ", ".join(sorted(FLASH_LOG_REGIONS))))

flash_log_start, flash_log_size = region

print("Board: {}".format(board))
print("Reading flash log region: 0x{:X} - 0x{:X} ({} KB)".format(
flash_log_start, flash_log_start + flash_log_size, flash_log_size // 1024))

try:
flash_data = get_bytes.get_flash_region(flash_log_start, flash_log_size)
print("Read {} bytes from flash".format(len(flash_data)))

            # Save to a timestamped file in the current working directory
filename = datetime.datetime.now().strftime("flash_logs_{}_%Y-%m-%d_%H-%M-%S.bin".format(board))
filepath = os.path.abspath(filename)
with open(filename, "wb") as log_file:
log_file.write(flash_data)
print("Saved flash logs to {}".format(filepath))

print("\nTo parse and dehash the logs:")
print(" tools/dehash_flash_logs.py {}".format(filename))

except GetBytesError as ex:
if ex.code == GetBytesInfoResponse.ErrorCode.DoesNotExist:
raise ToolError('Could not read flash region. This may require non-release firmware.')
else:
raise


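Usage note (a sketch; the exact entry-point name depends on how the pbl package is installed, `pebble` is assumed here):

    pebble flash_logs --board asterix

The dumped .bin can then be fed to tools/dehash_flash_logs.py, added below.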
200 changes: 200 additions & 0 deletions tools/dehash_flash_logs.py
@@ -0,0 +1,200 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2025 Federico Bechini
# SPDX-License-Identifier: Apache-2.0

"""
Parser for Pebble flash logs (PBL_LOG).
Parses the binary circular buffer format and extracts log messages.
"""

import argparse
import struct
import sys
import json
import os
from datetime import datetime

# Set up import paths for the dehashing libraries
PYTHON_LIBS_PATH = os.path.join(os.path.dirname(__file__), '..', 'python_libs')
LOG_HASHING_PATH = os.path.join(os.path.dirname(__file__), '..', 'tools', 'log_hashing')

if PYTHON_LIBS_PATH not in sys.path:
sys.path.insert(0, PYTHON_LIBS_PATH)
if LOG_HASHING_PATH not in sys.path:
sys.path.insert(0, LOG_HASHING_PATH)

try:
import logdehash
DEHASH_AVAILABLE = True
except ImportError:
DEHASH_AVAILABLE = False

# Firmware Constants
LOG_MAGIC = 0x21474F4C # "LOG!"
LOG_VERSION = 0x1
LOG_FLAGS_VALID = 0x1
LOG_PAGE_SIZE = 0x2000 # 8KB
MAX_MSG_LEN = 253

FLASH_LOGGING_HEADER_SIZE = 4 + 1 + 20 + 1 + 1 + 1 # 28 bytes
LOG_RECORD_HEADER_SIZE = 2
LOG_BINARY_MESSAGE_BASE_SIZE = 4 + 1 + 1 + 2 + 16 # 24 bytes
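
# On-flash layout, as implied by the parsers below (field widths in bytes):
#   FlashLoggingHeader: magic u32 LE | version u8 | build_id 20B | log_file_id u8 | log_chunk_id u8 | pad u8
#   LogRecordHeader:    flags u8 | length u8
#   LogBinaryMessage:   timestamp u32 BE | level u8 | msg_length u8 | line_number u16 BE | filename 16B NUL-padded | message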

def parse_flash_logging_header(data, offset):
if offset + FLASH_LOGGING_HEADER_SIZE > len(data):
return None

magic, = struct.unpack_from('<I', data, offset)
if magic != LOG_MAGIC:
return None

version = data[offset + 4]
if version != LOG_VERSION:
return None

return {
'build_id': data[offset + 5:offset + 5 + 20].hex(),
'log_file_id': data[offset + 25],
'log_chunk_id': data[offset + 26],
'offset': offset
}

def parse_log_binary_message(data, offset, msg_length):
if offset + LOG_BINARY_MESSAGE_BASE_SIZE + msg_length > len(data):
return None

timestamp, = struct.unpack_from('>I', data, offset)
log_level = data[offset + 4]
line_number, = struct.unpack_from('>H', data, offset + 6)

filename_bytes = data[offset + 8:offset + 8 + 16]
null_pos = filename_bytes.find(b'\x00')
if null_pos >= 0:
filename_bytes = filename_bytes[:null_pos]
filename = filename_bytes.decode('utf-8', errors='ignore')

message_bytes = data[offset + 24:offset + 24 + msg_length]
message = message_bytes.decode('utf-8', errors='ignore').rstrip('\x00')

return {
'timestamp': timestamp,
'log_level': log_level,
'line_number': line_number,
'filename': filename,
'message': message
}

def parse_flash_logs(flash_data):
logs = []
# Find all pages with valid headers
pages = []
for page_start in range(0, len(flash_data), LOG_PAGE_SIZE):
header = parse_flash_logging_header(flash_data, page_start)
if header:
pages.append((page_start, header))

# Sort pages by file_id and chunk_id
pages.sort(key=lambda x: (x[1]['log_file_id'], x[1]['log_chunk_id']))
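    # The log region is a circular buffer of 8 KB pages; replaying pages in
    # (log_file_id, log_chunk_id) order reconstructs the original write order.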

for page_start, header in pages:
page_offset = page_start + FLASH_LOGGING_HEADER_SIZE
while page_offset < page_start + LOG_PAGE_SIZE:
if page_offset + LOG_RECORD_HEADER_SIZE > len(flash_data):
break

flags = flash_data[page_offset]
length = flash_data[page_offset + 1]

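            # Erased NOR flash reads back 0xFF, so an out-of-range length marks
            # the end of the written records in this page.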
if length == 0 or length > MAX_MSG_LEN:
break

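            # Valid records have the flag bit cleared; erased or unfinished
            # records still read 1 (flash bits can only program 1 -> 0).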
if (flags & LOG_FLAGS_VALID) == 0:
msg_offset = page_offset + LOG_RECORD_HEADER_SIZE
if msg_offset + length <= len(flash_data):
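                    # Byte 5 of the LogBinaryMessage holds the length of the message text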
msg_length = flash_data[msg_offset + 5]
log_msg = parse_log_binary_message(flash_data, msg_offset, msg_length)
if log_msg:
logs.append(log_msg)

page_offset += LOG_RECORD_HEADER_SIZE + length
else:
break
return logs

_dehasher = None

def get_dehasher(loghash_dict_path):
global _dehasher
if _dehasher is None and DEHASH_AVAILABLE and loghash_dict_path:
try:
_dehasher = logdehash.LogDehash('', monitor_dict_file=False)
with open(loghash_dict_path, 'r') as f:
_dehasher.load_log_strings_from_dict(json.load(f))
except Exception as e:
print(f"Warning: Failed to load dehash dictionary: {e}")
_dehasher = None
return _dehasher

def format_log_message(log_msg, dehasher=None):
try:
dt = datetime.fromtimestamp(log_msg['timestamp'])
ts = dt.strftime("%H:%M:%S.%f")[:-3]
    except (ValueError, OverflowError, OSError):
ts = f"0x{log_msg['timestamp']:08X}"

level = {0:'A', 1:'E', 2:'W', 3:'I', 4:'D'}.get(log_msg['log_level'], '?')
msg = log_msg['message']

if dehasher and msg.startswith('NL:'):
result = dehasher.dehash(f":0> {msg}")
if result and 'formatted_msg' in result:
msg = result['formatted_msg']

filename = log_msg['filename'] or 'unknown'
return f"{level} {ts} {filename}:{log_msg['line_number']}> {msg}"
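
# Example output of format_log_message (illustrative values):
#   I 12:34:56.789 launcher.c:42> Launcher started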

def main():
parser = argparse.ArgumentParser(description="Parse Pebble flash logs")
parser.add_argument('file', help='Binary flash log file')
parser.add_argument('--filter', help='Filter messages containing text')
parser.add_argument('--output', help='Output file')
parser.add_argument('--show', action='store_true', help='Show logs in stdout')
parser.add_argument('--dehash', help='Path to loghash_dict.json')
args = parser.parse_args()

if not os.path.exists(args.file):
print(f"Error: File {args.file} does not exist")
sys.exit(1)

with open(args.file, 'rb') as f:
data = f.read()

# Try to find default dictionary if not provided
dehash_path = args.dehash
if not dehash_path:
default_dict = os.path.join(os.path.dirname(__file__), '..', 'build', 'src', 'fw', 'tintin_fw_loghash_dict.json')
if os.path.exists(default_dict):
dehash_path = default_dict

logs = parse_flash_logs(data)
dehasher = get_dehasher(dehash_path)

if args.filter:
        logs = [entry for entry in logs if args.filter in entry['message']]

    output_lines = [format_log_message(entry, dehasher) for entry in logs]
output_text = '\n'.join(output_lines)

current_ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
out_file = args.output or os.path.splitext(args.file)[0] + f"_parsed_{current_ts}.txt"
with open(out_file, 'w', encoding='utf-8') as f:
f.write(output_text)

print(f"Successfully parsed {len(logs)} messages. Saved to: {out_file}")

if args.show:
for line in output_lines:
print(line)

if __name__ == '__main__':
main()
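
For quick inspection the parser can also be used as a library; a minimal sketch (the dump filename is hypothetical):

    from dehash_flash_logs import parse_flash_logs, format_log_message

    with open('flash_logs_asterix.bin', 'rb') as f:
        entries = parse_flash_logs(f.read())
    for entry in entries:
        # Without a dehasher, hashed messages are printed as-is (NL:<hash> ...)
        print(format_log_message(entry))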