Skip to content

Commit

Permalink
Merge pull request #6 from lfse-slafleur/types_docs_small_fix
Browse files Browse the repository at this point in the history
Add typehints, missing documentation & formatting
  • Loading branch information
sloisel authored Jul 25, 2024
2 parents dfb0046 + efaa6cb commit 78898ec
Showing 1 changed file with 214 additions and 95 deletions.
309 changes: 214 additions & 95 deletions streamcapture/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
`monkeypatch` optional parameter to the constructor. When enabled, the workaround
overwrites `stream.write(...)` by an implementation that sends everything to `os.write(self.fd,...)`.
This workaround is enabled when `monkeypatch=True` and disabled when `monkeypatch=False`.
The default is `monkeypatch=None`, in which case monkeypatching is enabled only when
The default is `monkeypatch=None`, in which case monkeypatching is enabled only when
`platform.system()=='Windows'`.
When writing to multiple streams and file descriptors, sometimes the order in which the writes
Expand Down Expand Up @@ -99,111 +99,230 @@
import sys, streamcapture
writer = streamcapture.Writer(open('logfile.txt','wb'),2)
with streamcapture.StreamCapture(sys.stdout,writer), streamcapture.StreamCapture(sys.stderr,writer):
print("This goes to stdout and is captured to logfile.txt")
print("This goes to stderr and is also captured to logfile.txt",file=sys.stderr)
print("This goes to stdout and is captured to logfile.txt")
print("This goes to stderr and is also captured to logfile.txt",file=sys.stderr)
```
In the above example, writer will be closed twice: once from the `StreamCapture(sys.stdout,...)`
object, and once from the `StreamCapture(sys.stderr,...)` object. Correspondingly, the `count` parameter
of the `streamcapture.Writer` was set to `2`, so that the underlying stream is only closed after 2
calls to `writer.close()`.
"""
import io
import os, threading, platform
from types import TracebackType
from typing import Optional, Callable, Union, Type, TextIO

import os, sys, threading, platform, select

class Writer:
def __init__(self,stream,count = None,lock_write = False):
"""`Writer` constructor."""
(self.stream,self.lock_write) = (stream,lock_write)
if count is None:
(self.count,self.increment) = (0,1)
else:
(self.count,self.increment) = (count,0)
self.lock = threading.Lock()
self._write = self.locked_write if lock_write else stream.write
def write_from(self,data,cap):
self._write(data)
def writer_open(self):
with self.lock:
self.count += self.increment
def close(self):
"""When one is done using a `Writer`, one calls `Writer.close()`. This acquires `Writer.lock` so it is
thread-safe. Each time `Writer.close()` is called, `Writer.count` is decremented. When `Writer.count`
reaches `0`, `stream.close()` is called."""
with self.lock:
self.count -= 1
if self.count>0:
return
self.stream.close()
def locked_write(self,z):
with self.lock:
self.stream.write(z)
stream: io.IOBase
count: int
increment: int
lock: threading.Lock
_write: Callable[[bytes], int]

def __init__(self, stream: io.IOBase, count: Optional[int] = None, lock_write: bool = False):
"""`Writer` constructor.
Wrapper of a stream to which bytes may be written. Introduces an optional lock for which write which
may be enabled through `lock_write`.
:param stream: The stream to wrap.
:param count: The starting number of users of this writer.
:param lock_write: Grab the lock before each write operation.
"""
(self.stream, self.lock_write) = (stream, lock_write)
if count is None:
(self.count, self.increment) = (0, 1)
else:
(self.count, self.increment) = (count, 0)
self.lock = threading.Lock()
self._write = self.locked_write if lock_write else stream.write # type: ignore[assignment]

def write_from(self, data: bytes, cap: 'FDCapture') -> int:
"""Perform a write operation.
:param data: The bytes to write.
:param cap: Unused. Remains for legacy purposes.
:return: The amount of bytes written.
"""
return self._write(data)

def writer_open(self) -> None:
"""Register that the writer is used."""
with self.lock:
self.count += self.increment

def close(self) -> None:
"""Closes the writer and the underlying stream
When one is done using a `Writer`, one calls `Writer.close()`. This acquires `Writer.lock` so it is
thread-safe. Each time `Writer.close()` is called, `Writer.count` is decremented. When `Writer.count`
reaches `0`, `stream.close()` is called.
"""
with self.lock:
self.count -= 1
if self.count > 0:
return
self.stream.close()

def locked_write(self, z: bytes) -> int:
"""Perform the write operation in a thread-safe manner.
:param z: Bytes to write.
:return: Return the amount of bytes written
"""
with self.lock:
written = self.stream.write(z)
return written


class FDCapture:
def __init__(self,fd,writer,echo=True,magic=b'\x04\x81\x00\xff'):
"""`FDCapture` constructor."""
if(hasattr(writer,'writer_open')):
writer.writer_open()
(self.active, self.writer, self.fd, self.echo, self.magic) = (True,writer,fd,echo,magic)
self.write = (lambda data: self.writer.write_from(data,self)) if hasattr(writer,'write_from') else writer.write
(self.pipe_read_fd, self.pipe_write_fd) = os.pipe()
self.dup_fd = os.dup(fd)
os.dup2(self.pipe_write_fd,fd)
self.thread = threading.Thread(target=self.printer)
self.thread.start()
def printer(self):
"""This is the thread that listens to the pipe output and passes it to the writer stream."""
try:
looping = True
while looping:
data = os.read(self.pipe_read_fd,100000)
foo = data.split(self.magic)

if len(foo)>=2:
looping = False

for segment in foo:
if len(segment) == 0:
# Pipe is closed
looping = False
break
self.write(segment)
if self.echo:
os.write(self.dup_fd,segment)
finally:
os.close(self.pipe_read_fd)
def close(self):
"""When you want to "uncapture" a stream, use this method."""
if not self.active:
return
self.active = False
os.write(self.fd,self.magic)
self.thread.join()
os.dup2(self.dup_fd,self.fd)
os.close(self.pipe_write_fd)
os.close(self.dup_fd)

def __enter__(self):
return self
def __exit__(self,a,b,c):
self.close()
"""Redirect all output from a file descriptor and write it to `writer`."""

active: bool
writer: Union[io.IOBase, Writer]
fd: int
echo: bool
magic: bytes
write: Callable[[bytes], int]

pipe_read_fd: int
pipe_write_fd: int
dup_fd: int
"""Placeholder filedescriptor where the stream originally wrote to."""
thread: threading.Thread

def __init__(
self,
fd: int,
writer: Union[io.IOBase, Writer],
echo: bool,
magic: bytes = b"\x04\x81\x00\xff",
):
"""`FDCapture` constructor.
:param fd: The filedescriptor to capture.
:param writer: Any bytes received from `fd` are written to this writer.
:param echo: Enable to also write bytes received to `fd` as well.
:param magic: The magic packet which denotes that the capturing process should stop.
"""
if hasattr(writer, "writer_open"):
writer.writer_open()
(self.active, self.writer, self.fd, self.echo, self.magic) = (True, writer, fd, echo, magic)
self.write = (
(lambda data: self.writer.write_from(data, self)) # type: ignore[union-attr, assignment]
if hasattr(writer, "write_from")
else writer.write
)
(self.pipe_read_fd, self.pipe_write_fd) = os.pipe()
self.dup_fd = os.dup(fd)
os.dup2(self.pipe_write_fd, fd)
self.thread = threading.Thread(target=self.printer)
self.thread.start()

def printer(self):
"""This is the thread that listens to the pipe output and passes it to the writer stream."""
try:
looping = True
while looping:
data = os.read(self.pipe_read_fd, 100000)
foo = data.split(self.magic)

# magic segment was found in data
if len(foo) >= 2:
looping = False

for segment in foo:
# Pipe is closed
if len(segment) == 0:
looping = False
break
self.write(segment)
if self.echo:
os.write(self.dup_fd, segment)
finally:
os.close(self.pipe_read_fd)

def close(self):
"""When you want to "uncapture" a stream, use this method."""
if not self.active:
return
self.active = False

os.write(self.fd, self.magic)
self.thread.join()
os.dup2(self.dup_fd, self.fd)
os.close(self.pipe_write_fd)
os.close(self.dup_fd)

def __enter__(self):
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()


class StreamCapture:
def __init__(self,stream,writer,echo=True,monkeypatch=None):
"""The `StreamCapture` constructor."""
self.fdcapture = FDCapture(stream.fileno(),writer,echo)
self.stream = stream
self.monkeypatch = platform.system()=='Windows' if monkeypatch is None else monkeypatch
if self.monkeypatch:
self.oldwrite = stream.write
stream.buffer.write = lambda z: os.write(stream.fileno(), z)
def close(self):
"""When you want to "uncapture" a stream, use this method."""
self.stream.flush()
self.fdcapture.close()
if self.monkeypatch:
self.stream.write = self.oldwrite
def __enter__(self):
return self
def __exit__(self,a,b,c):
self.close()
"""Interface for users to redirect a stream to another `io.IOBase`"""

fdcapture: FDCapture
stream: Union[io.IOBase, TextIO]
monkeypatch: bool
oldwrite: Optional[Callable[[Union[bytes, str]], None]]

def __init__(
self,
stream_to_redirect: Union[io.IOBase, TextIO],
writer: io.IOBase,
echo: bool = True,
monkeypatch: Optional[bool] = None,
) -> None:
"""The `StreamCapture` constructor.
:param stream_to_redirect: Stream which will be redirected.
:param writer: The stream will be redirected to this writer. It must derive from io.IOBase.
:param echo: If the redirected stream should also write any output to the original stream.
:param monkeypatch: If monkeypatching is necessary. Default is None which will perform
the monkeypatch in case this is run on Windows. Otherwise, the value of monkeypatch
is used.
"""
self.fdcapture = FDCapture(stream_to_redirect.fileno(), writer, echo)
self.stream = stream_to_redirect
self.monkeypatch = platform.system() == "Windows" if monkeypatch is None else monkeypatch
if self.monkeypatch:
self.oldwrite = stream_to_redirect.write # type: ignore[assignment]
stream_to_redirect.write = lambda z: os.write( # type: ignore[method-assign]
stream_to_redirect.fileno(), z.encode() if hasattr(z, "encode") else z
)
else:
self.oldwrite = None

def close(self) -> None:
"""When you want to "uncapture" a stream, use this method."""
self.stream.flush()
self.fdcapture.close()
if self.monkeypatch:
self.stream.write = self.oldwrite # type: ignore[assignment,method-assign]

def __enter__(self):
"""Start the stream redirect as a contextmanager."""
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Stop the stream redirect as a contextmanager.
Same as running StreamCapture.close()
"""
self.close()

0 comments on commit 78898ec

Please sign in to comment.