mirror of
https://github.com/acepanel/panel.git
synced 2026-02-04 18:27:13 +08:00
feat: 优化任务
This commit is contained in:
@@ -46,8 +46,8 @@ func (receiver *Monitoring) Handle(console.Context) error {
|
||||
|
||||
// 将等待中的任务分发
|
||||
// TODO 有bug,需要设计一个锁机制防止重复分发
|
||||
//task := services.NewTaskImpl()
|
||||
//_ = task.DispatchWaiting()
|
||||
task := services.NewTaskImpl()
|
||||
_ = task.DispatchWaiting()
|
||||
|
||||
setting := services.NewSettingImpl()
|
||||
monitor := setting.Get(models.SettingKeyMonitor)
|
||||
|
||||
@@ -27,20 +27,11 @@ func (receiver *ProcessTask) Handle(args ...any) error {
|
||||
}
|
||||
|
||||
var task models.Task
|
||||
if err := facades.Orm().Query().Where("id = ?", taskID).Get(&task); err != nil {
|
||||
_ = facades.Orm().Query().Where("id = ?", taskID).Get(&task)
|
||||
if task.ID == 0 {
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
"error": err.Error(),
|
||||
}).Infof("获取任务失败")
|
||||
return nil
|
||||
}
|
||||
|
||||
task.Status = models.TaskStatusRunning
|
||||
if err := facades.Orm().Query().Save(&task); err != nil {
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
"error": err.Error(),
|
||||
}).Infof("更新任务失败")
|
||||
}).Infof("任务不存在")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -50,13 +41,7 @@ func (receiver *ProcessTask) Handle(args ...any) error {
|
||||
|
||||
if _, err := shell.Execf(task.Shell); err != nil {
|
||||
task.Status = models.TaskStatusFailed
|
||||
if err := facades.Orm().Query().Save(&task); err != nil {
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
"error": err.Error(),
|
||||
}).Infof("更新任务失败")
|
||||
return nil
|
||||
}
|
||||
_ = facades.Orm().Query().Save(&task)
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
"error": err.Error(),
|
||||
@@ -65,13 +50,7 @@ func (receiver *ProcessTask) Handle(args ...any) error {
|
||||
}
|
||||
|
||||
task.Status = models.TaskStatusSuccess
|
||||
if err := facades.Orm().Query().Save(&task); err != nil {
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
"error": err.Error(),
|
||||
}).Infof("更新任务失败")
|
||||
return nil
|
||||
}
|
||||
_ = facades.Orm().Query().Save(&task)
|
||||
|
||||
facades.Log().Tags("面板", "异步任务").With(map[string]any{
|
||||
"task_id": taskID,
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/goravel/framework/database/orm"
|
||||
"github.com/goravel/framework/facades"
|
||||
|
||||
"github.com/TheTNB/panel/app/jobs"
|
||||
"github.com/TheTNB/panel/app/models"
|
||||
)
|
||||
|
||||
var taskMap sync.Map
|
||||
|
||||
type TaskImpl struct {
|
||||
}
|
||||
|
||||
@@ -15,6 +20,9 @@ func NewTaskImpl() *TaskImpl {
|
||||
}
|
||||
|
||||
func (r *TaskImpl) Process(taskID uint) error {
|
||||
if err := r.markAsRunning(taskID); err != nil {
|
||||
return err
|
||||
}
|
||||
return facades.Queue().Job(&jobs.ProcessTask{}, []any{taskID}).Dispatch()
|
||||
}
|
||||
|
||||
@@ -25,6 +33,9 @@ func (r *TaskImpl) DispatchWaiting() error {
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
if _, ok := taskMap.Load(task.ID); ok {
|
||||
continue
|
||||
}
|
||||
if err := r.Process(task.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -32,3 +43,16 @@ func (r *TaskImpl) DispatchWaiting() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *TaskImpl) markAsRunning(taskID uint) error {
|
||||
task := models.Task{
|
||||
Model: orm.Model{ID: taskID},
|
||||
Status: models.TaskStatusRunning,
|
||||
}
|
||||
if _, err := facades.Orm().Query().Where("id", taskID).Update(&task); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskMap.Store(taskID, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user