diff --git a/Dockerfile b/Dockerfile
index a916ee3a..c32c9f57 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3-slim
+FROM python:3.10-slim

 WORKDIR /balsam

diff --git a/balsam/_api/bases.py b/balsam/_api/bases.py
index e7560bde..0ee4840e 100644
--- a/balsam/_api/bases.py
+++ b/balsam/_api/bases.py
@@ -370,6 +370,7 @@ def acquire_jobs(
         max_nodes_per_job: Optional[int] = None,
         max_aggregate_nodes: Optional[float] = None,
         serial_only: bool = False,
+        sort_by: Optional[str] = None,
         filter_tags: Optional[Dict[str, str]] = None,
         states: Set[JobState] = RUNNABLE_STATES,
         app_ids: Optional[Set[int]] = None,
@@ -385,6 +386,7 @@
             max_nodes_per_job=max_nodes_per_job,
             max_aggregate_nodes=max_aggregate_nodes,
             serial_only=serial_only,
+            sort_by=sort_by,
             filter_tags=filter_tags,
             states=states,
             app_ids=app_ids,
diff --git a/balsam/config/config.py b/balsam/config/config.py
index 00d95c69..e4c8f066 100644
--- a/balsam/config/config.py
+++ b/balsam/config/config.py
@@ -193,6 +193,7 @@ class LauncherSettings(BaseSettings):
     local_app_launcher: Type[AppRun] = Field("balsam.platform.app_run.LocalAppRun")
     mpirun_allows_node_packing: bool = False
     serial_mode_prefetch_per_rank: int = 64
+    sort_by: Optional[str] = None
     serial_mode_startup_params: Dict[str, str] = {"cpu_affinity": "none"}

     @validator("compute_node", pre=True, always=True)
diff --git a/balsam/config/defaults/alcf_polaris/job-template.sh b/balsam/config/defaults/alcf_polaris/job-template.sh
index 8dae69c2..dd090dee 100644
--- a/balsam/config/defaults/alcf_polaris/job-template.sh
+++ b/balsam/config/defaults/alcf_polaris/job-template.sh
@@ -8,8 +8,6 @@
 export http_proxy="http://proxy:3128"
 export https_proxy="http://proxy:3128"

-export PYTHONPATH=/home/turam/dev/polaris/balsam:$PYTHONPATH
-
 #remove export PMI_NO_FORK=1
 export BALSAM_SITE_PATH={{balsam_site_path}}
 cd $BALSAM_SITE_PATH
diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py
index ff9f2cf7..3c25c4f6 100644
--- a/balsam/platform/app_run/app_run.py
+++ b/balsam/platform/app_run/app_run.py
@@ -8,6 +8,7 @@

 import psutil  # type: ignore

+from balsam.platform.compute_node import ComputeNode
 from balsam.site.launcher import NodeSpec

 logger = logging.getLogger(__name__)
@@ -67,10 +68,23 @@ def get_num_ranks(self) -> int:
         return self._ranks_per_node * len(self._node_spec.node_ids)

     def get_cpus_per_rank(self) -> int:
-        cpu_per_rank = len(self._node_spec.cpu_ids[0]) // self._ranks_per_node
-        if not cpu_per_rank:
-            cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core))
-        return cpu_per_rank
+        # Get the list of cpus assigned to the job. If it is a single node job, that is stored in
+        # the NodeSpec object. If it is a multinode job, the cpu_ids assigned to NodeSpec is empty,
+        # so we will assume all cpus on a compute node are available to the job. The list of cpus is
+        # just the list of cpus on the node in that case.
+        cpu_ids = self._node_spec.cpu_ids[0]
+        cpus_per_node = len(cpu_ids)
+        if not cpu_ids:
+            compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0])
+            cpus_per_node = len(compute_node.cpu_ids)
+
+        cpus_per_rank = cpus_per_node // self._ranks_per_node
+
+        # If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), fall back to a minimum of
+        # 1 cpu per rank or to the number of cores per rank implied by the threading settings.
+        if not cpus_per_rank:
+            cpus_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core))
+        return cpus_per_rank

     @abstractmethod
     def start(self) -> None:
diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py
index 72834f39..20f6ea22 100644
--- a/balsam/platform/app_run/polaris.py
+++ b/balsam/platform/app_run/polaris.py
@@ -1,5 +1,12 @@
+import logging
+import os
+
+from balsam.platform.compute_node import PolarisNode
+
 from .app_run import SubprocessAppRun

+logger = logging.getLogger(__name__)
+

 class PolarisRun(SubprocessAppRun):
     """
@@ -8,7 +15,59 @@ class PolarisRun(SubprocessAppRun):

     def _build_cmdline(self) -> str:
         node_ids = [h for h in self._node_spec.hostnames]
-        cpu_bind = self._launch_params.get("cpu_bind", "none")
+
+        # If the user does not set a cpu_bind option,
+        # this code sets cpu-bind to be optimal for the gpus being used.
+        # It does not handle the case where the application uses fewer than
+        # 8 cpus per gpu: it will not skip the appropriate number of cpus
+        # in the rank binding assignments.
+        if "cpu_bind" in self._launch_params.keys():
+            cpu_bind = self._launch_params.get("cpu_bind")
+        elif "--cpu-bind" in self._launch_params.keys():
+            cpu_bind = self._launch_params.get("--cpu-bind")
+        else:
+            # Here we grab the cpu_ids assigned to the job in the NodeSpec object.
+            # If this is not set in NodeSpec (it is only set for single node jobs),
+            # then we take the cpu_id list from the Polaris ComputeNode subclass,
+            # assuming the job has use of all the cpus on the nodes assigned to it.
+            cpu_ids = self._node_spec.cpu_ids[0]
+            polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0])
+            if not cpu_ids:
+                cpu_ids = polaris_node.cpu_ids
+
+            cpus_per_rank = self.get_cpus_per_rank()
+
+            # PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind
+            # in ascending cpu order is what we want here.
+            cpu_bind_list = ["list"]
+            for irank in range(self._ranks_per_node):
+                cpu_bind_list.append(":")
+                for i in range(cpus_per_rank):
+                    if i > 0:
+                        cpu_bind_list.append(",")
+                    cid = str(cpu_ids[i + cpus_per_rank * irank])
+                    cpu_bind_list.append(cid)
+                    # If the job is using 2 hardware threads per core, we need to add those threads to the list.
+                    # The additional threads should go in the same ascending order (threads 0 and 32 are on the
+                    # same physical core, threads 31 and 63 are on the same physical core).
+                    if self._threads_per_core == 2:
+                        cpu_bind_list.append(",")
+                        cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids))
+                        cpu_bind_list.append(cid)
+            cpu_bind = "".join(cpu_bind_list)
+
+        launch_params = []
+        for k in self._launch_params.keys():
+            if k != "cpu_bind" and k != "--cpu-bind":
+                launch_params.append(str(self._launch_params[k]))
+
+        # The value of -d depends on the setting of cpu_bind. If cpu-bind=core, -d is the number of
+        # physical cores per rank, otherwise it is the number of hardware threads per rank.
+        # https://docs.alcf.anl.gov/running-jobs/example-job-scripts/
+        depth = self._threads_per_rank
+        if "core" == cpu_bind:
+            depth = self.get_cpus_per_rank()
+
         nid_str = ",".join(map(str, node_ids))
         args = [
             "mpiexec",
@@ -21,7 +80,37 @@ def _build_cmdline(self) -> str:
             "--cpu-bind",
             cpu_bind,
             "-d",
-            self._threads_per_rank,
+            depth,
+            *launch_params,
             self._cmdline,
         ]
         return " ".join(str(arg) for arg in args)
+
+    def _set_envs(self) -> None:
+        envs = os.environ.copy()
+        envs.update(self._envs)
+
+        # Here we grab the gpus assigned to the job from NodeSpec. NodeSpec only
+        # sets this for single node jobs. For multinode jobs, gpu_ids below will
+        # be an empty list of lists (e.g. [[], []]). The ordering of the gpu_ids
+        # is reversed in PolarisNode, and therefore the reverse ordering of
+        # cpus to gpus should be reflected here.
+        gpu_ids = self._node_spec.gpu_ids[0]
+        cpu_ids = self._node_spec.cpu_ids[0]
+        logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}")
+
+        # Here we set CUDA_VISIBLE_DEVICES for single node jobs only. We assume
+        # for multinode jobs that the job has access to all gpus and that
+        # CUDA_VISIBLE_DEVICES is set by the user, for example by local rank with a
+        # gpu_affinity.sh script that wraps around the user application in the
+        # ApplicationDefinition.
+        # One special case: if your job has one node, 2 ranks, and 1 gpu per rank, the
+        # code here will set CUDA_VISIBLE_DEVICES to "3,2" or "1,0". A user-provided
+        # gpu_affinity.sh script should take this assignment and use it to reset
+        # CUDA_VISIBLE_DEVICES for each local rank. The user script should NOT
+        # round-robin the CUDA_VISIBLE_DEVICES setting starting from 3.
+        if gpu_ids:
+            envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
+            envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids))
+        envs["OMP_NUM_THREADS"] = str(self._threads_per_rank)
+        self._envs = envs
diff --git a/balsam/platform/app_run/theta.py b/balsam/platform/app_run/theta.py
index 9576e5f4..e79b3167 100644
--- a/balsam/platform/app_run/theta.py
+++ b/balsam/platform/app_run/theta.py
@@ -14,6 +14,13 @@ def _pre_popen(self) -> None:
     def _build_cmdline(self) -> str:
         node_ids = [nid for nid in self._node_spec.node_ids]
         nid_str = ",".join(map(str, node_ids))
+
+        launch_params = []
+        for k in self._launch_params.keys():
+            if k != "cpu_affinity":
+                launch_params.append(k)
+                launch_params.append(str(self._launch_params[k]))
+
         cpu_affinity = self._launch_params.get("cpu_affinity", "none")
         if cpu_affinity not in ["none", "depth"]:
             cpu_affinity = "none"
@@ -31,6 +38,7 @@ def _build_cmdline(self) -> str:
             self._threads_per_rank,
             "-j",
             self._threads_per_core,
+            *launch_params,
             self._cmdline,
         ]
         return " ".join(str(arg) for arg in args)
diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py
index b5283c3b..208490a1 100644
--- a/balsam/platform/compute_node/alcf_polaris_node.py
+++ b/balsam/platform/compute_node/alcf_polaris_node.py
@@ -10,10 +10,12 @@


 class PolarisNode(ComputeNode):
-    # turam: confirm number of cpus
-    cpu_ids = list(range(64))
+    cpu_ids = list(range(32))
     gpu_ids: List[IntStr] = list(range(4))

+    # cms21: optimal gpu/cpu binding on Polaris nodes goes in reverse order
+    gpu_ids.reverse()
+
     @classmethod
     def get_job_nodelist(cls) -> List["PolarisNode"]:
         """
diff --git a/balsam/schemas/session.py b/balsam/schemas/session.py
index e978c7f4..31fb0298 100644
--- a/balsam/schemas/session.py
+++ b/balsam/schemas/session.py
@@ -29,6 +29,7 @@ class SessionAcquire(BaseModel):
     max_nodes_per_job: Optional[int]
     max_aggregate_nodes: Optional[float]
     serial_only: bool = False
+    sort_by: Optional[str] = None
     filter_tags: Dict[str, str]
     states: Set[JobState] = RUNNABLE_STATES
     app_ids: Set[int] = set()
diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py
index ff40985a..8862e6f3 100644
--- a/balsam/server/models/crud/sessions.py
+++ b/balsam/server/models/crud/sessions.py
@@ -114,6 +114,7 @@ def create(db: Session, owner: schemas.UserOut, session: schemas.SessionCreate)
 def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> List[Dict[str, Any]]:
     acquired_jobs = [{str(key): value for key, value in job.items()} for job in db.execute(job_q).mappings()]
     acquired_ids = [job["id"] for job in acquired_jobs]
+    # logger.info(f"*** in _acquire_jobs acquired_ids={acquired_ids}")

     stmt = update(models.Job.__table__).where(models.Job.id.in_(acquired_ids)).values(session_id=session.id)

@@ -130,7 +131,7 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li
     return acquired_jobs


-def _footprint_func() -> Any:
+def _footprint_func_nodes() -> Any:
     footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float)
     return (
         func.sum(footprint)
@@ -146,6 +147,22 @@
     )


+def _footprint_func_walltime() -> Any:
+    footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float)
+    return (
+        func.sum(footprint)
+        .over(
+            order_by=(
+                models.Job.wall_time_min.desc(),
+                models.Job.num_nodes.desc(),
+                models.Job.node_packing_count.desc(),
+                models.Job.id.asc(),
+            )
+        )
+        .label("aggregate_footprint")
+    )
+
+
 def acquire(
     db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire
 ) -> List[Dict[str, Any]]:
@@ -182,21 +199,41 @@ def acquire(
         return _acquire_jobs(db, job_q, session)

     # MPI Mode Launcher will take this path:
-    lock_ids_q = (
-        job_q.with_only_columns([models.Job.id])
-        .order_by(
-            models.Job.num_nodes.asc(),
-            models.Job.node_packing_count.desc(),
-            models.Job.wall_time_min.desc(),
+    # logger.info(f"*** In session.acquire: spec.sort_by = {spec.sort_by}")
+    if spec.sort_by == "long_large_first":
+        lock_ids_q = (
+            job_q.with_only_columns([models.Job.id])
+            .order_by(
+                models.Job.wall_time_min.desc(),
+                models.Job.num_nodes.desc(),
+                models.Job.node_packing_count.desc(),
+            )
+            .limit(spec.max_num_jobs)
+            .with_for_update(of=models.Job.__table__, skip_locked=True)
         )
-        .limit(spec.max_num_jobs)
-        .with_for_update(of=models.Job.__table__, skip_locked=True)
-    )
+    else:
+        lock_ids_q = (
+            job_q.with_only_columns([models.Job.id])
+            .order_by(
+                models.Job.num_nodes.asc(),
+                models.Job.node_packing_count.desc(),
+                models.Job.wall_time_min.desc(),
+            )
+            .limit(spec.max_num_jobs)
+            .with_for_update(of=models.Job.__table__, skip_locked=True)
+        )
+
     locked_ids = db.execute(lock_ids_q).scalars().all()
+    # logger.info(f"*** locked_ids: {locked_ids}")
+    if spec.sort_by == "long_large_first":
+        subq = select(models.Job.__table__, _footprint_func_walltime()).where(models.Job.id.in_(locked_ids)).subquery()  # type: ignore
+    else:
+        subq = select(models.Job.__table__, _footprint_func_nodes()).where(models.Job.id.in_(locked_ids)).subquery()  # type: ignore

-    subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery()  # type: ignore
+    # logger.info(f"*** max_aggregate_nodes: {spec.max_aggregate_nodes}")
     cols = [c for c in subq.c if c.name not in ["aggregate_footprint", "session_id"]]
     job_q = select(cols).where(subq.c.aggregate_footprint <= spec.max_aggregate_nodes)
+
     return _acquire_jobs(db, job_q, session)


diff --git a/balsam/site/job_source.py b/balsam/site/job_source.py
index fe4a6bef..537969a3 100644
--- a/balsam/site/job_source.py
+++ b/balsam/site/job_source.py
@@ -72,6 +72,7 @@ def __init__(
         filter_tags: Optional[Dict[str, str]] = None,
         states: Set[str] = {"PREPROCESSED", "RESTART_READY"},
         serial_only: bool = False,
+        sort_by: Optional[str] = None,
         max_wall_time_min: Optional[int] = None,
         max_nodes_per_job: Optional[int] = None,
         max_aggregate_nodes: Optional[float] = None,
@@ -90,6 +91,7 @@ def __init__(
         self.app_ids = set() if app_ids is None else app_ids
         self.states = states
         self.serial_only = serial_only
+        self.sort_by = sort_by
         self.max_wall_time_min = max_wall_time_min
         self.max_nodes_per_job = max_nodes_per_job
         self.max_aggregate_nodes = max_aggregate_nodes
@@ -158,6 +160,7 @@ def _get_acquire_parameters(self, num_jobs: int) -> Dict[str, Any]:
             max_aggregate_nodes=self.max_aggregate_nodes,
             max_wall_time_min=request_time,
             serial_only=self.serial_only,
+            sort_by=self.sort_by,
             filter_tags=self.filter_tags,
             states=self.states,
             app_ids=self.app_ids,
@@ -182,6 +185,7 @@ def __init__(
         filter_tags: Optional[Dict[str, str]] = None,
         states: Set[JobState] = {JobState.preprocessed, JobState.restart_ready},
         serial_only: bool = False,
+        sort_by: Optional[str] = None,
         max_wall_time_min: Optional[int] = None,
         scheduler_id: Optional[int] = None,
         app_ids: Optional[Set[int]] = None,
@@ -192,6 +196,7 @@ def __init__(
         self.app_ids = set() if app_ids is None else app_ids
         self.states = states
         self.serial_only = serial_only
+        self.sort_by = sort_by
         self.max_wall_time_min = max_wall_time_min
         self.start_time = time.time()

@@ -229,6 +234,7 @@ def get_jobs(
             max_aggregate_nodes=max_aggregate_nodes,
             max_wall_time_min=request_time,
             serial_only=self.serial_only,
+            sort_by=self.sort_by,
             filter_tags=self.filter_tags,
             states=self.states,
             app_ids=self.app_ids,
diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py
index c3662984..05f6e957 100644
--- a/balsam/site/launcher/_mpi_mode.py
+++ b/balsam/site/launcher/_mpi_mode.py
@@ -271,10 +271,12 @@ def main(
     )
     scheduler_id = node_cls.get_scheduler_id()
+
     job_source = SynchronousJobSource(
         client=site_config.client,
         site_id=site_config.site_id,
         filter_tags=filter_tags_dict,
+        sort_by=site_config.settings.launcher.sort_by,
         max_wall_time_min=wall_time_min,
         scheduler_id=scheduler_id,
     )

diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py
index 558fdc9b..ccd7ccd1 100644
--- a/balsam/site/launcher/_serial_mode_master.py
+++ b/balsam/site/launcher/_serial_mode_master.py
@@ -237,6 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor
         max_wall_time_min=wall_time_min,
         scheduler_id=scheduler_id,
         serial_only=True,
+        sort_by=site_config.settings.launcher.sort_by,
         max_nodes_per_job=1,
     )
     status_updater = BulkStatusUpdater(site_config.client)
diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py
index d600c60e..a9aedad2 100644
--- a/tests/server/test_auth.py
+++ b/tests/server/test_auth.py
@@ -12,7 +12,7 @@ def test_unauth_user_cannot_view_sites(anon_client):
 def test_register(anon_client):
     login_credentials = {"username": f"user{uuid4()}", "password": "foo"}
     resp = anon_client.post("/" + urls.PASSWORD_REGISTER, **login_credentials)
-    assert type(resp["id"]) == int
+    assert isinstance(resp["id"], int)
     assert resp["username"] == login_credentials["username"]


diff --git a/tests/server/test_sites.py b/tests/server/test_sites.py
index 5541bc45..e55b3ace 100644
--- a/tests/server/test_sites.py
+++ b/tests/server/test_sites.py
@@ -12,7 +12,7 @@ def test_create_site(auth_client):
         name="thetalogin3.alcf.anl.gov",
         path="/projects/myProject/balsam-site",
     )
-    assert type(posted_site["id"]) == int
+    assert isinstance(posted_site["id"], int)
     site_list = auth_client.get("/sites/")["results"]
     assert isinstance(site_list, list)
     assert len(site_list) == 1
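
A note on the get_cpus_per_rank() change in balsam/platform/app_run/app_run.py: when ranks are oversubscribed (more ranks per node than cpus), the integer division yields zero and the code falls back to the threading settings. A minimal sketch of that arithmetic, with illustrative numbers not taken from the diff:

    cpus_per_node = 32            # e.g. the physical cores of one Polaris node
    ranks_per_node = 64           # oversubscribed: two ranks per physical core
    threads_per_rank = 2
    threads_per_core = 2

    cpus_per_rank = cpus_per_node // ranks_per_node                        # 0
    if not cpus_per_rank:
        cpus_per_rank = max(1, int(threads_per_rank // threads_per_core))  # 1
    print(cpus_per_rank)  # 1 cpu per rank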
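The cpu-bind string that PolarisRun._build_cmdline now generates when no cpu_bind launch parameter is supplied can be previewed with a standalone sketch of the same loop. The node geometry below (32 cores, 4 ranks per node, 8 cpus per rank) is illustrative, and the hardware-thread offset assumes cpu_ids covers the whole node, as in the default path:

    def build_cpu_bind(cpu_ids, ranks_per_node, cpus_per_rank, threads_per_core):
        # Mirrors the loop added to PolarisRun._build_cmdline (not the class itself).
        parts = ["list"]
        for irank in range(ranks_per_node):
            parts.append(":")
            for i in range(cpus_per_rank):
                if i > 0:
                    parts.append(",")
                cid = cpu_ids[i + cpus_per_rank * irank]
                parts.append(str(cid))
                if threads_per_core == 2:
                    # second hardware thread on the same physical core
                    parts.append(",")
                    parts.append(str(cid + len(cpu_ids)))
        return "".join(parts)

    print(build_cpu_bind(list(range(32)), ranks_per_node=4, cpus_per_rank=8, threads_per_core=1))
    # -> list:0,1,2,3,4,5,6,7:8,9,10,11,12,13,14,15:16,...,23:24,...,31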
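Finally, the new sort_by option is driven by the launcher settings (LauncherSettings.sort_by, i.e. launcher.sort_by in a site's settings.yml) and forwarded to the server through SessionAcquire.sort_by. Below is a minimal sketch of that wiring, mirroring _mpi_mode.py; the import path of SynchronousJobSource and the wall-time, filter, and scheduler values are assumptions for illustration, and "long_large_first" is the only value the server special-cases:

    from balsam.config import SiteConfig
    from balsam.site.job_source import SynchronousJobSource  # assumed import path

    site_config = SiteConfig()  # resolves the active site, e.g. from BALSAM_SITE_PATH

    job_source = SynchronousJobSource(
        client=site_config.client,
        site_id=site_config.site_id,
        filter_tags={},                                 # illustrative: no tag filtering
        sort_by=site_config.settings.launcher.sort_by,  # e.g. "long_large_first"
        max_wall_time_min=60,                           # illustrative wall-time budget
        scheduler_id=12345,                             # illustrative scheduler job id
    )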