Skip to content

Commit

Permalink
Merge pull request #679 from Abirdcfly/knowleadgebase-controller
Browse files Browse the repository at this point in the history
feat: added watch on Embedder and VectorStore for KnowledgeBase reconciliation
  • Loading branch information
bjwswang committed Jan 31, 2024
2 parents b51da6b + 9a4932e commit 1e22fb4
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 11 deletions.
86 changes: 80 additions & 6 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
"github.com/kubeagi/arcadia/pkg/config"
Expand All @@ -51,6 +54,9 @@ import (
)

const (
EmbedderIndexKey = "metadata.embedder"
VectorStoreIndexKey = "metadata.vectorstore"

waitLonger = time.Hour
waitSmaller = time.Second * 3
waitMedium = time.Second * 30
Expand Down Expand Up @@ -162,9 +168,69 @@ func (r *KnowledgeBaseReconciler) patchStatus(ctx context.Context, log logr.Logg
}

// SetupWithManager sets up the controller with the Manager.
func (r *KnowledgeBaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *KnowledgeBaseReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(ctx, &arcadiav1alpha1.KnowledgeBase{}, EmbedderIndexKey,
func(o client.Object) []string {
kb, ok := o.(*arcadiav1alpha1.KnowledgeBase)
if !ok {
return nil
}
if kb.Spec.Embedder == nil || kb.Spec.Embedder.Name == "" {
return nil
}
return []string{
fmt.Sprintf("%s/%s", kb.Spec.Embedder.GetNamespace(kb.Namespace), kb.Spec.Embedder.Name),
}
},
); err != nil {
return err
}

if err := mgr.GetFieldIndexer().IndexField(ctx, &arcadiav1alpha1.KnowledgeBase{}, VectorStoreIndexKey,
func(o client.Object) []string {
kb, ok := o.(*arcadiav1alpha1.KnowledgeBase)
if !ok {
return nil
}
if kb.Spec.VectorStore == nil || kb.Spec.VectorStore.Name == "" {
return nil
}
return []string{
fmt.Sprintf("%s/%s", kb.Spec.VectorStore.GetNamespace(kb.Namespace), kb.Spec.VectorStore.Name),
}
},
); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&arcadiav1alpha1.KnowledgeBase{}).
Watches(&source.Kind{Type: &arcadiav1alpha1.Embedder{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) (reqs []reconcile.Request) {
var list arcadiav1alpha1.KnowledgeBaseList
if err := r.List(ctx, &list, client.MatchingFields{EmbedderIndexKey: client.ObjectKeyFromObject(o).String()}); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to list Knowlegebase for embedder changes")
return nil
}
for _, i := range list.Items {
i := i
reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&i)})
}
return reqs
})).
Watches(&source.Kind{Type: &arcadiav1alpha1.VectorStore{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) (reqs []reconcile.Request) {
var list arcadiav1alpha1.KnowledgeBaseList
if err := r.List(ctx, &list, client.MatchingFields{VectorStoreIndexKey: client.ObjectKeyFromObject(o).String()}); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to list Knowlegebase for vectorstore changes")
return nil
}
for _, i := range list.Items {
i := i
reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&i)})
}
return reqs
})).
Complete(r)
}

Expand Down Expand Up @@ -193,11 +259,6 @@ func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger
return kb, ctrl.Result{}, nil
}

if kb.Status.IsReady() || r.isReady(kb) {
log.Info("KnowledgeBase is ready, skip reconcile")
return kb, ctrl.Result{}, nil
}

embedderReq := kb.Spec.Embedder
vectorStoreReq := kb.Spec.VectorStore
fileGroupsReq := kb.Spec.FileGroups
Expand All @@ -215,6 +276,10 @@ func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger
kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error()))
return kb, ctrl.Result{}, err
}
if !embedder.Status.IsReady() {
kb = r.setCondition(log, kb, kb.ErrorCondition(errEmbedderNotReady.Error()))
return kb, ctrl.Result{RequeueAfter: waitMedium}, nil
}

vectorStore := &arcadiav1alpha1.VectorStore{}
if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.VectorStore.Name, Namespace: kb.Spec.VectorStore.GetNamespace(kb.GetNamespace())}, vectorStore); err != nil {
Expand All @@ -225,6 +290,15 @@ func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger
kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error()))
return kb, ctrl.Result{}, err
}
if !vectorStore.Status.IsReady() {
kb = r.setCondition(log, kb, kb.ErrorCondition(errVectorStoreNotReady.Error()))
return kb, ctrl.Result{RequeueAfter: waitMedium}, nil
}

if kb.Status.IsReady() || r.isReady(kb) {
log.Info("KnowledgeBase is ready, skip reconcile")
return kb, ctrl.Result{}, nil
}

errs := make([]error, 0)
for _, fileGroup := range kb.Spec.FileGroups {
Expand Down
9 changes: 4 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"context"
"flag"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -133,7 +132,7 @@ func main() {
os.Exit(1)
}
}

ctx := ctrl.SetupSignalHandler()
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
if err != nil {
setupLog.Error(err, "unable to start manager")
Expand All @@ -145,7 +144,7 @@ func main() {
if err != nil {
panic(err)
}
_, err = clientset.CoreV1().ConfigMaps(utils.GetCurrentNamespace()).Get(context.Background(), config.EnvConfigDefaultValue, metav1.GetOptions{})
_, err = clientset.CoreV1().ConfigMaps(utils.GetCurrentNamespace()).Get(ctx, config.EnvConfigDefaultValue, metav1.GetOptions{})
if err != nil {
setupLog.Error(err, "failed to find required configMap", utils.GetCurrentNamespace(), config.EnvConfigDefaultValue)
panic(err)
Expand Down Expand Up @@ -213,7 +212,7 @@ func main() {
Scheme: mgr.GetScheme(),
HasHandledSuccessPath: make(map[string]bool),
ReadyMap: make(map[string]bool),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KnowledgeBase")
os.Exit(1)
}
Expand Down Expand Up @@ -307,7 +306,7 @@ func main() {
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down

0 comments on commit 1e22fb4

Please sign in to comment.