Skip to content

Commit

Permalink
Heatmap aware for cg, ft, mg
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosT committed Sep 10, 2024
1 parent a68af02 commit bb26b23
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 105 deletions.
2 changes: 1 addition & 1 deletion etc/oar/admission_rules.d/15_check_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
r8 = "^allowed=\\w+$"
r9 = "^inner=\\w+$"
r10 = "^timesharing=(?:(?:\\*|user),(?:\\*|name)|(?:\\*|name),(?:\\*|user))$"
r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|e_compact|e_spread|friendly|unfriendly)$"
r11 = "^(?:compact|spread|f_spread|co_loc|f_co_loc|no_pref|exclusive|e_spread|cg|ft|mg|find=cg|find=ft|find=mg)$"
all_re = re.compile(
"(%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s)"
% (r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11)
Expand Down
120 changes: 17 additions & 103 deletions oar/kao/custom_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def get_nodes_characterization(session: Session):
order_by=[
desc(func.count(AssignedResource.resource_id)),
case(
(JobType.type == 'unfriendly', 1),
(JobType.type == 'friendly', 2),
(JobType.type == 'cg', 1),
(JobType.type == 'ft', 2),
else_=3
)
]
Expand All @@ -59,103 +59,6 @@ def get_nodes_characterization(session: Session):
return results


def e_compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
.. note::
This` can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism.
:param session: The DB session
:param itvs_slots: A procset of the resources available for the allocation
:type itvs_slots: :class:`procset.ProcSet`
:param hy_res_rqts: The job's request
:param hy: The definition of the resources hierarchy
:return [ProcSet]: \
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
# import time
# start_time = time.time()

logger.debug(__file__)
config, db = init_oar(no_db=False)
chars = get_nodes_characterization(session)

rs = ResourceSet(session, config)

nodes = {}
list(map(lambda x: nodes.setdefault(x[1], []).append(x[0]), rs.roid_2_network_address.items()))

nodes = {k: ProcSet(*v) for k, v in nodes.items()}

agg = []
for network_address in nodes.keys():
_found = list(filter(lambda x: x[0] == network_address, chars))
if _found:
char = (
allocation[0] if _found[0][1] == 'unfriendly'
else allocation[1] if _found[0][1] == 'friendly'
else allocation[2]
)
else:
char = allocation[2]

agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots),char))

# end_time = time.time()
# elapsed_time = end_time - start_time
# logger.info(f"Function execution time: {elapsed_time:.4f} seconds")

result = ProcSet()
for hy_res_rqt in hy_res_rqts:
(hy_level_nbs, constraints) = hy_res_rqt
hy_levels = []
hy_nbs = []
for hy_l_n in hy_level_nbs:
(l_name, n) = hy_l_n
hy_levels.append(hy[l_name])
hy_nbs.append(n)

itvs_cts_slots = constraints & itvs_slots

sorted_agg = sorted(
agg,
key=lambda x: (x[1], x[2]),
reverse=reverse
)

logger.debug(sorted_agg)

hy_nodes = list(map(lambda x: x[0],sorted_agg))

hy_levels = []
for node in hy_nodes:
# collect cpu Procset for particular node
n_cpus = list(filter(lambda p: p.issubset(node), hy["cpu"]))
# sort cpu Procset list sorted by min/max free cores
n_cpus = sorted(
n_cpus, key=lambda i: len(i & itvs_cts_slots), reverse=reverse
)
# map cpu Procset to core procset
hy_levels += list(
map(ProcSet, itertools.chain.from_iterable(map(iter, n_cpus)))
)

# there is an Admission Rule that blocks other resources than core
# so only 1 resource type will be given
hy_levels = [hy_levels]

res = find_resource_hierarchies_scattered(
itvs_cts_slots, list(hy_levels), hy_nbs
)
if res:
result = result | res
else:
return ProcSet()

return result


def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand All @@ -172,6 +75,7 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
result = ProcSet()

for hy_res_rqt in hy_res_rqts:
(hy_level_nbs, constraints) = hy_res_rqt
hy_levels = []
Expand Down Expand Up @@ -205,12 +109,12 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr
_found = list(filter(lambda x: x[0] == network_address, chars))
if _found:
char = (
allocation[0] if _found[0][1] == 'unfriendly'
else allocation[1] if _found[0][1] == 'friendly'
allocation[0] if _found[0][1] == 'cg'
else allocation[1] if _found[0][1] == 'ft'
else allocation[2]
)
else:
char = allocation[2]
char = 4

agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots2),char))

Expand All @@ -219,7 +123,7 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr
# create a nodes Procset list sorted by min/max free cores
sorted_agg = sorted(
agg,
key=lambda x: (x[1], x[2]),
key=lambda x: (x[1], x[2], -x[0][0]),
reverse=reverse
)

Expand Down Expand Up @@ -257,6 +161,16 @@ def e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tr
return result


def cg(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)):
return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation)

def ft(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(2, 1, 3)):
return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation)

def mg(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(2, 3, 1)):
return e_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=allocation)


def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

[tool.poetry.plugins."oar.find_func"]
e_compact = "oar.kao.custom_scheduling:e_compact"
cg = "oar.kao.custom_scheduling:cg"
ft = "oar.kao.custom_scheduling:ft"
mg = "oar.kao.custom_scheduling:mg"
e_spread = "oar.kao.custom_scheduling:e_spread"
compact = "oar.kao.custom_scheduling:compact"
spread = "oar.kao.custom_scheduling:spread"
Expand Down

0 comments on commit bb26b23

Please sign in to comment.