diff --git a/internal/biz/task.go b/internal/biz/task.go index 4a037996..e58380d0 100644 --- a/internal/biz/task.go +++ b/internal/biz/task.go @@ -28,5 +28,5 @@ type TaskRepo interface { Delete(id uint) error UpdateStatus(id uint, status TaskStatus) error Push(task *Task) error - DispatchWaiting() error + DispatchWaiting() } diff --git a/internal/bootstrap/app.go b/internal/bootstrap/app.go index 0f1f1dfb..170b9224 100644 --- a/internal/bootstrap/app.go +++ b/internal/bootstrap/app.go @@ -19,8 +19,8 @@ func BootWeb() { boot() initValidator() initSession() - initCron() initQueue() + initCron() initHttp() select {} diff --git a/internal/data/task.go b/internal/data/task.go index f7deb4b7..cf1af955 100644 --- a/internal/data/task.go +++ b/internal/data/task.go @@ -1,14 +1,22 @@ package data import ( + "sync" + + "go.uber.org/zap" + "github.com/TheTNB/panel/internal/app" "github.com/TheTNB/panel/internal/biz" "github.com/TheTNB/panel/internal/queuejob" ) +var taskDispatchOnce sync.Once + type taskRepo struct{} func NewTaskRepo() biz.TaskRepo { + task := &taskRepo{} + taskDispatchOnce.Do(task.DispatchWaiting) return &taskRepo{} } @@ -48,17 +56,19 @@ func (r *taskRepo) Push(task *biz.Task) error { }) } -func (r *taskRepo) DispatchWaiting() error { +func (r *taskRepo) DispatchWaiting() { var tasks []biz.Task if err := app.Orm.Where("status = ?", biz.TaskStatusWaiting).Find(&tasks).Error; err != nil { - return err + app.Logger.Error("获取待处理任务失败", zap.Error(err)) + return } for _, task := range tasks { - if err := r.Push(&task); err != nil { - return err + if err := app.Queue.Push(queuejob.NewProcessTask(r), []any{ + task.ID, + }); err != nil { + app.Logger.Error("推送任务失败", zap.Error(err)) + return } } - - return nil }