feat: core dm (#349)
toondaey authored Aug 30, 2024
1 parent 77bee73 commit 097fbc9
Showing 9 changed files with 278 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ __pycache__/
# Local test files
test.py
test.yaml
pyrightconfig.json

# Tokens, etc
token.txt
4 changes: 2 additions & 2 deletions .readthedocs.yaml
@@ -2,12 +2,12 @@ version: 2

python:
install:
- requirements: 'docs/requirements.txt'
- requirements: "docs/requirements.txt"
- method: pip
path: .

sphinx:
configuration: 'docs/source/conf.py'
configuration: "docs/source/conf.py"
fail_on_warning: false

build:
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.4.0

### Added

- Support for uploading files as Core DM (`CogniteExtractorFileApply`) instances in addition to classic `FileMetadata` files.

## 7.3.0

### Changed
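The `Added` entry above corresponds to the new Core DM upload path in `cognite/extractorutils/uploader/files.py` below: the file upload queues now accept a `CogniteExtractorFileApply` node in place of a classic `FileMetadata`. A minimal sketch of the new path, assuming a configured `CogniteClient`; the space, external id, and exact `CogniteExtractorFileApply` fields are illustrative:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import (
    CogniteExtractorFileApply,
)
from cognite.extractorutils.uploader.files import FileUploadQueue

client = CogniteClient()  # assumes credentials are configured elsewhere
queue = FileUploadQueue(cdf_client=client)

# Describe the file as a data modeling node instead of classic FileMetadata.
file_node = CogniteExtractorFileApply(
    space="my-space",  # hypothetical space
    external_id="my-file",  # hypothetical external id
    name="report.pdf",
)

queue.add_to_upload_queue(file_node, "/path/to/report.pdf")
queue.upload()  # blocks until all queued uploads have finished
```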
165 changes: 117 additions & 48 deletions cognite/extractorutils/uploader/files.py
@@ -25,6 +25,9 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply
from cognite.client.utils._identifier import IdentifierSequence
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
RETRIES,
@@ -48,6 +51,10 @@
# 4000 MiB
_MAX_FILE_CHUNK_SIZE = 4 * 1024 * 1024 * 1000

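# The Core DM file endpoints are alpha; requests to them must carry this header.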
_CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"}

FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply]


class ChunkedStream(RawIOBase, BinaryIO):
"""
@@ -178,7 +185,7 @@ class IOFileUploadQueue(AbstractUploadQueue):
def __init__(
self,
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None,
max_queue_size: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
@@ -238,17 +245,89 @@ def _remove_done_from_queue(self) -> None:

self.cancellation_token.wait(5)

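# Write the CogniteExtractorFileApply node to the data modeling API and return its NodeId.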
def _apply_cognite_file(self, file_apply: CogniteExtractorFileApply) -> NodeId:
instance_result = self.cdf_client.data_modeling.instances.apply(file_apply)
node = instance_result.nodes[0]
return node.as_id()

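# Create the (empty) file in CDF and return its metadata together with an upload URL:
# via the data modeling path for CogniteExtractorFileApply, via files.create for classic FileMetadata.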
def _upload_empty(
self, meta_or_apply: FileMetadataOrCogniteExtractorFile
) -> tuple[FileMetadataOrCogniteExtractorFile, str]:
if isinstance(meta_or_apply, CogniteExtractorFileApply):
node_id = self._apply_cognite_file(meta_or_apply)
meta_or_apply, url = self._create_cdm(instance_id=node_id)
else:
meta_or_apply, url = self.cdf_client.files.create(
file_metadata=meta_or_apply, overwrite=self.overwrite_existing
)
return meta_or_apply, url

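# Single-request upload: create the empty file, then PUT the whole payload to the upload URL.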
def _upload_bytes(self, size: int, file: BinaryIO, meta_or_apply: FileMetadataOrCogniteExtractorFile) -> None:
meta_or_apply, url = self._upload_empty(meta_or_apply)
resp = self._httpx_client.send(self._get_file_upload_request(url, file, size))
resp.raise_for_status()

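# Multipart upload: request one upload URL per chunk, PUT each chunk, then finalize
# via /files/completemultipartupload.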
def _upload_multipart(self, size: int, file: BinaryIO, meta_or_apply: FileMetadataOrCogniteExtractorFile) -> None:
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
f"File {meta_or_apply.external_id} is larger than 5GiB ({size})"
f", uploading in {chunks.chunk_count} chunks"
)

returned_file_metadata = self._create_multi_part(meta_or_apply, chunks)
upload_urls = returned_file_metadata["uploadUrls"]
upload_id = returned_file_metadata["uploadId"]
file_meta = FileMetadata.load(returned_file_metadata)

for url in upload_urls:
chunks.next_chunk()
resp = self._httpx_client.send(self._get_file_upload_request(url, chunks, len(chunks)))
resp.raise_for_status()

completed_headers = (
_CDF_ALPHA_VERSION_HEADER if isinstance(meta_or_apply, CogniteExtractorFileApply) else None
)

res = self.cdf_client.files._post(
url_path="/files/completemultipartupload",
json={"id": file_meta.id, "uploadId": upload_id},
headers=completed_headers,
)
res.raise_for_status()

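# Initialize a multipart upload: DM files go through the alpha /files/multiuploadlink
# endpoint, classic files through /files/initmultipartupload.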
def _create_multi_part(self, meta_or_apply: FileMetadataOrCogniteExtractorFile, chunks: ChunkedStream) -> dict:
if isinstance(meta_or_apply, CogniteExtractorFileApply):
node_id = self._apply_cognite_file(meta_or_apply)
identifiers = IdentifierSequence.load(instance_ids=node_id).as_singleton()
self.cdf_client.files._warn_alpha()
res = self.cdf_client.files._post(
url_path="/files/multiuploadlink",
json={"items": identifiers.as_dicts()},
params={"parts": chunks.chunk_count},
headers=_CDF_ALPHA_VERSION_HEADER,
)
res.raise_for_status()
return res.json()["items"][0]
else:
res = self.cdf_client.files._post(
url_path="/files/initmultipartupload",
json=meta_or_apply.dump(camel_case=True),
params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count},
)
res.raise_for_status()
return res.json()

def add_io_to_upload_queue(
self,
file_meta: FileMetadata,
meta_or_apply: FileMetadataOrCogniteExtractorFile,
read_file: Callable[[], BinaryIO],
extra_retries: Optional[
Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]]
] = None,
) -> None:
"""
Add file to upload queue. The file will start uploading immediately. If the size of the queue is larger than
the specified max size, this call will block until the upload has completed.
Args:
meta_or_apply: File metadata object or CogniteExtractorFileApply instance
@@ -258,7 +337,7 @@ def add_io_to_upload_queue(
"""
retries = cognite_exceptions()
if isinstance(extra_retries, tuple):
retries.update({exc: lambda _e: True for exc in extra_retries or []})
retries.update({exc: lambda _: True for exc in extra_retries or []})
elif isinstance(extra_retries, dict):
retries.update(extra_retries)

@@ -270,60 +349,36 @@
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
def upload_file(read_file: Callable[[], BinaryIO], meta_or_apply: FileMetadataOrCogniteExtractorFile) -> None:
with read_file() as file:
size = super_len(file)
if size == 0:
# upload just the file metadata without data
file_meta, _url = self.cdf_client.files.create(
file_metadata=file_meta, overwrite=self.overwrite_existing
)
meta_or_apply, _ = self._upload_empty(meta_or_apply)
elif size >= self.max_single_chunk_file_size:
# The maximum chunk size is 4000 MiB.
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
f"File {file_meta.external_id} is larger than 5GiB ({size})"
f", uploading in {chunks.chunk_count} chunks"
)

res = self.cdf_client.files._post(
url_path="/files/initmultipartupload",
json=file_meta.dump(camel_case=True),
params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count},
)
returned_file_metadata = res.json()
upload_urls = returned_file_metadata["uploadUrls"]
upload_id = returned_file_metadata["uploadId"]
file_meta = FileMetadata.load(returned_file_metadata)

for url in upload_urls:
chunks.next_chunk()
resp = self._httpx_client.send(self._get_file_upload_request(url, chunks, len(chunks)))
resp.raise_for_status()

self.cdf_client.files._post(
url_path="/files/completemultipartupload", json={"id": file_meta.id, "uploadId": upload_id}
)
self._upload_multipart(size, file, meta_or_apply)

else:
file_meta, url = self.cdf_client.files.create(
file_metadata=file_meta, overwrite=self.overwrite_existing
)
resp = self._httpx_client.send(self._get_file_upload_request(url, file, size))
resp.raise_for_status()
self._upload_bytes(size, file, meta_or_apply)

if isinstance(meta_or_apply, CogniteExtractorFileApply):
meta_or_apply.is_uploaded = True

if self.post_upload_function:
try:
self.post_upload_function([file_meta])
self.post_upload_function([meta_or_apply])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
def wrapped_upload(
read_file: Callable[[], BinaryIO], meta_or_apply: FileMetadataOrCogniteExtractorFile
) -> None:
try:
upload_file(read_file, file_meta)
upload_file(read_file, meta_or_apply)

except Exception as e:
self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
self.logger.exception(f"Unexpected error while uploading file: {meta_or_apply.external_id}")
self.errors.append(e)

finally:
@@ -340,7 +395,7 @@ def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -
pass

with self.lock:
self.upload_queue.append(self._pool.submit(wrapped_upload, read_file, file_meta))
self.upload_queue.append(self._pool.submit(wrapped_upload, read_file, meta_or_apply))
self.upload_queue_size += 1
self.files_queued.inc()
self.queue_size.set(self.upload_queue_size)
@@ -364,6 +419,18 @@ def _get_file_upload_request(self, url_str: str, stream: BinaryIO, size: int) ->
headers=headers,
)

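# Request an upload URL for an existing DM file node via the alpha /files/uploadlink endpoint.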
def _create_cdm(self, instance_id: NodeId) -> tuple[FileMetadata, str]:
self.cdf_client.files._warn_alpha()
identifiers = IdentifierSequence.load(instance_ids=instance_id).as_singleton()
res = self.cdf_client.files._post(
url_path="/files/uploadlink",
json={"items": identifiers.as_dicts()},
headers=_CDF_ALPHA_VERSION_HEADER,
)
res.raise_for_status()
resp_json = res.json()["items"][0]
return FileMetadata.load(resp_json), resp_json["uploadUrl"]

def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None:
"""
Wait for all uploads to finish
@@ -428,7 +495,7 @@ class FileUploadQueue(IOFileUploadQueue):
def __init__(
self,
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
@@ -447,7 +514,9 @@ def __init__(
cancellation_token,
)

def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, PathLike]) -> None:
def add_to_upload_queue(
self, meta_or_apply: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike]
) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
@@ -461,7 +530,7 @@ def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, Pat
def load_file_from_path() -> BinaryIO:
return open(file_name, "rb")

self.add_io_to_upload_queue(file_meta, load_file_from_path)
self.add_io_to_upload_queue(meta_or_apply, load_file_from_path)


class BytesUploadQueue(IOFileUploadQueue):
@@ -481,7 +550,7 @@ class BytesUploadQueue(IOFileUploadQueue):
def __init__(
self,
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None,
max_queue_size: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
@@ -498,7 +567,7 @@ def __init__(
cancellation_token,
)

def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
def add_to_upload_queue(self, content: bytes, meta_or_apply: FileMetadataOrCogniteExtractorFile) -> None:
"""
Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
@@ -510,4 +579,4 @@ def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
def get_byte_io() -> BinaryIO:
return BytesIO(content)

self.add_io_to_upload_queue(metadata, get_byte_io)
self.add_io_to_upload_queue(meta_or_apply, get_byte_io)
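Classic uploads keep working through the same queues. A corresponding sketch for `BytesUploadQueue` with an in-memory payload, assuming a configured client; the identifiers are illustrative:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader.files import BytesUploadQueue

client = CogniteClient()  # assumes credentials are configured elsewhere
queue = BytesUploadQueue(cdf_client=client)

meta = FileMetadata(external_id="my-bytes-file", name="payload.bin")  # hypothetical ids
queue.add_to_upload_queue(b"file contents", meta)
queue.upload()
```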
9 changes: 9 additions & 0 deletions cognite/extractorutils/util.py
@@ -19,6 +19,7 @@

import logging
import random
from datetime import datetime, timezone
from functools import partial, wraps
from threading import Thread
from time import time
@@ -501,3 +502,11 @@ def handle_cognite_errors(exception: CogniteException) -> bool:
return True

return {CogniteException: handle_cognite_errors}


def datetime_to_timestamp(dt: datetime) -> int:
return int(dt.timestamp() * 1000)


def timestamp_to_datetime(ts: int) -> datetime:
return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
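The two helpers convert between timezone-aware datetimes and the epoch-millisecond timestamps CDF uses; a quick usage sketch:

```python
from datetime import datetime, timezone

from cognite.extractorutils.util import datetime_to_timestamp, timestamp_to_datetime

dt = datetime(2024, 8, 30, tzinfo=timezone.utc)
ms = datetime_to_timestamp(dt)  # 1724976000000
assert timestamp_to_datetime(ms) == dt  # round-trips as a UTC-aware datetime
```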
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.3.0"
version = "7.4.0"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <mathias.lohne@cognite.com>"]
license = "Apache-2.0"
@@ -58,7 +58,7 @@ exclude = "tests/*"

[tool.poetry.dependencies]
python = "^3.9.0"
cognite-sdk = "^7.43.3"
cognite-sdk = "^7.54.17"
prometheus-client = ">0.7.0, <=1.0.0"
arrow = "^1.0.0"
pyyaml = ">=5.3.0, <7"