From 89461cb3e11835c7389cecffd761dc83409b9abb Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 18 Sep 2024 17:02:17 -0600 Subject: [PATCH] Fixing the cwl_dag.py to accept a URL --- airflow/dags/cwl_dag.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 6f660f4..fa52a94 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -145,20 +145,12 @@ def setup(ti=None, **context): logging.info(f"Selecting node pool={node_pool}") ti.xcom_push(key="node_pool", value=node_pool) - # select arguments and determine if ECR login is required - cwl_dag_args = json.loads(context["params"]["cwl_args"]) + # select "use_ecr" argument and determine if ECR login is required logging.info("Use ECR: %s", context["params"]["use_ecr"]) if context["params"]["use_ecr"]: - # cwl_dag_args["cwltool:overrides"] = { - # context["params"]["cwl_workflow"]: { - # "requirements": {"DockerRequirement": {"dockerPull": ecr_uri}} - # } - # } ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"] ti.xcom_push(key="ecr_login", value=ecr_login) logging.info("ECR login: %s", ecr_login) - ti.xcom_push(key="cwl_dag_arguments", value=json.dumps(cwl_dag_args)) - logging.info("CWL DAG arguments: %s", cwl_dag_args) setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) @@ -177,7 +169,7 @@ def setup(ti=None, **context): "-w", "{{ params.cwl_workflow }}", "-j", - "{{ ti.xcom_pull(task_ids='Setup', key='cwl_dag_arguments') }}", + "{{ params.cwl_args }}", "-e", "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", ],