diff --git a/jobs/jobs/airflow_utils.py b/jobs/jobs/airflow_utils.py
index 2e394e85c..43a7ede5d 100644
--- a/jobs/jobs/airflow_utils.py
+++ b/jobs/jobs/airflow_utils.py
@@ -65,6 +65,7 @@ async def run(
     job_id: int,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
     configuration = get_configuration()
     with client.ApiClient(configuration) as api_client:
@@ -74,7 +75,10 @@ async def run(
             dag_run_id=dag_run_id,
             conf=dataclasses.asdict(
                 pipeline.PipelineRunArgs(
-                    job_id=job_id, tenant=current_tenant, files_data=files
+                    job_id=job_id,
+                    tenant=current_tenant,
+                    files_data=files,
+                    datasets=datasets,
                 )
             ),
         )
@@ -92,5 +96,6 @@ async def run(
     job_id: str,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
-    return await run(pipeline_id, job_id, files, current_tenant)
+    return await run(pipeline_id, job_id, files, current_tenant, datasets)
diff --git a/jobs/jobs/databricks_utils.py b/jobs/jobs/databricks_utils.py
index eea2b73c9..079b9f660 100644
--- a/jobs/jobs/databricks_utils.py
+++ b/jobs/jobs/databricks_utils.py
@@ -45,6 +45,7 @@ async def run(
     job_id: int,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
     logger.info(
         "Running pipeline %s, job_id %s, current_tenant: %s with arguments %s",
@@ -60,7 +61,10 @@ async def run(
             "badgerdoc_job_parameters": json.dumps(
                 dataclasses.asdict(
                     pipeline.PipelineRunArgs(
-                        job_id=job_id, tenant=current_tenant, files_data=files
+                        job_id=job_id,
+                        tenant=current_tenant,
+                        files_data=files,
+                        datasets=datasets,
                     )
                 )
             )
@@ -79,5 +83,8 @@ async def run(
     job_id: str,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
-    await run(pipeline_id, int(job_id), files, current_tenant)
+    await run(
+        pipeline_id, int(job_id), files, current_tenant, datasets=datasets
+    )
diff --git a/jobs/jobs/pipeline.py b/jobs/jobs/pipeline.py
index 142eae1c4..58419b12e 100644
--- a/jobs/jobs/pipeline.py
+++ b/jobs/jobs/pipeline.py
@@ -8,12 +8,17 @@ class PipelineFileInput:
     job_id: int
 
 
+class Dataset(TypedDict, total=False):
+    id: int
+    name: str
+
+
 class PipelineFile(TypedDict, total=False):
     bucket: str
     input: PipelineFileInput
     input_path: str
     pages: List[int]
-    datasets: List[int]
+    datasets: List[Dataset]
     revision: Optional[str]
     output_path: Optional[str]
     signed_url: Optional[str]
@@ -25,6 +30,7 @@ class PipelineRunArgs:
     job_id: int
     tenant: str
     files_data: List[PipelineFile]
+    datasets: List[Dataset]
 
 
 @dataclass
@@ -44,5 +50,6 @@ async def run(
     job_id: str,
     files: List[PipelineFile],
     current_tenant: str,
+    datasets: List[Dataset],
 ) -> None:
     raise NotImplementedError()
diff --git a/jobs/jobs/utils.py b/jobs/jobs/utils.py
index c213c47c6..53b282268 100644
--- a/jobs/jobs/utils.py
+++ b/jobs/jobs/utils.py
@@ -291,21 +291,26 @@ def files_data_to_pipeline_arg(
 ) -> Iterator[pipeline.PipelineFile]:
     data = previous_jobs_data if previous_jobs_data else files_data
     for file in data:
-        # todo: change me
-        _, job_id, file_id, *_ = file["output_path"].strip().split("/")
-
-        pipeline_file: pipeline.PipelineFile = {
-            "bucket": file["bucket"],
-            "input": pipeline.PipelineFileInput(job_id=job_id),
-            "input_path": file["file"],
-            "pages": file["pages"],
-            "file_id": file_id,
-            "datasets": file["datasets"],
-        }
-        rev = file.get("revision")
-        if rev:
-            pipeline_file["revision"] = rev
-        yield pipeline_file
+        try:
+            # todo: change me
+            _, job_id, file_id, *_ = file["output_path"].strip().split("/")
+
+            pipeline_file: pipeline.PipelineFile = {
+                "bucket": file["bucket"],
+                "input": pipeline.PipelineFileInput(job_id=job_id),
+                "input_path": file["file"],
+                "pages": file["pages"],
+                "file_id": file_id,
+                "datasets": file["datasets"],
+            }
+        except KeyError as err:
+            logger.error(err)
+            continue
+        else:
+            rev = file.get("revision")
+            if rev:
+                pipeline_file["revision"] = rev
+            yield pipeline_file
 
 
 def fill_signed_url(files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
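
A minimal sketch (illustrative, not part of the diff) of how the new `datasets` field travels through `PipelineRunArgs` after this change; the `jobs.pipeline` import path and the example values are assumptions:

```python
import dataclasses
from typing import List

from jobs import pipeline  # assumed import path for jobs/jobs/pipeline.py

# Hypothetical dataset records shaped like the new Dataset TypedDict.
datasets: List[pipeline.Dataset] = [{"id": 1, "name": "contracts"}]

# PipelineRunArgs now carries datasets alongside files_data; asdict() of this
# object is what the runners serialize into the Airflow DAG-run conf and the
# Databricks "badgerdoc_job_parameters" above.
args = pipeline.PipelineRunArgs(
    job_id=42,
    tenant="example-tenant",
    files_data=[],
    datasets=datasets,
)
conf = dataclasses.asdict(args)
```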