Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Removing event mocking in ArrayNode #621

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"github.com/flyteorg/flytepropeller/pkg/compiler/validators"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces"
Expand Down Expand Up @@ -177,8 +178,8 @@
arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState()
currentArrayNodePhase := arrayNodeState.Phase

var externalResources []*event.ExternalResourceInfo
taskPhaseVersion := arrayNodeState.TaskPhaseVersion
//var externalResources []*event.ExternalResourceInfo
//taskPhaseVersion := arrayNodeState.TaskPhaseVersion

switch currentArrayNodePhase {
case v1alpha1.ArrayNodePhaseNone:
Expand Down Expand Up @@ -238,7 +239,7 @@
}

// initialize externalResources
externalResources = make([]*event.ExternalResourceInfo, 0, size)
/*externalResources = make([]*event.ExternalResourceInfo, 0, size)
for i := 0; i < size; i++ {
externalResources = append(externalResources, &event.ExternalResourceInfo{
ExternalId: buildSubNodeID(nCtx, i, 0),
Expand All @@ -247,15 +248,15 @@
RetryAttempt: 0,
Phase: idlcore.TaskExecution_QUEUED,
})
}
}*/

// transition ArrayNode to `ArrayNodePhaseExecuting`
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting
case v1alpha1.ArrayNodePhaseExecuting:
// process array node subNodes
currentParallelism := uint32(0)
messageCollector := errorcollector.NewErrorMessageCollector()
externalResources = make([]*event.ExternalResourceInfo, 0)
//externalResources = make([]*event.ExternalResourceInfo, 0)
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
nodePhase := v1alpha1.NodePhase(nodePhaseUint64)

Expand All @@ -265,7 +266,8 @@
}

// create array contexts
arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, arrayEventRecorder, err :=
//arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, arrayEventRecorder, err :=
arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, _, err :=
a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, &currentParallelism)
if err != nil {
return handler.UnknownTransition, err
Expand All @@ -283,31 +285,31 @@
}

// process events
cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
/*cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
for _, nodeExecutionEvent := range arrayEventRecorder.NodeEvents() {
switch target := nodeExecutionEvent.TargetMetadata.(type) {
case *event.NodeExecutionEvent_TaskNodeMetadata:
if target.TaskNodeMetadata != nil {
cacheStatus = target.TaskNodeMetadata.CacheStatus
}
}
}
}*/

retryAttempt := subNodeStatus.GetAttempts()
//retryAttempt := subNodeStatus.GetAttempts()

// fastcache will not emit task events for cache hits. we need to manually detect a
// transition to `SUCCEEDED` and add an `ExternalResourceInfo` for it.
if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 {
/*if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(arrayEventRecorder.TaskEvents()) == 0 {
externalResources = append(externalResources, &event.ExternalResourceInfo{
ExternalId: buildSubNodeID(nCtx, i, retryAttempt),
Index: uint32(i),
RetryAttempt: retryAttempt,
Phase: idlcore.TaskExecution_SUCCEEDED,
CacheStatus: cacheStatus,
})
}
}*/

for _, taskExecutionEvent := range arrayEventRecorder.TaskEvents() {
/*for _, taskExecutionEvent := range arrayEventRecorder.TaskEvents() {
for _, log := range taskExecutionEvent.Logs {
log.Name = fmt.Sprintf("%s-%d", log.Name, i)
}
Expand All @@ -320,7 +322,7 @@
Phase: taskExecutionEvent.Phase,
CacheStatus: cacheStatus,
})
}
}*/

// update subNode state
arrayNodeState.SubNodePhases.SetItem(i, uint64(subNodeStatus.GetPhase()))
Expand Down Expand Up @@ -464,7 +466,7 @@

// if there were changes to subNode status externalResources will be populated and must be
// reported to admin through a TaskExecutionEvent.
if len(externalResources) > 0 {
/*if len(externalResources) > 0 {
// determine task phase from ArrayNodePhase
taskPhase := idlcore.TaskExecution_UNDEFINED
switch currentArrayNodePhase {
Expand Down Expand Up @@ -497,7 +499,7 @@
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}
}
}*/

// update array node status
if err := nCtx.NodeStateWriter().PutArrayNodeState(arrayNodeState); err != nil {
Expand Down Expand Up @@ -542,7 +544,7 @@
// injecting environment variables for flytekit maptask execution, aggregating eventing so that rather than tracking state for
// each subnode individually it sends a single event for the whole ArrayNode, and many more.
func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx interfaces.NodeExecutionContext, arrayNodeState *handler.ArrayNodeState, arrayNode v1alpha1.ExecutableArrayNode, subNodeIndex int, currentParallelism *uint32) (
interfaces.Node, executors.ExecutionContext, executors.DAGStructure, executors.NodeLookup, *v1alpha1.NodeSpec, *v1alpha1.NodeStatus, *arrayEventRecorder, error) {

Check failure on line 547 in pkg/controller/nodes/array/handler.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

(*arrayNodeHandler).buildArrayNodeContext - result 6 (*github.com/flyteorg/flytepropeller/pkg/controller/nodes/array.arrayEventRecorder) is never used (unparam)

nodePhase := v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(subNodeIndex))
taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(subNodeIndex))
Expand Down Expand Up @@ -597,7 +599,14 @@
// initialize mocks
arrayNodeLookup := newArrayNodeLookup(nCtx.ContextualNodeLookup(), subNodeID, &subNodeSpec, subNodeStatus)

arrayExecutionContext := newArrayExecutionContext(nCtx.ExecutionContext(), subNodeIndex, currentParallelism, arrayNode.GetParallelism())
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, err
}
arrayExecutionContext := newArrayExecutionContext(
executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo),
subNodeIndex, currentParallelism, arrayNode.GetParallelism())
//arrayExecutionContext := newArrayExecutionContext(nCtx.ExecutionContext(), subNodeIndex, currentParallelism, arrayNode.GetParallelism())

arrayEventRecorder := newArrayEventRecorder()
arrayNodeExecutionContextBuilder := newArrayNodeExecutionContextBuilder(a.nodeExecutor.GetNodeExecutionContextBuilder(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/array/node_execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ type arrayNodeExecutionContext struct {
taskReader interfaces.TaskReader
}

func (a *arrayNodeExecutionContext) EventsRecorder() interfaces.EventRecorder {
/*func (a *arrayNodeExecutionContext) EventsRecorder() interfaces.EventRecorder {
return a.eventRecorder
}
}*/

func (a *arrayNodeExecutionContext) ExecutionContext() executors.ExecutionContext {
return a.executionContext
Expand Down
Loading