diff --git a/go.mod b/go.mod index 6a5caf94..7e2ea04a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/bddjr/hlfhr v1.4.0 github.com/beevik/ntp v1.5.0 github.com/coder/websocket v1.8.14 + github.com/containerd/errdefs v1.0.0 github.com/coreos/go-systemd/v22 v22.6.0 github.com/creack/pty v1.1.24 github.com/dchest/captcha v1.1.0 @@ -64,7 +65,6 @@ require ( github.com/G-Core/gcore-dns-sdk-go v0.3.3 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/boombuler/barcode v1.1.0 // indirect - github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.6.0 // indirect diff --git a/internal/route/ws.go b/internal/route/ws.go index 205dc791..55c170e1 100644 --- a/internal/route/ws.go +++ b/internal/route/ws.go @@ -18,8 +18,9 @@ func NewWs(ws *service.WsService) *Ws { func (route *Ws) Register(r *chi.Mux) { r.Route("/api/ws", func(r chi.Router) { - r.Get("/ssh", route.ws.Session) r.Get("/exec", route.ws.Exec) + r.Get("/pty", route.ws.PTY) + r.Get("/ssh", route.ws.Session) r.Get("/container/{id}", route.ws.ContainerTerminal) r.Get("/container/image/pull", route.ws.ContainerImagePull) }) diff --git a/internal/service/ws.go b/internal/service/ws.go index 7b3a69f0..9d8bc663 100644 --- a/internal/service/ws.go +++ b/internal/service/ws.go @@ -38,49 +38,6 @@ func NewWsService(t *gotext.Locale, conf *config.Config, log *slog.Logger, ssh b } } -func (s *WsService) Session(w http.ResponseWriter, r *http.Request) { - req, err := Bind[request.ID](r) - if err != nil { - Error(w, http.StatusUnprocessableEntity, "%v", err) - return - } - info, err := s.sshRepo.Get(req.ID) - if err != nil { - Error(w, http.StatusInternalServerError, "%v", err) - return - } - - ws, err := s.upgrade(w, r) - if err != nil { - s.log.Warn("[Websocket] upgrade session ws error", slog.Any("err", err)) - return - } - defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) - - client, err := ssh.NewSSHClient(info.Config) - if err != nil { - _ = ws.Close(websocket.StatusNormalClosure, err.Error()) - return - } - defer func(client *stdssh.Client) { _ = client.Close() }(client) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - turn, err := ssh.NewTurn(ctx, ws, client) - if err != nil { - _ = ws.Close(websocket.StatusNormalClosure, err.Error()) - return - } - - go func() { - defer turn.Close() // Handle 退出后关闭 SSH 连接,以结束 Wait 阶段 - _ = turn.Handle(ctx) - }() - - turn.Wait() -} - func (s *WsService) Exec(w http.ResponseWriter, r *http.Request) { ws, err := s.upgrade(w, r) if err != nil { @@ -90,7 +47,7 @@ func (s *WsService) Exec(w http.ResponseWriter, r *http.Request) { defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) // 第一条消息是命令 - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(r.Context()) defer cancel() _, cmd, err := ws.Read(ctx) @@ -119,7 +76,91 @@ func (s *WsService) Exec(w http.ResponseWriter, r *http.Request) { s.readLoop(ctx, ws) } -// ContainerTerminal 容器终端 WebSocket 处理 +// PTY 通用 PTY 命令执行 +// 前端发送第一条消息为要执行的命令,后端通过 PTY 执行并实时返回输出 +func (s *WsService) PTY(w http.ResponseWriter, r *http.Request) { + ws, err := s.upgrade(w, r) + if err != nil { + s.log.Warn("[Websocket] upgrade pty ws error", slog.Any("err", err)) + return + } + defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + // 要执行的命令 + _, message, err := ws.Read(ctx) + if err != nil { + _ = ws.Close(websocket.StatusNormalClosure, s.t.Get("failed to read command: %v", err)) + return + } + command := string(message) + if command == "" { + _ = ws.Close(websocket.StatusNormalClosure, s.t.Get("command is empty")) + return + } + + // PTY 执行命令 + turn, err := shell.NewPTYTurn(ctx, ws, command) + if err != nil { + _ = ws.Write(ctx, websocket.MessageBinary, []byte("\r\n"+s.t.Get("Failed to start command: %v", err)+"\r\n")) + _ = ws.Close(websocket.StatusNormalClosure, "") + return + } + + go func() { + defer turn.Close() + _ = turn.Handle(ctx) + }() + + turn.Wait() +} + +func (s *WsService) Session(w http.ResponseWriter, r *http.Request) { + req, err := Bind[request.ID](r) + if err != nil { + Error(w, http.StatusUnprocessableEntity, "%v", err) + return + } + info, err := s.sshRepo.Get(req.ID) + if err != nil { + Error(w, http.StatusInternalServerError, "%v", err) + return + } + + ws, err := s.upgrade(w, r) + if err != nil { + s.log.Warn("[Websocket] upgrade session ws error", slog.Any("err", err)) + return + } + defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) + + sshClient, err := ssh.NewSSHClient(info.Config) + if err != nil { + _ = ws.Close(websocket.StatusNormalClosure, err.Error()) + return + } + defer func(sshClient *stdssh.Client) { _ = sshClient.Close() }(sshClient) + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + turn, err := ssh.NewTurn(ctx, ws, sshClient) + if err != nil { + _ = ws.Close(websocket.StatusNormalClosure, err.Error()) + return + } + + go func() { + defer turn.Close() // Handle 退出后关闭 SSH 连接,以结束 Wait 阶段 + _ = turn.Handle(ctx) + }() + + turn.Wait() +} + +// ContainerTerminal 容器终端 func (s *WsService) ContainerTerminal(w http.ResponseWriter, r *http.Request) { req, err := Bind[request.ContainerID](r) if err != nil { @@ -134,7 +175,7 @@ func (s *WsService) ContainerTerminal(w http.ResponseWriter, r *http.Request) { } defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(r.Context()) defer cancel() // 默认使用 bash 作为 shell,如果不存在则回退到 sh @@ -155,30 +196,7 @@ func (s *WsService) ContainerTerminal(w http.ResponseWriter, r *http.Request) { turn.Wait() } -func (s *WsService) upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { - opts := &websocket.AcceptOptions{ - CompressionMode: websocket.CompressionContextTakeover, - } - - // debug 模式下不校验 origin,方便 vite 代理调试 - if s.conf.App.Debug { - opts.InsecureSkipVerify = true - } - - return websocket.Accept(w, r, opts) -} - -// readLoop 阻塞直到客户端关闭连接 -func (s *WsService) readLoop(ctx context.Context, c *websocket.Conn) { - for { - if _, _, err := c.Read(ctx); err != nil { - _ = c.CloseNow() - break - } - } -} - -// ContainerImagePull 镜像拉取 WebSocket 处理 +// ContainerImagePull 镜像拉取 func (s *WsService) ContainerImagePull(w http.ResponseWriter, r *http.Request) { ws, err := s.upgrade(w, r) if err != nil { @@ -187,7 +205,7 @@ func (s *WsService) ContainerImagePull(w http.ResponseWriter, r *http.Request) { } defer func(ws *websocket.Conn) { _ = ws.CloseNow() }(ws) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(r.Context()) defer cancel() _, message, err := ws.Read(ctx) @@ -269,3 +287,26 @@ func (s *WsService) ContainerImagePull(w http.ResponseWriter, r *http.Request) { _ = ws.Write(ctx, websocket.MessageText, completeMsg) _ = ws.Close(websocket.StatusNormalClosure, "") } + +func (s *WsService) upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { + opts := &websocket.AcceptOptions{ + CompressionMode: websocket.CompressionContextTakeover, + } + + // debug 模式下不校验 origin,方便 vite 代理调试 + if s.conf.App.Debug { + opts.InsecureSkipVerify = true + } + + return websocket.Accept(w, r, opts) +} + +// readLoop 阻塞直到客户端关闭连接 +func (s *WsService) readLoop(ctx context.Context, c *websocket.Conn) { + for { + if _, _, err := c.Read(ctx); err != nil { + _ = c.CloseNow() + break + } + } +} diff --git a/pkg/shell/exec.go b/pkg/shell/exec.go index ba610afd..f4b3269a 100644 --- a/pkg/shell/exec.go +++ b/pkg/shell/exec.go @@ -10,7 +10,6 @@ import ( "os/exec" "slices" "strings" - "syscall" "time" "github.com/creack/pty" @@ -208,7 +207,7 @@ func ExecfWithTTY(shell string, args ...any) (string, error) { } defer func(f *os.File) { _ = f.Close() }(f) - if _, err = io.Copy(&out, f); ptyError(err) != nil { + if _, err = io.Copy(&out, f); IsPTYError(err) != nil { return "", fmt.Errorf("run %s failed, out: %s, err: %w", shell, strings.TrimSpace(out.String()), err) } if stderr.Len() > 0 { @@ -228,15 +227,3 @@ func preCheckArg(args []any) bool { return true } - -// Linux kernel return EIO when attempting to read from a master pseudo -// terminal which no longer has an open slave. So ignore error here. -// See https://github.com/creack/pty/issues/21 -func ptyError(err error) error { - var pathErr *os.PathError - if !errors.As(err, &pathErr) || !errors.Is(pathErr.Err, syscall.EIO) { - return err - } - - return nil -} diff --git a/pkg/shell/pty.go b/pkg/shell/pty.go new file mode 100644 index 00000000..81cd98b8 --- /dev/null +++ b/pkg/shell/pty.go @@ -0,0 +1,149 @@ +package shell + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "syscall" + + "github.com/coder/websocket" + "github.com/creack/pty" +) + +// MessageResize 终端大小调整消息 +type MessageResize struct { + Resize bool `json:"resize"` + Columns uint `json:"columns"` + Rows uint `json:"rows"` +} + +// Turn PTY 终端 +type Turn struct { + ctx context.Context + ws *websocket.Conn + ptmx *os.File + cmd *exec.Cmd +} + +// NewPTYTurn 使用 PTY 执行命令,返回 Turn 用于流式读取输出 +// 调用方需要负责调用 Close() 和 Wait() +func NewPTYTurn(ctx context.Context, ws *websocket.Conn, shell string, args ...any) (*Turn, error) { + if !preCheckArg(args) { + return nil, errors.New("command contains illegal characters") + } + if len(args) > 0 { + shell = fmt.Sprintf(shell, args...) + } + + _ = os.Setenv("LC_ALL", "C") + cmd := exec.CommandContext(ctx, "bash", "-c", shell) + + ptmx, err := pty.Start(cmd) + if err != nil { + return nil, fmt.Errorf("failed to start pty: %w", err) + } + + return &Turn{ + ctx: ctx, + ws: ws, + ptmx: ptmx, + cmd: cmd, + }, nil +} + +// Write 写入 PTY 输入 +func (t *Turn) Write(data []byte) (int, error) { + return t.ptmx.Write(data) +} + +// Wait 等待命令完成 +func (t *Turn) Wait() { + _ = t.cmd.Wait() +} + +// Close 关闭 PTY +func (t *Turn) Close() { + _ = t.ptmx.Close() +} + +// Handle 从 WebSocket 读取输入写入 PTY +func (t *Turn) Handle(ctx context.Context) error { + var resize MessageResize + + go func() { _ = t.Pipe(ctx) }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + _, data, err := t.ws.Read(ctx) + if err != nil { + // 通常是客户端关闭连接 + return fmt.Errorf("failed to read ws message: %w", err) + } + + // 判断是否是 resize 消息 + if err = json.Unmarshal(data, &resize); err == nil { + if resize.Resize && resize.Columns > 0 && resize.Rows > 0 { + if err = t.Resize(uint16(resize.Rows), uint16(resize.Columns)); err != nil { + return fmt.Errorf("failed to resize terminal: %w", err) + } + } + continue + } + + if _, err = t.Write(data); err != nil { + return fmt.Errorf("failed to write to pty stdin: %w", err) + } + } + } +} + +// Pipe 从 PTY 读取输出写入 WebSocket +func (t *Turn) Pipe(ctx context.Context) error { + buf := make([]byte, 8192) + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + n, err := t.ptmx.Read(buf) + if err != nil { + if err = IsPTYError(err); err != nil { + return fmt.Errorf("failed to read from pty: %w", err) + } + return nil + } + if n > 0 { + if err = t.ws.Write(ctx, websocket.MessageBinary, buf[:n]); err != nil { + return fmt.Errorf("failed to write to ws: %w", err) + } + } + } + } +} + +// Resize 调整 PTY 窗口大小 +func (t *Turn) Resize(rows, cols uint16) error { + return pty.Setsize(t.ptmx, &pty.Winsize{ + Rows: rows, + Cols: cols, + }) +} + +// IsPTYError Linux kernel return EIO when attempting to read from a master pseudo +// terminal which no longer has an open slave. So ignore error here. +// See https://github.com/creack/pty/issues/21 +func IsPTYError(err error) error { + var pathErr *os.PathError + if !errors.As(err, &pathErr) || !errors.Is(pathErr.Err, syscall.EIO) || !errors.Is(err, io.EOF) { + return err + } + + return nil +} diff --git a/web/src/api/ws/index.ts b/web/src/api/ws/index.ts index a2c0b448..acdf0054 100644 --- a/web/src/api/ws/index.ts +++ b/web/src/api/ws/index.ts @@ -13,6 +13,17 @@ export default { ws.onerror = (e) => reject(e) }) }, + // PTY 命令执行 + pty: (command: string): Promise => { + return new Promise((resolve, reject) => { + const ws = new WebSocket(`${base}/pty`) + ws.onopen = () => { + ws.send(command) + resolve(ws) + } + ws.onerror = (e) => reject(e) + }) + }, // 连接SSH ssh: (id: number): Promise => { return new Promise((resolve, reject) => { diff --git a/web/src/components/common/PtyTerminalModal.vue b/web/src/components/common/PtyTerminalModal.vue new file mode 100644 index 00000000..4fc82527 --- /dev/null +++ b/web/src/components/common/PtyTerminalModal.vue @@ -0,0 +1,303 @@ + + + + + diff --git a/web/src/views/container/ComposeView.vue b/web/src/views/container/ComposeView.vue index c3c4b3e1..81839986 100644 --- a/web/src/views/container/ComposeView.vue +++ b/web/src/views/container/ComposeView.vue @@ -3,6 +3,7 @@ import { NButton, NCheckbox, NDataTable, NFlex, NInput, NPopconfirm, NTag } from import { useGettext } from 'vue3-gettext' import container from '@/api/panel/container' +import PtyTerminalModal from '@/components/common/PtyTerminalModal.vue' import { useFileStore } from '@/store' import { formatDateTime } from '@/utils' @@ -28,6 +29,28 @@ const updateModel = ref({ }) const updateModal = ref(false) +// Compose 启动状态 +const upModal = ref(false) +const upComposeName = ref('') +const upCommand = ref('') + +// 处理 Compose 启动 +const handleComposeUp = (row: any, force: boolean) => { + upComposeName.value = row.name + let cmd = `docker compose -f ${row.path}/docker-compose.yml up -d` + if (force) { + cmd += ' --pull always' + } + upCommand.value = cmd + upModal.value = true +} + +// Compose 启动完成 +const handleUpComplete = () => { + refresh() + forcePull.value = false +} + const columns: any = [ { type: 'selection', fixed: 'left' }, { @@ -104,18 +127,7 @@ const columns: any = [ { showIcon: false, onPositiveClick: () => { - const messageReactive = window.$message.loading($gettext('Starting...'), { - duration: 0 - }) - useRequest(container.composeUp(row.name, forcePull.value)) - .onSuccess(() => { - refresh() - forcePull.value = false - window.$message.success($gettext('Start successful')) - }) - .onComplete(() => { - messageReactive?.destroy() - }) + handleComposeUp(row, forcePull.value) } }, { @@ -391,4 +403,10 @@ onMounted(() => { {{ $gettext('Submit') }} + diff --git a/web/src/views/container/ContainerView.vue b/web/src/views/container/ContainerView.vue index f66ad63b..8d7924c6 100644 --- a/web/src/views/container/ContainerView.vue +++ b/web/src/views/container/ContainerView.vue @@ -1,7 +1,6 @@