diff --git a/etc/oar/admission_rules.d/15_check_types.py b/etc/oar/admission_rules.d/15_check_types.py index 10e7961e..9f7f535c 100644 --- a/etc/oar/admission_rules.d/15_check_types.py +++ b/etc/oar/admission_rules.d/15_check_types.py @@ -9,7 +9,7 @@ r8 = "^allowed=\\w+$" r9 = "^inner=\\w+$" r10 = "^timesharing=(?:(?:\\*|user),(?:\\*|name)|(?:\\*|name),(?:\\*|user))$" - r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|e_compact|e_spread|friendly|unfriendly)$" + r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|e_spread|cg|ft|mg|find=cg|find=ft|find=mg)$" all_re = re.compile( "(%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s)" % (r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11) diff --git a/oar/kao/custom_scheduling.py b/oar/kao/custom_scheduling.py index 30bf1b2c..9c4434b3 100644 --- a/oar/kao/custom_scheduling.py +++ b/oar/kao/custom_scheduling.py @@ -33,8 +33,8 @@ def get_nodes_characterization(session: Session): order_by=[ desc(func.count(AssignedResource.resource_id)), case( - (JobType.type == 'unfriendly', 1), - (JobType.type == 'friendly', 2), + (JobType.type == 'cg', 1), + (JobType.type == 'ft', 2), else_=3 ) ] @@ -59,103 +59,6 @@ def get_nodes_characterization(session: Session): return results -def e_compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)): - """ - Given a job resource request and a set of resources this function tries to find a matching allocation. - - .. note:: - This` can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism. - - :param session: The DB session - :param itvs_slots: A procset of the resources available for the allocation - :type itvs_slots: :class:`procset.ProcSet` - :param hy_res_rqts: The job's request - :param hy: The definition of the resources hierarchy - :return [ProcSet]: \ - The allocation if found, otherwise an empty :class:`procset.ProcSet` - """ - # import time - # start_time = time.time() - - logger.debug(__file__) - config, db = init_oar(no_db=False) - chars = get_nodes_characterization(session) - - rs = ResourceSet(session, config) - - nodes = {} - list(map(lambda x: nodes.setdefault(x[1], []).append(x[0]), rs.roid_2_network_address.items())) - - nodes = {k: ProcSet(*v) for k, v in nodes.items()} - - agg = [] - for network_address in nodes.keys(): - _found = list(filter(lambda x: x[0] == network_address, chars)) - if _found: - char = ( - allocation[0] if _found[0][1] == 'unfriendly' - else allocation[1] if _found[0][1] == 'friendly' - else allocation[2] - ) - else: - char = allocation[2] - - agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots),char)) - - # end_time = time.time() - # elapsed_time = end_time - start_time - # logger.info(f"Function execution time: {elapsed_time:.4f} seconds") - - result = ProcSet() - for hy_res_rqt in hy_res_rqts: - (hy_level_nbs, constraints) = hy_res_rqt - hy_levels = [] - hy_nbs = [] - for hy_l_n in hy_level_nbs: - (l_name, n) = hy_l_n - hy_levels.append(hy[l_name]) - hy_nbs.append(n) - - itvs_cts_slots = constraints & itvs_slots - - sorted_agg = sorted( - agg, - key=lambda x: (x[1], x[2]), - reverse=reverse - ) - - logger.debug(sorted_agg) - - hy_nodes = list(map(lambda x: x[0],sorted_agg)) - - hy_levels = [] - for node in hy_nodes: - # collect cpu Procset for particular node - n_cpus = list(filter(lambda p: p.issubset(node), hy["cpu"])) - # sort cpu Procset list sorted by min/max free cores - n_cpus = sorted( - n_cpus, key=lambda i: len(i & itvs_cts_slots), reverse=reverse - ) - # map cpu Procset to core procset - hy_levels += list( - map(ProcSet, itertools.chain.from_iterable(map(iter, n_cpus))) - ) - - # there is an Admission Rule that blocks other resources than core - # so only 1 resource type will be given - hy_levels = [hy_levels] - - res = find_resource_hierarchies_scattered( - itvs_cts_slots, list(hy_levels), hy_nbs - ) - if res: - result = result | res - else: - return ProcSet() - - return result - - def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -172,6 +75,7 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr The allocation if found, otherwise an empty :class:`procset.ProcSet` """ result = ProcSet() + for hy_res_rqt in hy_res_rqts: (hy_level_nbs, constraints) = hy_res_rqt hy_levels = [] @@ -205,12 +109,12 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr _found = list(filter(lambda x: x[0] == network_address, chars)) if _found: char = ( - allocation[0] if _found[0][1] == 'unfriendly' - else allocation[1] if _found[0][1] == 'friendly' + allocation[0] if _found[0][1] == 'cg' + else allocation[1] if _found[0][1] == 'ft' else allocation[2] ) else: - char = allocation[2] + char = 4 agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots2),char)) @@ -219,7 +123,7 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr # create a nodes Procset list sorted by min/max free cores sorted_agg = sorted( agg, - key=lambda x: (x[1], x[2]), + key=lambda x: (x[1], x[2], -x[0][0]), reverse=reverse ) @@ -257,6 +161,16 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr return result +def cg(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)): + return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation) + +def ft(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(2, 1, 3)): + return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation) + +def mg(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(2, 3, 1)): + return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation) + + def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): """ Given a job resource request and a set of resources this function tries to find a matching allocation. diff --git a/pyproject.toml b/pyproject.toml index 5b38543e..36388f83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -131,7 +131,9 @@ requires = ["poetry>=0.12"] build-backend = "poetry.masonry.api" [tool.poetry.plugins."oar.find_func"] -e_compact = "oar.kao.custom_scheduling:e_compact" +cg = "oar.kao.custom_scheduling:cg" +ft = "oar.kao.custom_scheduling:ft" +mg = "oar.kao.custom_scheduling:mg" e_spread = "oar.kao.custom_scheduling:e_spread" compact = "oar.kao.custom_scheduling:compact" spread = "oar.kao.custom_scheduling:spread"