From ed955e57c8ecbdffeee58f49826c91ae3c27a2a2 Mon Sep 17 00:00:00 2001 From: 0xff-dev Date: Sun, 31 Mar 2024 21:04:47 +0800 Subject: [PATCH] chore: knowledgebase add processing status --- controllers/base/knowledgebase_controller.go | 490 +++++++++++-------- 1 file changed, 279 insertions(+), 211 deletions(-) diff --git a/controllers/base/knowledgebase_controller.go b/controllers/base/knowledgebase_controller.go index cc8e7a02e..57d4c4b81 100644 --- a/controllers/base/knowledgebase_controller.go +++ b/controllers/base/knowledgebase_controller.go @@ -63,6 +63,8 @@ const ( waitLonger = time.Hour waitSmaller = time.Second * 3 waitMedium = time.Minute + + retryForFailed = "for-failed" ) var ( @@ -77,7 +79,6 @@ var ( type KnowledgeBaseReconciler struct { client.Client Scheme *runtime.Scheme - mu sync.Mutex HasHandledSuccessPath map[string]bool readyMu sync.Mutex ReadyMap map[string]bool @@ -111,19 +112,6 @@ func (r *KnowledgeBaseReconciler) Reconcile(ctx context.Context, req ctrl.Reques log = log.WithValues("Generation", kb.GetGeneration(), "ObservedGeneration", kb.Status.ObservedGeneration, "creator", kb.Spec.Creator) log.V(5).Info("Get KnowledgeBase instance") - // Add a finalizer.Then, we can define some operations which should - // occur before the KnowledgeBase to be deleted. - // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/finalizers - if newAdded := controllerutil.AddFinalizer(kb, arcadiav1alpha1.Finalizer); newAdded { - log.Info("Try to add Finalizer for KnowledgeBase") - if err = r.Update(ctx, kb); err != nil { - log.Error(err, "Failed to update KnowledgeBase to add finalizer, will try again later") - return ctrl.Result{}, err - } - log.Info("Adding Finalizer for KnowledgeBase done") - return ctrl.Result{}, nil - } - // Check if the KnowledgeBase instance is marked to be deleted, which is // indicated by the deletion timestamp being set. 
if kb.GetDeletionTimestamp() != nil && controllerutil.ContainsFinalizer(kb, arcadiav1alpha1.Finalizer) { @@ -139,23 +127,78 @@ func (r *KnowledgeBaseReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } + // Add a finalizer.Then, we can define some operations which should + // occur before the KnowledgeBase to be deleted. + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/finalizers + if newAdded := controllerutil.AddFinalizer(kb, arcadiav1alpha1.Finalizer); newAdded { + log.Info("Try to add Finalizer for KnowledgeBase") + if err = r.Update(ctx, kb); err != nil { + log.Error(err, "Failed to update KnowledgeBase to add finalizer, will try again later") + return ctrl.Result{}, err + } + log.Info("Adding Finalizer for KnowledgeBase done") + return ctrl.Result{}, nil + } + // The previous version of the knowledge base used the paths field to store files. // After the change, the files field is used. // without migration, which will lead to incorrect data display and the inability to vectorize the data normally. 
if migrated := r.migratePaths2Files(ctx, kb); migrated { + log.Info("start to migrate files") return reconcile.Result{}, r.Client.Update(ctx, kb) } + if len(kb.Status.Conditions) == 0 { + log.Info("start to set Pending Condition") + kb = r.setCondition(log, kb, kb.PendingCondition("")) + return reconcile.Result{}, r.patchStatus(ctx, log, kb) + } - kb, result, err = r.reconcile(ctx, log, kb) + if kb.Status.ObservedGeneration != kb.Generation { + kb.Status.ObservedGeneration = kb.Generation + log.Info("start to set InitCondition") + kb = r.setCondition(log, kb, kb.InitCondition()) + return reconcile.Result{}, r.patchStatus(ctx, log, kb) + } + + if v := kb.Annotations[arcadiav1alpha1.UpdateSourceFileAnnotationKey]; v != "" { + log.Info("Manual update") + kbNew := kb.DeepCopy() + if v != retryForFailed && len(kb.Status.FileGroupDetail) != 0 { + log.Info("set FileGroupDetail to nil to redo embedder...") + kbNew.Status.FileGroupDetail = nil + kbNew = r.setCondition(log, kbNew, kbNew.InitCondition()) + return reconcile.Result{}, r.patchStatus(ctx, log, kbNew) + } + if v == retryForFailed { + found := false + for out, fg := range kbNew.Status.FileGroupDetail { + for in, f := range fg.FileDetails { + if f.Phase == arcadiav1alpha1.FileProcessPhaseFailed { + found = true + kbNew.Status.FileGroupDetail[out].FileDetails[in].Phase = arcadiav1alpha1.FileProcessPhaseProcessing + kbNew.Status.FileGroupDetail[out].FileDetails[in].LastUpdateTime = metav1.Now() + kbNew.Status.FileGroupDetail[out].FileDetails[in].ErrMessage = "" + } + } + } + if found { + log.Info("there are files that failed to be processed and are ready to try again.") + kbNew = r.setCondition(log, kbNew, kbNew.InitCondition()) + return reconcile.Result{}, r.patchStatus(ctx, log, kbNew) + } + } + delete(kbNew.Annotations, arcadiav1alpha1.UpdateSourceFileAnnotationKey) + return reconcile.Result{}, r.Patch(ctx, kbNew, client.MergeFrom(kb)) + } - // Update status after reconciliation. 
- if updateStatusErr := r.patchStatus(ctx, log, kb); updateStatusErr != nil { - log.Error(updateStatusErr, "unable to update status after reconciliation") - return ctrl.Result{Requeue: true}, updateStatusErr + dp := kb.DeepCopy() + if r.syncStatus(ctx, dp) { + log.V(5).Info(fmt.Sprintf("status is different from spec. new status: %+v\n, old status: %+v", dp.Status.FileGroupDetail, kb.Status.FileGroupDetail)) + return reconcile.Result{}, r.Client.Status().Patch(ctx, dp, client.MergeFrom(kb)) } - log.V(5).Info("Reconcile done") - return result, err + log.Info("start to reconcile") + return r.reconcile(ctx, log, kb) } func (r *KnowledgeBaseReconciler) patchStatus(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) error { @@ -244,94 +287,97 @@ func (r *KnowledgeBaseReconciler) SetupWithManager(ctx context.Context, mgr ctrl Complete(r) } -func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) (*arcadiav1alpha1.KnowledgeBase, ctrl.Result, error) { +func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) (ctrl.Result, error) { // Observe generation change or manual update - if kb.Status.ObservedGeneration != kb.Generation || kb.Annotations[arcadiav1alpha1.UpdateSourceFileAnnotationKey] != "" { - r.cleanupHasHandledSuccessPath(kb) - if kb.Status.ObservedGeneration != kb.Generation { - log.Info("Generation changed") - kb.Status.ObservedGeneration = kb.Generation - } - kb = r.setCondition(log, kb, kb.InitCondition()) - if updateStatusErr := r.patchStatus(ctx, log, kb); updateStatusErr != nil { - log.Error(updateStatusErr, "unable to update status after generation update") - return kb, ctrl.Result{Requeue: true}, updateStatusErr - } - if kb.Annotations[arcadiav1alpha1.UpdateSourceFileAnnotationKey] != "" { - log.Info("Manual update") - kbNew := kb.DeepCopy() - delete(kbNew.Annotations, arcadiav1alpha1.UpdateSourceFileAnnotationKey) - 
err := r.Patch(ctx, kbNew, client.MergeFrom(kb)) - if err != nil { - return kb, ctrl.Result{Requeue: true}, err - } - } - return kb, ctrl.Result{}, nil - } - embedderReq := kb.Spec.Embedder vectorStoreReq := kb.Spec.VectorStore fileGroupsReq := kb.Spec.FileGroups if embedderReq == nil || vectorStoreReq == nil || len(fileGroupsReq) == 0 { kb = r.setCondition(log, kb, kb.PendingCondition("embedder or vectorstore or filegroups is not setting")) - return kb, ctrl.Result{}, nil + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } embedder := &arcadiav1alpha1.Embedder{} if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.Embedder.Name, Namespace: kb.Spec.Embedder.GetNamespace(kb.GetNamespace())}, embedder); err != nil { + log.Info("get embedder error " + err.Error()) if apierrors.IsNotFound(err) { kb = r.setCondition(log, kb, kb.PendingCondition("embedder is not found")) - return kb, ctrl.Result{RequeueAfter: waitLonger}, nil + } else { + kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error())) } - kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error())) - return kb, ctrl.Result{}, err + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } if !embedder.Status.IsReady() { + log.Info(fmt.Sprintf("embedder %s is not ready", embedder.Name)) kb = r.setCondition(log, kb, kb.ErrorCondition(errEmbedderNotReady.Error())) - return kb, ctrl.Result{RequeueAfter: waitMedium}, nil + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } vectorStore := &arcadiav1alpha1.VectorStore{} if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.VectorStore.Name, Namespace: kb.Spec.VectorStore.GetNamespace(kb.GetNamespace())}, vectorStore); err != nil { + log.Info("get vectorstore error " + err.Error()) if apierrors.IsNotFound(err) { kb = r.setCondition(log, kb, kb.PendingCondition("vectorStore is not found")) - return kb, ctrl.Result{RequeueAfter: waitLonger}, nil + } else { + kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error())) } - kb = r.setCondition(log, kb, 
kb.ErrorCondition(err.Error())) - return kb, ctrl.Result{}, err + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } if !vectorStore.Status.IsReady() { + log.Info(fmt.Sprintf("vectorstore %s is not ready", vectorStore.Name)) kb = r.setCondition(log, kb, kb.ErrorCondition(errVectorStoreNotReady.Error())) - return kb, ctrl.Result{RequeueAfter: waitMedium}, nil + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } if kb.Status.IsReady() || r.isReady(kb) { log.Info("KnowledgeBase is ready, skip reconcile") - return kb, ctrl.Result{}, nil + return ctrl.Result{}, nil } - errs := make([]error, 0) - for _, fileGroup := range kb.Spec.FileGroups { - if err := r.reconcileFileGroup(ctx, log, kb, vectorStore, embedder, fileGroup); err != nil { - log.Error(err, "Failed to reconcile FileGroup", "fileGroup", fileGroup) - errs = append(errs, err) + haveFailed := false + for out, fg := range kb.Status.FileGroupDetail { + if fg.Source == nil { + log.Info(fmt.Sprintf("kb.Status.FileGroupDetail[%d] source is nil, skip", out)) + continue } - } - if err := errors.Join(errs...); err != nil { - kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error())) - return kb, ctrl.Result{RequeueAfter: waitMedium}, nil - } else { - for _, fileGroupDetail := range kb.Status.FileGroupDetail { - for _, fileDetail := range fileGroupDetail.FileDetails { - if fileDetail.Phase == arcadiav1alpha1.FileProcessPhaseFailed && fileDetail.ErrMessage != "" { - kb = r.setCondition(log, kb, kb.ErrorCondition(fileDetail.ErrMessage)) - return kb, ctrl.Result{RequeueAfter: waitMedium}, nil + for in, f := range fg.FileDetails { + if f.Phase == arcadiav1alpha1.FileProcessPhaseSkipped { + log.Info(fmt.Sprintf("source %s/%s, file %s, the current phase is skip and will not be processed.", fg.Source.Kind, fg.Source.Name, f.Path)) + continue + } + if f.Phase == arcadiav1alpha1.FileProcessPhasePending { + log.V(5).Info(fmt.Sprintf("source: %s/%s file: %s, cur is Pending,change it to Processing", fg.Source.Kind, 
fg.Source.Name, f.Path)) + kb.Status.FileGroupDetail[out].FileDetails[in].Phase = arcadiav1alpha1.FileProcessPhaseProcessing + kb.Status.FileGroupDetail[out].FileDetails[in].LastUpdateTime = metav1.Now() + return ctrl.Result{}, r.patchStatus(ctx, log, kb) + } + if f.Phase == arcadiav1alpha1.FileProcessPhaseFailed { + log.Info(fmt.Sprintf("source: %s/%s, file: %s, is failed skip.", fg.Source.Kind, fg.Source.Name, f.Path)) + haveFailed = true + continue + } + if f.Phase == arcadiav1alpha1.FileProcessPhaseSucceeded { + log.Info(fmt.Sprintf("source %s/%s, file %s, processing completed", fg.Source.Kind, fg.Source.Name, f.Path)) + continue + } + if f.Phase == arcadiav1alpha1.FileProcessPhaseProcessing { + log.Info(fmt.Sprintf("source: %s/%s, file: %s, is Processing", fg.Source.Kind, fg.Source.Name, f.Path)) + err := r.reconcileFileGroup(ctx, log, kb, vectorStore, embedder, out, in) + if err != nil { + log.Error(err, "failed to handle single file", "FileName", f.Path) } + return ctrl.Result{Requeue: true}, r.patchStatus(ctx, log, kb) } } + } + if haveFailed { + r.setCondition(log, kb, kb.ErrorCondition("some files failed to process.")) + return ctrl.Result{RequeueAfter: waitMedium}, r.patchStatus(ctx, log, kb) + } + if kb.Status.Conditions[0].Status != corev1.ConditionTrue { kb = r.setCondition(log, kb, kb.ReadyCondition()) } - return kb, ctrl.Result{}, nil + return ctrl.Result{}, r.patchStatus(ctx, log, kb) } func (r *KnowledgeBaseReconciler) setCondition(log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase, condition ...arcadiav1alpha1.Condition) *arcadiav1alpha1.KnowledgeBase { @@ -351,26 +397,31 @@ func (r *KnowledgeBaseReconciler) setCondition(log logr.Logger, kb *arcadiav1alp return kb } -func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase, vectorStore *arcadiav1alpha1.VectorStore, embedder *arcadiav1alpha1.Embedder, group arcadiav1alpha1.FileGroup) (err error) { +func (r 
*KnowledgeBaseReconciler) reconcileFileGroup( + ctx context.Context, + log logr.Logger, + kb *arcadiav1alpha1.KnowledgeBase, + vectorStore *arcadiav1alpha1.VectorStore, + embedder *arcadiav1alpha1.Embedder, + groupIndex, fileIndex int) (err error) { defer func() { if err != nil { err = fmt.Errorf("failed to reconcile FileGroup: %w", err) } }() - - if group.Source == nil { - return errNoSource - } + group := kb.Status.FileGroupDetail[groupIndex] + fileDetail := kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex] ns := kb.Namespace if group.Source.Namespace != nil { ns = *group.Source.Namespace } + lowerKind := strings.ToLower(group.Source.Kind) var ds datasource.Datasource info := &arcadiav1alpha1.OSS{Bucket: ns} var vsBasePath string - switch strings.ToLower(group.Source.Kind) { + switch lowerKind { case "versioneddataset": versionedDataset := &arcadiav1alpha1.VersionedDataset{} if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil { @@ -399,6 +450,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo } // basepath for this versioneddataset vsBasePath = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version) + info.Object = filepath.Join(vsBasePath, fileDetail.Path) + case "datasource", "": dsObj := &arcadiav1alpha1.Datasource{} if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, dsObj); err != nil { @@ -423,132 +476,71 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo if kb.Spec.Type != arcadiav1alpha1.KnowledgeBaseTypeConversation { info.Bucket = dsObj.Spec.OSS.Bucket } + + info.Object = fileDetail.Path default: return fmt.Errorf("source type %s not supported yet", group.Source.Kind) } - if len(kb.Status.FileGroupDetail) == 0 { - // brand new knowledgebase, init status. 
- kb.Status.FileGroupDetail = make([]arcadiav1alpha1.FileGroupDetail, 1) - kb.Status.FileGroupDetail[0].Init(group) - log.V(5).Info("init filegroupdetail status") - } - var fileGroupDetail *arcadiav1alpha1.FileGroupDetail - pathMap := make(map[string]*arcadiav1alpha1.FileDetails, 1) - for i, detail := range kb.Status.FileGroupDetail { - if detail.Source != nil && detail.Source.Name == group.Source.Name && detail.Source.GetNamespace(kb.GetNamespace()) == ns { - fileGroupDetail = &kb.Status.FileGroupDetail[i] - break - } + info.VersionID = fileDetail.Version + + stat, err := ds.StatFile(ctx, info) + log.V(5).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", fileDetail.Path) + if err != nil { + log.Error(err, fmt.Sprintf("stat file failed. source: %s/%s/%s, file: %s, error: %s", + group.Source.Kind, ns, group.Source.Name, fileDetail.Path, err)) + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + return err } - if fileGroupDetail == nil { - // this group is newly added - log.V(5).Info("new added group, init filegroupdetail status") - fileGroupDetail = &arcadiav1alpha1.FileGroupDetail{} - fileGroupDetail.Init(group) - kb.Status.FileGroupDetail = append(kb.Status.FileGroupDetail, *fileGroupDetail) + + objectStat, ok := stat.(minio.ObjectInfo) + log.V(5).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", fileDetail.Path) + if !ok { + err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", fileDetail.Path) + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + return err } - for i, detail := range fileGroupDetail.FileDetails { - pathMap[detail.Path] = &fileGroupDetail.FileDetails[i] + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Version = fileDetail.Version + if objectStat.ETag == fileDetail.Checksum { + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Phase = 
arcadiav1alpha1.FileProcessPhaseSucceeded + return nil } + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Checksum = objectStat.ETag - errs := make([]error, 0) - for _, path := range group.Files { - r.mu.Lock() - hasHandled := r.HasHandledSuccessPath[r.hasHandledPathKey(kb, group, path.Path, path.Version)] - r.mu.Unlock() - if hasHandled { - continue - } - fileDetail, ok := pathMap[path.Path] - if !ok { - // this path is newly added - fileGroupDetail.FileDetails = append(fileGroupDetail.FileDetails, arcadiav1alpha1.FileDetails{ - Path: path.Path, - Checksum: "", - LastUpdateTime: metav1.Now(), - Phase: arcadiav1alpha1.FileProcessPhasePending, - ErrMessage: "", - }) - fileDetail = &fileGroupDetail.FileDetails[len(fileGroupDetail.FileDetails)-1] - } - - switch strings.ToLower(group.Source.Kind) { - case "versioneddataset": - info.Object = filepath.Join(vsBasePath, path.Path) - case "datasource", "": - info.Object = path.Path - default: - return fmt.Errorf("source type %s not supported yet", group.Source.Kind) - } - info.VersionID = path.Version - - stat, err := ds.StatFile(ctx, info) - log.V(5).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path) - if err != nil { - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue - } - - objectStat, ok := stat.(minio.ObjectInfo) - log.V(5).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", path) - if !ok { - err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", path) - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue - } - // NOTE: 如果path.Version是空值,表示最新版本,那么在status中直接更新version为版本号, 而不是保持空值 - fileDetail.Version = objectStat.VersionID - // NOTE: If the version changes, the etag will not necessarily change. 
- if objectStat.ETag == fileDetail.Checksum { - fileDetail.LastUpdateTime = metav1.Now() - continue - } - fileDetail.Checksum = objectStat.ETag - - tags, err := ds.GetTags(ctx, info) - if err != nil { - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue - } + tags, err := ds.GetTags(ctx, info) + if err != nil { + fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + return err + } - // File Size in string - fileDetail.Size = utils.BytesToSizedStr(objectStat.Size) - // File Type in string - fileDetail.Type = tags[arcadiav1alpha1.ObjectTypeTag] - // File data count in string - fileDetail.Count = tags[arcadiav1alpha1.ObjectCountTag] + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Size = utils.BytesToSizedStr(objectStat.Size) + // File Type in string + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Type = tags[arcadiav1alpha1.ObjectTypeTag] + // File data count in string + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].Count = tags[arcadiav1alpha1.ObjectCountTag] - file, err := ds.ReadFile(ctx, info) - if err != nil { - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue - } - defer file.Close() - startTime := time.Now() - if err = r.handleFile(ctx, log, file, info.Object, tags, kb, vectorStore, embedder); err != nil { - if errors.Is(err, errFileSkipped) { - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseSkipped) - continue - } - err = fmt.Errorf("failed to handle file:%s: %w", path, err) - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue + file, err := ds.ReadFile(ctx, info) + if err != nil { + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) + return err + } + 
defer file.Close() + startTime := time.Now() + if err = r.handleFile(ctx, log, file, info.Object, tags, kb, vectorStore, embedder); err != nil { + if errors.Is(err, errFileSkipped) { + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseSkipped) + } else { + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) } - // time cost for file process - fileDetail.TimeCost = int64(time.Since(startTime).Milliseconds()) - r.mu.Lock() - r.HasHandledSuccessPath[r.hasHandledPathKey(kb, group, path.Path, path.Version)] = true - r.mu.Unlock() - fileDetail.UpdateErr(nil, arcadiav1alpha1.FileProcessPhaseSucceeded) - log.Info("handle FileGroup succeeded", "timecost(milliseconds)", fileDetail.TimeCost) + return err } - return errors.Join(errs...) + cost := int64(time.Since(startTime).Milliseconds()) + + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].TimeCost = cost + log.Info("handle FileGroup succeeded", "timecost(milliseconds)", cost) + kb.Status.FileGroupDetail[groupIndex].FileDetails[fileIndex].UpdateErr(err, arcadiav1alpha1.FileProcessPhaseSucceeded) + return nil } func (r *KnowledgeBaseReconciler) handleFile(ctx context.Context, log logr.Logger, file io.ReadCloser, fileName string, tags map[string]string, kb *arcadiav1alpha1.KnowledgeBase, store *arcadiav1alpha1.VectorStore, embedder *arcadiav1alpha1.Embedder) (err error) { @@ -625,8 +617,8 @@ func (r *KnowledgeBaseReconciler) handleFile(ctx context.Context, log logr.Logge } func (r *KnowledgeBaseReconciler) reconcileDelete(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) { - r.cleanupHasHandledSuccessPath(kb) - r.unready(log, kb) + // r.cleanupHasHandledSuccessPath(kb) + // r.unready(log, kb) vectorStore := &arcadiav1alpha1.VectorStore{} if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.VectorStore.Name, Namespace: 
kb.Spec.VectorStore.GetNamespace(kb.GetNamespace())}, vectorStore); err != nil { log.Error(err, "reconcile delete: get vector store error, may leave garbage data") @@ -640,24 +632,6 @@ func (r *KnowledgeBaseReconciler) reconcileDelete(ctx context.Context, log logr. }() } -func (r *KnowledgeBaseReconciler) hasHandledPathKey(kb *arcadiav1alpha1.KnowledgeBase, filegroup arcadiav1alpha1.FileGroup, path, version string) string { - sourceName := "" - if filegroup.Source != nil { - sourceName = filegroup.Source.Name - } - return kb.Name + "/" + kb.Namespace + "/" + sourceName + "/" + path + "/" + version -} - -func (r *KnowledgeBaseReconciler) cleanupHasHandledSuccessPath(kb *arcadiav1alpha1.KnowledgeBase) { - r.mu.Lock() - for _, fg := range kb.Spec.FileGroups { - for _, path := range fg.Files { - delete(r.HasHandledSuccessPath, r.hasHandledPathKey(kb, fg, path.Path, path.Version)) - } - } - r.mu.Unlock() -} - func (r *KnowledgeBaseReconciler) ready(log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) { r.readyMu.Lock() defer r.readyMu.Unlock() @@ -705,3 +679,97 @@ func (r *KnowledgeBaseReconciler) migratePaths2Files(ctx context.Context, kb *ar } return migrated } + +func isFileDetailDiff(a, b arcadiav1alpha1.FileDetails) bool { + return a.Path != b.Path || + a.Type != b.Type || + a.Count != b.Count || + a.Size != b.Size || + a.Checksum != b.Checksum || + a.TimeCost != b.TimeCost || + a.Phase != b.Phase || + a.ErrMessage != b.ErrMessage || + a.Version != b.Version +} + +func (r *KnowledgeBaseReconciler) syncStatus(ctx context.Context, kb *arcadiav1alpha1.KnowledgeBase) bool { + log, _ := logr.FromContext(ctx) + newStatus := make([]arcadiav1alpha1.FileGroupDetail, 0) + specSource := make(map[string]map[string][2]int) + now := metav1.Now() + for _, fg := range kb.Spec.FileGroups { + if fg.Source == nil { + continue + } + + ns := kb.Namespace + if fg.Source.Namespace != nil { + ns = *fg.Source.Namespace + } + key := fmt.Sprintf("%s/%s/%s", fg.Source.Kind, ns, fg.Source.Name) 
+ if _, ok := specSource[key]; !ok { + specSource[key] = make(map[string][2]int) + newStatus = append(newStatus, arcadiav1alpha1.FileGroupDetail{Source: fg.Source.DeepCopy(), FileDetails: make([]arcadiav1alpha1.FileDetails, 0)}) + } + + index := len(newStatus) - 1 // NOTE(review): only correct while groups sharing a source are adjacent in spec + + for _, f := range fg.Files { + newStatus[index].FileDetails = append(newStatus[index].FileDetails, arcadiav1alpha1.FileDetails{ + Path: f.Path, + Version: f.Version, + Phase: arcadiav1alpha1.FileProcessPhasePending, + LastUpdateTime: now, + }) + specSource[key][f.Path] = [2]int{index, len(newStatus[index].FileDetails) - 1} + } + } + + for _, fgd := range kb.Status.FileGroupDetail { + ns := kb.Namespace + if fgd.Source.Namespace != nil { + ns = *fgd.Source.Namespace + } + key := fmt.Sprintf("%s/%s/%s", fgd.Source.Kind, ns, fgd.Source.Name) + fileDetails, ok := specSource[key] + if !ok { + continue + } + for _, f := range fgd.FileDetails { + v, ok := fileDetails[f.Path] + if !ok { + continue + } + vv := newStatus[v[0]].FileDetails[v[1]].Version + newStatus[v[0]].FileDetails[v[1]] = f + if newStatus[v[0]].FileDetails[v[1]].Version != vv { + newStatus[v[0]].FileDetails[v[1]].Version = vv + newStatus[v[0]].FileDetails[v[1]].Phase = arcadiav1alpha1.FileProcessPhaseProcessing + } + } + } + log.V(5).Info(fmt.Sprintf("new Status: %+v\n", newStatus)) + log.V(5).Info(fmt.Sprintf("old status: %+v\n", kb.Status.FileGroupDetail)) + if len(newStatus) != len(kb.Status.FileGroupDetail) { + kb.Status.FileGroupDetail = newStatus + return true + } + + for i := 0; i < len(newStatus); i++ { + if len(newStatus[i].FileDetails) != len(kb.Status.FileGroupDetail[i].FileDetails) { + log.V(5).Info(fmt.Sprintf("len diff %+v | %+v\n", newStatus[i].FileDetails, + kb.Status.FileGroupDetail[i].FileDetails)) + kb.Status.FileGroupDetail = newStatus + return true + } + for j := range newStatus[i].FileDetails { + if isFileDetailDiff(newStatus[i].FileDetails[j], kb.Status.FileGroupDetail[i].FileDetails[j]) { + log.V(5).Info(fmt.Sprintf("fileDetail diff %+v | %+v\n", + 
newStatus[i].FileDetails[j], kb.Status.FileGroupDetail[i].FileDetails[j])) + kb.Status.FileGroupDetail = newStatus + return true + } + } + } + return false +}