Skip to content

Commit

Permalink
Add snapshot sync controller
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandr Demicev <alexandr.demicev@suse.com>
  • Loading branch information
alexander-demicev committed Sep 4, 2024
1 parent 5bd7391 commit 01db8b0
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 10 deletions.
13 changes: 9 additions & 4 deletions exp/etcdrestore/controllers/etcdmachinesnapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (r *EtcdMachineSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, err
}

if !etcdMachineSnapshot.Spec.Manual {
log.V(5).Info("Skipping snapshot creation for non-manual EtcdMachineSnapshot")
return ctrl.Result{}, nil
}

// Initialize the patch helper.
patchHelper, err := patch.NewHelper(etcdMachineSnapshot, r.Client)
if err != nil {
Expand Down Expand Up @@ -241,19 +246,19 @@ func checkSnapshotStatus(ctx context.Context, r *EtcdMachineSnapshotReconciler,
// validateETCDSnapshotFile validates the fields of an ETCDSnapshotFile resource.
func validateETCDSnapshotFile(snapshotFile k3sv1.ETCDSnapshotFile) error {
if snapshotFile.Spec.SnapshotName == "" {
return fmt.Errorf("SnapshotName is empty for etcdsnapshotfile %s", snapshotFile.Name)
return fmt.Errorf("snapshotName is empty for etcdsnapshotfile %s", snapshotFile.Name)
}

if snapshotFile.Spec.Location == "" {
return fmt.Errorf("Location is empty for etcdsnapshotfile %s", snapshotFile.Name)
return fmt.Errorf("location is empty for etcdsnapshotfile %s", snapshotFile.Name)
}

if snapshotFile.Spec.NodeName == "" {
return fmt.Errorf("Node name is empty for etcdsnapshotfile %s", snapshotFile.Name)
return fmt.Errorf("node name is empty for etcdsnapshotfile %s", snapshotFile.Name)
}

if snapshotFile.Status.ReadyToUse == nil {
return fmt.Errorf("ReadyToUse field is nil for etcdsnapshotfile %s", snapshotFile.Name)
return fmt.Errorf("readyToUse field is nil for etcdsnapshotfile %s", snapshotFile.Name)
}

return nil
Expand Down
118 changes: 114 additions & 4 deletions exp/etcdrestore/controllers/etcdsnaphotsync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,32 @@ package controllers

import (
"context"
"errors"
"fmt"
"time"

"github.com/rancher/turtles/exp/etcdrestore/controllers/snapshotters"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
capiutil "sigs.k8s.io/cluster-api/util"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
const (
RKE2ControlPlaneKind = "RKE2ControlPlane"
)

// EtcdSnapshotSyncReconciler reconciles a EtcdSnapshotSync object.
type EtcdSnapshotSyncReconciler struct {
Client client.Client
client.Client
WatchFilterValue string

controller controller.Controller
Expand All @@ -51,6 +64,103 @@ func (r *EtcdSnapshotSyncReconciler) SetupWithManager(_ context.Context, mgr ctr
return nil
}

func (r *EtcdSnapshotSyncReconciler) Reconcile(_ context.Context, _ ctrl.Request) (res ctrl.Result, reterr error) {
func (r *EtcdSnapshotSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, reterr error) {
log := log.FromContext(ctx)

log.Info("Reconciling CAPI cluster and syncing etcd snapshots")

cluster := &clusterv1.Cluster{}
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{Requeue: true}, nil
}

return ctrl.Result{Requeue: true}, err
}

if cluster.Spec.Paused {
log.Info("Cluster is paused, skipping reconciliation")
return ctrl.Result{}, nil
}

// Only reconcile RKE2 clusters
if cluster.Spec.ControlPlaneRef.Kind != RKE2ControlPlaneKind { // TODO: Move to predicate
log.Info("Cluster is not an RKE2 cluster, skipping reconciliation")
return ctrl.Result{RequeueAfter: 3 * time.Minute}, nil
}

if !cluster.Status.ControlPlaneReady {
log.Info("Control plane is not ready, skipping reconciliation")
return ctrl.Result{RequeueAfter: 3 * time.Minute}, nil
}

if err := r.watchEtcdSnapshotFiles(ctx, cluster); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to start watch on ETCDSnapshotFile: %w", err)
}

return r.reconcileNormal(ctx, cluster)
}

func (r *EtcdSnapshotSyncReconciler) reconcileNormal(ctx context.Context, cluster *clusterv1.Cluster) (ctrl.Result, error) {
remoteClient, err := r.Tracker.GetClient(ctx, capiutil.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
}

var snapshotter snapshotters.Snapshotter

switch cluster.Spec.ControlPlaneRef.Kind {
case RKE2ControlPlaneKind:
snapshotter = snapshotters.NewRKE2Snapshotter(r.Client, remoteClient, cluster)
default:
return ctrl.Result{}, fmt.Errorf("unsupported control plane kind: %s", cluster.Spec.ControlPlaneRef.Kind)
}

if err := snapshotter.Sync(ctx); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to sync etcd snapshots: %w", err)
}

return ctrl.Result{}, nil
}

func (r *EtcdSnapshotSyncReconciler) watchEtcdSnapshotFiles(ctx context.Context, cluster *clusterv1.Cluster) error {
log := ctrl.LoggerFrom(ctx)

log.V(5).Info("Setting up watch on ETCDSnapshotFile")

etcdnapshotFile := &unstructured.Unstructured{}
etcdnapshotFile.SetGroupVersionKind(schema.GroupVersionKind{
Group: "k3s.cattle.io",
Kind: "ETCDSnapshotFile",
Version: "v1",
})

return r.Tracker.Watch(ctx, remote.WatchInput{
Name: "ETCDSnapshotFiles-watcher",
Cluster: capiutil.ObjectKey(cluster),
Watcher: r.controller,
Kind: etcdnapshotFile,
EventHandler: handler.EnqueueRequestsFromMapFunc(r.etcdSnapshotFile(ctx, cluster)),
})
}

func (r *EtcdSnapshotSyncReconciler) etcdSnapshotFile(ctx context.Context, cluster *clusterv1.Cluster) handler.MapFunc {
log := log.FromContext(ctx)

return func(_ context.Context, o client.Object) []ctrl.Request {
log.Info("Cluster name", "name", cluster.GetName())

gvk := schema.GroupVersionKind{
Group: "k3s.cattle.io",
Kind: "ETCDSnapshotFile",
Version: "v1",
}

if o.GetObjectKind().GroupVersionKind() != gvk {
log.Error(errors.New("got a different object"), "objectGVK", o.GetObjectKind().GroupVersionKind())
return nil
}

return []reconcile.Request{{NamespacedName: capiutil.ObjectKey(cluster)}}
}
}
167 changes: 167 additions & 0 deletions exp/etcdrestore/controllers/snapshotters/rke2snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
Copyright © 2023 - 2024 SUSE LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package snapshotters

import (
"context"
"fmt"

k3sv1 "github.com/rancher/turtles/exp/etcdrestore/api/k3s/v1"
snapshotrestorev1 "github.com/rancher/turtles/exp/etcdrestore/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type RKE2Snapshotter struct {
client.Client
remoteClient client.Client
cluster *clusterv1.Cluster
}

func NewRKE2Snapshotter(client client.Client, remoteClient client.Client, cluster *clusterv1.Cluster) *RKE2Snapshotter {
return &RKE2Snapshotter{
Client: client,
remoteClient: remoteClient,
cluster: cluster,
}
}

func (s *RKE2Snapshotter) Sync(ctx context.Context) error {
log := log.FromContext(ctx)

etcdnapshotFileList := &k3sv1.ETCDSnapshotFileList{}

if err := s.remoteClient.List(ctx, etcdnapshotFileList); err != nil {
return fmt.Errorf("failed to list etcd snapshot files: %w", err)
}

for _, snapshotFile := range etcdnapshotFileList.Items {
log.V(5).Info("Found etcd snapshot file", "name", snapshotFile.GetName())

readyToUse := *snapshotFile.Status.ReadyToUse
if !readyToUse {
log.V(5).Info("Snapshot is not ready to use, skipping")
continue
}

machineName, err := s.findMachineForSnapshot(ctx, snapshotFile.Spec.NodeName)
if err != nil {
return fmt.Errorf("failed to find machine for backup: %w", err)
}

if machineName == "" {
log.V(5).Info("Machine not found for backup, skipping. Will try again later.")
continue
}

rke2EtcdMachineSnapshotConfig := &snapshotrestorev1.RKE2EtcdMachineSnapshotConfig{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotFile.Name,
Namespace: s.cluster.Namespace,
},
}

if snapshotFile.Spec.S3 != nil {
s3EndpointCASecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotFile.Name + "-s3-endpoint-ca",
Namespace: s.cluster.Namespace,
},
StringData: map[string]string{
"ca.crt": snapshotFile.Spec.S3.EndpointCA,
},
}

if err := s.Create(ctx, s3EndpointCASecret); err != nil {
if apierrors.IsAlreadyExists(err) {
log.V(5).Info("S3 endpoint CA secret already exists")
} else {
return fmt.Errorf("failed to create S3 endpoint CA secret: %w", err)
}
}

rke2EtcdMachineSnapshotConfig.Spec.S3 = snapshotrestorev1.S3Config{
Endpoint: snapshotFile.Spec.S3.Endpoint,
EndpointCASecret: &corev1.LocalObjectReference{
Name: s3EndpointCASecret.Name,
},
SkipSSLVerify: snapshotFile.Spec.S3.SkipSSLVerify,
Bucket: snapshotFile.Spec.S3.Bucket,
Region: snapshotFile.Spec.S3.Region,
Insecure: snapshotFile.Spec.S3.Insecure,
Location: snapshotFile.Spec.Location,
}
} else {
rke2EtcdMachineSnapshotConfig.Spec.Local = snapshotrestorev1.LocalConfig{
DataDir: snapshotFile.Spec.Location,
}
}

if err := s.Create(ctx, rke2EtcdMachineSnapshotConfig); err != nil {
if apierrors.IsAlreadyExists(err) {
log.V(5).Info("RKE2EtcdMachineSnapshotConfig already exists")
} else {
return fmt.Errorf("failed to create RKE2EtcdMachineSnapshotConfig: %w", err)
}
}

etcdMachineSnapshot := &snapshotrestorev1.EtcdMachineSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: snapshotFile.Name,
Namespace: s.cluster.Namespace,
},
Spec: snapshotrestorev1.EtcdMachineSnapshotSpec{
ClusterName: s.cluster.Name,
MachineName: machineName,
ConfigRef: corev1.LocalObjectReference{
Name: snapshotFile.Name,
},
},
}

if err := s.Create(ctx, etcdMachineSnapshot); err != nil {
if apierrors.IsAlreadyExists(err) {
log.V(5).Info("EtcdMachineSnapshot already exists")
} else {
return fmt.Errorf("failed to create EtcdMachineSnapshot: %w", err)
}
}
}

return nil
}

func (s *RKE2Snapshotter) findMachineForSnapshot(ctx context.Context, nodeName string) (string, error) {
machineList := &clusterv1.MachineList{}
if err := s.List(ctx, machineList, client.InNamespace(s.cluster.Namespace)); err != nil {
return "", fmt.Errorf("failed to list machines: %w", err)
}

for _, machine := range machineList.Items {
if machine.Spec.ClusterName == s.cluster.Name {
if machine.Status.NodeRef != nil && machine.Status.NodeRef.Name == nodeName {
return machine.Name, nil
}
}
}

return "", nil
}
25 changes: 25 additions & 0 deletions exp/etcdrestore/controllers/snapshotters/snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright © 2023 - 2024 SUSE LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package snapshotters

import (
"context"
)

type Snapshotter interface {
Sync(ctx context.Context) error
}
4 changes: 4 additions & 0 deletions exp/etcdrestore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

bootstrapv1 "github.com/rancher/cluster-api-provider-rke2/bootstrap/api/v1beta1"
managementv3 "github.com/rancher/turtles/api/rancher/management/v3"
provisioningv1 "github.com/rancher/turtles/api/rancher/provisioning/v1"
k3sv1 "github.com/rancher/turtles/exp/etcdrestore/api/k3s/v1"
snapshotrestorev1 "github.com/rancher/turtles/exp/etcdrestore/api/v1alpha1"
expcontrollers "github.com/rancher/turtles/exp/etcdrestore/controllers"
Expand Down Expand Up @@ -77,6 +79,8 @@ func init() {
utilruntime.Must(snapshotrestorev1.AddToScheme(scheme))
utilruntime.Must(bootstrapv1.AddToScheme(scheme))
utilruntime.Must(k3sv1.AddToScheme(scheme))
utilruntime.Must(provisioningv1.AddToScheme(scheme))
utilruntime.Must(managementv3.AddToScheme(scheme))
}

// initFlags initializes the flags.
Expand Down
4 changes: 2 additions & 2 deletions exp/etcdrestore/webhooks/rke2config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func (r *RKE2ConfigWebhook) Default(ctx context.Context, obj runtime.Object) err
systemAgentVersionSetting := &managementv3.Setting{}
if err := r.Get(context.Background(), client.ObjectKey{
Name: "system-agent-version",
}, caSetting); err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("failed to get ca setting: %s", err))
}, systemAgentVersionSetting); err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("failed to get system agent version setting: %s", err))
}

systemAgentVersion := systemAgentVersionSetting.Value
Expand Down

0 comments on commit 01db8b0

Please sign in to comment.