annotations #14

Closed
wants to merge 5 commits into from
5 changes: 4 additions & 1 deletion .gitignore
@@ -161,4 +161,7 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

test.*
test.*

# cleanup script
linting_cleanup.sh
25 changes: 16 additions & 9 deletions schemas/performance_testing.py
@@ -1,3 +1,5 @@
#pylint:disable=import-error,unused-import,missing-function-docstring,unspecified-encoding,invalid-name
#pylint:disable=unsupported-assignment-operation,unsubscriptable-object
"""
Datasets to use:

@@ -18,6 +20,7 @@
import xgi

warnings.simplefilter("ignore")
#pylint:disable=consider-using-with
sys.stdout = open("performance_testing_output.txt", "a")


@@ -26,12 +29,13 @@ def marktime(msg=None):
print(temp.strftime("%d/%m/%y %H:%M:%S"), ": ", msg, flush=True)
return temp


schema = json.load(open("hif_schema_v0.1.0.json", "r"))
with open("hif_schema_v0.1.0.json", "r") as f:
schema = json.load(f)
validator = fastjsonschema.compile(schema)

### high_school data as dataframes for hnx;
hs = json.load(open(f"../examples/contacts-high-school.json", "r"))
with open("../examples/contacts-high-school.json", "r") as f:
hs = json.load(f)
hs_df = pd.DataFrame(hs["hyperedges"]).fillna("")
hs_df["edge"] = hs_df.interaction.map(lambda x: x[0])
hs_df["node"] = hs_df.interaction.map(lambda x: x[1])
@@ -42,14 +46,17 @@ def marktime(msg=None):


### HNX constructors
#pylint:disable=unused-argument
def hnx_hypergraph(df, nodedf=None, edgedf=None):
return hnx.Hypergraph(df, node_properties=nodedf)


def hnx_to_hif(hg):
edgj = hg.edges.to_dataframe
#pylint:disable=protected-access
edid = edgj.index._name or "index"
nodj = hg.nodes.to_dataframe
#pylint:disable=protected-access
ndid = nodj.index._name or "index"
edgj = edgj.reset_index().rename(columns={edid: "edge"}).to_dict(orient="records")
nodj = nodj.reset_index().rename(columns={ndid: "node"}).to_dict(orient="records")
@@ -58,14 +65,14 @@ def hnx_to_hif(hg):
.rename(columns={"nodes": "node", "edges": "edge"})
.to_dict(orient="records")
)
hif = {"edges": edgj, "nodes": nodj, "incidences": incj}
return hif
hif_converted = {"edges": edgj, "nodes": nodj, "incidences": incj}
return hif_converted


def hnx_from_hif(hif):
edges = pd.DataFrame(hif["edges"])
nodes = pd.DataFrame(hif["nodes"])
incidences = pd.DataFrame(hif["incidences"])
def hnx_from_hif(hif_to_convert):
edges = pd.DataFrame(hif_to_convert["edges"])
nodes = pd.DataFrame(hif_to_convert["nodes"])
incidences = pd.DataFrame(hif_to_convert["incidences"])
return hnx.Hypergraph(incidences, node_properties=nodes, edge_properties=edges)


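For orientation (not part of the diff itself), a minimal sketch of how these conversion helpers might be exercised inside performance_testing.py, assuming the hs_df DataFrame and the compiled validator defined earlier in that file; the call order is an assumption:

# illustrative sketch only, not part of this PR
hg = hnx_hypergraph(hs_df)        # build a HyperNetX hypergraph from the high-school DataFrame
hif_doc = hnx_to_hif(hg)          # serialize it into the HIF dict layout ("edges", "nodes", "incidences")
validator(hif_doc)                # the compiled fastjsonschema validator raises on schema violations
hg_again = hnx_from_hif(hif_doc)  # rebuild an equivalent hypergraph from the HIF dict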
1 change: 1 addition & 0 deletions scripts/__init__.py
@@ -1 +1,2 @@
#pylint:disable=missing-module-docstring
from .hif import *
176 changes: 171 additions & 5 deletions scripts/hif.py
@@ -1,7 +1,172 @@
"""
This script provides a function `validate_hif`, which returns a dictionary
specifying whether every part of the HIF specification is followed.
"""

from __future__ import annotations
import json
from collections import defaultdict
from os import PathLike
from typing import List, Literal, Optional, TypeAlias, TypedDict, Union
from warnings import warn

SpecificationPart: TypeAlias = str
StatusCode: TypeAlias = Union[Literal[0], Literal[1]]

class SpecificationMet(TypedDict):
"""
every part of the HIF specification
has a status code
"""
valid_field_names: StatusCode
incidences_exist: StatusCode
validate_network_type: StatusCode
metadata_dict: StatusCode
node_record_length: StatusCode
node_attr_dict: StatusCode
edge_record_length: StatusCode
edge_attr_dict: StatusCode
incidence_record_length: StatusCode
incidence_attr_dict: StatusCode

def all_good() -> SpecificationMet:
"""
Return a SpecificationMet with every status code set to 0 (good).
"""
return SpecificationMet({"valid_field_names":0,
"incidences_exist":0,
"validate_network_type":0,
"metadata_dict":0,
"node_record_length":0,
"node_attr_dict":0,
"edge_record_length":0,
"edge_attr_dict":0,
"incidence_record_length":0,
"incidence_attr_dict":0})

SPECIFICATION_MET_PARTS = len(all_good())

def which_bad(info: SpecificationMet) -> List[str]:
"""
Which parts of the specification have bad (non-zero) status codes.
"""
return [k for k, v in info.items() if v != 0]

#pylint:disable=too-many-branches,too-many-statements,too-many-locals
def validate_hif(path: Union[str, PathLike], *, data: Optional[dict] = None) -> SpecificationMet:
"""
Return a dictionary specifying whether every part of the HIF specification is followed
for the file at the given path.
Alternatively, the already-loaded data can be passed via `data`, in which case the path is ignored.
"""

#pylint:disable=unspecified-encoding
if data is None:
with open(path) as file:
# load JSON file
data = json.loads(file.read())

# dictionary to store statuses
info_class = all_good()

# check that keys do not deviate from the standard field names
info_class["valid_field_names"] = 0
fields = {"network-type", "metadata", "nodes", "edges", "incidences"}
if not set(data).issubset(fields):
fields_warn = ", ".join(fields)
data_warn = ", ".join(set(data))
warn(
f"Acceptable field names are: {fields_warn}\nand the field names are {data_warn}"
)
info_class["valid_field_names"] = 1

# incidences are required; check that they exist
info_class["incidences_exist"] = 0
if "incidences" not in data:
warn("The file must contain an field for incidences.")
info_class["incidences_exist"] = 1

# check network type
info_class["validate_network_type"] = 0
network_types = {"asc", "undirected", "directed"}
if "network-type" in data:
if data["network-type"] not in network_types:
network_types_warn = ", ".join(network_types)
warn(
f"Unsupported network type. Valid types are: {network_types_warn}"
)
info_class["validate_network_type"] = 1

# check network metadata
info_class["metadata_dict"] = 0
if "metadata" in data:
if not isinstance(data["metadata"], dict):
warn("The metadata must be dict-like.")
info_class["metadata_dict"] = 1

# check node attributes
info_class["node_record_length"] = 0
info_class["node_attr_dict"] = 0
if "nodes" in data:
for _i, record in enumerate(data["nodes"]):
if len(record) != 2:
warn(
" ".join(["Each node record must have two entries:",
"an ID and the dictionary of corresponding attributes."])
)
info_class["node_record_length"] = 1

if len(record) > 1 and not isinstance(record[1], dict):
warn("The node attributes must be dict-like.")
info_class["node_attr_dict"] = 1

# check edge attributes
info_class["edge_record_length"] = 0
info_class["edge_attr_dict"] = 0
if "edges" in data:
for _i, record in enumerate(data["edges"]):
if len(record) != 2:
warn(
" ".join(["Each edge record must have two entries:",
"an ID and the dictionary of corresponding attributes."])
)
info_class["edge_record_length"] = 1

if len(record) > 1 and not isinstance(record[1], dict):
warn("The edge attributes must be dict-like.")
info_class["edge_attr_dict"] = 1

if "incidences" in data:
info_class["incidence_record_length"] = 0
info_class["incidence_attr_dict"] = 0

for _i, record in enumerate(data["incidences"]):
if len(record) != 3:
warn(
" ".join(["Each incidence record must have three entries:",
"an edge ID, a node ID,",
"and the dictionary of corresponding attributes."])
)
info_class["incidence_record_length"] = 1

if len(record) > 2 and not isinstance(record[2], dict):
warn("The incidence attributes must be dict-like.")
info_class["incidence_attr_dict"] = 1

# in the case of directed hypergraphs, each incidence must
# have the "direction" attribute
if "network-type" in data and data["network-type"] == "directed":
data["direction-exists-for-directed"] = 0
for _i, record in enumerate(data["edges"]):
if len(record)<2 or "direction" not in record[2]:
warn(
" ".join(["Each incidence record must have have",
"the 'direction' attribute for directed hypergraphs."])
)
data["direction-exists-for-directed"] = 1
return info_class

def validate_network_type(data, verbose):
def validate_network_type(data, verbose: bool):
"""
Custom validations for network types
"""
@@ -10,12 +175,13 @@ def validate_network_type(data, verbose):
and data["network-type"] == "directed"
and "incidences" in data
):
for i, record in enumerate(data["incidences"]):
for _i, record in enumerate(data["incidences"]):
if "direction" not in record[2]:
status = 1
_status = 1
if verbose:
print(
f"Each incidence record must have have the 'direction' attribute for directed hypergraphs."
"".join(["Each incidence record must have have",
"the 'direction' attribute for directed hypergraphs."])
)

# in the case of simplicial complexes, make sure that the edges are maximal
@@ -30,5 +196,5 @@ def validate_network_type(data, verbose):
if e1 != e2 and edge1.issubset(edge2):
if verbose:
print(
f"Only maximal faces should be stored for simplicial complexes."
"Only maximal faces should be stored for simplicial complexes."
)
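
As a rough usage sketch (illustrative, not part of this PR; it assumes the repository root is on the import path and uses one of the fixture files listed in tests/conftest.py), the new validator could be driven like this:

# illustrative sketch only, not part of this PR
from scripts.hif import validate_hif, which_bad

report = validate_hif("tests/test_files/single_incidence.json")
failed = which_bad(report)  # names of specification parts with non-zero status codes
if failed:
    print("HIF specification parts not met:", failed)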
26 changes: 22 additions & 4 deletions scripts/nx.py
@@ -1,11 +1,29 @@
"""
convert to a networkx Graph
"""
from typing import Union
import networkx as nx

def from_hif(data) -> nx.Graph:
g = nx.Graph()
def from_hif(data) -> Union[nx.Graph,nx.DiGraph]:
"""
Convert a HIF document to a networkx Graph, or a DiGraph when the network type is directed.
"""
is_directed = data.get("network-type", "undirected") == "directed"
if is_directed:
g = nx.DiGraph()
else:
g = nx.Graph()
for n in data.get("nodes", []):
g.add_node(n["node"], bipartite=0, weight=n.get("weight", 0))
for e in data.get("edges", []):
g.add_node(e["edge"], bipartite=1, weight=e.get("weight", 0))
for i in data["incidences"]:
g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
return g
if is_directed:
# TODO the default is ambiguous and requires discussion
if i.get("direction","head") == "head":
g.add_edge(i["edge"],i["node"], weight=i.get("weight", 0))
else:
g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
else:
g.add_edge(i["node"], i["edge"], weight=i.get("weight", 0))
return g
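
A small illustration (not part of the diff) of the directed branch, using a toy HIF document shaped like the directed_incidence.json test file added below; the import path is an assumption:

# illustrative sketch only, not part of this PR
from scripts.nx import from_hif

toy = {
    "network-type": "directed",
    "nodes": [{"node": 42}],
    "edges": [{"edge": "abcd"}],
    "incidences": [{"edge": "abcd", "node": 42, "direction": "head"}],
}
g = from_hif(toy)
print(type(g).__name__)  # DiGraph, because the network type is directed
print(list(g.edges()))   # [('abcd', 42)]: a "head" incidence is oriented edge -> node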
34 changes: 20 additions & 14 deletions tests/conftest.py
@@ -1,53 +1,59 @@
"""
in order to load each of the files needed for the tests
"""
#pylint:disable = missing-function-docstring,unspecified-encoding
import json

import pytest
import requests
import fastjsonschema

schema = "schemas/hif_schema_v0.1.0.json"
json_dir = "tests/test_files"
SCHEMA = "schemas/hif_schema_v0.1.0.json"
JSON_DIR = "tests/test_files"


@pytest.fixture
def validator():
return fastjsonschema.compile(json.load(open(schema)))
return fastjsonschema.compile(json.load(open(SCHEMA)))

@pytest.fixture
def empty():
return json.load(open(f"{json_dir}/empty.json", "r"))
return json.load(open(f"{JSON_DIR}/empty.json", "r"))

@pytest.fixture
def single_node():
return json.load(open(f"{json_dir}/single_node.json", "r"))
return json.load(open(f"{JSON_DIR}/single_node.json", "r"))

@pytest.fixture
def single_edge():
return json.load(open(f"{json_dir}/single_edge.json", "r"))
return json.load(open(f"{JSON_DIR}/single_edge.json", "r"))

@pytest.fixture
def single_incidence():
return json.load(open(f"{json_dir}/single_incidence.json", "r"))
return json.load(open(f"{JSON_DIR}/single_incidence.json", "r"))

@pytest.fixture
def directed_incidence():
return json.load(open(f"{JSON_DIR}/directed_incidence.json", "r"))

@pytest.fixture
def bad_top_level_field():
return json.load(open(f"{json_dir}/bad_top_level_field.json", "r"))
return json.load(open(f"{JSON_DIR}/bad_top_level_field.json", "r"))


@pytest.fixture
def bad_network_type():
return json.load(open(f"{json_dir}/bad_network_type.json", "r"))
return json.load(open(f"{JSON_DIR}/bad_network_type.json", "r"))


@pytest.fixture
def bad_node_without_id():
return json.load(open(f"{json_dir}/bad_node_without_id.json", "r"))

return json.load(open(f"{JSON_DIR}/bad_node_without_id.json", "r"))

@pytest.fixture
def metadata_as_list():
return json.load(open(f"{json_dir}/metadata_as_list.json", "r"))
return json.load(open(f"{JSON_DIR}/metadata_as_list.json", "r"))


@pytest.fixture
def empty_hypergraph():
return json.load(open(f"{json_dir}/empty_hypergraph.json", "r"))
return json.load(open(f"{JSON_DIR}/empty_hypergraph.json", "r"))
1 change: 1 addition & 0 deletions tests/test_files/directed_incidence.json
@@ -0,0 +1 @@
{"network-type": "directed","incidences": [ { "edge": "abcd", "node": 42, "direction": "head"} ]}