Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform FD capturing even if the FD is invalid #7091

Merged
merged 1 commit into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/7091.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
When ``fd`` capturing is used, through ``--capture=fd`` or the ``capfd`` and
``capfdbinary`` fixtures, and the file descriptor (0, 1, 2) cannot be
duplicated, FD capturing is still performed. Previously, direct writes to the
file descriptors would fail or be lost in this case.
86 changes: 48 additions & 38 deletions src/_pytest/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,49 +513,57 @@ class FDCaptureBinary:

def __init__(self, targetfd, tmpfile=None):
self.targetfd = targetfd

try:
self.targetfd_save = os.dup(self.targetfd)
os.fstat(targetfd)
except OSError:
self.start = lambda: None
self.done = lambda: None
# FD capturing is conceptually simple -- create a temporary file,
# redirect the FD to it, redirect back when done. But when the
# target FD is invalid it throws a wrench into this loveley scheme.
#
# Tests themselves shouldn't care if the FD is valid, FD capturing
# should work regardless of external circumstances. So falling back
# to just sys capturing is not a good option.
#
# Further complications are the need to support suspend() and the
# possibility of FD reuse (e.g. the tmpfile getting the very same
# target FD). The following approach is robust, I believe.
self.targetfd_invalid = os.open(os.devnull, os.O_RDWR)
os.dup2(self.targetfd_invalid, targetfd)
else:
self.targetfd_invalid = None
self.targetfd_save = os.dup(targetfd)

if targetfd == 0:
assert not tmpfile, "cannot set tmpfile with stdin"
tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd)
else:
self.start = self._start
self.done = self._done
if targetfd == 0:
assert not tmpfile, "cannot set tmpfile with stdin"
tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd)
if tmpfile is None:
tmpfile = EncodedFile(
TemporaryFile(buffering=0),
encoding="utf-8",
errors="replace",
write_through=True,
)
if targetfd in patchsysdict:
self.syscapture = SysCapture(targetfd, tmpfile)
else:
if tmpfile is None:
tmpfile = EncodedFile(
TemporaryFile(buffering=0),
encoding="utf-8",
errors="replace",
write_through=True,
)
if targetfd in patchsysdict:
self.syscapture = SysCapture(targetfd, tmpfile)
else:
self.syscapture = NoCapture()
self.tmpfile = tmpfile
self.tmpfile_fd = tmpfile.fileno()
self.syscapture = NoCapture()
self.tmpfile = tmpfile

def __repr__(self):
return "<{} {} oldfd={} _state={!r} tmpfile={}>".format(
return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
self.__class__.__name__,
self.targetfd,
getattr(self, "targetfd_save", "<UNSET>"),
self.targetfd_save,
self._state,
hasattr(self, "tmpfile") and repr(self.tmpfile) or "<UNSET>",
self.tmpfile,
)

def _start(self):
def start(self):
""" Start capturing on targetfd using memorized tmpfile. """
try:
os.fstat(self.targetfd_save)
except (AttributeError, OSError):
raise ValueError("saved filedescriptor not valid anymore")
os.dup2(self.tmpfile_fd, self.targetfd)
os.dup2(self.tmpfile.fileno(), self.targetfd)
self.syscapture.start()
self._state = "started"

Expand All @@ -566,12 +574,15 @@ def snap(self):
self.tmpfile.truncate()
return res

def _done(self):
def done(self):
""" stop capturing, restore streams, return original capture file,
seeked to position zero. """
targetfd_save = self.__dict__.pop("targetfd_save")
os.dup2(targetfd_save, self.targetfd)
os.close(targetfd_save)
os.dup2(self.targetfd_save, self.targetfd)
os.close(self.targetfd_save)
if self.targetfd_invalid is not None:
if self.targetfd_invalid != self.targetfd:
os.close(self.targetfd)
os.close(self.targetfd_invalid)
self.syscapture.done()
self.tmpfile.close()
self._state = "done"
Expand All @@ -583,7 +594,7 @@ def suspend(self):

def resume(self):
self.syscapture.resume()
os.dup2(self.tmpfile_fd, self.targetfd)
os.dup2(self.tmpfile.fileno(), self.targetfd)
self._state = "resumed"

def writeorg(self, data):
Expand All @@ -609,8 +620,7 @@ def snap(self):

def writeorg(self, data):
""" write to original file descriptor. """
data = data.encode("utf-8") # XXX use encoding of original stream
os.write(self.targetfd_save, data)
super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream


class SysCaptureBinary:
Expand Down
48 changes: 43 additions & 5 deletions testing/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,8 @@ def test_simple_resume_suspend(self):
pytest.raises(AttributeError, cap.suspend)

assert repr(cap) == (
"<FDCapture 1 oldfd=<UNSET> _state='done' tmpfile={!r}>".format(
cap.tmpfile
"<FDCapture 1 oldfd={} _state='done' tmpfile={!r}>".format(
cap.targetfd_save, cap.tmpfile
)
)
# Should not crash with missing "_old".
Expand Down Expand Up @@ -1150,6 +1150,7 @@ def test_stdcapture_fd_invalid_fd(self, testdir):
testdir.makepyfile(
"""
import os
from fnmatch import fnmatch
from _pytest import capture

def StdCaptureFD(out=True, err=True, in_=True):
Expand All @@ -1158,26 +1159,63 @@ def StdCaptureFD(out=True, err=True, in_=True):
def test_stdout():
os.close(1)
cap = StdCaptureFD(out=True, err=False, in_=False)
assert repr(cap.out) == "<FDCapture 1 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.out), "<FDCapture 1 oldfd=* _state=None tmpfile=*>")
cap.start_capturing()
os.write(1, b"stdout")
assert cap.readouterr() == ("stdout", "")
cap.stop_capturing()

def test_stderr():
os.close(2)
cap = StdCaptureFD(out=False, err=True, in_=False)
assert repr(cap.err) == "<FDCapture 2 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.err), "<FDCapture 2 oldfd=* _state=None tmpfile=*>")
cap.start_capturing()
os.write(2, b"stderr")
assert cap.readouterr() == ("", "stderr")
cap.stop_capturing()

def test_stdin():
os.close(0)
cap = StdCaptureFD(out=False, err=False, in_=True)
assert repr(cap.in_) == "<FDCapture 0 oldfd=<UNSET> _state=None tmpfile=<UNSET>>"
assert fnmatch(repr(cap.in_), "<FDCapture 0 oldfd=* _state=None tmpfile=*>")
cap.stop_capturing()
"""
)
result = testdir.runpytest_subprocess("--capture=fd")
assert result.ret == 0
assert result.parseoutcomes()["passed"] == 3

def test_fdcapture_invalid_fd_with_fd_reuse(self, testdir):
with saved_fd(1):
os.close(1)
cap = capture.FDCaptureBinary(1)
cap.start()
os.write(1, b"started")
cap.suspend()
os.write(1, b" suspended")
cap.resume()
os.write(1, b" resumed")
assert cap.snap() == b"started resumed"
cap.done()
with pytest.raises(OSError):
os.write(1, b"done")

def test_fdcapture_invalid_fd_without_fd_reuse(self, testdir):
with saved_fd(1), saved_fd(2):
os.close(1)
os.close(2)
cap = capture.FDCaptureBinary(2)
cap.start()
os.write(2, b"started")
cap.suspend()
os.write(2, b" suspended")
cap.resume()
os.write(2, b" resumed")
assert cap.snap() == b"started resumed"
cap.done()
with pytest.raises(OSError):
os.write(2, b"done")


def test_capture_not_started_but_reset():
capsys = StdCapture()
Expand Down