mirror of
https://github.com/acepanel/panel.git
synced 2026-02-04 01:57:19 +08:00
feat: 优化队列实现
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
package bootstrap
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/TheTNB/panel/internal/app"
|
||||
"github.com/TheTNB/panel/pkg/queue"
|
||||
)
|
||||
|
||||
func initQueue() {
|
||||
app.Queue = queue.New()
|
||||
go app.Queue.Run()
|
||||
app.Queue = queue.New(40)
|
||||
go app.Queue.Run(context.Background())
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ type JobWithErrHandle interface {
|
||||
ErrHandle(err error)
|
||||
}
|
||||
|
||||
type Jobs struct {
|
||||
type JobItem struct {
|
||||
Job Job
|
||||
Args []any
|
||||
Delay uint
|
||||
|
||||
@@ -1,53 +1,45 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
jobs chan Jobs
|
||||
isShutdown chan struct{}
|
||||
done chan struct{}
|
||||
jobs chan JobItem
|
||||
}
|
||||
|
||||
func New() *Queue {
|
||||
func New(bufferSize int) *Queue {
|
||||
return &Queue{
|
||||
jobs: make(chan Jobs, 10),
|
||||
isShutdown: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
jobs: make(chan JobItem, bufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Queue) Push(job Job, args []any) error {
|
||||
select {
|
||||
case <-r.isShutdown:
|
||||
return errors.New("queue is shutdown, cannot add new jobs")
|
||||
default:
|
||||
r.jobs <- Jobs{Job: job, Args: args}
|
||||
case r.jobs <- JobItem{Job: job, Args: args}:
|
||||
return nil
|
||||
default:
|
||||
return errors.New("job queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Queue) Bulk(jobs []Jobs) error {
|
||||
func (r *Queue) Bulk(jobs []JobItem) error {
|
||||
for _, job := range jobs {
|
||||
if job.Delay > 0 {
|
||||
time.AfterFunc(time.Duration(job.Delay)*time.Second, func() {
|
||||
select {
|
||||
case <-r.isShutdown:
|
||||
return
|
||||
default:
|
||||
r.jobs <- Jobs{Job: job.Job, Args: job.Args}
|
||||
}
|
||||
jobCopy := job
|
||||
if jobCopy.Delay > 0 {
|
||||
time.AfterFunc(time.Duration(jobCopy.Delay)*time.Second, func() {
|
||||
r.jobs <- jobCopy
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-r.isShutdown:
|
||||
return errors.New("queue is shutdown, cannot add new jobs")
|
||||
case r.jobs <- jobCopy:
|
||||
return nil
|
||||
default:
|
||||
r.jobs <- job
|
||||
return errors.New("job queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,38 +47,41 @@ func (r *Queue) Bulk(jobs []Jobs) error {
|
||||
}
|
||||
|
||||
func (r *Queue) Later(delay uint, job Job, args []any) error {
|
||||
jobCopy := job
|
||||
argsCopy := make([]any, len(args))
|
||||
copy(argsCopy, args)
|
||||
time.AfterFunc(time.Duration(delay)*time.Second, func() {
|
||||
select {
|
||||
case <-r.isShutdown:
|
||||
return
|
||||
default:
|
||||
r.jobs <- Jobs{Job: job, Args: args}
|
||||
}
|
||||
r.jobs <- JobItem{Job: jobCopy, Args: argsCopy}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Queue) Run() {
|
||||
func (r *Queue) Run(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case job := <-r.jobs:
|
||||
if err := job.Job.Handle(job.Args...); err != nil {
|
||||
if errJob, ok := job.Job.(JobWithErrHandle); ok {
|
||||
errJob.ErrHandle(err)
|
||||
}
|
||||
}
|
||||
case <-r.isShutdown:
|
||||
close(r.done)
|
||||
processJob(job)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *Queue) Shutdown() error {
|
||||
close(r.isShutdown)
|
||||
<-r.done
|
||||
return nil
|
||||
func (r *Queue) Len() int {
|
||||
return len(r.jobs)
|
||||
}
|
||||
|
||||
func (r *Queue) IsFull() bool {
|
||||
return len(r.jobs) == cap(r.jobs)
|
||||
}
|
||||
|
||||
func processJob(job JobItem) {
|
||||
if err := job.Job.Handle(job.Args...); err != nil {
|
||||
if errJob, ok := job.Job.(JobWithErrHandle); ok {
|
||||
errJob.ErrHandle(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -17,33 +18,21 @@ func TestQueueTestSuite(t *testing.T) {
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestQueueInitialization() {
|
||||
queue := New()
|
||||
queue := New(10)
|
||||
suite.NotNil(queue)
|
||||
suite.NotNil(queue.jobs)
|
||||
suite.NotNil(queue.isShutdown)
|
||||
suite.NotNil(queue.done)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestPushJobToQueue() {
|
||||
queue := New()
|
||||
queue := New(10)
|
||||
job := &MockJob{}
|
||||
err := queue.Push(job, []any{"arg1", "arg2"})
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestPushJobToShutdownQueue() {
|
||||
queue := New()
|
||||
queue.Run()
|
||||
suite.NoError(queue.Shutdown())
|
||||
job := &MockJob{}
|
||||
err := queue.Push(job, []any{"arg1", "arg2"})
|
||||
suite.Error(err)
|
||||
suite.EqualError(err, "queue is shutdown, cannot add new jobs")
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestBulkJobsToQueue() {
|
||||
queue := New()
|
||||
jobs := []Jobs{
|
||||
queue := New(10)
|
||||
jobs := []JobItem{
|
||||
{Job: &MockJob{}, Args: []any{"arg1"}},
|
||||
{Job: &MockJob{}, Args: []any{"arg2"}},
|
||||
}
|
||||
@@ -51,80 +40,51 @@ func (suite *QueueTestSuite) TestBulkJobsToQueue() {
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestBulkJobsToShutdownQueue() {
|
||||
queue := New()
|
||||
queue.Run()
|
||||
suite.NoError(queue.Shutdown())
|
||||
jobs := []Jobs{
|
||||
{Job: &MockJob{}, Args: []any{"arg1"}},
|
||||
{Job: &MockJob{}, Args: []any{"arg2"}},
|
||||
}
|
||||
err := queue.Bulk(jobs)
|
||||
suite.Error(err)
|
||||
suite.EqualError(err, "queue is shutdown, cannot add new jobs")
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestLaterJobExecution() {
|
||||
queue := New()
|
||||
job := &MockJob{}
|
||||
err := queue.Later(1, job, []any{"arg1"})
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestLaterJobExecutionOnShutdownQueue() {
|
||||
queue := New()
|
||||
queue.Run()
|
||||
suite.NoError(queue.Shutdown())
|
||||
queue := New(10)
|
||||
job := &MockJob{}
|
||||
err := queue.Later(1, job, []any{"arg1"})
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestRunQueue() {
|
||||
queue := New()
|
||||
queue := New(10)
|
||||
job := &MockJob{}
|
||||
suite.NoError(queue.Push(job, []any{"arg1"}))
|
||||
queue.Run()
|
||||
queue.Run(context.Background())
|
||||
time.Sleep(1 * time.Second)
|
||||
suite.True(job.Executed)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestRunQueueWithLaterJob() {
|
||||
queue := New()
|
||||
queue := New(10)
|
||||
job := &MockJob{}
|
||||
suite.NoError(queue.Later(1, job, []any{"arg1"}))
|
||||
queue.Run()
|
||||
queue.Run(context.Background())
|
||||
time.Sleep(2 * time.Second)
|
||||
suite.True(job.Executed)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestRunQueueWithBulkJobs() {
|
||||
queue := New()
|
||||
jobs := []Jobs{
|
||||
queue := New(10)
|
||||
jobs := []JobItem{
|
||||
{Job: &MockJob{}, Args: []any{"arg1"}},
|
||||
{Job: &MockJob{}, Args: []any{"arg2"}},
|
||||
}
|
||||
suite.NoError(queue.Bulk(jobs))
|
||||
queue.Run()
|
||||
queue.Run(context.Background())
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestRunQueueWithErrHandle() {
|
||||
queue := New()
|
||||
queue := New(10)
|
||||
job := &MockJob{}
|
||||
suite.NoError(queue.Push(job, []any{"arg1"}))
|
||||
queue.Run()
|
||||
queue.Run(context.Background())
|
||||
time.Sleep(1 * time.Second)
|
||||
suite.Error(job.Err)
|
||||
}
|
||||
|
||||
func (suite *QueueTestSuite) TestShutdownQueue() {
|
||||
queue := New()
|
||||
queue.Run()
|
||||
err := queue.Shutdown()
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
type MockJob struct {
|
||||
Executed bool
|
||||
Err error
|
||||
|
||||
@@ -7,7 +7,7 @@ export default {
|
||||
list: (page: number, limit: number): Promise<AxiosResponse<any>> =>
|
||||
request.get('/app/list', { params: { page, limit } }),
|
||||
// 安装应用
|
||||
install: (slug: string, channel: string): Promise<AxiosResponse<any>> =>
|
||||
install: (slug: string, channel: string | null): Promise<AxiosResponse<any>> =>
|
||||
request.post('/app/install', { slug, channel }),
|
||||
// 卸载应用
|
||||
uninstall: (slug: string): Promise<AxiosResponse<any>> =>
|
||||
|
||||
@@ -11,8 +11,8 @@ const info = defineModel<App>('info', { type: Object, required: true })
|
||||
|
||||
const doSubmit = ref(false)
|
||||
|
||||
const model = reactive({
|
||||
channel: '',
|
||||
const model = ref({
|
||||
channel: null,
|
||||
version: ''
|
||||
})
|
||||
|
||||
@@ -26,15 +26,33 @@ const options = computed(() => {
|
||||
})
|
||||
|
||||
const handleSubmit = () => {
|
||||
app.install(info.value.slug, model.channel).then(() => {
|
||||
window.$message.success(t('appIndex.alerts.install'))
|
||||
})
|
||||
app
|
||||
.install(info.value.slug, model.value.channel)
|
||||
.then(() => {
|
||||
window.$message.success(t('appIndex.alerts.install'))
|
||||
})
|
||||
.finally(() => {
|
||||
doSubmit.value = false
|
||||
show.value = false
|
||||
model.value = {
|
||||
channel: null,
|
||||
version: ''
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const handleChannelUpdate = (value: string) => {
|
||||
const channel = info.value.channels.find((channel) => channel.slug === value)
|
||||
if (channel) {
|
||||
model.version = channel.subs[0].version
|
||||
model.value.version = channel.subs[0].version
|
||||
}
|
||||
}
|
||||
|
||||
const handleClose = () => {
|
||||
show.value = false
|
||||
model.value = {
|
||||
channel: null,
|
||||
version: ''
|
||||
}
|
||||
}
|
||||
</script>
|
||||
@@ -48,6 +66,7 @@ const handleChannelUpdate = (value: string) => {
|
||||
size="huge"
|
||||
:bordered="false"
|
||||
:segmented="false"
|
||||
@close="handleClose"
|
||||
>
|
||||
<n-form :model="model">
|
||||
<n-form-item path="channel" label="渠道">
|
||||
|
||||
Reference in New Issue
Block a user