2
0
mirror of https://github.com/acepanel/panel.git synced 2026-02-04 07:57:21 +08:00

refactor: 面板websocket

This commit is contained in:
耗子
2024-10-20 22:14:11 +08:00
parent 892f91be3e
commit ff239c467a
14 changed files with 320 additions and 214 deletions

4
go.mod
View File

@@ -23,6 +23,7 @@ require (
github.com/gookit/color v1.5.4
github.com/gorilla/websocket v1.5.3
github.com/hashicorp/go-version v1.7.0
github.com/klauspost/compress v1.17.9
github.com/knadh/koanf/parsers/yaml v0.1.0
github.com/knadh/koanf/providers/file v1.1.2
github.com/knadh/koanf/v2 v2.1.1
@@ -79,12 +80,11 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jaevor/go-nanoid v1.4.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect

3
go.sum
View File

@@ -170,8 +170,9 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=

View File

@@ -23,6 +23,7 @@ func initHttp() {
// add route
route.Http(app.Http)
route.Ws(app.Http)
apps.Boot(app.Http)
srv := &http.Server{

View File

@@ -0,0 +1 @@
package request

View File

@@ -161,7 +161,6 @@ func Http(r chi.Router) {
ssh := service.NewSSHService()
r.Get("/info", ssh.GetInfo)
r.Post("/info", ssh.UpdateInfo)
r.Get("/session", ssh.Session)
})
r.Route("/container", func(r chi.Router) {

17
internal/route/ws.go Normal file
View File

@@ -0,0 +1,17 @@
package route
import (
"github.com/go-chi/chi/v5"
"github.com/TheTNB/panel/internal/http/middleware"
"github.com/TheTNB/panel/internal/service"
)
func Ws(r chi.Router) {
r.Route("/api/ws", func(r chi.Router) {
r.Use(middleware.MustLogin)
ws := service.NewWsService()
r.Get("/ssh", ws.Session)
r.Get("/exec", ws.Exec)
})
}

View File

@@ -1,20 +1,11 @@
package service
import (
"context"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/spf13/cast"
"go.uber.org/zap"
"github.com/TheTNB/panel/internal/app"
"github.com/TheTNB/panel/internal/biz"
"github.com/TheTNB/panel/internal/data"
"github.com/TheTNB/panel/internal/http/request"
"github.com/TheTNB/panel/pkg/ssh"
)
type SSHService struct {
@@ -49,65 +40,3 @@ func (s *SSHService) UpdateInfo(w http.ResponseWriter, r *http.Request) {
return
}
}
func (s *SSHService) Session(w http.ResponseWriter, r *http.Request) {
info, err := s.sshRepo.GetInfo()
if err != nil {
Error(w, http.StatusInternalServerError, "%v", err)
return
}
upGrader := websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
}
ws, err := upGrader.Upgrade(w, r, nil)
if err != nil {
ErrorSystem(w)
return
}
defer ws.Close()
config := ssh.ClientConfigPassword(
cast.ToString(info["host"])+":"+cast.ToString(info["port"]),
cast.ToString(info["user"]),
cast.ToString(info["password"]),
)
client, err := ssh.NewSSHClient(config)
if err != nil {
_ = ws.WriteControl(websocket.CloseMessage,
[]byte(err.Error()), time.Now().Add(time.Second))
return
}
defer client.Close()
turn, err := ssh.NewTurn(ws, client)
if err != nil {
_ = ws.WriteControl(websocket.CloseMessage,
[]byte(err.Error()), time.Now().Add(time.Second))
return
}
defer turn.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
if err = turn.Handle(ctx); err != nil {
app.Logger.Error("读取 ssh 数据失败", zap.Error(err))
return
}
}()
go func() {
defer wg.Done()
if err = turn.Wait(); err != nil {
app.Logger.Error("保持 ssh 会话失败", zap.Error(err))
}
cancel()
}()
wg.Wait()
}

140
internal/service/ws.go Normal file
View File

@@ -0,0 +1,140 @@
package service
import (
"bufio"
"context"
"net/http"
"sync"
"github.com/gorilla/websocket"
"github.com/spf13/cast"
"github.com/TheTNB/panel/internal/app"
"github.com/TheTNB/panel/internal/biz"
"github.com/TheTNB/panel/internal/data"
"github.com/TheTNB/panel/pkg/shell"
"github.com/TheTNB/panel/pkg/ssh"
)
type WsService struct {
sshRepo biz.SSHRepo
}
func NewWsService() *WsService {
return &WsService{
sshRepo: data.NewSSHRepo(),
}
}
func (s *WsService) Session(w http.ResponseWriter, r *http.Request) {
info, err := s.sshRepo.GetInfo()
if err != nil {
Error(w, http.StatusInternalServerError, "%v", err)
return
}
ws, err := s.upgrade(w, r)
if err != nil {
ErrorSystem(w)
return
}
defer ws.Close()
config := ssh.ClientConfigPassword(
cast.ToString(info["host"])+":"+cast.ToString(info["port"]),
cast.ToString(info["user"]),
cast.ToString(info["password"]),
)
client, err := ssh.NewSSHClient(config)
if err != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, err.Error()))
return
}
defer client.Close()
turn, err := ssh.NewTurn(ws, client)
if err != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, err.Error()))
return
}
defer turn.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
_ = turn.Handle(ctx)
}()
go func() {
defer wg.Done()
_ = turn.Wait()
}()
wg.Wait()
cancel()
}
func (s *WsService) Exec(w http.ResponseWriter, r *http.Request) {
ws, err := s.upgrade(w, r)
if err != nil {
ErrorSystem(w)
return
}
defer ws.Close()
// 第一条消息是命令
_, cmd, err := ws.ReadMessage()
if err != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "failed to read command"))
return
}
ctx, cancel := context.WithCancel(context.Background())
out, err := shell.ExecfWithPipe(ctx, string(cmd))
if err != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "failed to run command"))
cancel()
return
}
go func() {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
line := scanner.Text()
_ = ws.WriteMessage(websocket.TextMessage, []byte(line))
}
if err = scanner.Err(); err != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "failed to read command output"))
}
}()
s.readLoop(ws)
cancel()
}
func (s *WsService) upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
upGrader := websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
}
// debug 模式下不校验 origin方便 vite 代理调试
if app.Conf.Bool("app.debug") {
upGrader.CheckOrigin = func(r *http.Request) bool {
return true
}
}
return upGrader.Upgrade(w, r, nil)
}
// readLoop 阻塞直到客户端关闭连接
func (s *WsService) readLoop(c *websocket.Conn) {
for {
if _, _, err := c.NextReader(); err != nil {
c.Close()
break
}
}
}

View File

@@ -2,8 +2,10 @@ package shell
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
@@ -90,3 +92,19 @@ func ExecfWithOutput(shell string, args ...any) error {
return cmd.Run()
}
// ExecfWithPipe 执行 shell 命令并返回管道
func ExecfWithPipe(ctx context.Context, shell string, args ...any) (out io.ReadCloser, err error) {
_ = os.Setenv("LC_ALL", "C")
cmd := exec.CommandContext(ctx, "bash", "-c", fmt.Sprintf(shell, args...))
out, err = cmd.StdoutPipe()
if err != nil {
return
}
cmd.Stderr = cmd.Stdout
err = cmd.Start()
return
}

View File

@@ -80,6 +80,7 @@ func (t *Turn) Handle(context context.Context) error {
default:
_, data, err := t.ws.ReadMessage()
if err != nil {
// 通常是客户端关闭连接
return fmt.Errorf("reading ws message err: %v", err)
}

View File

@@ -1,20 +1,25 @@
const proxyConfigMappings: Record<ProxyType, ProxyConfig> = {
dev: [
{
prefix: '/api/ws',
target: 'ws://localhost:8888/api/ws',
secure: false
},
{
prefix: '/api',
target: 'http://localhost:8080'
target: 'http://localhost:8080/api'
}
],
test: [
{
prefix: '/api',
target: 'http://localhost:8080'
target: 'http://localhost:8080/api'
}
],
prod: [
{
prefix: '/api',
target: 'http://localhost:8080'
target: 'http://localhost:8080/api'
}
]
}

24
web/src/api/ws/index.ts Normal file
View File

@@ -0,0 +1,24 @@
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
const base = `${protocol}://${window.location.host}/api/ws/`
export default {
// 执行命令
exec: (cmd: string): Promise<WebSocket> => {
return new Promise((resolve, reject) => {
const ws = new WebSocket(base + 'exec')
ws.onopen = () => {
ws.send(cmd)
resolve(ws)
}
ws.onerror = (e) => reject(e)
})
},
// 连接SSH
ssh: (): Promise<WebSocket> => {
return new Promise((resolve, reject) => {
const ws = new WebSocket(base + 'ssh')
ws.onopen = () => resolve(ws)
ws.onerror = (e) => reject(e)
})
}
}

View File

@@ -3,6 +3,7 @@ defineOptions({
name: 'ssh-index'
})
import ws from '@/api/ws'
import { AttachAddon } from '@xterm/addon-attach'
import { ClipboardAddon } from '@xterm/addon-clipboard'
import { FitAddon } from '@xterm/addon-fit'
@@ -25,12 +26,8 @@ const model = ref({
const terminal = ref<HTMLElement | null>(null)
const term = ref()
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
const ws = new WebSocket(`${protocol}://${window.location.host}/api/ssh/session`)
const attachAddon = new AttachAddon(ws)
let sshWs: WebSocket | null = null
const fitAddon = new FitAddon()
const clipboardAddon = new ClipboardAddon()
const webLinksAddon = new WebLinksAddon()
const webglAddon = new WebglAddon()
const handleSave = () => {
@@ -51,26 +48,27 @@ const getInfo = () => {
}
const openSession = () => {
term.value = new Terminal({
lineHeight: 1.2,
fontSize: 14,
fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",
cursorBlink: true,
cursorStyle: 'underline',
tabStopWidth: 4,
theme: { background: '#111', foreground: '#fff' }
})
ws.ssh().then((ws) => {
sshWs = ws
term.value = new Terminal({
lineHeight: 1.2,
fontSize: 14,
fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",
cursorBlink: true,
cursorStyle: 'underline',
tabStopWidth: 4,
theme: { background: '#111', foreground: '#fff' }
})
term.value.loadAddon(attachAddon)
term.value.loadAddon(fitAddon)
term.value.loadAddon(clipboardAddon)
term.value.loadAddon(webLinksAddon)
term.value.loadAddon(webglAddon)
webglAddon.onContextLoss(() => {
webglAddon.dispose()
})
term.value.loadAddon(new AttachAddon(ws))
term.value.loadAddon(fitAddon)
term.value.loadAddon(new ClipboardAddon())
term.value.loadAddon(new WebLinksAddon())
term.value.loadAddon(webglAddon)
webglAddon.onContextLoss(() => {
webglAddon.dispose()
})
ws.onopen = () => {
term.value.open(terminal.value!)
fitAddon.fit()
term.value.focus()
@@ -81,33 +79,33 @@ const openSession = () => {
},
false
)
}
ws.onclose = () => {
term.value.write('\r\n连接已关闭请刷新重试。')
term.value.write('\r\nConnection closed. Please refresh.')
window.removeEventListener('resize', () => {
fitAddon.fit()
})
}
ws.onerror = (event) => {
term.value.write('\r\n连接发生错误请刷新重试。')
term.value.write('\r\nConnection error. Please refresh .')
console.error(event)
ws.close()
}
term.value.onResize(({ cols, rows }: { cols: number; rows: number }) => {
if (ws.readyState === 1) {
ws.send(
JSON.stringify({
resize: true,
columns: cols,
rows: rows
})
)
ws.onclose = () => {
term.value.write('\r\n连接已关闭请刷新重试。')
term.value.write('\r\nConnection closed. Please refresh.')
window.removeEventListener('resize', () => {
fitAddon.fit()
})
}
ws.onerror = (event) => {
term.value.write('\r\n连接发生错误请刷新重试。')
term.value.write('\r\nConnection error. Please refresh .')
console.error(event)
ws.close()
}
term.value.onResize(({ cols, rows }: { cols: number; rows: number }) => {
if (ws.readyState === 1) {
ws.send(
JSON.stringify({
resize: true,
columns: cols,
rows: rows
})
)
}
})
})
}
@@ -129,6 +127,12 @@ onMounted(() => {
getInfo()
openSession()
})
onUnmounted(() => {
if (sshWs) {
sshWs.close()
}
})
</script>
<template>

View File

@@ -3,47 +3,44 @@ defineOptions({
name: 'task-index'
})
import Editor from '@guolao/vue-monaco-editor'
import { NButton, NDataTable, NPopconfirm } from 'naive-ui'
import { useI18n } from 'vue-i18n'
import { LogInst, NButton, NDataTable, NPopconfirm } from 'naive-ui'
import task from '@/api/panel/task'
import ws from '@/api/ws'
import { formatDateTime, renderIcon } from '@/utils'
import type { Task } from '@/views/task/types'
const { t } = useI18n()
const taskLogModal = ref(false)
const taskLog = ref('')
const autoRefresh = ref(false)
const currentTaskId = ref(0)
const logRef = ref<LogInst | null>(null)
let logWs: WebSocket | null = null
const columns: any = [
{ type: 'selection', fixed: 'left' },
{
title: t('taskIndex.columns.name'),
title: '任务名',
key: 'name',
minWidth: 200,
resizable: true,
ellipsis: { tooltip: true }
},
{
title: t('taskIndex.columns.status'),
title: '状态',
key: 'status',
width: 150,
ellipsis: { tooltip: true },
render(row: any) {
return row.status === 'finished'
? t('taskIndex.options.status.finished')
? '已完成'
: row.status === 'waiting'
? t('taskIndex.options.status.waiting')
? '等待中'
: row.status === 'failed'
? t('taskIndex.options.status.failed')
: t('taskIndex.options.status.running')
? '已失败'
: '运行中'
}
},
{
title: t('taskIndex.columns.createdAt'),
title: '创建时间',
key: 'created_at',
width: 200,
ellipsis: { tooltip: true },
@@ -52,7 +49,7 @@ const columns: any = [
}
},
{
title: t('taskIndex.columns.updatedAt'),
title: '完成时间',
key: 'updated_at',
width: 200,
ellipsis: { tooltip: true },
@@ -61,7 +58,7 @@ const columns: any = [
}
},
{
title: t('taskIndex.columns.actions'),
title: '操作',
key: 'actions',
width: 200,
align: 'center',
@@ -76,14 +73,12 @@ const columns: any = [
type: 'warning',
secondary: true,
onClick: () => {
handleShowLog(row.id)
currentTaskId.value = row.id
handleShowLog(row.log)
taskLogModal.value = true
autoRefresh.value = true
}
},
{
default: () => t('taskIndex.buttons.log'),
default: () => '日志',
icon: renderIcon('material-symbols:visibility', { size: 14 })
}
)
@@ -92,14 +87,11 @@ const columns: any = [
? h(
NPopconfirm,
{
onPositiveClick: () => handleDelete(row.id),
onNegativeClick: () => {
window.$message.info(t('taskIndex.buttons.undelete'))
}
onPositiveClick: () => handleDelete(row.id)
},
{
default: () => {
return t('taskIndex.confirm.delete')
return '确定要删除吗?'
},
trigger: () => {
return h(
@@ -110,7 +102,7 @@ const columns: any = [
style: 'margin-left: 15px;'
},
{
default: () => t('taskIndex.buttons.delete'),
default: () => '删除',
icon: renderIcon('material-symbols:delete-outline', { size: 14 })
}
)
@@ -139,23 +131,39 @@ const pagination = reactive({
const handleDelete = (id: number) => {
task.delete(id).then(() => {
window.$message.success(t('taskIndex.alerts.delete'))
window.$message.success('删除成功')
onPageChange(pagination.page)
})
}
const handleShowLog = (id: number) => {
task
.get(id)
.then((res) => {
taskLog.value = res.data.log
const handleShowLog = (path: string) => {
const cmd = `tail -f ${path}`
ws.exec(cmd)
.then((ws: WebSocket) => {
logWs = ws
taskLogModal.value = true
ws.onmessage = (event) => {
taskLog.value += event.data + '\n'
const lines = taskLog.value.split('\n')
if (lines.length > 2000) {
taskLog.value = lines.slice(lines.length - 2000).join('\n')
}
}
})
.catch(() => {
autoRefresh.value = false
window.$message.error('获取日志流失败')
})
}
const getTaskList = async (page: number, limit: number) => {
const handleCloseLog = () => {
if (logWs) {
logWs.close()
}
taskLogModal.value = false
taskLog.value = ''
}
const fetchTaskList = async (page: number, limit: number) => {
const { data } = await task.list(page, limit)
return data
}
@@ -166,7 +174,7 @@ const onChecked = (rowKeys: any) => {
const onPageChange = (page: number) => {
pagination.page = page
getTaskList(page, pagination.pageSize).then((res) => {
fetchTaskList(page, pagination.pageSize).then((res) => {
tasks.value = res.items
pagination.itemCount = res.total
pagination.pageCount = res.total / pagination.pageSize + 1
@@ -178,30 +186,16 @@ const onPageSizeChange = (pageSize: number) => {
onPageChange(1)
}
let timer: any = null
const setAutoRefreshTimer = () => {
timer = setInterval(() => {
handleShowLog(currentTaskId.value)
}, 2000)
}
watch(
() => autoRefresh.value,
(value) => {
if (value) {
setAutoRefreshTimer()
} else {
clearInterval(timer)
}
},
{ immediate: true }
)
onMounted(() => {
onPageChange(pagination.page)
})
onUnmounted(() => {
clearInterval(timer)
watchEffect(() => {
if (taskLog.value) {
nextTick(() => {
logRef.value?.scrollTo({ position: 'bottom', silent: true })
})
}
})
</script>
@@ -227,42 +221,14 @@ onUnmounted(() => {
<n-modal
v-model:show="taskLogModal"
preset="card"
:title="$t('taskIndex.logModal.title')"
title="任务日志"
style="width: 80vw"
size="huge"
:bordered="false"
:segmented="false"
@close="
() => {
autoRefresh = false
taskLogModal = false
}
"
@mask-click="
() => {
autoRefresh = false
taskLogModal = false
}
"
@close="handleCloseLog"
@mask-click="handleCloseLog"
>
<template #header-extra>
<n-switch v-model:value="autoRefresh" style="margin-right: 10px">
<template #checked>{{ $t('taskIndex.logModal.autoRefresh.on') }}</template>
<template #unchecked>{{ $t('taskIndex.logModal.autoRefresh.off') }}</template>
</n-switch>
</template>
<Editor
v-model:value="taskLog"
language="shell"
theme="vs-dark"
height="60vh"
mt-8
:options="{
automaticLayout: true,
formatOnType: true,
formatOnPaste: true,
readOnly: true
}"
/>
<n-log ref="logRef" :log="taskLog" trim :rows="40" />
</n-modal>
</template>