Skip to content

Commit

Permalink
fix: rerun pipeline deadlock (apache#6939)
Browse files Browse the repository at this point in the history
  • Loading branch information
klesh committed Feb 8, 2024
1 parent 822792c commit ec4702d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
7 changes: 4 additions & 3 deletions backend/server/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ limitations under the License.
package task

import (
"net/http"
"strconv"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/server/api/shared"
"github.com/apache/incubator-devlake/server/services"
"net/http"
"strconv"

"github.com/gin-gonic/gin"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func GetTaskByPipeline(c *gin.Context) {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
return
}
tasks, err := services.GetTasksWithLastStatus(pipelineId, true)
tasks, err := services.GetTasksWithLastStatus(pipelineId, true, nil)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
return
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
}
failedTasks = append(failedTasks, task)
} else {
tasks, err := GetTasksWithLastStatus(pipelineId, false)
tasks, err := GetTasksWithLastStatus(pipelineId, false, tx)
if err != nil {
return nil, errors.Default.Wrap(err, "error getting tasks")
}
Expand Down
7 changes: 5 additions & 2 deletions backend/server/services/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {

// GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned
// TODO: adopts GetLatestTasksOfPipeline
func GetTasksWithLastStatus(pipelineId uint64, shouldSanitize bool) ([]*models.Task, errors.Error) {
func GetTasksWithLastStatus(pipelineId uint64, shouldSanitize bool, tx dal.Dal) ([]*models.Task, errors.Error) {
if tx == nil {
tx = db
}
var tasks []*models.Task
err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ec4702d

Please sign in to comment.