Skip to content

Commit

Permalink
Fix sorting mechanism to sort first by number of free cores and then by…
Browse files Browse the repository at this point in the history
… node friendliness (the friendly/unfriendly characterization rank)
  • Loading branch information
nikosT committed Sep 1, 2024
1 parent fb05b64 commit 0e5a763
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions oar/kao/custom_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_nodes_characterization(session: Session):
.join(Job, Job.id == JobType.job_id)
.join(AssignedResource, Job.assigned_moldable_job == AssignedResource.moldable_id)
.join(Resource, AssignedResource.resource_id == Resource.id)
.filter(Job.state.in_(("toLaunch", "Running", "Resuming", "Terminated")))
.filter(Job.state.in_(("toLaunch", "Running", "Resuming"))) # .filter(Job.state.in_(("toLaunch", "Running", "Resuming", "Terminated")))
.group_by(Resource.network_address, JobType.type)
.subquery()
)
Expand Down Expand Up @@ -93,14 +93,14 @@ def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tru
_found = list(filter(lambda x: x[0] == network_address, chars))
if _found:
char = (
allocation[0] if _found[0][1] == 'unfriendly'
allocation[0] if _found[0][1] == 'unfriendly'
else allocation[1] if _found[0][1] == 'friendly'
else allocation[2]
)
else:
char = 3
char = allocation[2]

agg.append((nodes[network_address], char))
agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots),char))

# end_time = time.time()
# elapsed_time = end_time - start_time
Expand All @@ -120,7 +120,7 @@ def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tru

sorted_agg = sorted(
agg,
key=lambda x: (len(x[0] & itvs_cts_slots), x[1]),
key=lambda x: (x[1], x[2]),
reverse=reverse
)

Expand Down Expand Up @@ -156,7 +156,7 @@ def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=Tru
return result


def spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
def spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True, allocation=(1, 3, 2)):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand Down Expand Up @@ -189,15 +189,44 @@ def spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True
avail_cores = soc & itvs_cts_slots
itvs_cts_slots -= ProcSet(*avail_cores[int(len(soc) / 2) : len(soc)])

logger.info(__file__)
config, db = init_oar(no_db=False)
chars = get_nodes_characterization(session)

rs = ResourceSet(session, config)

nodes = {}
list(map(lambda x: nodes.setdefault(x[1], []).append(x[0]), rs.roid_2_network_address.items()))

nodes = {k: ProcSet(*v) for k, v in nodes.items()}

agg = []
for network_address in nodes.keys():
_found = list(filter(lambda x: x[0] == network_address, chars))
if _found:
char = (
allocation[0] if _found[0][1] == 'unfriendly'
else allocation[1] if _found[0][1] == 'friendly'
else allocation[2]
)
else:
char = allocation[2]

agg.append((nodes[network_address], len(nodes[network_address] & itvs_cts_slots2),char))

# Select unused resources first (top-down).
try:
# create a nodes Procset list sorted by min/max free cores
hy_nodes = sorted(
hy["network_address"],
key=lambda i: len(i & itvs_cts_slots2),
reverse=reverse,
sorted_agg = sorted(
agg,
key=lambda x: (x[1], x[2]),
reverse=reverse
)

logger.info(sorted_agg)

hy_nodes = list(map(lambda x: x[0],sorted_agg))

hy_levels = []
for node in hy_nodes:
# collect cpu Procset for particular node
Expand Down

0 comments on commit 0e5a763

Please sign in to comment.