Merge with main #378

Merged Oct 30, 2023 (43 commits)

Commits
4945700  first attempt to sketch out cpu affinity bindings (Apr 28, 2023)
5c85221  updates to polaris app run (May 19, 2023)
9f1b1ca  updates to polaris app run (May 19, 2023)
c811f37  attempt to fix cpu affinity in Polaris app_run (May 24, 2023)
0936c68  added polaris gpu affinity script (May 25, 2023)
08e0976  fixes to the affinity script (May 25, 2023)
e61c12c  some style changes (May 25, 2023)
7bcbc52  reverting affinity script addition, put in different branch (May 25, 2023)
b0973cf  removed helper function (May 26, 2023)
77f8941  Updates to polaris cmdline implementation after dev discussion; inclu… (May 26, 2023)
2efaa8e  remove turam path from polaris job-template.sh (May 26, 2023)
1281a79  more updates to polaris cmdline (May 26, 2023)
1b64cdb  changes to make depth paramter for Polaris app_run consistent with docs (Jun 1, 2023)
937947e  Removed blank lines (Jun 1, 2023)
8d6f5f0  lint fixes (Jun 1, 2023)
c57beb7  fix type error (Jun 1, 2023)
0691ed3  fix type error (Jun 1, 2023)
72a6e3d  Merge pull request #360 from argonne-lcf/cli_dev (cms21, Jun 8, 2023)
3675411  Problems with docker compose and the latest python:3-slim version (basvandervlies, Jun 9, 2023)
ad0e661  made change to accept a user setting cpu_bind to none (Jun 13, 2023)
a37901d  Merge pull request #361 from basvandervlies/python_version_issue_343 (tomuram, Jun 30, 2023)
ee582b3  updated theta launch params (Jul 14, 2023)
ea919c6  removed white space (Jul 14, 2023)
1d23048  Merge pull request #369 from argonne-lcf/theta (cms21, Jul 14, 2023)
a56bcdc  allow session to sort jobs according to parameter passed in optional … (Jul 19, 2023)
58a3b44  change to SessionAcquire (Jul 21, 2023)
4ef3e75  change to sort_by option (Jul 21, 2023)
babca79  Update config.py (cms21, Jul 24, 2023)
08d0986  updates (Jul 25, 2023)
172f900  made arg function (Jul 25, 2023)
54ccb1b  undo changes (cms21, Jul 31, 2023)
14d39b3  added new window function for node footprint calc (cms21, Aug 1, 2023)
7ab9cf8  lint fixes (cms21, Aug 1, 2023)
0e22498  lint fixes (cms21, Aug 1, 2023)
063a48b  lint fixes (cms21, Aug 1, 2023)
ff9dd8c  lint fixes (cms21, Aug 1, 2023)
51b9c7d  Merge pull request #370 from argonne-lcf/server-sort (cms21, Aug 1, 2023)
6a10eb7  polaris app_run cleanup (Aug 4, 2023)
020ae44  lint fix (Aug 4, 2023)
3890bfa  Merge branch 'main' into polaris (cms21, Aug 4, 2023)
10f5d27  Merge pull request #357 from argonne-lcf/polaris (cms21, Aug 4, 2023)
273c95e  Removing unnecessary logging from _mpi_mode.py (cms21, Aug 8, 2023)
5034973  Merge pull request #371 from argonne-lcf/cms21-patch-2 (cms21, Aug 11, 2023)
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM python:3-slim
FROM python:3.10-slim

WORKDIR /balsam

2 changes: 2 additions & 0 deletions balsam/_api/bases.py
@@ -370,6 +370,7 @@ def acquire_jobs(
max_nodes_per_job: Optional[int] = None,
max_aggregate_nodes: Optional[float] = None,
serial_only: bool = False,
sort_by: Optional[str] = None,
filter_tags: Optional[Dict[str, str]] = None,
states: Set[JobState] = RUNNABLE_STATES,
app_ids: Optional[Set[int]] = None,
@@ -385,6 +386,7 @@ def acquire_jobs(
max_nodes_per_job=max_nodes_per_job,
max_aggregate_nodes=max_aggregate_nodes,
serial_only=serial_only,
sort_by=sort_by,
filter_tags=filter_tags,
states=states,
app_ids=app_ids,
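For orientation, a hedged sketch of how a caller might pass the new sort_by keyword. The session handle, the tag values, and the "long_large_first" key are illustrative; that key is simply the one the server code further down treats specially.

# Hypothetical usage sketch; `session` stands in for a Balsam Session handle
# obtained from the client API, and the keyword values are made up.
acquired = session.acquire_jobs(
    max_aggregate_nodes=8.0,
    serial_only=False,
    sort_by="long_large_first",  # None keeps the default node-count ordering
    filter_tags={"workflow": "demo"},
)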
1 change: 1 addition & 0 deletions balsam/config/config.py
@@ -193,6 +193,7 @@ class LauncherSettings(BaseSettings):
local_app_launcher: Type[AppRun] = Field("balsam.platform.app_run.LocalAppRun")
mpirun_allows_node_packing: bool = False
serial_mode_prefetch_per_rank: int = 64
sort_by: Optional[str] = None
serial_mode_startup_params: Dict[str, str] = {"cpu_affinity": "none"}

@validator("compute_node", pre=True, always=True)
2 changes: 0 additions & 2 deletions balsam/config/defaults/alcf_polaris/job-template.sh
@@ -8,8 +8,6 @@
export http_proxy="http://proxy:3128"
export https_proxy="http://proxy:3128"

export PYTHONPATH=/home/turam/dev/polaris/balsam:$PYTHONPATH

#remove export PMI_NO_FORK=1
export BALSAM_SITE_PATH={{balsam_site_path}}
cd $BALSAM_SITE_PATH
22 changes: 18 additions & 4 deletions balsam/platform/app_run/app_run.py
@@ -8,6 +8,7 @@

import psutil # type: ignore

from balsam.platform.compute_node import ComputeNode
from balsam.site.launcher import NodeSpec

logger = logging.getLogger(__name__)
@@ -67,10 +68,23 @@ def get_num_ranks(self) -> int:
return self._ranks_per_node * len(self._node_spec.node_ids)

def get_cpus_per_rank(self) -> int:
cpu_per_rank = len(self._node_spec.cpu_ids[0]) // self._ranks_per_node
if not cpu_per_rank:
cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core))
return cpu_per_rank
# Get the list of cpus assigned to the job. For a single-node job this is stored in
# the NodeSpec object. For a multi-node job the cpu_ids in NodeSpec are empty, so we
# assume every cpu on a compute node is available to the job and use the full cpu
# list of the node instead.
cpu_ids = self._node_spec.cpu_ids[0]
cpus_per_node = len(cpu_ids)
if not cpu_ids:
compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0])
cpus_per_node = len(compute_node.cpu_ids)

cpus_per_rank = cpus_per_node // self._ranks_per_node

# If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), fall back to a
# minimum of 1 cpu per rank, or to the number of cores per rank implied by the threading settings
if not cpus_per_rank:
cpus_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core))
return cpus_per_rank

@abstractmethod
def start(self) -> None:
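A quick arithmetic sketch of the fallback above, using made-up numbers rather than a real NodeSpec or ComputeNode:

# Illustrative values only; mirrors the arithmetic in get_cpus_per_rank above.
cpus_per_node = 32       # e.g. every CPU of a node is available to a multi-node job
ranks_per_node = 64      # ranks oversubscribed to CPUs
threads_per_rank = 2
threads_per_core = 2

cpus_per_rank = cpus_per_node // ranks_per_node          # 0: more ranks than CPUs
if not cpus_per_rank:
    # fall back to at least one CPU, or the core count implied by the threading settings
    cpus_per_rank = max(1, threads_per_rank // threads_per_core)
print(cpus_per_rank)                                     # 1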
93 changes: 91 additions & 2 deletions balsam/platform/app_run/polaris.py
@@ -1,5 +1,12 @@
import logging
import os

from balsam.platform.compute_node import PolarisNode

from .app_run import SubprocessAppRun

logger = logging.getLogger(__name__)


class PolarisRun(SubprocessAppRun):
"""
@@ -8,7 +15,59 @@ class PolarisRun(SubprocessAppRun):

def _build_cmdline(self) -> str:
node_ids = [h for h in self._node_spec.hostnames]
cpu_bind = self._launch_params.get("cpu_bind", "none")

# If the user does not set a cpu_bind launch parameter,
# this code sets cpu-bind to be optimal for the gpus being used.
# It does not handle the case where the application uses fewer than
# 8 cpus per gpu: the code will not skip the appropriate number of cpus
# in the rank binding assignments.
if "cpu_bind" in self._launch_params.keys():
cpu_bind = self._launch_params.get("cpu_bind")
elif "--cpu-bind" in self._launch_params.keys():
cpu_bind = self._launch_params.get("--cpu-bind")
else:
# Here we grab the cpu_ids assigned to the job from the NodeSpec object.
# If this is not set in NodeSpec (it is only set for single-node jobs),
# then we take the cpu_id list from the PolarisNode ComputeNode subclass,
# assuming the job has use of all the cpus on the nodes assigned to it.
cpu_ids = self._node_spec.cpu_ids[0]
polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0])
if not cpu_ids:
cpu_ids = polaris_node.cpu_ids

cpus_per_rank = self.get_cpus_per_rank()

# PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind
# in ascending cpu order is what we want here.
cpu_bind_list = ["list"]
for irank in range(self._ranks_per_node):
cpu_bind_list.append(":")
for i in range(cpus_per_rank):
if i > 0:
cpu_bind_list.append(",")
cid = str(cpu_ids[i + cpus_per_rank * irank])
cpu_bind_list.append(cid)
# If the job is using 2 hardware threads per core, we need to add those threads to the list
# The additional threads should go in the same ascending order (threads 0 and 32 are on the
# same physical core, threads 31 and 63 are on the same physical core)
if self._threads_per_core == 2:
cpu_bind_list.append(",")
cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids))
cpu_bind_list.append(cid)
cpu_bind = "".join(cpu_bind_list)

launch_params = []
for k in self._launch_params.keys():
if k != "cpu_bind" and k != "--cpu-bind":
launch_params.append(str(self._launch_params[k]))

# The value of -d depends on the setting of cpu_bind. If cpu-bind=core, -d is the number of
# physical cores per rank, otherwise it is the number of hardware threads per rank
# https://docs.alcf.anl.gov/running-jobs/example-job-scripts/
depth = self._threads_per_rank
if "core" == cpu_bind:
depth = self.get_cpus_per_rank()

nid_str = ",".join(map(str, node_ids))
args = [
"mpiexec",
@@ -21,7 +80,37 @@ def _build_cmdline(self) -> str:
"--cpu-bind",
cpu_bind,
"-d",
self._threads_per_rank,
depth,
*launch_params,
self._cmdline,
]
return " ".join(str(arg) for arg in args)

def _set_envs(self) -> None:
envs = os.environ.copy()
envs.update(self._envs)

# Here we grab the gpus assigned to the job from NodeSpec. NodeSpec only
# sets these for single-node jobs; for multi-node jobs, NodeSpec.gpu_ids will
# be a list of empty lists (e.g. [[], []]). The ordering of the gpu_ids
# is reversed in PolarisNode, and that reverse mapping of cpus to gpus
# should be reflected here.
gpu_ids = self._node_spec.gpu_ids[0]
cpu_ids = self._node_spec.cpu_ids[0]
logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}")

# Here we set CUDA_VISIBLE_DEVICES for single-node jobs only. For multi-node
# jobs we assume the job has access to all gpus and that CUDA_VISIBLE_DEVICES
# is set by the user, for example per local rank with a gpu_affinity.sh script
# that wraps around the user application in the ApplicationDefinition.
# One special case: if the job has one node, 2 ranks, and 1 gpu per rank, the
# code here will set CUDA_VISIBLE_DEVICES to "3,2" or "1,0". A user-provided
# gpu_affinity.sh script should take this assignment and use it to reset
# CUDA_VISIBLE_DEVICES for each local rank. The user script should NOT
# round-robin the CUDA_VISIBLE_DEVICES assignment starting from 3.
if gpu_ids:
envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids))
envs["OMP_NUM_THREADS"] = str(self._threads_per_rank)
self._envs = envs
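The comment above leaves the per-rank GPU selection to a user-side wrapper. A minimal sketch of what such a wrapper could do, assuming the job-wide list has already been exported by _set_envs and that the launcher exposes a local-rank environment variable (the variable name and values here are assumptions, not part of this PR):

import os

# Assume Balsam exported the job-wide list, e.g. "3,2" for a 2-rank, 1-GPU-per-rank job.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "3,2")
local_rank = int(os.environ.get("PALS_LOCAL_RANKID", "0"))   # assumed launcher-provided variable
gpus_per_rank = 1

# Take this rank's slice of the job-wide assignment rather than round-robining from 3.
job_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
mine = job_gpus[local_rank * gpus_per_rank:(local_rank + 1) * gpus_per_rank]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(mine)          # rank 0 -> "3", rank 1 -> "2"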
8 changes: 8 additions & 0 deletions balsam/platform/app_run/theta.py
@@ -14,6 +14,13 @@ def _pre_popen(self) -> None:
def _build_cmdline(self) -> str:
node_ids = [nid for nid in self._node_spec.node_ids]
nid_str = ",".join(map(str, node_ids))

launch_params = []
for k in self._launch_params.keys():
if k != "cpu_affinity":
launch_params.append(k)
launch_params.append(str(self._launch_params[k]))

cpu_affinity = self._launch_params.get("cpu_affinity", "none")
if cpu_affinity not in ["none", "depth"]:
cpu_affinity = "none"
@@ -31,6 +38,7 @@ def _build_cmdline(self) -> str:
self._threads_per_rank,
"-j",
self._threads_per_core,
*launch_params,
self._cmdline,
]
return " ".join(str(arg) for arg in args)
6 changes: 4 additions & 2 deletions balsam/platform/compute_node/alcf_polaris_node.py
@@ -10,10 +10,12 @@


class PolarisNode(ComputeNode):
# turam: confirm number of cpus
cpu_ids = list(range(64))
cpu_ids = list(range(32))
gpu_ids: List[IntStr] = list(range(4))

# cms21: optimal gpu/cpu binding on Polaris nodes goes in reverse order
gpu_ids.reverse()

@classmethod
def get_job_nodelist(cls) -> List["PolarisNode"]:
"""
1 change: 1 addition & 0 deletions balsam/schemas/session.py
@@ -29,6 +29,7 @@ class SessionAcquire(BaseModel):
max_nodes_per_job: Optional[int]
max_aggregate_nodes: Optional[float]
serial_only: bool = False
sort_by: Optional[str] = None
filter_tags: Dict[str, str]
states: Set[JobState] = RUNNABLE_STATES
app_ids: Set[int] = set()
59 changes: 48 additions & 11 deletions balsam/server/models/crud/sessions.py
@@ -114,6 +114,7 @@ def create(db: Session, owner: schemas.UserOut, session: schemas.SessionCreate)
def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> List[Dict[str, Any]]:
acquired_jobs = [{str(key): value for key, value in job.items()} for job in db.execute(job_q).mappings()]
acquired_ids = [job["id"] for job in acquired_jobs]
# logger.info(f"*** in _acquire_jobs acquired_ids={acquired_ids}")

stmt = update(models.Job.__table__).where(models.Job.id.in_(acquired_ids)).values(session_id=session.id)

@@ -130,7 +131,7 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li
return acquired_jobs


def _footprint_func() -> Any:
def _footprint_func_nodes() -> Any:
footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float)
return (
func.sum(footprint)
@@ -146,6 +147,22 @@ def _footprint_func() -> Any:
)


def _footprint_func_walltime() -> Any:
footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float)
return (
func.sum(footprint)
.over(
order_by=(
models.Job.wall_time_min.desc(),
models.Job.num_nodes.desc(),
models.Job.node_packing_count.desc(),
models.Job.id.asc(),
)
)
.label("aggregate_footprint")
)


def acquire(
db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire
) -> List[Dict[str, Any]]:
@@ -182,21 +199,41 @@ def acquire(
return _acquire_jobs(db, job_q, session)

# MPI Mode Launcher will take this path:
lock_ids_q = (
job_q.with_only_columns([models.Job.id])
.order_by(
models.Job.num_nodes.asc(),
models.Job.node_packing_count.desc(),
models.Job.wall_time_min.desc(),
# logger.info(f"*** In session.acquire: spec.sort_by = {spec.sort_by}")
if spec.sort_by == "long_large_first":
lock_ids_q = (
job_q.with_only_columns([models.Job.id])
.order_by(
models.Job.wall_time_min.desc(),
models.Job.num_nodes.desc(),
models.Job.node_packing_count.desc(),
)
.limit(spec.max_num_jobs)
.with_for_update(of=models.Job.__table__, skip_locked=True)
)
.limit(spec.max_num_jobs)
.with_for_update(of=models.Job.__table__, skip_locked=True)
)
else:
lock_ids_q = (
job_q.with_only_columns([models.Job.id])
.order_by(
models.Job.num_nodes.asc(),
models.Job.node_packing_count.desc(),
models.Job.wall_time_min.desc(),
)
.limit(spec.max_num_jobs)
.with_for_update(of=models.Job.__table__, skip_locked=True)
)

locked_ids = db.execute(lock_ids_q).scalars().all()
# logger.info(f"*** locked_ids: {locked_ids}")
if spec.sort_by == "long_large_first":
subq = select(models.Job.__table__, _footprint_func_walltime()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore
else:
subq = select(models.Job.__table__, _footprint_func_nodes()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore

subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore
# logger.info(f"*** max_aggregate_nodes: {spec.max_aggregate_nodes}")
cols = [c for c in subq.c if c.name not in ["aggregate_footprint", "session_id"]]
job_q = select(cols).where(subq.c.aggregate_footprint <= spec.max_aggregate_nodes)

return _acquire_jobs(db, job_q, session)


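The window functions above amount to a running sum of per-job footprints over the chosen ordering, with jobs kept only while the cumulative footprint stays under max_aggregate_nodes. A plain-Python sketch of that gating with made-up job tuples (not the actual SQLAlchemy query):

# Each tuple: (id, num_nodes, node_packing_count, wall_time_min); values are made up.
jobs = [
    (1, 2, 1, 60),
    (2, 1, 1, 120),
    (3, 1, 4, 30),
    (4, 4, 1, 10),
]
max_aggregate_nodes = 3.0

# sort_by == "long_large_first": longest walltime, then largest node count first
ordered = sorted(jobs, key=lambda j: (-j[3], -j[1], -j[2], j[0]))

acquired, running = [], 0.0
for job_id, num_nodes, packing, wall in ordered:
    running += num_nodes / packing            # this row's aggregate_footprint
    if running <= max_aggregate_nodes:
        acquired.append(job_id)

print(acquired)   # [2, 1]: job 2 (1 node) and job 1 (2 nodes) fit under 3 aggregate nodes

The default path keeps the original ordering instead: fewest nodes first, then highest node_packing_count, then longest walltime.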
6 changes: 6 additions & 0 deletions balsam/site/job_source.py
@@ -72,6 +72,7 @@ def __init__(
filter_tags: Optional[Dict[str, str]] = None,
states: Set[str] = {"PREPROCESSED", "RESTART_READY"},
serial_only: bool = False,
sort_by: Optional[str] = None,
max_wall_time_min: Optional[int] = None,
max_nodes_per_job: Optional[int] = None,
max_aggregate_nodes: Optional[float] = None,
@@ -90,6 +91,7 @@ def __init__(
self.app_ids = set() if app_ids is None else app_ids
self.states = states
self.serial_only = serial_only
self.sort_by = sort_by
self.max_wall_time_min = max_wall_time_min
self.max_nodes_per_job = max_nodes_per_job
self.max_aggregate_nodes = max_aggregate_nodes
@@ -158,6 +160,7 @@ def _get_acquire_parameters(self, num_jobs: int) -> Dict[str, Any]:
max_aggregate_nodes=self.max_aggregate_nodes,
max_wall_time_min=request_time,
serial_only=self.serial_only,
sort_by=self.sort_by,
filter_tags=self.filter_tags,
states=self.states,
app_ids=self.app_ids,
@@ -182,6 +185,7 @@ def __init__(
filter_tags: Optional[Dict[str, str]] = None,
states: Set[JobState] = {JobState.preprocessed, JobState.restart_ready},
serial_only: bool = False,
sort_by: Optional[str] = None,
max_wall_time_min: Optional[int] = None,
scheduler_id: Optional[int] = None,
app_ids: Optional[Set[int]] = None,
@@ -192,6 +196,7 @@ def __init__(
self.app_ids = set() if app_ids is None else app_ids
self.states = states
self.serial_only = serial_only
self.sort_by = sort_by
self.max_wall_time_min = max_wall_time_min
self.start_time = time.time()

@@ -229,6 +234,7 @@ def get_jobs(
max_aggregate_nodes=max_aggregate_nodes,
max_wall_time_min=request_time,
serial_only=self.serial_only,
sort_by=self.sort_by,
filter_tags=self.filter_tags,
states=self.states,
app_ids=self.app_ids,
2 changes: 2 additions & 0 deletions balsam/site/launcher/_mpi_mode.py
@@ -271,10 +271,12 @@ def main(
)

scheduler_id = node_cls.get_scheduler_id()

job_source = SynchronousJobSource(
client=site_config.client,
site_id=site_config.site_id,
filter_tags=filter_tags_dict,
sort_by=site_config.settings.launcher.sort_by,
max_wall_time_min=wall_time_min,
scheduler_id=scheduler_id,
)
1 change: 1 addition & 0 deletions balsam/site/launcher/_serial_mode_master.py
@@ -237,6 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor
max_wall_time_min=wall_time_min,
scheduler_id=scheduler_id,
serial_only=True,
sort_by=site_config.settings.launcher.sort_by,
max_nodes_per_job=1,
)
status_updater = BulkStatusUpdater(site_config.client)