Skip to content

Commit

Permalink
implement a file format for smaller csv storage
Browse files Browse the repository at this point in the history
  • Loading branch information
pudo committed Jun 26, 2023
1 parent fdc4009 commit 5ac1db8
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 36 deletions.
8 changes: 5 additions & 3 deletions nomenklatura/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from nomenklatura.dataset import DS
from nomenklatura.publish.names import pick_name
from nomenklatura.statement.statement import Statement
from nomenklatura.util import BASE_ID

if TYPE_CHECKING:
from nomenklatura.store import View
Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(
self._statements: Dict[str, Set[Statement]] = {}

properties = data.pop("properties", None)
# external = data.pop("external", None)
if isinstance(properties, Mapping):
for key, value in properties.items():
self.add(key, value, cleaned=cleaned, quiet=True)
Expand Down Expand Up @@ -88,8 +90,8 @@ def statements(self) -> Generator[Statement, None, None]:
yield Statement(
canonical_id=self.id,
entity_id=self.id,
prop=Statement.BASE,
prop_type=Statement.BASE,
prop=BASE_ID,
prop_type=BASE_ID,
schema=self.schema.name,
value=self.checksum(),
dataset=self.default_dataset,
Expand Down Expand Up @@ -165,7 +167,7 @@ def add_statement(self, stmt: Statement) -> None:
self.schema = model.common_schema(self.schema, stmt.schema)
except InvalidData as exc:
raise InvalidData(f"{self.id}: {exc}") from exc
if stmt.prop != Statement.BASE:
if stmt.prop != BASE_ID:
self._statements.setdefault(stmt.prop, set())
self._statements[stmt.prop].add(stmt)

Expand Down
62 changes: 60 additions & 2 deletions nomenklatura/statement/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import orjson
from pathlib import Path
from io import TextIOWrapper
from typing import BinaryIO, Generator, Iterable, Type
from typing import BinaryIO, Generator, Iterable, Type, Dict, Any
from followthemoney.cli.util import MAX_LINE

from nomenklatura.statement.statement import S
from nomenklatura.util import pack_prop, unpack_prop, bool_text

JSON = "json"
CSV = "csv"
FORMATS = [JSON, CSV]
PACK = "pack"
FORMATS = [JSON, CSV, PACK]

CSV_COLUMNS = [
"canonical_id",
Expand All @@ -29,6 +31,20 @@
"id",
]

PACK_COLUMNS = [
"entity_id",
"prop",
"value",
"dataset",
"lang",
"original_value",
"target",
"external",
"first_seen",
"last_seen",
"id",
]


def read_json_statements(
fh: BinaryIO,
Expand All @@ -48,11 +64,26 @@ def read_csv_statements(
yield statement_type.from_row(row)


def read_pack_statements(
fh: BinaryIO, statement_type: Type[S]
) -> Generator[S, None, None]:
wrapped = TextIOWrapper(fh, encoding="utf-8")
for row in csv.DictReader(wrapped, dialect=csv.unix_dialect):
row["canonical_id"] = row["entity_id"]
schema, prop_type, prop = unpack_prop(row["prop"])
row["schema"] = schema
row["prop"] = prop
row["prop_type"] = prop_type
yield statement_type.from_row(row)


def read_statements(
fh: BinaryIO, format: str, statement_type: Type[S]
) -> Generator[S, None, None]:
if format == CSV:
yield from read_csv_statements(fh, statement_type)
elif format == PACK:
yield from read_pack_statements(fh, statement_type)
else:
yield from read_json_statements(fh, statement_type)

Expand Down Expand Up @@ -88,8 +119,35 @@ def write_csv_statements(fh: BinaryIO, statements: Iterable[S]) -> None:
writer.writerow([row.get(c) for c in CSV_COLUMNS])


def pack_statement(stmt: S) -> Dict[str, Any]:
row = stmt.to_row()
row.pop("canonical_id", None)
row.pop("prop_type", None)
prop = row.pop("prop")
schema = row.pop("schema")
if prop is None or schema is None:
raise ValueError("Cannot pack statement without prop and schema")
row["prop"] = pack_prop(prop, schema)
return row


def write_pack_statements(fh: BinaryIO, statements: Iterable[S]) -> None:
with TextIOWrapper(fh, encoding="utf-8") as wrapped:
writer = csv.writer(
wrapped,
dialect=csv.unix_dialect,
quoting=csv.QUOTE_MINIMAL,
)
writer.writerow(PACK_COLUMNS)
for stmt in statements:
row = pack_statement(stmt)
writer.writerow([row.get(c) for c in PACK_COLUMNS])


def write_statements(fh: BinaryIO, format: str, statements: Iterable[S]) -> None:
if format == CSV:
write_csv_statements(fh, statements)
elif format == PACK:
write_pack_statements(fh, statements)
else:
write_json_statements(fh, statements)
12 changes: 6 additions & 6 deletions nomenklatura/statement/statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import cast, TYPE_CHECKING
from typing import Any, Dict, Generator, Optional, Type, TypeVar, TypedDict

from nomenklatura.util import bool_text, datetime_iso, text_bool
from nomenklatura.util import bool_text, datetime_iso, text_bool, BASE_ID

if TYPE_CHECKING:
from nomenklatura.entity import CE
Expand Down Expand Up @@ -39,7 +39,7 @@ class Statement(object):
want to support making property-less entities.
"""

BASE = "id"
BASE = BASE_ID

__slots__ = [
"id",
Expand Down Expand Up @@ -132,8 +132,8 @@ def __eq__(self, other: Any) -> bool:
return not self.id != other.id

def __lt__(self, other: Any) -> bool:
self_key = (self.prop != self.BASE, self.id or "")
other_key = (other.prop != self.BASE, other.id or "")
self_key = (self.prop != BASE_ID, self.id or "")
other_key = (other.prop != BASE_ID, other.id or "")
return self_key < other_key

def clone(self: S) -> S:
Expand Down Expand Up @@ -227,8 +227,8 @@ def from_entity(
if entity.id is not None:
yield cls(
entity_id=entity.id,
prop=cls.BASE,
prop_type=cls.BASE,
prop=BASE_ID,
prop_type=BASE_ID,
schema=entity.schema.name,
value=entity.id,
dataset=dataset,
Expand Down
25 changes: 1 addition & 24 deletions nomenklatura/store/util.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,7 @@
import orjson
from hashlib import sha1
from functools import cache
from typing import Tuple
from followthemoney import model

from nomenklatura.statement import Statement

QNAME_PREFIX = 5


def pack_prop(schema: str, prop: str) -> str:
return f"{schema}:{prop}"


@cache
def unpack_prop(id: str) -> Tuple[str, str, str]:
schema, prop = id.split(":", 1)
if prop == Statement.BASE:
return schema, Statement.BASE, Statement.BASE
schema_obj = model.get(schema)
if schema_obj is None:
raise TypeError("Schema not found: %s" % schema)
prop_obj = schema_obj.get(prop)
if prop_obj is None:
raise TypeError("Property not found: %s" % prop)
return schema, prop_obj.type.name, prop
from nomenklatura.util import pack_prop, unpack_prop


def pack_statement(stmt: Statement) -> bytes:
Expand Down
21 changes: 20 additions & 1 deletion nomenklatura/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

DATA_PATH = Path(os.path.join(os.path.dirname(__file__), "data")).resolve()
QID = re.compile(r"^Q(\d+)$")
BASE_ID = "id"
PathLike = Union[str, os.PathLike[str]]
ParamsType = Union[None, Iterable[Tuple[str, Any]], Mapping[str, Any]]

Expand Down Expand Up @@ -63,7 +64,7 @@ def iso_to_version(value: str) -> Optional[str]:
def bool_text(value: Optional[bool]) -> Optional[str]:
if value is None:
return None
return "true" if value else "false"
return "t" if value else "f"


@cache
Expand Down Expand Up @@ -99,3 +100,21 @@ def normalize_name(original: str) -> Optional[str]:
def levenshtein(left: str, right: str) -> int:
"""Compute the Levenshtein distance between two strings."""
return Levenshtein.distance(left[:128], right[:128])


def pack_prop(schema: str, prop: str) -> str:
return f"{schema}:{prop}"


@cache
def unpack_prop(id: str) -> Tuple[str, str, str]:
schema, prop = id.split(":", 1)
if prop == BASE_ID:
return schema, BASE_ID, BASE_ID
schema_obj = model.get(schema)
if schema_obj is None:
raise TypeError("Schema not found: %s" % schema)
prop_obj = schema_obj.get(prop)
if prop_obj is None:
raise TypeError("Property not found: %s" % prop)
return schema, prop_obj.type.name, prop

0 comments on commit 5ac1db8

Please sign in to comment.