From af8da67056acb8776b4260fccdbec374ba4a6898 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sun, 21 Apr 2024 14:02:52 +0000
Subject: [PATCH] count fds

---
 parsl/dataflow/dflow.py                           | 10 ++++++++--
 parsl/executors/taskvine/executor.py              |  2 ++
 parsl/executors/workqueue/executor.py             |  2 ++
 parsl/log_utils.py                                |  6 +++---
 parsl/monitoring/monitoring.py                    |  6 +++---
 parsl/monitoring/router.py                        |  6 +++---
 parsl/tests/conftest.py                           | 12 ++++++++++++
 parsl/tests/test_providers/test_local_provider.py |  1 +
 8 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index 3f4d560a31..2919485591 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -93,7 +93,7 @@ def __init__(self, config: Config) -> None:
         self.run_dir = make_rundir(config.run_dir)
 
         if config.initialize_logging:
-            parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
+            _, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
 
         logger.info("Starting DataFlowKernel with config\n{}".format(config))
 
@@ -1340,7 +1340,13 @@ def cleanup(self) -> None:
         else:
             logger.debug("Cleaning up non-default DFK - not unregistering")
 
-        logger.info("DFK cleanup complete")
+        # TODO: do this in parsl/log_utils.py
+        logger.info("DFK cleanup complete - removing parsl.log handler")
+        logger_to_remove = logging.getLogger("parsl")
+        logger_to_remove.removeHandler(self.logging_handler)
+        self.logging_handler.close()
+
+        logger.info("handler closed - is this going to break things?")
 
     def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
         """Checkpoint the dfk incrementally to a checkpoint file.
diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py
index 6cfedf92bb..f700032f86 100644
--- a/parsl/executors/taskvine/executor.py
+++ b/parsl/executors/taskvine/executor.py
@@ -607,11 +607,13 @@ def shutdown(self, *args, **kwargs):
         # Join all processes before exiting
         logger.debug("Joining on submit process")
         self._submit_process.join()
+        self._submit_process.close()
         logger.debug("Joining on collector thread")
         self._collector_thread.join()
         if self.worker_launch_method == 'factory':
             logger.debug("Joining on factory process")
             self._factory_process.join()
+            self._factory_process.close()
 
         # Shutdown multiprocessing queues
         self._ready_task_queue.close()
diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py
index e715c23891..81abe22e76 100644
--- a/parsl/executors/workqueue/executor.py
+++ b/parsl/executors/workqueue/executor.py
@@ -722,6 +722,8 @@ def shutdown(self, *args, **kwargs):
 
         logger.debug("Joining on submit process")
         self.submit_process.join()
+        self.submit_process.close()
+
         logger.debug("Joining on collector thread")
         self.collector_thread.join()
 
diff --git a/parsl/log_utils.py b/parsl/log_utils.py
index ff25249b1b..b5d483b8e6 100644
--- a/parsl/log_utils.py
+++ b/parsl/log_utils.py
@@ -12,7 +12,7 @@
 """
 import io
 import logging
-from typing import Optional
+from typing import Optional, Tuple
 
 import typeguard
 
@@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl',
 def set_file_logger(filename: str,
                     name: str = 'parsl',
                     level: int = logging.DEBUG,
-                    format_string: Optional[str] = None) -> logging.Logger:
+                    format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
     """Add a file log handler.
 
     Args:
@@ -93,4 +93,4 @@ def set_file_logger(filename: str,
     futures_logger = logging.getLogger("concurrent.futures")
     futures_logger.addHandler(handler)
 
-    return logger
+    return (logger, handler)
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index 1a58d1240c..4ac0ef43e4 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -283,9 +283,9 @@ def close(self) -> None:
 
 @wrap_with_logs
 def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
-    logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
-                             name="monitoring_filesystem_radio",
-                             level=logging.INFO)
+    logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
+                                name="monitoring_filesystem_radio",
+                                level=logging.INFO)
 
     logger.info("Starting filesystem radio receiver")
     setproctitle("parsl: monitoring filesystem receiver")
diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py
index e4b345ca8f..b0f3122cdd 100644
--- a/parsl/monitoring/router.py
+++ b/parsl/monitoring/router.py
@@ -55,9 +55,9 @@ def __init__(self,
 
         """
         os.makedirs(logdir, exist_ok=True)
-        self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
-                                      name="monitoring_router",
-                                      level=logging_level)
+        self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir),
+                                         name="monitoring_router",
+                                         level=logging_level)
         self.logger.debug("Monitoring router starting")
 
         self.hub_address = hub_address
diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py
index 59abb08e21..a7db8a4469 100644
--- a/parsl/tests/conftest.py
+++ b/parsl/tests/conftest.py
@@ -180,6 +180,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
     config = pytestconfig.getoption('config')[0]
 
     if config != 'local':
+        this_process = psutil.Process()
+        start_fds = this_process.num_fds()
+        logger.error(f"BENC: open fds: {start_fds}")
         assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())
 
         spec = importlib.util.spec_from_file_location('', config)
@@ -211,6 +214,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
 
         assert DataFlowKernelLoader._dfk is None
         assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
+        end_fds = this_process.num_fds()
+        logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
+        assert start_fds == end_fds, "number of open fds changed across test run"
     else:
         yield
 
@@ -273,6 +279,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
 
         logger.error(f"BENC: end open fds: {end_fds} (vs start {start_fds}")
         assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
+        end_fds = this_process.num_fds()
+        logger.error(f"BENC: open fds END: {end_fds}")
+        if end_fds > start_fds:
+            logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
+            os.system(f"ls -l /proc/{os.getpid()}/fd")
+            pytest.fail("BENC: number of open fds increased across test")
     else:
         yield
 
diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py
index 25cfb78dbf..c6844b00c0 100644
--- a/parsl/tests/test_providers/test_local_provider.py
+++ b/parsl/tests/test_providers/test_local_provider.py
@@ -109,6 +109,7 @@ def test_ssh_channel():
 
 def _stop_sshd(sshd_thread):
     sshd_thread.stop()
+    sshd_thread.join()
 
 
 class SSHDThread(threading.Thread):
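
The heart of the change is the fd-leak check added to parsl/tests/conftest.py: snapshot the process's open file descriptor count before a test runs, compare it afterwards, and fail loudly (listing the surviving descriptors) if it grew. Pulled out of the fixture plumbing, that check looks roughly like the sketch below. This is a minimal illustration, not code from the patch: the fixture name check_fds is invented here, and psutil.Process.num_fds() and the /proc listing are Unix-only.

    import os

    import psutil
    import pytest


    @pytest.fixture
    def check_fds():
        """Fail a test if it leaks file descriptors."""
        this_process = psutil.Process()  # the pytest process itself
        start_fds = this_process.num_fds()
        yield
        end_fds = this_process.num_fds()
        if end_fds > start_fds:
            # open_files() reports only regular files, not sockets or
            # pipes, so also dump /proc/<pid>/fd for the full picture.
            print(f"Open files: {this_process.open_files()!r}")
            os.system(f"ls -l /proc/{os.getpid()}/fd")
            pytest.fail(f"open fds grew from {start_fds} to {end_fds}")

The rest of the patch exists to let that assertion pass: close()-ing joined multiprocessing processes releases the fd each process object otherwise keeps holding (its sentinel), and removing and close()-ing the DFK's file log handler releases the fd on parsl.log, which would otherwise stay open after DFK cleanup.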