From 17ab884ce87f8d412be972a325ee334dc0c10285 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Fri, 9 Aug 2024 15:06:04 +0200 Subject: [PATCH 1/2] Improve scheduler memory usage (#8144) * Improve scheduler memory usage - Create a namespace-scoped statefulset lister instead of a cluster-wide one - Accept a PodLister rather than creating a cluster-wide one Signed-off-by: Pierangelo Di Pilato * Update codegen Signed-off-by: Pierangelo Di Pilato --------- Signed-off-by: Pierangelo Di Pilato --- pkg/scheduler/statefulset/scheduler.go | 44 ++++++++++------ pkg/scheduler/statefulset/scheduler_test.go | 4 +- .../kube/informers/core/v1/pod/pod.go | 52 ------------------- vendor/modules.txt | 1 - 4 files changed, 30 insertions(+), 71 deletions(-) delete mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index 62235e474c1..5137d096b50 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -35,12 +36,9 @@ import ( "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" - statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" "knative.dev/pkg/controller" "knative.dev/pkg/logging" - podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" - duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" "knative.dev/eventing/pkg/scheduler/factory" @@ -78,6 +76,8 @@ type Config struct { VPodLister scheduler.VPodLister `json:"-"` NodeLister corev1listers.NodeLister `json:"-"` + // Pod lister for statefulset: StatefulSetNamespace / StatefulSetName + PodLister corev1listers.PodNamespaceLister `json:"-"` // getReserved returns reserved replicas getReserved GetReserved @@ -85,12 +85,13 @@ type Config struct { func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { - podInformer := podinformer.Get(ctx) - podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) + if cfg.PodLister == nil { + return nil, fmt.Errorf("Config.PodLister is required") + } scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { @@ -106,7 +107,7 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { autoscaler.Start(ctx) }() - s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister) + s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler) getReserved = s.Reserved wg.Done() @@ -130,7 +131,6 @@ type StatefulSetScheduler struct { statefulSetName string
statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface - podLister corev1listers.PodNamespaceLister vpodLister scheduler.VPodLister lock sync.Locker stateAccessor st.StateAccessor @@ -168,8 +168,7 @@ func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) { func newStatefulSetScheduler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, - autoscaler Autoscaler, - podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler { + autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ ctx: ctx, @@ -177,7 +176,6 @@ func newStatefulSetScheduler(ctx context.Context, statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), - podLister: podlister, vpodLister: cfg.VPodLister, lock: new(sync.Mutex), stateAccessor: stateAccessor, @@ -186,11 +184,25 @@ func newStatefulSetScheduler(ctx context.Context, } // Monitor our statefulset - statefulsetInformer := statefulsetinformer.Get(ctx) - statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), - }) + c := kubeclient.Get(ctx) + sif := informers.NewSharedInformerFactoryWithOptions(c, + controller.GetResyncPeriod(ctx), + informers.WithNamespace(cfg.StatefulSetNamespace), + ) + + sif.Apps().V1().StatefulSets().Informer(). + AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), + Handler: controller.HandleAll(scheduler.updateStatefulset), + }) + + sif.Start(ctx.Done()) + _ = sif.WaitForCacheSync(ctx.Done()) + + go func() { + <-ctx.Done() + sif.Shutdown() + }() return scheduler } diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index a1a0537d5dc..de0db157e0f 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -810,7 +810,7 @@ func TestStatefulsetScheduler(t *testing.T) { StatefulSetName: sfsName, VPodLister: vpodClient.List, } - s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)) + s := newStatefulSetScheduler(ctx, cfg, sa, nil) // Give some time for the informer to notify the scheduler and set the number of replicas err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, time.Second, true, func(ctx context.Context) (bool, error) { @@ -906,7 +906,7 @@ func TestReservePlacements(t *testing.T) { VPodLister: vpodClient.List, } fa := newFakeAutoscaler() - s := newStatefulSetScheduler(ctx, cfg, nil, fa, nil) + s := newStatefulSetScheduler(ctx, cfg, nil, fa) _ = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {}) s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go deleted file mode 100644 index d547fef8f95..00000000000 --- a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2022 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by injection-gen. DO NOT EDIT. - -package pod - -import ( - context "context" - - v1 "k8s.io/client-go/informers/core/v1" - factory "knative.dev/pkg/client/injection/kube/informers/factory" - controller "knative.dev/pkg/controller" - injection "knative.dev/pkg/injection" - logging "knative.dev/pkg/logging" -) - -func init() { - injection.Default.RegisterInformer(withInformer) -} - -// Key is used for associating the Informer inside the context.Context. -type Key struct{} - -func withInformer(ctx context.Context) (context.Context, controller.Informer) { - f := factory.Get(ctx) - inf := f.Core().V1().Pods() - return context.WithValue(ctx, Key{}, inf), inf.Informer() -} - -// Get extracts the typed informer from the context. -func Get(ctx context.Context) v1.PodInformer { - untyped := ctx.Value(Key{}) - if untyped == nil { - logging.FromContext(ctx).Panic( - "Unable to fetch k8s.io/client-go/informers/core/v1.PodInformer from context.") - } - return untyped.(v1.PodInformer) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 68ba3fe0e49..b6c68aa3643 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1276,7 +1276,6 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake knative.dev/pkg/client/injection/kube/informers/core/v1/namespace knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake -knative.dev/pkg/client/injection/kube/informers/core/v1/pod knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered knative.dev/pkg/client/injection/kube/informers/core/v1/service knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake From 3060ca83744db39b3467955d0c8ab628f64cb5ed Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 23 Sep 2024 15:50:23 +0200 Subject: [PATCH 2/2] Remove scheduler `wait`s to speed up recovery time (#8200) Currently, the scheduler and autoscaler are single-threaded and use a lock to prevent multiple scheduling and autoscaling decisions from happening in parallel; this is not a problem for our use cases. However, the multiple `wait`s currently present slow down recovery time. From my testing, if I delete and recreate the Kafka control plane and data plane, without this patch it takes 1 hour to recover when there are 400 triggers, or 20 minutes when there are 100 triggers; with the patch, recovery is near-immediate (only 2 to 3 minutes with 400 triggers). - Remove `wait`s from the state builder and autoscaler - Add additional debug logs - Use the logger provided through the context, as opposed to a global logger in each individual component, to preserve `knative/pkg` resource-aware log keys.
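To make the new behavior concrete, the sketch below shows the trigger/retry pattern this patch moves to. It is a minimal, self-contained illustration with made-up names and periods, not the exact eventing types: a failed sync schedules one delayed re-trigger instead of blocking the loop in a `wait.Poll`.

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // autoscaler is an illustrative stand-in for the real statefulset autoscaler:
    // a single goroutine drains a buffered trigger channel, so at most one sync
    // runs at a time and duplicate triggers are simply dropped.
    type autoscaler struct {
    	trigger     chan context.Context
    	retryPeriod time.Duration
    	failures    int
    }

    // Autoscale never blocks: if a sync is already queued, the signal is
    // dropped, because the queued run will observe the latest state anyway.
    func (a *autoscaler) Autoscale(ctx context.Context) {
    	select {
    	case a.trigger <- ctx:
    	default:
    	}
    }

    // Start replaces the old in-place wait.Poll retry: a failed sync schedules
    // a single delayed re-trigger, and the main loop stays free for new events.
    func (a *autoscaler) Start(ctx context.Context) {
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case syncCtx := <-a.trigger:
    			if err := a.sync(syncCtx); err != nil {
    				go func() {
    					time.Sleep(a.retryPeriod)
    					a.Autoscale(ctx) // top-level context for background retries
    				}()
    			}
    		}
    	}
    }

    // sync stands in for the real scale up/down logic; it fails twice to
    // exercise the retry path, then succeeds.
    func (a *autoscaler) sync(ctx context.Context) error {
    	if a.failures < 2 {
    		a.failures++
    		return errors.New("transient error")
    	}
    	fmt.Println("synced")
    	return nil
    }

    func main() {
    	a := &autoscaler{trigger: make(chan context.Context, 1), retryPeriod: 100 * time.Millisecond}
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()
    	a.Autoscale(ctx)
    	a.Start(ctx) // returns once the context times out
    }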
Signed-off-by: Pierangelo Di Pilato --- pkg/scheduler/scheduler.go | 8 +- pkg/scheduler/scheduler_test.go | 5 +- pkg/scheduler/state/helpers.go | 9 +- pkg/scheduler/state/state.go | 95 ++++++++--------- pkg/scheduler/state/state_test.go | 4 +- pkg/scheduler/statefulset/autoscaler.go | 104 ++++++++++--------- pkg/scheduler/statefulset/autoscaler_test.go | 19 ++-- pkg/scheduler/statefulset/scheduler.go | 93 ++++++++++------- pkg/scheduler/statefulset/scheduler_test.go | 4 +- 9 files changed, 181 insertions(+), 160 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index bc2f043db18..c67ea2b99ac 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro // Scheduler is responsible for placing VPods into real Kubernetes pods type Scheduler interface { // Schedule computes the new set of placements for vpod. - Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) + Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) } // SchedulerFunc type is an adapter to allow the use of // ordinary functions as Schedulers. If f is a function // with the appropriate signature, SchedulerFunc(f) is a // Scheduler that calls f. -type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) // Schedule implements the Scheduler interface. -func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { + return f(ctx, vpod) } // VPod represents virtual replicas placed into real Kubernetes pods diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 597e312f8b4..154a2a64741 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package scheduler import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -28,12 +29,12 @@ func TestSchedulerFuncSchedule(t *testing.T) { called := 0 - var s Scheduler = SchedulerFunc(func(vpod VPod) ([]duckv1alpha1.Placement, error) { + var s Scheduler = SchedulerFunc(func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { called++ return nil, nil }) - _, err := s.Schedule(nil) + _, err := s.Schedule(context.Background(), nil) require.Nil(t, err) require.Equal(t, 1, called) } diff --git a/pkg/scheduler/state/helpers.go b/pkg/scheduler/state/helpers.go index 3f8670e7e73..475d2974cd7 100644 --- a/pkg/scheduler/state/helpers.go +++ b/pkg/scheduler/state/helpers.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/pkg/scheduler" ) @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { var zoneName string var err error for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) + zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) + if err != nil { + continue + } zoneMap[zoneName] = struct{}{} } return len(zoneMap) == int(states.NumZones) diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go index 04794805a39..6cf06e93e18 100644 --- a/pkg/scheduler/state/state.go +++ b/pkg/scheduler/state/state.go @@ -22,7 +22,6 @@ import ( "errors" "math" "strconv" - "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -30,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/eventing/pkg/scheduler" @@ -42,7 +39,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. 
- State(reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - ctx context.Context - logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 schedulerPolicy scheduler.SchedulerPolicyType @@ -166,11 +161,9 @@ type stateBuilder struct { } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ - ctx: ctx, - logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err } - scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{}) + logger := logging.FromContext(ctx).With("subcomponent", "statebuilder") + ctx = logging.WithLogger(ctx, logger) + + scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{}) if err != nil { - s.logger.Infow("failed to get statefulset", zap.Error(err)) + logger.Infow("failed to get statefulset", zap.Error(err)) return nil, err } @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) - return err == nil, nil - }) - - if pod != nil { - if isPodUnschedulable(pod) { - // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. - continue - } - - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } + pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) + if err != nil { + logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err)) + continue + } + if isPodUnschedulable(pod) { + // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. + logger.Debugw("Pod is unschedulable", zap.Any("pod", pod)) + continue + } - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. 
- continue - } + node, err := s.nodeLister.Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } - // Pod has no annotation or not annotated as unschedulable and - // not on an unschedulable node, so add to feasible - schedulablePods.Insert(podId) + if isNodeUnschedulable(node) { + // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. + logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) + continue } + + // Pod has no annotation or not annotated as unschedulable and + // not on an unschedulable node, so add to feasible + schedulablePods.Insert(podId) } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) // Account for reserved vreplicas vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(free, last, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) continue } - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -330,7 +323,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - free, last = s.updateFreeCapacity(free, last, podName, rvreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } @@ -338,7 +331,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName 
string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go index dacc4aa2ceb..7716e19c758 100644 --- a/pkg/scheduler/state/state_test.go +++ b/pkg/scheduler/state/state_test.go @@ -645,8 +645,8 @@ func TestStateBuilder(t *testing.T) { scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) - state, err := stateBuilder.State(tc.reserved) + stateBuilder := NewStateBuilder(sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + state, err := stateBuilder.State(ctx, tc.reserved) if err != nil { t.Fatal("unexpected error", err) } diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index fe15aff3a51..19997d173ad 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -18,6 +18,7 @@ package statefulset import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -27,10 +28,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - - "knative.dev/pkg/reconciler" - "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "knative.dev/eventing/pkg/scheduler" st "knative.dev/eventing/pkg/scheduler/state" @@ -58,9 +57,8 @@ type autoscaler struct { statefulSetCache *scheduler.ScaleCache statefulSetName string vpodLister scheduler.VPodLister - logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan struct{} + trigger chan context.Context evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -68,7 +66,9 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration - lock sync.Locker + // retryPeriod is how often the autoscaler retries failed autoscale operations + retryPeriod time.Duration + lock sync.Locker // isLeader signals whether a given autoscaler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a @@ -104,17 +104,17 @@ func (a *autoscaler) Demote(b reconciler.Bucket) { } } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { - return &autoscaler{ - logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")), +func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { + a := &autoscaler{ statefulSetCache: statefulSetCache, statefulSetName: cfg.StatefulSetName, vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan struct{}, 1), + trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, + retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, getReserved: cfg.getReserved, @@ -124,25 +124,38 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces Add(-cfg.RefreshPeriod). Add(-time.Minute), } + + if a.retryPeriod == 0 { + a.retryPeriod = time.Second + } + + return a } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false for { + autoscaleCtx := ctx select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + logging.FromContext(ctx).Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true - case <-a.trigger: - a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + case autoscaleCtx = <-a.trigger: + logging.FromContext(autoscaleCtx).Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown) + if err := a.syncAutoscale(autoscaleCtx, attemptScaleDown); err != nil { + logging.FromContext(autoscaleCtx).Errorw("Failed to sync autoscale", zap.Error(err)) + go func() { + time.Sleep(a.retryPeriod) + a.Autoscale(ctx) // Use top-level context for background retries + }() + } } } @@ -150,10 +163,10 @@ func (a *autoscaler) Autoscale(ctx context.Context) { select { // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh // period is reset. - case a.trigger <- struct{}{}: + case a.trigger <- ctx: default: // We don't want to block if the channel's buffer is full, it will be triggered eventually. 
- + logging.FromContext(ctx).Debugw("Skipping autoscale since autoscale is in progress") } } @@ -161,36 +174,34 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e a.lock.Lock() defer a.lock.Unlock() - var lastErr error - wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown) - if err != nil { - logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) - } - lastErr = err - return err == nil, nil - }) - return lastErr + if err := a.doautoscale(ctx, attemptScaleDown); err != nil { + return fmt.Errorf("failed to do autoscale: %w", err) + } + return nil } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(a.getReserved()) + + logger := logging.FromContext(ctx).With("component", "autoscaler") + ctx = logging.WithLogger(ctx, logger) + + state, err := a.stateAccessor.State(ctx, a.getReserved()) if err != nil { - a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err } scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{}) if err != nil { // skip a beat - a.logger.Infow("failed to get scale subresource", zap.Error(err)) + logger.Infow("failed to get scale subresource", zap.Error(err)) return err } - a.logger.Debugw("checking adapter capacity", + logger.Debugw("checking adapter capacity", zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) @@ -234,43 +245,43 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err if newreplicas != scale.Spec.Replicas { scale.Spec.Replicas = newreplicas - a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) + logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) if err != nil { - a.logger.Errorw("updating scale subresource failed", zap.Error(err)) + logger.Errorw("updating scale subresource failed", zap.Error(err)) return err } } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - a.mayCompact(state, scaleUpFactor) + return a.mayCompact(logger, state, scaleUpFactor) } return nil } -func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) if time.Now().Before(nextAttempt) { - a.logger.Debugw("Compact was retried before refresh period", + logger.Debugw("Compact was retried before refresh period", zap.Time("lastCompactAttempt", a.lastCompactAttempt), zap.Time("nextAttempt", nextAttempt), zap.String("refreshPeriod", a.refreshPeriod.String()), ) - return + return nil } - a.logger.Debugw("Trying to compact and scale down", + logger.Debugw("Trying to compact and scale down", zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! 
if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { - return + return nil } if s.SchedulerPolicy == scheduler.MAXFILLUP { @@ -283,7 +294,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } @@ -303,10 +314,11 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } } + return nil } func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { @@ -323,16 +335,14 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { ordinal := st.OrdinalFromPodName(placements[i].PodName) if ordinal == s.LastOrdinal-j { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - if s.PodLister != nil { - pod, err = s.PodLister.Get(placements[i].PodName) - } - return err == nil, nil - }) + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } err = a.evictor(pod, vpod, &placements[i]) if err != nil { - return err + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 37c055635f1..37ae127b9af 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" v1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" @@ -451,7 +452,7 @@ func TestAutoscaler(t *testing.T) { scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lspp, lsnn, scaleCache) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) @@ -474,7 +475,7 @@ func TestAutoscaler(t *testing.T) { return tc.reserved }, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) for _, vpod := range tc.vpods { @@ -512,7 +513,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { vpodClient := tscheduler.NewVPodClient() ls := listers.NewListers(nil) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, 
scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, scheduler.MAXFILLUP, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, nil, ls.GetNodeLister(), scaleCache) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) @@ -535,7 +536,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { return nil }, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) done := make(chan bool) @@ -950,7 +951,7 @@ func TestCompactor(t *testing.T) { lsp := listers.NewListers(podlist) lsn := listers.NewListers(nodelist) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - stateAccessor := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + stateAccessor := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) evictions := make(map[types.NamespacedName][]duckv1alpha1.Placement) recordEviction := func(pod *corev1.Pod, vpod scheduler.VPod, from *duckv1alpha1.Placement) error { @@ -966,7 +967,7 @@ func TestCompactor(t *testing.T) { RefreshPeriod: 10 * time.Second, PodCapacity: 10, } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) _ = autoscaler.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {}) assert.Equal(t, true, autoscaler.isLeader.Load()) @@ -974,7 +975,7 @@ func TestCompactor(t *testing.T) { vpodClient.Append(vpod) } - state, err := stateAccessor.State(nil) + state, err := stateAccessor.State(ctx, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -988,7 +989,9 @@ func TestCompactor(t *testing.T) { scaleUpFactor = 1 // Non-HA scaling } - autoscaler.mayCompact(state, scaleUpFactor) + if err := autoscaler.mayCompact(logging.FromContext(ctx), state, scaleUpFactor); err != nil { + t.Fatal(err) + } if tc.wantEvictions == nil && len(evictions) != 0 { t.Fatalf("unexpected evictions: %v", evictions) diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index 5137d096b50..613410ec7a8 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -33,11 +33,11 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" @@ -67,6 +67,8 @@ type Config struct { PodCapacity int32 `json:"podCapacity"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` + // Autoscaler retry period + RetryPeriod time.Duration `json:"retryPeriod"` SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` 
SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` @@ -91,14 +93,14 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { return getReserved() } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) var wg sync.WaitGroup wg.Add(1) @@ -126,8 +128,6 @@ func (p Pending) Total() int32 { // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { - ctx context.Context - logger *zap.SugaredLogger statefulSetName string statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface @@ -171,8 +171,6 @@ func newStatefulSetScheduler(ctx context.Context, autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ - ctx: ctx, - logger: logging.FromContext(ctx), statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), @@ -193,7 +191,9 @@ func newStatefulSetScheduler(ctx context.Context, sif.Apps().V1().StatefulSets().Informer(). AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), + Handler: controller.HandleAll(func(i interface{}) { + scheduler.updateStatefulset(ctx, i) + }), }) sif.Start(ctx.Done()) @@ -207,13 +207,13 @@ func newStatefulSetScheduler(ctx context.Context, return scheduler } -func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { +func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() s.reservedMu.Lock() defer s.reservedMu.Unlock() - placements, err := s.scheduleVPod(vpod) + placements, err := s.scheduleVPod(ctx, vpod) if placements == nil { return placements, err } @@ -228,11 +228,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, err } -func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { - logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler")) +func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { + logger := logging.FromContext(ctx).With("key", vpod.GetKey(), zap.String("component", "scheduler")) + ctx = logging.WithLogger(ctx, logger) + // Get the current placements state // Quite an expensive operation but safe and simple. 
- state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err @@ -270,13 +272,15 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Handle overcommitted pods. - if state.FreeCap[ordinal] < 0 { + if state.Free(ordinal) < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 overcommit := -state.FreeCap[ordinal] + logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) + if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit @@ -313,7 +317,9 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 if state.SchedulerPolicy != "" { // Need less => scale down if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) @@ -323,15 +329,19 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { //Predicates and priorities must be used for scheduling // Need less => scale down if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements) + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) // Do not trigger the autoscaler to avoid unnecessary churn @@ -343,8 +353,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Need more => scale up // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements) + logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) } } @@ -355,10 +367,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) 
([]duckv1alpha1 // Trigger the autoscaler if s.autoscaler != nil { logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.autoscaler.Autoscale(s.ctx) + s.autoscaler.Autoscale(ctx) } - if state.SchedPolicy != nil { + if state.SchedulerPolicy == "" && state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job @@ -380,25 +392,25 @@ func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { return r } -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list + placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list return placements, diff } -func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := s.logger.Named("remove replicas with policy") +func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := logging.FromContext(ctx).Named("remove replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.DeschedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod placementPodID := feasiblePods[0] @@ -409,7 +421,7 @@ func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, dif continue } - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.DeschedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -455,13 +467,13 @@ func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int3 return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := s.logger.Named("add replicas with policy") +func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := logging.FromContext(ctx).Named("add replicas with policy") numVreps := diff for i := 
int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) // Get the current placements state - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements, diff @@ -474,7 +486,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i break //end the iteration for all vreps since there are not pods } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.SchedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) if len(feasiblePods) == 0 { //no pods available to schedule this vreplica logger.Info("no feasible pods available to schedule this vreplica") s.reservePlacements(vpod, placements) @@ -492,7 +504,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i continue } */ - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.SchedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -567,7 +579,7 @@ func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, fea // prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. // The scores from each plugin are added together to make the score for that pod. func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := s.logger.Named("prioritize all feasible pods") + logger := logging.FromContext(ctx).Named("prioritize all feasible pods") // If no priority configs are provided, then all pods will have a score of one result := make(st.PodScoreList, 0, len(feasiblePods)) @@ -630,7 +642,7 @@ func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, e // If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. // Meanwhile, the failure message and status are set for the given pod. func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := s.logger.Named("run all filter plugins") + logger := logging.FromContext(ctx).Named("run all filter plugins") statuses := make(st.PluginToStatus) for _, plugin := range policy.Predicates { @@ -663,7 +675,7 @@ func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.Filter // RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. 
func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := s.logger.Named("run all score plugins") + logger := logging.FromContext(ctx).Named("run all score plugins") pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) for _, plugin := range policy.Priorities { @@ -776,10 +788,11 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { +func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { - s.logger.Fatalw("expected a Statefulset object", zap.Any("object", obj)) + logging.FromContext(ctx).Warnw("expected a Statefulset object", zap.Any("object", obj)) + return } s.lock.Lock() @@ -789,7 +802,7 @@ func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { s.replicas = 1 } else if s.replicas != *statefulset.Spec.Replicas { s.replicas = *statefulset.Spec.Replicas - s.logger.Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) + logging.FromContext(ctx).Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) } } diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index de0db157e0f..4c1ca02ebd3 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -804,7 +804,7 @@ func TestStatefulsetScheduler(t *testing.T) { lsp := listers.NewListers(podlist) lsn := listers.NewListers(nodelist) scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5}) - sa := state.NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) + sa := state.NewStateBuilder(sfsName, vpodClient.List, 10, tc.schedulerPolicyType, tc.schedulerPolicy, tc.deschedulerPolicy, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache) cfg := &Config{ StatefulSetNamespace: testNs, StatefulSetName: sfsName, @@ -823,7 +823,7 @@ func TestStatefulsetScheduler(t *testing.T) { } vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements) - placements, err := s.Schedule(vpod) + placements, err := s.Schedule(ctx, vpod) if tc.err == nil && err != nil { t.Fatal("unexpected error", err)
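A closing note on wiring the first patch in this series: since `New` now fails when `Config.PodLister` is nil, callers have to supply a namespace-scoped pod lister themselves. The sketch below shows one plausible way to build it; the helper name, resync period, and namespace are assumptions for illustration, not part of the patches.

    package main

    import (
    	"context"
    	"time"

    	"k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/kubernetes/fake"
    	corev1listers "k8s.io/client-go/listers/core/v1"
    )

    // buildPodLister (hypothetical helper) returns a pod lister scoped to a
    // single namespace, backed by a SharedInformerFactory restricted via
    // WithNamespace, mirroring the cluster-wide -> namespaced change above.
    func buildPodLister(ctx context.Context, client kubernetes.Interface, namespace string) corev1listers.PodNamespaceLister {
    	factory := informers.NewSharedInformerFactoryWithOptions(client,
    		10*time.Minute, // assumed resync period; reuse whatever the controller already uses
    		informers.WithNamespace(namespace),
    	)
    	// Calling Lister() registers the pod informer with the factory before Start.
    	lister := factory.Core().V1().Pods().Lister().Pods(namespace)
    	factory.Start(ctx.Done())
    	factory.WaitForCacheSync(ctx.Done())
    	return lister
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	// A fake clientset keeps the sketch runnable without a cluster.
    	_ = buildPodLister(ctx, fake.NewSimpleClientset(), "knative-eventing")
    }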