Skip to content

Commit

Permalink
Verifying YAML document nodes when parsing used mappings and sequences.
Browse files Browse the repository at this point in the history
  • Loading branch information
Paebbels committed Jun 3, 2024
1 parent 3cb215b commit 17ff2ea
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 2,691 deletions.
117 changes: 92 additions & 25 deletions pyEDAA/Reports/OSVVM/AlertLog.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,22 @@
from enum import Enum, auto
from pathlib import Path
from time import perf_counter_ns
from typing import Optional as Nullable, Dict, Iterator, Iterable
from types import NoneType
from typing import Optional as Nullable, Dict, Iterator, Iterable

from ruamel.yaml import YAML, CommentedMap
from ruamel.yaml import YAML, CommentedSeq, CommentedMap
from pyTooling.Decorators import readonly, export
from pyTooling.MetaClasses import ExtendedType
from pyTooling.Tree import Node

from pyEDAA.Reports.OSVVM import OSVVMException


@export
class AlertLogException(OSVVMException):
pass


@export
class AlertLogStatus(Enum):
Unknown = auto()
Expand All @@ -59,7 +65,7 @@ def Parse(self, name: str) -> "AlertLogStatus":
try:
return self.__MAPPINGS__[name.lower()]
except KeyError as ex:
raise OSVVMException(f"Unknown AlertLog status '{name}'.") from ex
raise AlertLogException(f"Unknown AlertLog status '{name}'.") from ex

def __bool__(self) -> bool:
return self is self.Passed
Expand Down Expand Up @@ -141,10 +147,6 @@ def Name(self) -> str:
def Status(self) -> AlertLogStatus:
return self._status

# @readonly
# def Affirmations(self) -> int:
# return self._afirmations

@readonly
def TotalErrors(self) -> int:
return self._totalErrors
Expand Down Expand Up @@ -260,6 +262,69 @@ def Read(self) -> None:
endAnalysis = perf_counter_ns()
self._analysisDuration = (endAnalysis - startAnalysis) / 1e9

@staticmethod
def _ParseSequenceFromYAML(node: CommentedMap, fieldName: str) -> Nullable[CommentedSeq]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = OSVVMException(f"Sequence field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if isinstance(value, NoneType):
return ()
elif not isinstance(value, CommentedSeq):
ex = AlertLogException(f"Field '{fieldName}' is not a sequence.") # TODO: from TypeError??
ex.add_note(f"Found type {value.__class__.__name__} at line {node._yaml_line_col.data[fieldName][0] + 1}.")
raise ex

return value

@staticmethod
def _ParseMapFromYAML(node: CommentedMap, fieldName: str) -> Nullable[CommentedMap]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = OSVVMException(f"Dictionary field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if isinstance(value, NoneType):
return {}
elif not isinstance(value, CommentedMap):
ex = AlertLogException(f"Field '{fieldName}' is not a list.") # TODO: from TypeError??
ex.add_note(f"Type mismatch found for line {node._yaml_line_col.data[fieldName][0] + 1}.")
raise ex
return value

@staticmethod
def _ParseStrFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[str]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = OSVVMException(f"String field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, str):
raise AlertLogException(f"Field '{fieldName}' is not of type str.") # TODO: from TypeError??

return value

@staticmethod
def _ParseIntFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[int]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = OSVVMException(f"Integer field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, int):
raise AlertLogException(f"Field '{fieldName}' is not of type int.") # TODO: from TypeError??

return value

def Parse(self) -> None:
if self._yamlDocument is None:
ex = OSVVMException(f"OSVVM AlertLog YAML file '{self._path}' needs to be read and analyzed by a YAML parser.")
Expand All @@ -268,9 +333,9 @@ def Parse(self) -> None:

startConversion = perf_counter_ns()

self._name = self._yamlDocument["Name"]
self._status = AlertLogStatus.Parse(self._yamlDocument["Status"])
for child in self._yamlDocument["Children"]:
self._name = self._ParseStrFieldFromYAML(self._yamlDocument, "Name")
self._status = AlertLogStatus.Parse(self._ParseStrFieldFromYAML(self._yamlDocument, "Status"))
for child in self._ParseSequenceFromYAML(self._yamlDocument, "Children"):
alertLogGroup = self._ParseAlertLogGroup(child)
self._children[alertLogGroup._name] = alertLogGroup
alertLogGroup._parent = self
Expand All @@ -279,22 +344,24 @@ def Parse(self) -> None:
self._modelConversion = (endConversation - startConversion) / 1e9

def _ParseAlertLogGroup(self, child: CommentedMap) -> AlertLogGroup:
results = child["Results"]
results = self._ParseMapFromYAML(child, "Results")
yamlAlertCount = self._ParseMapFromYAML(results, "AlertCount")
yamlDisabledAlertCount = self._ParseMapFromYAML(results, "DisabledAlertCount")
alertLogGroup = AlertLogGroup(
child["Name"],
AlertLogStatus.Parse(child["Status"]),
results["TotalErrors"],
results["AlertCount"]["Warning"],
results["AlertCount"]["Error"],
results["AlertCount"]["Failure"],
results["PassedCount"],
results["AffirmCount"],
results["RequirementsPassed"],
results["RequirementsGoal"],
results["DisabledAlertCount"]["Warning"],
results["DisabledAlertCount"]["Error"],
results["DisabledAlertCount"]["Failure"],
children=(self._ParseAlertLogGroup(ch) for ch in child["Children"])
self._ParseStrFieldFromYAML(child, "Name"),
AlertLogStatus.Parse(self._ParseStrFieldFromYAML(child, "Status")),
self._ParseIntFieldFromYAML(results, "TotalErrors"),
self._ParseIntFieldFromYAML(yamlAlertCount, "Warning"),
self._ParseIntFieldFromYAML(yamlAlertCount, "Error"),
self._ParseIntFieldFromYAML(yamlAlertCount, "Failure"),
self._ParseIntFieldFromYAML(results, "PassedCount"),
self._ParseIntFieldFromYAML(results, "AffirmCount"),
self._ParseIntFieldFromYAML(results, "RequirementsPassed"),
self._ParseIntFieldFromYAML(results, "RequirementsGoal"),
self._ParseIntFieldFromYAML(yamlDisabledAlertCount, "Warning"),
self._ParseIntFieldFromYAML(yamlDisabledAlertCount, "Error"),
self._ParseIntFieldFromYAML(yamlDisabledAlertCount, "Failure"),
children=(self._ParseAlertLogGroup(ch) for ch in self._ParseSequenceFromYAML(child, "Children"))
)

return alertLogGroup
151 changes: 123 additions & 28 deletions pyEDAA/Reports/Unittesting/OSVVM.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
from datetime import timedelta, datetime
from pathlib import Path
from time import perf_counter_ns
from types import NoneType
from typing import Optional as Nullable

from ruamel.yaml import YAML
from ruamel.yaml import YAML, CommentedMap, CommentedSeq
from pyTooling.Decorators import export, notimplemented
from ruamel.yaml.timestamp import TimeStamp

from pyEDAA.Reports.Unittesting import UnittestException, Document, TestcaseStatus
from pyEDAA.Reports.Unittesting import TestsuiteSummary as ut_TestsuiteSummary, Testsuite as ut_Testsuite
Expand Down Expand Up @@ -69,7 +69,7 @@ class TestsuiteSummary(ut_TestsuiteSummary):


@export
class OsvvmYamlDocument(TestsuiteSummary, Document):
class BuildSummaryDocument(TestsuiteSummary, Document):
_yamlDocument: Nullable[YAML]

def __init__(self, yamlReportFile: Path, parse: bool = False) -> None:
Expand Down Expand Up @@ -111,12 +111,105 @@ def Write(self, path: Nullable[Path] = None, overwrite: bool = False) -> None:

if self._yamlDocument is None:
ex = UnittestException(f"Internal YAML document tree is empty and needs to be generated before write is possible.")
ex.add_note(f"Call 'OsvvmYamlDocument.Generate()' or 'OsvvmYamlDocument.Write(..., regenerate=True)'.")
ex.add_note(f"Call 'BuildSummaryDocument.Generate()' or 'BuildSummaryDocument.Write(..., regenerate=True)'.")
raise ex

# with path.open("w", encoding="utf-8") as file:
# self._yamlDocument.writexml(file, addindent="\t", encoding="utf-8", newl="\n")

@staticmethod
def _ParseSequenceFromYAML(node: CommentedMap, fieldName: str) -> Nullable[CommentedSeq]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"Sequence field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if isinstance(value, NoneType):
return ()
elif not isinstance(value, CommentedSeq):
line = node._yaml_line_col.data[fieldName][0] + 1
ex = UnittestException(f"Field '{fieldName}' is not a sequence.") # TODO: from TypeError??
ex.add_note(f"Found type {value.__class__.__name__} at line {line}.")
raise ex

return value

@staticmethod
def _ParseMapFromYAML(node: CommentedMap, fieldName: str) -> Nullable[CommentedMap]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"Dictionary field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if isinstance(value, NoneType):
return {}
elif not isinstance(value, CommentedMap):
line = node._yaml_line_col.data[fieldName][0] + 1
ex = UnittestException(f"Field '{fieldName}' is not a list.") # TODO: from TypeError??
ex.add_note(f"Type mismatch found for line {line}.")
raise ex
return value

@staticmethod
def _ParseStrFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[str]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"String field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, str):
raise UnittestException(f"Field '{fieldName}' is not of type str.") # TODO: from TypeError??

return value

@staticmethod
def _ParseIntFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[int]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"Integer field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, int):
raise UnittestException(f"Field '{fieldName}' is not of type int.") # TODO: from TypeError??

return value

@staticmethod
def _ParseDateFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[datetime]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"Date field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, datetime):
raise UnittestException(f"Field '{fieldName}' is not of type datetime.") # TODO: from TypeError??

return value

@staticmethod
def _ParseDurationFieldFromYAML(node: CommentedMap, fieldName: str) -> Nullable[timedelta]:
try:
value = node[fieldName]
except KeyError as ex:
newEx = UnittestException(f"Duration field '{fieldName}' not found in node starting at line {node.lc.line + 1}.")
newEx.add_note(f"Available fields: {', '.join(key for key in node)}")
raise newEx from ex

if not isinstance(value, float):
raise UnittestException(f"Field '{fieldName}' is not of type float.") # TODO: from TypeError??

return timedelta(seconds=value)

def Parse(self) -> None:
if self._yamlDocument is None:
ex = UnittestException(f"OSVVM YAML file '{self._path}' needs to be read and analyzed by a YAML parser.")
Expand All @@ -125,42 +218,44 @@ def Parse(self) -> None:

startConversion = perf_counter_ns()
# self._name = self._yamlDocument["name"]
buildInfo = self._yamlDocument["BuildInfo"]
self._startTime = buildInfo["StartTime"]
self._totalDuration = timedelta(seconds=buildInfo["Elapsed"])
buildInfo = self._ParseMapFromYAML(self._yamlDocument, "BuildInfo")
self._startTime = self._ParseDateFieldFromYAML(buildInfo, "StartTime")
self._totalDuration = self._ParseDurationFieldFromYAML(buildInfo, "Elapsed")

for yamlTestsuite in self._yamlDocument['TestSuites']:
self._ParseTestsuite(self, yamlTestsuite)
if "TestSuites" in self._yamlDocument:
for yamlTestsuite in self._ParseSequenceFromYAML(self._yamlDocument, "TestSuites"):
self._ParseTestsuite(self, yamlTestsuite)

self.Aggregate()
endConversation = perf_counter_ns()
self._modelConversion = (endConversation - startConversion) / 1e9

def _ParseTestsuite(self, parentTestsuite: Testsuite, yamlTestsuite) -> None:
testsuiteName = yamlTestsuite["Name"]
totalDuration = timedelta(seconds=yamlTestsuite["ElapsedTime"])
def _ParseTestsuite(self, parentTestsuite: Testsuite, yamlTestsuite: CommentedMap) -> None:
testsuiteName = self._ParseStrFieldFromYAML(yamlTestsuite, "Name")
totalDuration = self._ParseDurationFieldFromYAML(yamlTestsuite, "ElapsedTime")

testsuite = Testsuite(
testsuiteName,
totalDuration=totalDuration,
parent=parentTestsuite
)

if yamlTestsuite['TestCases'] is not None:
for yamlTestcase in yamlTestsuite['TestCases']:
self._ParseTestcase(testsuite, yamlTestcase)

def _ParseTestcase(self, parentTestsuite: Testsuite, yamlTestcase) -> None:
testcaseName = yamlTestcase["TestCaseName"]
totalDuration = timedelta(seconds=yamlTestcase["ElapsedTime"])
yamlStatus = yamlTestcase["Status"].lower()
yamlResults = yamlTestcase["Results"]
assertionCount = yamlResults["AffirmCount"]
passedAssertionCount = yamlResults["PassedCount"]
totalErrors = yamlResults["TotalErrors"]
warningCount = yamlResults["AlertCount"]["Warning"]
errorCount = yamlResults["AlertCount"]["Error"]
fatalCount = yamlResults["AlertCount"]["Failure"]
# if yamlTestsuite['TestCases'] is not None:
for yamlTestcase in self._ParseSequenceFromYAML(yamlTestsuite, 'TestCases'):
self._ParseTestcase(testsuite, yamlTestcase)

def _ParseTestcase(self, parentTestsuite: Testsuite, yamlTestcase: CommentedMap) -> None:
testcaseName = self._ParseStrFieldFromYAML(yamlTestcase, "TestCaseName")
totalDuration = self._ParseDurationFieldFromYAML(yamlTestcase, "ElapsedTime")
yamlStatus = self._ParseStrFieldFromYAML(yamlTestcase, "Status").lower()
yamlResults = self._ParseMapFromYAML(yamlTestcase, "Results")
assertionCount = self._ParseIntFieldFromYAML(yamlResults, "AffirmCount")
passedAssertionCount = self._ParseIntFieldFromYAML(yamlResults, "PassedCount")
totalErrors = self._ParseIntFieldFromYAML(yamlResults, "TotalErrors")
yamlAlertCount = self._ParseMapFromYAML(yamlResults, "AlertCount")
warningCount = self._ParseIntFieldFromYAML(yamlAlertCount, "Warning")
errorCount = self._ParseIntFieldFromYAML(yamlAlertCount, "Error")
fatalCount = self._ParseIntFieldFromYAML(yamlAlertCount, "Failure")

# FIXME: write a Parse classmethod in enum
if yamlStatus == "passed":
Expand All @@ -182,7 +277,7 @@ def _ParseTestcase(self, parentTestsuite: Testsuite, yamlTestcase) -> None:
else:
status |= TestcaseStatus.Inconsistent

testcase = Testcase(
_ = Testcase(
testcaseName,
totalDuration=totalDuration,
assertionCount=assertionCount,
Expand Down
2 changes: 1 addition & 1 deletion tests/data/OSVVM/OSVVMLibraries_RunAllTests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Version: 2024.05
Date: 2024-06-01T07:56:17+02:00
TestSuites:
TestSuites:
- Name: StreamTransactionPkg
TestCases:
- TestCaseName: "TbStream_SendGet1"
Expand Down
Loading

0 comments on commit 17ff2ea

Please sign in to comment.