From 65aa753230ad667e4aeca67d4222f5c6bb35127f Mon Sep 17 00:00:00 2001
From: bjwswang
Date: Mon, 4 Mar 2024 10:46:18 +0000
Subject: [PATCH] feat(worker): add KubeAGIRunner (developed by the kubeagi team) to host reranking models

Signed-off-by: bjwswang
---
 api/base/v1alpha1/worker.go                     |  1 +
 .../arcadia_v1alpha1_worker_reranking.yaml      | 32 +++++++++
 pkg/worker/runner.go                            | 67 +++++++++++++++++++
 pkg/worker/worker.go                            | 13 +++-
 4 files changed, 112 insertions(+), 1 deletion(-)
 create mode 100644 config/samples/arcadia_v1alpha1_worker_reranking.yaml

diff --git a/api/base/v1alpha1/worker.go b/api/base/v1alpha1/worker.go
index b4e39ed01..aa8c18983 100644
--- a/api/base/v1alpha1/worker.go
+++ b/api/base/v1alpha1/worker.go
@@ -30,6 +30,7 @@ type WorkerType string
 const (
 	WorkerTypeFastchatNormal WorkerType = "fastchat"
 	WorkerTypeFastchatVLLM   WorkerType = "fastchat-vllm"
+	WorkerTypeKubeAGI        WorkerType = "kubeagi"
 	WorkerTypeUnknown        WorkerType = "unknown"
 )
 
diff --git a/config/samples/arcadia_v1alpha1_worker_reranking.yaml b/config/samples/arcadia_v1alpha1_worker_reranking.yaml
new file mode 100644
index 000000000..e96c0ae71
--- /dev/null
+++ b/config/samples/arcadia_v1alpha1_worker_reranking.yaml
@@ -0,0 +1,32 @@
+apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1
+kind: Model
+metadata:
+  name: bge-reranker-large
+  namespace: arcadia
+spec:
+  displayName: "bge-reranker-large"
+  description: |
+    bge-reranker-large is a general-purpose reranking model released by the Beijing Academy of Artificial Intelligence (BAAI); it supports both Chinese and English.
+
+    Official site: https://www.baai.ac.cn/
+    Github: https://github.com/FlagOpen/FlagEmbedding
+    HuggingFace: https://huggingface.co/BAAI/bge-reranker-large
+    arXiv: https://arxiv.org/pdf/2309.07597
+
+    BAAI is a research institute affiliated with Peking University whose work covers the mathematical foundations of AI, machine learning, intelligent information retrieval and mining, intelligent system architectures and chips, and natural language processing.
+  types: "reranking"
+  huggingFaceRepo: BAAI/bge-reranker-large
+---
+apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1
+kind: Worker
+metadata:
+  name: bge-reranker-large
+  namespace: arcadia
+spec:
+  displayName: BGE Reranking Model
+  description: "bge-reranker-large is a general-purpose reranking model released by the Beijing Academy of Artificial Intelligence (BAAI)"
+  type: "kubeagi"
+  model:
+    kind: "Models"
+    name: "bge-reranker-large"
+  replicas: 1
\ No newline at end of file
diff --git a/pkg/worker/runner.go b/pkg/worker/runner.go
index 2bb2e9b23..96c9cbaad 100644
--- a/pkg/worker/runner.go
+++ b/pkg/worker/runner.go
@@ -35,12 +35,16 @@ const (
 	// tag is the same version as fastchat
 	defaultFastChatImage     = "kubeagi/arcadia-fastchat-worker:v0.2.36"
 	defaultFastchatVLLMImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.2.36"
+	// defaultKubeAGIImage for KubeAGIRunner
+	defaultKubeAGIImage = "kubeagi/core-library-cli:v0.0.1"
 )
 
 // ModelRunner run a model service
 type ModelRunner interface {
 	// Device used when running model
 	Device() Device
+	// NumberOfGPUs used when running model
+	NumberOfGPUs() string
 	// Build a model runner instance
 	Build(ctx context.Context, model *arcadiav1alpha1.TypedObjectReference) (any, error)
 }
@@ -63,6 +67,7 @@ func NewRunnerFastchat(c client.Client, w *arcadiav1alpha1.Worker, modelFileFrom
 	}, nil
 }
 
+// Device utilized by this runner
 func (runner *RunnerFastchat) Device() Device {
 	return DeviceBasedOnResource(runner.w.Spec.Resources.Limits)
 }
@@ -255,3 +260,65 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 
 	return container, nil
 }
+
+var _ ModelRunner = (*KubeAGIRunner)(nil)
+
+// KubeAGIRunner utilizes core-library-cli (https://github.com/kubeagi/core-library/tree/main/libs/cli) to run model services.
+// Mainly for reranking, whisper, etc.
+type KubeAGIRunner struct {
+	c client.Client
+	w *arcadiav1alpha1.Worker
+}
+
+func NewKubeAGIRunner(c client.Client, w *arcadiav1alpha1.Worker) (ModelRunner, error) {
+	return &KubeAGIRunner{
+		c: c,
+		w: w,
+	}, nil
+}
+
+// Device used when running model
+func (runner *KubeAGIRunner) Device() Device {
+	return DeviceBasedOnResource(runner.w.Spec.Resources.Limits)
+}
+
+// NumberOfGPUs utilized by this runner
+func (runner *KubeAGIRunner) NumberOfGPUs() string {
+	return NumberOfGPUs(runner.w.Spec.Resources.Limits)
+}
+
+// Build a model runner instance
+func (runner *KubeAGIRunner) Build(ctx context.Context, model *arcadiav1alpha1.TypedObjectReference) (any, error) {
+	if model == nil {
+		return nil, errors.New("nil model")
+	}
+
+	img := defaultKubeAGIImage
+	if runner.w.Spec.Runner.Image != "" {
+		img = runner.w.Spec.Runner.Image
+	}
+
+	// model files are mounted into the runner container under this path
+	mountPath := "/data/models"
+	container := &corev1.Container{
+		Name:            "runner",
+		Image:           img,
+		ImagePullPolicy: runner.w.Spec.Runner.ImagePullPolicy,
+		Command: []string{
+			"python", "kubeagi_cli/cli.py", "serve", "--host", "0.0.0.0", "--port", "21002",
+		},
+		Env: []corev1.EnvVar{
+			// Only reranking supported for now
+			{Name: "RERANKING_MODEL_PATH", Value: fmt.Sprintf("%s/%s", mountPath, model.Name)},
+		},
+		Ports: []corev1.ContainerPort{
+			{Name: "http", ContainerPort: 21002},
+		},
+		VolumeMounts: []corev1.VolumeMount{
+			{Name: "models", MountPath: mountPath},
+		},
+		Resources: runner.w.Spec.Resources,
+	}
+
+	return container, nil
+}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 5e3f5e133..7268c531e 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -228,7 +228,6 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 		return nil, fmt.Errorf("datasource %s with type %s not supported in worker", d.Name, d.Spec.Type())
 	}
 
-	// init runner
 	return podWorker, nil
 }
 
@@ -245,6 +244,12 @@ func (podWorker *PodWorker) Model() *arcadiav1alpha1.Model {
 // Now we have a pvc(if configured), service, LLM(if a llm model), Embedder(if a embedding model)
 func (podWorker *PodWorker) BeforeStart(ctx context.Context) error {
 	var err error
+
+	// Capability checks
+	if podWorker.Model().IsRerankingModel() && podWorker.Worker().Type() != arcadiav1alpha1.WorkerTypeKubeAGI {
+		return errors.New("only kubeagi runner can host reranking models")
+	}
+
 	// If the local directory is mounted, there is no need to create the pvc
 	if podWorker.Worker().Spec.Storage != nil && podWorker.storage.HostPath == nil {
 		pvc := &corev1.PersistentVolumeClaim{
@@ -387,6 +392,12 @@ func (podWorker *PodWorker) Start(ctx context.Context) error {
 			return fmt.Errorf("failed to new a runner with %w", err)
 		}
 		podWorker.r = r
+	case arcadiav1alpha1.WorkerTypeKubeAGI:
+		r, err := NewKubeAGIRunner(podWorker.c, podWorker.w.DeepCopy())
+		if err != nil {
+			return fmt.Errorf("failed to new a runner with %w", err)
+		}
+		podWorker.r = r
 	default:
 		return fmt.Errorf("worker %s with type %s not supported in worker", podWorker.w.Name, podWorker.w.Type())
 	}
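
For illustration, the minimal sketch below shows how the new runner is exercised end to end. It is not part of the patch: the import paths (github.com/kubeagi/arcadia/api/base/v1alpha1 and github.com/kubeagi/arcadia/pkg/worker) and the sparsely populated Worker are assumptions; only NewKubeAGIRunner, Build, and the types visible in the diff above come from this change.

package main

import (
	"context"
	"fmt"

	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
	"github.com/kubeagi/arcadia/pkg/worker"
)

func main() {
	// Assumption: a Worker that would normally be fetched from the cluster.
	// Build only reads Spec.Runner (image, pull policy) and Spec.Resources,
	// so a sparsely populated object is enough for this sketch.
	w := &arcadiav1alpha1.Worker{}
	w.Name = "bge-reranker-large"
	w.Namespace = "arcadia"

	// Build does not touch the Kubernetes client, so nil is acceptable here.
	r, err := worker.NewKubeAGIRunner(nil, w)
	if err != nil {
		panic(err)
	}

	// The model reference mirrors config/samples/arcadia_v1alpha1_worker_reranking.yaml.
	model := &arcadiav1alpha1.TypedObjectReference{Kind: "Models", Name: "bge-reranker-large"}

	// Build returns a *corev1.Container that runs `python kubeagi_cli/cli.py serve`
	// on port 21002 with RERANKING_MODEL_PATH=/data/models/bge-reranker-large.
	container, err := r.Build(context.Background(), model)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", container)
}

Hosting reranking is also gated in BeforeStart, so a Worker of any other type that points at a reranking model is rejected before the pod is created.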