Skip to content

Commit

Permalink
fix KeyError
Browse files Browse the repository at this point in the history
  • Loading branch information
ervandagadzhanyan committed Sep 4, 2024
1 parent c515181 commit 8016df3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
6 changes: 4 additions & 2 deletions jobs/jobs/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async def run(
job_id: int,
files: List[pipeline.PipelineFile],
current_tenant: str,
datasets: List[pipeline.Dataset],
) -> None:
configuration = get_configuration()
with client.ApiClient(configuration) as api_client:
Expand All @@ -74,7 +75,7 @@ async def run(
dag_run_id=dag_run_id,
conf=dataclasses.asdict(
pipeline.PipelineRunArgs(
job_id=job_id, tenant=current_tenant, files_data=files
job_id=job_id, tenant=current_tenant, files_data=files, datasets=datasets,
)
),
)
Expand All @@ -92,5 +93,6 @@ async def run(
job_id: str,
files: List[pipeline.PipelineFile],
current_tenant: str,
datasets: List[pipeline.Dataset],
) -> None:
return await run(pipeline_id, job_id, files, current_tenant)
return await run(pipeline_id, job_id, files, current_tenant, datasets)
6 changes: 4 additions & 2 deletions jobs/jobs/databricks_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def run(
job_id: int,
files: List[pipeline.PipelineFile],
current_tenant: str,
datasets: List[pipeline.Dataset],
) -> None:
logger.info(
"Running pipeline %s, job_id %s, current_tenant: %s with arguments %s",
Expand All @@ -60,7 +61,7 @@ async def run(
"badgerdoc_job_parameters": json.dumps(
dataclasses.asdict(
pipeline.PipelineRunArgs(
job_id=job_id, tenant=current_tenant, files_data=files
job_id=job_id, tenant=current_tenant, files_data=files, datasets=datasets
)
)
)
Expand All @@ -79,5 +80,6 @@ async def run(
job_id: str,
files: List[pipeline.PipelineFile],
current_tenant: str,
datasets: List[pipeline.Dataset],
) -> None:
await run(pipeline_id, int(job_id), files, current_tenant)
await run(pipeline_id, int(job_id), files, current_tenant, datasets=datasets)
8 changes: 7 additions & 1 deletion jobs/jobs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ class PipelineFileInput:
job_id: int


class Dataset(TypedDict, total=False):
    """Lightweight descriptor of a dataset attached to a pipeline run.

    ``total=False`` makes every key optional, so callers may supply either,
    both, or neither field when building run arguments.
    """

    # Dataset's numeric identifier — presumably the DB primary key; TODO confirm.
    id: int
    # Human-readable dataset name.
    name: str

class PipelineFile(TypedDict, total=False):
bucket: str
input: PipelineFileInput
input_path: str
pages: List[int]
datasets: List[int]
datasets: List[Dataset]
revision: Optional[str]
output_path: Optional[str]
signed_url: Optional[str]
Expand All @@ -25,6 +29,7 @@ class PipelineRunArgs:
job_id: int
tenant: str
files_data: List[PipelineFile]
datasets: List[Dataset]


@dataclass
Expand All @@ -44,5 +49,6 @@ async def run(
job_id: str,
files: List[PipelineFile],
current_tenant: str,
datasets: List[Dataset],
) -> None:
raise NotImplementedError()
35 changes: 20 additions & 15 deletions jobs/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,21 +291,26 @@ def files_data_to_pipeline_arg(
) -> Iterator[pipeline.PipelineFile]:
data = previous_jobs_data if previous_jobs_data else files_data
for file in data:
# todo: change me
_, job_id, file_id, *_ = file["output_path"].strip().split("/")

pipeline_file: pipeline.PipelineFile = {
"bucket": file["bucket"],
"input": pipeline.PipelineFileInput(job_id=job_id),
"input_path": file["file"],
"pages": file["pages"],
"file_id": file_id,
"datasets": file["datasets"],
}
rev = file.get("revision")
if rev:
pipeline_file["revision"] = rev
yield pipeline_file
try:
# todo: change me
_, job_id, file_id, *_ = file["output_path"].strip().split("/")

pipeline_file: pipeline.PipelineFile = {
"bucket": file["bucket"],
"input": pipeline.PipelineFileInput(job_id=job_id),
"input_path": file["file"],
"pages": file["pages"],
"file_id": file_id,
"datasets": file["datasets"],
}
except KeyError as err:
logger.error(err)
continue
else:
rev = file.get("revision")
if rev:
pipeline_file["revision"] = rev
yield pipeline_file


def fill_signed_url(files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Expand Down

0 comments on commit 8016df3

Please sign in to comment.