From a75e36e447f6c5c27f4a48b05df51ef292c7fa7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=97=E5=AD=90?= Date: Mon, 24 Jun 2024 23:10:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/console/commands/monitoring.go | 4 ++-- app/jobs/process_task.go | 31 +++++------------------------- internal/services/task.go | 24 +++++++++++++++++++++++ 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/app/console/commands/monitoring.go b/app/console/commands/monitoring.go index 0a1b1e47..e593ee26 100644 --- a/app/console/commands/monitoring.go +++ b/app/console/commands/monitoring.go @@ -46,8 +46,8 @@ func (receiver *Monitoring) Handle(console.Context) error { // 将等待中的任务分发 // TODO 有bug,需要设计一个锁机制防止重复分发 - //task := services.NewTaskImpl() - //_ = task.DispatchWaiting() + task := services.NewTaskImpl() + _ = task.DispatchWaiting() setting := services.NewSettingImpl() monitor := setting.Get(models.SettingKeyMonitor) diff --git a/app/jobs/process_task.go b/app/jobs/process_task.go index 60acfb37..d0c79d66 100644 --- a/app/jobs/process_task.go +++ b/app/jobs/process_task.go @@ -27,20 +27,11 @@ func (receiver *ProcessTask) Handle(args ...any) error { } var task models.Task - if err := facades.Orm().Query().Where("id = ?", taskID).Get(&task); err != nil { + _ = facades.Orm().Query().Where("id = ?", taskID).Get(&task) + if task.ID == 0 { facades.Log().Tags("面板", "异步任务").With(map[string]any{ "task_id": taskID, - "error": err.Error(), - }).Infof("获取任务失败") - return nil - } - - task.Status = models.TaskStatusRunning - if err := facades.Orm().Query().Save(&task); err != nil { - facades.Log().Tags("面板", "异步任务").With(map[string]any{ - "task_id": taskID, - "error": err.Error(), - }).Infof("更新任务失败") + }).Infof("任务不存在") return nil } @@ -50,13 +41,7 @@ func (receiver *ProcessTask) Handle(args ...any) error { if _, err := shell.Execf(task.Shell); err != nil { task.Status = models.TaskStatusFailed - if err := facades.Orm().Query().Save(&task); err != nil { - facades.Log().Tags("面板", "异步任务").With(map[string]any{ - "task_id": taskID, - "error": err.Error(), - }).Infof("更新任务失败") - return nil - } + _ = facades.Orm().Query().Save(&task) facades.Log().Tags("面板", "异步任务").With(map[string]any{ "task_id": taskID, "error": err.Error(), @@ -65,13 +50,7 @@ func (receiver *ProcessTask) Handle(args ...any) error { } task.Status = models.TaskStatusSuccess - if err := facades.Orm().Query().Save(&task); err != nil { - facades.Log().Tags("面板", "异步任务").With(map[string]any{ - "task_id": taskID, - "error": err.Error(), - }).Infof("更新任务失败") - return nil - } + _ = facades.Orm().Query().Save(&task) facades.Log().Tags("面板", "异步任务").With(map[string]any{ "task_id": taskID, diff --git a/internal/services/task.go b/internal/services/task.go index 479e6c9c..0db27a9c 100644 --- a/internal/services/task.go +++ b/internal/services/task.go @@ -1,12 +1,17 @@ package services import ( + "sync" + + "github.com/goravel/framework/database/orm" "github.com/goravel/framework/facades" "github.com/TheTNB/panel/app/jobs" "github.com/TheTNB/panel/app/models" ) +var taskMap sync.Map + type TaskImpl struct { } @@ -15,6 +20,9 @@ func NewTaskImpl() *TaskImpl { } func (r *TaskImpl) Process(taskID uint) error { + if err := r.markAsRunning(taskID); err != nil { + return err + } return facades.Queue().Job(&jobs.ProcessTask{}, []any{taskID}).Dispatch() } @@ -25,6 +33,9 @@ func (r *TaskImpl) DispatchWaiting() error { } for _, task := range tasks { + if _, ok := taskMap.Load(task.ID); ok { + continue + } if err := r.Process(task.ID); err != nil { return err } @@ -32,3 +43,16 @@ func (r *TaskImpl) DispatchWaiting() error { return nil } + +func (r *TaskImpl) markAsRunning(taskID uint) error { + task := models.Task{ + Model: orm.Model{ID: taskID}, + Status: models.TaskStatusRunning, + } + if _, err := facades.Orm().Query().Where("id", taskID).Update(&task); err != nil { + return err + } + + taskMap.Store(taskID, true) + return nil +}