Skip to content

Commit 659ac1e

Browse files
committed
Merging pull request 147
Signed-off-by: Lukáš Doktor <[email protected]> * github.com:autotest/aexpect: aexpect: Use time.monotonic() for time-diff operations aexpect: Protect close() from other thread/process entry aexpect.shared: Use BSD flock to protect against other threads aexpect: Avoid indefinite wait on cleanup
2 parents 223e1d1 + 3a8ea81 commit 659ac1e

File tree

5 files changed

+112
-61
lines changed

5 files changed

+112
-61
lines changed

aexpect/client.py

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ def __init__(
143143
self.encoding = encoding
144144
self.reader_fds = {}
145145
base_dir = os.path.join(BASE_DIR, f"aexpect_{self.a_id}")
146+
self._close_lockfile = os.path.join(
147+
BASE_DIR, f"aexpect_{self.a_id}.lock"
148+
)
146149

147150
# Define filenames for communication with server
148151
utils_path.init_dir(base_dir)
@@ -263,8 +266,8 @@ def _get_aexpect_helper(self, helper_cmd, pass_fds, echo, command):
263266
# Wait for the server to complete its initialization
264267
full_output = ""
265268
pattern = f"Server {self.a_id} ready"
266-
end_time = time.time() + 60
267-
while time.time() < end_time:
269+
end_time = time.monotonic() + 60
270+
while time.monotonic() < end_time:
268271
output = sub.stdout.readline().decode(self.encoding, "ignore")
269272
if pattern in output:
270273
break
@@ -431,21 +434,48 @@ def close(self, sig=signal.SIGKILL):
431434
432435
:param sig: The signal to send the process when attempting to kill it.
433436
"""
434-
if not self.closed:
435-
self.kill(sig=sig)
436-
# Wait for the server to exit
437-
wait_for_lock(self.lock_server_running_filename)
438-
# Call all cleanup routines
439-
for hook in self.close_hooks:
440-
hook(self)
441-
# Close reader file descriptors
442-
self._close_reader_fds()
443-
self.reader_fds = {}
444-
# Remove all used files
445-
if "AEXPECT_DEBUG" not in os.environ:
446-
shutil.rmtree(os.path.join(BASE_DIR, f"aexpect_{self.a_id}"))
447-
self._close_aexpect_helper()
448-
self.closed = True
437+
if self.closed:
438+
return
439+
lock = None
440+
try:
441+
try:
442+
lock = get_lock_fd(self._close_lockfile, timeout=60)
443+
except FileNotFoundError:
444+
if not self.closed:
445+
raise
446+
if not self.closed:
447+
self.kill(sig=sig)
448+
# Wait for the server to exit
449+
if not wait_for_lock(
450+
self.lock_server_running_filename, timeout=60
451+
):
452+
LOG.warning(
453+
"Failed to get lock, the aexpect_helper "
454+
"process might be left behind. Proceeding "
455+
"anyway..."
456+
)
457+
# Call all cleanup routines
458+
for hook in self.close_hooks:
459+
hook(self)
460+
# Close reader file descriptors
461+
self._close_reader_fds()
462+
self.reader_fds = {}
463+
# Remove all used files
464+
if "AEXPECT_DEBUG" not in os.environ:
465+
shutil.rmtree(
466+
os.path.join(BASE_DIR, f"aexpect_{self.a_id}"),
467+
ignore_errors=True,
468+
)
469+
self._close_aexpect_helper()
470+
self.closed = True
471+
finally:
472+
if lock is not None:
473+
try:
474+
unlock_fd(lock)
475+
os.unlink(self._close_lockfile)
476+
except FileNotFoundError:
477+
# File already removed by other thread
478+
pass
449479

450480
def set_linesep(self, linesep):
451481
"""
@@ -866,7 +896,7 @@ def _read_nonblocking(self, internal_timeout=None, timeout=None):
866896
internal_timeout *= 1000
867897
end_time = None
868898
if timeout:
869-
end_time = time.time() + timeout
899+
end_time = time.monotonic() + timeout
870900
expect_pipe = self._get_fd("expect")
871901
poller = select.poll()
872902
poller.register(expect_pipe, select.POLLIN)
@@ -885,7 +915,7 @@ def _read_nonblocking(self, internal_timeout=None, timeout=None):
885915
data += raw_data.decode(self.encoding, "ignore")
886916
else:
887917
return read, data
888-
if end_time and time.time() > end_time:
918+
if end_time and time.monotonic() > end_time:
889919
return read, data
890920

891921
def read_nonblocking(self, internal_timeout=None, timeout=None):
@@ -979,10 +1009,10 @@ def read_until_output_matches(
9791009
poller = select.poll()
9801010
poller.register(expect_pipe, select.POLLIN)
9811011
output = ""
982-
end_time = time.time() + timeout
1012+
end_time = time.monotonic() + timeout
9831013
while True:
9841014
try:
985-
max_ms = int((end_time - time.time()) * 1000)
1015+
max_ms = int((end_time - time.monotonic()) * 1000)
9861016
poll_timeout_ms = max(0, max_ms)
9871017
poll_status = poller.poll(poll_timeout_ms)
9881018
except select.error:
@@ -991,7 +1021,7 @@ def read_until_output_matches(
9911021
raise ExpectTimeoutError(patterns, output)
9921022
# Read data from child
9931023
read, data = self._read_nonblocking(
994-
internal_timeout, end_time - time.time()
1024+
internal_timeout, end_time - time.monotonic()
9951025
)
9961026
if not read:
9971027
break
@@ -1261,10 +1291,10 @@ def is_responsive(self, timeout=5.0):
12611291
# Send a newline
12621292
self.sendline()
12631293
# Wait up to timeout seconds for some output from the child
1264-
end_time = time.time() + timeout
1265-
while time.time() < end_time:
1294+
end_time = time.monotonic() + timeout
1295+
while time.monotonic() < end_time:
12661296
time.sleep(0.5)
1267-
if self.read_nonblocking(0, end_time - time.time()).strip():
1297+
if self.read_nonblocking(0, end_time - time.monotonic()).strip():
12681298
return True
12691299
# No output -- report unresponsive
12701300
return False
@@ -1384,8 +1414,8 @@ def cmd_output_safe(self, cmd, timeout=60, strip_console_codes=False):
13841414
self.sendline(cmd)
13851415
out = ""
13861416
success = False
1387-
start_time = time.time()
1388-
while (time.time() - start_time) < timeout:
1417+
start_time = time.monotonic()
1418+
while (time.monotonic() - start_time) < timeout:
13891419
try:
13901420
out += self.read_up_to_prompt(0.5)
13911421
success = True
@@ -1721,8 +1751,8 @@ def run_tail(
17211751
encoding=encoding,
17221752
)
17231753

1724-
end_time = time.time() + timeout
1725-
while time.time() < end_time and bg_process.is_alive():
1754+
end_time = time.monotonic() + timeout
1755+
while time.monotonic() < end_time and bg_process.is_alive():
17261756
time.sleep(0.1)
17271757

17281758
return bg_process
@@ -1774,8 +1804,8 @@ def run_bg(
17741804
encoding=encoding,
17751805
)
17761806

1777-
end_time = time.time() + timeout
1778-
while time.time() < end_time and bg_process.is_alive():
1807+
end_time = time.monotonic() + timeout
1808+
while time.monotonic() < end_time and bg_process.is_alive():
17791809
time.sleep(0.1)
17801810

17811811
return bg_process

aexpect/remote.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,9 @@ def wait_for_login(
532532
client,
533533
timeout,
534534
)
535-
end_time = time.time() + timeout
535+
end_time = time.monotonic() + timeout
536536
verbose = False
537-
while time.time() < end_time:
537+
while time.monotonic() < end_time:
538538
try:
539539
return remote_login(
540540
client,
@@ -1375,9 +1375,9 @@ def transfer(*args, **kwargs):
13751375
msg = f"Copy file from {args[0]}:{args[5]} to {args[6]}, "
13761376
else:
13771377
msg = f"Copy file from {args[5]} to {args[0]}:{args[6]}, "
1378-
start_time = time.time()
1378+
start_time = time.monotonic()
13791379
ret = func(*args, **kwargs)
1380-
elapsed_time = time.time() - start_time
1380+
elapsed_time = time.monotonic() - start_time
13811381
if kwargs.get("filesize", None) is not None:
13821382
throughput = kwargs["filesize"] / elapsed_time
13831383
msg += f"estimated throughput: {throughput:.2f} MB/s"

aexpect/rss_client.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def __init__(self, address, port, log_func=None, timeout=20):
143143
) from timeout_error
144144
self._send(struct.pack("=i", CHUNKSIZE))
145145
self._log_func = log_func
146-
self._last_time = time.time()
146+
self._last_time = time.monotonic()
147147
self._last_transferred = 0
148148
self.transferred = 0
149149

@@ -173,10 +173,10 @@ def _send(self, data, timeout=60):
173173

174174
def _receive(self, size, timeout=60):
175175
strs = []
176-
end_time = time.time() + timeout
176+
end_time = time.monotonic() + timeout
177177
try:
178178
while size > 0:
179-
timeout = end_time - time.time()
179+
timeout = end_time - time.monotonic()
180180
if timeout <= 0:
181181
raise socket.timeout
182182
self._socket.settimeout(timeout)
@@ -202,15 +202,15 @@ def _receive(self, size, timeout=60):
202202

203203
def _report_stats(self, data):
204204
if self._log_func:
205-
delta = time.time() - self._last_time
205+
delta = time.monotonic() - self._last_time
206206
if delta >= 1:
207207
transferred = self.transferred / 1048576.0
208208
speed = (self.transferred - self._last_transferred) / delta
209209
speed /= 1048576.0
210210
self._log_func(
211211
f"{data} {transferred:.3f} MB ({speed:.3f}" " MB/sec)"
212212
)
213-
self._last_time = time.time()
213+
self._last_time = time.monotonic()
214214
self._last_transferred = self.transferred
215215

216216
def _send_packet(self, data, timeout=60):
@@ -232,10 +232,10 @@ def _send_file_chunks(self, filename, timeout=60):
232232
self._log_func(f"Sending file {filename}")
233233
with open(filename, "rb") as file_handle:
234234
try:
235-
end_time = time.time() + timeout
235+
end_time = time.monotonic() + timeout
236236
while True:
237237
data = file_handle.read(CHUNKSIZE)
238-
self._send_packet(data, int(end_time - time.time()))
238+
self._send_packet(data, int(end_time - time.monotonic()))
239239
if len(data) < CHUNKSIZE:
240240
break
241241
except FileTransferError as error:
@@ -247,9 +247,9 @@ def _receive_file_chunks(self, filename, timeout=60):
247247
self._log_func(f"Receiving file {filename}")
248248
with open(filename, "wb") as file_handle:
249249
try:
250-
end_time = time.time() + timeout
250+
end_time = time.monotonic() + timeout
251251
while True:
252-
data = self._receive_packet(int(end_time - time.time()))
252+
data = self._receive_packet(int(end_time - time.monotonic()))
253253
file_handle.write(data)
254254
if len(data) < CHUNKSIZE:
255255
break
@@ -306,7 +306,7 @@ def _upload_file(self, path, end_time):
306306
if os.path.isfile(path):
307307
self._send_msg(RSS_CREATE_FILE)
308308
self._send_packet(os.path.basename(path).encode())
309-
self._send_file_chunks(path, end_time - time.time())
309+
self._send_file_chunks(path, end_time - time.monotonic())
310310
elif os.path.isdir(path):
311311
self._send_msg(RSS_CREATE_DIR)
312312
self._send_packet(os.path.basename(path).encode())
@@ -349,7 +349,7 @@ def upload(self, src_pattern, dst_path, timeout=600):
349349
message to the client
350350
:note: Other exceptions can be raised.
351351
"""
352-
end_time = time.time() + timeout
352+
end_time = time.monotonic() + timeout
353353
try:
354354
try:
355355
self._send_msg(RSS_SET_PATH)
@@ -371,7 +371,7 @@ def upload(self, src_pattern, dst_path, timeout=600):
371371
"or directories"
372372
)
373373
# Look for RSS_OK or RSS_ERROR
374-
msg = self._receive_msg(int(end_time - time.time()))
374+
msg = self._receive_msg(int(end_time - time.monotonic()))
375375
if msg == RSS_OK:
376376
return
377377
if msg == RSS_ERROR:
@@ -446,7 +446,7 @@ def download(self, src_pattern, dst_path, timeout=600):
446446
:note: Other exceptions can be raised.
447447
"""
448448
dst_path = os.path.abspath(dst_path)
449-
end_time = time.time() + timeout
449+
end_time = time.monotonic() + timeout
450450
file_count = 0
451451
dir_count = 0
452452
try:
@@ -463,7 +463,7 @@ def download(self, src_pattern, dst_path, timeout=600):
463463
if os.path.isdir(dst_path):
464464
dst_path = os.path.join(dst_path, filename)
465465
self._receive_file_chunks(
466-
dst_path, int(end_time - time.time())
466+
dst_path, int(end_time - time.monotonic())
467467
)
468468
dst_path = os.path.dirname(dst_path)
469469
file_count += 1

aexpect/shared.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,36 @@
1414
import os
1515
import fcntl
1616
import termios
17+
import time
1718

1819
BASE_DIR = os.environ.get("TMPDIR", "/tmp")
1920

2021

21-
def get_lock_fd(filename):
22+
def get_lock_fd(filename, timeout=-1):
2223
"""Lock a file"""
2324
if not os.path.exists(filename):
2425
with open(filename, "w", encoding="utf-8"):
2526
pass
27+
2628
lock_fd = os.open(filename, os.O_RDWR)
27-
fcntl.lockf(lock_fd, fcntl.LOCK_EX)
29+
lock_flags = fcntl.LOCK_EX
30+
if timeout > 0:
31+
lock_flags |= fcntl.LOCK_NB
32+
end_time = time.monotonic() + timeout if timeout > 0 else -1
33+
while True:
34+
try:
35+
fcntl.flock(lock_fd, lock_flags)
36+
break
37+
except IOError:
38+
if time.monotonic() > end_time:
39+
os.close(lock_fd)
40+
raise
2841
return lock_fd
2942

3043

3144
def unlock_fd(lock_fd):
3245
"""Unlock a file"""
33-
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
46+
fcntl.flock(lock_fd, fcntl.LOCK_UN)
3447
os.close(lock_fd)
3548

3649

@@ -41,19 +54,27 @@ def is_file_locked(filename):
4154
except OSError:
4255
return False
4356
try:
44-
fcntl.lockf(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
57+
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
4558
except IOError:
4659
os.close(lock_fd)
4760
return True
48-
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
61+
fcntl.flock(lock_fd, fcntl.LOCK_UN)
4962
os.close(lock_fd)
5063
return False
5164

5265

53-
def wait_for_lock(filename):
54-
"""Wait until lock can be acquired, then release it"""
55-
lock_fd = get_lock_fd(filename)
66+
def wait_for_lock(filename, timeout=-1):
67+
"""
68+
Wait until lock can be acquired, then release it
69+
70+
:return: True on success, False on failure/timeout
71+
"""
72+
try:
73+
lock_fd = get_lock_fd(filename, timeout)
74+
except (IOError, FileNotFoundError):
75+
return False
5676
unlock_fd(lock_fd)
77+
return True
5778

5879

5980
def makeraw(shell_fd):

aexpect/utils/wait.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ def wait_for(func, timeout, first=0.0, step=1.0, text=None):
3030
:param step: Time to sleep between attempts in seconds
3131
:param text: Text to print while waiting, for debug purposes
3232
"""
33-
start_time = time.time()
34-
end_time = time.time() + timeout
33+
start_time = time.monotonic()
34+
end_time = time.monotonic() + timeout
3535

3636
time.sleep(first)
3737

38-
while time.time() < end_time:
38+
while time.monotonic() < end_time:
3939
if text:
40-
_LOG.debug("%s (%f secs)", text, (time.time() - start_time))
40+
_LOG.debug("%s (%f secs)", text, (time.monotonic() - start_time))
4141

4242
output = func()
4343
if output:

0 commit comments

Comments
 (0)