Skip to content

Commit

Permalink
fix: fix can not start new pod error
Browse files Browse the repository at this point in the history
cancel sync EMQX config from EMQX API

fix #983

Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
  • Loading branch information
Rory-Z committed Jan 4, 2024
1 parent 7cbdb12 commit f90a18a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 67 deletions.
5 changes: 0 additions & 5 deletions apis/apps/v2beta1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ const (
LabelsPodTemplateHashKey string = "apps.emqx.io/pod-template-hash"
)

const (
// annotations
AnnotationsLastEMQXConfigKey string = "apps.emqx.io/last-emqx-configuration"
)

const (
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate
PodOnServing corev1.PodConditionType = "apps.emqx.io/on-serving"
Expand Down
89 changes: 30 additions & 59 deletions controllers/apps/v2beta1/sync_emqx_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2beta1

import (
"context"
"fmt"
"net/http"
"strings"

Expand All @@ -10,6 +11,7 @@ import (
innerReq "github.com/emqx/emqx-operator/internal/requester"
"github.com/rory-z/go-hocon"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -19,41 +21,37 @@ type syncConfig struct {
}

func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
hoconConfig, _ := hocon.ParseString(instance.Spec.Config.Data)

// If core nodes is nil, the EMQX is in the process of being created
if len(instance.Status.CoreNodes) == 0 {
configMap := generateConfigMap(instance, instance.Spec.Config.Data)
if err := s.Handler.CreateOrUpdateList(instance, s.Scheme, []client.Object{configMap}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update configMap")}
defaultListenerConfig := ""
defaultListenerConfig += fmt.Sprintln("listeners.tcp.default.bind = 1883")
defaultListenerConfig += fmt.Sprintln("listeners.ssl.default.bind = 8883")
defaultListenerConfig += fmt.Sprintln("listeners.ws.default.bind = 8083")
defaultListenerConfig += fmt.Sprintln("listeners.wss.default.bind = 8084")

hoconConfig, _ := hocon.ParseString(defaultListenerConfig + instance.Spec.Config.Data)
configMap := generateConfigMap(instance, hoconConfig.String())

storageConfigMap := &corev1.ConfigMap{}
if err := s.Client.Get(ctx, client.ObjectKeyFromObject(configMap), storageConfigMap); err != nil {
if k8sErrors.IsNotFound(err) {
if err := s.Handler.Create(configMap); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create configMap")}
}
return subResult{}
}
return subResult{}
return subResult{err: emperror.Wrap(err, "failed to get configMap")}
}

lastConfig, ok := instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey]
if !ok {
// If it is the first time to start and Mode = Replace, update the EMQX configuration once.
if instance.Spec.Config.Mode == "Replace" {
// Delete readonly configs
hoconConfigObj := hoconConfig.GetRoot().(hocon.Object)
delete(hoconConfigObj, "node")
delete(hoconConfigObj, "cluster")
delete(hoconConfigObj, "dashboard")
patchResult, _ := s.Patcher.Calculate(
storageConfigMap.DeepCopy(),
configMap.DeepCopy(),
)

if err := putEMQXConfigsByAPI(r, instance.Spec.Config.Mode, hoconConfigObj.String()); err != nil {
return subResult{err: emperror.Wrap(err, "failed to put emqx config")}
}
}
if instance.Annotations == nil {
instance.Annotations = map[string]string{}
}
instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] = instance.Spec.Config.Data
if err := s.Client.Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update emqx instance")}
if !patchResult.IsEmpty() && r != nil {
_, coreReady := instance.Status.GetCondition(appsv2beta1.CoreNodesReady)
if coreReady == nil || !instance.Status.IsConditionTrue(appsv2beta1.CoreNodesReady) {
return subResult{}
}
return subResult{}
}
if ok && instance.Spec.Config.Data != lastConfig {

// Delete readonly configs
hoconConfigObj := hoconConfig.GetRoot().(hocon.Object)
delete(hoconConfigObj, "node")
Expand All @@ -64,21 +62,9 @@ func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX,
return subResult{err: emperror.Wrap(err, "failed to put emqx config")}
}

instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] = instance.Spec.Config.Data
if err := s.Client.Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update emqx instance")}
if err := s.Client.Update(ctx, configMap); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update configMap")}
}
return subResult{}
}

config, err := getEMQXConfigsByAPI(r)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get emqx config")}
}

configMap := generateConfigMap(instance, config)
if err := s.Handler.CreateOrUpdateList(instance, s.Scheme, []client.Object{configMap}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update configMap")}
}

return subResult{}
Expand All @@ -101,21 +87,6 @@ func generateConfigMap(instance *appsv2beta1.EMQX, data string) *corev1.ConfigMa
}
}

func getEMQXConfigsByAPI(r innerReq.RequesterInterface) (string, error) {
url := r.GetURL("api/v5/configs")

resp, body, err := r.Request("GET", url, nil, http.Header{
"Accept": []string{"text/plain"},
})
if err != nil {
return "", emperror.Wrapf(err, "failed to get API %s", url.String())
}
if resp.StatusCode != 200 {
return "", emperror.Errorf("failed to get API %s, status : %s, body: %s", url.String(), resp.Status, body)
}
return string(body), nil
}

func putEMQXConfigsByAPI(r innerReq.RequesterInterface, mode, config string) error {
url := r.GetURL("api/v5/configs", "mode="+strings.ToLower(mode))

Expand Down
4 changes: 2 additions & 2 deletions deploy/charts/emqx-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 2.2.5
version: 2.2.6

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: 2.2.5
appVersion: 2.2.6
2 changes: 1 addition & 1 deletion e2e/v2beta1/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
Context("replicant template is not nil", func() {
JustBeforeEach(func() {
Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance)).Should(Succeed())

instance.Spec = *emqx.Spec.DeepCopy()
instance.Spec.ReplicantTemplate = &appsv2beta1.EMQXReplicantTemplate{
Spec: appsv2beta1.EMQXReplicantTemplateSpec{
Replicas: pointer.Int32Ptr(2),
Expand Down

0 comments on commit f90a18a

Please sign in to comment.