Skip to content

Commit

Permalink
feat(ds): supports running as a DaemonSet
Browse files Browse the repository at this point in the history
  • Loading branch information
Zippo-Wang committed Apr 25, 2024
1 parent 52aa6a9 commit dd28993
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 198 deletions.
79 changes: 79 additions & 0 deletions manifests/huawei-cloud-controller-manager-daemonset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# DaemonSet that runs the Huawei Cloud controller manager in kube-system.
# NOTE(review): the nodeSelector only filters on the node OS, so a pod is
# scheduled on any Linux node whose taints are tolerated below (including
# master / control-plane nodes) — confirm this is the intended placement.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: huawei-cloud-controller-manager
  namespace: kube-system
  labels:
    k8s-app: huawei-cloud-controller-manager
spec:
  selector:
    matchLabels:
      k8s-app: huawei-cloud-controller-manager
  template:
    metadata:
      labels:
        k8s-app: huawei-cloud-controller-manager
    spec:
      nodeSelector:
        kubernetes.io/os: linux
      securityContext:
        runAsUser: 1001
      tolerations:
        # Allow scheduling before the cloud provider has initialized the node.
        - key: node.cloudprovider.kubernetes.io/uninitialized
          value: "true"
          effect: NoSchedule
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
        - key: node-role.kubernetes.io/control-plane
          effect: NoSchedule
      serviceAccountName: cloud-controller-manager
      containers:
        - name: huawei-cloud-controller-manager
          image: swr.cn-north-4.myhuaweicloud.com/k8s-cloudprovider/huawei-cloud-controller-manager:v0.26.7
          args:
            - /bin/huawei-cloud-controller-manager
            - --v=5
            # Read from the cloud-config Secret mounted at /etc/config.
            - --cloud-config=/etc/config/cloud-config
            - --cloud-provider=huaweicloud
            - --use-service-account-credentials=true
            - --node-status-update-frequency=5s
            - --node-monitor-period=5s
            - --leader-elect-lease-duration=30s
            - --leader-elect-renew-deadline=20s
            - --leader-elect-retry-period=2s
          volumeMounts:
            - mountPath: /etc/kubernetes
              name: k8s-certs
              readOnly: true
            - mountPath: /etc/ssl/certs
              name: ca-certs
              readOnly: true
            - mountPath: /etc/config
              name: cloud-config-volume
              readOnly: true
            - mountPath: /usr/libexec/kubernetes/kubelet-plugins/volume/exec
              name: flexvolume-dir
          resources:
            requests:
              cpu: 200m
              memory: 100Mi
            limits:
              cpu: 2
              memory: 2Gi
      hostNetwork: true
      volumes:
        - hostPath:
            path: /usr/libexec/kubernetes/kubelet-plugins/volume/exec
            type: DirectoryOrCreate
          name: flexvolume-dir
        - hostPath:
            path: /etc/kubernetes
            type: DirectoryOrCreate
          name: k8s-certs
        - hostPath:
            path: /etc/ssl/certs
            type: DirectoryOrCreate
          name: ca-certs
        - name: cloud-config-volume
          secret:
            secretName: cloud-config
7 changes: 0 additions & 7 deletions pkg/cloudprovider/huaweicloud/dedicatedloadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,13 +720,6 @@ func (d *DedicatedLoadBalancer) ensureHealthCheck(loadbalancerID string, pool *e
monitorID := pool.HealthmonitorId
klog.Infof("add or update or remove health check: %s : %#v", monitorID, healthCheckOpts)

if healthCheckOpts.Enable {
err := d.allowHealthCheckRule(node)
if err != nil {
return err
}
}

// create health monitor
if monitorID == "" && healthCheckOpts.Enable {
_, err := d.createHealthMonitor(loadbalancerID, pool.Id, pool.Protocol, healthCheckOpts)
Expand Down
196 changes: 13 additions & 183 deletions pkg/cloudprovider/huaweicloud/huaweicloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"io"
"os"
"strings"
"sync"
"time"

ecsmodel "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model"
gocache "github.com/patrickmn/go-cache"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -46,11 +46,6 @@ import (
"k8s.io/cloud-provider/options"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

ecsmodel "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model"
vpcmodel "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/vpc/v2/model"

"sigs.k8s.io/cloud-provider-huaweicloud/pkg/cloudprovider/huaweicloud/wrapper"
"sigs.k8s.io/cloud-provider-huaweicloud/pkg/common"
"sigs.k8s.io/cloud-provider-huaweicloud/pkg/config"
Expand Down Expand Up @@ -108,17 +103,15 @@ const (
ProtocolHTTPS = "HTTPS"
ProtocolTerminatedHTTPS = "TERMINATED_HTTPS"

healthCheckCidr = "100.125.0.0/16"

endpointAdded = "endpointAdded"
endpointUpdate = "endpointUpdate"

kubeSystemNamespace = "kube-system"
)

type ELBProtocol string
type ELBAlgorithm string

var healthCheckCidrOptLock = &sync.Mutex{}

type Basic struct {
cloudControllerManagerOpts *options.CloudControllerManagerOptions
cloudConfig *config.CloudConfig
Expand Down Expand Up @@ -217,103 +210,6 @@ func (b Basic) getNodeSubnetID(node *v1.Node) (string, error) {
return "", fmt.Errorf("failed to get node subnet ID")
}

// allowHealthCheckRule makes sure a security group attached to the ECS
// instance backing node has an ingress rule for healthCheckCidr.
//
// It does nothing when loadbalancerOpts.DisableCreateSecurityGroup is set or
// when the instance has no security groups. If none of the instance's
// security groups already holds an IPv4 ingress rule for healthCheckCidr with
// PortRangeMin and PortRangeMax both 0, a new rule is created in the first
// group returned by ListSecurityGroups.
//
// Errors from the ECS and VPC clients are returned to the caller; list and
// create failures are wrapped with %w so the underlying cause stays
// available to errors.Is / errors.As.
func (b Basic) allowHealthCheckRule(node *v1.Node) error {
	if b.loadbalancerOpts.DisableCreateSecurityGroup {
		klog.Infof("automatic creation of security groups has been disabled")
		return nil
	}

	// Avoid adding security group rules in parallel.
	healthCheckCidrOptLock.Lock()
	defer healthCheckCidrOptLock.Unlock()

	instance, err := b.ecsClient.GetByNodeName(node.Name)
	if err != nil {
		return err
	}

	secGroups, err := b.ecsClient.ListSecurityGroups(instance.Id)
	if err != nil {
		return err
	}
	if len(secGroups) == 0 {
		klog.Warningf("not found any security groups on %s", node.Name)
		return nil
	}

	// Look through every attached group: one matching rule anywhere is enough.
	for _, sg := range secGroups {
		rules, err := b.vpcClient.ListSecurityGroupRules(sg.Id)
		if err != nil {
			return fmt.Errorf("failed to list security group[%s] rules: %w", sg.Id, err)
		}

		for _, r := range rules {
			if r.Direction == "ingress" && r.RemoteIpPrefix == healthCheckCidr &&
				r.Ethertype == "IPv4" && r.PortRangeMin == 0 && r.PortRangeMax == 0 {
				klog.Infof("the health check IP is already in the security group, no need to add rules")
				return nil
			}
		}
	}

	desc := fmt.Sprintf("DO NOT EDIT. %s are internal IP addresses used by ELB to check the health of backend"+
		" servers. Created by K8s CCM.", healthCheckCidr)

	// No matching rule was found, so add one to the first security group.
	securityGroupID := secGroups[0].Id
	if _, err := b.vpcClient.CreateSecurityGroupRule(&vpcmodel.CreateSecurityGroupRuleOption{
		SecurityGroupId: securityGroupID,
		Description:     &desc,
		Direction:       "ingress",
		Ethertype:       pointer.String("IPv4"),
		RemoteIpPrefix:  pointer.String(healthCheckCidr),
	}); err != nil {
		return fmt.Errorf("failed to create security group[%s] rules: %w", securityGroupID, err)
	}

	return nil
}

// removeHealthCheckRules deletes the health check ingress rules for
// healthCheckCidr from every security group attached to the ECS instance
// backing node.
//
// Only rules whose description equals the text built below, and that are IPv4
// ingress rules for healthCheckCidr with PortRangeMin and PortRangeMax both 0,
// are deleted. Deletion is best-effort: a failed delete is logged and the
// remaining rules are still processed. Lookup and list failures are returned.
func (b Basic) removeHealthCheckRules(node *v1.Node) error {
	instance, err := b.ecsClient.GetByNodeName(node.Name)
	if err != nil {
		return err
	}

	secGroups, err := b.ecsClient.ListSecurityGroups(instance.Id)
	if err != nil {
		return err
	}
	if len(secGroups) == 0 {
		klog.Warningf("not found any security groups on %s", node.Name)
		return nil
	}

	desc := fmt.Sprintf("DO NOT EDIT. %s are internal IP addresses used by ELB to check the health of backend"+
		" servers. Created by K8s CCM.", healthCheckCidr)

	for _, sg := range secGroups {
		rules, err := b.vpcClient.ListSecurityGroupRules(sg.Id)
		if err != nil {
			return fmt.Errorf("failed to list security group[%s] rules: %w", sg.Id, err)
		}

		for _, r := range rules {
			isHealthCheckRule := r.Direction == "ingress" && r.RemoteIpPrefix == healthCheckCidr &&
				r.Ethertype == "IPv4" && r.PortRangeMin == 0 && r.PortRangeMax == 0 && r.Description == desc
			if !isHealthCheckRule {
				continue
			}
			// Keep going on failure, but record why the delete failed.
			if err := b.vpcClient.DeleteSecurityGroupRule(r.Id); err != nil {
				klog.Errorf("failed to delete security group[%s] rule[%s]: %s", sg.Id, r.Id, err)
			}
		}
	}

	return nil
}

func (b Basic) updateService(service *v1.Service, lbStatus *v1.LoadBalancerStatus) {
if service.Spec.LoadBalancerClass == nil || *service.Spec.LoadBalancerClass != LoadBalancerClass {
return
Expand Down Expand Up @@ -702,7 +598,6 @@ type LoadBalancerServiceListener struct {
// stopListenerSlice signals shutdown on e.stopChannel: it logs a warning,
// sends one value on the channel and then closes it.
//
// NOTE(review): because the channel is closed here, a second call would panic
// (send on a closed channel) — confirm callers invoke this at most once.
func (e *LoadBalancerServiceListener) stopListenerSlice() {
	klog.Warningf("Stop listening to Endpoints")
	// NOTE(review): if stopChannel is unbuffered this send blocks until a
	// receiver takes the value — TODO confirm how the channel is created.
	e.stopChannel <- struct{}{}
	close(e.stopChannel)
}

func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Service, bool)) {
Expand Down Expand Up @@ -861,73 +756,6 @@ func (e *LoadBalancerServiceListener) dispatcher(namespace, name, eType string,
handle(svc, false)
}

// autoRemoveHealthCheckRule watches Services in all namespaces and tracks the
// LoadBalancer ones in e.serviceCache. When a LoadBalancer Service is deleted
// and the cache becomes empty, it lists a single node and passes it to handle
// so the ELB health check security group rules can be cleaned up.
//
// The informer is run with e.stopChannel as its stop channel, so this call
// does not return until informer.Run does.
func (e *LoadBalancerServiceListener) autoRemoveHealthCheckRule(handle func(node *v1.Node) error) {
	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				return e.kubeClient.Services(metav1.NamespaceAll).List(context.Background(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return e.kubeClient.Services(metav1.NamespaceAll).Watch(context.Background(), options)
			},
		},
		&v1.Service{},
		0,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	_, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			service, ok := obj.(*v1.Service)
			if !ok {
				klog.Errorf("unexpected object type in service add event: %T", obj)
				return
			}
			if service.Spec.Type != v1.ServiceTypeLoadBalancer {
				return
			}

			key := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
			e.serviceCache[key] = service
			klog.V(6).Infof("new LoadBalancer service %s/%s added, cache size: %v",
				service.Namespace, service.Name, len(e.serviceCache))
		},
		UpdateFunc: func(oldObj, newObj interface{}) {},
		DeleteFunc: func(obj interface{}) {
			service, ok := obj.(*v1.Service)
			if !ok {
				// A missed delete is delivered as a tombstone wrapping the
				// last known object instead of the object itself.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					klog.Errorf("unexpected object type in service delete event: %T", obj)
					return
				}
				if service, ok = tombstone.Obj.(*v1.Service); !ok {
					klog.Errorf("unexpected object type in service delete tombstone: %T", tombstone.Obj)
					return
				}
			}
			if service.Spec.Type != v1.ServiceTypeLoadBalancer {
				return
			}

			key := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
			delete(e.serviceCache, key)
			klog.V(6).Infof("found LoadBalancer service %s/%s deleted, cache size: %v",
				service.Namespace, service.Name, len(e.serviceCache))

			if len(e.serviceCache) > 0 {
				klog.V(6).Infof("found %v LoadBalancer service(s), "+
					"skip clearing the security group rules for ELB health check", len(e.serviceCache))
				return
			}

			nodes, err := e.kubeClient.Nodes().List(context.TODO(), metav1.ListOptions{
				Limit: 1,
			})
			if err != nil {
				// nodes is not usable after a failed List; stop here instead
				// of dereferencing it.
				klog.Errorf("failed to query a list of nodes in autoRemoveHealthCheckRule: %s", err)
				return
			}

			if nodes == nil || len(nodes.Items) == 0 {
				klog.Warningf("not found any nodes, skip clearing the security group rules for ELB health check")
				return
			}
			klog.Infof("all LoadBalancer services has been deleted, start to clean health check rules")
			// Cleanup is best-effort, but the failure should still be visible.
			if err := handle(&nodes.Items[0]); err != nil {
				klog.Errorf("failed to clean health check rules: %s", err)
			}
		},
	}, 5*time.Second)
	if err != nil {
		klog.Errorf("failed to watch service: %s", err)
	}

	informer.Run(e.stopChannel)
}

func (h *CloudProvider) listenerDeploy() error {
listener := LoadBalancerServiceListener{
Basic: h.Basic,
Expand All @@ -946,12 +774,6 @@ func (h *CloudProvider) listenerDeploy() error {
}

go leaderElection(id, h.restConfig, h.eventRecorder, func(ctx context.Context) {
if !h.loadbalancerOpts.DisableCreateSecurityGroup {
go listener.autoRemoveHealthCheckRule(h.removeHealthCheckRules)
} else {
klog.Infof("automatic creation of security groups has been disabled")
}

listener.startEndpointListener(func(service *v1.Service, isDelete bool) {
klog.Infof("Got service %s/%s using loadbalancer class %s",
service.Namespace, service.Name, utils.ToString(service.Spec.LoadBalancerClass))
Expand Down Expand Up @@ -1008,9 +830,10 @@ func (h *CloudProvider) listenerDeploy() error {
service.Namespace, service.Name, err)
})
}, func() {
listener.stopListenerSlice()
listener.goroutinePool.Stop()
listener.stopListenerSlice()
})

return nil
}

Expand All @@ -1021,7 +844,7 @@ func leaderElection(id string, restConfig *rest.Config, recorder record.EventRec
retryPeriod := 30 * time.Second

configmapLock, err := resourcelock.NewFromKubeconfig(resourcelock.ConfigMapsLeasesResourceLock,
"kube-system",
kubeSystemNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: fmt.Sprintf("%s_%s", id, string(uuid.NewUUID())),
Expand All @@ -1047,6 +870,13 @@ func leaderElection(id string, restConfig *rest.Config, recorder record.EventRec
klog.Infof("[Listener EndpointSlices] leader election lost: %s", id)
onStop()
},
// OnNewLeader logs the identity of the newly observed leader and calls
// onStop when that leader is another instance. Lock identities are built
// as "<id>_<uuid>", so this instance is recognised by the "<id>_" prefix;
// a plain substring match would also accept a different instance whose id
// merely contains this one.
OnNewLeader: func(identity string) {
	klog.Infof("[Listener EndpointSlices] leader changed to %s", identity)
	if strings.HasPrefix(identity, id+"_") {
		return
	}
	onStop()
},
},
Name: leaseName,
})
Expand Down
7 changes: 0 additions & 7 deletions pkg/cloudprovider/huaweicloud/sharedloadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,6 @@ func (l *SharedLoadBalancer) ensureHealthCheck(loadbalancerID string, pool *elbm
monitorID := pool.HealthmonitorId
klog.Infof("add or update or remove health check: %s : %#v", monitorID, healthCheckOpts)

if healthCheckOpts.Enable {
err := l.allowHealthCheckRule(node)
if err != nil {
return err
}
}

protocolStr := parseProtocol(service, port)
// create health monitor
if monitorID == "" && healthCheckOpts.Enable {
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package common

import (
"k8s.io/klog/v2"
"os"
"os/signal"
"syscall"
"time"

"k8s.io/klog/v2"

"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/sdkerr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down

0 comments on commit dd28993

Please sign in to comment.