Skip to content

Commit

Permalink
feat: bcs-task manager add ListTask
Browse files Browse the repository at this point in the history
  • Loading branch information
ifooth committed Sep 28, 2024
1 parent ca17bab commit d5bef22
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 14 deletions.
5 changes: 5 additions & 0 deletions bcs-common/common/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.
return GetGlobalStorage().GetTask(ctx, taskId)
}

// ListTask list tasks with options, return pagination items
func (m *TaskManager) ListTask(ctx context.Context, opt *istore.ListOption) (*istore.Pagination[types.Task], error) {
return GetGlobalStorage().ListTask(ctx, opt)
}

// UpdateTask update task
// ! warning: modify task status will cause task status not consistent
func (m *TaskManager) UpdateTask(ctx context.Context, task *types.Task) error {
Expand Down
22 changes: 15 additions & 7 deletions bcs-common/common/task/stores/iface/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@ import (

// ListOption ...
type ListOption struct {
TaskID string
TaskType string
TaskName string
TaskIndex string
TaskIndexType string
CurrentStep string
Status string
Creator string
// Sort map for sort list results
Sort map[string]int
// Offset offset for list results
Offset int64
// Limit limit for list results
Limit int64
// All for all results
All bool
// Count for index
Count bool
// SkipDecrypt skip data decrypt
SkipDecrypt bool
}

// Pagination generic pagination for list results
type Pagination[T any] struct {
Count int64 `json:"count"`
Items []*T `json:"items"`
}

// UpdateOption ...
Expand Down Expand Up @@ -74,7 +82,7 @@ type PatchOption struct {
type Store interface {
EnsureTable(ctx context.Context, dst ...any) error
CreateTask(ctx context.Context, task *types.Task) error
ListTask(ctx context.Context, opt *ListOption) ([]types.Task, error)
ListTask(ctx context.Context, opt *ListOption) (*Pagination[types.Task], error)
GetTask(ctx context.Context, taskID string) (*types.Task, error)
DeleteTask(ctx context.Context, taskID string) error
UpdateTask(ctx context.Context, task *types.Task) error
Expand Down
2 changes: 1 addition & 1 deletion bcs-common/common/task/stores/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *memStore) CreateTask(ctx context.Context, task *types.Task) error {
return nil
}

func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
return nil, types.ErrNotImplemented
}

Expand Down
10 changes: 7 additions & 3 deletions bcs-common/common/task/stores/mongo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (m *ModelTask) GetTask(ctx context.Context, taskID string) (*types.Task, er
}

// ListTask list clusters
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
taskList := make([]types.Task, 0)
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
taskList := make([]*types.Task, 0)
finder := m.db.Table(m.tableName).Find(operator.EmptyCondition)
if len(opt.Sort) != 0 {
finder = finder.WithSort(MapInt2MapIf(opt.Sort))
Expand All @@ -182,5 +182,9 @@ func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]type
if err := finder.All(ctx, &taskList); err != nil {
return nil, err
}
return taskList, nil
result := &iface.Pagination[types.Task]{
Count: int64(len(taskList)),
Items: taskList,
}
return result, nil
}
21 changes: 21 additions & 0 deletions bcs-common/common/task/stores/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package mysql

import (
"gorm.io/gorm"

"github.com/Tencent/bk-bcs/bcs-common/common/task/types"
)

Expand Down Expand Up @@ -164,3 +166,22 @@ func getUpdateStepRecord(t *types.Step) *StepRecord {
}
return record
}

// FindByPage 分页查询
func FindByPage[T any](db *gorm.DB, offset int, limit int) (result []*T, count int64, err error) {
err = db.Offset(offset).Limit(limit).Find(&result).Error
if err != nil {
return
}

if size := len(result); 0 < limit && 0 < size && size < limit {
count = int64(size + offset)
return
}

err = db.Offset(-1).Limit(-1).Count(&count).Error
if err != nil {
return
}
return
}
34 changes: 31 additions & 3 deletions bcs-common/common/task/stores/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *mysqlStore) initDsn(raw string) {
s.dsn = u.String()
}

// EnsureTable 创建db表
// EnsureTable implement istore EnsureTable interface
func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
// 没有自定义数据, 使用默认表结构
if len(dst) == 0 {
Expand All @@ -86,6 +86,7 @@ func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
return s.db.WithContext(ctx).AutoMigrate(dst...)
}

// CreateTask implement istore CreateTask interface
func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
record := getTaskRecord(task)
Expand All @@ -102,10 +103,34 @@ func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
})
}

func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
return nil, types.ErrNotImplemented
// ListTask implement istore ListTask interface
func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
tx := s.db.WithContext(ctx)

// 0值gorm自动忽略查询
tx = tx.Where(&TaskRecord{
TaskID: opt.TaskID,
TaskType: opt.TaskType,
TaskName: opt.TaskName,
TaskIndex: opt.TaskIndex,
TaskIndexType: opt.TaskIndexType,
Status: opt.Status,
CurrentStep: opt.CurrentStep,
Creator: opt.Creator,
})

result, count, err := FindByPage[types.Task](tx, int(opt.Offset), int(opt.Limit))
if err != nil {
return nil, err
}

return &iface.Pagination[types.Task]{
Count: count,
Items: result,
}, nil
}

// UpdateTask implement istore UpdateTask interface
func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
updateTask := getUpdateTaskRecord(task)
Expand All @@ -132,6 +157,7 @@ func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
})
}

// DeleteTask implement istore DeleteTask interface
func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Where("task_id = ?", taskID).Delete(&TaskRecord{}).Error; err != nil {
Expand All @@ -145,6 +171,7 @@ func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
})
}

// GetTask implement istore GetTask interface
func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, error) {
tx := s.db.WithContext(ctx)
taskRecord := TaskRecord{}
Expand All @@ -159,6 +186,7 @@ func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, e
return toTask(&taskRecord, stepRecord), nil
}

// PatchTask implement istore PatchTask interface
func (s *mysqlStore) PatchTask(ctx context.Context, opt *iface.PatchOption) error {
return types.ErrNotImplemented
}

0 comments on commit d5bef22

Please sign in to comment.