Commit 4e15af4

Improve recording event flushing: periodic saves to numbered files

Changes:
- Flush events every 5 seconds (RECORDING_FLUSH_INTERVAL_SECONDS)
- Also flush when events exceed 1 MB (RECORDING_FLUSH_SIZE_MB)
- Save events to numbered JSON files (1.json, 2.json, etc.) instead of appending to a single file
- Move save_dir parameter from stop_recording to start_recording
- Add background task for periodic flushing
- Track total events and file count across the recording session

This improves performance by:
1. Avoiding memory buildup during long recording sessions
2. Writing smaller, incremental files instead of one large file
3. Spreading I/O across the recording duration

Co-authored-by: openhands <openhands@all-hands.dev>

1 parent 167def2 commit 4e15af4

File tree: 2 files changed, +146 / -67 lines

openhands-tools/openhands/tools/browser_use/impl.py

Lines changed: 12 additions & 6 deletions
```diff
@@ -432,18 +432,24 @@ async def get_content(self, extract_links: bool, start_from_char: int) -> str:
 
     # Session Recording
     async def start_recording(self) -> str:
-        """Start recording the browser session using rrweb."""
+        """Start recording the browser session using rrweb.
+
+        Recording events are periodically flushed to numbered JSON files
+        (1.json, 2.json, etc.) in the full_output_save_dir if configured.
+        Events are flushed every 5 seconds or when they exceed 1 MB.
+        """
         await self._ensure_initialized()
-        return await self._server._start_recording()
+        return await self._server._start_recording(save_dir=self.full_output_save_dir)
 
     async def stop_recording(self) -> str:
-        """Stop recording and save events to file.
+        """Stop recording and save remaining events to file.
 
-        Recording is automatically saved to a timestamped JSON file in the
-        full_output_save_dir if configured. Returns a summary message.
+        Stops the periodic flush, collects any remaining events, and saves
+        them to a final numbered JSON file. Returns a summary message with
+        the total events and file count.
         """
         await self._ensure_initialized()
-        return await self._server._stop_recording(save_dir=self.full_output_save_dir)
+        return await self._server._stop_recording()
 
     async def close_browser(self) -> str:
         """Close the browser session."""
```
openhands-tools/openhands/tools/browser_use/server.py

Lines changed: 134 additions & 61 deletions
```diff
@@ -71,6 +71,10 @@
 RRWEB_START_MAX_RETRIES = 10
 RRWEB_START_RETRY_DELAY_MS = 500
 
+# Recording flush configuration
+RECORDING_FLUSH_INTERVAL_SECONDS = 5  # Flush every 5 seconds
+RECORDING_FLUSH_SIZE_MB = 1  # Flush when events exceed 1 MB
+
 
 class CustomBrowserUseServer(LogSafeBrowserUseServer):
     """
@@ -86,6 +90,12 @@ class CustomBrowserUseServer(LogSafeBrowserUseServer):
     # Recording state stored on Python side to persist across page navigations
     _recording_events: list[dict] = []
     _is_recording: bool = False
+
+    # Recording flush state
+    _recording_save_dir: str | None = None
+    _recording_file_counter: int = 0
+    _recording_flush_task: "asyncio.Task | None" = None
+    _recording_total_events: int = 0  # Total events across all files
 
     def set_inject_scripts(self, scripts: list[str]) -> None:
         """Set scripts to be injected into every new document.
@@ -128,10 +138,50 @@ async def _inject_scripts_to_session(self) -> None:
         except Exception as e:
             logger.warning(f"Failed to inject scripts: {e}")
 
+    def _save_events_to_file(self, events: list[dict]) -> str | None:
+        """Save events to a numbered JSON file.
+
+        Args:
+            events: List of rrweb events to save.
+
+        Returns:
+            Path to the saved file, or None if save_dir is not configured.
+        """
+        import json
+        import os
+
+        if not self._recording_save_dir or not events:
+            return None
+
+        os.makedirs(self._recording_save_dir, exist_ok=True)
+        self._recording_file_counter += 1
+        filename = f"{self._recording_file_counter}.json"
+        filepath = os.path.join(self._recording_save_dir, filename)
+
+        with open(filepath, "w") as f:
+            json.dump(events, f)
+
+        self._recording_total_events += len(events)
+        logger.debug(
+            f"Saved {len(events)} events to {filename} "
+            f"(total: {self._recording_total_events} events in "
+            f"{self._recording_file_counter} files)"
+        )
+        return filepath
+
+    def _get_events_size_bytes(self) -> int:
+        """Estimate the size of current events in bytes."""
+        import json
+        if not self._recording_events:
+            return 0
+        # Quick estimation using JSON serialization
+        return len(json.dumps(self._recording_events))
+
     async def _flush_recording_events(self) -> int:
         """Flush recording events from browser to Python storage.
 
-        This should be called before navigation to preserve events across pages.
+        This collects events from the browser and adds them to Python-side storage.
+        If events exceed the size threshold, they are saved to disk.
         Returns the number of events flushed.
         """
         if not self.browser_session or not self._is_recording:
```
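
Since each flush now writes a standalone JSON array, stitching a session back together is just a matter of reading the numbered files in order. A hedged sketch of such a consumer-side helper follows; `load_recording` is hypothetical and not part of this commit, and it assumes the save directory contains only the numbered files written by `_save_events_to_file`.

```python
# Sketch of a reader for the numbered files (1.json, 2.json, ...) produced by
# _save_events_to_file. Each file holds a plain JSON array of rrweb events.
import json
from pathlib import Path


def load_recording(save_dir: str) -> list[dict]:
    """Concatenate numbered event files in write order."""
    merged: list[dict] = []
    # Sort numerically so 10.json comes after 2.json, not before it.
    for path in sorted(Path(save_dir).glob("*.json"), key=lambda p: int(p.stem)):
        merged.extend(json.loads(path.read_text()))
    return merged


# events = load_recording("/path/to/full_output_save_dir")
# The merged list can then be fed to an rrweb player, since no per-file
# metadata wrapper is written anymore.
```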
```diff
@@ -159,11 +209,38 @@ async def _flush_recording_events(self) -> int:
             if events:
                 self._recording_events.extend(events)
                 logger.debug(f"Flushed {len(events)} recording events from browser")
+
+                # Check if we should save to disk (size threshold)
+                size_bytes = self._get_events_size_bytes()
+                if size_bytes > RECORDING_FLUSH_SIZE_MB * 1024 * 1024:
+                    self._save_events_to_file(self._recording_events)
+                    self._recording_events = []
+
             return len(events)
         except Exception as e:
             logger.warning(f"Failed to flush recording events: {e}")
             return 0
 
+    async def _periodic_flush_task(self) -> None:
+        """Background task that periodically flushes recording events."""
+        import asyncio
+
+        while self._is_recording:
+            await asyncio.sleep(RECORDING_FLUSH_INTERVAL_SECONDS)
+            if not self._is_recording:
+                break
+
+            try:
+                # Flush events from browser to Python storage
+                await self._flush_recording_events()
+
+                # Save to disk if we have any events (periodic save)
+                if self._recording_events:
+                    self._save_events_to_file(self._recording_events)
+                    self._recording_events = []
+            except Exception as e:
+                logger.warning(f"Periodic flush failed: {e}")
+
     async def _set_recording_flag(self, should_record: bool) -> None:
         """Set the recording flag in the browser for auto-start on new pages."""
         if not self.browser_session:
```
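
The start/stop wiring for this background task appears further down in `_start_recording` and `_stop_recording`. In isolation, the lifecycle looks roughly like the sketch below; the class and method names are illustrative only, not part of the commit.

```python
# Standalone sketch of the background-flush lifecycle used by this commit:
# a create_task on start, then cancel-and-await on stop.
import asyncio

FLUSH_INTERVAL_SECONDS = 5  # mirrors RECORDING_FLUSH_INTERVAL_SECONDS


class PeriodicFlusher:
    def __init__(self) -> None:
        self._running = False
        self._task: asyncio.Task | None = None

    async def flush_once(self) -> None:
        print("flush")  # stand-in for _flush_recording_events + file save

    async def _loop(self) -> None:
        while self._running:
            await asyncio.sleep(FLUSH_INTERVAL_SECONDS)
            if self._running:
                await self.flush_once()

    def start(self) -> None:
        # Must be called from a running event loop.
        self._running = True
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        # Clear the flag first so the loop exits, then cancel the pending sleep.
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # expected when cancelling mid-sleep
            self._task = None


# Usage (inside an async context):
#   flusher = PeriodicFlusher(); flusher.start(); ...; await flusher.stop()
```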
```diff
@@ -241,14 +318,18 @@ async def _restart_recording_on_new_page(self) -> None:
         except Exception as e:
             logger.warning(f"Failed to restart recording on new page: {e}")
 
-    async def _start_recording(self) -> str:
+    async def _start_recording(self, save_dir: str | None = None) -> str:
         """Start rrweb session recording with automatic retry.
 
         Will retry up to RRWEB_START_MAX_RETRIES times if rrweb is not loaded yet.
         This handles the case where recording is started before the page fully loads.
 
-        Recording persists across page navigations - events are stored on the Python
-        side and automatically collected when stop_recording is called.
+        Recording persists across page navigations - events are periodically flushed
+        to numbered JSON files (1.json, 2.json, etc.) in the save_dir.
+
+        Args:
+            save_dir: Directory to save recording files. If provided, events will be
+                periodically saved to numbered JSON files in this directory.
         """
         import asyncio
 
@@ -258,6 +339,9 @@ async def _start_recording(self) -> str:
         # Reset Python-side storage for new recording session
         self._recording_events = []
         self._is_recording = True
+        self._recording_save_dir = save_dir
+        self._recording_file_counter = 0
+        self._recording_total_events = 0
 
         try:
             cdp_session = await self.browser_session.get_or_create_cdp_session()
@@ -296,6 +380,10 @@ async def _start_recording(self) -> str:
             status = value.get("status") if isinstance(value, dict) else value
 
             if status == "started":
+                # Start periodic flush task
+                self._recording_flush_task = asyncio.create_task(
+                    self._periodic_flush_task()
+                )
                 logger.info("Recording started successfully with rrweb")
                 return "Recording started"
 
@@ -339,18 +427,18 @@ async def _start_recording(self) -> str:
             return f"Error starting recording: {str(e)}"
 
     async def _stop_recording(self, save_dir: str | None = None) -> str:
-        """Stop rrweb recording and save events to a file.
+        """Stop rrweb recording and save remaining events.
 
-        Args:
-            save_dir: Directory to save the recording file. If provided, events
-                are saved to a timestamped JSON file in this directory.
+        Stops the periodic flush task, collects any remaining events from the
+        browser, and saves them to a final numbered JSON file.
+
+        Note: The save_dir parameter is ignored - the directory configured at
+        start_recording time is used. This parameter is kept for API compatibility.
 
         Returns:
-            A summary message (not the full events - those are saved to file).
+            A summary message with the save directory and file count.
         """
         import json
-        import os
-        from datetime import datetime
 
         if not self.browser_session:
             return "Error: No browser session active"
@@ -359,9 +447,19 @@ async def _stop_recording(self, save_dir: str | None = None) -> str:
             return "Error: Not recording. Call browser_start_recording first."
 
         try:
+            # Stop the periodic flush task first
+            self._is_recording = False
+            if self._recording_flush_task:
+                self._recording_flush_task.cancel()
+                try:
+                    await self._recording_flush_task
+                except Exception:
+                    pass  # Task was cancelled, this is expected
+                self._recording_flush_task = None
+
             cdp_session = await self.browser_session.get_or_create_cdp_session()
 
-            # Stop recording on current page and get its events
+            # Stop recording on current page and get remaining events
             result = await cdp_session.cdp_client.send.Runtime.evaluate(
                 params={
                     "expression": """
@@ -389,69 +487,44 @@ async def _stop_recording(self, save_dir: str | None = None) -> str:
             current_page_data = json.loads(result.get("result", {}).get("value", "{}"))
             current_page_events = current_page_data.get("events", [])
 
-            # Combine events from Python storage with current page
-            all_events = self._recording_events + current_page_events
-
-            # Count event types for summary
-            event_types = {}
-            type_names = {
-                0: 'DomContentLoaded',
-                1: 'Load',
-                2: 'FullSnapshot',
-                3: 'IncrementalSnapshot',
-                4: 'Meta',
-                5: 'Custom',
-                6: 'Plugin'
-            }
-            for e in all_events:
-                type_num = e.get("type", -1)
-                type_name = type_names.get(type_num, f'Unknown_{type_num}')
-                event_types[type_name] = event_types.get(type_name, 0) + 1
+            # Add current page events to in-memory storage
+            if current_page_events:
+                self._recording_events.extend(current_page_events)
 
-            # Count pages (each FullSnapshot typically represents a new page)
-            pages_recorded = event_types.get('FullSnapshot', 0)
+            # Save any remaining events to a final file
+            if self._recording_events:
+                self._save_events_to_file(self._recording_events)
 
-            # Reset state
-            self._is_recording = False
             await self._set_recording_flag(False)
 
-            # Save recording to file if save_dir is provided
-            saved_path = None
-            if save_dir and all_events:
-                os.makedirs(save_dir, exist_ok=True)
-                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-                filename = f"browser_recording_{timestamp}.json"
-                saved_path = os.path.join(save_dir, filename)
-
-                recording_data = {
-                    "events": all_events,
-                    "metadata": {
-                        "count": len(all_events),
-                        "pages_recorded": pages_recorded,
-                        "event_types": event_types,
-                        "timestamp": timestamp,
-                    }
-                }
-
-                with open(saved_path, "w") as f:
-                    json.dump(recording_data, f)
-
-                logger.info(f"Recording saved to: {saved_path}")
+            # Calculate totals
+            total_events = self._recording_total_events
+            total_files = self._recording_file_counter
+            save_dir_used = self._recording_save_dir
 
             # Clear Python-side storage
             self._recording_events = []
+            self._recording_save_dir = None
+            self._recording_file_counter = 0
+            self._recording_total_events = 0
 
-            logger.info(f"Recording stopped: {len(all_events)} events from {pages_recorded} page(s)")
+            logger.info(
+                f"Recording stopped: {total_events} events saved to "
+                f"{total_files} file(s) in {save_dir_used}"
+            )
 
-            # Return a concise summary message (not the full events)
-            summary = f"Recording stopped. Captured {len(all_events)} events from {pages_recorded} page(s)."
-            if saved_path:
-                summary += f" Saved to: {saved_path}"
+            # Return a concise summary message
+            summary = f"Recording stopped. Captured {total_events} events in {total_files} file(s)."
+            if save_dir_used:
+                summary += f" Saved to: {save_dir_used}"
 
             return summary
 
         except Exception as e:
             self._is_recording = False
+            if self._recording_flush_task:
+                self._recording_flush_task.cancel()
+                self._recording_flush_task = None
             logger.exception("Error stopping recording", exc_info=e)
             return f"Error stopping recording: {str(e)}"
```
