diff --git a/app/console/commands/monitoring.go b/app/console/commands/monitoring.go index e593ee26..3c419fe9 100644 --- a/app/console/commands/monitoring.go +++ b/app/console/commands/monitoring.go @@ -45,7 +45,6 @@ func (receiver *Monitoring) Handle(console.Context) error { } // 将等待中的任务分发 - // TODO 有bug,需要设计一个锁机制防止重复分发 task := services.NewTaskImpl() _ = task.DispatchWaiting() diff --git a/app/jobs/process_task.go b/app/jobs/process_task.go index d0c79d66..2d5bf15d 100644 --- a/app/jobs/process_task.go +++ b/app/jobs/process_task.go @@ -39,6 +39,15 @@ func (receiver *ProcessTask) Handle(args ...any) error { "task_id": taskID, }).Infof("开始执行任务") + task.Status = models.TaskStatusRunning + if err := facades.Orm().Query().Save(&task); err != nil { + facades.Log().Tags("面板", "异步任务").With(map[string]any{ + "task_id": taskID, + "error": err.Error(), + }).Infof("更新任务状态失败") + return nil + } + if _, err := shell.Execf(task.Shell); err != nil { task.Status = models.TaskStatusFailed _ = facades.Orm().Query().Save(&task) @@ -50,7 +59,13 @@ func (receiver *ProcessTask) Handle(args ...any) error { } task.Status = models.TaskStatusSuccess - _ = facades.Orm().Query().Save(&task) + if err := facades.Orm().Query().Save(&task); err != nil { + facades.Log().Tags("面板", "异步任务").With(map[string]any{ + "task_id": taskID, + "error": err.Error(), + }).Infof("更新任务状态失败") + return nil + } facades.Log().Tags("面板", "异步任务").With(map[string]any{ "task_id": taskID, diff --git a/internal/services/task.go b/internal/services/task.go index 0db27a9c..2ce31d2e 100644 --- a/internal/services/task.go +++ b/internal/services/task.go @@ -3,7 +3,6 @@ package services import ( "sync" - "github.com/goravel/framework/database/orm" "github.com/goravel/framework/facades" "github.com/TheTNB/panel/app/jobs" @@ -20,9 +19,7 @@ func NewTaskImpl() *TaskImpl { } func (r *TaskImpl) Process(taskID uint) error { - if err := r.markAsRunning(taskID); err != nil { - return err - } + taskMap.Store(taskID, true) return facades.Queue().Job(&jobs.ProcessTask{}, []any{taskID}).Dispatch() } @@ -43,16 +40,3 @@ func (r *TaskImpl) DispatchWaiting() error { return nil } - -func (r *TaskImpl) markAsRunning(taskID uint) error { - task := models.Task{ - Model: orm.Model{ID: taskID}, - Status: models.TaskStatusRunning, - } - if _, err := facades.Orm().Query().Where("id", taskID).Update(&task); err != nil { - return err - } - - taskMap.Store(taskID, true) - return nil -}