Skip to content

Commit

Permalink
Move functions to its own module, load them dynamically in expr apply.
Browse files Browse the repository at this point in the history
  • Loading branch information
kinow committed Feb 4, 2022
1 parent 9781f7b commit eb455f9
Show file tree
Hide file tree
Showing 8 changed files with 627 additions and 460 deletions.
7 changes: 5 additions & 2 deletions wdl2cwl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Any, Callable, Optional, Type, cast

import WDL
from WDL._error_util import SourcePosition

# Inspired by https://github.com/common-workflow-language/schema_salad/blob/661fb0fa8c745ed70253dda93bd12002007f6b33/schema_salad/sourceline.py#L232

Expand Down Expand Up @@ -46,7 +45,7 @@ def __exit__(

def makeLead(self) -> str:
"""Caculate the error message prefix."""
pos: SourcePosition = cast(SourcePosition, self.item.pos)
pos: WDL.SourcePosition = cast(WDL.SourcePosition, self.item.pos)
return f"{pos.uri}:{pos.line}:{pos.column}:"

def makeError(self, msg: str) -> Any:
Expand All @@ -69,3 +68,7 @@ def makeError(self, msg: str) -> Any:
else:
errs.append(f"{lead} {m}")
return self.raise_type("\n".join(errs))


class ConversionException(Exception):
"""Error during conversion."""
259 changes: 259 additions & 0 deletions wdl2cwl/expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
"""WDL Expressions (literal values, arithmetic, comparison, conditional, string interpolation, array, map, and functions)."""

from typing import Union, Any, cast

import WDL
from wdl2cwl.errors import WDLSourceLine, ConversionException
from wdl2cwl.util import get_input, ConversionContext


def get_literal_name(
expr: Union[WDL.Expr.Boolean, WDL.Expr.Int, WDL.Expr.Float, WDL.Expr.Array]
) -> str:
"""Translate WDL Boolean, Int, Float, or Array Expression."""
# if the literal expr is used inside WDL.Expr.Apply
# the literal value is what's needed
parent = expr.parent # type: ignore[union-attr]
if isinstance(parent, (WDL.Expr.Apply, WDL.Expr.IfThenElse)):
return expr.literal.value # type: ignore
raise WDLSourceLine(expr, ConversionException).makeError(
f"The parent expression for {expr} is not WDL.Expr.Apply, but {parent}."
)


def get_expr_ifthenelse(
wdl_ifthenelse: WDL.Expr.IfThenElse, ctx: ConversionContext
) -> str:
"""Translate WDL IfThenElse Expressions."""
condition = get_expr(wdl_ifthenelse.condition, ctx)
if_true = get_expr(wdl_ifthenelse.consequent, ctx)
if_false = get_expr(wdl_ifthenelse.alternative, ctx)
return f"{condition} ? {if_true} : {if_false}"


def translate_wdl_placeholder(
wdl_placeholder: WDL.Expr.Placeholder, ctx: ConversionContext
) -> str:
"""Translate WDL Expr Placeholder to a valid CWL command string."""
cwl_command_str = ""
expr = wdl_placeholder.expr
if expr is None:
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError(
f"Placeholder '{wdl_placeholder}' has no expr."
)
placeholder_expr = get_expr(expr, ctx)
options = wdl_placeholder.options
if options:
if "true" in options:
true_value = options["true"]
false_value = options["false"]
true_str = f'"{true_value}"' if '"' not in true_value else f"'{true_value}'"
false_str = (
f'"{false_value}"' if '"' not in false_value else f"'{false_value}'"
)
is_optional = False
if isinstance(expr, WDL.Expr.Get):
is_optional = expr.type.optional
elif isinstance(expr, WDL.Expr.Apply):
is_optional = expr.arguments[0].type.optional
if not is_optional:
cwl_command_str = f"$({placeholder_expr} ? {true_str} : {false_str})"
else:
cwl_command_str = (
f"$({placeholder_expr} === null ? {false_str} : {true_str})"
)
elif "sep" in options:
seperator = options["sep"]
if isinstance(expr.type, WDL.Type.Array):
item_type: WDL.Expr.Base = expr.type.item_type # type: ignore
if isinstance(item_type, WDL.Type.String):
cwl_command_str = f'$({placeholder_expr}.join("{seperator}"))'
elif isinstance(item_type, WDL.Type.File):
cwl_command_str = (
f"$({placeholder_expr}.map("
+ 'function(el) {return el.path}).join("'
+ seperator
+ '"))'
)
else:
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError(
f"{wdl_placeholder} with separator and item type {item_type} is not yet handled"
)
else:
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError(
f"{wdl_placeholder} with expr of type {expr.type} is not yet handled"
)
else:
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError(
f"Placeholders with options {options} are not yet handled."
)
else:
# for the one case where the $(input.some_input_name) is used within the placeholder_expr
# we return the placeholder_expr without enclosing in another $()
cwl_command_str = (
f"$({placeholder_expr})"
if placeholder_expr[-1] != ")"
else placeholder_expr
)
# sometimes placeholders are used inside WDL.Expr.String.
# with the parent and grand_parent we can confirm that we are in
# the command string (WDL.Expr.String) and task (WDL.Tree.Task) respectively
parent = wdl_placeholder.parent # type: ignore
grand_parent = parent.parent
return (
cwl_command_str
if isinstance(parent, WDL.Expr.String)
and isinstance(grand_parent, WDL.Tree.Task)
else cwl_command_str[2:-1]
)


def get_expr_string(wdl_expr_string: WDL.Expr.String, ctx: ConversionContext) -> str:
"""Translate WDL String Expressions."""
if wdl_expr_string.literal is not None:
return f'"{wdl_expr_string.literal.value}"'
string = ""
parts = wdl_expr_string.parts
for index, part in enumerate(parts[1:-1], start=1):
if isinstance(
part,
(WDL.Expr.Placeholder, WDL.Expr.Apply, WDL.Expr.Get, WDL.Expr.Ident),
):
placeholder = get_expr(part, ctx)
part = (
"" if parts[index - 1] == '"' or parts[index - 1] == "'" else "' + " # type: ignore
)
part += placeholder
part += (
"" if parts[index + 1] == '"' or parts[index + 1] == "'" else " + '" # type: ignore
)
string += part
# condition to determine if the opening and closing quotes should be added to string
# for cases where a placeholder begins or ends a WDL.Expr.String
if type(parts[1]) == str:
string = "'" + string
if type(parts[-2]) == str:
string = string + "'"
return string


def get_expr_name(wdl_expr: WDL.Expr.Ident) -> str:
"""Extract name from WDL expr."""
if not hasattr(wdl_expr, "name"):
raise WDLSourceLine(wdl_expr, ConversionException).makeError(
f"{type(wdl_expr)} has not attribute 'name'"
)
return get_input(wdl_expr.name)


def get_expr(wdl_expr: Any, ctx: ConversionContext) -> str:
"""Translate WDL Expressions."""
if isinstance(wdl_expr, WDL.Expr.Apply):
return get_expr_apply(wdl_expr, ctx)
elif isinstance(wdl_expr, WDL.Expr.Get):
return get_expr_get(wdl_expr, ctx)
elif isinstance(wdl_expr, WDL.Expr.IfThenElse):
return get_expr_ifthenelse(wdl_expr, ctx)
elif isinstance(wdl_expr, WDL.Expr.Placeholder):
return translate_wdl_placeholder(wdl_expr, ctx)
elif isinstance(wdl_expr, WDL.Expr.String):
return get_expr_string(wdl_expr, ctx)
elif isinstance(wdl_expr, WDL.Tree.Decl):
return get_expr(wdl_expr.expr, ctx)
elif isinstance(
wdl_expr,
(
WDL.Expr.Boolean,
WDL.Expr.Int,
WDL.Expr.Float,
WDL.Expr.Array,
),
):
return get_literal_name(wdl_expr)
else:
raise WDLSourceLine(wdl_expr, ConversionException).makeError(
f"The expression '{wdl_expr}' is not handled yet."
)


def get_expr_apply(wdl_apply_expr: WDL.Expr.Apply, ctx: ConversionContext) -> str:
"""Translate WDL Apply Expressions."""
# N.B: This import here avoids circular dependency error when loading the modules.
from wdl2cwl import functions

function_name = wdl_apply_expr.function_name
arguments = wdl_apply_expr.arguments
if not arguments:
raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError(
f"The '{wdl_apply_expr}' expression has no arguments."
)
treat_as_optional = wdl_apply_expr.type.optional

# Call the function if we have it in our wdl2cwl.functions module
if hasattr(functions, function_name):
kwargs = {
"treat_as_optional": treat_as_optional,
"wdl_apply_expr": wdl_apply_expr,
}
return cast(
str,
getattr(functions, function_name)(arguments, ctx, **kwargs),
)

raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError(
f"Function name '{function_name}' not yet handled."
)


def get_expr_get(wdl_get_expr: WDL.Expr.Get, ctx: ConversionContext) -> str:
"""Translate WDL Get Expressions."""
member = wdl_get_expr.member
if not member:
return get_expr_ident(wdl_get_expr.expr, ctx) # type: ignore
struct_name = get_expr(wdl_get_expr.expr, ctx)
member_str = f"{struct_name}.{member}"
return (
member_str
if not isinstance(wdl_get_expr.type, WDL.Type.File)
else f"{member_str}.path"
)


def get_expr_ident(wdl_ident_expr: WDL.Expr.Ident, ctx: ConversionContext) -> str:
"""Translate WDL Ident Expressions."""
id_name = wdl_ident_expr.name
ident_name = get_input(id_name)
referee: Any = wdl_ident_expr.referee
optional = wdl_ident_expr.type.optional
if referee:
with WDLSourceLine(referee, ConversionException):
if isinstance(referee, WDL.Tree.Call):
return id_name
if referee.expr and (
wdl_ident_expr.name in ctx.optional_cwl_null
or wdl_ident_expr.name not in ctx.non_static_values
):
return get_expr(referee.expr, ctx)
if optional and isinstance(wdl_ident_expr.type, WDL.Type.File):
# To prevent null showing on the terminal for inputs of type File
name_with_file_check = get_expr_name_with_is_file_check(wdl_ident_expr)
return f'{ident_name} === null ? "" : {name_with_file_check}'
return (
ident_name
if not isinstance(wdl_ident_expr.type, WDL.Type.File)
else f"{ident_name}.path"
)


def get_expr_name_with_is_file_check(wdl_expr: WDL.Expr.Ident) -> str:
"""Extract name from WDL expr and check if it's a file path."""
if wdl_expr is None or not hasattr(wdl_expr, "name"):
raise WDLSourceLine(wdl_expr, ConversionException).makeError(
f"{type(wdl_expr)} has not attribute 'name'"
)
expr_name = get_input(wdl_expr.name)
is_file = isinstance(wdl_expr.type, WDL.Type.File)
return expr_name if not is_file else f"{expr_name}.path"


__all__ = ["get_expr", "get_expr_string", "translate_wdl_placeholder"]
Loading

0 comments on commit eb455f9

Please sign in to comment.