koord-descheduler: LowNodeLoad check if evicted pod can cause new node over utilized (#2142)

Signed-off-by: songtao98 <songtao2603060@gmail.com>
songtao98 committed Sep 2, 2024
1 parent cb89d6d commit 749b14d
Showing 5 changed files with 171 additions and 4 deletions.
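The change, in short: before LowNodeLoad counts a node as a viable destination for a pod it is about to evict, it now projects that pod's measured usage onto the node and skips the node if any resource would cross its high-utilization threshold. Below is a minimal, self-contained Go sketch of that projection check; wouldExceedThreshold and its flat maps are illustrative simplifications of the NodeUsage/NodeThresholds structures, not the exact helper added in this commit (the real one is podFitsAnyNodeWithThreshold in utilization_util.go, shown in the diff further down).

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// wouldExceedThreshold reports whether adding a pod's measured usage to a
// node's current usage would push any resource above the node's high
// threshold. The two maps are simplified stand-ins for the NodeUsage and
// NodeThresholds data this commit threads into balancePods.
func wouldExceedThreshold(
	nodeUsage map[corev1.ResourceName]*resource.Quantity,
	highThreshold map[corev1.ResourceName]*resource.Quantity,
	podUsage corev1.ResourceList,
) bool {
	for resourceName, threshold := range highThreshold {
		used, ok := nodeUsage[resourceName]
		if !ok || used == nil {
			continue
		}
		// Work on a copy so the shared usage snapshot is not mutated by the check.
		projected := used.DeepCopy()
		projected.Add(podUsage[resourceName])
		if projected.Cmp(*threshold) > 0 {
			return true
		}
	}
	return false
}

func main() {
	nodeUsage := map[corev1.ResourceName]*resource.Quantity{
		corev1.ResourceCPU: resource.NewMilliQuantity(1000, resource.DecimalSI),
	}
	highThreshold := map[corev1.ResourceName]*resource.Quantity{
		corev1.ResourceCPU: resource.NewMilliQuantity(2000, resource.DecimalSI),
	}
	podUsage := corev1.ResourceList{
		corev1.ResourceCPU: *resource.NewMilliQuantity(1500, resource.DecimalSI),
	}
	// 1000m + 1500m = 2500m > 2000m, so this node is rejected as a destination.
	fmt.Println(wouldExceedThreshold(nodeUsage, highThreshold, podUsage))
}

In the actual diff, this projection happens per candidate node inside podFitsAnyNodeWithThreshold, which switches to prodUsage and prodHighResourceThreshold when balancing Prod workloads and is wired into balancePods as part of the eviction pod filter.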
1 change: 1 addition & 0 deletions pkg/descheduler/evictions/evictions.go
@@ -302,6 +302,7 @@ func NewEvictorFilter(
return nil
})
}
// todo: to align with the k8s descheduling framework, nodeFit should be moved into PreEvictionFilter in the future
if options.nodeFit {
ev.constraints = append(ev.constraints, func(pod *corev1.Pod) error {
nodes, err := nodeGetter()
2 changes: 2 additions & 0 deletions pkg/descheduler/framework/plugins/loadaware/low_node_load.go
@@ -245,6 +245,8 @@ func (pl *LowNodeLoad) processOneNodePool(ctx context.Context, nodePool *desched
abnormalProdNodes,
prodLowNodes,
bothLowNodes,
nodeUsages,
nodeThresholds,
pl.args.DryRun,
pl.args.NodeFit,
nodePool.ResourceWeights,
@@ -1297,7 +1297,7 @@ func TestLowNodeLoad(t *testing.T) {
test.BuildTestPod("p10", 400, 0, n3NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p11", 400, 0, n3NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 4,
expectedPodsEvicted: 3,
evictedPods: []string{},
},
}
59 changes: 56 additions & 3 deletions pkg/descheduler/framework/plugins/loadaware/utilization_util.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/extension"
@@ -309,6 +310,8 @@ func evictPodsFromSourceNodes(
nodePoolName string,
sourceNodes, destinationNodes,
prodSourceNodes, prodDestinationNodes, bothDestinationNodes []NodeInfo,
nodeUsages map[string]*NodeUsage,
nodeThresholds map[string]NodeThresholds,
dryRun bool,
nodeFit bool,
resourceWeights map[corev1.ResourceName]int64,
@@ -346,7 +349,8 @@ func evictPodsFromSourceNodes(
klog.V(4).InfoS("Total node usage capacity to be moved", nodeKeysAndValues...)

targetNodes = append(targetNodes, bothTotalNodes...)
balancePods(ctx, nodePoolName, sourceNodes, targetNodes, nodeTotalAvailableUsages, dryRun, nodeFit, false, resourceWeights, podEvictor,
balancePods(ctx, nodePoolName, sourceNodes, targetNodes, nodeUsages, nodeThresholds,
nodeTotalAvailableUsages, dryRun, nodeFit, false, resourceWeights, podEvictor,
podFilter, nodeIndexer, continueEviction, evictionReasonGenerator)

// bothLowNode will be used by nodeHigh and prodHigh nodes, needs sub resources used by pods on nodeHigh.
@@ -384,7 +388,8 @@ func evictPodsFromSourceNodes(
prodKeysAndValues = append(prodKeysAndValues, string(resourceName), quantity.String())
}
klog.V(4).InfoS("Total prod usage capacity to be moved", prodKeysAndValues...)
balancePods(ctx, nodePoolName, prodSourceNodes, prodTargetNodes, prodTotalAvailableUsages, dryRun, nodeFit, true, resourceWeights, podEvictor,
balancePods(ctx, nodePoolName, prodSourceNodes, prodTargetNodes, nodeUsages, nodeThresholds,
prodTotalAvailableUsages, dryRun, nodeFit, true, resourceWeights, podEvictor,
podFilter, nodeIndexer, continueEviction, evictionReasonGenerator)
}

@@ -409,6 +414,8 @@ func balancePods(ctx context.Context,
nodePoolName string,
sourceNodes []NodeInfo,
targetNodes []*corev1.Node,
nodeUsages map[string]*NodeUsage,
nodeThresholds map[string]NodeThresholds,
totalAvailableUsages map[corev1.ResourceName]*resource.Quantity,
dryRun bool,
nodeFit, prod bool,
@@ -431,7 +438,9 @@
if !nodeFit {
return true
}
return nodeutil.PodFitsAnyNode(nodeIndexer, pod, targetNodes)
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
podMetric := srcNode.podMetrics[podNamespacedName]
return podFitsAnyNodeWithThreshold(nodeIndexer, pod, targetNodes, nodeUsages, nodeThresholds, prod, podMetric)
}),
)
klog.V(4).InfoS("Evicting pods from node",
@@ -701,3 +710,47 @@ func sortPodsOnOneOverloadedNode(srcNode NodeInfo, removablePods []*corev1.Pod,
weights,
)
}

// podFitsAnyNodeWithThreshold checks whether the given pod fits any of the given nodes. It also checks whether the
// node's utilization would exceed the threshold if the pod were scheduled onto it.
func podFitsAnyNodeWithThreshold(nodeIndexer podutil.GetPodsAssignedToNodeFunc, pod *corev1.Pod, nodes []*corev1.Node,
nodeUsages map[string]*NodeUsage, nodeThresholds map[string]NodeThresholds, prod bool, podMetric *slov1alpha1.ResourceMap) bool {
for _, node := range nodes {
errors := nodeutil.NodeFit(nodeIndexer, pod, node)
if len(errors) == 0 {
// check if node utilization exceeds threshold if pod scheduled
nodeUsage, usageOk := nodeUsages[node.Name]
nodeThreshold, thresholdOk := nodeThresholds[node.Name]
if usageOk && thresholdOk {
var usage, thresholds map[corev1.ResourceName]*resource.Quantity
if prod {
usage = nodeUsage.prodUsage
thresholds = nodeThreshold.prodHighResourceThreshold
} else {
usage = nodeUsage.usage
thresholds = nodeThreshold.highResourceThreshold
}
exceeded := false
for resourceName, threshold := range thresholds {
if used := usage[resourceName]; used != nil {
used.Add(podMetric.ResourceList[resourceName])
if used.Cmp(*threshold) > 0 {
exceeded = true
break
}
}

}
if exceeded {
klog.V(4).InfoS("Pod may cause node over-utilized", "pod", klog.KObj(pod), "node", klog.KObj(node))
continue
}
}
klog.V(4).InfoS("Pod fits on node", "pod", klog.KObj(pod), "node", klog.KObj(node))
return true
} else {
klog.V(4).InfoS("Pod does not fit on node", "pod", klog.KObj(pod), "node", klog.KObj(node), "errors", utilerrors.NewAggregate(errors))
}
}
return false
}
111 changes: 111 additions & 0 deletions pkg/descheduler/framework/plugins/loadaware/utilization_util_test.go
@@ -17,16 +17,21 @@ limitations under the License.
package loadaware

import (
"context"
"math"
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/test"
)

var (
@@ -229,3 +234,109 @@ func TestSortPodsOnOneOverloadedNode(t *testing.T) {
sortPodsOnOneOverloadedNode(nodeInfo, removablePods, resourceWeights, false)
assert.Equal(t, expectedResult, removablePods)
}

func TestPodFitsAnyNodeWithThreshold(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
nodes []*corev1.Node
nodeUsages map[string]*NodeUsage
nodeThresholds map[string]NodeThresholds
prod bool
podMetric *slov1alpha1.ResourceMap
want bool
}{
{
name: "Nodes matches the Pod via affinity, but exceeds threshold",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-pod-1",
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "test-node-type",
Operator: corev1.NodeSelectorOpIn,
Values: []string{
"test-node-type-A",
},
},
},
},
},
},
},
},
},
},
nodes: []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Labels: map[string]string{
"test-node-type": "test-node-type-A",
},
},
},
},
nodeUsages: map[string]*NodeUsage{
"test-node-1": {
usage: map[corev1.ResourceName]*resource.Quantity{
corev1.ResourceCPU: resource.NewMilliQuantity(1000, resource.DecimalSI),
corev1.ResourceMemory: resource.NewQuantity(2000000000, resource.BinarySI),
},
},
},
nodeThresholds: map[string]NodeThresholds{
"test-node-1": {
highResourceThreshold: map[corev1.ResourceName]*resource.Quantity{
corev1.ResourceCPU: resource.NewMilliQuantity(2000, resource.DecimalSI),
corev1.ResourceMemory: resource.NewMilliQuantity(3000000000, resource.DecimalSI),
},
},
},
podMetric: &slov1alpha1.ResourceMap{
ResourceList: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(1500, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewMilliQuantity(1500000000, resource.DecimalSI),
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var objs []runtime.Object
for _, node := range tt.nodes {
objs = append(objs, node)
}
objs = append(objs, tt.pod)

fakeClient := fake.NewSimpleClientset(objs...)

sharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
podInformer := sharedInformerFactory.Core().V1().Pods()

getPodsAssignedToNode, err := test.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Errorf("Build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

if got := podFitsAnyNodeWithThreshold(getPodsAssignedToNode, tt.pod, tt.nodes, tt.nodeUsages, tt.nodeThresholds, false, tt.podMetric); got != tt.want {
t.Errorf("PodFitsAnyNode() = %v, want %v", got, tt.want)
}
})
}
}
