diff --git a/Dockerfile b/Dockerfile index fd9f6405a..0d9b0e074 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,7 +9,7 @@ COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer RUN go env -w GOPROXY=${GOPROXY} -RUN go mod download +RUN go mod download -x # Copy the go source COPY main.go main.go @@ -19,8 +19,8 @@ COPY pkg/ pkg/ COPY apiserver/ apiserver/ # Build -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o apiserver-bin apiserver/main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -a -o manager main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o apiserver-bin apiserver/main.go # Use alpine as minimal base image to package the manager binary FROM alpine:3.19.1 diff --git a/apiserver/pkg/application/application.go b/apiserver/pkg/application/application.go index 66e395e8d..212a2b7ff 100644 --- a/apiserver/pkg/application/application.go +++ b/apiserver/pkg/application/application.go @@ -987,7 +987,7 @@ func UploadIcon(ctx context.Context, client client.Client, icon, appName, namesp return "", err } - system, err := config.GetSystemDatasource(ctx, client) + system, err := config.GetSystemDatasource(ctx) if err != nil { return "", err } diff --git a/apiserver/pkg/chat/chat_docs.go b/apiserver/pkg/chat/chat_docs.go index 027b32420..e4674a224 100644 --- a/apiserver/pkg/chat/chat_docs.go +++ b/apiserver/pkg/chat/chat_docs.go @@ -206,7 +206,7 @@ func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req Co return err } // systemDatasource which stores the document - systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { return err } diff --git a/apiserver/pkg/chat/chat_server.go b/apiserver/pkg/chat/chat_server.go index 1600eae36..05c0eab30 100644 --- a/apiserver/pkg/chat/chat_server.go +++ b/apiserver/pkg/chat/chat_server.go @@ -56,11 +56,13 @@ type ChatServer struct { cli runtimeclient.Client storage storage.Storage once sync.Once + isGpts bool } -func NewChatServer(cli runtimeclient.Client) *ChatServer { +func NewChatServer(cli runtimeclient.Client, isGpts bool) *ChatServer { return &ChatServer{ - cli: cli, + cli: cli, + isGpts: isGpts, } } @@ -68,7 +70,7 @@ func (cs *ChatServer) Storage() storage.Storage { if cs.storage == nil { cs.once.Do(func() { ctx := context.TODO() - ds, err := pkgconfig.GetRelationalDatasource(ctx, cs.cli) + ds, err := pkgconfig.GetRelationalDatasource(ctx) if err != nil || ds == nil { if err != nil { klog.Infof("get relational datasource failed: %s, use memory storage for chat", err.Error()) @@ -183,7 +185,7 @@ func (cs *ChatServer) AppRun(ctx context.Context, req ChatReqBody, respStream ch func (cs *ChatServer) ListConversations(ctx context.Context, req APPMetadata) ([]storage.Conversation, error) { currentUser, _ := ctx.Value(auth.UserNameContextKey).(string) - return cs.Storage().ListConversations(storage.WithAppNamespace(req.AppNamespace), storage.WithAppName(req.APPName), storage.WithUser(currentUser), storage.WithUser(currentUser)) + return cs.Storage().ListConversations(storage.WithAppNamespace(req.AppNamespace), storage.WithAppName(req.APPName), storage.WithUser(currentUser)) } func (cs *ChatServer) DeleteConversation(ctx context.Context, conversationID string) error { @@ -390,7 +392,13 @@ The question you asked is:` func (cs *ChatServer) GetApp(ctx context.Context, appName, appNamespace string) (*v1alpha1.Application, runtimeclient.Client, error) { token := auth.ForOIDCToken(ctx) - c, err := client.GetClient(token) + var c runtimeclient.Client + var err error + if !cs.isGpts { + c, err = client.GetClient(token) + } else { + c = cs.cli + } if err != nil { return nil, nil, fmt.Errorf("failed to get a client: %w", err) } diff --git a/apiserver/pkg/common/common.go b/apiserver/pkg/common/common.go index fd0295004..77dd585d4 100644 --- a/apiserver/pkg/common/common.go +++ b/apiserver/pkg/common/common.go @@ -62,7 +62,7 @@ var ( ) func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) { - systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasou // Embedder and vectorstore are both required when generating a new embedding.That's why we call it a `EmbeddingSuit` func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Embedder, *v1alpha1.VectorStore, error) { // get the built-in system embedder - emd, err := config.GetEmbedder(ctx, cli) + emd, err := config.GetEmbedder(ctx) if err != nil { return nil, nil, err } @@ -86,7 +86,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb return nil, nil, err } // get the built-in system vectorstore - vs, err := config.GetVectorStore(ctx, cli) + vs, err := config.GetVectorStore(ctx) if err != nil { return nil, nil, err } @@ -100,7 +100,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb // GetAPIServer returns the api server url to access arcadia's worker // if external is true,then this func will return the external api server func GetAPIServer(ctx context.Context, cli client.Client, external bool) (string, error) { - gateway, err := config.GetGateway(ctx, cli) + gateway, err := config.GetGateway(ctx) if err != nil { return "", err } diff --git a/apiserver/pkg/gpt/gpt.go b/apiserver/pkg/gpt/gpt.go index 321550f10..7bb0d6b2c 100644 --- a/apiserver/pkg/gpt/gpt.go +++ b/apiserver/pkg/gpt/gpt.go @@ -118,7 +118,7 @@ func getHot(app *v1alpha1.Application, cli client.Client) int64 { if chatStorage == nil { once.Do( func() { - chatStorage = chat.NewChatServer(cli).Storage() + chatStorage = chat.NewChatServer(cli, true).Storage() }) } if chatStorage == nil { diff --git a/apiserver/pkg/knowledgebase/knowledgebase.go b/apiserver/pkg/knowledgebase/knowledgebase.go index 6c5be549c..08478e704 100644 --- a/apiserver/pkg/knowledgebase/knowledgebase.go +++ b/apiserver/pkg/knowledgebase/knowledgebase.go @@ -179,7 +179,7 @@ func knowledgebase2model(ctx context.Context, c client.Client, knowledgebase *v1 func CreateKnowledgeBase(ctx context.Context, c client.Client, input generated.CreateKnowledgeBaseInput) (*generated.KnowledgeBase, error) { var filegroups []v1alpha1.FileGroup var vectorstore v1alpha1.TypedObjectReference - vector, _ := config.GetVectorStore(ctx, c) + vector, _ := config.GetVectorStore(ctx) displayname, description, embedder := "", "", "" if input.DisplayName != nil { displayname = *input.DisplayName diff --git a/apiserver/pkg/ray/raycluster.go b/apiserver/pkg/ray/raycluster.go index 121b45429..b4be84686 100644 --- a/apiserver/pkg/ray/raycluster.go +++ b/apiserver/pkg/ray/raycluster.go @@ -27,7 +27,7 @@ import ( ) func ListRayClusters(ctx context.Context, c client.Client, input generated.ListCommonInput) (*generated.PaginatedResult, error) { - clusters, err := config.GetRayClusters(ctx, c) + clusters, err := config.GetRayClusters(ctx) if err != nil { return nil, err } diff --git a/apiserver/service/chat.go b/apiserver/service/chat.go index 811a0f45d..4047eef14 100644 --- a/apiserver/service/chat.go +++ b/apiserver/service/chat.go @@ -51,8 +51,8 @@ type ChatService struct { server *chat.ChatServer } -func NewChatService(cli runtimeclient.Client) (*ChatService, error) { - return &ChatService{chat.NewChatServer(cli)}, nil +func NewChatService(cli runtimeclient.Client, isGpts bool) (*ChatService, error) { + return &ChatService{chat.NewChatServer(cli, isGpts)}, nil } // @Summary chat with application @@ -450,7 +450,7 @@ func registerChat(g *gin.RouterGroup, conf config.ServerConfig) { panic(err) } - chatService, err := NewChatService(c) + chatService, err := NewChatService(c, false) if err != nil { panic(err) } diff --git a/apiserver/service/gpts.go b/apiserver/service/gpts.go index be2d1ff6a..18e1fc210 100644 --- a/apiserver/service/gpts.go +++ b/apiserver/service/gpts.go @@ -32,7 +32,7 @@ func registerGptsChat(g *gin.RouterGroup, conf config.ServerConfig) { panic(err) } - chatService, err := NewChatService(c) + chatService, err := NewChatService(c, true) if err != nil { panic(err) } diff --git a/apiserver/service/router.go b/apiserver/service/router.go index 28b03669c..4bf407459 100644 --- a/apiserver/service/router.go +++ b/apiserver/service/router.go @@ -25,7 +25,9 @@ import ( "github.com/kubeagi/arcadia/apiserver/config" "github.com/kubeagi/arcadia/apiserver/docs" + "github.com/kubeagi/arcadia/apiserver/pkg/client" "github.com/kubeagi/arcadia/apiserver/pkg/oidc" + pkgconfig "github.com/kubeagi/arcadia/pkg/config" ) func Cors() gin.HandlerFunc { @@ -55,6 +57,8 @@ func NewServerAndRun(conf config.ServerConfig) { if conf.EnableOIDC { oidc.InitOIDCArgs(conf.IssuerURL, conf.MasterURL, conf.ClientSecret, conf.ClientID) } + systemcli, _ := client.GetClient(nil) + pkgconfig.InitSystemClient(systemcli) bffGroup := r.Group("/bff") diff --git a/controllers/app-node/retriever/rerank_retriever_controller.go b/controllers/app-node/retriever/rerank_retriever_controller.go index 91804a6fb..4f0f6f6ec 100644 --- a/controllers/app-node/retriever/rerank_retriever_controller.go +++ b/controllers/app-node/retriever/rerank_retriever_controller.go @@ -111,7 +111,7 @@ func (r *RerankRetrieverReconciler) reconcile(ctx context.Context, log logr.Logg } } if instance.Spec.Model == nil { - model, err := config.GetDefaultRerankModel(ctx, r.Client) + model, err := config.GetDefaultRerankModel(ctx) if err != nil { instance.Status.SetConditions(instance.Status.ErrorCondition(fmt.Sprintf("no model provided. please set model in reranker or set system default reranking model in config :%s", err))...) return instance, ctrl.Result{RequeueAfter: 30 * time.Second}, err diff --git a/controllers/base/knowledgebase_controller.go b/controllers/base/knowledgebase_controller.go index 57d4c4b81..9d470da9f 100644 --- a/controllers/base/knowledgebase_controller.go +++ b/controllers/base/knowledgebase_controller.go @@ -403,7 +403,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup( kb *arcadiav1alpha1.KnowledgeBase, vectorStore *arcadiav1alpha1.VectorStore, embedder *arcadiav1alpha1.Embedder, - groupIndex, fileIndex int) (err error) { + groupIndex, fileIndex int, +) (err error) { defer func() { if err != nil { err = fmt.Errorf("failed to reconcile FileGroup: %w", err) @@ -436,7 +437,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup( if !versionedDataset.Status.IsReady() { return errDataSourceNotReady } - system, err := config.GetSystemDatasource(ctx, r.Client) + system, err := config.GetSystemDatasource(ctx) if err != nil { return err } diff --git a/controllers/base/model_controller.go b/controllers/base/model_controller.go index ce320fb96..1cc3fe87a 100644 --- a/controllers/base/model_controller.go +++ b/controllers/base/model_controller.go @@ -188,7 +188,7 @@ func (r *ModelReconciler) CheckModel(ctx context.Context, logger logr.Logger, in // otherwise we consider the model file for the trans-core service to be ready. if instance.Spec.Source == nil && (instance.Spec.HuggingFaceRepo == "" && instance.Spec.ModelScopeRepo == "") { logger.V(5).Info(fmt.Sprintf("model %s source is empty, check minio status.", instance.Name)) - system, err := config.GetSystemDatasource(ctx, r.Client) + system, err := config.GetSystemDatasource(ctx) if err != nil { return r.UpdateStatus(ctx, instance, err) } @@ -223,7 +223,7 @@ func (r *ModelReconciler) RemoveModel(ctx context.Context, logger logr.Logger, i var ds datasource.Datasource var info any - system, err := config.GetSystemDatasource(ctx, r.Client) + system, err := config.GetSystemDatasource(ctx) if err != nil { return r.UpdateStatus(ctx, instance, err) } diff --git a/controllers/base/namespace_controller.go b/controllers/base/namespace_controller.go index fec238eb7..e873cc144 100644 --- a/controllers/base/namespace_controller.go +++ b/controllers/base/namespace_controller.go @@ -140,7 +140,7 @@ func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, error) { - systemDatasource, err := config.GetSystemDatasource(ctx, r.Client) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { klog.Errorf("get system datasource error %s", err) return nil, err diff --git a/controllers/base/versioneddataset_controller.go b/controllers/base/versioneddataset_controller.go index 259f34742..1d54bdf59 100644 --- a/controllers/base/versioneddataset_controller.go +++ b/controllers/base/versioneddataset_controller.go @@ -213,7 +213,7 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, logger logr. func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) (bool, []v1alpha1.FileStatus, error) { // TODO: Currently, we think there is only one default minio environment, // so we get the minio client directly through the configuration. - systemDatasource, err := config.GetSystemDatasource(ctx, r.Client) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { logger.Error(err, "Failed to get system datasource") return false, nil, err @@ -233,7 +233,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log } func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) error { - systemDatasource, err := config.GetSystemDatasource(ctx, r.Client) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { logger.Error(err, "Failed to get system datasource") return err diff --git a/controllers/base/worker_controller.go b/controllers/base/worker_controller.go index d852a6975..72589a4c3 100644 --- a/controllers/base/worker_controller.go +++ b/controllers/base/worker_controller.go @@ -202,7 +202,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo return worker, errors.Wrap(err, "model config datasource, but get it failed.") } } else { - datasource, err = config.GetSystemDatasource(ctx, r.Client) + datasource, err = config.GetSystemDatasource(ctx) if err != nil { return worker, errors.Wrap(err, "Failed to get system datasource") } diff --git a/controllers/evaluation/rag_controller.go b/controllers/evaluation/rag_controller.go index 77a351843..84a3b3a21 100644 --- a/controllers/evaluation/rag_controller.go +++ b/controllers/evaluation/rag_controller.go @@ -468,7 +468,7 @@ func (r *RAGReconciler) WhenJobChanged(job *batchv1.Job) { func (r *RAGReconciler) RemoveRAGFiles(ctx context.Context, rag *evaluationarcadiav1alpha1.RAG) { logger := log.FromContext(ctx, "RAG", rag.Name, "Namespace", rag.Namespace, "Action", "DeleteRAGFiles") - systemDatasource, err := config.GetSystemDatasource(ctx, r.Client) + systemDatasource, err := config.GetSystemDatasource(ctx) if err != nil { logger.Error(err, "failed to get system datasource") return diff --git a/deploy/charts/arcadia/templates/role-templates.yaml b/deploy/charts/arcadia/templates/role-templates.yaml index 34e81a9f7..45163e918 100644 --- a/deploy/charts/arcadia/templates/role-templates.yaml +++ b/deploy/charts/arcadia/templates/role-templates.yaml @@ -186,4 +186,54 @@ spec: - get - patch - update + - category: 智能体管理 + displayName: Prompt 权限 + rules: + - apiGroups: + - prompt.arcadia.kubeagi.k8s.com.cn + resources: + - prompts + verbs: + - create + - delete + - get + - list + - patch + - update + - category: 智能体管理 + displayName: Prompt 状态权限 + rules: + - apiGroups: + - prompt.arcadia.kubeagi.k8s.com.cn + resources: + - prompts/status + verbs: + - get + - patch + - update + - category: 智能体管理 + displayName: 向量数据库权限 + rules: + - apiGroups: + - arcadia.kubeagi.k8s.com.cn + resources: + - vectorstores + verbs: + - create + - delete + - get + - list + - patch + - update + - category: 智能体管理 + displayName: 向量数据库状态权限 + rules: + - apiGroups: + - arcadia.kubeagi.k8s.com.cn + resources: + - vectorstores/status + verbs: + - get + - patch + - update {{- end }} diff --git a/main.go b/main.go index dc65f2075..09ac62956 100644 --- a/main.go +++ b/main.go @@ -325,6 +325,7 @@ func main() { _ = mgr.AddMetricsExtraHandler("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) } + config.InitSystemClient(mgr.GetClient()) setupLog.Info("starting manager") if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") diff --git a/pkg/appruntime/documentloader/documentloader.go b/pkg/appruntime/documentloader/documentloader.go index 99f607735..dd3283d0b 100644 --- a/pkg/appruntime/documentloader/documentloader.go +++ b/pkg/appruntime/documentloader/documentloader.go @@ -75,7 +75,7 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s if err := cli.Get(ctx, types.NamespacedName{Namespace: dl.RefNamespace(), Name: dl.Ref.Name}, dl.Instance); err != nil { return args, fmt.Errorf("can't find the documentloader in cluster: %w", err) } - system, err := config.GetSystemDatasource(ctx, cli) + system, err := config.GetSystemDatasource(ctx) if err != nil { return nil, err } diff --git a/pkg/arctl/eval.go b/pkg/arctl/eval.go index 933367d71..d5214f8b1 100644 --- a/pkg/arctl/eval.go +++ b/pkg/arctl/eval.go @@ -259,7 +259,7 @@ var ( func SysatemDatasource(ctx context.Context, kubeClient client.Client) (*basev1alpha1.Datasource, error) { once.Do(func() { - systemDatasource, systemError = config.GetSystemDatasource(ctx, kubeClient) + systemDatasource, systemError = config.GetSystemDatasource(ctx) }) return systemDatasource, systemError } diff --git a/pkg/config/config.go b/pkg/config/config.go index b82ea9d3b..b974d1f66 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -45,7 +45,9 @@ var ( ErrNoConfigStreamlit = fmt.Errorf("config Streamlit in comfigmap is not found") ErrNoConfigRayClusters = fmt.Errorf("config RayClusters in comfigmap is not found") ErrNoConfigRerank = fmt.Errorf("config rerankDefaultEndpoint in comfigmap is not found") + ErrSystemCliNotFound = fmt.Errorf("systemCli is not found") ) +var systemCli client.Client func getDatasource(ctx context.Context, ref arcadiav1alpha1.TypedObjectReference, c client.Client) (ds *arcadiav1alpha1.Datasource, err error) { name := ref.Name @@ -57,24 +59,24 @@ func getDatasource(ctx context.Context, ref arcadiav1alpha1.TypedObjectReference return source, err } -func GetSystemDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) { - config, err := GetConfig(ctx, c) +func GetSystemDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } - return getDatasource(ctx, config.SystemDatasource, c) + return getDatasource(ctx, config.SystemDatasource, systemCli) } -func GetRelationalDatasource(ctx context.Context, c client.Client) (*arcadiav1alpha1.Datasource, error) { - config, err := GetConfig(ctx, c) +func GetRelationalDatasource(ctx context.Context) (*arcadiav1alpha1.Datasource, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } - return getDatasource(ctx, config.RelationalDatasource, c) + return getDatasource(ctx, config.RelationalDatasource, systemCli) } -func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) { - config, err := GetConfig(ctx, c) +func GetGateway(ctx context.Context) (*Gateway, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -84,14 +86,17 @@ func GetGateway(ctx context.Context, c client.Client) (*Gateway, error) { return config.Gateway, nil } -func GetConfig(ctx context.Context, c client.Client) (config *Config, err error) { +func getConfig(ctx context.Context) (config *Config, err error) { + if systemCli == nil { + return nil, ErrSystemCliNotFound + } cmName := env.GetString(EnvConfigKey, EnvConfigDefaultValue) if cmName == "" { return nil, ErrNoConfigEnv } cmNamespace := utils.GetCurrentNamespace() cm := &corev1.ConfigMap{} - if err = c.Get(ctx, client.ObjectKey{Name: cmName, Namespace: cmNamespace}, cm); err != nil { + if err = systemCli.Get(ctx, client.ObjectKey{Name: cmName, Namespace: cmNamespace}, cm); err != nil { return nil, err } value, ok := cm.Data["config"] @@ -105,8 +110,8 @@ func GetConfig(ctx context.Context, c client.Client) (config *Config, err error) } // GetEmbedder get the default embedder from config -func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) { - config, err := GetConfig(ctx, c) +func GetEmbedder(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -117,8 +122,8 @@ func GetEmbedder(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedOb } // GetVectorStore get the default vector store from config -func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) { - config, err := GetConfig(ctx, c) +func GetVectorStore(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -129,8 +134,8 @@ func GetVectorStore(ctx context.Context, c client.Client) (*arcadiav1alpha1.Type } // Get the configuration of streamlit tool -func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) { - config, err := GetConfig(ctx, c) +func GetStreamlit(ctx context.Context) (*Streamlit, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -141,8 +146,8 @@ func GetStreamlit(ctx context.Context, c client.Client) (*Streamlit, error) { } // Get the ray cluster that can be used a resource pool -func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error) { - config, err := GetConfig(ctx, c) +func GetRayClusters(ctx context.Context) ([]RayCluster, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -153,8 +158,8 @@ func GetRayClusters(ctx context.Context, c client.Client) ([]RayCluster, error) } // GetDefaultRerankModel gets the default reranking model which is recommended by kubeagi -func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alpha1.TypedObjectReference, error) { - config, err := GetConfig(ctx, c) +func GetDefaultRerankModel(ctx context.Context) (*arcadiav1alpha1.TypedObjectReference, error) { + config, err := getConfig(ctx) if err != nil { return nil, err } @@ -164,8 +169,8 @@ func GetDefaultRerankModel(ctx context.Context, c client.Client) (*arcadiav1alph return config.Rerank, nil } -func GetSystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) { - systemDatasource, err := GetSystemDatasource(ctx, mgrClient) +func GetSystemDatasourceOSS(ctx context.Context) (*datasource.OSS, error) { + systemDatasource, err := GetSystemDatasource(ctx) if err != nil { return nil, err } @@ -173,5 +178,9 @@ func GetSystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*data if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - return datasource.NewOSS(ctx, mgrClient, endpoint) + return datasource.NewOSS(ctx, systemCli, endpoint) +} + +func InitSystemClient(cli client.Client) { + systemCli = cli } diff --git a/pkg/evaluation/jobs.go b/pkg/evaluation/jobs.go index b51832ed7..92726bc8e 100644 --- a/pkg/evaluation/jobs.go +++ b/pkg/evaluation/jobs.go @@ -58,7 +58,7 @@ func PhaseJobName(instance *evav1alpha1.RAG, phase evav1alpha1.RAGPhase) string } func systemEmbeddingSuite(ctx context.Context, mgrClient client.Client) (*v1alpha1.Embedder, error) { // get the built-in system embedder - emd, err := config.GetEmbedder(ctx, mgrClient) + emd, err := config.GetEmbedder(ctx) if err != nil { return nil, err } @@ -303,7 +303,7 @@ func JudgeJobGenerator(ctx context.Context, c client.Client) func(*evav1alpha1.R func UploadJobGenerator(ctx context.Context, client client.Client) func(*evav1alpha1.RAG) (*batchv1.Job, error) { return func(instance *evav1alpha1.RAG) (*batchv1.Job, error) { - datasource, err := config.GetSystemDatasource(ctx, client) + datasource, err := config.GetSystemDatasource(ctx) if err != nil { return nil, err } diff --git a/pkg/langchainwrap/embedder.go b/pkg/langchainwrap/embedder.go index 2645f8630..9fbfc05ed 100644 --- a/pkg/langchainwrap/embedder.go +++ b/pkg/langchainwrap/embedder.go @@ -90,7 +90,7 @@ func GetLangchainEmbedder(ctx context.Context, e *v1alpha1.Embedder, c client.Cl return langchaingoembeddings.NewEmbedder(llm, opts...) } case v1alpha1.ProviderTypeWorker: - gateway, err := config.GetGateway(ctx, c) + gateway, err := config.GetGateway(ctx) if err != nil { return nil, err } diff --git a/pkg/langchainwrap/llm.go b/pkg/langchainwrap/llm.go index 12414fb89..a37a2b9bc 100644 --- a/pkg/langchainwrap/llm.go +++ b/pkg/langchainwrap/llm.go @@ -78,7 +78,7 @@ func GetLangchainLLM(ctx context.Context, llm *v1alpha1.LLM, c client.Client, mo return googleLLM, nil } case v1alpha1.ProviderTypeWorker: - gateway, err := config.GetGateway(ctx, c) + gateway, err := config.GetGateway(ctx) if err != nil { return nil, err } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2335ef34d..8ad9344cf 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -49,7 +49,7 @@ func NewScheduler(ctx context.Context, c client.Client, instance *v1alpha1.Versi // TODO: Currently, we think there is only one default minio environment, // so we get the minio client directly through the configuration. - systemDatasource, err := config.GetSystemDatasource(ctx1, c) + systemDatasource, err := config.GetSystemDatasource(ctx1) if err != nil { klog.Errorf("generate new scheduler error %s", err) cancel() diff --git a/pkg/streamlit/deployer.go b/pkg/streamlit/deployer.go index 78ea59258..49e910d61 100644 --- a/pkg/streamlit/deployer.go +++ b/pkg/streamlit/deployer.go @@ -65,7 +65,7 @@ func (st *StreamlitDeployer) Install() error { namespace := st.namespace.Name // lookup streamlit image from config - streamlitConfig, err := config.GetStreamlit(st.ctx, st.client) + streamlitConfig, err := config.GetStreamlit(st.ctx) if err != nil { klog.Errorln("failed to get streamlit config", err) return err diff --git a/pkg/worker/runner.go b/pkg/worker/runner.go index d2c838d99..a7a3f38d8 100644 --- a/pkg/worker/runner.go +++ b/pkg/worker/runner.go @@ -88,7 +88,7 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1. if model == nil { return nil, errors.New("nil model") } - gw, err := config.GetGateway(ctx, runner.c) + gw, err := config.GetGateway(ctx) if err != nil { return nil, fmt.Errorf("failed to get arcadia config with %w", err) } @@ -188,7 +188,7 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp if model == nil { return nil, errors.New("nil model") } - gw, err := config.GetGateway(ctx, runner.c) + gw, err := config.GetGateway(ctx) if err != nil { return nil, fmt.Errorf("failed to get arcadia config with %w", err) } @@ -205,7 +205,7 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp // using existing ray cluster if envItem.Name == "RAY_CLUSTER_INDEX" { externalRayClusterIndex, _ := strconv.Atoi(envItem.Value) - rayClusters, err := config.GetRayClusters(ctx, runner.c) + rayClusters, err := config.GetRayClusters(ctx) if err != nil || len(rayClusters) == 0 { return nil, fmt.Errorf("failed to find ray clusters: %s", err.Error()) }