From 0c4d97a5121cba3c1dee6e220c7fe2b5f11452b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=97=E5=AD=90?= Date: Sat, 12 Oct 2024 00:58:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/bootstrap/queue.go | 6 ++- pkg/queue/job.go | 2 +- pkg/queue/queue.go | 77 ++++++++++++++---------------- pkg/queue/queue_test.go | 70 ++++++--------------------- web/src/api/panel/app/index.ts | 2 +- web/src/views/app/VersionModal.vue | 31 +++++++++--- 6 files changed, 82 insertions(+), 106 deletions(-) diff --git a/internal/bootstrap/queue.go b/internal/bootstrap/queue.go index 69fec45f..7ea617a4 100644 --- a/internal/bootstrap/queue.go +++ b/internal/bootstrap/queue.go @@ -1,11 +1,13 @@ package bootstrap import ( + "context" + "github.com/TheTNB/panel/internal/app" "github.com/TheTNB/panel/pkg/queue" ) func initQueue() { - app.Queue = queue.New() - go app.Queue.Run() + app.Queue = queue.New(40) + go app.Queue.Run(context.Background()) } diff --git a/pkg/queue/job.go b/pkg/queue/job.go index 20432a31..81aaf4e1 100644 --- a/pkg/queue/job.go +++ b/pkg/queue/job.go @@ -9,7 +9,7 @@ type JobWithErrHandle interface { ErrHandle(err error) } -type Jobs struct { +type JobItem struct { Job Job Args []any Delay uint diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index f64cd049..9f478c80 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -1,53 +1,45 @@ package queue import ( + "context" "errors" "time" ) type Queue struct { - jobs chan Jobs - isShutdown chan struct{} - done chan struct{} + jobs chan JobItem } -func New() *Queue { +func New(bufferSize int) *Queue { return &Queue{ - jobs: make(chan Jobs, 10), - isShutdown: make(chan struct{}), - done: make(chan struct{}), + jobs: make(chan JobItem, bufferSize), } } func (r *Queue) Push(job Job, args []any) error { select { - case <-r.isShutdown: - return errors.New("queue is shutdown, cannot add new jobs") - default: - r.jobs <- Jobs{Job: job, Args: args} + case r.jobs <- JobItem{Job: job, Args: args}: return nil + default: + return errors.New("job queue is full") } } -func (r *Queue) Bulk(jobs []Jobs) error { +func (r *Queue) Bulk(jobs []JobItem) error { for _, job := range jobs { - if job.Delay > 0 { - time.AfterFunc(time.Duration(job.Delay)*time.Second, func() { - select { - case <-r.isShutdown: - return - default: - r.jobs <- Jobs{Job: job.Job, Args: job.Args} - } + jobCopy := job + if jobCopy.Delay > 0 { + time.AfterFunc(time.Duration(jobCopy.Delay)*time.Second, func() { + r.jobs <- jobCopy }) continue } select { - case <-r.isShutdown: - return errors.New("queue is shutdown, cannot add new jobs") + case r.jobs <- jobCopy: + return nil default: - r.jobs <- job + return errors.New("job queue is full") } } @@ -55,38 +47,41 @@ func (r *Queue) Bulk(jobs []Jobs) error { } func (r *Queue) Later(delay uint, job Job, args []any) error { + jobCopy := job + argsCopy := make([]any, len(args)) + copy(argsCopy, args) time.AfterFunc(time.Duration(delay)*time.Second, func() { - select { - case <-r.isShutdown: - return - default: - r.jobs <- Jobs{Job: job, Args: args} - } + r.jobs <- JobItem{Job: jobCopy, Args: argsCopy} }) return nil } -func (r *Queue) Run() { +func (r *Queue) Run(ctx context.Context) { go func() { for { select { case job := <-r.jobs: - if err := job.Job.Handle(job.Args...); err != nil { - if errJob, ok := job.Job.(JobWithErrHandle); ok { - errJob.ErrHandle(err) - } - } - case <-r.isShutdown: - close(r.done) + processJob(job) + case <-ctx.Done(): return } } }() } -func (r *Queue) Shutdown() error { - close(r.isShutdown) - <-r.done - return nil +func (r *Queue) Len() int { + return len(r.jobs) +} + +func (r *Queue) IsFull() bool { + return len(r.jobs) == cap(r.jobs) +} + +func processJob(job JobItem) { + if err := job.Job.Handle(job.Args...); err != nil { + if errJob, ok := job.Job.(JobWithErrHandle); ok { + errJob.ErrHandle(err) + } + } } diff --git a/pkg/queue/queue_test.go b/pkg/queue/queue_test.go index db86a0fe..22b0be3b 100644 --- a/pkg/queue/queue_test.go +++ b/pkg/queue/queue_test.go @@ -1,6 +1,7 @@ package queue import ( + "context" "errors" "testing" "time" @@ -17,33 +18,21 @@ func TestQueueTestSuite(t *testing.T) { } func (suite *QueueTestSuite) TestQueueInitialization() { - queue := New() + queue := New(10) suite.NotNil(queue) suite.NotNil(queue.jobs) - suite.NotNil(queue.isShutdown) - suite.NotNil(queue.done) } func (suite *QueueTestSuite) TestPushJobToQueue() { - queue := New() + queue := New(10) job := &MockJob{} err := queue.Push(job, []any{"arg1", "arg2"}) suite.NoError(err) } -func (suite *QueueTestSuite) TestPushJobToShutdownQueue() { - queue := New() - queue.Run() - suite.NoError(queue.Shutdown()) - job := &MockJob{} - err := queue.Push(job, []any{"arg1", "arg2"}) - suite.Error(err) - suite.EqualError(err, "queue is shutdown, cannot add new jobs") -} - func (suite *QueueTestSuite) TestBulkJobsToQueue() { - queue := New() - jobs := []Jobs{ + queue := New(10) + jobs := []JobItem{ {Job: &MockJob{}, Args: []any{"arg1"}}, {Job: &MockJob{}, Args: []any{"arg2"}}, } @@ -51,80 +40,51 @@ func (suite *QueueTestSuite) TestBulkJobsToQueue() { suite.NoError(err) } -func (suite *QueueTestSuite) TestBulkJobsToShutdownQueue() { - queue := New() - queue.Run() - suite.NoError(queue.Shutdown()) - jobs := []Jobs{ - {Job: &MockJob{}, Args: []any{"arg1"}}, - {Job: &MockJob{}, Args: []any{"arg2"}}, - } - err := queue.Bulk(jobs) - suite.Error(err) - suite.EqualError(err, "queue is shutdown, cannot add new jobs") -} - func (suite *QueueTestSuite) TestLaterJobExecution() { - queue := New() - job := &MockJob{} - err := queue.Later(1, job, []any{"arg1"}) - suite.NoError(err) -} - -func (suite *QueueTestSuite) TestLaterJobExecutionOnShutdownQueue() { - queue := New() - queue.Run() - suite.NoError(queue.Shutdown()) + queue := New(10) job := &MockJob{} err := queue.Later(1, job, []any{"arg1"}) suite.NoError(err) } func (suite *QueueTestSuite) TestRunQueue() { - queue := New() + queue := New(10) job := &MockJob{} suite.NoError(queue.Push(job, []any{"arg1"})) - queue.Run() + queue.Run(context.Background()) time.Sleep(1 * time.Second) suite.True(job.Executed) } func (suite *QueueTestSuite) TestRunQueueWithLaterJob() { - queue := New() + queue := New(10) job := &MockJob{} suite.NoError(queue.Later(1, job, []any{"arg1"})) - queue.Run() + queue.Run(context.Background()) time.Sleep(2 * time.Second) suite.True(job.Executed) } func (suite *QueueTestSuite) TestRunQueueWithBulkJobs() { - queue := New() - jobs := []Jobs{ + queue := New(10) + jobs := []JobItem{ {Job: &MockJob{}, Args: []any{"arg1"}}, {Job: &MockJob{}, Args: []any{"arg2"}}, } suite.NoError(queue.Bulk(jobs)) - queue.Run() + queue.Run(context.Background()) time.Sleep(1 * time.Second) } func (suite *QueueTestSuite) TestRunQueueWithErrHandle() { - queue := New() + queue := New(10) job := &MockJob{} suite.NoError(queue.Push(job, []any{"arg1"})) - queue.Run() + queue.Run(context.Background()) time.Sleep(1 * time.Second) suite.Error(job.Err) } -func (suite *QueueTestSuite) TestShutdownQueue() { - queue := New() - queue.Run() - err := queue.Shutdown() - suite.NoError(err) -} - type MockJob struct { Executed bool Err error diff --git a/web/src/api/panel/app/index.ts b/web/src/api/panel/app/index.ts index 587338f9..d4a18246 100644 --- a/web/src/api/panel/app/index.ts +++ b/web/src/api/panel/app/index.ts @@ -7,7 +7,7 @@ export default { list: (page: number, limit: number): Promise> => request.get('/app/list', { params: { page, limit } }), // 安装应用 - install: (slug: string, channel: string): Promise> => + install: (slug: string, channel: string | null): Promise> => request.post('/app/install', { slug, channel }), // 卸载应用 uninstall: (slug: string): Promise> => diff --git a/web/src/views/app/VersionModal.vue b/web/src/views/app/VersionModal.vue index a72dbdd6..3558b654 100644 --- a/web/src/views/app/VersionModal.vue +++ b/web/src/views/app/VersionModal.vue @@ -11,8 +11,8 @@ const info = defineModel('info', { type: Object, required: true }) const doSubmit = ref(false) -const model = reactive({ - channel: '', +const model = ref({ + channel: null, version: '' }) @@ -26,15 +26,33 @@ const options = computed(() => { }) const handleSubmit = () => { - app.install(info.value.slug, model.channel).then(() => { - window.$message.success(t('appIndex.alerts.install')) - }) + app + .install(info.value.slug, model.value.channel) + .then(() => { + window.$message.success(t('appIndex.alerts.install')) + }) + .finally(() => { + doSubmit.value = false + show.value = false + model.value = { + channel: null, + version: '' + } + }) } const handleChannelUpdate = (value: string) => { const channel = info.value.channels.find((channel) => channel.slug === value) if (channel) { - model.version = channel.subs[0].version + model.value.version = channel.subs[0].version + } +} + +const handleClose = () => { + show.value = false + model.value = { + channel: null, + version: '' } } @@ -48,6 +66,7 @@ const handleChannelUpdate = (value: string) => { size="huge" :bordered="false" :segmented="false" + @close="handleClose" >