From b9cbb2287601cf604ba04f6f436b3b0862dbc36b Mon Sep 17 00:00:00 2001 From: David Mandelberg Date: Wed, 8 Nov 2023 20:36:06 -0500 Subject: [PATCH] Add a class to represent Entity data, and use it --- rock_paper_sand/wikidata.py | 59 +++++++++++++--------- rock_paper_sand/wikidata_test.py | 84 ++++++++++++++++--------------- rock_paper_sand/wikidata_value.py | 17 ++++++- 3 files changed, 94 insertions(+), 66 deletions(-) diff --git a/rock_paper_sand/wikidata.py b/rock_paper_sand/wikidata.py index 6562533..2c2fad0 100644 --- a/rock_paper_sand/wikidata.py +++ b/rock_paper_sand/wikidata.py @@ -277,24 +277,30 @@ def __init__( session: requests.Session, ) -> None: self._session = session - self._item_by_id: dict[wikidata_value.ItemRef, Any] = {} - self._item_classes: ( - dict[wikidata_value.ItemRef, Set[wikidata_value.ItemRef]] + self._entity_by_ref: ( + dict[wikidata_value.EntityRef, wikidata_value.Entity] + ) = {} + self._entity_classes: ( + dict[wikidata_value.EntityRef, Set[wikidata_value.ItemRef]] ) = {} self._transitive_subclasses: ( dict[wikidata_value.ItemRef, Set[wikidata_value.ItemRef]] ) = {} self._related_media: dict[wikidata_value.ItemRef, RelatedMedia] = {} - def item(self, item_id: wikidata_value.ItemRef) -> Any: - """Returns an item in full JSON format.""" - if item_id not in self._item_by_id: + def entity( + self, entity_ref: wikidata_value.EntityRef + ) -> wikidata_value.Entity: + """Returns an entity.""" + if entity_ref not in self._entity_by_ref: response = self._session.get( - f"https://www.wikidata.org/wiki/Special:EntityData/{item_id.id}.json" # pylint: disable=line-too-long + f"https://www.wikidata.org/wiki/Special:EntityData/{entity_ref.id}.json" # pylint: disable=line-too-long ) response.raise_for_status() - self._item_by_id[item_id] = response.json()["entities"][item_id.id] - return self._item_by_id[item_id] + self._entity_by_ref[entity_ref] = wikidata_value.Entity( + json_full=response.json()["entities"][entity_ref.id], + ) + return self._entity_by_ref[entity_ref] def sparql(self, query: str) -> Any: """Returns results from a SPARQL query.""" @@ -307,18 +313,19 @@ def sparql(self, query: str) -> Any: response.raise_for_status() return response.json()["results"]["bindings"] - def item_classes( - self, item_id: wikidata_value.ItemRef + def entity_classes( + self, entity_ref: wikidata_value.EntityRef ) -> Set[wikidata_value.ItemRef]: - """Returns the classes that the item is an instance of.""" - if item_id not in self._item_classes: - self._item_classes[item_id] = frozenset( + """Returns the classes that the entity is an instance of.""" + if entity_ref not in self._entity_classes: + self._entity_classes[entity_ref] = frozenset( _parse_snak_item(statement["mainsnak"]) for statement in _truthy_statements( - self.item(item_id), wikidata_value.P_INSTANCE_OF + self.entity(entity_ref).json_full, + wikidata_value.P_INSTANCE_OF, ) ) - return self._item_classes[item_id] + return self._entity_classes[entity_ref] def transitive_subclasses( self, class_id: wikidata_value.ItemRef @@ -397,7 +404,9 @@ def related_media(self, item_id: wikidata_value.ItemRef) -> RelatedMedia: _parse_sparql_result_string(result["relation"]) ].add(related_item) for related_item, classes in item_classes.items(): - self._item_classes.setdefault(related_item, frozenset(classes)) + self._entity_classes.setdefault( + related_item, frozenset(classes) + ) related_media = RelatedMedia( parents=frozenset(items_by_relation.pop("parent", ())), siblings=frozenset(items_by_relation.pop("sibling", ())), @@ -576,9 +585,9 @@ def _is_ignored( return True elif ( item in self._ignored_items - or self._api.item_classes(item) & self._ignored_classes + or self._api.entity_classes(item) & self._ignored_classes or any( - self._api.item_classes(item) + self._api.entity_classes(item) & self._api.transitive_subclasses(ignored_class) for ignored_class in config_classes_ignore ) @@ -607,8 +616,8 @@ def _integral_child_classes( def _is_integral_child( self, parent: wikidata_value.ItemRef, child: wikidata_value.ItemRef ) -> bool: - parent_classes = self._api.item_classes(parent) - child_classes = self._api.item_classes(child) + parent_classes = self._api.entity_classes(parent) + child_classes = self._api.entity_classes(child) for ( parent_classes_to_check, child_classes_to_check, @@ -643,7 +652,7 @@ def _should_cross_parent_child_border( self, parent: wikidata_value.ItemRef, child: wikidata_value.ItemRef ) -> bool: del child # Unused. - parent_classes = self._api.item_classes(parent) + parent_classes = self._api.entity_classes(parent) for collection in ( wikidata_value.Q_ANTHOLOGY, wikidata_value.Q_LIST, @@ -665,7 +674,7 @@ def _update_unprocessed( for item in iterable: reached_from.setdefault(item, current) if ( - self._api.item_classes(item) + self._api.entity_classes(item) & self._unlikely_to_be_processed_classes ): unprocessed_unlikely.add(item) @@ -677,7 +686,7 @@ def _related_item_result_extra( category: str, item: wikidata_value.ItemRef, ) -> media_filter.ResultExtra: - item_data = self._api.item(item) + item_data = self._api.entity(item).json_full item_description_parts = [] if (label := _label(item_data, self._config.languages)) is not None: item_description_parts.append(label) @@ -806,7 +815,7 @@ def filter_implementation( return media_filter.FilterResult(False) extra_information: set[media_filter.ResultExtra] = set() if self._config.release_statuses: - item = self._api.item(request.item.wikidata_item) + item = self._api.entity(request.item.wikidata_item).json_full if ( _release_status(item, now=request.now) not in self._config.release_statuses diff --git a/rock_paper_sand/wikidata_test.py b/rock_paper_sand/wikidata_test.py index 2d04183..8ae1362 100644 --- a/rock_paper_sand/wikidata_test.py +++ b/rock_paper_sand/wikidata_test.py @@ -98,17 +98,17 @@ def setUp(self) -> None: ) self._api = wikidata.Api(session=self._mock_session) - def test_item(self) -> None: - item = {"foo": "bar"} + def test_entity(self) -> None: + entity = wikidata_value.Entity(json_full={"foo": "bar"}) self._mock_session.get.return_value.json.return_value = { - "entities": {"Q1": item} + "entities": {"Q1": entity.json_full} } - first_response = self._api.item(wikidata_value.ItemRef("Q1")) - second_response = self._api.item(wikidata_value.ItemRef("Q1")) + first_response = self._api.entity(wikidata_value.ItemRef("Q1")) + second_response = self._api.entity(wikidata_value.ItemRef("Q1")) - self.assertEqual(item, first_response) - self.assertEqual(item, second_response) + self.assertEqual(entity, first_response) + self.assertEqual(entity, second_response) self.assertSequenceEqual( ( # Note that this only happens once because the second time is @@ -143,7 +143,7 @@ def test_sparql(self) -> None: self._mock_session.mock_calls, ) - def test_item_classes(self) -> None: + def test_entity_classes(self) -> None: self._mock_session.get.return_value.json.return_value = { "entities": { "Q1": { @@ -157,8 +157,8 @@ def test_item_classes(self) -> None: } } - first_result = self._api.item_classes(wikidata_value.ItemRef("Q1")) - second_result = self._api.item_classes(wikidata_value.ItemRef("Q1")) + first_result = self._api.entity_classes(wikidata_value.ItemRef("Q1")) + second_result = self._api.entity_classes(wikidata_value.ItemRef("Q1")) expected_classes = { wikidata_value.ItemRef("Q2"), @@ -272,7 +272,7 @@ def test_related_media( first_result = self._api.related_media(wikidata_value.ItemRef("Q1")) second_result = self._api.related_media(wikidata_value.ItemRef("Q1")) actual_classes = { - item: self._api.item_classes(item) + item: self._api.entity_classes(item) for item in { *first_result.parents, *first_result.siblings, @@ -805,28 +805,28 @@ def setUp(self) -> None: testcase_name="no_match_conditions", filter_config={}, item={"name": "foo", "wikidata": "Q1"}, - api_items={"Q1": {}}, + api_entities={"Q1": {}}, expected_result=media_filter.FilterResult(True), ), dict( testcase_name="release_statuses_no_match", filter_config={"releaseStatuses": ["RELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={"Q1": {"claims": {}}}, + api_entities={"Q1": {"claims": {}}}, expected_result=media_filter.FilterResult(False), ), dict( testcase_name="release_statuses_unknown", filter_config={"releaseStatuses": ["RELEASE_STATUS_UNSPECIFIED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={"Q1": {"claims": {}}}, + api_entities={"Q1": {"claims": {}}}, expected_result=media_filter.FilterResult(True), ), dict( testcase_name="release_statuses_before_range", filter_config={"releaseStatuses": ["UNRELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_START_TIME.id: [ @@ -850,7 +850,7 @@ def setUp(self) -> None: testcase_name="release_statuses_before_start", filter_config={"releaseStatuses": ["UNRELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_START_TIME.id: [ @@ -868,7 +868,7 @@ def setUp(self) -> None: testcase_name="release_statuses_in_range", filter_config={"releaseStatuses": ["ONGOING"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_START_TIME.id: [ @@ -892,7 +892,7 @@ def setUp(self) -> None: testcase_name="release_statuses_after_start", filter_config={"releaseStatuses": ["ONGOING"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_START_TIME.id: [ @@ -910,7 +910,7 @@ def setUp(self) -> None: testcase_name="release_statuses_before_end", filter_config={"releaseStatuses": ["ONGOING"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_END_TIME.id: [ @@ -928,7 +928,7 @@ def setUp(self) -> None: testcase_name="release_statuses_after_range", filter_config={"releaseStatuses": ["RELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_START_TIME.id: [ @@ -952,7 +952,7 @@ def setUp(self) -> None: testcase_name="release_statuses_after_end", filter_config={"releaseStatuses": ["RELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_END_TIME.id: [ @@ -970,7 +970,7 @@ def setUp(self) -> None: testcase_name="release_statuses_before_release", filter_config={"releaseStatuses": ["UNRELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_PUBLICATION_DATE.id: [ @@ -988,7 +988,7 @@ def setUp(self) -> None: testcase_name="release_statuses_after_release", filter_config={"releaseStatuses": ["RELEASED"]}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q1": { "claims": { wikidata_value.P_PUBLICATION_DATE.id: [ @@ -1031,12 +1031,12 @@ def setUp(self) -> None: "wikidata": "Q1", "parts": [{"name": "bar", "wikidata": "Q4"}], }, - api_items={ + api_entities={ "Q2": {"labels": {}, "descriptions": {}}, "Q3": {"labels": {}, "descriptions": {}}, "Q5": {"labels": {}, "descriptions": {}}, }, - api_item_classes={ + api_entity_classes={ "Q1": set(), "Q2": set(), "Q3": set(), @@ -1105,10 +1105,10 @@ def setUp(self) -> None: "wikidata": "Q1", "parts": [{"name": "bar", "wikidata": "Q2"}], }, - api_items={ + api_entities={ "Q3": {"labels": {}, "descriptions": {}}, }, - api_item_classes={ + api_entity_classes={ "Q2": set(), "Q3": set(), }, @@ -1174,7 +1174,7 @@ def setUp(self) -> None: "wikidataIgnore": ["Q3", "Q4", "Q5"], "wikidataClassesIgnore": ["Q61"], }, - api_item_classes={ + api_entity_classes={ "Q1": set(), "Q2": {wikidata_value.Q_FICTIONAL_ENTITY}, "Q3": set(), @@ -1206,13 +1206,13 @@ def setUp(self) -> None: testcase_name="related_media_ignores_integral_children", filter_config={"relatedMedia": {}}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q2": {"labels": {}, "descriptions": {}}, "Q22": {"labels": {}, "descriptions": {}}, "Q3": {"labels": {}, "descriptions": {}}, "Q4": {"labels": {}, "descriptions": {}}, }, - api_item_classes={ + api_entity_classes={ "Q1": set(), "Q2": {wikidata_value.Q_TELEVISION_SERIES}, "Q21": {wikidata_value.Q_TELEVISION_SERIES_EPISODE}, @@ -1316,10 +1316,10 @@ def setUp(self) -> None: testcase_name="related_media_does_not_traverse_collections", filter_config={"relatedMedia": {}}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q3": {"labels": {}, "descriptions": {}}, }, - api_item_classes={ + api_entity_classes={ "Q1": set(), "Q2": {wikidata_value.Q_LIST}, "Q3": {wikidata_value.Q_ANTHOLOGY}, @@ -1352,7 +1352,7 @@ def setUp(self) -> None: testcase_name="related_media_includes_label_and_description", filter_config={"languages": ["en"], "relatedMedia": {}}, item={"name": "foo", "wikidata": "Q1"}, - api_items={ + api_entities={ "Q2": { "labels": {"en": {"value": "film 2"}}, "descriptions": {"en": {"value": "2002 film"}}, @@ -1362,7 +1362,7 @@ def setUp(self) -> None: "descriptions": {"en": {"value": "2003 film"}}, }, }, - api_item_classes={ + api_entity_classes={ "Q2": set(), "Q3": set(), }, @@ -1401,8 +1401,8 @@ def test_filter( filter_config: Any, item: Any, parent_fully_qualified_name: str | None = None, - api_items: Mapping[str, Any] = immutabledict.immutabledict(), - api_item_classes: Mapping[str, Set[wikidata_value.ItemRef]] = ( + api_entities: Mapping[str, Any] = immutabledict.immutabledict(), + api_entity_classes: Mapping[str, Set[wikidata_value.ItemRef]] = ( immutabledict.immutabledict() ), api_related_media: Mapping[str, wikidata.RelatedMedia] = ( @@ -1410,9 +1410,13 @@ def test_filter( ), expected_result: media_filter.FilterResult, ) -> None: - self._mock_api.item.side_effect = lambda item_id: api_items[item_id.id] - self._mock_api.item_classes.side_effect = ( - lambda item_id: api_item_classes[item_id.id] + self._mock_api.entity.side_effect = ( + lambda entity_ref: wikidata_value.Entity( + json_full=api_entities[entity_ref.id] + ) + ) + self._mock_api.entity_classes.side_effect = ( + lambda entity_ref: api_entity_classes[entity_ref.id] ) self._mock_api.related_media.side_effect = ( lambda item_id: api_related_media[item_id.id] @@ -1434,7 +1438,7 @@ def test_filter( self.assertEqual(expected_result, result) def test_too_many_related_items(self) -> None: - self._mock_api.item_classes.return_value = set() + self._mock_api.entity_classes.return_value = set() self._mock_api.related_media.return_value = wikidata.RelatedMedia( parents=set(), siblings={wikidata_value.ItemRef(f"Q{n}") for n in range(1001)}, diff --git a/rock_paper_sand/wikidata_value.py b/rock_paper_sand/wikidata_value.py index a24764a..ef73ff9 100644 --- a/rock_paper_sand/wikidata_value.py +++ b/rock_paper_sand/wikidata_value.py @@ -20,7 +20,7 @@ from collections.abc import Collection import dataclasses import re -from typing import Self +from typing import Any, Self def _parse_id( @@ -193,3 +193,18 @@ def human_readable_url_prefix(cls) -> str: "https://www.wikidata.org/wiki/Property:P1434" ) del _p + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Entity: + """Data about an entity. + + Attributes: + json_full: JSON data about the entity, full flavor. See + https://www.wikidata.org/wiki/Wikidata:Data_access#Linked_Data_Interface_(URI) + for how to get the data and + https://doc.wikimedia.org/Wikibase/master/php/docs_topics_json.html + for the format. + """ + + json_full: Any