Skip to content

Commit

Permalink
quick slotset fix: using files (slightly modified) from commit eb2af22~1
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosT committed Aug 20, 2024
1 parent b2313f0 commit 1c2fed5
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 778 deletions.
250 changes: 74 additions & 176 deletions oar/kao/quotas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import List, Optional

import simplejson as json
from rich import print

from oar.lib import configuration
from oar.lib.globals import get_logger, init_oar
Expand Down Expand Up @@ -67,7 +66,7 @@ def __init__(self, json_quotas, config):
if "periodical" in json_quotas:
default = None
for p in json_quotas["periodical"]:
if p[0] == "* * * *" or p[0] == "default":
if p[0] == "*,*,*,*" or p[0] == "default":
default = self.quotas_rules2id[p[1]]
else:
self.periodical_fromJson(p)
Expand All @@ -87,7 +86,7 @@ def __init__(self, json_quotas, config):
t,
t1 - t,
default,
"* * * *",
"*,*,*,*",
)
self.nb_periodicals += 1
t = t1 + duration
Expand All @@ -97,7 +96,7 @@ def __init__(self, json_quotas, config):
t,
7 * 86400 - t,
default,
"* * * *",
"*,*,*,*",
)
self.nb_periodicals += 1
# reorder the all
Expand Down Expand Up @@ -313,7 +312,6 @@ def next_rules(self, t_epoch):
return (rules_id, remaining_duration)

def show(self, t=None, begin=None, end=None, check=True, json=False):

t_epoch = None
if t:
try:
Expand Down Expand Up @@ -411,6 +409,7 @@ def show(self, t=None, begin=None, end=None, check=True, json=False):

class Quotas(object):
"""
Implements quotas on:
- the amount of busy resources at a time
- the number of running jobs at a time
Expand Down Expand Up @@ -515,7 +514,6 @@ class Quotas(object):
enabled: bool = False
calendar: Optional[Calendar] = None
default_rules = {}
# Job types are apart, so they can extends the quotas globally ?
job_types: List[str] = ["*"]

@classmethod
Expand All @@ -533,17 +531,11 @@ def enable(cls, config: configuration, resource_set=None):
all_value = resource_set.nb_resources_default_not_dead
else:
all_value = None

cls.load_quotas_rules(config, all_value)

def __init__(self, rules=None):
def __init__(self):
self.counters = defaultdict(lambda: [0, 0, 0])
if not rules:
self.rules = Quotas.default_rules
else:
self.rules = rules
# Init the tree
self.init_rule_tree()
self.rules = Quotas.default_rules

def deepcopy_from(self, quotas):
self.counters = deepcopy(quotas.counters)
Expand Down Expand Up @@ -573,7 +565,6 @@ def update(self, job, prev_nb_res=0, prev_duration=0):

for t in Quotas.job_types:
if (t == "*") or (t in job.types):
# TODO: can't we instead loop over active rules?
# Update the number of used resources
self.counters["*", "*", t, "*"][0] += nb_resources
self.counters["*", "*", t, user][0] += nb_resources
Expand Down Expand Up @@ -610,164 +601,78 @@ def combine(self, quotas):
self.counters[key][2] += value[2]
# self.show_counters('combine after')

def init_rule_tree(self):
# Create the rule multi-tree from all active rules
# The three depths correspond to the current entity on which the level applies
# For instance the first level is the queue, the second level is the project etc

# The rule set ["*", "*", "*", "/"], ["*", "*", "besteffort", "*"] leads to the three
# queue: '*'
# /
# project _____'*'_____
# / \
# job_types _'*'_ besteffort
# / \ /
# user: '/' '*' '*'

self.rule_tree: dict[str, dict[str, dict[str, (int, int, float)]]] = dict()

for fields, rule in self.rules.items():
current = self.rule_tree
for f in fields:
if f not in current:
current[f] = dict()
current = current[f]
queue, project, job_type, user = fields

self.rule_tree[queue][project][job_type][user] = rule

return self.rule_tree

def find_applicable_rule(self, job):
""" """
# Function that get the rule that should be applied to the current job in parameter.
# Only one rule applies to the job, and the rule is found by looking at the rule tree
# from top to bottom with the following priority:
# '*' < '/' < $var
# $var is any specified value for instance 'toto' for the user or 'besteffort' for the queue
(queue, project, job_types, user) = (
job.queue_name,
job.project,
job.types,
job.user,
)

def get_item(d: dict, value: str):
if value in d:
return value
if "/" in d:
return "/"
if "*" in d:
return "*"

return None

# Init the rule to return
rule = None # [-1, -1, -1]
rule_key = None

key_queue, key_project, key_job_type, key_user = (
get_item(self.rule_tree, queue),
None,
None,
None,
)
# Walk the rule tree to find the rule
if key_queue:
key_project = get_item(self.rule_tree[key_queue], project)
if key_project:
for jtype in list(job_types) + Quotas.job_types:
key_job_type = get_item(
self.rule_tree[key_queue][key_project], jtype
)
break

if key_job_type:
key_user = get_item(
self.rule_tree[key_queue][key_project][key_job_type], user
)
if key_user:
rule = self.rule_tree[key_queue][key_project][key_job_type][
key_user
]
rule_key = key_queue, key_project, key_job_type, key_user

# '/' -> substitute by job value
if key_queue == "/":
key_queue = queue
if key_project == "/":
key_project = project
if key_user == "/":
key_user = user

# Final key to get the corresponding counter from Quotas.counters
rule_counter = key_queue, key_project, key_job_type, key_user

return (rule, rule_counter, rule_key)

def check(self, job) -> tuple[bool, str, str, int]:
(rule, complete_key, rl_quotas) = self.find_applicable_rule(job)
print("on en est la", self.rule_tree, self.rules, rule, complete_key, rl_quotas)

if rule and complete_key in self.counters:
rl_nb_resources, rl_nb_jobs, rl_resources_time = rule
(key_queue, key_project, key_job_type, key_user) = complete_key

count = self.counters[key_queue, key_project, key_job_type, key_user]
nb_resources, nb_jobs, resources_time = count

# test quotas values plus job's ones
# 1) test nb_resources
if (rl_nb_resources > -1) and (rl_nb_resources < nb_resources):
return (
False,
"nb resources quotas failed",
rl_quotas,
rl_nb_resources,
)

# 2) test nb_jobs
if (rl_nb_jobs > -1) and (rl_nb_jobs < nb_jobs):
return (
False,
"nb jobs quotas failed",
rl_quotas,
rl_nb_jobs,
)
# 3) test resources_time (work)
if (rl_resources_time > -1) and (rl_resources_time < resources_time):
return (
False,
"resources hours quotas failed",
rl_quotas,
rl_resources_time,
)

def check(self, job):
# self.show_counters('before check, job id: ' + str(job.id))
for rl_fields, rl_quotas in self.rules.items():
# pdb.set_trace()
rl_queue, rl_project, rl_job_type, rl_user = rl_fields
rl_nb_resources, rl_nb_jobs, rl_resources_time = rl_quotas
for fields, counters in self.counters.items():
queue, project, job_type, user = fields
nb_resources, nb_jobs, resources_time = counters
# match queue
if (
((rl_queue == "*") and (queue == "*"))
or ((rl_queue == queue) and (job.queue_name == queue))
or (rl_queue == "/")
):
# match project
if (
((rl_project == "*") and (project == "*"))
or ((rl_project == project) and (job.project == project))
or (rl_project == "/")
):
# match job_typ
if ((rl_job_type == "*") and (job_type == "*")) or (
(rl_job_type == job_type) and (job_type in job.types)
):
# match user
if (
((rl_user == "*") and (user == "*"))
or ((rl_user == user) and (job.user == user))
or (rl_user == "/")
):
# test quotas values plus job's ones
# 1) test nb_resources
if (rl_nb_resources > -1) and (
rl_nb_resources < nb_resources
):
return (
False,
"nb resources quotas failed",
rl_fields,
rl_nb_resources,
)
# 2) test nb_jobs
if (rl_nb_jobs > -1) and (rl_nb_jobs < nb_jobs):
return (
False,
"nb jobs quotas failed",
rl_fields,
rl_nb_jobs,
)
# 3) test resources_time (work)
if (rl_resources_time > -1) and (
rl_resources_time < resources_time
):
return (
False,
"resources hours quotas failed",
rl_fields,
rl_resources_time,
)
return (True, "quotas ok", "", 0)

@staticmethod
def check_slots_quotas(
slots,
sid_left: int,
sid_right: int,
job,
job_nb_resources: int,
duration: int,
):
def check_slots_quotas(slots, sid_left, sid_right, job, job_nb_resources, duration):
# loop over slot_set
slots_quotas: dict[int, Quotas] = {}

slots_quotas = Quotas()
slots_quotas.rules = slots[sid_left].quotas.rules
sid = sid_left
while True:
slot = slots[sid]

if slot.quotas_rules_id not in slots_quotas:
slots_quotas[slot.quotas_rules_id] = Quotas(slots[sid].quotas.rules)

quotas = slots_quotas[slot.quotas_rules_id]

# slot.quotas.show_counters('check_slots_quotas, b e: ' + str(slot.b) + ' ' + str(slot.e))
quotas.combine(slot.quotas)
slots_quotas.combine(slot.quotas)

if sid == sid_right:
break
Expand All @@ -776,22 +681,15 @@ def check_slots_quotas(
if slot.next and (
slot.quotas_rules_id != slots[slot.next].quotas_rules_id
):
logger.debug("job on two different quotas periods")

for id, quotas in slots_quotas.items():
quotas.update(job, job_nb_resources, duration)
res = quotas.check(job)
if not res[0]:
return res

# return last one that should be a success anyway
return res
return (False, "different quotas rules over job's time", "", 0)
# print('slots b e :' + str(slots[sid_left].b) + " " + str(slots[sid_right].e))
slots_quotas.update(job, job_nb_resources, duration)
return slots_quotas.check(job)

def set_rules(self, rules_id):
"""Use for temporal calendar, when rules must be change from default"""
if Quotas.calendar:
self.rules = Quotas.calendar.quotas_rules_list[rules_id]
self.init_rule_tree()

@staticmethod
def quotas_rules_fromJson(json_quotas_rules, all_value=None):
Expand Down
Loading

0 comments on commit 1c2fed5

Please sign in to comment.