Skip to content

Commit

Permalink
fix: controller should watch sub resource update
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <fp544037857@gmail.com>
  • Loading branch information
Abirdcfly committed Jan 31, 2024
1 parent c20b8b5 commit 129ecc9
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 133 deletions.
11 changes: 11 additions & 0 deletions api/base/v1alpha1/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"sort"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -251,3 +252,13 @@ func (s *ConditionedStatus) ReadyCondition() []Condition {
Message: "Success",
}}
}

func (s *ConditionedStatus) IsReadyOrGetReadyMessage() (isReady bool, msg string) {
if s.IsReady() {
return true, ""
}
for _, cond := range s.Conditions {
msg += fmt.Sprintf("%s:%s", cond.Reason, cond.Message)
}
return false, msg
}
1 change: 1 addition & 0 deletions apiserver/graph/schema/rag.gql
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ query getRAG($name: String!, $namespace: String!){
updateTimestamp
isPublic
status
category
}
prologue
model
Expand Down
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,32 @@ rules:
- patch
- update
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents/finalizers
verbs:
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents/status
verbs:
- get
- patch
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
Expand Down
154 changes: 139 additions & 15 deletions controllers/base/application_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,38 @@ package controllers

import (
"context"
"fmt"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

agentv1alpha1 "github.com/kubeagi/arcadia/api/app-node/agent/v1alpha1"
chainv1alpha1 "github.com/kubeagi/arcadia/api/app-node/chain/v1alpha1"
promptv1alpha1 "github.com/kubeagi/arcadia/api/app-node/prompt/v1alpha1"
retrieveralpha1 "github.com/kubeagi/arcadia/api/app-node/retriever/v1alpha1"
arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
"github.com/kubeagi/arcadia/pkg/appruntime"
)

const (
APIChainIndexKey = "metadata.apichain"
LLMChainIndexKey = "metadata.llmchain"
RetrievalQAChainIndexKey = "metadata.retrievalqachain"
KnowledgebaseIndexKey = "metadata.knowledgebase"
LLMIndexKey = "metadata.llm"
PromptIndexKey = "metadata.prompt"
KnowledgebaseRetrieverIndexKey = "metadata.knowledgebaseretriever"
AgentIndexKey = "metadata.agent"
)

// ApplicationReconciler reconciles an Application object
Expand All @@ -40,6 +61,30 @@ type ApplicationReconciler struct {
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=applications,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=applications/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=applications/finalizers,verbs=update
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=apichains,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=apichains/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=apichains/finalizers,verbs=update
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=llmchains,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=llmchains/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=llmchains/finalizers,verbs=update
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=retrievalqachains,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=retrievalqachains/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=chain.arcadia.kubeagi.k8s.com.cn,resources=retrievalqachains/finalizers,verbs=update
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=knowledgebases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=knowledgebases/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=knowledgebases/finalizers,verbs=update
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=llms,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=llms/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=llms/finalizers,verbs=update
//+kubebuilder:rbac:groups=prompt.arcadia.kubeagi.k8s.com.cn,resources=prompts,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=prompt.arcadia.kubeagi.k8s.com.cn,resources=prompts/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=prompt.arcadia.kubeagi.k8s.com.cn,resources=prompts/finalizers,verbs=update
//+kubebuilder:rbac:groups=retriever.arcadia.kubeagi.k8s.com.cn,resources=knowledgebaseretrievers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=retriever.arcadia.kubeagi.k8s.com.cn,resources=knowledgebaseretrievers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=retriever.arcadia.kubeagi.k8s.com.cn,resources=knowledgebaseretrievers/finalizers,verbs=update
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=agents,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=agents/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=agents/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -68,7 +113,7 @@ func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
log.Info("Adding Finalizer for Application done")
return ctrl.Result{}, nil
return ctrl.Result{Requeue: true}, nil
}

// Check if the Application instance is marked to be deleted, which is
Expand Down Expand Up @@ -114,32 +159,32 @@ func (r *ApplicationReconciler) validateNodes(ctx context.Context, log logr.Logg
for _, node := range app.Spec.Nodes {
if _, ok := nodeName[node.Name]; ok {
r.setCondition(app, app.Status.ErrorCondition("node name should be unique")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
nodeName[node.Name] = true
if node.Ref.Kind == arcadiav1alpha1.InputNode {
input++
if len(node.NextNodeName) == 0 {
r.setCondition(app, app.Status.ErrorCondition("input node needs one or more next nodes")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
}
if node.Ref.Kind == arcadiav1alpha1.OutputNode {
output++
outputNodeName = node.Name
if len(node.NextNodeName) != 0 {
r.setCondition(app, app.Status.ErrorCondition("output node should not have next nodes")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
}
}
if input != 1 {
r.setCondition(app, app.Status.ErrorCondition("need one input node")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
if output != 1 {
r.setCondition(app, app.Status.ErrorCondition("need one output node")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}

var toOutput int
Expand All @@ -151,24 +196,40 @@ func (r *ApplicationReconciler) validateNodes(ctx context.Context, log logr.Logg
group := node.Ref.APIGroup
if group == nil {
r.setCondition(app, app.Status.ErrorCondition("node should have ref.group setting")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
// Only allow chain group or agent node as the ending node
if *group != chainv1alpha1.Group && (*group != agentv1alpha1.Group && node.Ref.Kind != "agent") {
r.setCondition(app, app.Status.ErrorCondition("ending node should be chain or agent")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
}
toOutputNodeNext = len(node.NextNodeName)
}
}
if toOutput != 1 {
r.setCondition(app, app.Status.ErrorCondition("only one node can output")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
if toOutputNodeNext != 1 {
r.setCondition(app, app.Status.ErrorCondition("when this node points to output, it can only point to output")...)
return app, ctrl.Result{}, nil
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}

for _, node := range app.Spec.Nodes {
n, err := appruntime.InitNode(ctx, app.Namespace, node.Name, *node.Ref)
if err != nil {
r.setCondition(app, app.Status.ErrorCondition(fmt.Sprintf("initnode %s failed: %s", node.Name, err.Error()))...)
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
if err := n.Init(ctx, nil, r.Client); err != nil {
r.setCondition(app, app.Status.ErrorCondition(fmt.Sprintf("node %s init failed: %s", node.Name, err.Error()))...)
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
if isReady, errMsg := n.Ready(); !isReady {
r.setCondition(app, app.Status.ErrorCondition(fmt.Sprintf("node %s init failed: %s", node.Name, errMsg))...)
return app, ctrl.Result{RequeueAfter: waitMedium}, nil
}
}

r.setCondition(app, app.Status.ReadyCondition()...)
Expand Down Expand Up @@ -197,9 +258,6 @@ func (r *ApplicationReconciler) reconcile(ctx context.Context, log logr.Logger,
if !reflect.DeepEqual(app, appRaw) {
return app, ctrl.Result{Requeue: true}, r.Patch(ctx, app, client.MergeFrom(appRaw))
}
if app.Status.IsReady() {
return app, ctrl.Result{}, nil
}
return r.validateNodes(ctx, log, app)
}

Expand All @@ -216,10 +274,76 @@ func (r *ApplicationReconciler) patchStatus(ctx context.Context, app *arcadiav1a
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("application-controller"))
}

type Dependency struct {
IndexName string
GroupPrefix string
Kind string
}

// SetupWithManager sets up the controller with the Manager.
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ApplicationReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
dependencies := []Dependency{
{APIChainIndexKey, "chain", "apichain"},
{LLMChainIndexKey, "chain", "llmchain"},
{RetrievalQAChainIndexKey, "chain", "retrievalqachain"},
{KnowledgebaseIndexKey, "", "knowledgebase"},
{LLMIndexKey, "", "llm"},
{PromptIndexKey, "prompt", "prompt"},
{KnowledgebaseRetrieverIndexKey, "retriever", "knowledgebaseretriever"},
{AgentIndexKey, "", "agent"},
}
for _, d := range dependencies {
if err := mgr.GetFieldIndexer().IndexField(ctx, &arcadiav1alpha1.Application{}, d.IndexName,
func(o client.Object) []string {
app, ok := o.(*arcadiav1alpha1.Application)
if !ok {
return nil
}
has, ns, name := appruntime.FindNodesHas(app, d.GroupPrefix, d.Kind)
if !has {
return nil
}
return []string{
fmt.Sprintf("%s/%s", ns, name),
}
},
); err != nil {
return err
}
}

getEventHandler := func(indexKey string) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(o client.Object) (reqs []reconcile.Request) {
var list arcadiav1alpha1.ApplicationList
if err := r.List(ctx, &list, client.MatchingFields{indexKey: client.ObjectKeyFromObject(o).String()}); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to list Application for"+indexKey)
return nil
}
for _, i := range list.Items {
i := i
reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&i)})
}
return reqs
})
}

return ctrl.NewControllerManagedBy(mgr).
For(&arcadiav1alpha1.Application{}).
For(&arcadiav1alpha1.Application{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
// Avoid to handle the event that it's not spec update or delete
o := ue.ObjectOld.(*arcadiav1alpha1.Application)
n := ue.ObjectNew.(*arcadiav1alpha1.Application)
return !reflect.DeepEqual(o.Spec, n.Spec) || n.DeletionTimestamp != nil
},
})).
Watches(&source.Kind{Type: &chainv1alpha1.APIChain{}}, getEventHandler(APIChainIndexKey)).
Watches(&source.Kind{Type: &chainv1alpha1.LLMChain{}}, getEventHandler(LLMChainIndexKey)).
Watches(&source.Kind{Type: &chainv1alpha1.RetrievalQAChain{}}, getEventHandler(RetrievalQAChainIndexKey)).
Watches(&source.Kind{Type: &arcadiav1alpha1.KnowledgeBase{}}, getEventHandler(KnowledgebaseIndexKey)).
Watches(&source.Kind{Type: &arcadiav1alpha1.LLM{}}, getEventHandler(LLMIndexKey)).
Watches(&source.Kind{Type: &promptv1alpha1.Prompt{}}, getEventHandler(PromptIndexKey)).
Watches(&source.Kind{Type: &retrieveralpha1.KnowledgeBaseRetriever{}}, getEventHandler(KnowledgebaseRetrieverIndexKey)).
Watches(&source.Kind{Type: &agentv1alpha1.Agent{}}, getEventHandler(AgentIndexKey)).
Complete(r)
}

Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.2.27
version: 0.2.28
appVersion: "0.1.0"

keywords:
Expand Down
26 changes: 26 additions & 0 deletions deploy/charts/arcadia/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,32 @@ rules:
- patch
- update
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents/finalizers
verbs:
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- agents/status
verbs:
- get
- patch
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

agentv1alpha1 "github.com/kubeagi/arcadia/api/app-node/agent/v1alpha1"
apichain "github.com/kubeagi/arcadia/api/app-node/chain/v1alpha1"
apiprompt "github.com/kubeagi/arcadia/api/app-node/prompt/v1alpha1"
apiretriever "github.com/kubeagi/arcadia/api/app-node/retriever/v1alpha1"
Expand Down Expand Up @@ -69,6 +70,7 @@ func init() {
utilruntime.Must(apiretriever.AddToScheme(scheme))
utilruntime.Must(evaluationarcadiav1alpha1.AddToScheme(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))
utilruntime.Must(agentv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

Expand Down Expand Up @@ -233,7 +235,7 @@ func main() {
if err = (&basecontrollers.ApplicationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Application")
os.Exit(1)
}
Expand Down
Loading

0 comments on commit 129ecc9

Please sign in to comment.