Skip to content

Commit

Permalink
feat(api): add sanitizer to remove sensitive fields (apache#6538)
Browse files Browse the repository at this point in the history
* feat(api): add sanitizer to remove sensitive fields

* refactor(api): fix lint errors
  • Loading branch information
d4x1 committed Nov 30, 2023
1 parent 71a1ddd commit 817934c
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 33 deletions.
2 changes: 1 addition & 1 deletion backend/plugins/starrocks/api/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type StarRocksPipelinePlan [][]struct {
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Password string `json:"password"` // notice:
Database string `json:"database"`
BeHost string `json:"be_host"`
BePort int `json:"be_port"`
Expand Down
12 changes: 4 additions & 8 deletions backend/server/api/blueprints/blueprints.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,16 @@ type PaginatedBlueprint struct {
// @Router /blueprints [post]
func Post(c *gin.Context) {
blueprint := &models.Blueprint{}

err := c.ShouldBind(blueprint)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody))
return
}

err = services.CreateBlueprint(blueprint)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error creating blueprint"))
return
}

shared.ApiOutputSuccess(c, blueprint, http.StatusCreated)
}

Expand All @@ -80,7 +77,7 @@ func Index(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody))
return
}
blueprints, count, err := services.GetBlueprints(&query)
blueprints, count, err := services.GetBlueprints(&query, true)
if err != nil {
shared.ApiOutputAbort(c, errors.Default.Wrap(err, "error getting blueprints"))
return
Expand All @@ -104,7 +101,7 @@ func Get(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad blueprintId format supplied"))
return
}
blueprint, err := services.GetBlueprint(id)
blueprint, err := services.GetBlueprint(id, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting blueprint"))
return
Expand Down Expand Up @@ -193,7 +190,7 @@ func Trigger(c *gin.Context) {
return
}
}
pipeline, err := services.TriggerBlueprint(id, syncPolicy)
pipeline, err := services.TriggerBlueprint(id, syncPolicy, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint"))
return
Expand Down Expand Up @@ -222,8 +219,7 @@ func GetBlueprintPipelines(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request URI format"))
return
}

pipelines, count, err := services.GetPipelines(&query)
pipelines, count, err := services.GetPipelines(&query, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipelines"))
return
Expand Down
17 changes: 13 additions & 4 deletions backend/server/api/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Post(c *gin.Context) {
return
}

pipeline, err := services.CreatePipeline(newPipeline)
pipeline, err := services.CreatePipeline(newPipeline, true)
// Return all created tasks to the User
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error creating pipeline"))
Expand Down Expand Up @@ -78,7 +78,7 @@ func Index(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody))
return
}
pipelines, count, err := services.GetPipelines(&query)
pipelines, count, err := services.GetPipelines(&query, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipelines"))
return
Expand Down Expand Up @@ -107,7 +107,7 @@ func Get(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad pipelineID format supplied"))
return
}
pipeline, err := services.GetPipeline(id)
pipeline, err := services.GetPipeline(id, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipeline"))
return
Expand Down Expand Up @@ -154,7 +154,7 @@ func DownloadLogs(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad pipeline ID format supplied"))
return
}
pipeline, err := services.GetPipeline(id)
pipeline, err := services.GetPipeline(id, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting pipeline"))
return
Expand Down Expand Up @@ -189,5 +189,14 @@ func PostRerun(c *gin.Context) {
shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to rerun pipeline"))
return
}
for idx, task := range rerunTasks {
taskOption, err := services.SanitizePluginOption(task.Plugin, task.Options)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "failed to sanitize task"))
return
}
task.Options = taskOption
rerunTasks[idx] = task
}
shared.ApiOutputSuccess(c, rerunTasks, http.StatusOK)
}
2 changes: 1 addition & 1 deletion backend/server/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func GetTaskByPipeline(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
return
}
tasks, err := services.GetTasksWithLastStatus(pipelineId)
tasks, err := services.GetTasksWithLastStatus(pipelineId, true)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
return
Expand Down
51 changes: 41 additions & 10 deletions backend/server/services/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
if err != nil {
return err
}
if err := SanitizeBlueprint(blueprint); err != nil {
return errors.Convert(err)
}
err = ReloadBlueprints(cronManager)
if err != nil {
return errors.Internal.Wrap(err, "error reloading blueprints")
Expand All @@ -84,26 +87,44 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
}

// GetBlueprints returns a paginated list of Blueprints based on `query`
func GetBlueprints(query *BlueprintQuery) ([]*models.Blueprint, int64, errors.Error) {
return bpManager.GetDbBlueprints(&services.GetBlueprintQuery{
func GetBlueprints(query *BlueprintQuery, shouldSanitize bool) ([]*models.Blueprint, int64, errors.Error) {
blueprints, count, err := bpManager.GetDbBlueprints(&services.GetBlueprintQuery{
Enable: query.Enable,
IsManual: query.IsManual,
Label: query.Label,
SkipRecords: query.GetSkip(),
PageSize: query.GetPageSize(),
Type: query.Type,
})
if err != nil {
return nil, 0, err
}
if shouldSanitize {
for idx, bp := range blueprints {
if err := SanitizeBlueprint(bp); err != nil {
return nil, 0, errors.Convert(err)
} else {
blueprints[idx] = bp
}
}
}
return blueprints, count, nil
}

// GetBlueprint returns the detail of a given Blueprint ID
func GetBlueprint(blueprintId uint64) (*models.Blueprint, errors.Error) {
func GetBlueprint(blueprintId uint64, shouldSanitize bool) (*models.Blueprint, errors.Error) {
blueprint, err := bpManager.GetDbBlueprint(blueprintId)
if err != nil {
if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("blueprint not found")
}
return nil, errors.Internal.Wrap(err, "error getting the blueprint from database")
}
if shouldSanitize {
if err := SanitizeBlueprint(blueprint); err != nil {
return nil, errors.Convert(err)
}
}
return blueprint, nil
}

Expand Down Expand Up @@ -194,7 +215,7 @@ func saveBlueprint(blueprint *models.Blueprint) (*models.Blueprint, errors.Error
// PatchBlueprint FIXME ...
func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
blueprint, err := GetBlueprint(id, false)
if err != nil {
return nil, err
}
Expand All @@ -219,7 +240,9 @@ func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint,
if err != nil {
return nil, err
}

if err := SanitizeBlueprint(blueprint); err != nil {
return nil, errors.Convert(err)
}
return blueprint, nil
}

Expand Down Expand Up @@ -311,7 +334,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy *models.S
if !shouldCreatePipeline {
return nil, ErrEmptyPlan
}
pipeline, err := CreatePipeline(&newPipeline)
pipeline, err := CreatePipeline(&newPipeline, false)
// Return all created tasks to the User
if err != nil {
blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", failToCreateCronJob, blueprint.ID, blueprint.Name))
Expand Down Expand Up @@ -376,15 +399,23 @@ func SequencializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePla
}

// TriggerBlueprint triggers blueprint immediately
func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy) (*models.Pipeline, errors.Error) {
func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy, shouldSanitize bool) (*models.Pipeline, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
blueprint, err := GetBlueprint(id, false)
if err != nil {
logger.Error(err, "GetBlueprint, id: %d", id)
return nil, err
}
blueprint.SkipCollectors = syncPolicy.SkipCollectors
blueprint.FullSync = syncPolicy.FullSync

return createPipelineByBlueprint(blueprint, syncPolicy)
pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy)
if err != nil {
return nil, err
}
if shouldSanitize {
if err := SanitizePipeline(pipeline); err != nil {
return nil, errors.Convert(err)
}
}
return pipeline, nil
}
83 changes: 78 additions & 5 deletions backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package services
import (
"context"
"fmt"
"github.com/spf13/cast"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -39,6 +41,21 @@ import (

var notificationService *NotificationService
var globalPipelineLog = logruslog.Global.Nested("pipeline service")
var pluginOptionSanitizers = map[string]func(map[string]interface{}){
"gitextractor": func(options map[string]interface{}) {
if v, ok := options["url"]; ok {
gitUrl := cast.ToString(v)
u, _ := url.Parse(gitUrl)
if u != nil && u.User != nil {
password, ok := u.User.Password()
if ok {
gitUrl = strings.Replace(gitUrl, password, strings.Repeat("*", len(password)), -1)
options["url"] = gitUrl
}
}
}
},
}

// PipelineQuery is a query for GetPipelines
type PipelineQuery struct {
Expand Down Expand Up @@ -103,16 +120,64 @@ func pipelineServiceInit() {
}

// CreatePipeline and return the model
func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) {
func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) (*models.Pipeline, errors.Error) {
pipeline, err := CreateDbPipeline(newPipeline)
if err != nil {
return nil, errors.Convert(err)
}
if shouldSanitize {
if err := SanitizePipeline(pipeline); err != nil {
return nil, errors.Convert(err)
}
}
return pipeline, nil
}

func SanitizeBlueprint(blueprint *models.Blueprint) error {
for planStageIdx, pipelineStage := range blueprint.Plan {
for planTaskIdx := range pipelineStage {
pipelineTask, err := SanitizeTask(blueprint.Plan[planStageIdx][planTaskIdx])
if err != nil {
return err
}
blueprint.Plan[planStageIdx][planTaskIdx] = pipelineTask
}
}
return nil
}

func SanitizePipeline(pipeline *models.Pipeline) error {
for planStageIdx, pipelineStage := range pipeline.Plan {
for planTaskIdx := range pipelineStage {
pipelineTask, err := SanitizeTask(pipeline.Plan[planStageIdx][planTaskIdx])
if err != nil {
return err
}
pipeline.Plan[planStageIdx][planTaskIdx] = pipelineTask
}
}
return nil
}

func SanitizeTask(pipelineTask *models.PipelineTask) (*models.PipelineTask, error) {
pluginName := pipelineTask.Plugin
options, err := SanitizePluginOption(pluginName, pipelineTask.Options)
if err != nil {
return pipelineTask, err
}
pipelineTask.Options = options
return pipelineTask, nil
}

func SanitizePluginOption(pluginName string, option map[string]interface{}) (map[string]interface{}, error) {
if sanitizer, ok := pluginOptionSanitizers[pluginName]; ok {
sanitizer(option)
}
return option, nil
}

// GetPipelines by query
func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error) {
func GetPipelines(query *PipelineQuery, shouldSanitize bool) ([]*models.Pipeline, int64, errors.Error) {
pipelines, i, err := GetDbPipelines(query)
if err != nil {
return nil, 0, errors.Convert(err)
Expand All @@ -122,12 +187,17 @@ func GetPipelines(query *PipelineQuery) ([]*models.Pipeline, int64, errors.Error
if err != nil {
return nil, 0, err
}
if shouldSanitize {
if err := SanitizePipeline(p); err != nil {
return nil, 0, errors.Convert(err)
}
}
}
return pipelines, i, nil
}

// GetPipeline by id
func GetPipeline(pipelineId uint64) (*models.Pipeline, errors.Error) {
func GetPipeline(pipelineId uint64, shouldSanitize bool) (*models.Pipeline, errors.Error) {
dbPipeline, err := GetDbPipeline(pipelineId)
if err != nil {
return nil, err
Expand All @@ -136,6 +206,9 @@ func GetPipeline(pipelineId uint64) (*models.Pipeline, errors.Error) {
if err != nil {
return nil, err
}
if err := SanitizePipeline(dbPipeline); err != nil {
return nil, errors.Convert(err)
}
return dbPipeline, nil
}

Expand Down Expand Up @@ -260,7 +333,7 @@ func NotifyExternal(pipelineId uint64) errors.Error {
return nil
}
// send notification to an external web endpoint
pipeline, err := GetPipeline(pipelineId)
pipeline, err := GetPipeline(pipelineId, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -368,7 +441,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
}
failedTasks = append(failedTasks, task)
} else {
tasks, err := GetTasksWithLastStatus(pipelineId)
tasks, err := GetTasksWithLastStatus(pipelineId, false)
if err != nil {
return nil, errors.Default.Wrap(err, "error getting tasks")
}
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/pipeline_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func GetPipelineLogger(pipeline *models.Pipeline) log.Logger {

// runPipeline start a pipeline actually
func runPipeline(pipelineId uint64) errors.Error {
ppl, err := GetPipeline(pipelineId)
ppl, err := GetPipeline(pipelineId, false)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 817934c

Please sign in to comment.