|
1 | 1 | """Tmux-based terminal backend implementation.""" |
2 | 2 |
|
| 3 | +import fcntl |
| 4 | +import os |
3 | 5 | import threading |
4 | 6 | import time |
5 | 7 | import uuid |
|
16 | 18 |
|
17 | 19 | logger = get_logger(__name__) |
18 | 20 |
|
19 | | -# Global lock to serialize tmux session creations |
20 | | -# This prevents race conditions in libtmux when many agents start simultaneously |
21 | | -_TMUX_SESSION_LOCK = threading.Lock() |
| 21 | +# File-based lock for tmux session creation |
| 22 | +# Uses fcntl.flock() which works across multiple processes (e.g., uvicorn workers) |
| 23 | +# Also keeps a threading lock for safety within the same process |
| 24 | +_TMUX_LOCK_FILE = "/tmp/openhands-tmux-session.lock" |
| 25 | +_TMUX_THREAD_LOCK = threading.Lock() |
22 | 26 | _LOCK_COUNTER = 0 # Debug counter to track lock acquisitions |
23 | 27 |
|
24 | | -logger.info(f"tmux_terminal module loaded, lock id={id(_TMUX_SESSION_LOCK)}") |
| 28 | +logger.info(f"tmux_terminal module loaded, lock file={_TMUX_LOCK_FILE}") |
25 | 29 |
|
26 | 30 |
|
27 | 31 | class TmuxTerminal(TerminalInterface): |
@@ -62,46 +66,60 @@ def initialize(self) -> None: |
62 | 66 | logger.debug(f"Initializing tmux terminal with command: {window_command}") |
63 | 67 | session_name = f"openhands-{self.username}-{uuid.uuid4()}" |
64 | 68 |
|
65 | | - # Serialize tmux session creation to prevent race conditions |
| 69 | + # Serialize tmux session creation using both file lock (cross-process) |
| 70 | + # and thread lock (within process) to handle all concurrency cases |
66 | 71 | global _LOCK_COUNTER |
67 | 72 | thread_id = threading.current_thread().ident |
68 | | - logger.info(f"[{session_name[:20]}] Thread {thread_id} waiting for lock (counter={_LOCK_COUNTER})") |
69 | | - with _TMUX_SESSION_LOCK: |
70 | | - _LOCK_COUNTER += 1 |
71 | | - my_counter = _LOCK_COUNTER |
72 | | - logger.info(f"[{session_name[:20]}] Thread {thread_id} ACQUIRED lock #{my_counter}") |
73 | | - # Retry session creation to handle race conditions in libtmux |
74 | | - # where the session is created but can't be found immediately |
75 | | - max_retries = 3 |
76 | | - retry_delay = 0.5 |
77 | | - last_error = None |
78 | | - for attempt in range(max_retries): |
79 | | - try: |
80 | | - logger.info(f"[{session_name[:20]}] Attempt {attempt + 1}/{max_retries} - calling new_session") |
81 | | - self.session = self.server.new_session( |
82 | | - session_name=session_name, |
83 | | - start_directory=self.work_dir, |
84 | | - kill_session=True, |
85 | | - x=1000, |
86 | | - y=1000, |
87 | | - ) |
88 | | - logger.info(f"[{session_name[:20]}] SUCCESS! Session created: {self.session.name}") |
89 | | - break |
90 | | - except TmuxObjectDoesNotExist as e: |
91 | | - last_error = e |
92 | | - logger.warning( |
93 | | - f"[{session_name[:20]}] Attempt {attempt + 1}/{max_retries} FAILED: {e}" |
94 | | - ) |
95 | | - if attempt < max_retries - 1: |
96 | | - logger.info(f"[{session_name[:20]}] Sleeping {retry_delay}s before retry...") |
97 | | - time.sleep(retry_delay) |
98 | | - retry_delay *= 2 # Exponential backoff |
99 | | - else: |
100 | | - logger.error(f"[{session_name[:20]}] All {max_retries} attempts FAILED") |
101 | | - raise RuntimeError( |
102 | | - f"Failed to create tmux session after {max_retries} attempts" |
103 | | - ) from last_error |
104 | | - logger.info(f"[{session_name[:20]}] Thread {thread_id} RELEASING lock #{my_counter}") |
| 73 | + pid = os.getpid() |
| 74 | + short_name = session_name[:20] |
| 75 | + |
| 76 | + logger.info(f"[{short_name}] PID {pid} Thread {thread_id} waiting for locks (counter={_LOCK_COUNTER})") |
| 77 | + |
| 78 | + # Acquire thread lock first (fast path for same-process threads) |
| 79 | + with _TMUX_THREAD_LOCK: |
| 80 | + # Then acquire file lock (for cross-process synchronization) |
| 81 | + lock_fd = os.open(_TMUX_LOCK_FILE, os.O_CREAT | os.O_RDWR) |
| 82 | + try: |
| 83 | + fcntl.flock(lock_fd, fcntl.LOCK_EX) |
| 84 | + _LOCK_COUNTER += 1 |
| 85 | + my_counter = _LOCK_COUNTER |
| 86 | + logger.info(f"[{short_name}] PID {pid} Thread {thread_id} ACQUIRED both locks #{my_counter}") |
| 87 | + |
| 88 | + # Retry session creation to handle race conditions in libtmux |
| 89 | + max_retries = 3 |
| 90 | + retry_delay = 0.5 |
| 91 | + last_error = None |
| 92 | + for attempt in range(max_retries): |
| 93 | + try: |
| 94 | + logger.info(f"[{short_name}] Attempt {attempt + 1}/{max_retries} - calling new_session") |
| 95 | + self.session = self.server.new_session( |
| 96 | + session_name=session_name, |
| 97 | + start_directory=self.work_dir, |
| 98 | + kill_session=True, |
| 99 | + x=1000, |
| 100 | + y=1000, |
| 101 | + ) |
| 102 | + logger.info(f"[{short_name}] SUCCESS! Session created: {self.session.name}") |
| 103 | + break |
| 104 | + except TmuxObjectDoesNotExist as e: |
| 105 | + last_error = e |
| 106 | + logger.warning( |
| 107 | + f"[{short_name}] Attempt {attempt + 1}/{max_retries} FAILED: {e}" |
| 108 | + ) |
| 109 | + if attempt < max_retries - 1: |
| 110 | + logger.info(f"[{short_name}] Sleeping {retry_delay}s before retry...") |
| 111 | + time.sleep(retry_delay) |
| 112 | + retry_delay *= 2 # Exponential backoff |
| 113 | + else: |
| 114 | + logger.error(f"[{short_name}] All {max_retries} attempts FAILED") |
| 115 | + raise RuntimeError( |
| 116 | + f"Failed to create tmux session after {max_retries} attempts" |
| 117 | + ) from last_error |
| 118 | + |
| 119 | + logger.info(f"[{short_name}] PID {pid} Thread {thread_id} RELEASING locks #{my_counter}") |
| 120 | + finally: |
| 121 | + fcntl.flock(lock_fd, fcntl.LOCK_UN) |
| 122 | + os.close(lock_fd) |
105 | 123 | for k, v in env.items(): |
106 | 124 | self.session.set_environment(k, v) |
107 | 125 |
|
|
0 commit comments