Skip to content

Commit

Permalink
Improve scheduler memory usage (knative#8144)
Browse files Browse the repository at this point in the history
* Improve scheduler memory usage

- Create a namespaced-scoped statefulset lister instead of being
  cluster-wide
- Accept a PodLister rather than creating a cluster-wide one

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Update codegen

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Sep 23, 2024
1 parent f969763 commit be90230
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 71 deletions.
44 changes: 28 additions & 16 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,17 @@ import (
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/integer"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/scheduler"
"knative.dev/eventing/pkg/scheduler/factory"
Expand Down Expand Up @@ -78,19 +76,22 @@ type Config struct {

VPodLister scheduler.VPodLister `json:"-"`
NodeLister corev1listers.NodeLister `json:"-"`
// Pod lister for statefulset: StatefulSetNamespace / StatefulSetName
PodLister corev1listers.PodNamespaceLister `json:"-"`

// getReserved returns reserved replicas
getReserved GetReserved
}

func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {

podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace)
if cfg.PodLister == nil {
return nil, fmt.Errorf("Config.PodLister is required")
}

scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache)
stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache)

var getReserved GetReserved
cfg.getReserved = func() map[types.NamespacedName]map[string]int32 {
Expand All @@ -106,7 +107,7 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {
autoscaler.Start(ctx)
}()

s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister)
s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler)
getReserved = s.Reserved
wg.Done()

Expand All @@ -130,7 +131,6 @@ type StatefulSetScheduler struct {
statefulSetName string
statefulSetNamespace string
statefulSetClient clientappsv1.StatefulSetInterface
podLister corev1listers.PodNamespaceLister
vpodLister scheduler.VPodLister
lock sync.Locker
stateAccessor st.StateAccessor
Expand Down Expand Up @@ -168,16 +168,14 @@ func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
func newStatefulSetScheduler(ctx context.Context,
cfg *Config,
stateAccessor st.StateAccessor,
autoscaler Autoscaler,
podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler {
autoscaler Autoscaler) *StatefulSetScheduler {

scheduler := &StatefulSetScheduler{
ctx: ctx,
logger: logging.FromContext(ctx),
statefulSetNamespace: cfg.StatefulSetNamespace,
statefulSetName: cfg.StatefulSetName,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
podLister: podlister,
vpodLister: cfg.VPodLister,
lock: new(sync.Mutex),
stateAccessor: stateAccessor,
Expand All @@ -186,11 +184,25 @@ func newStatefulSetScheduler(ctx context.Context,
}

// Monitor our statefulset
statefulsetInformer := statefulsetinformer.Get(ctx)
statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName),
Handler: controller.HandleAll(scheduler.updateStatefulset),
})
c := kubeclient.Get(ctx)
sif := informers.NewSharedInformerFactoryWithOptions(c,
controller.GetResyncPeriod(ctx),
informers.WithNamespace(cfg.StatefulSetNamespace),
)

sif.Apps().V1().StatefulSets().Informer().
AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName),
Handler: controller.HandleAll(scheduler.updateStatefulset),
})

sif.Start(ctx.Done())
_ = sif.WaitForCacheSync(ctx.Done())

go func() {
<-ctx.Done()
sif.Shutdown()
}()

return scheduler
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func TestStatefulsetScheduler(t *testing.T) {
StatefulSetName: sfsName,
VPodLister: vpodClient.List,
}
s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs))
s := newStatefulSetScheduler(ctx, cfg, sa, nil)

// Give some time for the informer to notify the scheduler and set the number of replicas
err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, time.Second, true, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestReservePlacements(t *testing.T) {
VPodLister: vpodClient.List,
}
fa := newFakeAutoscaler()
s := newStatefulSetScheduler(ctx, cfg, nil, fa, nil)
s := newStatefulSetScheduler(ctx, cfg, nil, fa)
_ = s.Promote(reconciler.UniversalBucket(), func(bucket reconciler.Bucket, name types.NamespacedName) {})

s.reservePlacements(tc.vpod, tc.vpod.GetPlacements()) //initial reserve
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,6 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints
knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/namespace
knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/pod
knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered
knative.dev/pkg/client/injection/kube/informers/core/v1/service
knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake
Expand Down

0 comments on commit be90230

Please sign in to comment.