From 817934c9c57b7c3b516337e20090f620db799e31 Mon Sep 17 00:00:00 2001 From: Lynwee Date: Thu, 30 Nov 2023 14:58:04 +0800 Subject: [PATCH] feat(api): add sanitizer to remove sensitive fields (#6538) * feat(api): add sanitizer to remove sensitive fields * refactor(api): fix lint errors --- backend/plugins/starrocks/api/connection.go | 2 +- backend/server/api/blueprints/blueprints.go | 12 +-- backend/server/api/pipelines/pipelines.go | 17 ++++- backend/server/api/task/task.go | 2 +- backend/server/services/blueprint.go | 51 ++++++++++--- backend/server/services/pipeline.go | 83 +++++++++++++++++++-- backend/server/services/pipeline_runner.go | 2 +- backend/server/services/project.go | 2 +- backend/server/services/task.go | 19 ++++- 9 files changed, 157 insertions(+), 33 deletions(-) diff --git a/backend/plugins/starrocks/api/connection.go b/backend/plugins/starrocks/api/connection.go index 30fc6749ce6..078d454e8b0 100644 --- a/backend/plugins/starrocks/api/connection.go +++ b/backend/plugins/starrocks/api/connection.go @@ -43,7 +43,7 @@ type StarRocksPipelinePlan [][]struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` - Password string `json:"password"` + Password string `json:"password"` // notice: Database string `json:"database"` BeHost string `json:"be_host"` BePort int `json:"be_port"` diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go index b4f7410defc..338fc34b93e 100644 --- a/backend/server/api/blueprints/blueprints.go +++ b/backend/server/api/blueprints/blueprints.go @@ -45,19 +45,16 @@ type PaginatedBlueprint struct { // @Router /blueprints [post] func Post(c *gin.Context) { blueprint := &models.Blueprint{} - err := c.ShouldBind(blueprint) if err != nil { shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody)) return } - err = services.CreateBlueprint(blueprint) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error creating blueprint")) 
return } - shared.ApiOutputSuccess(c, blueprint, http.StatusCreated) } @@ -80,7 +77,7 @@ func Index(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody)) return } - blueprints, count, err := services.GetBlueprints(&query) + blueprints, count, err := services.GetBlueprints(&query, true) if err != nil { shared.ApiOutputAbort(c, errors.Default.Wrap(err, "error getting blueprints")) return @@ -104,7 +101,7 @@ func Get(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad blueprintId format supplied")) return } - blueprint, err := services.GetBlueprint(id) + blueprint, err := services.GetBlueprint(id, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting blueprint")) return @@ -193,7 +190,7 @@ func Trigger(c *gin.Context) { return } } - pipeline, err := services.TriggerBlueprint(id, syncPolicy) + pipeline, err := services.TriggerBlueprint(id, syncPolicy, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint")) return @@ -222,8 +219,7 @@ func GetBlueprintPipelines(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request URI format")) return } - - pipelines, count, err := services.GetPipelines(&query) + pipelines, count, err := services.GetPipelines(&query, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipelines")) return diff --git a/backend/server/api/pipelines/pipelines.go b/backend/server/api/pipelines/pipelines.go index 186aa9931a6..af533bab8aa 100644 --- a/backend/server/api/pipelines/pipelines.go +++ b/backend/server/api/pipelines/pipelines.go @@ -49,7 +49,7 @@ func Post(c *gin.Context) { return } - pipeline, err := services.CreatePipeline(newPipeline) + pipeline, err := services.CreatePipeline(newPipeline, true) // Return all created tasks to the User if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error creating pipeline")) @@ -78,7 +78,7 @@ 
func Index(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody)) return } - pipelines, count, err := services.GetPipelines(&query) + pipelines, count, err := services.GetPipelines(&query, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipelines")) return @@ -107,7 +107,7 @@ func Get(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad pipelineID format supplied")) return } - pipeline, err := services.GetPipeline(id) + pipeline, err := services.GetPipeline(id, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipeline")) return @@ -154,7 +154,7 @@ func DownloadLogs(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad pipeline ID format supplied")) return } - pipeline, err := services.GetPipeline(id) + pipeline, err := services.GetPipeline(id, true) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipeline")) return @@ -189,5 +189,14 @@ func PostRerun(c *gin.Context) { shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to rerun pipeline")) return } + for idx, task := range rerunTasks { + taskOption, err := services.SanitizePluginOption(task.Plugin, task.Options) + if err != nil { + shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to sanitize task")) + return + } + task.Options = taskOption + rerunTasks[idx] = task + } shared.ApiOutputSuccess(c, rerunTasks, http.StatusOK) } diff --git a/backend/server/api/task/task.go b/backend/server/api/task/task.go index 07fe7b8d458..cd5a02f377b 100644 --- a/backend/server/api/task/task.go +++ b/backend/server/api/task/task.go @@ -63,7 +63,7 @@ func GetTaskByPipeline(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format")) return } - tasks, err := services.GetTasksWithLastStatus(pipelineId) + tasks, err := services.GetTasksWithLastStatus(pipelineId, true) if err != nil { 
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks")) return diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index 34eef95d9fa..4dd94b9f6f2 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -76,6 +76,9 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error { if err != nil { return err } + if err := SanitizeBlueprint(blueprint); err != nil { + return errors.Convert(err) + } err = ReloadBlueprints(cronManager) if err != nil { return errors.Internal.Wrap(err, "error reloading blueprints") @@ -84,8 +87,8 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error { } // GetBlueprints returns a paginated list of Blueprints based on `query` -func GetBlueprints(query *BlueprintQuery) ([]*models.Blueprint, int64, errors.Error) { - return bpManager.GetDbBlueprints(&services.GetBlueprintQuery{ +func GetBlueprints(query *BlueprintQuery, shouldSanitize bool) ([]*models.Blueprint, int64, errors.Error) { + blueprints, count, err := bpManager.GetDbBlueprints(&services.GetBlueprintQuery{ Enable: query.Enable, IsManual: query.IsManual, Label: query.Label, @@ -93,10 +96,23 @@ func GetBlueprints(query *BlueprintQuery) ([]*models.Blueprint, int64, errors.Er PageSize: query.GetPageSize(), Type: query.Type, }) + if err != nil { + return nil, 0, err + } + if shouldSanitize { + for idx, bp := range blueprints { + if err := SanitizeBlueprint(bp); err != nil { + return nil, 0, errors.Convert(err) + } else { + blueprints[idx] = bp + } + } + } + return blueprints, count, nil } // GetBlueprint returns the detail of a given Blueprint ID -func GetBlueprint(blueprintId uint64) (*models.Blueprint, errors.Error) { +func GetBlueprint(blueprintId uint64, shouldSanitize bool) (*models.Blueprint, errors.Error) { blueprint, err := bpManager.GetDbBlueprint(blueprintId) if err != nil { if db.IsErrorNotFound(err) { @@ -104,6 +120,11 @@ func GetBlueprint(blueprintId uint64) 
(*models.Blueprint, errors.Error) { } return nil, errors.Internal.Wrap(err, "error getting the blueprint from database") } + if shouldSanitize { + if err := SanitizeBlueprint(blueprint); err != nil { + return nil, errors.Convert(err) + } + } return blueprint, nil } @@ -194,7 +215,7 @@ func saveBlueprint(blueprint *models.Blueprint) (*models.Blueprint, errors.Error // PatchBlueprint FIXME ... func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint, errors.Error) { // load record from db - blueprint, err := GetBlueprint(id) + blueprint, err := GetBlueprint(id, false) if err != nil { return nil, err } @@ -219,7 +240,9 @@ func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint, if err != nil { return nil, err } - + if err := SanitizeBlueprint(blueprint); err != nil { + return nil, errors.Convert(err) + } return blueprint, nil } @@ -311,7 +334,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.S if !shouldCreatePipeline { return nil, ErrEmptyPlan } - pipeline, err := CreatePipeline(&newPipeline) + pipeline, err := CreatePipeline(&newPipeline, false) // Return all created tasks to the User if err != nil { blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", failToCreateCronJob, blueprint.ID, blueprint.Name)) @@ -376,15 +399,23 @@ func SequencializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePla } // TriggerBlueprint triggers blueprint immediately -func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy) (*models.Pipeline, errors.Error) { +func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) { // load record from db - blueprint, err := GetBlueprint(id) + blueprint, err := GetBlueprint(id, false) if err != nil { logger.Error(err, "GetBlueprint, id: %d", id) return nil, err } blueprint.SkipCollectors = syncPolicy.SkipCollectors blueprint.FullSync = syncPolicy.FullSync - - return 
createPipelineByBlueprint(blueprint, syncPolicy) + pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy) + if err != nil { + return nil, err + } + if shouldSanitize { + if err := SanitizePipeline(pipeline); err != nil { + return nil, errors.Convert(err) + } + } + return pipeline, nil } diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 0855341dd60..78edfe7b0f8 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -20,6 +20,8 @@ package services import ( "context" "fmt" + "github.com/spf13/cast" + "net/url" "os" "path/filepath" "strings" @@ -39,6 +41,21 @@ import ( var notificationService *NotificationService var globalPipelineLog = logruslog.Global.Nested("pipeline service") +var pluginOptionSanitizers = map[string]func(map[string]interface{}){ + "gitextractor": func(options map[string]interface{}) { + if v, ok := options["url"]; ok { + gitUrl := cast.ToString(v) + u, _ := url.Parse(gitUrl) + if u != nil && u.User != nil { + password, ok := u.User.Password() + if ok { + gitUrl = strings.Replace(gitUrl, password, strings.Repeat("*", len(password)), -1) + options["url"] = gitUrl + } + } + } + }, +} // PipelineQuery is a query for GetPipelines type PipelineQuery struct { @@ -103,16 +120,64 @@ func pipelineServiceInit() { } // CreatePipeline and return the model -func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) { +func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) (*models.Pipeline, errors.Error) { pipeline, err := CreateDbPipeline(newPipeline) if err != nil { return nil, errors.Convert(err) } + if shouldSanitize { + if err := SanitizePipeline(pipeline); err != nil { + return nil, errors.Convert(err) + } + } return pipeline, nil } +func SanitizeBlueprint(blueprint *models.Blueprint) error { + for planStageIdx, pipelineStage := range blueprint.Plan { + for planTaskIdx := range pipelineStage { + pipelineTask, err := 
SanitizeTask(blueprint.Plan[planStageIdx][planTaskIdx]) + if err != nil { + return err + } + blueprint.Plan[planStageIdx][planTaskIdx] = pipelineTask + } + } + return nil +} + +func SanitizePipeline(pipeline *models.Pipeline) error { + for planStageIdx, pipelineStage := range pipeline.Plan { + for planTaskIdx := range pipelineStage { + pipelineTask, err := SanitizeTask(pipeline.Plan[planStageIdx][planTaskIdx]) + if err != nil { + return err + } + pipeline.Plan[planStageIdx][planTaskIdx] = pipelineTask + } + } + return nil +} + +func SanitizeTask(pipelineTask *models.PipelineTask) (*models.PipelineTask, error) { + pluginName := pipelineTask.Plugin + options, err := SanitizePluginOption(pluginName, pipelineTask.Options) + if err != nil { + return pipelineTask, err + } + pipelineTask.Options = options + return pipelineTask, nil +} + +func SanitizePluginOption(pluginName string, option map[string]interface{}) (map[string]interface{}, error) { + if sanitizer, ok := pluginOptionSanitizers[pluginName]; ok { + sanitizer(option) + } + return option, nil +} + // GetPipelines by query -func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error) { +func GetPipelines(query *PipelineQuery, shouldSanitize bool) ([]*models.Pipeline, int64, errors.Error) { pipelines, i, err := GetDbPipelines(query) if err != nil { return nil, 0, errors.Convert(err) @@ -122,12 +187,17 @@ func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error if err != nil { return nil, 0, err } + if shouldSanitize { + if err := SanitizePipeline(p); err != nil { + return nil, 0, errors.Convert(err) + } + } } return pipelines, i, nil } // GetPipeline by id -func GetPipeline(pipelineId uint64) (*models.Pipeline, errors.Error) { +func GetPipeline(pipelineId uint64, shouldSanitize bool) (*models.Pipeline, errors.Error) { dbPipeline, err := GetDbPipeline(pipelineId) if err != nil { return nil, err @@ -136,6 +206,12 @@ func GetPipeline(pipelineId uint64) (*models.Pipeline, 
errors.Error) { if err != nil { return nil, err } + // Mask sensitive plugin options only when the caller asks for it; callers such as runPipeline pass false to get the unmasked plan. + if shouldSanitize { + if err := SanitizePipeline(dbPipeline); err != nil { + return nil, errors.Convert(err) + } + } return dbPipeline, nil } @@ -260,7 +336,7 @@ func NotifyExternal(pipelineId uint64) errors.Error { return nil } // send notification to an external web endpoint - pipeline, err := GetPipeline(pipelineId) + pipeline, err := GetPipeline(pipelineId, true) if err != nil { return err } @@ -368,7 +444,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, } failedTasks = append(failedTasks, task) } else { - tasks, err := GetTasksWithLastStatus(pipelineId) + tasks, err := GetTasksWithLastStatus(pipelineId, false) if err != nil { return nil, errors.Default.Wrap(err, "error getting tasks") } diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go index c11390315e0..8bd51d37dc1 100644 --- a/backend/server/services/pipeline_runner.go +++ b/backend/server/services/pipeline_runner.go @@ -64,7 +64,7 @@ func GetPipelineLogger(pipeline *models.Pipeline) log.Logger { // runPipeline start a pipeline actually func runPipeline(pipelineId uint64) errors.Error { - ppl, err := GetPipeline(pipelineId) + ppl, err := GetPipeline(pipelineId, false) if err != nil { return err } diff --git a/backend/server/services/project.go b/backend/server/services/project.go index e75b28ce39b..9ebc1e7aed1 100644 --- a/backend/server/services/project.go +++ b/backend/server/services/project.go @@ -385,7 +385,7 @@ func makeProjectOutput(project *models.Project, withLastPipeline bool) (*models. 
PageSize: 1, Page: 1, }, - }) + }, true) if err != nil { logger.Error(err, "GetPipelines, blueprint id: %d", projectOutput.Blueprint.ID) return nil, errors.Default.Wrap(err, "Error to get pipeline by blueprint id") diff --git a/backend/server/services/task.go b/backend/server/services/task.go index 6c19abd7bf0..e3e9a1fb670 100644 --- a/backend/server/services/task.go +++ b/backend/server/services/task.go @@ -111,7 +111,7 @@ func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) { // GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned // TODO: adopts GetLatestTasksOfPipeline -func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) { +func GetTasksWithLastStatus(pipelineId uint64, shouldSanitize bool) ([]*models.Task, errors.Error) { var tasks []*models.Task err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC")) if err != nil { @@ -128,13 +128,22 @@ func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) { maxCol = task.PipelineCol } } + for _, task := range tasks { index := int64(task.PipelineRow)*int64(maxCol) + int64(task.PipelineCol) + if shouldSanitize { + taskOption, err := SanitizePluginOption(task.Plugin, task.Options) + if err != nil { + return nil, errors.Convert(err) + } + task.Options = taskOption + } if _, ok := taskIds[index]; !ok { taskIds[index] = struct{}{} result = append(result, task) } } + runningTasks.FillProgressDetailToTasks(result) return result, nil } @@ -217,5 +226,12 @@ func RerunTask(taskId uint64) (*models.Task, errors.Error) { if err != nil { return nil, err } - return rerunTasks[0], nil + // Mask sensitive plugin options on the returned task; report the sanitizer's own error, not the already-checked err. + rerunTask := rerunTasks[0] + taskOption, sanitizePluginOptionErr := SanitizePluginOption(rerunTask.Plugin, rerunTask.Options) + if sanitizePluginOptionErr != nil { + return nil, errors.Convert(sanitizePluginOptionErr) + } + rerunTask.Options = taskOption + return rerunTask, nil }