Merge pull request #28 from aodn/PyVersion
Python Version update
lbesnard authored Jul 5, 2024
2 parents 0b37123 + 928793c commit 9117406
Showing 14 changed files with 228 additions and 195 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/build.yml
@@ -9,6 +9,13 @@ on:
jobs:
build-linux:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- '3.10.14'
- '3.11.9'
- '3.12.4'

defaults:
run:
shell: bash -e {0}
@@ -22,7 +29,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10.14'
python-version: ${{ matrix.python-version }}

#----------------------------------------------
# ----- install & configure poetry -----
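The matrix above runs the workflow once per interpreter (3.10, 3.11 and 3.12), with `${{ matrix.python-version }}` passed to `actions/setup-python` in the second hunk. Purely as an illustration (not part of the repository), a runtime guard mirroring that support window could look like:

```python
import sys

# Hypothetical guard mirroring the CI matrix (3.10-3.12); not in the repo.
if sys.version_info < (3, 10):
    raise RuntimeError(
        f"aodn_cloud_optimised is tested on Python 3.10-3.12, "
        f"but this interpreter is {sys.version.split()[0]}"
    )
```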
26 changes: 14 additions & 12 deletions README.md
@@ -40,12 +40,11 @@ Requirements:
curl -s https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/install.sh | bash
```

Otherwise go to
github.com/aodn/aodn_cloud_optimised/releases/latest
Otherwise go to the [release](http://github.com/aodn/aodn_cloud_optimised/releases/latest) page.

## Development
Requirements:
* Mamba from miniforge3: https://github.com/conda-forge/miniforge
* Mamba from [miniforge3](https://github.com/conda-forge/miniforge)

```bash
mamba env create --file=environment.yml
@@ -60,8 +59,12 @@ poetry install --with dev
```bash
poetry update
```
to update the poetry.lock file. Commit the changes to poetry.lock
to update and commit the changes to ```poetry.lock```

To update the ```requirements.txt```, run
```bash
poetry export -f requirements.txt --without-hashes -o requirements.txt
```

# Requirements
AWS SSO to push files to S3
@@ -96,8 +99,8 @@ options:
Cluster mode to use. Options: 'local' or 'remote'. Default is 'local'.
--optimised-bucket-name OPTIMISED_BUCKET_NAME
Bucket name where cloud optimised object will be created. Default is 'imos-data-lab-optimised'
--root_prefix-cloud-optimised-path ROOT_PREFIX_CLOUD_OPTIMISED_PATH
Prefix value for the root location of the cloud optimised objects, such as s3://optimised-bucket-name/root_prefix-cloud-optimised-path/... Default is 'cloud_optimised/cluster_testing'
--root-prefix-cloud-optimised-path ROOT_PREFIX_CLOUD_OPTIMISED_PATH
Prefix value for the root location of the cloud optimised objects, such as s3://optimised-bucket-name/root-prefix-cloud-optimised-path/... Default is 'cloud_optimised/cluster_testing'
--bucket-raw BUCKET_RAW
Bucket name where input object files will be searched for. Default is 'imos-data'

@@ -110,7 +113,7 @@ Examples:
## As a python module
```python
import importlib.resources
from importlib.resources import files

from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation
from aodn_cloud_optimised.lib.config import (
Expand All @@ -125,17 +128,16 @@ def main():
nc_obj_ls = s3_ls(BUCKET_RAW_DEFAULT, "IMOS/SRS/SST/ghrsst/L3S-1d/dn/2024")

dataset_config = load_dataset_config(
str(
importlib.resources.path(
"aodn_cloud_optimised.config.dataset", "satellite_ghrsst_l3s_1day_daynighttime_single_sensor_australia.json"
str(files("aodn_cloud_optimised").joinpath("config").joinpath("dataset").joinpath("satellite_ghrsst_l3s_1day_daynighttime_single_sensor_australia.json")
)
)
)

cloud_optimised_creation(
nc_obj_ls,
dataset_config=dataset_config,
reprocess=True,
# clear_existing_data=True, # this will delete existing data, be cautious! If testing change the paths below
# optimised_bucket_name="imos-data-lab-optimised", # optional, default value in config/common.json
# root_prefix_cloud_optimised_path="cloud_optimised/cluster_testing", # optional, default value in config/common.json
cluster_mode='remote'
)

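The README example above reflects the commit's wider switch from the legacy `importlib.resources.path` helper to `importlib.resources.files`. A minimal, self-contained sketch of the new lookup, assuming the package ships its JSON configs under `aodn_cloud_optimised/config/dataset/`:

```python
from importlib.resources import files

# Traversable pointing at a JSON config bundled with the installed package.
config_resource = (
    files("aodn_cloud_optimised")
    .joinpath("config")
    .joinpath("dataset")
    .joinpath("satellite_ghrsst_l3s_1day_daynighttime_single_sensor_australia.json")
)

# A Traversable can be read directly, or wrapped in str(...) as the README does.
print(config_resource.read_text()[:120])
```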
15 changes: 6 additions & 9 deletions aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py
@@ -26,13 +26,14 @@
--force-previous-parquet-deletion: Flag to force the search of previous equivalent parquet file created. Much slower. Default is False. Only for Parquet processing.
--cluster-mode: Cluster mode to use. Options: 'local' or 'remote'. Default is 'local'.
--optimised-bucket-name: Bucket name where cloud optimised object will be created. Default is the value of BUCKET_OPTIMISED_DEFAULT from the config.
--root_prefix-cloud-optimised-path: Prefix value for the root location of the cloud optimised objects. Default is the value of ROOT_PREFIX_CLOUD_OPTIMISED_PATH from the config.
--root-prefix-cloud-optimised-path: Prefix value for the root location of the cloud optimised objects. Default is the value of ROOT_PREFIX_CLOUD_OPTIMISED_PATH from the config.
--bucket-raw: Bucket name where input object files will be searched for. Default is the value of BUCKET_RAW_DEFAULT from the config.
"""

import argparse
import importlib.resources
from importlib.resources import files

from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation
from aodn_cloud_optimised.lib.config import (
load_variable_from_config,
@@ -99,10 +100,10 @@ def main():
)

parser.add_argument(
"--root_prefix-cloud-optimised-path",
"--root-prefix-cloud-optimised-path",
default=load_variable_from_config("ROOT_PREFIX_CLOUD_OPTIMISED_PATH"),
help=f"Prefix value for the root location of the cloud optimised objects, such as "
f"s3://optimised-bucket-name/root_prefix-cloud-optimised-path/... "
f"s3://optimised-bucket-name/root-prefix-cloud-optimised-path/... "
f"Default is '{load_variable_from_config('ROOT_PREFIX_CLOUD_OPTIMISED_PATH')}'",
)

@@ -129,11 +130,7 @@ def main():
# Load dataset config
dataset_config_path = args.dataset_config
dataset_config = load_dataset_config(
str(
importlib.resources.path(
"aodn_cloud_optimised.config.dataset", dataset_config_path
)
)
str(files("aodn_cloud_optimised.config.dataset").joinpath(dataset_config_path))
)

# Call cloud_optimised_creation
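Note that renaming the flag from `--root_prefix-cloud-optimised-path` to `--root-prefix-cloud-optimised-path` does not change the attribute the script reads afterwards: argparse strips the leading dashes and converts the remaining dashes to underscores, so it is still `args.root_prefix_cloud_optimised_path`. A standalone sketch (the default value here is illustrative only, not the repo's config lookup):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--root-prefix-cloud-optimised-path",
    default="cloud_optimised/cluster_testing",  # illustrative default only
)

args = parser.parse_args(["--root-prefix-cloud-optimised-path", "cloud_optimised/testing"])

# Dashes in the option string become underscores in the parsed namespace.
print(args.root_prefix_cloud_optimised_path)  # -> cloud_optimised/testing
```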
4 changes: 2 additions & 2 deletions aodn_cloud_optimised/config/dataset/argo.json
@@ -11,9 +11,9 @@
"compute_purchase_option": "spot_with_fallback",
"worker_options": {
"nthreads": 8,
"memory_limit": "32GB" }
"memory_limit": "8GB" }
},
"batch_size": 40,
"batch_size": 600,
"metadata_uuid": "4402cb50-e20a-44ee-93e6-4728259250d2",
"gattrs_to_variables": [],
"partition_keys": [
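The argo.json change above lowers the per-worker `memory_limit` (8 GB instead of 32 GB) while raising `batch_size`, and the satellite configs below follow the same pattern. For context, `nthreads` and `memory_limit` are per-worker Dask settings, not cluster totals; a hedged, local-only sketch of how such `worker_options` behave, using `dask.distributed.LocalCluster` rather than the project's Coiled cluster factory:

```python
from dask.distributed import Client, LocalCluster

# Mirrors the worker_options block above; values apply to each worker.
worker_options = {"nthreads": 8, "memory_limit": "8GB"}

cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=worker_options["nthreads"],
    memory_limit=worker_options["memory_limit"],  # each worker capped at 8 GB
)
client = Client(cluster)
print(client.dashboard_link)

client.close()
cluster.close()
```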
10 changes: 5 additions & 5 deletions aodn_cloud_optimised/config/dataset/satellite_ghrsst_main.json
@@ -3,16 +3,16 @@
"logger_name": "srs_ghrsst",
"cloud_optimised_format": "zarr",
"cluster_options" : {
"n_workers": [2, 30],
"scheduler_vm_types": "t3.small",
"worker_vm_types": "t3.medium",
"n_workers": [2, 40],
"scheduler_vm_types": "t3.medium",
"worker_vm_types": "t3.large",
"allow_ingress_from": "me",
"compute_purchase_option": "spot_with_fallback",
"worker_options": {
"nthreads": 8,
"memory_limit": "240GB" }
"memory_limit": "6GB" }
},
"batch_size": 20,
"batch_size": 30,
"metadata_uuid": "",
"dimensions": {
"time": {"name": "time",
@@ -4,13 +4,13 @@
"cloud_optimised_format": "parquet",
"cluster_options" : {
"n_workers": [4, 20],
"scheduler_vm_types": "t3.medium",
"worker_vm_types": "t3.xlarge",
"scheduler_vm_types": "t3.small",
"worker_vm_types": "t3.large",
"allow_ingress_from": "me",
"compute_purchase_option": "spot_with_fallback",
"worker_options": {
"nthreads": 8,
"memory_limit": "32GB" }
"memory_limit": "6GB" }
},
"batch_size": 30,
"metadata_uuid": "a681fdba-c6d9-44ab-90b9-113b0ed03536",
6 changes: 1 addition & 5 deletions aodn_cloud_optimised/lib/CommonHandler.py
@@ -3,10 +3,8 @@
import timeit
from typing import List

import boto3
import s3fs
import xarray as xr
import yaml
from coiled import Cluster
from dask.distributed import Client
from dask.distributed import LocalCluster
@@ -217,8 +215,6 @@ def close_cluster(self, client, cluster):
except Exception as e:
self.logger.error(f"Error while closing the cluster or client: {e}")

from dask.distributed import Client, LocalCluster

def get_batch_size(self, client=None):
"""
Calculate the optimal batch size for processing files with Dask on a cluster.
@@ -349,7 +345,7 @@ def validate_json(self, json_validation_path):
```
Schema Loading:
The pyarrow_schema is loaded from a JSON file using `importlib.resources.path`.
The pyarrow_schema is loaded from a JSON file using `importlib.resources.files`.
Ensure the pyarrow_schema file (`schema_validation_parquet.json`) is accessible within the
`aodn_cloud_optimised.config.dataset` package.
39 changes: 29 additions & 10 deletions aodn_cloud_optimised/lib/GenericParquetHandler.py
@@ -1,29 +1,30 @@
import gc
import importlib.resources
import os
import re
import timeit
import traceback
from typing import Tuple, Generator

import boto3
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import traceback
import xarray as xr
from dask.distributed import Client
from dask.distributed import wait
from shapely.geometry import Point, Polygon

from .schema import create_pyarrow_schema, generate_json_schema_var_from_netcdf
from aodn_cloud_optimised.lib.logging import get_logger
from aodn_cloud_optimised.lib.s3Tools import (
delete_objects_in_prefix,
split_s3_path,
prefix_exists,
create_fileset,
)

from aodn_cloud_optimised.lib.logging import get_logger
from .CommonHandler import CommonHandler
from dask.distributed import wait
from .schema import create_pyarrow_schema, generate_json_schema_var_from_netcdf


# TODO: improve logging for parallelism by adding a uuid for each task
@@ -52,9 +53,9 @@ def __init__(self, **kwargs):
)

json_validation_path = str(
importlib.resources.path(
"aodn_cloud_optimised.config", "schema_validation_parquet.json"
)
importlib.resources.files("aodn_cloud_optimised")
.joinpath("config")
.joinpath("schema_validation_parquet.json")
)
self.validate_json(
json_validation_path
@@ -954,18 +955,36 @@ def task(f, i):
client, cluster = self.create_cluster()

batch_size = self.get_batch_size(client=client)
# restart_client_interval = 3 # Restart client every 5 batches, to manage memory leak. Only the dask client is restarted. This Approach might not work

# Do it in batches. maybe more efficient
ii = 0
total_batches = len(s3_file_uri_list) // batch_size + 1

for i in range(0, len(s3_file_uri_list), batch_size):
self.logger.info(f"Processing batch {ii + 1}...")
self.logger.info(f"Processing batch {ii + 1}/{total_batches}...")
batch = s3_file_uri_list[i : i + batch_size]
batch_tasks = [
client.submit(task, f, idx + 1) for idx, f in enumerate(batch)
]

wait(batch_tasks, timeout=batch_size * 120)
# timeout = batch_size * 120 # Initial timeout
done, not_done = wait(batch_tasks, return_when="ALL_COMPLETED")

ii += 1

# Cleanup memory
del batch_tasks

# Trigger garbage collection
gc.collect()

# # TODO: seem useless
# # Restart client every `restart_client_interval` batches
# if ii % restart_client_interval == 0 and ii != total_batches:
# client.close()
# client = Client(cluster)
# self.logger.info(f"Dask client restarted to avoid memory leaks")

self.close_cluster(client, cluster)
self.logger.handlers.clear()
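The loop above submits work in fixed-size batches, blocks on `dask.distributed.wait(..., return_when="ALL_COMPLETED")`, then drops the futures and runs `gc.collect()` before the next batch. A self-contained sketch of that pattern with a dummy task (not the handler's real per-file processing):

```python
import gc
from dask.distributed import Client, LocalCluster, wait


def task(uri: str, index: int) -> str:
    # Stand-in for the real per-file cloud-optimised processing.
    return f"processed {index}: {uri}"


if __name__ == "__main__":
    uris = [f"s3://example-bucket/file_{i}.nc" for i in range(10)]  # dummy inputs
    batch_size = 4
    total_batches = -(-len(uris) // batch_size)  # ceiling division

    client = Client(LocalCluster(n_workers=2, threads_per_worker=1))

    for batch_no, start in enumerate(range(0, len(uris), batch_size), start=1):
        batch = uris[start : start + batch_size]
        futures = [client.submit(task, f, start + i + 1) for i, f in enumerate(batch)]

        # Block until every future in the batch has finished (or failed).
        done, not_done = wait(futures, return_when="ALL_COMPLETED")
        print(f"batch {batch_no}/{total_batches}: {len(done)} done, {len(not_done)} pending")

        # Drop references so Dask can release results, then force a collection.
        del futures
        gc.collect()

    client.close()
```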
7 changes: 4 additions & 3 deletions aodn_cloud_optimised/lib/GenericZarrHandler.py
@@ -78,10 +78,11 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)

json_validation_path = str(
importlib.resources.path(
"aodn_cloud_optimised.config", "schema_validation_zarr.json"
)
importlib.resources.files("aodn_cloud_optimised")
.joinpath("config")
.joinpath("schema_validation_zarr.json")
)

self.validate_json(
json_validation_path
) # we cannot validate the json config until self.dataset_config and self.logger are set
6 changes: 4 additions & 2 deletions aodn_cloud_optimised/lib/config.py
@@ -2,7 +2,7 @@
import yaml
import os
from collections import OrderedDict
from importlib.resources import path
from importlib.resources import files


def merge_dicts(parent, child):
@@ -88,7 +89,9 @@ def load_variable_from_config(variable_name) -> str:
:raises KeyError: If the variable is not found in the configuration file.
"""
# Obtain the file path using the context manager
with path("aodn_cloud_optimised.config", "common.json") as common_config_path:
with files("aodn_cloud_optimised").joinpath("config").joinpath(
"common.json"
) as common_config_path:
return load_variable_from_file(str(common_config_path), variable_name)


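In `load_variable_from_config` the `files(...).joinpath(...)` chain is used directly as a context manager. Where a concrete filesystem path is needed, the documented route is `importlib.resources.as_file`, which also covers packages installed inside a zip; a small sketch for comparison (not the repo's code):

```python
from importlib.resources import as_file, files

resource = files("aodn_cloud_optimised").joinpath("config").joinpath("common.json")

# as_file() yields a real path, extracting to a temporary file if the package
# is not installed as plain files on disk.
with as_file(resource) as common_config_path:
    print(common_config_path)
```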
2 changes: 1 addition & 1 deletion environment.yml
@@ -7,7 +7,7 @@ channels:
- conda-forge
- defaults
dependencies:
- python=3.10.14
- python>=3.10.14
- notebook
- h5py
- scipy