diff --git a/pkg/controller/nodes/array/handler.go b/pkg/controller/nodes/array/handler.go index a8cc13c85..33291450f 100644 --- a/pkg/controller/nodes/array/handler.go +++ b/pkg/controller/nodes/array/handler.go @@ -18,6 +18,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/compiler/validators" "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces" @@ -177,8 +178,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState() currentArrayNodePhase := arrayNodeState.Phase - var externalResources []*event.ExternalResourceInfo - taskPhaseVersion := arrayNodeState.TaskPhaseVersion + //var externalResources []*event.ExternalResourceInfo + //taskPhaseVersion := arrayNodeState.TaskPhaseVersion switch currentArrayNodePhase { case v1alpha1.ArrayNodePhaseNone: @@ -238,7 +239,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } // initialize externalResources - externalResources = make([]*event.ExternalResourceInfo, 0, size) + /*externalResources = make([]*event.ExternalResourceInfo, 0, size) for i := 0; i < size; i++ { externalResources = append(externalResources, &event.ExternalResourceInfo{ ExternalId: buildSubNodeID(nCtx, i, 0), @@ -247,7 +248,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu RetryAttempt: 0, Phase: idlcore.TaskExecution_QUEUED, }) - } + }*/ // transition ArrayNode to `ArrayNodePhaseExecuting` arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting @@ -255,7 +256,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // process array node subNodes currentParallelism := uint32(0) messageCollector := errorcollector.NewErrorMessageCollector() - externalResources = make([]*event.ExternalResourceInfo, 0) + //externalResources = make([]*event.ExternalResourceInfo, 0) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) @@ -265,7 +266,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } // create array contexts - arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, arrayEventRecorder, err := + //arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, arrayEventRecorder, err := + arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, _, err := a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism) if err != nil { return handler.UnknownTransition, err @@ -283,7 +285,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } // process events - cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED + /*cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED for _, nodeExecutionEvent := range arrayEventRecorder.NodeEvents() { switch target := nodeExecutionEvent.TargetMetadata.(type) { case *event.NodeExecutionEvent_TaskNodeMetadata: @@ -291,13 +293,13 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu cacheStatus = target.TaskNodeMetadata.CacheStatus } } - } + }*/ - retryAttempt := subNodeStatus.GetAttempts() + //retryAttempt := subNodeStatus.GetAttempts() // fastcache will not emit task events for cache hits. we need to manually detect a // transition to `SUCCEEDED` and add an `ExternalResourceInfo` for it. - if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 { + /*if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 { externalResources = append(externalResources, &event.ExternalResourceInfo{ ExternalId: buildSubNodeID(nCtx, i, retryAttempt), Index: uint32(i), @@ -305,9 +307,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu Phase: idlcore.TaskExecution_SUCCEEDED, CacheStatus: cacheStatus, }) - } + }*/ - for _, taskExecutionEvent := range arrayEventRecorder.TaskEvents() { + /*for _, taskExecutionEvent := range arrayEventRecorder.TaskEvents() { for _, log := range taskExecutionEvent.Logs { log.Name = fmt.Sprintf("%s-%d", log.Name, i) } @@ -320,7 +322,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu Phase: taskExecutionEvent.Phase, CacheStatus: cacheStatus, }) - } + }*/ // update subNode state arrayNodeState.SubNodePhases.SetItem(i, uint64(subNodeStatus.GetPhase())) @@ -464,7 +466,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // if there were changes to subNode status externalResources will be populated and must be // reported to admin through a TaskExecutionEvent. - if len(externalResources) > 0 { + /*if len(externalResources) > 0 { // determine task phase from ArrayNodePhase taskPhase := idlcore.TaskExecution_UNDEFINED switch currentArrayNodePhase { @@ -497,7 +499,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return handler.UnknownTransition, err } - } + }*/ // update array node status if err := nCtx.NodeStateWriter().PutArrayNodeState(arrayNodeState); err != nil { @@ -597,7 +599,14 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter // initialize mocks arrayNodeLookup := newArrayNodeLookup(nCtx.ContextualNodeLookup(), subNodeID, &subNodeSpec, subNodeStatus) - arrayExecutionContext := newArrayExecutionContext(nCtx.ExecutionContext(), subNodeIndex, currentParallelism, arrayNode.GetParallelism()) + newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, err + } + arrayExecutionContext := newArrayExecutionContext( + executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo), + subNodeIndex, currentParallelism, arrayNode.GetParallelism()) + //arrayExecutionContext := newArrayExecutionContext(nCtx.ExecutionContext(), subNodeIndex, currentParallelism, arrayNode.GetParallelism()) arrayEventRecorder := newArrayEventRecorder() arrayNodeExecutionContextBuilder := newArrayNodeExecutionContextBuilder(a.nodeExecutor.GetNodeExecutionContextBuilder(), diff --git a/pkg/controller/nodes/array/node_execution_context.go b/pkg/controller/nodes/array/node_execution_context.go index af3ea42f7..9f05fa830 100644 --- a/pkg/controller/nodes/array/node_execution_context.go +++ b/pkg/controller/nodes/array/node_execution_context.go @@ -117,9 +117,9 @@ type arrayNodeExecutionContext struct { taskReader interfaces.TaskReader } -func (a *arrayNodeExecutionContext) EventsRecorder() interfaces.EventRecorder { +/*func (a *arrayNodeExecutionContext) EventsRecorder() interfaces.EventRecorder { return a.eventRecorder -} +}*/ func (a *arrayNodeExecutionContext) ExecutionContext() executors.ExecutionContext { return a.executionContext