Skip to content

Commit

Permalink
Initial commit for new runtime and base class
Browse files Browse the repository at this point in the history
The idea here is to split the responsibilities of the current base class
into two: a runtime and an extractor.

The runtime is responsible for parsing command line arguments, loading
config files and so on, before spawning the extractor in a separate
process. The runtime will automatically restart the extractor if it
crashes, but the extractor can also ask the runtime to restart it - for
example after a config change.

The extractor class is then only responsible for running the extractor
application itself, making it much cleaner.
  • Loading branch information
mathialo committed Sep 12, 2024
1 parent 39053c6 commit 1994737
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
# Local test files
test.py
test.yaml
local-test.yaml
pyrightconfig.json

# Tokens, etc
Expand Down
6 changes: 4 additions & 2 deletions cognite/extractorutils/configtools/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ def translate_camel(key: str) -> str:
raise ValueError(f"Invalid case style: {case_style}")


def _load_certificate_data(cert_path: str, password: Optional[str]) -> Union[Tuple[str, str], Tuple[bytes, bytes]]:
path = Path(cert_path)
def _load_certificate_data(
cert_path: str | Path, password: Optional[str]
) -> Union[Tuple[str, str], Tuple[bytes, bytes]]:
path = Path(cert_path) if isinstance(cert_path, str) else cert_path
cert_data = Path(path).read_bytes()

if path.suffix == ".pem":
Expand Down
6 changes: 3 additions & 3 deletions cognite/extractorutils/unstable/configuration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from io import StringIO
from pathlib import Path
from typing import Dict, Optional, TextIO, Type, TypeVar, Union
from typing import Dict, Optional, TextIO, Tuple, Type, TypeVar, Union

from pydantic import ValidationError

Expand Down Expand Up @@ -33,7 +33,7 @@ def load_file(path: Path, schema: Type[_T]) -> _T:

def load_from_cdf(
cognite_client: CogniteClient, external_id: str, schema: Type[_T], revision: Optional[int] = None
) -> _T:
) -> Tuple[_T, int]:
params: Dict[str, Union[str, int]] = {"externalId": external_id}
if revision:
params["revision"] = revision
Expand All @@ -44,7 +44,7 @@ def load_from_cdf(
)
response.raise_for_status()
data = response.json()
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema)
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]


def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T:
Expand Down
70 changes: 68 additions & 2 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@
from datetime import timedelta
from enum import Enum
from pathlib import Path
from typing import Annotated, Any, Dict, List, Literal, Optional, Union
from typing import Annotated, Any, Dict, List, Literal, Optional, Union, assert_never

from humps import kebabize
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import (
CredentialProvider,
OAuthClientCertificate,
OAuthClientCredentials,
)
from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.exceptions import InvalidConfigError


Expand All @@ -33,7 +41,9 @@ class _ClientCredentialsConfig(ConfigModel):
class _ClientCertificateConfig(ConfigModel):
type: Literal["client-certificate"]
client_id: str
certificate_path: Path
path: Path
password: Optional[str] = None
authority_url: str
scopes: List[str]


Expand Down Expand Up @@ -121,6 +131,7 @@ class _ConnectionParameters(ConfigModel):
max_connection_pool_size: int = 50
ssl_verify: bool = True
proxies: Dict[str, str] = Field(default_factory=dict)
timeout: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class ConnectionConfig(ConfigModel):
Expand All @@ -133,6 +144,61 @@ class ConnectionConfig(ConfigModel):

connection: _ConnectionParameters = Field(default_factory=_ConnectionParameters)

def get_cognite_client(self, client_name: str) -> CogniteClient:
from cognite.client.config import global_config

global_config.disable_pypi_version_check = True
global_config.disable_gzip = not self.connection.gzip_compression
global_config.status_forcelist = set(self.connection.status_forcelist)
global_config.max_retries = self.connection.max_retries
global_config.max_retries_connect = self.connection.max_retries_connect
global_config.max_retry_backoff = self.connection.max_retry_backoff.seconds
global_config.max_connection_pool_size = self.connection.max_connection_pool_size
global_config.disable_ssl = not self.connection.ssl_verify
global_config.proxies = self.connection.proxies

credential_provider: CredentialProvider
match self.authentication:
case _ClientCredentialsConfig() as client_credentials:
kwargs = {
"token_url": client_credentials.token_url,
"client_id": client_credentials.client_id,
"client_secret": client_credentials.client_secret,
"scopes": client_credentials.scopes,
}
if client_credentials.audience is not None:
kwargs["audience"] = client_credentials.audience
if client_credentials.resource is not None:
kwargs["resource"] = client_credentials.resource

credential_provider = OAuthClientCredentials(**kwargs) # type: ignore # I know what I'm doing

case _ClientCertificateConfig() as client_certificate:
thumbprint, key = _load_certificate_data(
client_certificate.path,
client_certificate.password,
)
credential_provider = OAuthClientCertificate(
authority_url=client_certificate.authority_url,
client_id=client_certificate.client_id,
cert_thumbprint=str(thumbprint),
certificate=str(key),
scopes=client_certificate.scopes,
)

case _:
assert_never()

client_config = ClientConfig(
project=self.project,
base_url=self.base_url,
client_name=client_name,
timeout=self.connection.timeout.seconds,
credentials=credential_provider,
)

return CogniteClient(client_config)


class LogLevel(Enum):
CRITICAL = "CRITICAL"
Expand Down
Empty file.
31 changes: 31 additions & 0 deletions cognite/extractorutils/unstable/core/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Example of how you would build an extractor with the new base class
"""

from cognite.extractorutils.unstable.configuration.models import ExtractorConfig

from .base import Extractor
from .runtime import Runtime


class MyConfig(ExtractorConfig):
    """Application config for the example extractor: two required demo parameters."""

    # Required integer parameter
    parameter_one: int
    # Required string parameter
    parameter_two: str


class MyExtractor(Extractor[MyConfig]):
    """Minimal example extractor built on the new base class."""

    NAME = "Test extractor"
    EXTERNAL_ID = "test-extractor"
    DESCRIPTION = "Test of the new runtime"
    VERSION = "1.0.0"
    CONFIG_TYPE = MyConfig

    def run(self) -> None:
        """
        Log a start message, then wait on the cancellation token with a timeout of 10.

        If the wait reports a truthy result the method returns normally; otherwise it raises a
        ``ValueError`` on purpose, so the example ends in a crash.
        """
        self.logger.info("Started!")

        cancelled = self.cancellation_token.wait(10)
        if cancelled:
            return

        raise ValueError("Oops")


if __name__ == "__main__":
    # Hand the extractor class (not an instance) to the runtime and let it drive execution.
    Runtime(MyExtractor).run()
5 changes: 5 additions & 0 deletions cognite/extractorutils/unstable/core/_messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from enum import Enum


class RuntimeMessage(Enum):
    """Messages an extractor can put on the runtime message queue."""

    # Request that the extractor is restarted
    RESTART = 1
119 changes: 119 additions & 0 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
from multiprocessing import Queue
from threading import RLock, Thread
from types import TracebackType
from typing import Generic, Literal, Optional, Type, TypeVar, Union

try:
from typing import Self
except ImportError:
from typing_extensions import Self

from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage

ConfigType = TypeVar("ConfigType", bound=ExtractorConfig)
ConfigRevision = Union[Literal["local"], int]


class Extractor(Generic[ConfigType]):
    """
    Base class for extractor applications.

    Subclasses fill in the class-level metadata below and implement ``run``. An instance holds the
    connection config, the application config and the revision of the config it was started with,
    and owns a ``CogniteClient`` built from the connection config.

    Used as a context manager, entering calls ``start`` (report extractor info and begin periodic
    check-ins) and exiting calls ``stop`` (cancel the cancellation token).
    """

    NAME: str
    EXTERNAL_ID: str
    DESCRIPTION: str
    VERSION: str

    CONFIG_TYPE: Type[ConfigType]

    def __init__(
        self,
        connection_config: ConnectionConfig,
        application_config: ConfigType,
        current_config_revision: ConfigRevision,
    ) -> None:
        """
        Args:
            connection_config: Connection settings, also used to create the Cognite client.
            application_config: The extractor-specific configuration.
            current_config_revision: Revision of the loaded config, either an integer revision or
                the literal ``"local"``.
        """
        self.cancellation_token = CancellationToken()
        self.cancellation_token.cancel_on_interrupt()

        self.connection_config = connection_config
        self.application_config = application_config
        self.current_config_revision = current_config_revision

        self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")

        self._checkin_lock = RLock()
        # Stays None until a queue is attached with _set_runtime_message_queue
        self._runtime_messages: Optional[Queue[RuntimeMessage]] = None

        self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

    def _set_runtime_message_queue(self, queue: Queue) -> None:
        """Attach the queue that ``restart`` uses to send messages."""
        self._runtime_messages = queue

    def _run_checkin(self) -> None:
        """
        Check in repeatedly until the cancellation token is cancelled.

        Each iteration posts a check-in and then waits on the cancellation token with a timeout of
        10. Errors from a check-in are logged and do not stop the loop.
        """

        def checkin() -> None:
            body = {"externalId": self.connection_config.extraction_pipeline}

            with self._checkin_lock:
                res = self.cognite_client.post(
                    f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
                    json=body,
                    headers={"cdf-version": "alpha"},
                )
                new_config_revision = res.json().get("lastConfigRevision")

                # A "local" revision never equals a reported revision number, so without the
                # explicit check an extractor on local config would ask for a restart on every
                # single check-in.
                if (
                    new_config_revision
                    and self.current_config_revision != "local"
                    and new_config_revision != self.current_config_revision
                ):
                    self.restart()

        while not self.cancellation_token.is_cancelled:
            try:
                checkin()
            except Exception:
                self.logger.exception("Error during checkin")
            self.cancellation_token.wait(10)

    def restart(self) -> None:
        """
        Request a restart and stop the extractor.

        Puts ``RuntimeMessage.RESTART`` on the runtime message queue, if one is attached, and then
        cancels the cancellation token.
        """
        if self._runtime_messages is not None:
            self._runtime_messages.put(RuntimeMessage.RESTART)
        self.cancellation_token.cancel()

    @classmethod
    def init_from_runtime(
        cls,
        connection_config: ConnectionConfig,
        application_config: ConfigType,
        current_config_revision: ConfigRevision,
    ) -> Self:
        """Alternate constructor; forwards its arguments unchanged to ``__init__``."""
        return cls(connection_config, application_config, current_config_revision)

    def start(self) -> None:
        """
        Report extractor info and start the background check-in thread.

        Posts the extraction pipeline external ID, the active config revision and this extractor's
        version and external ID, then starts ``_run_checkin`` in a daemon thread.
        """
        self.cognite_client.post(
            f"/api/v1/projects/{self.cognite_client.config.project}/odin/extractorinfo",
            json={
                "externalId": self.connection_config.extraction_pipeline,
                "activeConfigRevision": self.current_config_revision,
                "extractor": {
                    "version": self.VERSION,
                    "externalId": self.EXTERNAL_ID,
                },
            },
            headers={"cdf-version": "alpha"},
        )
        Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()

    def stop(self) -> None:
        """Cancel the cancellation token, which also ends the check-in loop."""
        self.cancellation_token.cancel()

    def __enter__(self) -> Self:
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Stop the extractor. Returns False when an exception is in flight, so it propagates."""
        self.stop()
        return exc_val is None

    def run(self) -> None:
        """Run the extractor application. Must be implemented by subclasses."""
        raise NotImplementedError()
Loading

0 comments on commit 1994737

Please sign in to comment.