Skip to content

Commit

Permalink
feat: bcs-task manager add ListTask (#3545)
Browse files Browse the repository at this point in the history
* feat: bcs-task manager add ListTask

* feat: bcs-task manager add ListTask
  • Loading branch information
ifooth committed Sep 29, 2024
1 parent 30a5c9b commit 8935dfa
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 46 deletions.
5 changes: 5 additions & 0 deletions bcs-common/common/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.
return GetGlobalStorage().GetTask(ctx, taskId)
}

// ListTask list tasks with options, returns a paginated list of tasks
func (m *TaskManager) ListTask(ctx context.Context, opt *istore.ListOption) (*istore.Pagination[types.Task], error) {
return GetGlobalStorage().ListTask(ctx, opt)
}

// UpdateTask update task
// ! warning: modify task status will cause task status not consistent
func (m *TaskManager) UpdateTask(ctx context.Context, task *types.Task) error {
Expand Down
57 changes: 18 additions & 39 deletions bcs-common/common/task/stores/iface/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,25 @@ import (

// ListOption ...
type ListOption struct {
// Sort map for sort list results
Sort map[string]int
// Offset offset for list results
Offset int64
// Limit limit for list results
Limit int64
// All for all results
All bool
// Count for index
Count bool
// SkipDecrypt skip data decrypt
SkipDecrypt bool
TaskID string
TaskType string
TaskName string
TaskIndex string
TaskIndexType string
CurrentStep string
Status string
Creator string
StartGte *time.Time // StartGte start time greater or equal to
StartLte *time.Time // StartLte start time less or equal to
Sort map[string]int // Sort map for sort list results
Offset int64 // Offset offset for list results
Limit int64 // Limit limit for list results
}

// UpdateOption ...
type UpdateOption struct {
CurrentStep string `json:"currentStep"`
CommonParams map[string]string `json:"commonParams"`
ExtraJson string `json:"extraJson"`
Status string `json:"status"`
Message string `json:"message"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
ExecutionTime uint32 `json:"executionTime"`
LastUpdate time.Time `json:"lastUpdate"`
Updater string `json:"updater"`
StepOptions map[string]*UpdateStepOption `json:"stepOptions"`
}

// UpdateStepOption ...
type UpdateStepOption struct {
Params map[string]string `json:"params"`
Extras string `json:"extras"`
Status string `json:"status"`
Message string `json:"message"`
RetryCount uint32 `json:"retryCount"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
ExecutionTime uint32 `json:"executionTime"`
LastUpdate time.Time `json:"lastUpdate"`
// Pagination generic pagination for list results
type Pagination[T any] struct {
Count int64 `json:"count"`
Items []*T `json:"items"`
}

// PatchOption 主要实时更新params, payload信息
Expand All @@ -74,7 +53,7 @@ type PatchOption struct {
type Store interface {
EnsureTable(ctx context.Context, dst ...any) error
CreateTask(ctx context.Context, task *types.Task) error
ListTask(ctx context.Context, opt *ListOption) ([]types.Task, error)
ListTask(ctx context.Context, opt *ListOption) (*Pagination[types.Task], error)
GetTask(ctx context.Context, taskID string) (*types.Task, error)
DeleteTask(ctx context.Context, taskID string) error
UpdateTask(ctx context.Context, task *types.Task) error
Expand Down
2 changes: 1 addition & 1 deletion bcs-common/common/task/stores/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *memStore) CreateTask(ctx context.Context, task *types.Task) error {
return nil
}

func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
return nil, types.ErrNotImplemented
}

Expand Down
10 changes: 7 additions & 3 deletions bcs-common/common/task/stores/mongo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (m *ModelTask) GetTask(ctx context.Context, taskID string) (*types.Task, er
}

// ListTask list clusters
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
taskList := make([]types.Task, 0)
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
taskList := make([]*types.Task, 0)
finder := m.db.Table(m.tableName).Find(operator.EmptyCondition)
if len(opt.Sort) != 0 {
finder = finder.WithSort(MapInt2MapIf(opt.Sort))
Expand All @@ -182,5 +182,9 @@ func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]type
if err := finder.All(ctx, &taskList); err != nil {
return nil, err
}
return taskList, nil
result := &iface.Pagination[types.Task]{
Count: int64(len(taskList)),
Items: taskList,
}
return result, nil
}
18 changes: 18 additions & 0 deletions bcs-common/common/task/stores/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package mysql

import (
"gorm.io/gorm"

"github.com/Tencent/bk-bcs/bcs-common/common/task/types"
)

Expand Down Expand Up @@ -164,3 +166,19 @@ func getUpdateStepRecord(t *types.Step) *StepRecord {
}
return record
}

// FindByPage 分页查询
func FindByPage[T any](db *gorm.DB, offset int, limit int) (result []*T, count int64, err error) {
err = db.Offset(offset).Limit(limit).Find(&result).Error
if err != nil {
return
}

if size := len(result); 0 < limit && 0 < size && size < limit {
count = int64(size + offset)
return
}

err = db.Offset(-1).Limit(-1).Count(&count).Error
return
}
50 changes: 47 additions & 3 deletions bcs-common/common/task/stores/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *mysqlStore) initDsn(raw string) {
s.dsn = u.String()
}

// EnsureTable 创建db表
// EnsureTable implement istore EnsureTable interface
func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
// 没有自定义数据, 使用默认表结构
if len(dst) == 0 {
Expand All @@ -86,6 +86,7 @@ func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
return s.db.WithContext(ctx).AutoMigrate(dst...)
}

// CreateTask implement istore CreateTask interface
func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
record := getTaskRecord(task)
Expand All @@ -102,10 +103,50 @@ func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
})
}

func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
return nil, types.ErrNotImplemented
// ListTask implement istore ListTask interface
func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
tx := s.db.WithContext(ctx)

// 条件过滤 0值gorm自动忽略查询
tx = tx.Where(&TaskRecord{
TaskID: opt.TaskID,
TaskType: opt.TaskType,
TaskName: opt.TaskName,
TaskIndex: opt.TaskIndex,
TaskIndexType: opt.TaskIndexType,
Status: opt.Status,
CurrentStep: opt.CurrentStep,
Creator: opt.Creator,
})

// mysql store 使用创建时间过滤
if opt.StartGte != nil {
tx = tx.Where("created_at >= ?", opt.StartGte)
}
if opt.StartLte != nil {
tx = tx.Where("created_at <= ?", opt.StartLte)
}

// 只使用id排序
tx = tx.Order("id DESC")

result, count, err := FindByPage[TaskRecord](tx, int(opt.Offset), int(opt.Limit))
if err != nil {
return nil, err
}

items := make([]*types.Task, 0, len(result))
for _, record := range result {
items = append(items, toTask(record, []*StepRecord{}))
}

return &iface.Pagination[types.Task]{
Count: count,
Items: items,
}, nil
}

// UpdateTask implement istore UpdateTask interface
func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
updateTask := getUpdateTaskRecord(task)
Expand All @@ -132,6 +173,7 @@ func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
})
}

// DeleteTask implement istore DeleteTask interface
func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Where("task_id = ?", taskID).Delete(&TaskRecord{}).Error; err != nil {
Expand All @@ -145,6 +187,7 @@ func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
})
}

// GetTask implement istore GetTask interface
func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, error) {
tx := s.db.WithContext(ctx)
taskRecord := TaskRecord{}
Expand All @@ -159,6 +202,7 @@ func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, e
return toTask(&taskRecord, stepRecord), nil
}

// PatchTask implement istore PatchTask interface
func (s *mysqlStore) PatchTask(ctx context.Context, opt *iface.PatchOption) error {
return types.ErrNotImplemented
}

0 comments on commit 8935dfa

Please sign in to comment.