diff --git a/packages/orchestrator/cmd/resume-build/main.go b/packages/orchestrator/cmd/resume-build/main.go index 9294b25c98..c83d22be5c 100644 --- a/packages/orchestrator/cmd/resume-build/main.go +++ b/packages/orchestrator/cmd/resume-build/main.go @@ -71,6 +71,7 @@ func main() { cmdPause := flag.String("cmd-pause", "", "execute command in sandbox, then pause on success") cmdSignalPause := flag.String("cmd-signal-pause", "", "execute command in sandbox, then wait for SIGUSR1 before pausing") optimize := flag.Bool("optimize", false, "collect fresh prefetch mapping after pause (resumes snapshot to record page faults)") + shell := flag.Bool("shell", false, "attach an interactive PTY shell via envd (no sshd required in the sandbox)") flag.Parse() @@ -125,6 +126,10 @@ func main() { log.Fatal("-optimize is incompatible with -iterations (benchmarking doesn't upload)") } + if *shell && (isCmdMode || isPauseMode || *iterations > 0) { + log.Fatal("-shell can only be used in interactive mode (no -cmd, no pause flags, no -iterations)") + } + // Generate new build ID if not specified and pause mode is enabled outputBuildID := *toBuild if isPauseMode && outputBuildID == "" { @@ -159,7 +164,7 @@ func main() { iterations: *iterations, } - err := run(ctx, *fromBuild, *iterations, *coldStart, *noPrefetch, *noEgress, *verbose, pauseOpts, runOpts) + err := run(ctx, *fromBuild, *iterations, *coldStart, *noPrefetch, *noEgress, *verbose, *shell, pauseOpts, runOpts) cancel() if err != nil { @@ -274,6 +279,7 @@ type runner struct { cache *template.Cache coldStart bool noPrefetch bool + shell bool config cfg.BuilderConfig storage storage.StorageProvider } @@ -314,11 +320,23 @@ func (r *runner) interactive(ctx context.Context) error { fmt.Printf("โœ… Running (resumed in %s)\n", time.Since(t0)) fmt.Printf(" sudo nsenter --net=/var/run/netns/%s ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@169.254.0.21\n", sbx.Slot.NamespaceID()) - fmt.Println("Ctrl+C to stop") + defer func() { + fmt.Println("๐Ÿงน Cleanup...") + sbx.Close(context.WithoutCancel(ctx)) + }() + + if r.shell { + err := attachShell(ctx, sbx) + if err != nil && !isShellExited(err) { + return err + } + + return nil + } + + fmt.Println("Ctrl+C to stop") <-ctx.Done() - fmt.Println("๐Ÿงน Cleanup...") - sbx.Close(context.WithoutCancel(ctx)) return nil } @@ -959,7 +977,7 @@ func (r *runner) benchmark(ctx context.Context, n int) error { return lastErr } -func run(ctx context.Context, buildID string, iterations int, coldStart, noPrefetch, noEgress, verbose bool, pauseOpts pauseOptions, runOpts runOptions) error { +func run(ctx context.Context, buildID string, iterations int, coldStart, noPrefetch, noEgress, verbose, shell bool, pauseOpts pauseOptions, runOpts runOptions) error { // Silence other loggers unless verbose mode var l logger.Logger if !verbose { @@ -1122,6 +1140,7 @@ func run(ctx context.Context, buildID string, iterations int, coldStart, noPrefe cache: cache, coldStart: coldStart, noPrefetch: noPrefetch, + shell: shell, config: config.BuilderConfig, storage: persistence, sbxConfig: sbxCfg, diff --git a/packages/orchestrator/cmd/resume-build/shell.go b/packages/orchestrator/cmd/resume-build/shell.go new file mode 100644 index 0000000000..5a631e7bbc --- /dev/null +++ b/packages/orchestrator/cmd/resume-build/shell.go @@ -0,0 +1,331 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "connectrpc.com/connect" + "golang.org/x/term" + + "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox" + "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process/processconnect" +) + +// shellExitedError is returned when an interactive shell session ends. +// Callers in pause modes use it to distinguish "user detached" from real errors. +type shellExitedError struct{ exitCode int32 } + +func (e *shellExitedError) Error() string { + return fmt.Sprintf("shell exited with code %d", e.exitCode) +} + +func isShellExited(err error) bool { + var s *shellExitedError + + return errors.As(err, &s) +} + +// shellEnv builds the environment passed into the in-guest PTY shell. +// envd intentionally only inherits PATH/HOME/USER/LOGNAME plus its own +// configured globals, so we must propagate TERM (and a few common locale +// vars) explicitly โ€” otherwise curses apps like htop, tmux, vim and less +// fail to initialise. +func shellEnv() map[string]string { + envs := map[string]string{} + + if t := os.Getenv("TERM"); t != "" { + envs["TERM"] = t + } else { + envs["TERM"] = "xterm-256color" + } + + for _, k := range []string{"LANG", "LC_ALL", "LC_CTYPE", "COLORTERM"} { + if v := os.Getenv(k); v != "" { + envs[k] = v + } + } + + return envs +} + +// attachShell opens an interactive PTY shell against envd inside sbx, +// proxying the host terminal through. It tries /bin/bash -l first and +// falls back to /bin/sh if bash is missing in the guest. +// +// Returns when the in-guest shell exits (Ctrl+D), or when ctx is cancelled. +func attachShell(ctx context.Context, sbx *sandbox.Sandbox) error { + if !term.IsTerminal(int(os.Stdin.Fd())) { + return errors.New("-shell requires an interactive terminal on stdin") + } + + envdURL := fmt.Sprintf("http://%s:%d", sbx.Slot.HostIPString(), consts.DefaultEnvdServerPort) + hc := http.Client{ + // No request timeout โ€” interactive sessions can be long-lived. + Transport: sandbox.SandboxHttpTransport, + } + processC := processconnect.NewProcessClient(&hc, envdURL) + + cols, rows, err := term.GetSize(int(os.Stdout.Fd())) + if err != nil || cols == 0 || rows == 0 { + cols, rows = 80, 24 + } + + for _, candidate := range []struct { + cmd string + args []string + }{ + {"/bin/bash", []string{"-l"}}, + {"/bin/sh", []string{"-l"}}, + } { + err := runShell(ctx, processC, sbx, candidate.cmd, candidate.args, uint32(cols), uint32(rows)) + // Fall back to the next candidate if the shell binary is missing or + // failed before we ever saw any output. + if isMissingShell(err) { + continue + } + + return err + } + + return errors.New("no usable shell found in sandbox (tried /bin/bash, /bin/sh)") +} + +// missingShellError signals that a candidate shell binary failed to launch +// and we should try the next one. +type missingShellError struct{ inner error } + +func (e *missingShellError) Error() string { return e.inner.Error() } +func (e *missingShellError) Unwrap() error { return e.inner } + +func isMissingShell(err error) bool { + var m *missingShellError + + return errors.As(err, &m) +} + +func runShell( + ctx context.Context, + processC processconnect.ProcessClient, + sbx *sandbox.Sandbox, + cmd string, + args []string, + cols, rows uint32, +) error { + startReq := connect.NewRequest(&process.StartRequest{ + Process: &process.ProcessConfig{ + Cmd: cmd, + Args: args, + Envs: shellEnv(), + }, + Pty: &process.PTY{ + Size: &process.PTY_Size{Cols: cols, Rows: rows}, + }, + }) + grpc.SetUserHeader(startReq.Header(), "root") + if sbx.Config.Envd.AccessToken != nil { + startReq.Header().Set("X-Access-Token", *sbx.Config.Envd.AccessToken) + } + + stream, err := processC.Start(ctx, startReq) + if err != nil { + return fmt.Errorf("start %s: %w", cmd, err) + } + defer stream.Close() + + // Wait for the StartEvent so we have a pid to address input/resize at. + var pid uint32 + gotData := false + for stream.Receive() { + event := stream.Msg().GetEvent().GetEvent() + switch e := event.(type) { + case *process.ProcessEvent_Start: + pid = e.Start.GetPid() + case *process.ProcessEvent_Data: + gotData = true + // Push any data that arrived before we exited the bootstrap loop. + if pty := e.Data.GetPty(); pty != nil { + _, _ = os.Stdout.Write(pty) + } + case *process.ProcessEvent_End: + // Process ended before producing output โ€” treat as missing-shell + // so the caller can try the fallback. + if !gotData { + return &missingShellError{inner: fmt.Errorf("%s exited immediately (code %d)", cmd, e.End.GetExitCode())} + } + + return endToError(e.End) + } + if pid != 0 { + break + } + } + if pid == 0 { + if err := stream.Err(); err != nil { + return fmt.Errorf("stream closed before start: %w", err) + } + + return errors.New("no start event received") + } + + fmt.Println("๐Ÿ“Ÿ Attaching shell via envd (Ctrl+D to exit)") + + oldState, err := term.MakeRaw(int(os.Stdin.Fd())) + if err != nil { + return fmt.Errorf("raw mode: %w", err) + } + defer func() { + _ = term.Restore(int(os.Stdin.Fd()), oldState) + fmt.Println() + }() + + sessionCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Output pump: stream โ†’ stdout. Closes sessionCtx on exit so input/resize + // goroutines unwind too. + endCh := make(chan *process.ProcessEvent_EndEvent, 1) + go func() { + defer cancel() + for stream.Receive() { + event := stream.Msg().GetEvent().GetEvent() + switch e := event.(type) { + case *process.ProcessEvent_Data: + if pty := e.Data.GetPty(); pty != nil { + _, _ = os.Stdout.Write(pty) + } + case *process.ProcessEvent_End: + endCh <- e.End + + return + } + } + }() + + // Input pump: stdin โ†’ StreamInput as PTY bytes. + go pumpInput(sessionCtx, processC, sbx, pid) + + // Resize: forward SIGWINCH to envd via Update. + go pumpResize(sessionCtx, processC, sbx, pid) + + <-sessionCtx.Done() + + select { + case end := <-endCh: + return endToError(end) + default: + } + + if err := stream.Err(); err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("shell stream: %w", err) + } + + return nil +} + +func pumpInput( + ctx context.Context, + processC processconnect.ProcessClient, + sbx *sandbox.Sandbox, + pid uint32, +) { + in := processC.StreamInput(ctx) + grpc.SetUserHeader(in.RequestHeader(), "root") + if sbx.Config.Envd.AccessToken != nil { + in.RequestHeader().Set("X-Access-Token", *sbx.Config.Envd.AccessToken) + } + + if err := in.Send(&process.StreamInputRequest{ + Event: &process.StreamInputRequest_Start{ + Start: &process.StreamInputRequest_StartEvent{ + Process: &process.ProcessSelector{ + Selector: &process.ProcessSelector_Pid{Pid: pid}, + }, + }, + }, + }); err != nil { + return + } + + buf := make([]byte, 4096) + for { + // In raw mode, Read blocks until a byte arrives. We can't easily + // interrupt it on ctx.Done, but the parent process will exit soon + // after the stream closes, which is acceptable for a CLI. + n, err := os.Stdin.Read(buf) + if n > 0 { + data := append([]byte(nil), buf[:n]...) + if sendErr := in.Send(&process.StreamInputRequest{ + Event: &process.StreamInputRequest_Data{ + Data: &process.StreamInputRequest_DataEvent{ + Input: &process.ProcessInput{ + Input: &process.ProcessInput_Pty{Pty: data}, + }, + }, + }, + }); sendErr != nil { + return + } + } + if err != nil { + return + } + if ctx.Err() != nil { + return + } + } +} + +func pumpResize( + ctx context.Context, + processC processconnect.ProcessClient, + sbx *sandbox.Sandbox, + pid uint32, +) { + winch := make(chan os.Signal, 1) + signal.Notify(winch, syscall.SIGWINCH) + defer signal.Stop(winch) + + for { + select { + case <-ctx.Done(): + return + case <-winch: + cols, rows, err := term.GetSize(int(os.Stdout.Fd())) + if err != nil || cols == 0 || rows == 0 { + continue + } + req := connect.NewRequest(&process.UpdateRequest{ + Process: &process.ProcessSelector{ + Selector: &process.ProcessSelector_Pid{Pid: pid}, + }, + Pty: &process.PTY{ + Size: &process.PTY_Size{Cols: uint32(cols), Rows: uint32(rows)}, + }, + }) + grpc.SetUserHeader(req.Header(), "root") + if sbx.Config.Envd.AccessToken != nil { + req.Header().Set("X-Access-Token", *sbx.Config.Envd.AccessToken) + } + updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, _ = processC.Update(updateCtx, req) + cancel() + } + } +} + +func endToError(end *process.ProcessEvent_EndEvent) error { + if end == nil { + return nil + } + + return &shellExitedError{exitCode: end.GetExitCode()} +} diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index cb6773e332..a33bf5e10c 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -72,6 +72,7 @@ require ( go.uber.org/zap v1.27.1 golang.org/x/sync v0.20.0 golang.org/x/sys v0.43.0 + golang.org/x/term v0.42.0 google.golang.org/api v0.267.0 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 @@ -319,7 +320,6 @@ require ( golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/term v0.42.0 // indirect golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.44.0 // indirect