From 12973f18560d73d66bc44716b92aa0d213789eb8 Mon Sep 17 00:00:00 2001
From: Cobord
Date: Tue, 27 Aug 2024 18:38:43 -0400
Subject: [PATCH 1/4] linting

---
 schemas/performance_testing.py            |  25 ++--
 scripts/__init__.py                       |   1 +
 scripts/hif.py                            |  76 +++++++++---
 scripts/nx.py                             |   9 +-
 tests/conftest.py                         |  32 +++--
 tests/test_files/bad_node_without_id.json |   1 +
 tests/test_nx.py                          |   8 +-
 tests/test_schema.py                      |  32 ++++-
 validate_hif.py                           | 145 ++--------------------
 9 files changed, 154 insertions(+), 175 deletions(-)
 create mode 100644 tests/test_files/bad_node_without_id.json

diff --git a/schemas/performance_testing.py b/schemas/performance_testing.py
index 1997568..061cf78 100644
--- a/schemas/performance_testing.py
+++ b/schemas/performance_testing.py
@@ -1,3 +1,5 @@
+#pylint:disable=import-error,unused-import,missing-function-docstring,unspecified-encoding,invalid-name
+#pylint:disable=unsupported-assignment-operation,unsubscriptable-object
 """
 Datasets to use:

@@ -18,6 +20,7 @@ import xgi
 warnings.simplefilter("ignore")

+#pylint:disable=consider-using-with
 sys.stdout = open("performance_testing_output.txt", "a")

@@ -26,12 +29,13 @@ def marktime(msg=None):
     print(temp.strftime("%d/%m/%y %H:%M:%S"), ": ", msg, flush=True)
     return temp

-
-schema = json.load(open("hif_schema_v0.1.0.json", "r"))
+with open("hif_schema_v0.1.0.json", "r") as f:
+    schema = json.load(f)
 validator = fastjsonschema.compile(schema)

 ### high_school data as dataframes for hnx;
-hs = json.load(open(f"../examples/contacts-high-school.json", "r"))
+with open("../examples/contacts-high-school.json", "r") as f:
+    hs = json.load(f)
 hs_df = pd.DataFrame(hs["hyperedges"]).fillna("")
 hs_df["edge"] = hs_df.interaction.map(lambda x: x[0])
 hs_df["node"] = hs_df.interaction.map(lambda x: x[1])
@@ -42,14 +46,17 @@ def marktime(msg=None):

 ### HNX constructors

+#pylint:disable=unused-argument
 def hnx_hypergraph(df, nodedf=None, edgedf=None):
     return hnx.Hypergraph(df, node_properties=nodedf)


 def hnx_to_hif(hg):
     edgj = hg.edges.to_dataframe
+    #pylint:disable=protected-access
     edid = edgj.index._name or "index"
     nodj = hg.nodes.to_dataframe
+    #pylint:disable=protected-access
     ndid = nodj.index._name or "index"
     edgj = edgj.reset_index().rename(columns={edid: "edge"}).to_dict(orient="records")
     nodj = nodj.reset_index().rename(columns={ndid: "node"}).to_dict(orient="records")
@@ -58,14 +65,14 @@ def hnx_to_hif(hg):
         .rename(columns={"nodes": "node", "edges": "edge"})
         .to_dict(orient="records")
     )
-    hif = {"edges": edgj, "nodes": nodj, "incidences": incj}
-    return hif
+    hif_converted = {"edges": edgj, "nodes": nodj, "incidences": incj}
+    return hif_converted


-def hnx_from_hif(hif):
-    edges = pd.DataFrame(hif["edges"])
-    nodes = pd.DataFrame(hif["nodes"])
-    incidences = pd.DataFrame(hif["incidences"])
+def hnx_from_hif(hif_to_convert):
+    edges = pd.DataFrame(hif_to_convert["edges"])
+    nodes = pd.DataFrame(hif_to_convert["nodes"])
+    incidences = pd.DataFrame(hif_to_convert["incidences"])
     return hnx.Hypergraph(incidences, node_properties=nodes, edge_properties=edges)

diff --git a/scripts/__init__.py b/scripts/__init__.py
index 66abe04..a3326e2 100644
--- a/scripts/__init__.py
+++ b/scripts/__init__.py
@@ -1 +1,2 @@
+#pylint:disable=missing-module-docstring
 from .hif import *

diff --git a/scripts/hif.py b/scripts/hif.py
index 45f4962..e18cc6a 100644
--- a/scripts/hif.py
+++ b/scripts/hif.py
@@ -1,9 +1,50 @@
+"""
+this script provides a function `validate_hif`
+which returns a dictionary specifying whether every part of the HIF specification is followed.
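+
+a minimal usage sketch (the file path here is illustrative, not a file in the repository):
+
+    info = validate_hif("some_network.json")
+    failed_parts = [part for part, status in info.items() if status != 0]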
+"""
+
 import json
 from collections import defaultdict
+from typing import Dict
 from warnings import warn

+def validate_network_type(data, verbose : bool):
+    if (
+        "network-type" in data
+        and data["network-type"] == "directed"
+        and "incidences" in data
+    ):
+        for _i, record in enumerate(data["incidences"]):
+            if "direction" not in record[2]:
+                status = 1
+                if verbose:
+                    print(
+                        " ".join(["Each incidence record must have",
+                                  "the 'direction' attribute for directed hypergraphs."])
+                    )
+
+    # in the case of simplicial complexes, make sure that the edges are maximal
+    if "network-type" in data and data["network-type"] == "asc" and "incidences" in data:
+        edgedict = defaultdict(set)
+        for record in data["incidences"]:
+            e = record[0]
+            n = record[1]
+            edgedict[e].add(n)
+        for e1, edge1 in edgedict.items():
+            for e2, edge2 in edgedict.items():
+                if e1 != e2 and edge1.issubset(edge2):
+                    if verbose:
+                        print(
+                            "Only maximal faces should be stored for simplicial complexes."
+                        )

-def validate_hif(path):
+#pylint:disable=too-many-branches,too-many-statements
+def validate_hif(path) -> Dict[str,int]:
+    """
+    a dictionary specifying whether every part of the HIF specification is followed
+    for the file with the given path
+    """
+    #pylint:disable=unspecified-encoding
     with open(path) as file:
         # load JSON file
         data = json.loads(file.read())
@@ -26,7 +67,7 @@ def validate_hif(path):
     info["incidences-exist"] = 0

     if "incidences" not in data:
-        warn(f"The file must contain an field for incidences.")
+        warn("The file must contain a field for incidences.")
         info["incidences-exist"] = 1

     # check network type
@@ -46,7 +87,7 @@ def validate_hif(path):

     if "metadata" in data:
         if not isinstance(data["metadata"], dict):
-            warn(f"The metadata must be dict-like.")
+            warn("The metadata must be dict-like.")
             info["metadata-dict"] = 1

     # check node attributes
@@ -54,15 +95,16 @@ def validate_hif(path):
     info["node-attr-dict"] = 0

     if "nodes" in data:
-        for i, record in enumerate(data["nodes"]):
+        for _i, record in enumerate(data["nodes"]):
             if len(record) != 2:
                 warn(
-                    f"Each node record must have two entries: an ID and the dictionary of corresponding attributes."
+                    " ".join(["Each node record must have two entries:",
+                              "an ID and the dictionary of corresponding attributes."])
                 )
                 info["node-record-length"] = 1

             if not isinstance(record[1], dict):
-                warn(f"The node attributes must be dict-like.")
+                warn("The node attributes must be dict-like.")
                 info["node-attr-dict"] = 1

     # check edge attributes
@@ -70,40 +112,44 @@ def validate_hif(path):
     info["edge-attr-dict"] = 0

     if "edges" in data:
-        for i, record in enumerate(data["edges"]):
+        for _i, record in enumerate(data["edges"]):
             if len(record) != 2:
                 warn(
-                    f"Each edge record must have two entries: an ID and the dictionary of corresponding attributes."
+                    " ".join(["Each edge record must have two entries:",
+                              "an ID and the dictionary of corresponding attributes."])
                 )
                 info["edge-record-length"] = 1

             if not isinstance(record[1], dict):
-                warn(f"The edge attributes must be dict-like.")
+                warn("The edge attributes must be dict-like.")
                 info["edge-attr-dict"] = 1

     if "incidences" in data:
         info["incidence-record-length"] = 0
         info["incidence-attr-dict"] = 0

-        for i, record in enumerate(data["incidences"]):
+        for _i, record in enumerate(data["incidences"]):
             if len(record) != 3:
                 warn(
-                    f"Each incidence record must have three entries: an edge ID, a node ID, and the dictionary of corresponding attributes."
+                    " ".join(["Each incidence record must have three entries:",
+                              "an edge ID, a node ID,",
+                              "and the dictionary of corresponding attributes."])
                 )
                 info["incidence-record-length"] = 1

             if not isinstance(record[2], dict):
-                warn(f"The incidence attributes must be dict-like.")
+                warn("The incidence attributes must be dict-like.")
                 info["incidence-attr-dict"] = 1

     # in the case of directed hypergraphs, each incidence must
     # have the "direction" attribute
     if "network-type" in data and data["network-type"] == "directed":
         data["direction-exists-for-directed"] = 0
-        for i, record in enumerate(data["edges"]):
+        for _i, record in enumerate(data["incidences"]):
             if "direction" not in record[2]:
                 warn(
-                    f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
+                    " ".join(["Each incidence record must have",
+                              "the 'direction' attribute for directed hypergraphs."])
                 )
                 data["direction-exists-for-directed"] = 1
@@ -119,7 +165,7 @@ def validate_hif(path):
             for e2, edge2 in edgedict.items():
                 if e1 != e2 and edge1.issubset(edge2):
                     warn(
-                        f"Only maximal faces should be stored for simplicial complexes."
+                        "Only maximal faces should be stored for simplicial complexes."
                     )
                     data["maximal-edges-for-asc"] = 1

diff --git a/scripts/nx.py b/scripts/nx.py
index 6d1d1be..f9ee521 100644
--- a/scripts/nx.py
+++ b/scripts/nx.py
@@ -1,6 +1,13 @@
+"""
+convert to a networkx Graph
+"""
+
 import networkx as nx

 def from_hif(data) -> nx.Graph:
+    """
+    convert to a networkx Graph
+    """
     g = nx.Graph()
     for n in data.get("nodes", []):
         g.add_node(n["node"], bipartite=0, weight=n.get("weight", 0))
@@ -8,4 +15,4 @@ def from_hif(data) -> nx.Graph:
         g.add_node(e["edge"], bipartite=1, weight=e.get("weight", 0))
     for i in data["incidences"]:
         g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
-    return g
\ No newline at end of file
+    return g

diff --git a/tests/conftest.py b/tests/conftest.py
index 7df4f09..343327b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,48 +1,56 @@
+"""
+fixtures that load each of the files needed for the tests
+"""
+#pylint:disable = missing-function-docstring,unspecified-encoding
 import json

 import pytest
-import requests
 import fastjsonschema

-schema = "schemas/hif_schema_v0.1.0.json"
-json_dir = "tests/test_files"
+SCHEMA = "schemas/hif_schema_v0.1.0.json"
+JSON_DIR = "tests/test_files"


 @pytest.fixture
 def validator():
-    return fastjsonschema.compile(json.load(open(schema)))
+    return fastjsonschema.compile(json.load(open(SCHEMA)))


 @pytest.fixture
 def empty():
-    return json.load(open(f"{json_dir}/empty.json", "r"))
+    return json.load(open(f"{JSON_DIR}/empty.json", "r"))


 @pytest.fixture
 def single_node():
-    return json.load(open(f"{json_dir}/single_node.json", "r"))
+    return json.load(open(f"{JSON_DIR}/single_node.json", "r"))


 @pytest.fixture
 def single_edge():
-    return json.load(open(f"{json_dir}/single_edge.json", "r"))
+    return json.load(open(f"{JSON_DIR}/single_edge.json", "r"))


 @pytest.fixture
 def single_incidence():
-    return json.load(open(f"{json_dir}/single_incidence.json", "r"))
+    return json.load(open(f"{JSON_DIR}/single_incidence.json", "r"))


 @pytest.fixture
 def bad_top_level_field():
-    return json.load(open(f"{json_dir}/bad_top_level_field.json", "r"))
+    return json.load(open(f"{JSON_DIR}/bad_top_level_field.json", "r"))


 @pytest.fixture
 def bad_network_type():
-    return json.load(open(f"{json_dir}/bad_network_type.json", "r"))
+    return json.load(open(f"{JSON_DIR}/bad_network_type.json", "r"))
+
+
+@pytest.fixture
+def bad_node_without_id():
+    return json.load(open(f"{JSON_DIR}/bad_node_without_id.json", "r"))


 @pytest.fixture
 def metadata_as_list():
-    return json.load(open(f"{json_dir}/metadata_as_list.json", "r"))
+    return json.load(open(f"{JSON_DIR}/metadata_as_list.json", "r"))


 @pytest.fixture
 def empty_hypergraph():
-    return json.load(open(f"{json_dir}/empty_hypergraph.json", "r"))
+    return json.load(open(f"{JSON_DIR}/empty_hypergraph.json", "r"))

diff --git a/tests/test_files/bad_node_without_id.json b/tests/test_files/bad_node_without_id.json
new file mode 100644
index 0000000..05ec473
--- /dev/null
+++ b/tests/test_files/bad_node_without_id.json
@@ -0,0 +1 @@
+{"incidences": [], "nodes": [ { } ]}
\ No newline at end of file

diff --git a/tests/test_nx.py b/tests/test_nx.py
index 6f38125..1d72249 100644
--- a/tests/test_nx.py
+++ b/tests/test_nx.py
@@ -1,7 +1,13 @@
+"""
+check that the networkx Graph
+produced from the data loaded from test_files
+matches the corresponding expected Graph
+"""
+#pylint:disable = missing-function-docstring
+
 import networkx as nx
 from scripts.nx import from_hif

-
 def test_empty_hypergraph(empty_hypergraph):
     result = from_hif(empty_hypergraph)
     expected = nx.Graph()

diff --git a/tests/test_schema.py b/tests/test_schema.py
index fb9f219..006c773 100644
--- a/tests/test_schema.py
+++ b/tests/test_schema.py
@@ -1,23 +1,47 @@
+"""
+check each of the fixtures against the validator
+all of which are defined in ``conftest.py``
+"""
+
+#pylint:disable = missing-function-docstring
+from fastjsonschema import JsonSchemaValueException
 import pytest


 def test_empty(validator, empty):
-    with pytest.raises(ValueError):
+    with pytest.raises(JsonSchemaValueException):
         validator(empty)


 def test_bad_top_level_field(validator, bad_top_level_field):
-    with pytest.raises(ValueError):
+    with pytest.raises(JsonSchemaValueException):
         validator(bad_top_level_field)


 def test_bad_network_type(validator, bad_network_type):
-    with pytest.raises(ValueError):
+    with pytest.raises(JsonSchemaValueException):
         validator(bad_network_type)


+def test_bad_node_without_id(validator, bad_node_without_id):
+    with pytest.raises(JsonSchemaValueException):
+        validator(bad_node_without_id)
+
+
+def test_single_node(validator, single_node):
+    validator(single_node)
+
+
+def test_single_edge(validator, single_edge):
+    validator(single_edge)
+
+
+def test_single_incidence(validator, single_incidence):
+    validator(single_incidence)
+
+
 def test_metadata_as_list(validator, metadata_as_list):
-    with pytest.raises(ValueError):
+    with pytest.raises(JsonSchemaValueException):
         validator(metadata_as_list)

diff --git a/validate_hif.py b/validate_hif.py
index 82fcd9f..4aa0be3 100644
--- a/validate_hif.py
+++ b/validate_hif.py
@@ -1,149 +1,28 @@
+"""
+an executable that checks whether a file follows the HIF standard
+"""
+
+#pylint:disable=unspecified-encoding
 import json
 import sys
-from collections import defaultdict

 import fastjsonschema

-# 0 - OK, 1 - bad JSON
-status = 0
+from scripts.hif import validate_network_type

 if len(sys.argv) > 2 and sys.argv[2] == "--silent":
-    verbose = False
+    VERBOSE = False
 else:
-    verbose = True
+    VERBOSE = True

 # network parameters
-filename = "lesmis-hif.json"
-schema_filename = "hif_schema.json"
-
+filename = sys.argv[1]
+SCHEMA_FILENAME = "schemas/hif_schema_v0.1.0.json"

-with open(filename) as file, open(schema_filename) as schema_file:
+with open(filename,'r') as file, open(SCHEMA_FILENAME,'r') as schema_file:
     # load JSON file
     validate_schema = fastjsonschema.compile(json.load(schema_file))
     data = json.load(file)
     validate_schema(data)
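+    # the compiled validator raises fastjsonschema.JsonSchemaValueException
+    # when the data does not conform to the schema; the HIF-specific checks
+    # are delegated to scripts.hif.validate_network_type below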
-
-# check that keys do not deviate from the standard field names
-# DESCRIPTIONS OF THE FIELDS
-# "network-type": a string indicating what type of network the dataset is.
-# Valid choices currently include:
-# - "undirected" (undirected hypergraph with potential multiedges)
-# - "asc" (simplicial complex where only the maximal faces are stored)
-# - "directed" (directed hypergraph with potential multiedges)
-# "metadata": any dataset-level attributes (e.g., name, author, etc.) which must be dict-like
-# "nodes": a list of 2-entries, where the first entry of a record is the node ID
-#          and the second entry is dict-like and stores the associated attributes.
-# "edges": a list of 2-entries, where the first entry of a record is the edge ID
-#          and the second entry is dict-like and stores the associated attributes.
-# "incidences": a list of 3-entries, where the first entry of a record is the edge ID,
-#               the second entry is the node ID, and the third entry is dict-like and
-#               stores the associated attributes.
-#               **Note that this is the only required field.
-
-fields = {"network-type", "metadata", "nodes", "edges", "incidences"}
-if not set(data).issubset(fields):
-    status = 1
-    if verbose:
-        field_names = ", ".join(fields)
-        new_field_names = ", ".join(set(data))
-        print(
-            f"Acceptable field names are: {field_names}\nand the field names are {new_field_names}"
-        )
-
-# incidences are required
-if "incidences" not in data:
-    status = 1
-    if verbose:
-        print(f"The file must contain an field for incidences.")
-
-# check network type
-network_types = {"asc", "undirected", "directed"}
-if "network-type" in data:
-    if data["network-type"] not in network_types:
-        status = 1
-
-        if verbose:
-            network_types = ", ".join(network_types)
-            print(f"Unsupported network type. Valid types are: {network_types}")
-
-# check network metadata
-if "metadata" in data:
-    if not isinstance(data["metadata"], dict):
-        status = 1
-        if verbose:
-            print(f"The metadata must be dict-like.")
-
-# check node attributes
-if "nodes" in data:
-    for i, record in enumerate(data["nodes"]):
-        if len(record) != 2:
-            status = 1
-            if verbose:
-                print(
-                    f"Each node record must have two entries: an ID and the dictionary of corresponding attributes."
-                )
-        if not isinstance(record[1], dict):
-            status = 1
-            if verbose:
-                print(f"The node attributes must be dict-like.")
-
-# check edge attributes
-if "edges" in data:
-    for i, record in enumerate(data["edges"]):
-        if len(record) != 2:
-            status = 1
-            if verbose:
-                print(
-                    f"Each edge record must have two entries: an ID and the dictionary of corresponding attributes."
-                )
-        if not isinstance(record[1], dict):
-            status = 1
-            if verbose:
-                print(f"The edge attributes must be dict-like.")
-
-if "incidences" in data:
-    for i, record in enumerate(data["incidences"]):
-        if len(record) != 3:
-            status = 1
-            if verbose:
-                print(
-                    f"Each incidence record must have three entries: an edge ID, a node ID, and the dictionary of corresponding attributes."
-                )
-        if not isinstance(record[2], dict):
-            status = 1
-            if verbose:
-                print(f"The incidence attributes must be dict-like.")
-
-# in the case of directed hypergraphs, each incidence must
-# have the "direction" attribute
-if (
-    "network-type" in data
-    and data["network-type"] == "directed"
-    and "incidences" in data
-):
-    for i, record in enumerate(data["incidences"]):
-        if "direction" not in record[2]:
-            status = 1
-            if verbose:
-                print(
-                    f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
- ) - -# in the case of simplicial complexes, make sure that the edges are maximal -if "network-type" in data and data["network-type"] == "asc" and "incidences" in data: - edgedict = defaultdict(set) - for record in data["incidences"]: - e = record[0] - n = record[1] - edgedict[e].add(n) - for e1, edge1 in edgedict.items(): - for e2, edge2 in edgedict.items(): - if e1 != e2 and edge1.issubset(edge2): - status = 1 - if verbose: - print( - f"Only maximal faces should be stored for simplicial complexes." - ) - -print(f"Exit status {status}.") + validate_network_type(data, VERBOSE) From 908b3b0a41019406e46a42042c48b7dcb3b23fd4 Mon Sep 17 00:00:00 2001 From: Cobord Date: Tue, 27 Aug 2024 18:52:59 -0400 Subject: [PATCH 2/4] info dictionary has known string keys and 0,1 values --- scripts/hif.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/scripts/hif.py b/scripts/hif.py index e18cc6a..87eac5d 100644 --- a/scripts/hif.py +++ b/scripts/hif.py @@ -8,6 +8,7 @@ from typing import Dict from warnings import warn +#pylint:disable=missing-function-docstring def validate_network_type(data, verbose : bool): if ( "network-type" in data @@ -16,7 +17,7 @@ def validate_network_type(data, verbose : bool): ): for _i, record in enumerate(data["incidences"]): if "direction" not in record[2]: - status = 1 + _status = 1 if verbose: print( "".join(["Each incidence record must have have", @@ -38,8 +39,11 @@ def validate_network_type(data, verbose : bool): "Only maximal faces should be stored for simplicial complexes." ) +type SpecificationPart = str +type StatusCode = int + #pylint:disable=too-many-branches,too-many-statements -def validate_hif(path) -> Dict[str,int]: +def validate_hif(path) -> Dict[SpecificationPart,StatusCode]: """ a dictionary specifying whether every part of the HIF specification is followed for the file with the given path @@ -50,7 +54,7 @@ def validate_hif(path) -> Dict[str,int]: data = json.loads(file.read()) # dictionary to store statuses - info = {} + info : Dict[SpecificationPart,StatusCode] = {} # check that keys do not deviate from the standard field names info["valid-field-names"] = 0 @@ -65,14 +69,12 @@ def validate_hif(path) -> Dict[str,int]: # incidences are required; check that they exist info["incidences-exist"] = 0 - if "incidences" not in data: warn("The file must contain an field for incidences.") info["incidences-exist"] = 1 # check network type info["valid-network-type"] = 0 - network_types = {"asc", "undirected", "directed"} if "network-type" in data: if data["network-type"] not in network_types: @@ -84,7 +86,6 @@ def validate_hif(path) -> Dict[str,int]: # check network metadata info["metadata-dict"] = 0 - if "metadata" in data: if not isinstance(data["metadata"], dict): warn("The metadata must be dict-like.") @@ -93,7 +94,6 @@ def validate_hif(path) -> Dict[str,int]: # check node attributes info["node-record-length"] = 0 info["node-attr-dict"] = 0 - if "nodes" in data: for _i, record in enumerate(data["nodes"]): if len(record) != 2: @@ -110,7 +110,6 @@ def validate_hif(path) -> Dict[str,int]: # check edge attributes info["edge-record-length"] = 0 info["edge-attr-dict"] = 0 - if "edges" in data: for _i, record in enumerate(data["edges"]): if len(record) != 2: From 1dca539bff817d74538fa848db35cdb44dbd00c7 Mon Sep 17 00:00:00 2001 From: Cobord Date: Tue, 27 Aug 2024 21:16:11 -0400 Subject: [PATCH 3/4] use typed dict, make the test cases check why they failed to validate --- scripts/hif.py | 116 ++++++++++++++++++++++++++++--------------- 
 tests/test_schema.py |  57 ++++++++++++++++++---
 2 files changed, 126 insertions(+), 47 deletions(-)

diff --git a/scripts/hif.py b/scripts/hif.py
index 87eac5d..dcd1e4e 100644
--- a/scripts/hif.py
+++ b/scripts/hif.py
@@ -3,9 +3,11 @@ which returns a dictionary specifying whether every part of the HIF specification is followed.
 """

+from __future__ import annotations
 import json
 from collections import defaultdict
-from typing import Dict
+from os import PathLike
+from typing import List, Literal, Optional, TypeAlias, TypedDict, Union
 from warnings import warn

 #pylint:disable=missing-function-docstring
@@ -39,61 +41,97 @@ def validate_network_type(data, verbose : bool):
                             "Only maximal faces should be stored for simplicial complexes."
                         )

-type SpecificationPart = str
-type StatusCode = int
+SpecificationPart : TypeAlias = str
+StatusCode : TypeAlias = Union[Literal[0],Literal[1]]

-#pylint:disable=too-many-branches,too-many-statements
-def validate_hif(path) -> Dict[SpecificationPart,StatusCode]:
+class SpecificationMet(TypedDict):
+    """
+    every part of the HIF specification
+    has a status code
+    """
+    valid_field_names: StatusCode
+    incidences_exist: StatusCode
+    validate_network_type: StatusCode
+    metadata_dict: StatusCode
+    node_record_length: StatusCode
+    node_attr_dict: StatusCode
+    edge_record_length: StatusCode
+    edge_attr_dict: StatusCode
+    incidence_record_length: StatusCode
+    incidence_attr_dict : StatusCode
+
+def all_good() -> SpecificationMet:
+    return SpecificationMet({"valid_field_names":0,
+                             "incidences_exist":0,
+                             "validate_network_type":0,
+                             "metadata_dict":0,
+                             "node_record_length":0,
+                             "node_attr_dict":0,
+                             "edge_record_length":0,
+                             "edge_attr_dict":0,
+                             "incidence_record_length":0,
+                             "incidence_attr_dict":0})
+
+SPECIFICATION_MET_PARTS = len(all_good())
+
+def which_bad(info: SpecificationMet) -> List[str]:
+    return [k for k, v in info.items() if v != 0]
+
+#pylint:disable=too-many-branches,too-many-statements,too-many-locals
+def validate_hif(path : Union[str,PathLike],*,data: Optional[dict] = None) -> SpecificationMet:
     """
     a dictionary specifying whether every part of the HIF specification is followed
     for the file with the given path
+
+    alternatively, can just provide the loaded data directly and the path will be ignored
     """
+
     #pylint:disable=unspecified-encoding
-    with open(path) as file:
-        # load JSON file
-        data = json.loads(file.read())
+    if data is None:
+        with open(path) as file:
+            # load JSON file
+            data = json.loads(file.read())

     # dictionary to store statuses
-    info : Dict[SpecificationPart,StatusCode] = {}
+    info_class = all_good()

     # check that keys do not deviate from the standard field names
-    info["valid-field-names"] = 0
+    info_class["valid_field_names"] = 0
     fields = {"network-type", "metadata", "nodes", "edges", "incidences"}
     if not set(data).issubset(fields):
-        fields = ", ".join(fields)
-        data = ", ".join(set(data))
+        fields_warn = ", ".join(fields)
+        data_warn = ", ".join(set(data))
         warn(
-            f"Acceptable field names are: {fields}\nand the field names are {data}"
+            f"Acceptable field names are: {fields_warn}\nand the field names are {data_warn}"
         )
-        info["valid-field-names"] = 1
+        info_class["valid_field_names"] = 1

     # incidences are required; check that they exist
-    info["incidences-exist"] = 0
+    info_class["incidences_exist"] = 0
     if "incidences" not in data:
         warn("The file must contain a field for incidences.")
-        info["incidences-exist"] = 1
+        info_class["incidences_exist"] = 1

     # check network type
-    info["valid-network-type"] = 0
+    info_class["validate_network_type"] = 0
     network_types = {"asc", "undirected", "directed"}
     if "network-type" in data:
         if data["network-type"] not in network_types:
-            network_types = ", ".join(network_types)
+            network_types_warn = ", ".join(network_types)
             warn(
-                f"Unsupported network type. Valid types are: {network_types}"
+                f"Unsupported network type. Valid types are: {network_types_warn}"
             )
-            info["valid-network-type"] = 1
+            info_class["validate_network_type"] = 1

     # check network metadata
-    info["metadata-dict"] = 0
+    info_class["metadata_dict"] = 0
     if "metadata" in data:
         if not isinstance(data["metadata"], dict):
             warn("The metadata must be dict-like.")
-            info["metadata-dict"] = 1
+            info_class["metadata_dict"] = 1

     # check node attributes
-    info["node-record-length"] = 0
-    info["node-attr-dict"] = 0
+    info_class["node_record_length"] = 0
+    info_class["node_attr_dict"] = 0
     if "nodes" in data:
         for _i, record in enumerate(data["nodes"]):
             if len(record) != 2:
@@ -101,15 +139,15 @@ def validate_hif(path) -> Dict[SpecificationPart,StatusCode]:
                     " ".join(["Each node record must have two entries:",
                               "an ID and the dictionary of corresponding attributes."])
                 )
-                info["node-record-length"] = 1
+                info_class["node_record_length"] = 1

-            if not isinstance(record[1], dict):
+            if len(record)>1 and not isinstance(record[1], dict):
                 warn("The node attributes must be dict-like.")
-                info["node-attr-dict"] = 1
+                info_class["node_attr_dict"] = 1

     # check edge attributes
-    info["edge-record-length"] = 0
-    info["edge-attr-dict"] = 0
+    info_class["edge_record_length"] = 0
+    info_class["edge_attr_dict"] = 0
     if "edges" in data:
         for _i, record in enumerate(data["edges"]):
             if len(record) != 2:
@@ -117,15 +155,15 @@ def validate_hif(path) -> Dict[SpecificationPart,StatusCode]:
                     " ".join(["Each edge record must have two entries:",
                               "an ID and the dictionary of corresponding attributes."])
                 )
-                info["edge-record-length"] = 1
+                info_class["edge_record_length"] = 1

-            if not isinstance(record[1], dict):
+            if len(record) > 1 and not isinstance(record[1], dict):
                 warn("The edge attributes must be dict-like.")
-                info["edge-attr-dict"] = 1
+                info_class["edge_attr_dict"] = 1

     if "incidences" in data:
-        info["incidence-record-length"] = 0
-        info["incidence-attr-dict"] = 0
+        info_class["incidence_record_length"] = 0
+        info_class["incidence_attr_dict"] = 0

         for _i, record in enumerate(data["incidences"]):
             if len(record) != 3:
@@ -134,18 +172,18 @@ def validate_hif(path) -> Dict[SpecificationPart,StatusCode]:
                     "an edge ID, a node ID,",
                     "and the dictionary of corresponding attributes."])
                 )
-                info["incidence-record-length"] = 1
+                info_class["incidence_record_length"] = 1

-            if not isinstance(record[2], dict):
+            if len(record)>2 and not isinstance(record[2], dict):
                 warn("The incidence attributes must be dict-like.")
-                info["incidence-attr-dict"] = 1
+                info_class["incidence_attr_dict"] = 1

     # in the case of directed hypergraphs, each incidence must
     # have the "direction" attribute
     if "network-type" in data and data["network-type"] == "directed":
         data["direction-exists-for-directed"] = 0
         for _i, record in enumerate(data["incidences"]):
-            if "direction" not in record[2]:
+            if len(record)<3 or "direction" not in record[2]:
                 warn(
                     " ".join(["Each incidence record must have",
                               "the 'direction' attribute for directed hypergraphs."])
                 )
                 data["direction-exists-for-directed"] = 1
@@ -168,4 +206,4 @@ def validate_hif(path) -> Dict[SpecificationPart,StatusCode]:
                     )
                     data["maximal-edges-for-asc"] = 1

-    return info
+    return info_class

diff --git a/tests/test_schema.py b/tests/test_schema.py
index 006c773..a0ae708 100644
--- a/tests/test_schema.py
+++ b/tests/test_schema.py
@@ -4,46 +4,87 @@
 """

 #pylint:disable = missing-function-docstring
+import warnings
 from fastjsonschema import JsonSchemaValueException
 import pytest

+from scripts.hif import SPECIFICATION_MET_PARTS, validate_hif, validate_network_type, which_bad

 def test_empty(validator, empty):
     with pytest.raises(JsonSchemaValueException):
         validator(empty)
-
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=empty)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["incidences_exist"]

 def test_bad_top_level_field(validator, bad_top_level_field):
     with pytest.raises(JsonSchemaValueException):
         validator(bad_top_level_field)
-
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=bad_top_level_field)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["valid_field_names"]

 def test_bad_network_type(validator, bad_network_type):
     with pytest.raises(JsonSchemaValueException):
         validator(bad_network_type)
-
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=bad_network_type)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["validate_network_type"]

 def test_bad_node_without_id(validator, bad_node_without_id):
     with pytest.raises(JsonSchemaValueException):
         validator(bad_node_without_id)
-
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=bad_node_without_id)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["node_record_length"]

 def test_single_node(validator, single_node):
     validator(single_node)
-
+    validate_network_type(single_node,False)
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=single_node)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["node_record_length"]

 def test_single_edge(validator, single_edge):
     validator(single_edge)
-
+    validate_network_type(single_edge,False)
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=single_edge)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["edge_record_length"]

 def test_single_incidence(validator, single_incidence):
     validator(single_incidence)
-
+    validate_network_type(single_incidence,False)
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=single_incidence)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["incidence_record_length"]

 def test_metadata_as_list(validator, metadata_as_list):
     with pytest.raises(JsonSchemaValueException):
         validator(metadata_as_list)
-
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        full_info = validate_hif("",data=metadata_as_list)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert which_bad(full_info) == ["metadata_dict"]

 def test_empty_hypergraph(validator, empty_hypergraph):
     validator(empty_hypergraph)
+    validate_network_type(empty_hypergraph,False)
+    full_info = validate_hif("",data=empty_hypergraph)
+    assert len(full_info) == SPECIFICATION_MET_PARTS
+    assert len(which_bad(full_info)) == 0

From 6f173d7d56921761161fedf921e17e717cf651df Mon Sep 17 00:00:00 2001
From: Cobord
Date: Tue, 27 Aug 2024 23:33:08 -0400
Subject: [PATCH 4/4] directed incidence
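
For a directed incidence, from_hif now returns an nx.DiGraph. The convention
implemented below: an incidence whose "direction" attribute is "head" becomes
the arc edge -> node, and anything else becomes node -> edge. A minimal sketch
(the JSON literal is illustrative, not a file in the repository):

    data = {"network-type": "directed",
            "incidences": [{"edge": "e1", "node": 1, "direction": "head"}]}
    g = from_hif(data)  # nx.DiGraph with the single arc ("e1", 1)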

---
 .gitignore                               |  5 ++++-
 scripts/nx.py                            | 19 +++++++++++++++----
 tests/conftest.py                        |  4 ++++
 tests/test_files/directed_incidence.json |  1 +
 tests/test_nx.py                         |  6 ++++++
 tests/test_schema.py                     |  3 +++
 6 files changed, 33 insertions(+), 5 deletions(-)
 create mode 100644 tests/test_files/directed_incidence.json

diff --git a/.gitignore b/.gitignore
index 2d96d78..0e773a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -161,4 +161,7 @@ cython_debug/
 #  option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/

-test.*
\ No newline at end of file
+test.*
+
+# cleanup script
+linting_cleanup.sh
\ No newline at end of file

diff --git a/scripts/nx.py b/scripts/nx.py
index f9ee521..28c32f8 100644
--- a/scripts/nx.py
+++ b/scripts/nx.py
@@ -1,18 +1,29 @@
 """
 convert to a networkx Graph
 """
-
+from typing import Union
 import networkx as nx

-def from_hif(data) -> nx.Graph:
+def from_hif(data) -> Union[nx.Graph,nx.DiGraph]:
     """
     convert to a networkx Graph
     """
-    g = nx.Graph()
+    is_directed = data.get("network-type","undirected") == "directed"
+    if is_directed:
+        g = nx.DiGraph()
+    else:
+        g = nx.Graph()
     for n in data.get("nodes", []):
         g.add_node(n["node"], bipartite=0, weight=n.get("weight", 0))
     for e in data.get("edges", []):
         g.add_node(e["edge"], bipartite=1, weight=e.get("weight", 0))
     for i in data["incidences"]:
-        g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
+        if is_directed:
+            # TODO the default is ambiguous and requires discussion
+            if i.get("direction","head") == "head":
+                g.add_edge(i["edge"],i["node"], weight=i.get("weight", 0))
+            else:
+                g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
+        else:
+            g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
     return g

diff --git a/tests/conftest.py b/tests/conftest.py
index 343327b..576ca83 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -31,6 +31,10 @@ def single_edge():
 def single_incidence():
     return json.load(open(f"{JSON_DIR}/single_incidence.json", "r"))

+@pytest.fixture
+def directed_incidence():
+    return json.load(open(f"{JSON_DIR}/directed_incidence.json", "r"))
+
 @pytest.fixture
 def bad_top_level_field():
     return json.load(open(f"{JSON_DIR}/bad_top_level_field.json", "r"))

diff --git a/tests/test_files/directed_incidence.json b/tests/test_files/directed_incidence.json
new file mode 100644
index 0000000..1c551e4
--- /dev/null
+++ b/tests/test_files/directed_incidence.json
@@ -0,0 +1 @@
+{"network-type": "directed","incidences": [ { "edge": "abcd", "node": 42, "direction": "head"} ]}
\ No newline at end of file

diff --git a/tests/test_nx.py b/tests/test_nx.py
index 1d72249..d8d047d 100644
--- a/tests/test_nx.py
+++ b/tests/test_nx.py
@@ -30,3 +30,9 @@ def test_single_incidence(single_incidence):
     expected = nx.Graph()
     expected.add_edge("abcd", 42, weight=0)
     assert nx.utils.graphs_equal(result, expected)
+
+def test_directed_incidence(directed_incidence):
+    result = from_hif(directed_incidence)
+    expected = nx.DiGraph()
+    expected.add_edge("abcd", 42, weight=0)
+    assert nx.utils.graphs_equal(result, expected)

diff --git a/tests/test_schema.py b/tests/test_schema.py
index a0ae708..d333677 100644
--- a/tests/test_schema.py
+++ b/tests/test_schema.py
@@ -73,6 +73,9 @@ def test_single_incidence(validator, single_incidence):
     assert len(full_info) == SPECIFICATION_MET_PARTS
     assert which_bad(full_info) == ["incidence_record_length"]

+def test_directed_incidence(validator, directed_incidence):
+    validator(directed_incidence)
+
 def test_metadata_as_list(validator, metadata_as_list):
     with pytest.raises(JsonSchemaValueException):
         validator(metadata_as_list)