diff --git a/changelog.d/bug.75e40e8c.entry.yaml b/changelog.d/bug.75e40e8c.entry.yaml new file mode 100644 index 0000000..7e96c9d --- /dev/null +++ b/changelog.d/bug.75e40e8c.entry.yaml @@ -0,0 +1,6 @@ +message: Get rid of using thread in AsyncFakePopen as it causes thread.join() to hang + indefinitely. +pr_ids: +- '169' +timestamp: 1727847419 +type: bug diff --git a/pytest_subprocess/fake_popen.py b/pytest_subprocess/fake_popen.py index acd1d4d..4a376fb 100644 --- a/pytest_subprocess/fake_popen.py +++ b/pytest_subprocess/fake_popen.py @@ -2,6 +2,7 @@ import asyncio import collections.abc +import concurrent.futures import io import os import signal @@ -69,14 +70,14 @@ def __init__( self.args = command self.__stdout: OPTIONAL_TEXT_OR_ITERABLE = stdout self.__stderr: OPTIONAL_TEXT_OR_ITERABLE = stderr - self.__returncode: Optional[int] = returncode - self.__wait: Optional[float] = wait self.__thread: Optional[Thread] = None - self.__callback: Optional[Optional[Callable]] = callback - self.__callback_kwargs: Optional[Dict[str, AnyType]] = callback_kwargs self.__signal_callback: Optional[Callable] = signal_callback self.__stdin_callable: Optional[Optional[Callable]] = stdin_callable self._signals: List[int] = [] + self._returncode: Optional[int] = returncode + self._wait_timeout: Optional[float] = wait + self._callback: Optional[Optional[Callable]] = callback + self._callback_kwargs: Optional[Dict[str, AnyType]] = callback_kwargs def __enter__(self) -> "FakePopen": return self @@ -116,8 +117,8 @@ def _finalize_thread(self, timeout: Optional[float]) -> None: if self.__thread is None: return self.__thread.join(timeout) - if self.returncode is None and self.__returncode is not None: - self.returncode = self.__returncode + if self.returncode is None and self._returncode is not None: + self.returncode = self._returncode if self.__thread.exception: raise self.__thread.exception @@ -133,8 +134,8 @@ def poll(self) -> Optional[int]: return self.returncode def wait(self, timeout: Optional[float] = None) -> int: - if timeout and self.__wait and timeout < self.__wait: - self.__wait -= timeout + if timeout and self._wait_timeout and timeout < self._wait_timeout: + self._wait_timeout -= timeout raise subprocess.TimeoutExpired(self.args, timeout) self._finalize_thread(timeout) if self.returncode is None: @@ -285,21 +286,21 @@ def _wait(self, wait_period: float) -> None: def run_thread(self) -> None: """Run the user-defined callback or wait in a thread.""" - if self.__wait is None and self.__callback is None: + if self._wait_timeout is None and self._callback is None: self._finish_process() else: - if self.__callback: + if self._callback: self.__thread = Thread( - target=self.__callback, + target=self._callback, args=(self,), - kwargs=self.__callback_kwargs or {}, + kwargs=self._callback_kwargs or {}, ) else: - self.__thread = Thread(target=self._wait, args=(self.__wait,)) + self.__thread = Thread(target=self._wait, args=(self._wait_timeout,)) self.__thread.start() def _finish_process(self) -> None: - self.returncode = self.__returncode + self.returncode = self._returncode self._finalize_streams() @@ -335,16 +336,20 @@ async def communicate( # type: ignore # feed eof one more time as streams were opened self._finalize_streams() - - self._finalize_thread(timeout) - + await self._finalize(timeout) return ( await self.stdout.read() if self.stdout else None, await self.stderr.read() if self.stderr else None, ) async def wait(self, timeout: Optional[float] = None) -> int: # type: ignore - return super().wait(timeout) + if timeout and self._wait_timeout and timeout < self._wait_timeout: + self._wait_timeout -= timeout + raise subprocess.TimeoutExpired(self.args, timeout) + await self._finalize(timeout) + if self.returncode is None: + raise exceptions.PluginInternalError + return self.returncode def _get_empty_buffer(self, _: bool) -> asyncio.StreamReader: return asyncio.StreamReader() @@ -362,3 +367,34 @@ async def _reopen_stream( fresh_stream.feed_data(data) return fresh_stream return None + + def run_thread(self) -> None: + """Async impl should not contain any thread based implementation""" + + def evaluate(self) -> None: + """Check if process needs to be finished.""" + if self._wait_timeout is None and self._callback is None: + self._finish_process() + + async def _run_callback_in_executor(self) -> None: + """Run in executor the user-defined callback or wait.""" + loop = asyncio.get_running_loop() + with concurrent.futures.ThreadPoolExecutor() as pool: + if self._callback: + kwargs = self._callback_kwargs or {} + cbk = partial(self._callback, **kwargs) + await loop.run_in_executor(pool, cbk, self) + elif self._wait_timeout is not None: + await loop.run_in_executor(pool, self._wait, self._wait_timeout) + + async def _finalize(self, timeout: Optional[float] = None) -> None: + """Run the user-defined callback or wait. Finish process""" + if self.returncode is not None: + return + if timeout is not None: + await asyncio.wait_for(self._run_callback_in_executor(), timeout=timeout) + else: + await self._run_callback_in_executor() + if self.returncode is None: + self.returncode = self._returncode + self._finalize_streams() diff --git a/pytest_subprocess/process_dispatcher.py b/pytest_subprocess/process_dispatcher.py index 0545a96..35f3da2 100644 --- a/pytest_subprocess/process_dispatcher.py +++ b/pytest_subprocess/process_dispatcher.py @@ -162,7 +162,7 @@ async def _call_async( result = cls._prepare_instance(AsyncFakePopen, command, kwargs, process) if not isinstance(result, AsyncFakePopen): raise exceptions.PluginInternalError - result.run_thread() + result.evaluate() return result @classmethod diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 7d4cafd..b77adc1 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -338,10 +338,7 @@ async def test_popen_recorder(fp): pytest.param(None, id="no-callback"), pytest.param( lambda process: process, - id="noop-callback-causes-infinite-loop", - marks=pytest.mark.xfail( - strict=True, raises=asyncio.TimeoutError, reason="Github #120" - ), + id="with-callback", ), ], ) @@ -353,15 +350,38 @@ async def my_async_func(): stderr=asyncio.subprocess.PIPE, ) await process.wait() - - # This reads forever when passing a callback to fp.register - # Add a timeout to abort test when condition occurs. - return await asyncio.wait_for(process.stdout.read(), timeout=1) + return await process.stdout.read() fp.register(["test"], stdout=b"fizz", callback=callback) assert await my_async_func() == b"fizz" +@pytest.mark.asyncio +async def test_asyncio_subprocess_using_communicate_with_callback_kwargs(fp): + expected_some_value = 2 + + def cbk(fake_obj, some_value=None): + assert expected_some_value == some_value + return fake_obj + + async def my_async_func(): + process = await asyncio.create_subprocess_exec( + "test", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + out, _ = await process.communicate() + return out + + fp.register( + ["test"], + stdout=b"fizz", + callback=cbk, + callback_kwargs={"some_value": expected_some_value}, + ) + assert await my_async_func() == b"fizz" + + @pytest.fixture(autouse=True) def skip_on_pypy(): """Async test for some reason crash on pypy 3.6 on Windows"""