Skip to content

Commit

Permalink
--story=119857476 [bcs-ingress-controller] 修复listener并发读写导致的crash问题 (…
Browse files Browse the repository at this point in the history
…merge request !2041)

Squash merge branch 'porterlin-dev' into 'master'
--story=119857476 [bcs-ingress-controller] 修复listener并发读写导致的crash问题

 



TAPD: --story=119857476
  • Loading branch information
porterlin authored and wessonli committed Sep 25, 2024
1 parent 8abe4f3 commit 81cd8a9
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,9 @@ func (h *EventHandler) deleteMultiListeners(listeners []*networkextensionv1.List
}
return err
}
listener.Finalizers = common.RemoveString(listener.Finalizers, constant.FinalizerNameBcsIngressController)
return h.k8sCli.Update(context.Background(), listener)
cpListener := listener.DeepCopy()
cpListener.Finalizers = common.RemoveString(cpListener.Finalizers, constant.FinalizerNameBcsIngressController)
return h.k8sCli.Update(context.Background(), cpListener)
}); err != nil {
blog.Warnf("failed to remove finalizer from listener %s/%s, err %s",
li.GetNamespace(), li.GetName(), err.Error())
Expand Down Expand Up @@ -536,8 +537,9 @@ func (h *EventHandler) deleteListener(li *networkextensionv1.Listener) error {
}
return err
}
listener.Finalizers = common.RemoveString(listener.Finalizers, constant.FinalizerNameBcsIngressController)
return h.k8sCli.Update(context.Background(), listener)
cpListener := listener.DeepCopy()
cpListener.Finalizers = common.RemoveString(cpListener.Finalizers, constant.FinalizerNameBcsIngressController)
return h.k8sCli.Update(context.Background(), cpListener)
}); err != nil {
blog.Errorf("failed to remove finalizer from listener %s/%s, err %s",
li.GetNamespace(), li.GetName(), err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func (lc *ListenerBypassReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
if err := lc.Client.Get(context.TODO(), req.NamespacedName, li); err != nil {
return err
}
cpListener := li.DeepCopy()
cpListener.Finalizers = common.RemoveString(cpListener.Finalizers, constant.FinalizerNameUptimeCheck)

li.Finalizers = common.RemoveString(li.Finalizers, constant.FinalizerNameUptimeCheck)

return lc.Client.Update(context.TODO(), li)
return lc.Client.Update(context.TODO(), cpListener)
}); err != nil {
blog.Errorf("remove finalizer for listeners '%s/%s' failed, err: %s", listener.GetNamespace(),
listener.GetNamespace(), err.Error())
Expand Down Expand Up @@ -153,9 +153,10 @@ func (lc *ListenerBypassReconciler) ensureFinalizer(listener *networkextensionv1
return err
}

li.Finalizers = append(li.Finalizers, constant.FinalizerNameUptimeCheck)
cpListener := li.DeepCopy()
cpListener.Finalizers = append(cpListener.Finalizers, constant.FinalizerNameUptimeCheck)

return lc.Client.Update(context.TODO(), li)
return lc.Client.Update(context.TODO(), cpListener)
}); err != nil {
return err
}
Expand All @@ -181,12 +182,13 @@ func (lc *ListenerBypassReconciler) updateListenerStatus(namespacedName k8stypes
}
return err
}
li.Status.UptimeCheckStatus = &networkextensionv1.UptimeCheckTaskStatus{
cpListener := li.DeepCopy()
cpListener.Status.UptimeCheckStatus = &networkextensionv1.UptimeCheckTaskStatus{
ID: taskID,
Status: status,
Msg: msg,
}
if err := lc.Client.Update(context.TODO(), li, &client.UpdateOptions{}); err != nil {
if err := lc.Client.Update(context.TODO(), cpListener, &client.UpdateOptions{}); err != nil {
blog.Errorf("update uptime_check status failed, err: %s", err.Error())
return err
}
Expand All @@ -198,8 +200,10 @@ func (lc *ListenerBypassReconciler) updateListenerStatus(namespacedName k8stypes
func getListenerByPassPredicate() predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
newListener, okNew := e.ObjectNew.(*networkextensionv1.Listener)
oldListener, okOld := e.ObjectOld.(*networkextensionv1.Listener)
objNew := e.ObjectNew.DeepCopyObject()
objOld := e.ObjectOld.DeepCopyObject()
newListener, okNew := objNew.(*networkextensionv1.Listener)
oldListener, okOld := objOld.(*networkextensionv1.Listener)
if !okNew || !okOld {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
func getListenerPredicate() predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
newListener, okNew := e.ObjectNew.(*networkextensionv1.Listener)
oldListener, okOld := e.ObjectOld.(*networkextensionv1.Listener)
objectNew := e.ObjectNew.DeepCopyObject()
objectOld := e.ObjectOld.DeepCopyObject()
newListener, okNew := objectNew.(*networkextensionv1.Listener)
oldListener, okOld := objectOld.(*networkextensionv1.Listener)
if !okNew || !okOld {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,20 @@ func (pbih *portBindingItemHandler) ensureItem(
return err
}

cpListener := li.DeepCopy()
// listener has no targetGroup or ip has changed
li.Spec.ListenerAttribute = portPool.Spec.ListenerAttribute
cpListener.Spec.ListenerAttribute = portPool.Spec.ListenerAttribute
if item.ListenerAttribute != nil {
li.Spec.ListenerAttribute = item.ListenerAttribute
cpListener.Spec.ListenerAttribute = item.ListenerAttribute
}
li.Status.Status = networkextensionv1.ListenerStatusNotSynced
li.Spec.TargetGroup = tmpTargetGroup
if li.Labels == nil {
li.Labels = make(map[string]string)
cpListener.Status.Status = networkextensionv1.ListenerStatusNotSynced
cpListener.Spec.TargetGroup = tmpTargetGroup
if cpListener.Labels == nil {
cpListener.Labels = make(map[string]string)
}
li.Labels[networkextensionv1.LabelKeyForSourceNamespace] = portBinding.GetNamespace()
cpListener.Labels[networkextensionv1.LabelKeyForSourceNamespace] = portBinding.GetNamespace()

if err := pbih.k8sClient.Update(context.Background(), li, &client.UpdateOptions{}); err != nil {
if err := pbih.k8sClient.Update(context.Background(), cpListener, &client.UpdateOptions{}); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -154,21 +155,21 @@ func (pbih *portBindingItemHandler) deleteItem(
return pbih.generateStatus(item, constant.PortBindingItemStatusDeleting)
}
// do not update informer cache directly
listener := rawListener.DeepCopy()
if listener.Spec.TargetGroup == nil || len(listener.Spec.TargetGroup.Backends) == 0 {
if listener.Status.Status == networkextensionv1.ListenerStatusSynced {
cpListener := rawListener.DeepCopy()
if cpListener.Spec.TargetGroup == nil || len(cpListener.Spec.TargetGroup.Backends) == 0 {
if cpListener.Status.Status == networkextensionv1.ListenerStatusSynced {
blog.Infof("listener %s/%s backend cleaned and synced", listenerName, item.PoolNamespace)
continue
}
blog.Warnf("listener %s/%s backend cleaned, but not synced", listenerName, item.PoolNamespace)
return pbih.generateStatus(item, constant.PortBindingItemStatusDeleting)
}
listener.Spec.TargetGroup = nil
listener.Status.Status = networkextensionv1.ListenerStatusNotSynced
if listener.Labels != nil {
delete(listener.Labels, networkextensionv1.LabelKeyForSourceNamespace)
cpListener.Spec.TargetGroup = nil
cpListener.Status.Status = networkextensionv1.ListenerStatusNotSynced
if cpListener.Labels != nil {
delete(cpListener.Labels, networkextensionv1.LabelKeyForSourceNamespace)
}
if err := pbih.k8sClient.Update(context.Background(), listener, &client.UpdateOptions{}); err != nil {
if err := pbih.k8sClient.Update(context.Background(), cpListener, &client.UpdateOptions{}); err != nil {
blog.Warnf("failed to update listener %s/%s, err %s", listenerName, item.PoolNamespace, err.Error())
return pbih.generateStatus(item, constant.PortBindingItemStatusDeleting)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,17 @@ func (ppih *PortPoolItemHandler) ensureListeners(region, lbID string, item *nete
}, li); inErr != nil {
return inErr
}

cpListener := li.DeepCopy()
// 部分旧版本监听器labels不全需要补齐
poolNameLabel := common.GetPortPoolListenerLabelKey(ppih.PortPoolName, item.ItemName)
li.Labels[poolNameLabel] = netextv1.LabelValueForPortPoolItemName
li.Labels[netextv1.LabelKeyForOwnerKind] = constant.KindPortPool
li.Labels[netextv1.LabelKeyForOwnerName] = ppih.PortPoolName
cpListener.Labels[poolNameLabel] = netextv1.LabelValueForPortPoolItemName
cpListener.Labels[netextv1.LabelKeyForOwnerKind] = constant.KindPortPool
cpListener.Labels[netextv1.LabelKeyForOwnerName] = ppih.PortPoolName

li.Spec.Certificate = item.Certificate
li.Spec.ListenerAttribute = ppih.ListenerAttr
if inErr := ppih.K8sClient.Update(context.Background(), li); inErr != nil {
cpListener.Spec.Certificate = item.Certificate
cpListener.Spec.ListenerAttribute = ppih.ListenerAttr
if inErr := ppih.K8sClient.Update(context.Background(), cpListener); inErr != nil {
return inErr
}
return nil
Expand Down

0 comments on commit 81cd8a9

Please sign in to comment.