Merge pull request #33 from StefanoTavonatti/main
Validation and pollution as Airflow DAGs in sequence
dulvui committed Jun 17, 2024
2 parents 26f94de + 92f14ff commit 5a4816d
Showing 74 changed files with 3,421 additions and 1,298 deletions.
2 changes: 2 additions & 0 deletions pollution_v2/.gitignore
@@ -10,3 +10,5 @@ venv*
celerybeat-schedule.db
/sample_data/input/odh.db
docker-compose.yaml.custom
docker-compose.yaml.uh
/sample_data/odh.db
113 changes: 69 additions & 44 deletions pollution_v2/README.md

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions pollution_v2/docker-compose.yaml
@@ -86,15 +86,17 @@ x-airflow-common:
AIRFLOW_VAR_ODH_PAGINATION_SIZE: '10000'
AIRFLOW_VAR_ODH_PASSWORD: '<your_odh_password>'
AIRFLOW_VAR_ODH_USERNAME: '<your_odh_username>'
# AIRFLOW_VAR_ODH_MINIMUM_STARTING_DATE: '<your_odh_minimum_starting_date>'
# AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_HOST: '<your_computation_checkpoint_redis_host>'
# AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_PORT: '<your_computation_checkpoint_redis_port>'
# AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_DB: '<your_computation_checkpoint_redis_db>'
AIRFLOW_VAR_ODH_MINIMUM_STARTING_DATE: "2018-01-01"
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_HOST: 'redis'
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_PORT: 6379
AIRFLOW_VAR_COMPUTATION_CHECKPOINT_REDIS_DB: 10
AIRFLOW_VAR_DATATYPE_PREFIX: ""
# AIRFLOW__CORE__REMOTE_LOGGING: '<your_choice_on_remote_logging>'
# AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: '<your_bucket_for_remote_logging>'
# AIRFLOW__CORE__REMOTE_LOG_CONN_ID: '<your_connection_id_for_remote_logging>'
# AIRFLOW__CORE__ENCRYPT_S3_LOGS: '<your_choice_on_encrypt_logs>'
# AIRFLOW_CONN_MINIO_S3_CONN: '<your_connection_details_for_remote_logging>'
# AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 4
NO_PROXY: '*'
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/src:/opt/airflow/dags
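For orientation, here is a minimal sketch of how a DAG task can read the variables configured above. It assumes only standard Airflow behaviour (an AIRFLOW_VAR_<NAME> environment variable is exposed as the Variable <NAME>); the default values are illustrative and not taken from this commit.

from airflow.models import Variable

# Airflow resolves Variable.get("NAME") against the AIRFLOW_VAR_NAME environment
# variable, so these calls pick up the values defined in docker-compose.yaml above.
redis_host = Variable.get("COMPUTATION_CHECKPOINT_REDIS_HOST", default_var=None)
redis_port = int(Variable.get("COMPUTATION_CHECKPOINT_REDIS_PORT", default_var="6379"))
redis_db = int(Variable.get("COMPUTATION_CHECKPOINT_REDIS_DB", default_var="0"))
min_starting_date = Variable.get("ODH_MINIMUM_STARTING_DATE", default_var="2018-01-01")
datatype_prefix = Variable.get("DATATYPE_PREFIX", default_var="")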
2 changes: 1 addition & 1 deletion pollution_v2/infrastructure/docker/Dockerfile
@@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: CC0-1.0

FROM apache/airflow:2.8.1-python3.11
FROM apache/airflow:2.9.2-python3.11
USER root
RUN apt update
RUN apt install -y gcc g++ build-essential
1 change: 0 additions & 1 deletion pollution_v2/requirements.txt
@@ -1,7 +1,6 @@
celery==5.3.6
redis==4.3.4
sentry-sdk==1.9.9
blinker==1.5
python-keycloak==2.5.0
requests~=2.28
dateutils==0.6.12
File renamed without changes.
4 changes: 4 additions & 0 deletions pollution_v2/src/common/__init__.py
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

File renamed without changes.
@@ -7,12 +7,19 @@
import json
import logging
from abc import ABC, abstractmethod
from enum import Enum
from json import JSONDecodeError
from typing import Optional, TypeVar, Generic, Type

import redis

logger = logging.getLogger("pollution_connector.cache.redis_cache")
logger = logging.getLogger("pollution_v2.common.cache.common")


class TrafficManagerClass(Enum):

POLLUTION = "POLLUTION"
VALIDATION = "VALIDATION"


class CacheData(ABC):
@@ -41,7 +48,7 @@ def __init__(self, r: redis.Redis, cache_data_type: Type[CacheDataType]) -> None
self._r = r

def get(self, key: str) -> Optional[CacheDataType]:
logger.info(f"Getting cached data for key [{key}]")
logger.debug(f"Getting cached data for key [{key}]")
result = self._r.get(key)
if result is not None:
try:
@@ -59,10 +66,9 @@ def set(self, data: CacheDataType, ttl: Optional[int] = None) -> None:
Cache the data in dict format inside the redis db.
:param data: the data to cache
:param key: the key to save the data
:param ttl: the time to live of the data entry (expressed in seconds)
"""
logger.info(f"Caching data for key [{data.unique_id()}] and ttl [{ttl}].")
logger.debug(f"Caching data for key [{data.unique_id()}] and ttl [{ttl}].")
if ttl:
self._r.set(data.unique_id(), json.dumps(data.to_repr()), ex=ttl)
else:
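As a side note, the TTL handling visible in the set() hunk above boils down to the following redis-py pattern. This is a standalone sketch with a made-up key and payload, not code from the repository; the connection values are assumed to mirror the checkpoint variables in docker-compose.yaml.

import json
import redis

r = redis.Redis(host="redis", port=6379, db=10)

key = "some-cache-key"       # hypothetical key
payload = {"value": 42}      # hypothetical cached data
ttl = 3600                   # time to live in seconds, as documented above

# With a TTL the entry expires automatically; without one it is kept until
# overwritten, matching the two branches of RedisCache.set shown in the diff.
if ttl:
    r.set(key, json.dumps(payload), ex=ttl)
else:
    r.set(key, json.dumps(payload))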
@@ -10,7 +10,7 @@

import redis

from pollution_connector.cache.common import CacheData, RedisCache
from common.cache.common import CacheData, RedisCache
from common.data_model.common import Station


@@ -19,31 +19,34 @@ class ComputationCheckpoint(CacheData):

station_code: str
checkpoint_dt: datetime
manager_code: str

@staticmethod
def get_id_for_station(station: Union[Station, str]) -> str:
def get_id_for_station(station: Union[Station, str], manager_code: str) -> str:
base_key = "ComputationCheckpoint"
if isinstance(station, str):
return f"{base_key}-{station}"
return f"{base_key}-{station}-{manager_code}"
elif isinstance(station, Station):
return f"{base_key}-{station.code}"
return f"{base_key}-{station.code}-{manager_code}"
else:
raise TypeError(f"Unable to handle an object of type [{type(station)}]")
raise TypeError(f"Unable to handle an object of type [{type(station)}] for manager [{manager_code}]")

def unique_id(self) -> str:
return self.get_id_for_station(self.station_code)
return self.get_id_for_station(self.station_code, self.manager_code)

def to_repr(self) -> dict:
return {
"stationCode": self.station_code,
"checkpointDT": self.checkpoint_dt.isoformat()
"checkpointDT": self.checkpoint_dt.isoformat(),
"trafficManagerCode": self.manager_code
}

@staticmethod
def from_repr(raw_data: dict) -> CacheData:
return ComputationCheckpoint(
station_code=raw_data["stationCode"],
checkpoint_dt=datetime.fromisoformat(raw_data["checkpointDT"])
checkpoint_dt=datetime.fromisoformat(raw_data["checkpointDT"]),
manager_code=raw_data["trafficManagerCode"]
)


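A hedged usage sketch of the new composite checkpoint key follows; the module path common.cache.computation_checkpoint and the keyword constructor are assumptions inferred from the imports and the from_repr call visible in this diff, and the station code is hypothetical.

from datetime import datetime

from common.cache.common import TrafficManagerClass                    # enum added in this commit
from common.cache.computation_checkpoint import ComputationCheckpoint  # assumed module path

checkpoint = ComputationCheckpoint(
    station_code="A22:5515",              # hypothetical station code
    checkpoint_dt=datetime(2024, 6, 17),
    manager_code=TrafficManagerClass.VALIDATION.name,
)

# The manager code is now part of the key, so validation and pollution
# keep independent checkpoints for the same station.
print(checkpoint.unique_id())   # ComputationCheckpoint-A22:5515-VALIDATION

# to_repr()/from_repr() round-trip the new trafficManagerCode field as well.
restored = ComputationCheckpoint.from_repr(checkpoint.to_repr())
print(restored.unique_id())     # ComputationCheckpoint-A22:5515-VALIDATION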
39 changes: 38 additions & 1 deletion pollution_v2/src/common/connector/collector.py
@@ -4,17 +4,22 @@

from __future__ import absolute_import, annotations

from common.connector.history import HistoryODHConnector
from common.connector.pollution import PollutionODHConnector
from common.connector.traffic import TrafficODHConnector
from common.connector.validation import ValidationODHConnector
from common.settings import ODH_AUTHENTICATION_URL, ODH_USERNAME, ODH_PASSWORD, ODH_CLIENT_ID, \
ODH_CLIENT_SECRET, ODH_PAGINATION_SIZE, REQUESTS_TIMEOUT, REQUESTS_MAX_RETRIES, REQUESTS_SLEEP_TIME, \
REQUESTS_RETRY_SLEEP_TIME, ODH_BASE_READER_URL, ODH_BASE_WRITER_URL, ODH_GRANT_TYPE, ODH_MAX_POST_BATCH_SIZE


class ConnectorCollector:

def __init__(self, traffic: TrafficODHConnector, pollution: PollutionODHConnector):
def __init__(self, traffic: TrafficODHConnector, history: HistoryODHConnector, validation: ValidationODHConnector,
pollution: PollutionODHConnector):
self.traffic = traffic
self.history = history
self.validation = validation
self.pollution = pollution

@staticmethod
@@ -51,6 +56,38 @@ def build_from_env() -> ConnectorCollector:
requests_sleep_time=requests_sleep_time,
requests_retry_sleep_time=requests_retry_sleep_time
),
history=HistoryODHConnector(
base_reader_url=base_reader_url,
base_writer_url=base_writer_url,
authentication_url=authentication_url,
username=user_name,
password=password,
client_id=client_id,
client_secret=client_secret,
grant_type=grant_type,
pagination_size=pagination_size,
max_post_batch_size=max_post_batch_size,
requests_timeout=requests_timeout,
requests_max_retries=requests_max_retries,
requests_sleep_time=requests_sleep_time,
requests_retry_sleep_time=requests_retry_sleep_time
),
validation=ValidationODHConnector(
base_reader_url=base_reader_url,
base_writer_url=base_writer_url,
authentication_url=authentication_url,
username=user_name,
password=password,
client_id=client_id,
client_secret=client_secret,
grant_type=grant_type,
pagination_size=pagination_size,
max_post_batch_size=max_post_batch_size,
requests_timeout=requests_timeout,
requests_max_retries=requests_max_retries,
requests_sleep_time=requests_sleep_time,
requests_retry_sleep_time=requests_retry_sleep_time
),
pollution=PollutionODHConnector(
base_reader_url=base_reader_url,
base_writer_url=base_writer_url,
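A short usage sketch of the extended collector: build_from_env() assembles all four connectors, assuming the ODH_* settings configured in docker-compose.yaml are available in the environment.

from common.connector.collector import ConnectorCollector

collector = ConnectorCollector.build_from_env()

# The collector now bundles four connectors instead of two.
traffic = collector.traffic
history = collector.history        # new in this commit
validation = collector.validation  # new in this commit
pollution = collector.pollution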
(Remaining changed files are not rendered in this view.)
