Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Tracking DynamicJobSpecUri for DynamicWorkflows (#513)
Browse files Browse the repository at this point in the history
* tracking DynamicJobSpecUri for DynamicWorkflows

Signed-off-by: Daniel Rammer <daniel@union.ai>

* updated flyteidl version

Signed-off-by: Daniel Rammer <daniel@union.ai>

* updated flyteidl

Signed-off-by: Daniel Rammer <daniel@union.ai>

---------

Signed-off-by: Daniel Rammer <daniel@union.ai>
  • Loading branch information
hamersaw committed Mar 10, 2023
1 parent 2d39422 commit f6d949a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.3.7
github.com/flyteorg/flyteidl v1.3.9
github.com/flyteorg/flyteplugins v1.0.20
github.com/flyteorg/flytepropeller v1.1.51
github.com/flyteorg/flytestdlib v1.0.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.7 h1:MA7kOqMr/TmPlYPvJZwfsl+CYneuDOJ+kEKx2DocLhE=
github.com/flyteorg/flyteidl v1.3.7/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk=
github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.20 h1:8ZGN2c0iaZa3d/UmN2VYozLBRhthAIO48aD5g8Wly7s=
github.com/flyteorg/flyteplugins v1.0.20/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE=
github.com/flyteorg/flytepropeller v1.1.51 h1:ITPH2Fqx+/1hKBFnfb6Rawws3VbEJ3tQ/1tQXSIXvcQ=
Expand Down
5 changes: 3 additions & 2 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,9 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
} else {
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
DynamicJobSpecUri: nodeExecution.Closure.DynamicJobSpecUri,
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ func UpdateNodeExecutionModel(
nodeExecutionModel.CacheStatus = &st
}
nodeExecutionClosure.TargetMetadata = targetMetadata

// if this is a dynamic task then maintain the DynamicJobSpecUri
dynamicWorkflowMetadata := request.Event.GetTaskNodeMetadata().DynamicWorkflow
if dynamicWorkflowMetadata != nil && len(dynamicWorkflowMetadata.DynamicJobSpecUri) > 0 {
nodeExecutionClosure.DynamicJobSpecUri = dynamicWorkflowMetadata.DynamicJobSpecUri
}
}

marshaledClosure, err := proto.Marshal(&nodeExecutionClosure)
Expand Down
2 changes: 2 additions & 0 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func TestUpdateNodeExecutionModel(t *testing.T) {
},
},
},
DynamicJobSpecUri: "/foo/bar",
},
CheckpointUri: "last checkpoint uri",
},
Expand Down Expand Up @@ -375,6 +376,7 @@ func TestUpdateNodeExecutionModel(t *testing.T) {
CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri,
},
},
DynamicJobSpecUri: request.Event.GetTaskNodeMetadata().DynamicWorkflow.DynamicJobSpecUri,
}
var closureBytes, _ = proto.Marshal(closure)
assert.Equal(t, nodeExecutionModel.Closure, closureBytes)
Expand Down

0 comments on commit f6d949a

Please sign in to comment.