Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch interchange as a fresh process #3463

Merged
merged 20 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f56c92c
fiddle with interchange default values
benclifford May 6, 2024
1733deb
make interchange into a "first-order" process like process worker poo…
benclifford May 6, 2024
50d70e7
Merge remote-tracking branch 'origin/master' into benc-interchange-fork
benclifford Jun 4, 2024
cf9eebb
Merge branch 'master' into benc-interchange-fork
benclifford Jun 5, 2024
d762aaf
Merge remote-tracking branch 'origin/master' into benc-interchange-fork
benclifford Jun 10, 2024
ed6fae5
Restore accidentally removed dot
benclifford Jun 10, 2024
10c02fe
call wait with a timeout, not poll, at shutdown, to replicate previou…
benclifford Jun 11, 2024
8e59ceb
Update interchange mock to look more like new interchange behaviour
benclifford Jun 11, 2024
4a60874
isort
benclifford Jun 11, 2024
cd2d879
Add broken test comment
benclifford Jun 11, 2024
0de296b
Update mock to I think documented Popen behaviour, and fix shutdown b…
benclifford Jun 11, 2024
83f15ad
Rename variable
benclifford Jun 11, 2024
0de37c4
Remove commented-out code
benclifford Jun 11, 2024
ae9806d
Remove a logging TODO, not the subject of this PR
benclifford Jun 11, 2024
d07ad3a
Rename some parsers
benclifford Jun 11, 2024
2c47762
Fix typo in parse error message
benclifford Jun 11, 2024
853d12a
Merge remote-tracking branch 'origin/master' into benc-interchange-fork
benclifford Jun 11, 2024
e7d18aa
replace argparse with shipping the config kwargs dict using base64-pi…
benclifford Jun 11, 2024
e611582
Switch to stdin for config object transmission
benclifford Jun 12, 2024
7f4b7dd
Merge branch 'master' into benc-interchange-fork
benclifford Jun 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
import math
import pickle
import subprocess
import threading
import typing
import warnings
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass
from multiprocessing import Process
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

import typeguard
Expand All @@ -18,15 +18,14 @@
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.staging import Staging
from parsl.executors.errors import BadMessage, ScalingFailed
from parsl.executors.high_throughput import interchange, zmq_pipes
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput.errors import CommandClientTimeoutError
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.multiprocessing import ForkProcess
from parsl.process_loggers import wrap_with_logs
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
Expand Down Expand Up @@ -305,7 +304,7 @@ def __init__(self,
self._task_counter = 0
self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.interchange_proc: Optional[Process] = None
self.interchange_proc: Optional[subprocess.Popen] = None
self.interchange_port_range = interchange_port_range
self.heartbeat_threshold = heartbeat_threshold
self.heartbeat_period = heartbeat_period
Expand Down Expand Up @@ -520,38 +519,45 @@ def _queue_management_worker(self):

logger.info("Queue management worker finished")

def _start_local_interchange_process(self):
def _start_local_interchange_process(self) -> None:
""" Starts the interchange process locally

Starts the interchange process locally and uses an internal command queue to
Starts the interchange process locally and uses the command queue to
get the worker task and result ports that the interchange has bound to.
"""
self.interchange_proc = ForkProcess(target=interchange.starter,
kwargs={"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
"worker_ports": self.worker_ports,
"worker_port_range": self.worker_port_range,
"hub_address": self.hub_address,
"hub_zmq_port": self.hub_zmq_port,
"logdir": self.logdir,
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
},
daemon=True,
name="HTEX-Interchange"
)
self.interchange_proc.start()

interchange_config = {"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
"worker_ports": self.worker_ports,
"worker_port_range": self.worker_port_range,
"hub_address": self.hub_address,
"hub_zmq_port": self.hub_zmq_port,
"logdir": self.logdir,
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
}

config_pickle = pickle.dumps(interchange_config)

self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE)
stdin = self.interchange_proc.stdin
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

logger.debug("Popened interchange process. Writing config object")
stdin.write(config_pickle)
stdin.flush()
logger.debug("Sent config object. Requesting worker ports")
try:
(self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120)
except CommandClientTimeoutError:
logger.error("Interchange has not completed initialization in 120s. Aborting")
logger.error("Interchange has not completed initialization. Aborting")
raise Exception("Interchange failed to start")
logger.debug("Got worker ports")

def _start_queue_management_thread(self):
"""Method to start the management thread as a daemon.
Expand Down Expand Up @@ -810,13 +816,12 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Attempting HighThroughputExecutor shutdown")

self.interchange_proc.terminate()
self.interchange_proc.join(timeout=timeout)
if self.interchange_proc.is_alive():
try:
self.interchange_proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
logger.info("Unable to terminate Interchange process; sending SIGKILL")
self.interchange_proc.kill()

self.interchange_proc.close()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
13 changes: 5 additions & 8 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,10 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
logger.addHandler(handler)


@wrap_with_logs(target="interchange")
def starter(*args: Any, **kwargs: Any) -> None:
"""Start the interchange process

The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__
"""
if __name__ == "__main__":
setproctitle("parsl: HTEX interchange")
# logger = multiprocessing.get_logger()
ic = Interchange(*args, **kwargs)

config = pickle.load(sys.stdin.buffer)

ic = Interchange(**config)
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
ic.start()
31 changes: 24 additions & 7 deletions parsl/tests/test_htex/test_htex.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pathlib
import warnings
from subprocess import Popen, TimeoutExpired
from unittest import mock

import pytest

from parsl import HighThroughputExecutor, curvezmq
from parsl.multiprocessing import ForkProcess

_MOCK_BASE = "parsl.executors.high_throughput.executor"

Expand Down Expand Up @@ -78,16 +78,33 @@ def test_htex_shutdown(
timeout_expires: bool,
htex: HighThroughputExecutor,
):
mock_ix_proc = mock.Mock(spec=ForkProcess)
mock_ix_proc = mock.Mock(spec=Popen)

if started:
htex.interchange_proc = mock_ix_proc
mock_ix_proc.is_alive.return_value = True

# This will, in the absence of any exit trigger, block forever if
# no timeout is given and if the interchange does not terminate.
# Raise an exception to report that, rather than actually block,
# and hope that nothing is catching that exception.

# This function implements the behaviour of wait() when the interchange
# has not received a termination call.
def proc_wait_alive(timeout=None):
    """Mock ``wait`` for an interchange process that is still running.

    With a timeout, raise ``TimeoutExpired`` instead of sleeping. Without
    one, raise ``RuntimeError`` to report that the call would never return,
    rather than actually blocking the test.

    Args:
        timeout: Seconds the caller is willing to wait, or ``None`` for an
            unbounded wait.

    Raises:
        TimeoutExpired: if any timeout, including ``0``, was supplied.
        RuntimeError: if no timeout was supplied.
    """
    # Compare against None rather than relying on truthiness: a timeout of
    # 0 is still a timeout and must expire, not be reported as a hang.
    if timeout is not None:
        raise TimeoutExpired(cmd="mock-interchange", timeout=timeout)
    raise RuntimeError("This wait call would hang forever")

def proc_wait_terminated(timeout):
    """Mock ``wait`` for an interchange process that has already exited.

    The timeout is accepted but ignored; the call returns an exit status
    of 0 without raising.
    """
    exit_status = 0
    return exit_status

mock_ix_proc.wait.side_effect = proc_wait_alive

if not timeout_expires:
# Simulate termination of the Interchange process
def kill_interchange(*args, **kwargs):
mock_ix_proc.is_alive.return_value = False
mock_ix_proc.wait.side_effect = proc_wait_terminated

mock_ix_proc.terminate.side_effect = kill_interchange

Expand All @@ -96,16 +113,16 @@ def kill_interchange(*args, **kwargs):
mock_logs = mock_logger.info.call_args_list
if started:
assert mock_ix_proc.terminate.called
assert mock_ix_proc.join.called
assert {"timeout": 10} == mock_ix_proc.join.call_args[1]
assert mock_ix_proc.wait.called
assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
if timeout_expires:
assert "Unable to terminate Interchange" in mock_logs[1][0][0]
assert mock_ix_proc.kill.called
assert "Attempting" in mock_logs[0][0][0]
assert "Finished" in mock_logs[-1][0][0]
else:
assert not mock_ix_proc.terminate.called
assert not mock_ix_proc.join.called
assert not mock_ix_proc.wait.called
assert "has not started" in mock_logs[0][0][0]


Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
python_requires=">=3.8.0",
install_requires=install_requires,
scripts = ['parsl/executors/high_throughput/process_worker_pool.py',
'parsl/executors/high_throughput/interchange.py',
'parsl/executors/workqueue/exec_parsl_function.py',
'parsl/executors/workqueue/parsl_coprocess.py',
],
Expand Down
Loading