From f56c92cf2dc8d529cc8e5d732dbbc3eb77e49067 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 6 May 2024 16:07:15 +0000 Subject: [PATCH 01/15] fiddle with interchange default values there are default values in the interchange code, but they are all specified in the executor code too, so these defaults will never be used. remove them as misleading. see similar changes to process worker pool, PR #2973, for more detailed justification needs to change zmq sockets test because that assumes the arbitrary defaults are present. which is no longer the case. but if you want to initialize an interchange that requires you to specify all this stuff, and want some arbitrary values, then make those arbitrary values yourself. client address parameter is now supplied by the executor - it was not before, and so the default/hard-coded value now lives in the executor, not the interchange --- parsl/executors/high_throughput/executor.py | 3 +- .../executors/high_throughput/interchange.py | 41 ++++++++++--------- parsl/tests/test_htex/test_zmq_binding.py | 28 ++++++++++--- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 39ebccecde..b623e287f0 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -532,7 +532,8 @@ def _start_local_interchange_process(self): get the worker task and result ports that the interchange has bound to. 
""" self.interchange_proc = ForkProcess(target=interchange.starter, - kwargs={"client_ports": (self.outgoing_q.port, + kwargs={"client_address": "127.0.0.1", + "client_ports": (self.outgoing_q.port, self.incoming_q.port, self.command_client.port), "interchange_address": self.address, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 7034d703c7..50cfbd1b34 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -67,18 +67,19 @@ class Interchange: 3. Detect workers that have failed using heartbeats """ def __init__(self, - client_address: str = "127.0.0.1", - interchange_address: Optional[str] = None, - client_ports: Tuple[int, int, int] = (50055, 50056, 50057), - worker_ports: Optional[Tuple[int, int]] = None, - worker_port_range: Tuple[int, int] = (54000, 55000), - hub_address: Optional[str] = None, - hub_zmq_port: Optional[int] = None, - heartbeat_threshold: int = 60, - logdir: str = ".", - logging_level: int = logging.INFO, - poll_period: int = 10, - cert_dir: Optional[str] = None, + *, + client_address: str, + interchange_address: Optional[str], + client_ports: Tuple[int, int, int], + worker_ports: Optional[Tuple[int, int]], + worker_port_range: Tuple[int, int], + hub_address: Optional[str], + hub_zmq_port: Optional[int], + heartbeat_threshold: int, + logdir: str, + logging_level: int, + poll_period: int, + cert_dir: Optional[str], ) -> None: """ Parameters @@ -94,34 +95,34 @@ def __init__(self, The ports at which the client can be reached worker_ports : tuple(int, int) - The specific two ports at which workers will connect to the Interchange. Default: None + The specific two ports at which workers will connect to the Interchange. worker_port_range : tuple(int, int) The interchange picks ports at random from the range which will be used by workers. - This is overridden when the worker_ports option is set. 
Default: (54000, 55000) + This is overridden when the worker_ports option is set. hub_address : str The IP address at which the interchange can send info about managers to when monitoring is enabled. - Default: None (meaning monitoring disabled) + When None, monitoring is disabled. hub_zmq_port : str The port at which the interchange can send info about managers to when monitoring is enabled. - Default: None (meaning monitoring disabled) + When None, monitoring is disabled. heartbeat_threshold : int Number of seconds since the last heartbeat after which worker is considered lost. logdir : str - Parsl log directory paths. Logs and temp files go here. Default: '.' + Parsl log directory paths. Logs and temp files go here. logging_level : int - Logging level as defined in the logging module. Default: logging.INFO + Logging level as defined in the logging module. poll_period : int - The main thread polling period, in milliseconds. Default: 10ms + The main thread polling period, in milliseconds. cert_dir : str | None - Path to the certificate directory. Default: None + Path to the certificate directory. 
""" self.cert_dir = cert_dir self.logdir = logdir diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index eaf2e9731b..62925d5821 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -2,6 +2,7 @@ from typing import Optional from unittest import mock +import logging import psutil import pytest import zmq @@ -10,6 +11,21 @@ from parsl.executors.high_throughput.interchange import Interchange +def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange: + return Interchange(interchange_address=interchange_address, + cert_dir=cert_dir, + client_address="127.0.0.1", + client_ports=(50055, 50056, 50057), + worker_ports=None, + worker_port_range=(54000, 55000), + hub_address=None, + hub_zmq_port=None, + heartbeat_threshold=60, + logdir=".", + logging_level=logging.INFO, + poll_period=10) + + @pytest.fixture def encrypted(request: pytest.FixtureRequest): if hasattr(request, "param"): @@ -31,7 +47,7 @@ def test_interchange_curvezmq_sockets( mock_socket: mock.MagicMock, cert_dir: Optional[str], encrypted: bool ): address = "127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) assert isinstance(ix.zmq_context, curvezmq.ServerContext) assert ix.zmq_context.encrypted is encrypted assert mock_socket.call_count == 5 @@ -40,7 +56,7 @@ def test_interchange_curvezmq_sockets( @pytest.mark.local @pytest.mark.parametrize("encrypted", (True, False), indirect=True) def test_interchange_binding_no_address(cert_dir: Optional[str]): - ix = Interchange(cert_dir=cert_dir) + ix = make_interchange(interchange_address=None, cert_dir=cert_dir) assert ix.interchange_address == "*" @@ -49,7 +65,7 @@ def test_interchange_binding_no_address(cert_dir: Optional[str]): def test_interchange_binding_with_address(cert_dir: Optional[str]): # Using loopback address address = 
"127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) assert ix.interchange_address == address @@ -60,7 +76,7 @@ def test_interchange_binding_with_non_ipv4_address(cert_dir: Optional[str]): # Confirm that a ipv4 address is required address = "localhost" with pytest.raises(zmq.error.ZMQError): - Interchange(interchange_address=address, cert_dir=cert_dir) + make_interchange(interchange_address=address, cert_dir=cert_dir) @pytest.mark.local @@ -69,7 +85,7 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]): """Confirm that we raise a ZMQError when a bad address is supplied""" address = "550.0.0.0" with pytest.raises(zmq.error.ZMQError): - Interchange(interchange_address=address, cert_dir=cert_dir) + make_interchange(interchange_address=address, cert_dir=cert_dir) @pytest.mark.local @@ -77,7 +93,7 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]): def test_limited_interface_binding(cert_dir: Optional[str]): """When address is specified the worker_port would be bound to it rather than to 0.0.0.0""" address = "127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) ix.worker_result_port proc = psutil.Process() conns = proc.connections(kind="tcp") From 1733deb0533b79fa9b9c20b75e75f0bb02cea735 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 6 May 2024 15:59:29 +0000 Subject: [PATCH 02/15] make interchange into a "first-order" process like process worker pool, not multiprocessing any downstream packaging will need to be aware of the presence of interchange.py as a new command-line invocable script and this might break some build instructions which do not configure installed scripts onto the path. this PR replaces keyword arguments with argparse command line parameters. 
it does not attempt to make those command line arguments differently-optional than the constructor of the Interchange class (for example, worker_ports and worker_port_range are both mandatory, because they are both specified before this PR) i'm somewhat uncomfortable with this seeming like an ad-hoc serialise/deserialise protocol for what was previously effectively a dict of typed python objects... but it's what process worker pool does. see issue #3373 for interchange specific issue see issue #2343 for parsl general fork vs threads issue see possibly issue #3378? --- parsl/executors/high_throughput/executor.py | 76 +++++++------ .../executors/high_throughput/interchange.py | 103 ++++++++++++++++-- setup.py | 1 + 3 files changed, 142 insertions(+), 38 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index b623e287f0..8de57d4494 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -5,8 +5,8 @@ import logging import threading import pickle +import subprocess from dataclasses import dataclass -from multiprocessing import Process from typing import Dict, Sequence from typing import List, Optional, Tuple, Union, Callable import math @@ -19,7 +19,6 @@ from parsl.app.errors import RemoteExceptionWrapper from parsl.jobs.states import JobStatus, JobState, TERMINAL_STATES from parsl.executors.high_throughput import zmq_pipes -from parsl.executors.high_throughput import interchange from parsl.executors.high_throughput.errors import CommandClientTimeoutError from parsl.executors.errors import ( BadMessage, ScalingFailed, @@ -36,7 +35,6 @@ from parsl.addresses import get_all_addresses from parsl.process_loggers import wrap_with_logs -from parsl.multiprocessing import ForkProcess from parsl.utils import RepresentationMixin from parsl.providers import LocalProvider @@ -310,7 +308,7 @@ def __init__(self, self._task_counter = 0 self.worker_ports = worker_ports 
self.worker_port_range = worker_port_range - self.interchange_proc: Optional[Process] = None + self.interchange_proc: Optional[subprocess.Popen] = None self.interchange_port_range = interchange_port_range self.heartbeat_threshold = heartbeat_threshold self.heartbeat_period = heartbeat_period @@ -525,32 +523,51 @@ def _queue_management_worker(self): logger.info("Queue management worker finished") - def _start_local_interchange_process(self): + def _start_local_interchange_process(self) -> None: """ Starts the interchange process locally - Starts the interchange process locally and uses an internal command queue to + Starts the interchange process locally and uses the command queue to get the worker task and result ports that the interchange has bound to. """ - self.interchange_proc = ForkProcess(target=interchange.starter, - kwargs={"client_address": "127.0.0.1", - "client_ports": (self.outgoing_q.port, - self.incoming_q.port, - self.command_client.port), - "interchange_address": self.address, - "worker_ports": self.worker_ports, - "worker_port_range": self.worker_port_range, - "hub_address": self.hub_address, - "hub_zmq_port": self.hub_zmq_port, - "logdir": self.logdir, - "heartbeat_threshold": self.heartbeat_threshold, - "poll_period": self.poll_period, - "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, - "cert_dir": self.cert_dir, - }, - daemon=True, - name="HTEX-Interchange" - ) - self.interchange_proc.start() + # self.interchange_proc = ForkProcess(target=interchange.starter, + # kwargs={"client_address": "127.0.0.1", + # "client_ports": (self.outgoing_q.port, + # self.incoming_q.port, + # self.command_client.port), + # "interchange_address": self.address, + # "worker_ports": self.worker_ports, + # "worker_port_range": self.worker_port_range, + # "hub_address": self.hub_address, + # "hub_zmq_port": self.hub_zmq_port, + # "logdir": self.logdir, + # "heartbeat_threshold": self.heartbeat_threshold, + # "poll_period": self.poll_period, + # 
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO, + # "cert_dir": self.cert_dir, + # }, + # daemon=True, + # name="HTEX-Interchange" + # ) + # self.interchange_proc.start() + + cli: List[str] = ["interchange.py", + "--client-address", "127.0.0.1", + "--client-ports", f"{self.outgoing_q.port},{self.incoming_q.port},{self.command_client.port}", + "--interchange-address", str(self.address), + "--worker-ports", f"{self.worker_ports[0]},{self.worker_ports[1]}" + if self.worker_ports else "None", + "--worker-port-range", f"{self.worker_port_range[0]},{self.worker_port_range[1]}" + if self.worker_port_range else "None", + "--hub-address", str(self.hub_address), + "--hub-zmq-port", str(self.hub_zmq_port), + "--logdir", self.logdir, + "--heartbeat-threshold", str(self.heartbeat_threshold), + "--poll-period", str(self.poll_period), + "--logging-level", str(logging.DEBUG) if self.worker_debug else str(logging.INFO), + "--cert-dir", str(self.cert_dir) + ] + logger.info(f"BENC: cli = {cli}") + self.interchange_proc = subprocess.Popen(cli) try: (self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120) @@ -562,7 +579,7 @@ def _start_queue_management_thread(self): """Method to start the management thread as a daemon. Checks if a thread already exists, then starts it. - Could be used later as a restart if the management thread dies. 
+ Could be used later as a restart if the management thread dies """ if self._queue_management_thread is None: logger.debug("Starting queue management thread") @@ -815,13 +832,10 @@ def shutdown(self, timeout: float = 10.0): logger.info("Attempting HighThroughputExecutor shutdown") self.interchange_proc.terminate() - self.interchange_proc.join(timeout=timeout) - if self.interchange_proc.is_alive(): + if self.interchange_proc.poll() is None: logger.info("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() - self.interchange_proc.close() - logger.info("Finished HighThroughputExecutor shutdown attempt") def get_usage_information(self): diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 50cfbd1b34..69cd5fea0c 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import argparse import zmq import os import sys @@ -674,13 +675,101 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: logger.addHandler(handler) -@wrap_with_logs(target="interchange") -def starter(*args: Any, **kwargs: Any) -> None: - """Start the interchange process +# @wrap_with_logs(target="interchange") +# def starter(*args: Any, **kwargs: Any) -> None: +# """Start the interchange process - The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__ - """ +# The executor is expected to call this function. 
The args, kwargs match that of the Interchange.__init__ +# """ +# setproctitle("parsl: HTEX interchange") +# # logger = multiprocessing.get_logger() +# ic = Interchange(*args, **kwargs) +# ic.start() + +if __name__ == "__main__": setproctitle("parsl: HTEX interchange") - # logger = multiprocessing.get_logger() - ic = Interchange(*args, **kwargs) + + parser = argparse.ArgumentParser() + + parser.add_argument("--client-address", required=True, + help="Address to connect back to submitting client") + parser.add_argument("--interchange-address", required=True, + help="Address this interchange should listen on") + parser.add_argument("--client-ports", required=True, + help="Three ports on submitting client that the interchange should connect back to") + parser.add_argument("--worker-ports", required=True, + help="Two ports on this interchange that workers can connect to") + parser.add_argument("--worker-port-range", required=True, + help="Low and high port numbers that interchange will select worker ports from") + parser.add_argument("--hub-address", required=True, + help="Address to connect to send monitoring info") + parser.add_argument("--hub-zmq-port", required=True, + help="Address to connect to send monitoring info") + parser.add_argument("--heartbeat-threshold", required=True, + help="Number of seconds without heartbeat after which a worker is considered lost") + parser.add_argument("--logdir", required=True, + help="Directory in which to create interchange.log") + parser.add_argument("--logging-level", required=True, + help="Level to log at") + parser.add_argument("--poll-period", required=True, + help="Main thread polling period, in milliseconds") + parser.add_argument("--cert-dir", required=True, + help="Directory in which to find CurveZMQ certificates") + + args = parser.parse_args() + + def parseNone(s: str) -> Optional[str]: + if s == "None": + return None + else: + return s + + def parseInt2(s: str) -> Tuple[int, int]: + t = [int(v) for v in s.split(',')] 
+ if len(t) != 2: + raise RuntimeError("Bad parse for 2-tuple of ints") + return (t[0], t[1]) + + def parseInt2Optional(s: str) -> Optional[Tuple[int, int]]: + if s == "None": + return None + else: + t = [int(v) for v in s.split(',')] + if len(t) != 2: + raise RuntimeError("Bad parse for 2-tuple of ints") + return (t[0], t[1]) + + def parseInt3(s: str) -> Tuple[int, int, int]: + t = [int(v) for v in s.split(',')] + if len(t) != 3: + raise RuntimeError("Bad parse for 2-tuple of ints") + return (t[0], t[1], t[2]) + + def parseNoneInt(s: str) -> Optional[int]: + if s == "None": + return None + else: + return int(s) + + # TODO: can these parses move into argparse so that argparse handles errors? + + # TODO: all these ad-hoc parsers are pretty horrible - using a command line + # is a bit of a horrible way to do this... + + # TODO: initialize logging here and log any exceptions raised during + # these two lines into the log file: + + ic = Interchange(client_address=args.client_address, + interchange_address=parseNone(args.interchange_address), + client_ports=parseInt3(args.client_ports), + worker_ports=parseInt2Optional(args.worker_ports), + worker_port_range=parseInt2(args.worker_port_range), + hub_address=parseNone(args.hub_address), + hub_zmq_port=parseNoneInt(args.hub_zmq_port), + heartbeat_threshold=int(args.heartbeat_threshold), + logdir=args.logdir, + logging_level=int(args.logging_level), # TODO: is this ever None? 
+ poll_period=int(args.poll_period), + cert_dir=parseNone(args.cert_dir), + ) ic.start() diff --git a/setup.py b/setup.py index 2c829ac994..3b030e49ec 100755 --- a/setup.py +++ b/setup.py @@ -56,6 +56,7 @@ python_requires=">=3.8.0", install_requires=install_requires, scripts = ['parsl/executors/high_throughput/process_worker_pool.py', + 'parsl/executors/high_throughput/interchange.py', 'parsl/executors/workqueue/exec_parsl_function.py', 'parsl/executors/workqueue/parsl_coprocess.py', ], From ed6fae5a4dd5be7ff28bc2855f313a2182dcf4dc Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 22:05:56 +0000 Subject: [PATCH 03/15] Restore accidentally removed dot --- parsl/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ed013bb854..b121a292d7 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -554,7 +554,7 @@ def _start_queue_management_thread(self): """Method to start the management thread as a daemon. Checks if a thread already exists, then starts it. - Could be used later as a restart if the management thread dies + Could be used later as a restart if the management thread dies. 
""" if self._queue_management_thread is None: logger.debug("Starting queue management thread") From 10c02fe0ad33377904d1d992e75223d3fd0d314c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 12:26:26 +0000 Subject: [PATCH 04/15] call wait with a timeout, not poll, at shutdown, to replicate previous behaviour --- parsl/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index b121a292d7..10631ff085 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -807,7 +807,7 @@ def shutdown(self, timeout: float = 10.0): logger.info("Attempting HighThroughputExecutor shutdown") self.interchange_proc.terminate() - if self.interchange_proc.poll() is None: + if self.interchange_proc.wait(timeout=timeout) is None: logger.info("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() From 8e59cebbf42d67766329f9a7cad4861fb37ae6bf Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 12:26:38 +0000 Subject: [PATCH 05/15] Update interchange mock to look more like new interchange behaviour --- parsl/tests/test_htex/test_htex.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index ca95773e1b..9b98c9c63f 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,11 +1,11 @@ import pathlib import warnings from unittest import mock +from subprocess import Popen import pytest from parsl import HighThroughputExecutor, curvezmq -from parsl.multiprocessing import ForkProcess _MOCK_BASE = "parsl.executors.high_throughput.executor" @@ -78,16 +78,16 @@ def test_htex_shutdown( timeout_expires: bool, htex: HighThroughputExecutor, ): - mock_ix_proc = mock.Mock(spec=ForkProcess) + mock_ix_proc = 
mock.Mock(spec=Popen) if started: htex.interchange_proc = mock_ix_proc - mock_ix_proc.is_alive.return_value = True + mock_ix_proc.wait.return_value = None if not timeout_expires: # Simulate termination of the Interchange process def kill_interchange(*args, **kwargs): - mock_ix_proc.is_alive.return_value = False + mock_ix_proc.wait.return_value = 0 mock_ix_proc.terminate.side_effect = kill_interchange @@ -96,16 +96,16 @@ def kill_interchange(*args, **kwargs): mock_logs = mock_logger.info.call_args_list if started: assert mock_ix_proc.terminate.called - assert mock_ix_proc.join.called - assert {"timeout": 10} == mock_ix_proc.join.call_args[1] + assert mock_ix_proc.wait.called + assert {"timeout": 10} == mock_ix_proc.wait.call_args[1] if timeout_expires: - assert "Unable to terminate Interchange" in mock_logs[1][0][0] + assert "Unable to terminate Interchange" in mock_logs[1][0][0], "here are mock logs: " + repr(mock_logs) assert mock_ix_proc.kill.called assert "Attempting" in mock_logs[0][0][0] assert "Finished" in mock_logs[-1][0][0] else: assert not mock_ix_proc.terminate.called - assert not mock_ix_proc.join.called + assert not mock_ix_proc.wait.called assert "has not started" in mock_logs[0][0][0] From 4a608747f5070c9daf53d8c3e7078f58ad0496bc Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 12:27:03 +0000 Subject: [PATCH 06/15] isort --- parsl/tests/test_htex/test_htex.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 9b98c9c63f..05546d1072 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,7 +1,7 @@ import pathlib import warnings -from unittest import mock from subprocess import Popen +from unittest import mock import pytest From cd2d879c233cba0dd88a39c9d362ef1d8970048b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 12:38:22 +0000 Subject: [PATCH 07/15] Add broken test comment --- 
parsl/tests/test_htex/test_htex.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 05546d1072..f06eda27d3 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -80,6 +80,12 @@ def test_htex_shutdown( ): mock_ix_proc = mock.Mock(spec=Popen) + # TODO: I think this is implementing the wrong behaviour for wait: + # wait is supposed to raise an exception on timeout, but this code + # below does not do so, and so I think it is not correctly + # detecting a broken impl in htex shutdown in the presence of + # wait timeouts... + if started: htex.interchange_proc = mock_ix_proc mock_ix_proc.wait.return_value = None From 0de296b8ab22aeed85ac28b7e067f216f7e63896 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:12:56 +0000 Subject: [PATCH 08/15] Update mock to I think documented Popen behaviour, and fix shutdown behaviour to pass. Not tested against a real hanging interchange though... 
--- parsl/executors/high_throughput/executor.py | 4 ++- parsl/tests/test_htex/test_htex.py | 31 ++++++++++++++------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 10631ff085..6e00c36d1c 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -807,7 +807,9 @@ def shutdown(self, timeout: float = 10.0): logger.info("Attempting HighThroughputExecutor shutdown") self.interchange_proc.terminate() - if self.interchange_proc.wait(timeout=timeout) is None: + try: + self.interchange_proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: logger.info("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index f06eda27d3..2227529f82 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,6 +1,6 @@ import pathlib import warnings -from subprocess import Popen +from subprocess import Popen, TimeoutExpired from unittest import mock import pytest @@ -80,20 +80,31 @@ def test_htex_shutdown( ): mock_ix_proc = mock.Mock(spec=Popen) - # TODO: I think this is implementing the wrong behaviour for wait: - # wait is supposed to raise an exception on timeout, but this code - # below does not do so, and so I think it is not correctly - # detecting a broken impl in htex shutdown in the presence of - # wait timeouts... - if started: htex.interchange_proc = mock_ix_proc - mock_ix_proc.wait.return_value = None + + # This will, in the absence of any exit trigger, block forever if + # no timeout is given and if the interchange does not terminate. + # Raise an exception to report that, rather than actually block, + # and hope that nothing is catching that exception. 
+ + # this function implements the behaviour if the interchange has + # not received a termination call + def proc_wait_alive(timeout): + if timeout: + raise TimeoutExpired(cmd="mock-interchange", timeout=timeout) + else: + raise RuntimeError("This wait call would hang forever") + + def proc_wait_terminated(timeout): + return 0 + + mock_ix_proc.wait.side_effect = proc_wait_alive if not timeout_expires: # Simulate termination of the Interchange process def kill_interchange(*args, **kwargs): - mock_ix_proc.wait.return_value = 0 + mock_ix_proc.wait.side_effect = proc_wait_terminated mock_ix_proc.terminate.side_effect = kill_interchange @@ -105,7 +116,7 @@ def kill_interchange(*args, **kwargs): assert mock_ix_proc.wait.called assert {"timeout": 10} == mock_ix_proc.wait.call_args[1] if timeout_expires: - assert "Unable to terminate Interchange" in mock_logs[1][0][0], "here are mock logs: " + repr(mock_logs) + assert "Unable to terminate Interchange" in mock_logs[1][0][0] assert mock_ix_proc.kill.called assert "Attempting" in mock_logs[0][0][0] assert "Finished" in mock_logs[-1][0][0] From 83f15ad124f0bff9579d988dbeeeb509d0587776 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:32:04 +0000 Subject: [PATCH 09/15] Rename variable --- parsl/executors/high_throughput/executor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6e00c36d1c..37d2bfb927 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -525,7 +525,7 @@ def _start_local_interchange_process(self) -> None: Starts the interchange process locally and uses the command queue to get the worker task and result ports that the interchange has bound to. 
""" - cli: List[str] = ["interchange.py", + cmd: List[str] = ["interchange.py", "--client-address", "127.0.0.1", "--client-ports", f"{self.outgoing_q.port},{self.incoming_q.port},{self.command_client.port}", "--interchange-address", str(self.address), @@ -541,8 +541,7 @@ def _start_local_interchange_process(self) -> None: "--logging-level", str(logging.DEBUG) if self.worker_debug else str(logging.INFO), "--cert-dir", str(self.cert_dir) ] - logger.info(f"BENC: cli = {cli}") - self.interchange_proc = subprocess.Popen(cli) + self.interchange_proc = subprocess.Popen(cmd) try: (self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120) From 0de37c4bb804c3e09491aaea4437f6baf8a07343 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:33:03 +0000 Subject: [PATCH 10/15] Remove commented-out code --- parsl/executors/high_throughput/interchange.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 4a68a79f7c..1beff1674f 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -673,17 +673,6 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: logger.addHandler(handler) -# @wrap_with_logs(target="interchange") -# def starter(*args: Any, **kwargs: Any) -> None: -# """Start the interchange process - -# The executor is expected to call this function. 
The args, kwargs match that of the Interchange.__init__ -# """ -# setproctitle("parsl: HTEX interchange") -# # logger = multiprocessing.get_logger() -# ic = Interchange(*args, **kwargs) -# ic.start() - if __name__ == "__main__": setproctitle("parsl: HTEX interchange") From ae9806d07e227694ebb7bd32a081860b26eb543b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:34:33 +0000 Subject: [PATCH 11/15] Remove a logging TODO, not the subject of this PR --- parsl/executors/high_throughput/interchange.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 1beff1674f..7cfb068e8e 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -743,9 +743,6 @@ def parseNoneInt(s: str) -> Optional[int]: # TODO: all these ad-hoc parsers are pretty horrible - using a command line # is a bit of a horrible way to do this... - # TODO: initialize logging here and log any exceptions raised during - # these two lines into the log file: - ic = Interchange(client_address=args.client_address, interchange_address=parseNone(args.interchange_address), client_ports=parseInt3(args.client_ports), From d07ad3a863b987e6edd7ce4943d1b4c9ef1cc367 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:38:31 +0000 Subject: [PATCH 12/15] Rename some parsers --- parsl/executors/high_throughput/interchange.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 7cfb068e8e..428dc3e668 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -705,7 +705,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: args = parser.parse_args() - def parseNone(s: str) -> Optional[str]: + def parseOptStr(s: str) -> 
Optional[str]: if s == "None": return None else: @@ -717,7 +717,7 @@ def parseInt2(s: str) -> Tuple[int, int]: raise RuntimeError("Bad parse for 2-tuple of ints") return (t[0], t[1]) - def parseInt2Optional(s: str) -> Optional[Tuple[int, int]]: + def parseOptInt2(s: str) -> Optional[Tuple[int, int]]: if s == "None": return None else: @@ -732,7 +732,7 @@ def parseInt3(s: str) -> Tuple[int, int, int]: raise RuntimeError("Bad parse for 2-tuple of ints") return (t[0], t[1], t[2]) - def parseNoneInt(s: str) -> Optional[int]: + def parseOptInt(s: str) -> Optional[int]: if s == "None": return None else: @@ -744,16 +744,16 @@ def parseNoneInt(s: str) -> Optional[int]: # is a bit of a horrible way to do this... ic = Interchange(client_address=args.client_address, - interchange_address=parseNone(args.interchange_address), + interchange_address=parseOptStr(args.interchange_address), client_ports=parseInt3(args.client_ports), - worker_ports=parseInt2Optional(args.worker_ports), + worker_ports=parseOptInt2(args.worker_ports), worker_port_range=parseInt2(args.worker_port_range), - hub_address=parseNone(args.hub_address), - hub_zmq_port=parseNoneInt(args.hub_zmq_port), + hub_address=parseOptStr(args.hub_address), + hub_zmq_port=parseOptInt(args.hub_zmq_port), heartbeat_threshold=int(args.heartbeat_threshold), logdir=args.logdir, logging_level=int(args.logging_level), # TODO: is this ever None? 
poll_period=int(args.poll_period), - cert_dir=parseNone(args.cert_dir), + cert_dir=parseOptStr(args.cert_dir), ) ic.start() From 2c477628d926f8222b2f99e2d577994cb26db297 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 13:39:01 +0000 Subject: [PATCH 13/15] Fix typo in parse error message --- parsl/executors/high_throughput/interchange.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 428dc3e668..c08ca816e9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -729,7 +729,7 @@ def parseOptInt2(s: str) -> Optional[Tuple[int, int]]: def parseInt3(s: str) -> Tuple[int, int, int]: t = [int(v) for v in s.split(',')] if len(t) != 3: - raise RuntimeError("Bad parse for 2-tuple of ints") + raise RuntimeError("Bad parse for 3-tuple of ints") return (t[0], t[1], t[2]) def parseOptInt(s: str) -> Optional[int]: From e7d18aa4725310534e0ec4aa1ce5601d01ec2ecc Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 11 Jun 2024 14:24:20 +0000 Subject: [PATCH 14/15] replace argparse with shipping the config kwargs dict using base64-pickle --- parsl/executors/high_throughput/executor.py | 39 +++++---- .../executors/high_throughput/interchange.py | 83 +------------------ 2 files changed, 26 insertions(+), 96 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 37d2bfb927..4b4180b46b 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -1,3 +1,4 @@ +import base64 import logging import math import pickle @@ -525,22 +526,28 @@ def _start_local_interchange_process(self) -> None: Starts the interchange process locally and uses the command queue to get the worker task and result ports that the interchange has bound to. 
""" - cmd: List[str] = ["interchange.py", - "--client-address", "127.0.0.1", - "--client-ports", f"{self.outgoing_q.port},{self.incoming_q.port},{self.command_client.port}", - "--interchange-address", str(self.address), - "--worker-ports", f"{self.worker_ports[0]},{self.worker_ports[1]}" - if self.worker_ports else "None", - "--worker-port-range", f"{self.worker_port_range[0]},{self.worker_port_range[1]}" - if self.worker_port_range else "None", - "--hub-address", str(self.hub_address), - "--hub-zmq-port", str(self.hub_zmq_port), - "--logdir", self.logdir, - "--heartbeat-threshold", str(self.heartbeat_threshold), - "--poll-period", str(self.poll_period), - "--logging-level", str(logging.DEBUG) if self.worker_debug else str(logging.INFO), - "--cert-dir", str(self.cert_dir) - ] + + interchange_config = {"client_address": "127.0.0.1", + "client_ports": (self.outgoing_q.port, + self.incoming_q.port, + self.command_client.port), + "interchange_address": self.address, + "worker_ports": self.worker_ports, + "worker_port_range": self.worker_port_range, + "hub_address": self.hub_address, + "hub_zmq_port": self.hub_zmq_port, + "logdir": self.logdir, + "heartbeat_threshold": self.heartbeat_threshold, + "poll_period": self.poll_period, + "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, + "cert_dir": self.cert_dir, + } + + encoded = base64.b64encode(pickle.dumps(interchange_config)) + + cmd: List[bytes] = [b"interchange.py", + encoded + ] self.interchange_proc = subprocess.Popen(cmd) try: diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index c08ca816e9..b196c49814 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -import argparse +import base64 import datetime import json import logging @@ -676,84 +676,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: if __name__ 
== "__main__": setproctitle("parsl: HTEX interchange") - parser = argparse.ArgumentParser() - - parser.add_argument("--client-address", required=True, - help="Address to connect back to submitting client") - parser.add_argument("--interchange-address", required=True, - help="Address this interchange should listen on") - parser.add_argument("--client-ports", required=True, - help="Three ports on submitting client that the interchange should connect back to") - parser.add_argument("--worker-ports", required=True, - help="Two ports on this interchange that workers can connect to") - parser.add_argument("--worker-port-range", required=True, - help="Low and high port numbers that interchange will select worker ports from") - parser.add_argument("--hub-address", required=True, - help="Address to connect to send monitoring info") - parser.add_argument("--hub-zmq-port", required=True, - help="Address to connect to send monitoring info") - parser.add_argument("--heartbeat-threshold", required=True, - help="Number of seconds without heartbeat after which a worker is considered lost") - parser.add_argument("--logdir", required=True, - help="Directory in which to create interchange.log") - parser.add_argument("--logging-level", required=True, - help="Level to log at") - parser.add_argument("--poll-period", required=True, - help="Main thread polling period, in milliseconds") - parser.add_argument("--cert-dir", required=True, - help="Directory in which to find CurveZMQ certificates") - - args = parser.parse_args() - - def parseOptStr(s: str) -> Optional[str]: - if s == "None": - return None - else: - return s + config = pickle.loads(base64.b64decode(sys.argv[1])) - def parseInt2(s: str) -> Tuple[int, int]: - t = [int(v) for v in s.split(',')] - if len(t) != 2: - raise RuntimeError("Bad parse for 2-tuple of ints") - return (t[0], t[1]) - - def parseOptInt2(s: str) -> Optional[Tuple[int, int]]: - if s == "None": - return None - else: - t = [int(v) for v in s.split(',')] - if 
len(t) != 2: - raise RuntimeError("Bad parse for 2-tuple of ints") - return (t[0], t[1]) - - def parseInt3(s: str) -> Tuple[int, int, int]: - t = [int(v) for v in s.split(',')] - if len(t) != 3: - raise RuntimeError("Bad parse for 3-tuple of ints") - return (t[0], t[1], t[2]) - - def parseOptInt(s: str) -> Optional[int]: - if s == "None": - return None - else: - return int(s) - - # TODO: can these parses move into argparse so that argparse handles errors? - - # TODO: all these ad-hoc parsers are pretty horrible - using a command line - # is a bit of a horrible way to do this... - - ic = Interchange(client_address=args.client_address, - interchange_address=parseOptStr(args.interchange_address), - client_ports=parseInt3(args.client_ports), - worker_ports=parseOptInt2(args.worker_ports), - worker_port_range=parseInt2(args.worker_port_range), - hub_address=parseOptStr(args.hub_address), - hub_zmq_port=parseOptInt(args.hub_zmq_port), - heartbeat_threshold=int(args.heartbeat_threshold), - logdir=args.logdir, - logging_level=int(args.logging_level), # TODO: is this ever None? 
- poll_period=int(args.poll_period), - cert_dir=parseOptStr(args.cert_dir), - ) + ic = Interchange(**config) ic.start() From e6115826036acf7aa0f220d77e8c5221556b5a71 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 12 Jun 2024 09:23:28 +0000 Subject: [PATCH 15/15] Switch to stdin for config object transmission --- parsl/executors/high_throughput/executor.py | 17 ++++++++++------- parsl/executors/high_throughput/interchange.py | 3 +-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 4b4180b46b..92a1965bb1 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -1,4 +1,3 @@ -import base64 import logging import math import pickle @@ -543,18 +542,22 @@ def _start_local_interchange_process(self) -> None: "cert_dir": self.cert_dir, } - encoded = base64.b64encode(pickle.dumps(interchange_config)) + config_pickle = pickle.dumps(interchange_config) - cmd: List[bytes] = [b"interchange.py", - encoded - ] - self.interchange_proc = subprocess.Popen(cmd) + self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE) + stdin = self.interchange_proc.stdin + assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" + logger.debug("Popened interchange process. Writing config object") + stdin.write(config_pickle) + stdin.flush() + logger.debug("Sent config object. Requesting worker ports") try: (self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120) except CommandClientTimeoutError: - logger.error("Interchange has not completed initialization in 120s. Aborting") + logger.error("Interchange has not completed initialization. 
Aborting") raise Exception("Interchange failed to start") + logger.debug("Got worker ports") def _start_queue_management_thread(self): """Method to start the management thread as a daemon. diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index b196c49814..9fe94dbabd 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import base64 import datetime import json import logging @@ -676,7 +675,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: if __name__ == "__main__": setproctitle("parsl: HTEX interchange") - config = pickle.loads(base64.b64decode(sys.argv[1])) + config = pickle.load(sys.stdin.buffer) ic = Interchange(**config) ic.start()