chore: synthetic Integration ownership
squakez committed Jan 9, 2024
1 parent bc63675 commit 2bbe92b
Showing 10 changed files with 136 additions and 122 deletions.
6 changes: 3 additions & 3 deletions docs/modules/ROOT/pages/running/import.adoc
@@ -23,13 +23,13 @@ The operator immediately creates a synthetic Integration:
```
$ kubectl get it
NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS
test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Cannot Monitor Pods
test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Running
```
You can see it will be in `Cannot Monitor Pods` status phase. This is expected because the way Camel K operator monitor Pods. It requires that the same label applied to the Deployment is inherited by the generated Pods. For this reason, beside labelling the Deployment, we need to add a label in the Deployment template.
You can see it will be in the `Running` status phase. However, checking the conditions you will see that the Integration cannot yet be fully monitored. This is expected because of the way the Camel K operator monitors Pods: it requires that the label applied to the Deployment is inherited by the generated Pods. For this reason, besides labelling the Deployment, we also need to add the label to the Deployment template.
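To see why, inspect the Ready condition of the Integration. A minimal sketch using kubectl's jsonpath output; the reason and message reported there point at the missing Pod label:
```
$ kubectl get it my-it -o jsonpath='{.status.conditions[?(@.type=="Ready")].message}'
```
The following patch adds the missing label to the Deployment template: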
```
$ kubectl patch deployment my-camel-sb-svc --patch '{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}'
```
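Equivalently, the label can be set directly in the Deployment manifest so that imported applications are monitorable from the start. A minimal sketch: the Deployment name and the `camel.apache.org/integration` label follow the example above, while the `app` selector label and the container image are assumptions:
```
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-camel-sb-svc
  labels:
    camel.apache.org/integration: my-it
spec:
  selector:
    matchLabels:
      app: my-camel-sb-svc
  template:
    metadata:
      labels:
        app: my-camel-sb-svc
        # same label as on the Deployment, inherited by the generated Pods
        camel.apache.org/integration: my-it
    spec:
      containers:
        - name: my-camel-sb-svc
          image: <your-camel-application-image>
```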
Also this operator can be performed manually or automated in the deployment procedure. We can see now that the operator will be able to monitor accordingly the status of the Pods:
Also this operation can be performed manually or automated as part of the deployment procedure. We can now see that the operator is able to monitor the status of the Pods accordingly:
```
$ kubectl get it
NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS
33 changes: 26 additions & 7 deletions e2e/commonwithcustominstall/synthetic_test.go
@@ -34,6 +34,26 @@ import (
corev1 "k8s.io/api/core/v1"
)

func TestSyntheticIntegrationOff(t *testing.T) {
RegisterTestingT(t)
WithNewTestNamespace(t, func(ns string) {
// Install Camel K without synthetic Integration feature variable (default)
operatorID := "camel-k-synthetic-env-off"
Expect(KamelInstallWithID(operatorID, ns).Execute()).To(Succeed())

// Run the external deployment
ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns))
Eventually(DeploymentCondition(ns, "my-camel-sb-svc", appsv1.DeploymentProgressing), TestTimeoutShort).
Should(MatchFields(IgnoreExtras, Fields{
"Status": Equal(corev1.ConditionTrue),
"Reason": Equal("NewReplicaSetAvailable"),
}))

// Label the deployment --> Verify the Integration is not created
ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns))
Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil())
})
}
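By contrast, a test that exercises the feature would enable it at install time. A minimal sketch, assuming `KamelInstallWithID` forwards extra arguments to `kamel install` and that the `--operator-env-vars` flag is available:
```
// Install Camel K with the synthetic Integration feature enabled (sketch).
operatorID := "camel-k-synthetic-env"
Expect(KamelInstallWithID(operatorID, ns,
	"--operator-env-vars", "CAMEL_K_SYNTHETIC_INTEGRATIONS=true").Execute()).To(Succeed())
```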
func TestSyntheticIntegrationFromDeployment(t *testing.T) {
RegisterTestingT(t)
WithNewTestNamespace(t, func(ns string) {
@@ -53,8 +73,10 @@ func TestSyntheticIntegrationFromDeployment(t *testing.T) {

// Label the deployment --> Verify the Integration is created (cannot still monitor)
ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseCannotMonitor))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionFalse))
Eventually(IntegrationCondition(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(
WithTransform(IntegrationConditionReason, Equal(v1.IntegrationConditionMonitoringPodsAvailableReason)))

// Label the deployment template --> Verify the Integration is monitored
ExpectExecSucceed(t, Kubectl("patch", "deployment", "my-camel-sb-svc", "--patch", `{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}`, "-n", ns))
@@ -63,12 +85,9 @@ func TestSyntheticIntegrationFromDeployment(t *testing.T) {
one := int32(1)
Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&one))

// Delete the deployment --> Verify the Integration is in missing status
// Delete the deployment --> Verify the Integration is eventually garbage collected
ExpectExecSucceed(t, Kubectl("delete", "deploy", "my-camel-sb-svc", "-n", ns))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseImportMissing))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionFalse))
zero := int32(0)
Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&zero))
Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil())

// Recreate the deployment and label --> Verify the Integration is monitored
ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns))
6 changes: 0 additions & 6 deletions pkg/apis/camel/v1/integration_types.go
@@ -155,10 +155,6 @@ const (
IntegrationPhaseRunning IntegrationPhase = "Running"
// IntegrationPhaseError --.
IntegrationPhaseError IntegrationPhase = "Error"
// IntegrationPhaseImportMissing used when the application from which the Integration is imported has been deleted.
IntegrationPhaseImportMissing IntegrationPhase = "Application Missing"
// IntegrationPhaseCannotMonitor used when the application from which the Integration has not enough information to monitor its pods.
IntegrationPhaseCannotMonitor IntegrationPhase = "Cannot Monitor Pods"

// IntegrationConditionReady --.
IntegrationConditionReady IntegrationConditionType = "Ready"
@@ -186,8 +182,6 @@ const (
IntegrationConditionProbesAvailable IntegrationConditionType = "ProbesAvailable"
// IntegrationConditionTraitInfo --.
IntegrationConditionTraitInfo IntegrationConditionType = "TraitInfo"
// IntegrationConditionMonitoringPodsAvailable used to specify that the Pods generated are available for monitoring.
IntegrationConditionMonitoringPodsAvailable IntegrationConditionType = "MonitoringPodsAvailable"

// IntegrationConditionKitAvailableReason --.
IntegrationConditionKitAvailableReason string = "IntegrationKitAvailable"
2 changes: 1 addition & 1 deletion pkg/cmd/operator/operator.go
@@ -235,7 +235,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS")
if synth && synthEnvVal == "true" {
log.Info("Starting the synthetic Integration manager")
exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error")
exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error")
} else {
log.Info("Synthetic Integration manager not configured, skipping")
}
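For reference, on an existing installation the feature can presumably be switched on by setting this environment variable on the operator Deployment. A sketch; the `camel-k-operator` Deployment name and the `camel-k` namespace are assumptions about a default install:
```
$ kubectl set env deployment/camel-k-operator CAMEL_K_SYNTHETIC_INTEGRATIONS=true -n camel-k
```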
25 changes: 19 additions & 6 deletions pkg/controller/integration/integration_controller.go
@@ -415,15 +415,28 @@ func watchCronJobResources(b *builder.Builder) {
}

func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error {
// Check for permission to watch the Knative Service resource
checkCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
// Watch for the owned Knative Services conditionally
if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
return err
} else if ok {
// Watch for the owned Knative Services
b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
// Check for permission to watch the Knative Service resource
checkCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if ok, err = kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
return err
} else if ok {
log.Info("KnativeService resources installed in the cluster. RBAC privileges assigned correctly, you can use Knative features.")
b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
} else {
log.Info(`KnativeService resources are installed in the cluster, but the Camel K operator does not have the required RBAC privileges, so you can't use Knative features.
Make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for Camel K managed Knative Services.`)
}
} else {
log.Info(`KnativeService resources are not installed in the cluster. You can't use Knative features. If you install Knative Serving resources after the
Camel K operator, make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for
Camel K managed Knative Services.`)
}

return nil
}
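The RBAC privileges mentioned in the log messages amount to watch access on Knative Services (the permission check above asks specifically for the `watch` verb). A sketch of the kind of rule the operator's Role or ClusterRole would need; the surrounding Role definition depends on the installation:
```
- apiGroups: ["serving.knative.dev"]
  resources: ["services"]
  verbs: ["get", "list", "watch"]
```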

19 changes: 2 additions & 17 deletions pkg/controller/integration/monitor.go
@@ -59,8 +59,7 @@ func (action *monitorAction) Name() string {
func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
integration.Status.Phase == v1.IntegrationPhaseRunning ||
integration.Status.Phase == v1.IntegrationPhaseError ||
integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
integration.Status.Phase == v1.IntegrationPhaseError
}

func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
@@ -142,10 +141,9 @@ func (action *monitorAction) monitorPods(ctx context.Context, environment *trait
if !controller.hasTemplateIntegrationLabel() {
// This is happening when the Deployment, CronJob, etc resources
// miss the Integration label, required to identify sibling Pods.
integration.Status.Phase = v1.IntegrationPhaseCannotMonitor
integration.Status.SetConditions(
v1.IntegrationCondition{
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Type: v1.IntegrationConditionReady,
Status: corev1.ConditionFalse,
Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
Message: fmt.Sprintf(
@@ -158,13 +156,6 @@
return integration, nil
}

integration.Status.SetConditions(
v1.IntegrationCondition{
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Status: corev1.ConditionTrue,
Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
},
)
// Enforce the scale sub-resource label selector.
// It is used by the HPA that queries the scale sub-resource endpoint,
// to list the pods owned by the integration.
@@ -296,8 +287,6 @@ type controller interface {
checkReadyCondition(ctx context.Context) (bool, error)
getPodSpec() corev1.PodSpec
updateReadyCondition(readyPods int) bool
getSelector() metav1.LabelSelector
isEmptySelector() bool
hasTemplateIntegrationLabel() bool
getControllerName() string
}
@@ -359,10 +348,6 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(
ctx context.Context, controller controller, environment *trait.Environment, integration *v1.Integration,
pendingPods []corev1.Pod, runningPods []corev1.Pod,
) error {
controller, err := action.newController(environment, integration)
if err != nil {
return err
}
if done, err := controller.checkReadyCondition(ctx); done || err != nil {
// There may be pods that are not ready but still probable for getting error messages.
// Ignore returned error from probing as it's expected when the ctrl obj is not ready.
15 changes: 2 additions & 13 deletions pkg/controller/integration/monitor_synthetic.go
@@ -19,7 +19,6 @@ package integration

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"

@@ -46,18 +45,8 @@ func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v
if err != nil {
// Importing application no longer available
if k8serrors.IsNotFound(err) {
// It could be a normal condition, don't report as an error
integration.Status.Phase = v1.IntegrationPhaseImportMissing
message := fmt.Sprintf(
"import %s %s no longer available",
integration.Annotations[v1.IntegrationImportedKindLabel],
integration.Annotations[v1.IntegrationImportedNameLabel],
)
integration.SetReadyConditionError(message)
zero := int32(0)
integration.Status.Phase = v1.IntegrationPhaseImportMissing
integration.Status.Replicas = &zero
return integration, nil
// Application was deleted. The GC will take care of removing the synthetic Integration.
return nil, nil
}
// other reasons, likely some error to report
integration.Status.Phase = v1.IntegrationPhaseError
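The deletion branch above relies on Kubernetes garbage collection: per the commit title, the synthetic Integration is owned by the resource it was imported from, so deleting the Deployment removes the Integration as well. A sketch of the kind of ownerReference involved; names follow the docs example and the uid is a placeholder:
```
metadata:
  name: my-it
  ownerReferences:
    - apiVersion: apps/v1
      kind: Deployment
      name: my-camel-sb-svc
      uid: <uid-of-the-deployment>
```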
18 changes: 3 additions & 15 deletions pkg/controller/integration/monitor_synthetic_test.go
@@ -125,13 +125,10 @@ func TestMonitorSyntheticIntegrationCannotMonitorPods(t *testing.T) {
assert.True(t, a.CanHandle(importedIt))
handledIt, err := a.Handle(context.TODO(), importedIt)
assert.Nil(t, err)
assert.Equal(t, v1.IntegrationPhaseCannotMonitor, handledIt.Status.Phase)
// Ready condition should be still true
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
assert.Equal(t, corev1.ConditionFalse, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
// Check monitoring pods condition
assert.Equal(t, corev1.ConditionFalse, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Status)
assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Reason)
assert.Equal(t, "Could not find `camel.apache.org/integration: my-imported-it` label in the Deployment/my-deploy template. Make sure to include this label in the template for Pod monitoring purposes.", handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Message)
assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason)
assert.Equal(t, "Could not find `camel.apache.org/integration: my-imported-it` label in the Deployment/my-deploy template. Make sure to include this label in the template for Pod monitoring purposes.", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message)
}

func TestMonitorSyntheticIntegrationDeployment(t *testing.T) {
@@ -246,9 +243,6 @@ func TestMonitorSyntheticIntegrationDeployment(t *testing.T) {
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
assert.Equal(t, v1.IntegrationConditionDeploymentReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason)
assert.Equal(t, "1/1 ready replicas", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message)
// Check monitoring pods condition
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Status)
assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Reason)

// Remove label from deployment
deploy.Labels = nil
@@ -369,9 +363,6 @@ func TestMonitorSyntheticIntegrationCronJob(t *testing.T) {
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
assert.Equal(t, v1.IntegrationConditionCronJobCreatedReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason)
assert.Equal(t, "cronjob created", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message)
// Check monitoring pods condition
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Status)
assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Reason)
}

func TestMonitorSyntheticIntegrationKnativeService(t *testing.T) {
@@ -492,7 +483,4 @@ func TestMonitorSyntheticIntegrationKnativeService(t *testing.T) {
// Ready condition
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
assert.Equal(t, v1.IntegrationConditionKnativeServiceReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason)
// Check monitoring pods condition
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Status)
assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionMonitoringPodsAvailable).Reason)
}