diff --git a/changelog/13322.bugfix.rst b/changelog/13322.bugfix.rst new file mode 100644 index 00000000000..c20d2ac6736 --- /dev/null +++ b/changelog/13322.bugfix.rst @@ -0,0 +1 @@ +Fixed an issue where a plugin hook (e.g. ``pytest_sessionfinish`` or ``pytest_terminal_summary``) that called ``CaptureManager.suspend_global_capture(in_=True)`` from a context where ``out`` and ``err`` were already suspended would have its subsequent ``CaptureManager.resume_global_capture()`` unexpectedly re-enable stdout/stderr capture, causing terminal-width and capture-state inconsistencies. diff --git a/src/_pytest/capture.py b/src/_pytest/capture.py index 6d98676be5f..b9ba9021d15 100644 --- a/src/_pytest/capture.py +++ b/src/_pytest/capture.py @@ -301,6 +301,9 @@ def buffer(self) -> BinaryIO: class CaptureBase(abc.ABC, Generic[AnyStr]): EMPTY_BUFFER: AnyStr + # Set by stateful subclasses in ``__init__``; left as ``None`` on + # stateless implementations such as ``NoCapture``. + _state: str | None = None @abc.abstractmethod def __init__(self, fd: int) -> None: @@ -330,6 +333,10 @@ def writeorg(self, data: AnyStr) -> None: def snap(self) -> AnyStr: raise NotImplementedError() + def is_started(self) -> bool: + """Whether actively capturing -- not initialized, suspended, or done.""" + return self._state == "started" + patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} @@ -627,7 +634,11 @@ class CaptureResult( class MultiCapture(Generic[AnyStr]): _state = None - _in_suspended = False + # Snapshot of (out_started, err_started) captured at the moment of the + # first nested ``suspend_capturing(in_=True)`` call. ``None`` when no + # nested suspend is in progress; non-``None`` iff the matching + # ``resume_capturing`` still needs to restore that prior state (#13322). + _pre_suspend_state: tuple[bool, bool] | None = None def __init__( self, @@ -640,10 +651,13 @@ def __init__( self.err: CaptureBase[AnyStr] | None = err def __repr__(self) -> str: - return ( + base = ( f"" + f"_state={self._state!r}" ) + if self._pre_suspend_state is not None: + base += f" _pre_suspend_state={self._pre_suspend_state!r}" + return base + ">" def start_capturing(self) -> None: self._state = "started" @@ -666,25 +680,57 @@ def pop_outerr_to_orig(self) -> tuple[AnyStr, AnyStr]: return out, err def suspend_capturing(self, in_: bool = False) -> None: + """Suspend out/err (and ``in_`` when ``in_=True``). + + When ``in_=True`` and no nested suspend is already in progress, the + prior started/suspended state of out and err is snapshotted so that + the matching :meth:`resume_capturing` restores them to that state + instead of unconditionally restarting them. This supports plugin + hooks that suspend capture once for out/err and then nest a second + ``in_=True`` suspend to read user input -- without that snapshot, + the resume would re-enable out/err even though the caller intended + them to remain suspended (#13322). + + The contract is that ``suspend_capturing(in_=True)`` and + :meth:`resume_capturing` are called as a matched pair; nesting is + single-depth, not arbitrary. + """ self._state = "suspended" + if in_ and self.in_ is not None and self._pre_suspend_state is None: + self._pre_suspend_state = ( + self.out is not None and self.out.is_started(), + self.err is not None and self.err.is_started(), + ) + self.in_.suspend() if self.out: self.out.suspend() if self.err: self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True def resume_capturing(self) -> None: + """Resume out/err (and ``in_`` if a matching nested suspend is active). + + If the matching :meth:`suspend_capturing` was called with + ``in_=True``, the snapshot taken at that point determines whether + out/err are restored to ``started`` -- streams that were already + suspended before the nested suspend remain suspended (#13322). + Otherwise, out/err are unconditionally resumed (the original + behaviour). + """ self._state = "started" - if self.out: - self.out.resume() - if self.err: - self.err.resume() - if self._in_suspended: + snap = self._pre_suspend_state + if snap is not None: assert self.in_ is not None self.in_.resume() - self._in_suspended = False + out_was_started, err_was_started = snap + self._pre_suspend_state = None + else: + out_was_started = self.out is not None + err_was_started = self.err is not None + if out_was_started and self.out is not None: + self.out.resume() + if err_was_started and self.err is not None: + self.err.resume() def stop_capturing(self) -> None: """Stop capturing and reset capturing streams.""" diff --git a/testing/test_capture.py b/testing/test_capture.py index 11fd18f08ff..265b5bc8f41 100644 --- a/testing/test_capture.py +++ b/testing/test_capture.py @@ -101,6 +101,67 @@ def test_init_capturing(self): finally: capouter.stop_capturing() + @pytest.mark.parametrize("method", ["sys", "fd"]) + def test_suspend_in_preserves_out_err_suspended_state(self, method) -> None: + """suspend_global_capture(in_=True) + resume_global_capture() must + not re-enable out/err capture if they were already suspended (#13322). + """ + capouter = StdCaptureFD() + try: + capman = CaptureManager(method) + capman.start_global_capturing() + mc = capman._global_capturing + assert mc is not None + assert mc.out is not None + assert mc.err is not None + assert mc.in_ is not None + + capman.suspend_global_capture(in_=False) + assert not mc.out.is_started() + assert not mc.err.is_started() + assert mc.in_.is_started() + + capman.suspend_global_capture(in_=True) + assert not mc.in_.is_started() + + capman.resume_global_capture() + assert not mc.out.is_started() + assert not mc.err.is_started() + assert mc.in_.is_started() + + capman.stop_global_capturing() + finally: + capouter.stop_capturing() + + @pytest.mark.parametrize("method", ["sys", "fd"]) + def test_suspend_in_restores_out_err_started_state(self, method) -> None: + """suspend_global_capture(in_=True) + resume_global_capture() restores + out/err to started when that was their state before the suspend (#13322). + """ + capouter = StdCaptureFD() + try: + capman = CaptureManager(method) + capman.start_global_capturing() + mc = capman._global_capturing + assert mc is not None + assert mc.out is not None + assert mc.err is not None + assert mc.in_ is not None + + capman.suspend_global_capture(in_=True) + assert not mc.out.is_started() + assert not mc.err.is_started() + assert not mc.in_.is_started() + + capman.resume_global_capture() + assert mc.out.is_started() + assert mc.err.is_started() + assert mc.in_.is_started() + + capman.stop_global_capturing() + finally: + capouter.stop_capturing() + @pytest.mark.parametrize("method", ["fd", "sys"]) def test_capturing_unicode(pytester: Pytester, method: str) -> None: @@ -1706,6 +1767,51 @@ def test_logging(): result.stdout.no_fnmatch_line("*during collection*") +def test_nested_suspend_in_preserves_outer_suspended_state( + pytester: Pytester, +) -> None: + """Black-box regression for #13322. + + A plugin hook that suspends out/err first and then nests a + ``suspend_global_capture(in_=True)`` must observe that the matching + ``resume_global_capture()`` leaves out/err in their prior suspended + state, so that prints from the hook after the resume reach the real + terminal *immediately* (i.e. before the summary line). + + On the pre-fix implementation, ``MARK_AFTER_RESUME`` would be + re-captured by the buggy resume and only flushed to real stdout at + ``stop_global_capturing`` cleanup time, which runs after + ``pytest_unconfigure`` and therefore after the ``N passed`` summary + line. Asserting the marker appears *before* the summary line is the + deterministic discriminator. + """ + pytester.makeconftest(""" + import pytest + + + def pytest_terminal_summary(config): + capture = config.pluginmanager.getplugin("capturemanager") + capture.suspend_global_capture(in_=False) + print("MARK_BEFORE_NESTED") + capture.suspend_global_capture(in_=True) + capture.resume_global_capture() + print("MARK_AFTER_RESUME") + """) + pytester.makepyfile("def test_x(): pass") + result = pytester.runpytest_subprocess() + # Both marks must appear in order, AND both must appear before the + # final summary line. On the buggy code MARK_AFTER_RESUME would only + # be flushed at CaptureManager teardown -- i.e. after "1 passed". + result.stdout.fnmatch_lines( + [ + "*MARK_BEFORE_NESTED*", + "*MARK_AFTER_RESUME*", + "*1 passed*", + ] + ) + assert result.ret == 0 + + def test_libedit_workaround(pytester: Pytester) -> None: pytester.makeconftest(""" import pytest