Skip to content

Commit

Permalink
[PYG-201] 🦘 Single edge with properties (#277)
Browse files Browse the repository at this point in the history
* refactor: regen

* refactor: updated example

* refactor: generate edge with properties

* refactor; regen
  • Loading branch information
doctrino authored Aug 14, 2024
1 parent 883c806 commit eb60269
Show file tree
Hide file tree
Showing 21 changed files with 1,154 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cognite/pygen/_core/models/data_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,14 @@ def one_to_one_edge_without_properties(self) -> Iterable[OneToOneConnectionField

@property
def one_to_many_edges_with_properties(self) -> Iterable[OneToManyConnectionField]:
"""All MultiEdges with properties on the edge."""
"""All MultiEdges with properties."""
return (field_ for field_ in self.fields_of_type(OneToManyConnectionField) if field_.is_edge_with_properties)

@property
def one_to_one_edges_with_properties(self) -> Iterable[OneToOneConnectionField]:
"""All SingleEdges with properties."""
return (field_ for field_ in self.fields_of_type(OneToOneConnectionField) if field_.is_edge_with_properties)

@property
def one_to_one_direct_relations_with_source(self) -> Iterable[OneToOneConnectionField]:
"""All direct relations."""
Expand Down
7 changes: 4 additions & 3 deletions cognite/pygen/_core/models/fields/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ def _is_supported_one_to_many_connection(cls, prop: dm.ConnectionDefinition | dm
def _is_supported_one_to_one_connection(cls, prop: dm.ConnectionDefinition | dm.MappedProperty) -> bool:
if isinstance(prop, dm.MappedProperty) and isinstance(prop.type, dm.DirectRelation) and not prop.type.is_list:
return True
# SingleEdgeConnection with properties are not yet supported
elif isinstance(prop, SingleEdgeConnection) and not prop.edge_source:
elif isinstance(prop, SingleEdgeConnection):
return True
elif isinstance(prop, SingleReverseDirectRelation):
return True
Expand Down Expand Up @@ -379,7 +378,9 @@ def as_write(self) -> str:
if self.destination_class and not self.destination_class.is_writable:
method = "as_id"

return self._create_as_method(method, "DomainModel", self.use_node_reference_in_type_hint)
base_cls = "DomainRelation" if self.is_edge_with_properties else "DomainModel"

return self._create_as_method(method, base_cls, self.use_node_reference_in_type_hint)

def as_read_graphql(self) -> str:
"""Return the code to convert the field from the GraphQL to the read data class."""
Expand Down
20 changes: 17 additions & 3 deletions cognite/pygen/_core/templates/data_class_node.py.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,23 @@ class {{ data_class.read_name }}({{ data_class.read_base_class }}{% if is_pydant
warnings.warn(
f"Expected one edge for '{{ field.name }}' in {instance.as_id()}."
f"Ignoring new edge {value!s} in favor of {instance.{{ field.name }}!s}."
){% endfor %}{% for field in data_class.one_to_one_edge_with_properties %}
){% endfor %}{% for field in data_class.one_to_one_edges_with_properties %}
if edge_type == {{ field.edge_type_str }} and isinstance(
value, {{ field.destination_class.read_name }}
value, {{ field.edge_class.read_name }}
):
if instance.{{ field.name }} is None:
instance.{{ field.name }} = value
elif instance.{{ field.name }} == value:
# This is the same edge, so we don't need to do anything...
...
else:
warnings.warn(
f"Expected one edge for '{{ field.name }}' in {instance.as_id()}."
f"Ignoring new edge {value!s} in favor of {instance.{{ field.name }}!s}."
){% endfor %}
)

if end_node := nodes_by_id.get(as_pygen_node_id(value.end_node)):
value.end_node = end_node # type: ignore[assignment]{% endfor %}
{% for field in data_class.one_to_many_edges_without_properties %}
instance.{{ field.name }} = {{ field.name }} or None{% endfor %}{% for field in data_class.one_to_many_edges_with_properties %}
instance.{{ field.name }} = {{ field.name }}{% endfor %}
Expand Down Expand Up @@ -364,6 +370,14 @@ class {{ data_class.write_name }}({{ data_class.write_base_class }}{% if is_pyda
{{ field.edge_type_str }},
)
resources.extend(other_resources)
{% endfor %}{% for field in data_class.one_to_one_edges_with_properties %}
if self.{{ field.name }} is not None:
other_resources = self.{{ field.name }}._to_instances_write(
cache,
self,
{{ field.edge_type_str }},
)
resources.extend(other_resources)
{% endfor %}{% for field in data_class.one_to_many_edges_without_properties %}
edge_type = {{ field.edge_type_str }}
for {{ field.variable }} in self.{{ field.name }} or []:{% if field.edge_direction == 'outwards' %}
Expand Down
3 changes: 3 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ Changes are grouped as follows
now have the property name prefixed with `no_` to make it a valid Python variable name instead
of raising an exception.

## Added
- Support for edges with properties of type `single_edge_connection`.

## [0.99.30] - 24-08-13
### Added
- New approach to querying. This is accessed through the `.query()` method
Expand Down
40 changes: 38 additions & 2 deletions examples-pydantic-v1/omni_pydantic_v1/_api/connection_item_e.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
ConnectionItemEList,
ConnectionItemEWriteList,
ConnectionItemETextFields,
ConnectionEdgeA,
ConnectionEdgeAWrite,
ConnectionEdgeAList,
ConnectionEdgeA,
ConnectionItemD,
ConnectionItemF,
)
from omni_pydantic_v1.data_classes._connection_item_e import (
ConnectionItemEQuery,
Expand All @@ -39,6 +44,7 @@
SequenceNotStr,
)
from .connection_item_e_inwards_single import ConnectionItemEInwardsSingleAPI
from .connection_item_e_inwards_single_property import ConnectionItemEInwardsSinglePropertyAPI
from .connection_item_e_query import ConnectionItemEQueryAPI


Expand All @@ -53,6 +59,7 @@ def __init__(self, client: CogniteClient):
super().__init__(client=client)

self.inwards_single_edge = ConnectionItemEInwardsSingleAPI(client)
self.inwards_single_property_edge = ConnectionItemEInwardsSinglePropertyAPI(client)

def __call__(
self,
Expand Down Expand Up @@ -101,7 +108,7 @@ def apply(
"""Add or update (upsert) connection item es.
Note: This method iterates through all nodes and timeseries linked to connection_item_e and creates them including the edges
between the nodes. For example, if any of `inwards_single` are set, then these
between the nodes. For example, if any of `inwards_single` or `inwards_single_property` are set, then these
nodes as well as any nodes linked to them, and all the edges linking these nodes will be created.
Args:
Expand Down Expand Up @@ -208,6 +215,13 @@ def retrieve(
"inwards",
dm.ViewId("pygen-models", "ConnectionItemD", "1"),
),
(
self.inwards_single_property_edge,
"inwards_single_property",
dm.DirectRelationReference("pygen-models", "multiProperty"),
"inwards",
dm.ViewId("pygen-models", "ConnectionItemF", "1"),
),
],
)

Expand Down Expand Up @@ -486,7 +500,7 @@ def list(
sort: (Advanced) If sort_by and direction are not sufficient, you can write your own sorting.
This will override the sort_by and direction. This allowos you to sort by multiple fields and
specify the direction for each field as well as how to handle null values.
retrieve_connections: Whether to retrieve `direct_reverse_multi`, `direct_reverse_single` and `inwards_single` for the connection item es. Defaults to 'skip'.
retrieve_connections: Whether to retrieve `direct_reverse_multi`, `direct_reverse_single`, `inwards_single` and `inwards_single_property` for the connection item es. Defaults to 'skip'.
'skip' will not retrieve any connections, 'identifier' will only retrieve the identifier of the connected items, and 'full' will retrieve the full connected items.
Returns:
Expand Down Expand Up @@ -545,6 +559,18 @@ def list(
),
)
)
edge_inwards_single_property = builder.create_name(from_root)
builder.append(
EdgeQueryStep(
edge_inwards_single_property,
dm.query.EdgeResultSetExpression(
from_=from_root,
direction="inwards",
chain_to="destination",
),
ConnectionEdgeA,
)
)
if retrieve_connections == "full":
builder.append(
NodeQueryStep(
Expand All @@ -556,6 +582,16 @@ def list(
ConnectionItemD,
)
)
builder.append(
NodeQueryStep(
builder.create_name(edge_inwards_single_property),
dm.query.NodeResultSetExpression(
from_=edge_inwards_single_property,
filter=dm.filters.HasData(views=[ConnectionItemF._view_id]),
),
ConnectionItemF,
)
)
builder.append(
NodeQueryStep(
builder.create_name(from_root),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from __future__ import annotations

import datetime

from cognite.client import data_modeling as dm

from omni_pydantic_v1.data_classes import (
ConnectionEdgeA,
ConnectionEdgeAList,
ConnectionEdgeAWrite,
)
from omni_pydantic_v1.data_classes._connection_edge_a import _create_connection_edge_a_filter

from ._core import DEFAULT_LIMIT_READ, EdgePropertyAPI
from omni_pydantic_v1.data_classes._core import DEFAULT_INSTANCE_SPACE


class ConnectionItemEInwardsSinglePropertyAPI(EdgePropertyAPI):
_view_id = dm.ViewId("pygen-models", "ConnectionEdgeA", "1")
_class_type = ConnectionEdgeA
_class_write_type = ConnectionEdgeAWrite
_class_list = ConnectionEdgeAList

def list(
self,
from_connection_item_e: str | list[str] | dm.NodeId | list[dm.NodeId] | None = None,
from_connection_item_e_space: str = DEFAULT_INSTANCE_SPACE,
to_connection_item_f: str | list[str] | dm.NodeId | list[dm.NodeId] | None = None,
to_connection_item_f_space: str = DEFAULT_INSTANCE_SPACE,
min_end_time: datetime.datetime | None = None,
max_end_time: datetime.datetime | None = None,
name: str | list[str] | None = None,
name_prefix: str | None = None,
min_start_time: datetime.datetime | None = None,
max_start_time: datetime.datetime | None = None,
external_id_prefix: str | None = None,
space: str | list[str] | None = None,
limit=DEFAULT_LIMIT_READ,
) -> ConnectionEdgeAList:
"""List inwards single property edges of a connection item e.
Args:
from_connection_item_e: ID of the source connection item e.
from_connection_item_e_space: Location of the connection item es.
to_connection_item_f: ID of the target connection item f.
to_connection_item_f_space: Location of the connection item fs.
min_end_time: The minimum value of the end time to filter on.
max_end_time: The maximum value of the end time to filter on.
name: The name to filter on.
name_prefix: The prefix of the name to filter on.
min_start_time: The minimum value of the start time to filter on.
max_start_time: The maximum value of the start time to filter on.
external_id_prefix: The prefix of the external ID to filter on.
space: The space to filter on.
limit: Maximum number of inwards single property edges to return. Defaults to 25. Set to -1, float("inf") or None
to return all items.
Returns:
The requested inwards single property edges.
Examples:
List 5 inwards single property edges connected to "my_connection_item_e":
>>> from omni_pydantic_v1 import OmniClient
>>> client = OmniClient()
>>> connection_item_e = client.connection_item_e.inwards_single_property_edge.list("my_connection_item_e", limit=5)
"""
filter_ = _create_connection_edge_a_filter(
dm.DirectRelationReference("pygen-models", "multiProperty"),
self._view_id,
to_connection_item_f,
to_connection_item_f_space,
from_connection_item_e,
from_connection_item_e_space,
min_end_time,
max_end_time,
name,
name_prefix,
min_start_time,
max_start_time,
external_id_prefix,
space,
)
return self._list(filter_=filter_, limit=limit)
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
from omni_pydantic_v1.data_classes import (
DomainModelCore,
ConnectionItemE,
ConnectionEdgeA,
)
from omni_pydantic_v1.data_classes._connection_item_d import (
ConnectionItemD,
_create_connection_item_d_filter,
)
from omni_pydantic_v1.data_classes._connection_item_f import (
ConnectionItemF,
_create_connection_item_f_filter,
)
from ._core import (
DEFAULT_QUERY_LIMIT,
EdgeQueryStep,
Expand All @@ -23,8 +28,13 @@
_create_edge_filter,
)

from omni_pydantic_v1.data_classes._connection_edge_a import (
_create_connection_edge_a_filter,
)

if TYPE_CHECKING:
from .connection_item_d_query import ConnectionItemDQueryAPI
from .connection_item_f_query import ConnectionItemFQueryAPI


class ConnectionItemEQueryAPI(QueryAPI[T_DomainModelList]):
Expand Down Expand Up @@ -117,6 +127,90 @@ def inwards_single(
)
return ConnectionItemDQueryAPI(self._client, self._builder, node_filer, limit)

def inwards_single_property(
self,
direct_list: str | tuple[str, str] | list[str] | list[tuple[str, str]] | None = None,
name: str | list[str] | None = None,
name_prefix: str | None = None,
external_id_prefix: str | None = None,
space: str | list[str] | None = None,
min_end_time_edge: datetime.datetime | None = None,
max_end_time_edge: datetime.datetime | None = None,
name_edge: str | list[str] | None = None,
name_prefix_edge: str | None = None,
min_start_time_edge: datetime.datetime | None = None,
max_start_time_edge: datetime.datetime | None = None,
external_id_prefix_edge: str | None = None,
space_edge: str | list[str] | None = None,
filter: dm.Filter | None = None,
limit: int = DEFAULT_QUERY_LIMIT,
) -> ConnectionItemFQueryAPI[T_DomainModelList]:
"""Query along the inwards single property edges of the connection item e.
Args:
direct_list: The direct list to filter on.
name: The name to filter on.
name_prefix: The prefix of the name to filter on.
external_id_prefix: The prefix of the external ID to filter on.
space: The space to filter on.
min_end_time_edge: The minimum value of the end time to filter on.
max_end_time_edge: The maximum value of the end time to filter on.
name_edge: The name to filter on.
name_prefix_edge: The prefix of the name to filter on.
min_start_time_edge: The minimum value of the start time to filter on.
max_start_time_edge: The maximum value of the start time to filter on.
external_id_prefix_edge: The prefix of the external ID to filter on.
space_edge: The space to filter on.
filter: (Advanced) Filter applied to node. If the filtering available in the above is not sufficient, you can write your own filtering which will be ANDed with the filter above.
limit: Maximum number of inwards single property edges to return. Defaults to 3. Set to -1, float("inf") or None
to return all items.
Returns:
ConnectionItemFQueryAPI: The query API for the connection item f.
"""
from .connection_item_f_query import ConnectionItemFQueryAPI

# from is a string as we added a node query step in the __init__ method
from_ = cast(str, self._builder.get_from())
edge_view = ConnectionEdgeA._view_id
edge_filter = _create_connection_edge_a_filter(
dm.DirectRelationReference("pygen-models", "multiProperty"),
edge_view,
min_end_time=min_end_time_edge,
max_end_time=max_end_time_edge,
name=name_edge,
name_prefix=name_prefix_edge,
min_start_time=min_start_time_edge,
max_start_time=max_start_time_edge,
external_id_prefix=external_id_prefix_edge,
space=space_edge,
)
self._builder.append(
EdgeQueryStep(
name=self._builder.create_name(from_),
expression=dm.query.EdgeResultSetExpression(
filter=edge_filter,
from_=from_,
direction="inwards",
),
result_cls=ConnectionEdgeA,
max_retrieve_limit=limit,
)
)

view_id = ConnectionItemFQueryAPI._view_id
has_data = dm.filters.HasData(views=[view_id])
node_filer = _create_connection_item_f_filter(
view_id,
direct_list,
name,
name_prefix,
external_id_prefix,
space,
(filter and dm.filters.And(filter, has_data)) or has_data,
)
return ConnectionItemFQueryAPI(self._client, self._builder, node_filer, limit)

def query(
self,
) -> T_DomainModelList:
Expand Down
Loading

0 comments on commit eb60269

Please sign in to comment.