diff --git a/README.md b/README.md index f93d8b0..55ffc39 100644 --- a/README.md +++ b/README.md @@ -195,6 +195,18 @@ Get the list of docker_compose commands to be executed for test clean-up actions Override this fixture in your tests if you need to change clean-up actions. Returning anything that would evaluate to False will skip this command. +## Docker Live Output + +```python +@pytest.fixture(scope="session") +def http_service(docker_ip, docker_services): + docker_services.display_live_logs("service_name") +``` + +```bash +pytest --capture=tee-sys +``` + # Development Use of a virtual environment is recommended. See the diff --git a/src/pytest_docker/plugin.py b/src/pytest_docker/plugin.py index c0a0ecd..8f8c54e 100644 --- a/src/pytest_docker/plugin.py +++ b/src/pytest_docker/plugin.py @@ -1,3 +1,4 @@ +from concurrent.futures import Future, ThreadPoolExecutor import contextlib import os from pathlib import Path @@ -5,13 +6,16 @@ import subprocess import time import timeit -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from types import TracebackType +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, Type import attr import pytest from _pytest.config import Config from _pytest.fixtures import FixtureRequest +_MAX_LOG_WORKERS = 100 + @pytest.fixture def container_scope_fixture(request: FixtureRequest) -> Any: @@ -40,6 +44,20 @@ def execute(command: str, success_codes: Iterable[int] = (0,), ignore_stderr: bo return output +def execute_via_run(command: str, success_codes: Iterable[int] = (0,)) -> None: + try: + process = subprocess.run(command, stderr=subprocess.STDOUT, shell=True) + returncode = process.returncode + except subprocess.CalledProcessError as error: + returncode = error.returncode + command = error.cmd + + if returncode not in success_codes: + raise Exception( + 'Command {} returned {}'.format(command, returncode) + ) + + def get_docker_ip() -> Union[str, Any]: # When talking to the Docker daemon via a UNIX socket, route all TCP # traffic to docker containers via the TCP loopback interface. @@ -59,9 +77,18 @@ def docker_ip() -> Union[str, Any]: @attr.s(frozen=True) -class Services: - _docker_compose: Any = attr.ib() - _services: Dict[Any, Dict[Any, Any]] = attr.ib(init=False, default=attr.Factory(dict)) +class Services(contextlib.AbstractContextManager): # type: ignore + _docker_compose: "DockerComposeExecutor" = attr.ib() + _services: Dict[Any, Dict[Any, Any]] = attr.ib( + init=False, default=attr.Factory(dict) + ) + _live_logs: Dict[str, Future[Any]] = attr.ib(init=False, default=attr.Factory(dict)) + _thread_pool_executor: ThreadPoolExecutor = attr.ib( + init=False, + default=attr.Factory( + lambda: ThreadPoolExecutor(max_workers=_MAX_LOG_WORKERS, thread_name_prefix="docker_") + ), + ) def port_for(self, service: str, container_port: int) -> int: """Return the "host" port for `service` and `container_port`. @@ -119,6 +146,36 @@ def wait_until_responsive( raise Exception("Timeout reached while waiting on service!") + def display_live_logs(self, service: str) -> None: + """Run `logs` command with the follow flag to show live logs of a service.""" + if service in self._live_logs: + return + + if len(self._live_logs) == _MAX_LOG_WORKERS: + raise NotImplementedError( + f"""\ +{_MAX_LOG_WORKERS} worker threads are supported to display live logs. \ +Please submit a PR if you want to change that.""" + ) + + self._live_logs[service] = self._thread_pool_executor.submit( + self._docker_compose.execute_via_run, f"logs {service} -f" + ) + + def close(self) -> None: + for _, fut in self._live_logs.items(): + _ = fut.cancel() + self._thread_pool_executor.shutdown(wait=False) + + def __exit__( + self, + _exc_type: Optional[Type[BaseException]], + __exc_value: Optional[BaseException], + __traceback: Optional[TracebackType], + ) -> None: + self.close() + return None + def str_to_list(arg: Union[str, Path, List[Any], Tuple[Any]]) -> Union[List[Any], Tuple[Any]]: if isinstance(arg, (list, tuple)): @@ -133,11 +190,17 @@ class DockerComposeExecutor: _compose_project_name: str = attr.ib() def execute(self, subcommand: str, **kwargs: Any) -> Union[bytes, Any]: + return execute(self._format_cmd(subcommand), **kwargs) + + def execute_via_run(self, subcommand: str) -> None: + execute_via_run(self._format_cmd(subcommand)) + + def _format_cmd(self, subcommand: str) -> str: command = self._compose_command for compose_file in self._compose_files: command += ' -f "{}"'.format(compose_file) command += ' -p "{}" {}'.format(self._compose_project_name, subcommand) - return execute(command, **kwargs) + return command @pytest.fixture(scope=containers_scope) @@ -213,7 +276,8 @@ def get_docker_services( try: # Let test(s) run. - yield Services(docker_compose) + with Services(docker_compose) as services: + yield services finally: # Clean up. if docker_cleanup: