Skip to content

Commit

Permalink
Added logic to save and read whole object as single json
Browse files Browse the repository at this point in the history
tendrl-spec: Tendrl/specifications#174
tendrl-bug-id: Tendrl#657
Signed-off-by: Shubhendu <shtripat@redhat.com>
  • Loading branch information
Shubhendu committed Aug 29, 2017
1 parent 73b4418 commit 51e8736
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 172 deletions.
25 changes: 10 additions & 15 deletions tendrl/commons/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def stop(self):

def process_job(job):
jid = job.key.split('/')[-1]
job_status_key = "/queue/%s/status" % jid
job_obj = Job(job_id=jid).load()
job_lock_key = "/queue/%s/locked_by" % jid
NS.node_context = NS.node_context.load()
# Check job not already locked by some agent
Expand All @@ -65,8 +65,7 @@ def process_job(job):

# Check job not already "finished", or "processing"
try:
_status = NS._int.client.read(job_status_key).value
if _status in ["finished", "processing"]:
if job_obj.status in ["finished", "processing"]:
return
except etcd.EtcdKeyNotFound:
pass
Expand Down Expand Up @@ -94,9 +93,8 @@ def process_job(job):
# mark status as "failed" and Job.error =
# "Timed out"
try:
NS._int.wclient.write(job_status_key,
"failed",
prevValue="new")
job_obj.status = "failed"
job_obj.save()
except etcd.EtcdCompareFailed:
pass
else:
Expand Down Expand Up @@ -143,7 +141,6 @@ def process_job(job):
)
return

job_status_key = "/queue/%s/status" % job.job_id
job_lock_key = "/queue/%s/locked_by" % job.job_id
try:
lock_info = dict(node_id=NS.node_context.node_id,
Expand All @@ -152,8 +149,8 @@ def process_job(job):
type=NS.type)
NS._int.wclient.write(job_lock_key,
json.dumps(lock_info))
NS._int.wclient.write(job_status_key, "processing",
prevValue="new")
job.status = "processing"
job.save()
except etcd.EtcdCompareFailed:
# job is already being processed by some tendrl
# agent
Expand Down Expand Up @@ -197,9 +194,8 @@ def process_job(job):
)
the_flow.run()
try:
NS._int.wclient.write(job_status_key,
"finished",
prevValue="processing")
job.status = "finished"
job.save()
except etcd.EtcdCompareFailed:
# This should not happen!
_msg = "Cannot mark job as 'finished', " \
Expand Down Expand Up @@ -254,9 +250,8 @@ def process_job(job):
)

try:
NS._int.wclient.write(job_status_key,
"failed",
prevValue="processing")
job.status = "failed"
job.save()
except etcd.EtcdCompareFailed:
# This should not happen!
_msg = "Cannot mark job as 'failed', current" \
Expand Down
206 changes: 55 additions & 151 deletions tendrl/commons/objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ def load_definition(self):
raise Exception(msg)

def save(self, update=True, ttl=None):
self.render()
_curr_hash = ''
if "Message" not in self.__class__.__name__:
try:
# Generate current in memory object hash
self.hash = self._hash()
_curr_hash = self._hash()
_hash_key = "/{0}/hash".format(self.value)
_stored_hash = None
try:
Expand All @@ -87,7 +87,7 @@ def save(self, update=True, ttl=None):
if type(ex) != etcd.EtcdKeyNotFound:
NS._int.reconnect()
_stored_hash = NS._int.client.read(_hash_key).value
if self.hash == _stored_hash:
if _curr_hash == _stored_hash:
# No changes in stored object and current object,
# dont save current object to central store
if ttl:
Expand All @@ -97,94 +97,29 @@ def save(self, update=True, ttl=None):
# no hash for this object, save the current hash as is
pass

if update:
current_obj = self.load()
for attr, val in vars(self).iteritems():
if isinstance(val, (types.FunctionType,
types.BuiltinFunctionType,
types.MethodType, types.BuiltinMethodType,
types.UnboundMethodType)) or \
attr.startswith("_") or attr in ['value', 'list']:
continue

if val is None and hasattr(current_obj, attr):
# if self.attr is None, use attr value from central
# store (i.e. current_obj.attr)
if getattr(current_obj, attr):
setattr(self, attr, getattr(current_obj, attr))

self.updated_at = str(time_utils.now())
for item in self.render():
'''
Note: Log messages in this file have try-except
blocks to run
in the condition when the node_agent has not been
started and
name spaces are being created.
'''
try:
Event(
Message(
priority="debug",
publisher=NS.publisher_id,
payload={"message": "Writing %s to %s" %
(item['key'], item['value'])
}
)
)
except KeyError:
sys.stdout.write("Writing %s to %s" % (item['key'],
item['value']))
# convert list, dict (json) to python based on definitions
_type = self._defs.get("attrs", {}).get(item['name'],
{}).get("type")
if _type:
if _type.lower() in ['json', 'list']:
if item['value']:
try:
item['value'] = json.dumps(item['value'])
except ValueError as ex:
_msg = "Error save() attr %s for object %s" % \
(item['name'], self.__name__)
Event(
ExceptionMessage(
priority="debug",
publisher=NS.publisher_id,
payload={"message": _msg,
"exception": ex
}
)
)
try:
NS._int.wclient.write(item['key'], item['value'], quorum=True)
except (etcd.EtcdConnectionFailed, etcd.EtcdException):
NS._int.wreconnect()
NS._int.wclient.write(item['key'], item['value'], quorum=True)
if ttl:
etcd_utils.refresh(self.value, ttl)

def load_all(self):
self.render()
value = '/'.join(self.value.split('/')[:-1])
data_key = self.value + '/data'
updated_at_key = self.value + '/updated_at'
hash_key = self.value + '/hash'
try:
etcd_resp = NS._int.client.read(value)
except (etcd.EtcdConnectionFailed, etcd.EtcdException) as ex:
if type(ex) != etcd.EtcdKeyNotFound:
NS._int.reconnect()
etcd_resp = NS._int.client.read(value)
else:
return None
ins = []
for item in etcd_resp.leaves:
self.value = item.key
ins.append(self.load())
return ins
NS._int.wclient.write(data_key, self.json)
NS._int.wclient.write(updated_at_key, str(time_utils.now()))
NS._int.wclient.write(hash_key, _curr_hash)
except (etcd.EtcdConnectionFailed, etcd.EtcdException):
NS._int.wreconnect()
NS._int.wclient.write(data_key, self.json)
NS._int.wclient.write(updated_at_key, str(time_utils.now()))
NS._int.wclient.write(hash_key, _curr_hash)

if ttl:
etcd_utils.refresh(self.value, ttl)

def load(self):
_curr_hash = ''
if "Message" not in self.__class__.__name__:
try:
# Generate current in memory object hash
self.hash = self._hash()
_curr_hash = self._hash()
_hash_key = "/{0}/hash".format(self.value)
_stored_hash = None
try:
Expand All @@ -193,81 +128,53 @@ def load(self):
if type(ex) != etcd.EtcdKeyNotFound:
NS._int.reconnect()
_stored_hash = NS._int.client.read(_hash_key).value
if self.hash == _stored_hash:
if _curr_hash == _stored_hash:
# No changes in stored object and current object,
# dont save current object to central store
return self
except TypeError:
# no hash for this object, save the current hash as is
pass

_copy = self._copy_vars()
# Check if self.value already set, use it
if self.value.find('{') < 0:
_copy.value = self.value
for item in _copy.render():
try:
Event(
Message(
priority="debug",
publisher=NS.publisher_id,
payload={"message": "Reading %s" % item['key']}
)
)
except KeyError:
sys.stdout.write("Reading %s" % item['key'])

try:
etcd_resp = NS._int.client.read(item['key'], quorum=True)
except (etcd.EtcdConnectionFailed, etcd.EtcdException) as ex:
if type(ex) == etcd.EtcdKeyNotFound:
continue
self.render()
key = self.value + '/data'
try:
val_str = NS._int.client.read(key).value
except etcd.EtcdKeyNotFound:
return self
loc_dict = json.loads(val_str)
for attr_name, attr_val in vars(self).iteritems():
if not attr_name.startswith('_') and attr_name not in ["value", "list"]:
_type = self._defs.get("attrs", {}).get(attr_name, {}).get("type")
if loc_dict.get(attr_name) is None or loc_dict.get(attr_name) == "":
if _type and _type.lower() == 'list':
setattr(self, attr_name, list())
if _type and _type.lower() == 'json':
setattr(self, attr_name, dict())
else:
NS._int.reconnect()
etcd_resp = NS._int.client.read(item['key'], quorum=True)

value = etcd_resp.value
if item['dir']:
key = item['key'].split('/')[-1]
dct = dict(key=value)
if hasattr(_copy, item['name']):
dct = getattr(_copy, item['name'])
if type(dct) == dict:
dct[key] = value
#if _type and _type.lower() in ['json', 'list']:
if _type and _type.lower() in ['list']:
setattr(self, attr_name, json.loads(loc_dict[attr_name]))
else:
setattr(_copy, item['name'], dct)
else:
setattr(_copy, item['name'], dct)
continue
setattr(self, attr_name, loc_dict[attr_name])
return self

# convert list, dict (json) to python based on definitions
_type = self._defs.get("attrs", {}).get(item['name'],
{}).get("type")
if _type:
if _type.lower() in ['json', 'list']:
if value:
try:
value = json.loads(value.decode('utf-8'))
except ValueError as ex:
_msg = "Error load() attr %s for object %s" % \
(item['name'], self.__name__)
Event(
ExceptionMessage(
priority="debug",
publisher=NS.publisher_id,
payload={"message": _msg,
"exception": ex
}
)
)
def load_all(self):
self.render()
value = '/'.join(self.value.split('/')[:-1])
try:
etcd_resp = NS._int.client.read(value)
except (etcd.EtcdConnectionFailed, etcd.EtcdException) as ex:
if type(ex) != etcd.EtcdKeyNotFound:
NS._int.reconnect()
etcd_resp = NS._int.client.read(value)
else:
if _type.lower() == "list":
value = list()
if _type.lower() == "json":
value = dict()

setattr(_copy, item['name'], value)
return _copy
return None
ins = []
for item in etcd_resp.leaves:
self.value = item.key
ins.append(self.load())
return ins

def exists(self):
self.render()
Expand Down Expand Up @@ -336,9 +243,6 @@ def json(self):
return json.dumps(data)

def _hash(self):
self.hash = None
self.updated_at = None

# Above items cant be part of hash
_obj_str = "".join(sorted(self.json))
return hashlib.md5(_obj_str).hexdigest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ def run(self):
new_params['Node[]'] = [node]
# create same flow for each node in node list except
# $this
payload = {"tags": ["tendrl/node_%s" % node],
payload = {"tags": ["tendrl/integration/%s" % integration_id],
"run": "tendrl.flows.ImportCluster",
"status": "new",
"parameters": new_params,
"parent": self.parameters['job_id'],
"type": "node"
Expand Down
2 changes: 1 addition & 1 deletion tendrl/commons/objects/definition/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ namespace.tendrl:
attrs:
interface:
help: "network interface name"
type: List
type: String
ipv4:
help: "ipv4 addresses"
type: List
Expand Down
9 changes: 6 additions & 3 deletions tendrl/commons/objects/node_context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ def __init__(self, node_id=None, fqdn=None,
self.node_id = node_id or self._get_node_id() or self._create_node_id()
self.fqdn = fqdn or socket.getfqdn()

curr_tags = []
curr_tags = ''
try:
curr_tags = NS._int.client.read("/nodes/%s/NodeContext/tags" %
self.node_id).value
nc_data = NS._int.client.read(
"/nodes/%s/NodeContext/data" %
self.node_id
).value
curr_tags = json.loads(nc_data)['tags']
except etcd.EtcdKeyNotFound:
pass

Expand Down

0 comments on commit 51e8736

Please sign in to comment.