Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions aexpect/ops_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from shlex import quote

from aexpect.exceptions import ShellCmdError

# Need this import for sphinx and other documentation to produce links later on
# from .client import ShellSession

Expand Down Expand Up @@ -231,7 +232,7 @@ def ls(session, path, quote_path=True, flags="-1UNq"): # pylint: disable=C0103
Just like :py:func:`os.listdir`, does not include file names starting with
dot (`'.'`)
"""
cmd = f'ls {flags} {quote(path)}' if quote_path else f'ls {flags} {path}'
cmd = f"ls {flags} {quote(path)}" if quote_path else f"ls {flags} {path}"
status, output = session.cmd_status_output(cmd)
status, output = _process_status_output(cmd, status, output)
return output.splitlines()
Expand All @@ -253,7 +254,7 @@ def glob(session, glob_pattern):
Alternative implementations are either shell specific or use `echo` and
thus cannot handle spaces in paths well.
"""
cmd = f'find {glob_pattern} -maxdepth 0'
cmd = f"find {glob_pattern} -maxdepth 0"
status, output = session.cmd_status_output(cmd)
if status == 1:
return []
Expand All @@ -276,8 +277,11 @@ def move(session, source, target, quote_path=True, flags=""):
Calls `mv source target` through a session. See `man mv` for what source
and target can be and what behavior to expect.
"""
cmd = f'mv {flags} {quote(source)} {quote(target)}' if quote_path \
else f'mv {flags} {source} {target}'
cmd = (
f"mv {flags} {quote(source)} {quote(target)}"
if quote_path
else f"mv {flags} {source} {target}"
)
status, output = session.cmd_status_output(cmd)
_process_status_output(cmd, status, output)

Expand All @@ -297,8 +301,11 @@ def copy(session, source, target, quote_path=True, flags=""):
Calls `cp source target` through a session. See `man cp` for what source
and target can be and what behavior to expect.
"""
cmd = f'cp {flags} {quote(source)} {quote(target)}' if quote_path \
else f'cp {flags} {source} {target}'
cmd = f"cp {flags} "
if quote_path:
cmd += f"{quote(source)} {quote(target)}"
else:
cmd += f"{source} {target}"
status, output = session.cmd_status_output(cmd)
_process_status_output(cmd, status, output)

Expand All @@ -316,7 +323,7 @@ def remove(session, path, quote_path=True, flags="-fr"):

Calls `rm -rf`.
"""
cmd = f'rm {flags} {quote(path)}' if quote_path else f'rm {flags} {path}'
cmd = f"rm {flags} {quote(path)}" if quote_path else f"rm {flags} {path}"
status, output = session.cmd_status_output(cmd)
_process_status_output(cmd, status, output)

Expand All @@ -335,7 +342,7 @@ def make_tempdir(session, template=None):

Calls `mktemp -d`, refer to `man` for more info.
"""
cmd = f'mktemp -d {template}' if template is not None else 'mktemp -d'
cmd = f"mktemp -d {template}" if template is not None else "mktemp -d"
status, output = session.cmd_status_output(cmd)
_, output = _process_status_output(cmd, status, output)
return output
Expand All @@ -355,7 +362,7 @@ def make_tempfile(session, template=None):

Calls `mktemp`, refer to `man` for more info.
"""
cmd = f'mktemp {template}' if template is not None else 'mktemp'
cmd = f"mktemp {template}" if template is not None else "mktemp"
status, output = session.cmd_status_output(cmd)
_, output = _process_status_output(cmd, status, output)
return output
Expand All @@ -377,7 +384,7 @@ def cat(session, filename, quote_path=True, flags=""):
Should only be used for very small files without tabs or other fancy
contents. Otherwise, it is better to download the file or use some other method.
"""
cmd = f'cat {flags} {quote(filename)}' if quote_path else f'cat {flags} {filename}'
cmd = f"cat {flags} {quote(filename)}" if quote_path else f"cat {flags} {filename}"
status, output = session.cmd_status_output(cmd)
_, output = _process_status_output(cmd, status, output)
return output
Expand Down Expand Up @@ -486,15 +493,16 @@ def hash_file(session, filename, size="", method="md5"):
if output:
output = output.strip()
if status != 0 or not output:
raise RuntimeError(f'Could not hash {filename} using {cmd}: {output}')
raise RuntimeError(f"Could not hash {filename} using {cmd}: {output}")

# parse output
hash_str = output.split(maxsplit=1)[0].lower()

# check that all chars are hex
if hash_str.strip('0123456789abcdef'):
raise RuntimeError('Resulting hash string has unexpected characters: '
+ hash_str)
if hash_str.strip("0123456789abcdef"):
raise RuntimeError(
f"Resulting hash string has unexpected characters: {hash_str}"
)
return hash_str


Expand All @@ -509,6 +517,6 @@ def extract_tarball(session, tarball, target_dir, flags="-ap"):
:param str flags: extra flags passed to ``tar`` on the command line
:raises: :py:class:`RuntimeError` if tar command returned non-null
"""
cmd = f'tar -C {quote(target_dir)} {flags} -xf {quote(tarball)}'
cmd = f"tar -C {quote(target_dir)} {flags} -xf {quote(tarball)}"
status, output = session.cmd_status_output(cmd)
_process_status_output(cmd, status, output)
87 changes: 56 additions & 31 deletions aexpect/ops_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def ps_cmd(session, script, timeout=60):

This function was slightly based on a similar function from `pywinrm`.
"""

def nicely_log_str(header, string):
LOG.info(header)
LOG.info("-" * len(header))
Expand All @@ -78,12 +79,14 @@ def nicely_log_str(header, string):
# must use utf16 little endian on Windows
encoded_cmd = b64encode(script.encode("utf_16_le")).decode("ascii")
try:
output = session.cmd(f"powershell -NoLogo -NonInteractive -OutputFormat text "
f"-ExecutionPolicy Bypass -EncodedCommand {encoded_cmd}",
timeout=timeout)
output = session.cmd(
f"powershell -NoLogo -NonInteractive -OutputFormat text "
f"-ExecutionPolicy Bypass -EncodedCommand {encoded_cmd}",
timeout=timeout,
)

# there was an error but the exit code was still zero
if output.startswith("#< CLIXML") and "<S S=\"Error\">" in output:
if output.startswith("#< CLIXML") and '<S S="Error">' in output:
raise ShellError(script, output)

# TODO: PowerShell sometimes ignores the -OutputFormat and outputs
Expand Down Expand Up @@ -113,7 +116,7 @@ def ps_file(session, filename, timeout=60):
:returns: the output of the command
:rtype: str
"""
cmd = f"powershell -ExecutionPolicy RemoteSigned -File \"{filename}\""
cmd = f'powershell -ExecutionPolicy RemoteSigned -File "{filename}"'
LOG.info("Executing PowerShell file with `%s`", cmd)
return session.cmd(cmd, timeout=timeout)

Expand All @@ -132,7 +135,7 @@ def _clean_error_message(message):

output = []
# the strings are between <S S="Error">...</S> tags
for msgstr in message.split("<S S=\"Error\">"):
for msgstr in message.split('<S S="Error">'):
if not msgstr.endswith("</S>"):
continue
msgstr = re.sub(r"</S>$", "", msgstr).replace("_x000D__x000A_", "")
Expand Down Expand Up @@ -203,10 +206,16 @@ def query_registry(session, key_name, value_name):
:returns: value of the corresponding sub key and value name or None if not found
:rtype: str or None
"""
key_types = ["REG_SZ", "REG_MULTI_SZ", "REG_EXPAND_SZ",
"REG_DWORD", "REG_BINARY", "REG_NONE"]
key_types = [
"REG_SZ",
"REG_MULTI_SZ",
"REG_EXPAND_SZ",
"REG_DWORD",
"REG_BINARY",
"REG_NONE",
]

out = session.cmd_output(f"reg query \"{key_name}\" /v {value_name}").strip()
out = session.cmd_output(f'reg query "{key_name}" /v {value_name}').strip()
LOG.info("Reg query for key %s and value %s: %s", key_name, value_name, out)
for k in key_types:
parts = out.split(k)
Expand All @@ -216,7 +225,9 @@ def query_registry(session, key_name, value_name):
return None


def add_reg_key(session, path, name, value, key_type, force=True): # pylint: disable=R0913
def add_reg_key(
session, path, name, value, key_type, force=True
): # pylint: disable=R0913
"""
Wrapper around the reg add command.

Expand All @@ -231,7 +242,7 @@ def add_reg_key(session, path, name, value, key_type, force=True): # pylint: d
if not isinstance(key_type, RegistryKeyType):
raise TypeError(f"{key_type} must be instance of RegistryKeyType")

cmd = f"reg add \"{path}\" /v {name} /d {value} /t {key_type.value}"
cmd = f'reg add "{path}" /v {name} /d {value} /t {key_type.value}'
if force:
cmd += " /f"

Expand Down Expand Up @@ -269,7 +280,7 @@ def path_exists(session, path):
:returns: whether the given path exists
:rtype: str
"""
output = session.cmd_output(f"if exist \"{path}\" echo yes")
output = session.cmd_output(f'if exist "{path}" echo yes')
return output.strip() == "yes"


Expand All @@ -283,7 +294,7 @@ def hash_file(session, filename):
:returns: hash value
:rtype: str
"""
output = session.cmd(f"certutil -hashfile \"{filename}\" MD5")
output = session.cmd(f'certutil -hashfile "{filename}" MD5')
LOG.debug("certutil output for %s is %s", filename, output)
hash_value = output.splitlines()[1]
LOG.debug("Hash value is %s", hash_value)
Expand All @@ -302,9 +313,9 @@ def mkdir(session, path, exist_ok=True):
exists and exist_ok is False
"""
if exist_ok:
cmd = f"if not exist \"{path}\" mkdir \"{path}\""
cmd = f'if not exist "{path}" mkdir "{path}"'
else:
cmd = f"mkdir \"{path}\""
cmd = f'mkdir "{path}"'
session.cmd(cmd)


Expand All @@ -318,7 +329,7 @@ def touch(session, path):
:returns: path of the file created
:rtype: str
"""
session.cmd(f"type nul > \"{path}\"")
session.cmd(f'type nul > "{path}"')
return path


Expand Down Expand Up @@ -381,7 +392,9 @@ def tempfile(session, local=True):
###############################################################################


def run_as(session, user, password, command, timeout=60, background=False): # pylint: disable=R0913
def run_as(
session, user, password, command, timeout=60, background=False
): # pylint: disable=R0913
"""
Run a command as different user.

Expand All @@ -401,17 +414,17 @@ def run_as(session, user, password, command, timeout=60, background=False): #
def _run_as(cmd_to_run):
LOG.debug("Running command `%s`", cmd_to_run)
session.sendline(cmd_to_run)
LOG.debug("Entering password `%s`", password)
LOG.debug("Entering password")
session.read_until_output_matches(["Geben Sie das Kennwort"])
session.sendline(password)

if background is True:
_run_as(f"runas /user:{user} \"{command}\"")
_run_as(f'runas /user:{user} "{command}"')
return None

outfile = tempfile(session, local=False)
# runas produces no output, so we add a redirection to the inner command
cmd = f"runas /user:{user} \"cmd.exe /c {command} > {outfile}\""
cmd = f'runas /user:{user} "cmd.exe /c {command} > {outfile}"'
_run_as(cmd)
LOG.debug("Waiting %s seconds for the command to finish", timeout)
# no need to capture anything, we redirected the output
Expand All @@ -432,7 +445,7 @@ def kill_program(session, program, kill_children=True):
:param str program: name of the program to kill
:param bool kill_children: whether to kill child processes
"""
cmd = f"taskkill /f /im \"{program}\""
cmd = f'taskkill /f /im "{program}"'
if kill_children:
cmd += " /t"
session.cmd(cmd)
Expand All @@ -452,8 +465,9 @@ def wait_process_end(session, process, timeout=30):
LOG.info("Waiting for process %s to end using PowerShell", process)

# give some extra timeout to wait for PowerShell to return
ps_cmd(session, f"Wait-Process -Name {process} -Timeout {timeout}",
timeout=timeout+5)
ps_cmd(
session, f"Wait-Process -Name {process} -Timeout {timeout}", timeout=timeout + 5
)


def kill_session(session):
Expand Down Expand Up @@ -504,7 +518,9 @@ def find_unused_port(session, start_port=10024):
:rtype: int
"""
# pylint: disable=C0301
output = ps_cmd(session, f"""
output = ps_cmd(
session,
f"""
$port = {start_port}
while($true) {{
try {{
Expand All @@ -521,7 +537,8 @@ def find_unused_port(session, start_port=10024):
}}

Write-Host "::unused=$port"
""")
""",
)
# pylint: enable=C0301
port = re.search(r"::unused=(\d+)", output).group(1)
return int(port)
Expand All @@ -540,7 +557,9 @@ def curl(session, url, proxy_address=None, proxy_port=None, insecure=False):
:rtype: (int, str)
"""
# extra indentation is not allowed within PS heredocs
skip_ssl_template = dedent("""\
# pylint: disable=C0301
skip_ssl_template = dedent(
"""\
Add-Type -Language CSharp @"
namespace System.Net
{
Expand All @@ -555,22 +574,28 @@ def curl(session, url, proxy_address=None, proxy_port=None, insecure=False):
}
}
"@;
[System.Net.Util]::Init()""")
[System.Net.Util]::Init()"""
)

# TODO: WebRequest (wrapper on top of System.Net.WebClient) is painfully slow
# for some reason compared to the 3rd-party curl.exe tool, but let's use it for now.
webrequest_cmd = f"Invoke-WebRequest -Uri \"{url}\""
webrequest_cmd = f'Invoke-WebRequest -Uri "{url}"'
if proxy_address is not None:
# the session user must have access to the proxy (otherwise use -ProxyCredential)
webrequest_cmd += f" -Proxy http://{proxy_address}:{proxy_port} -ProxyUseDefaultCredentials"
webrequest_cmd += (
f" -Proxy http://{proxy_address}:{proxy_port} -ProxyUseDefaultCredentials"
)

output = ps_cmd(session, f"""
output = ps_cmd(
session,
f"""
{skip_ssl_template if insecure else ''}
$ProgressPreference = 'SilentlyContinue'
$result = {webrequest_cmd}
Write-Output $result.StatusCode
Write-Output $result.Content
""")
""",
)

LOG.debug("Powershell request produced output:\n'%s'", output)
try:
Expand Down
Loading