2
0
mirror of https://github.com/acepanel/panel.git synced 2026-02-04 07:57:21 +08:00

feat: 重启后自动调度等待中的任务

This commit is contained in:
耗子
2024-10-15 17:18:28 +08:00
parent 31ad944da8
commit 41d3d3fb97
3 changed files with 18 additions and 8 deletions

View File

@@ -28,5 +28,5 @@ type TaskRepo interface {
Delete(id uint) error
UpdateStatus(id uint, status TaskStatus) error
Push(task *Task) error
DispatchWaiting() error
DispatchWaiting()
}

View File

@@ -19,8 +19,8 @@ func BootWeb() {
boot()
initValidator()
initSession()
initCron()
initQueue()
initCron()
initHttp()
select {}

View File

@@ -1,14 +1,22 @@
package data
import (
"sync"
"go.uber.org/zap"
"github.com/TheTNB/panel/internal/app"
"github.com/TheTNB/panel/internal/biz"
"github.com/TheTNB/panel/internal/queuejob"
)
var taskDispatchOnce sync.Once
type taskRepo struct{}
func NewTaskRepo() biz.TaskRepo {
task := &taskRepo{}
taskDispatchOnce.Do(task.DispatchWaiting)
return &taskRepo{}
}
@@ -48,17 +56,19 @@ func (r *taskRepo) Push(task *biz.Task) error {
})
}
func (r *taskRepo) DispatchWaiting() error {
func (r *taskRepo) DispatchWaiting() {
var tasks []biz.Task
if err := app.Orm.Where("status = ?", biz.TaskStatusWaiting).Find(&tasks).Error; err != nil {
return err
app.Logger.Error("获取待处理任务失败", zap.Error(err))
return
}
for _, task := range tasks {
if err := r.Push(&task); err != nil {
return err
if err := app.Queue.Push(queuejob.NewProcessTask(r), []any{
task.ID,
}); err != nil {
app.Logger.Error("推送任务失败", zap.Error(err))
return
}
}
return nil
}