Skip to content

Commit

Permalink
refactor; regen
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino committed Sep 30, 2024
1 parent 161bef7 commit 020b9ec
Showing 1 changed file with 243 additions and 0 deletions.
243 changes: 243 additions & 0 deletions examples-pydantic-v1/omni_sub_pydantic_v1/_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
from __future__ import annotations

import warnings
from pathlib import Path
from typing import Any, Sequence

from cognite.client import ClientConfig, CogniteClient, data_modeling as dm
from cognite.client.data_classes import TimeSeriesList, FileMetadataList, SequenceList
from cognite.client.credentials import OAuthClientCredentials

from ._api import (
ConnectionItemAAPI,
ConnectionItemBAPI,
ConnectionItemCNodeAPI,
)
from ._api._core import SequenceNotStr, GraphQLQueryResponse
from .data_classes._core import GraphQLList
from . import data_classes


class OmniSubClient:
"""
OmniSubClient
Generated with:
pygen = 0.99.34
cognite-sdk = 7.62.4
pydantic = 1.10.7
Data Model:
space: pygen-models
externalId: OmniSub
version: 1
"""

def __init__(self, config_or_client: CogniteClient | ClientConfig):
if isinstance(config_or_client, CogniteClient):
client = config_or_client
elif isinstance(config_or_client, ClientConfig):
client = CogniteClient(config_or_client)
else:
raise ValueError(f"Expected CogniteClient or ClientConfig, got {type(config_or_client)}")
# The client name is used for aggregated logging of Pygen Usage
client.config.client_name = "CognitePygen:0.99.34"

self._client = client

self.connection_item_a = ConnectionItemAAPI(client)
self.connection_item_b = ConnectionItemBAPI(client)
self.connection_item_c_node = ConnectionItemCNodeAPI(client)

def upsert(
self,
items: data_classes.DomainModelWrite | Sequence[data_classes.DomainModelWrite],
replace: bool = False,
write_none: bool = False,
allow_version_increase: bool = False,
) -> data_classes.ResourcesWriteResult:
"""Add or update (upsert) items.
This method will create the nodes, edges, timeseries, files and sequences of the supplied items.
Args:
items: One or more instances of the pygen generated data classes.
replace (bool): How do we behave when a property value exists? Do we replace all matching and existing values with the supplied values (true)?
Or should we merge in new values for properties together with the existing values (false)? Note: This setting applies for all nodes or edges specified in the ingestion call.
write_none (bool): This method will, by default, skip properties that are set to None. However, if you want to set properties to None,
you can set this parameter to True. Note this only applies to properties that are nullable.
allow_version_increase (bool): If set to true, the version of the instance will be increased if the instance already exists.
If you get an error: 'A version conflict caused the ingest to fail', you can set this to true to allow
the version to increase.
Returns:
Created instance(s), i.e., nodes, edges, and time series.
"""
instances = self._create_instances(items, write_none, allow_version_increase)
result = self._client.data_modeling.instances.apply(
nodes=instances.nodes,
edges=instances.edges,
auto_create_start_nodes=True,
auto_create_end_nodes=True,
replace=replace,
)
time_series = TimeSeriesList([])
if instances.time_series:
time_series = self._client.time_series.upsert(instances.time_series, mode="patch")
files = FileMetadataList([])
if instances.files:
for file in instances.files:
created, _ = self._client.files.create(file, overwrite=True)
files.append(created)

sequences = SequenceList([])
if instances.sequences:
sequences = self._client.sequences.upsert(instances.sequences, mode="patch")

return data_classes.ResourcesWriteResult(result.nodes, result.edges, time_series, files, sequences)

def _create_instances(
self,
items: data_classes.DomainModelWrite | Sequence[data_classes.DomainModelWrite],
write_none: bool,
allow_version_increase: bool,
) -> data_classes.ResourcesWrite:
if isinstance(items, data_classes.DomainModelWrite):
instances = items.to_instances_write(write_none, allow_version_increase)
else:
instances = data_classes.ResourcesWrite()
cache: set[tuple[str, str]] = set()
for item in items:
instances.extend(
item._to_instances_write(
cache,
write_none,
allow_version_increase,
)
)
return instances

def apply(
self,
items: data_classes.DomainModelWrite | Sequence[data_classes.DomainModelWrite],
replace: bool = False,
write_none: bool = False,
) -> data_classes.ResourcesWriteResult:
"""[DEPRECATED] Add or update (upsert) items.
Args:
items: One or more instances of the pygen generated data classes.
replace (bool): How do we behave when a property value exists? Do we replace all matching and existing values with the supplied values (true)?
Or should we merge in new values for properties together with the existing values (false)? Note: This setting applies for all nodes or edges specified in the ingestion call.
write_none (bool): This method will, by default, skip properties that are set to None. However, if you want to set properties to None,
you can set this parameter to True. Note this only applies to properties that are nullable.
Returns:
Created instance(s), i.e., nodes, edges, and time series.
"""
warnings.warn(
"The .apply method is deprecated and will be removed in v1.0. "
"Please use the .upsert method on the instead."
"The motivation is that .upsert is a more descriptive name for the operation.",
UserWarning,
stacklevel=2,
)
return self.upsert(items, replace, write_none)

def delete(
self,
external_id: (
str | SequenceNotStr[str] | data_classes.DomainModelWrite | Sequence[data_classes.DomainModelWrite]
),
space: str | None = None,
) -> dm.InstancesDeleteResult:
"""Delete one or more items.
If you pass in an item, it will be deleted recursively, i.e., all connected nodes and edges
will be deleted as well.
Args:
external_id: The external id or items(s) to delete.
space: The space where all the item(s) are located.
Returns:
The instance(s), i.e., nodes and edges which has been deleted. Empty list if nothing was deleted.
Examples:
Delete item by id:
>>> from omni_sub_pydantic_v1 import OmniSubClient
>>> client = OmniSubClient()
>>> client.delete("my_node_external_id")
"""
if space is None and (
isinstance(external_id, str)
or (isinstance(external_id, Sequence) and all(isinstance(item, str) for item in external_id))
):
raise ValueError("Expected space to be set when deleting by external_id")
if isinstance(external_id, str):
return self._client.data_modeling.instances.delete(nodes=(space, external_id)) # type: ignore[arg-type]
elif isinstance(external_id, Sequence) and all(isinstance(item, str) for item in external_id):
return self._client.data_modeling.instances.delete(
nodes=[(space, id_) for id_ in external_id if isinstance(id_, str)], # type: ignore[arg-type]
)
elif isinstance(external_id, data_classes.DomainModelWrite) or (
isinstance(external_id, Sequence)
and not isinstance(external_id, str)
and all(isinstance(item, data_classes.DomainModelWrite) for item in external_id)
):
resources = self._create_instances(external_id, False, False)
return self._client.data_modeling.instances.delete(
nodes=resources.nodes.as_ids(),
edges=resources.edges.as_ids(),
)
else:
raise ValueError(
"Expected str, list of str, or DomainModelWrite, list of DomainModelWrite," f"got {type(external_id)}"
)

def graphql_query(self, query: str, variables: dict[str, Any] | None = None) -> GraphQLList:
"""Execute a GraphQl query against the OmniSub data model.
Args:
query (str): The GraphQL query to issue.
variables (dict[str, Any] | None): An optional dict of variables to pass to the query.
"""
data_model_id = dm.DataModelId("pygen-models", "OmniSub", "1")
result = self._client.data_modeling.graphql.query(data_model_id, query, variables)
return GraphQLQueryResponse(data_model_id).parse(result)

@classmethod
def azure_project(
cls, tenant_id: str, client_id: str, client_secret: str, cdf_cluster: str, project: str
) -> OmniSubClient:
credentials = OAuthClientCredentials.default_for_azure_ad(tenant_id, client_id, client_secret, cdf_cluster)
config = ClientConfig.default(project, cdf_cluster, credentials)

return cls(config)

@classmethod
def from_toml(cls, file_path: Path | str, section: str | None = "cognite") -> OmniSubClient:
import toml

toml_content = toml.load(file_path)
if section is not None:
try:
toml_content = toml_content[section]
except KeyError as e:
raise ValueError(f"Could not find section '{section}' in {file_path}") from e

return cls.azure_project(**toml_content)

def _repr_html_(self) -> str:
return """<strong>OmniSubClient</strong> generated from data model ("pygen-models", "OmniSub", "1")<br />
with the following APIs available<br />
&nbsp;&nbsp;&nbsp;&nbsp;.connection_item_a<br />
&nbsp;&nbsp;&nbsp;&nbsp;.connection_item_b<br />
&nbsp;&nbsp;&nbsp;&nbsp;.connection_item_c_node<br />
<br />
and with the methods:<br />
&nbsp;&nbsp;&nbsp;&nbsp;.upsert - Create or update any instance.<br />
&nbsp;&nbsp;&nbsp;&nbsp;.delete - Delete instances.<br />
"""

0 comments on commit 020b9ec

Please sign in to comment.