Skip to content

Commit

Permalink
fix: use system client to get config
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <fp544037857@gmail.com>
  • Loading branch information
Abirdcfly committed Apr 8, 2024
1 parent 5525fc8 commit b780f8e
Show file tree
Hide file tree
Showing 29 changed files with 138 additions and 65 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go env -w GOPROXY=${GOPROXY}
RUN go mod download
RUN go mod download -x

# Copy the go source
COPY main.go main.go
Expand All @@ -19,8 +19,8 @@ COPY pkg/ pkg/
COPY apiserver/ apiserver/

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o apiserver-bin apiserver/main.go
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -a -o manager main.go
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o apiserver-bin apiserver/main.go

# Use alpine as minimal base image to package the manager binary
FROM alpine:3.19.1
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func UploadIcon(ctx context.Context, client client.Client, icon, appName, namesp
return "", err
}

system, err := config.GetSystemDatasource(ctx, client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/chat/chat_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req Co
return err
}
// systemDatasource which stores the document
systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions apiserver/pkg/chat/chat_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,21 @@ type ChatServer struct {
cli runtimeclient.Client
storage storage.Storage
once sync.Once
isGpts bool
}

func NewChatServer(cli runtimeclient.Client) *ChatServer {
func NewChatServer(cli runtimeclient.Client, isGpts bool) *ChatServer {
return &ChatServer{
cli: cli,
cli: cli,
isGpts: isGpts,
}
}

func (cs *ChatServer) Storage() storage.Storage {
if cs.storage == nil {
cs.once.Do(func() {
ctx := context.TODO()
ds, err := pkgconfig.GetRelationalDatasource(ctx, cs.cli)
ds, err := pkgconfig.GetRelationalDatasource(ctx)
if err != nil || ds == nil {
if err != nil {
klog.Infof("get relational datasource failed: %s, use memory storage for chat", err.Error())
Expand Down Expand Up @@ -183,7 +185,7 @@ func (cs *ChatServer) AppRun(ctx context.Context, req ChatReqBody, respStream ch

func (cs *ChatServer) ListConversations(ctx context.Context, req APPMetadata) ([]storage.Conversation, error) {
currentUser, _ := ctx.Value(auth.UserNameContextKey).(string)
return cs.Storage().ListConversations(storage.WithAppNamespace(req.AppNamespace), storage.WithAppName(req.APPName), storage.WithUser(currentUser), storage.WithUser(currentUser))
return cs.Storage().ListConversations(storage.WithAppNamespace(req.AppNamespace), storage.WithAppName(req.APPName), storage.WithUser(currentUser))
}

func (cs *ChatServer) DeleteConversation(ctx context.Context, conversationID string) error {
Expand Down Expand Up @@ -390,7 +392,13 @@ The question you asked is:`

func (cs *ChatServer) GetApp(ctx context.Context, appName, appNamespace string) (*v1alpha1.Application, runtimeclient.Client, error) {
token := auth.ForOIDCToken(ctx)
c, err := client.GetClient(token)
var c runtimeclient.Client
var err error
if !cs.isGpts {
c, err = client.GetClient(token)
} else {
c = cs.cli
}
if err != nil {
return nil, nil, fmt.Errorf("failed to get a client: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions apiserver/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +77,7 @@ func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client) (*datasou
// Embedder and vectorstore are both required when generating a new embedding.That's why we call it a `EmbeddingSuit`
func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Embedder, *v1alpha1.VectorStore, error) {
// get the built-in system embedder
emd, err := config.GetEmbedder(ctx, cli)
emd, err := config.GetEmbedder(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -86,7 +86,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb
return nil, nil, err
}
// get the built-in system vectorstore
vs, err := config.GetVectorStore(ctx, cli)
vs, err := config.GetVectorStore(ctx)
if err != nil {
return nil, nil, err
}
Expand All @@ -100,7 +100,7 @@ func SystemEmbeddingSuite(ctx context.Context, cli client.Client) (*v1alpha1.Emb
// GetAPIServer returns the api server url to access arcadia's worker
// if external is true,then this func will return the external api server
func GetAPIServer(ctx context.Context, cli client.Client, external bool) (string, error) {
gateway, err := config.GetGateway(ctx, cli)
gateway, err := config.GetGateway(ctx)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/gpt/gpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func getHot(app *v1alpha1.Application, cli client.Client) int64 {
if chatStorage == nil {
once.Do(
func() {
chatStorage = chat.NewChatServer(cli).Storage()
chatStorage = chat.NewChatServer(cli, true).Storage()
})
}
if chatStorage == nil {
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/knowledgebase/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func knowledgebase2model(ctx context.Context, c client.Client, knowledgebase *v1
func CreateKnowledgeBase(ctx context.Context, c client.Client, input generated.CreateKnowledgeBaseInput) (*generated.KnowledgeBase, error) {
var filegroups []v1alpha1.FileGroup
var vectorstore v1alpha1.TypedObjectReference
vector, _ := config.GetVectorStore(ctx, c)
vector, _ := config.GetVectorStore(ctx)
displayname, description, embedder := "", "", ""
if input.DisplayName != nil {
displayname = *input.DisplayName
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/ray/raycluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func ListRayClusters(ctx context.Context, c client.Client, input generated.ListCommonInput) (*generated.PaginatedResult, error) {
clusters, err := config.GetRayClusters(ctx, c)
clusters, err := config.GetRayClusters(ctx)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions apiserver/service/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type ChatService struct {
server *chat.ChatServer
}

func NewChatService(cli runtimeclient.Client) (*ChatService, error) {
return &ChatService{chat.NewChatServer(cli)}, nil
func NewChatService(cli runtimeclient.Client, isGpts bool) (*ChatService, error) {
return &ChatService{chat.NewChatServer(cli, isGpts)}, nil
}

// @Summary chat with application
Expand Down Expand Up @@ -450,7 +450,7 @@ func registerChat(g *gin.RouterGroup, conf config.ServerConfig) {
panic(err)
}

chatService, err := NewChatService(c)
chatService, err := NewChatService(c, false)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/service/gpts.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func registerGptsChat(g *gin.RouterGroup, conf config.ServerConfig) {
panic(err)
}

chatService, err := NewChatService(c)
chatService, err := NewChatService(c, true)
if err != nil {
panic(err)
}
Expand Down
4 changes: 4 additions & 0 deletions apiserver/service/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

"github.com/kubeagi/arcadia/apiserver/config"
"github.com/kubeagi/arcadia/apiserver/docs"
"github.com/kubeagi/arcadia/apiserver/pkg/client"
"github.com/kubeagi/arcadia/apiserver/pkg/oidc"
pkgconfig "github.com/kubeagi/arcadia/pkg/config"
)

func Cors() gin.HandlerFunc {
Expand Down Expand Up @@ -55,6 +57,8 @@ func NewServerAndRun(conf config.ServerConfig) {
if conf.EnableOIDC {
oidc.InitOIDCArgs(conf.IssuerURL, conf.MasterURL, conf.ClientSecret, conf.ClientID)
}
systemcli, _ := client.GetClient(nil)
pkgconfig.InitSystemClient(systemcli)

bffGroup := r.Group("/bff")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *RerankRetrieverReconciler) reconcile(ctx context.Context, log logr.Logg
}
}
if instance.Spec.Model == nil {
model, err := config.GetDefaultRerankModel(ctx, r.Client)
model, err := config.GetDefaultRerankModel(ctx)
if err != nil {
instance.Status.SetConditions(instance.Status.ErrorCondition(fmt.Sprintf("no model provided. please set model in reranker or set system default reranking model in config :%s", err))...)
return instance, ctrl.Result{RequeueAfter: 30 * time.Second}, err
Expand Down
5 changes: 3 additions & 2 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
kb *arcadiav1alpha1.KnowledgeBase,
vectorStore *arcadiav1alpha1.VectorStore,
embedder *arcadiav1alpha1.Embedder,
groupIndex, fileIndex int) (err error) {
groupIndex, fileIndex int,
) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to reconcile FileGroup: %w", err)
Expand Down Expand Up @@ -436,7 +437,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (r *ModelReconciler) CheckModel(ctx context.Context, logger logr.Logger, in
// otherwise we consider the model file for the trans-core service to be ready.
if instance.Spec.Source == nil && (instance.Spec.HuggingFaceRepo == "" && instance.Spec.ModelScopeRepo == "") {
logger.V(5).Info(fmt.Sprintf("model %s source is empty, check minio status.", instance.Name))
system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *ModelReconciler) RemoveModel(ctx context.Context, logger logr.Logger, i
var ds datasource.Datasource
var info any

system, err := config.GetSystemDatasource(ctx, r.Client)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
klog.Errorf("get system datasource error %s", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/base/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *VersionedDatasetReconciler) preUpdate(ctx context.Context, logger logr.
func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) (bool, []v1alpha1.FileStatus, error) {
// TODO: Currently, we think there is only one default minio environment,
// so we get the minio client directly through the configuration.
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return false, nil, err
Expand All @@ -233,7 +233,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log
}

func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logger logr.Logger, instance *v1alpha1.VersionedDataset) error {
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "Failed to get system datasource")
return err
Expand Down
2 changes: 1 addition & 1 deletion controllers/base/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo
return worker, errors.Wrap(err, "model config datasource, but get it failed.")
}
} else {
datasource, err = config.GetSystemDatasource(ctx, r.Client)
datasource, err = config.GetSystemDatasource(ctx)
if err != nil {
return worker, errors.Wrap(err, "Failed to get system datasource")
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/evaluation/rag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (r *RAGReconciler) WhenJobChanged(job *batchv1.Job) {

func (r *RAGReconciler) RemoveRAGFiles(ctx context.Context, rag *evaluationarcadiav1alpha1.RAG) {
logger := log.FromContext(ctx, "RAG", rag.Name, "Namespace", rag.Namespace, "Action", "DeleteRAGFiles")
systemDatasource, err := config.GetSystemDatasource(ctx, r.Client)
systemDatasource, err := config.GetSystemDatasource(ctx)
if err != nil {
logger.Error(err, "failed to get system datasource")
return
Expand Down
50 changes: 50 additions & 0 deletions deploy/charts/arcadia/templates/role-templates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,54 @@ spec:
- get
- patch
- update
- category: 智能体管理
displayName: Prompt 权限
rules:
- apiGroups:
- prompt.arcadia.kubeagi.k8s.com.cn
resources:
- prompts
verbs:
- create
- delete
- get
- list
- patch
- update
- category: 智能体管理
displayName: Prompt 状态权限
rules:
- apiGroups:
- prompt.arcadia.kubeagi.k8s.com.cn
resources:
- prompts/status
verbs:
- get
- patch
- update
- category: 智能体管理
displayName: 向量数据库权限
rules:
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- vectorstores
verbs:
- create
- delete
- get
- list
- patch
- update
- category: 智能体管理
displayName: 向量数据库状态权限
rules:
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- vectorstores/status
verbs:
- get
- patch
- update
{{- end }}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func main() {
_ = mgr.AddMetricsExtraHandler("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
}

config.InitSystemClient(mgr.GetClient())
setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/documentloader/documentloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s
if err := cli.Get(ctx, types.NamespacedName{Namespace: dl.RefNamespace(), Name: dl.Ref.Name}, dl.Instance); err != nil {
return args, fmt.Errorf("can't find the documentloader in cluster: %w", err)
}
system, err := config.GetSystemDatasource(ctx, cli)
system, err := config.GetSystemDatasource(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/arctl/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ var (

func SysatemDatasource(ctx context.Context, kubeClient client.Client) (*basev1alpha1.Datasource, error) {
once.Do(func() {
systemDatasource, systemError = config.GetSystemDatasource(ctx, kubeClient)
systemDatasource, systemError = config.GetSystemDatasource(ctx)
})
return systemDatasource, systemError
}
Expand Down
Loading

0 comments on commit b780f8e

Please sign in to comment.