diff --git a/packages/orchestrator/cmd/resume-build/main.go b/packages/orchestrator/cmd/resume-build/main.go index fe66689035..08a995b939 100644 --- a/packages/orchestrator/cmd/resume-build/main.go +++ b/packages/orchestrator/cmd/resume-build/main.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "math" - "net/http" "os" "os/signal" "path/filepath" @@ -16,7 +15,6 @@ import ( "syscall" "time" - "connectrpc.com/connect" "github.com/containernetworking/plugins/pkg/ns" "github.com/coreos/go-iptables/iptables" "github.com/google/uuid" @@ -39,11 +37,8 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/pkg/tcpfirewall" "github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/core/rootfs" "github.com/e2b-dev/infra/packages/orchestrator/pkg/template/metadata" - "github.com/e2b-dev/infra/packages/shared/pkg/consts" "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" - "github.com/e2b-dev/infra/packages/shared/pkg/grpc" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process" - "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process/processconnect" "github.com/e2b-dev/infra/packages/shared/pkg/logger" sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox" "github.com/e2b-dev/infra/packages/shared/pkg/storage" @@ -72,8 +67,18 @@ func main() { cmdSignalPause := flag.String("cmd-signal-pause", "", "execute command in sandbox, then wait for SIGUSR1 before pausing") optimize := flag.Bool("optimize", false, "collect fresh prefetch mapping after pause (resumes snapshot to record page faults)") + // Enables the pre-pause reclaim chain with sensible per-step caps. + reclaim := flag.Bool("reclaim", false, "enable pre-pause reclaim chain (sync 500ms, drop_caches 200ms, compact 1s, fstrim 500ms)") + flag.Parse() + if *reclaim { + featureflags.NewIntFlag("reclaim-sync-timeout-ms", 500) + featureflags.NewIntFlag("reclaim-drop-caches-timeout-ms", 200) + featureflags.NewIntFlag("reclaim-compact-memory-timeout-ms", 1000) + featureflags.NewIntFlag("reclaim-fstrim-timeout-ms", 500) + } + if *fromBuild == "" { log.Fatal("-from-build required") } @@ -1163,32 +1168,9 @@ func printTemplateInfo(ctx context.Context, tmpl template.Template, meta metadat } } -// runCommandInSandbox runs a command inside the sandbox via envd +// runCommandInSandbox runs a command inside the sandbox via envd. func runCommandInSandbox(ctx context.Context, sbx *sandbox.Sandbox, command string) error { - // Connect directly to envd on the sandbox - envdURL := fmt.Sprintf("http://%s:%d", sbx.Slot.HostIPString(), consts.DefaultEnvdServerPort) - - hc := http.Client{ - Timeout: 10 * time.Minute, - Transport: sandbox.SandboxHttpTransport, - } - - processC := processconnect.NewProcessClient(&hc, envdURL) - - req := connect.NewRequest(&process.StartRequest{ - Process: &process.ProcessConfig{ - Cmd: "/bin/bash", - Args: []string{"-l", "-c", command}, - }, - }) - grpc.SetUserHeader(req.Header(), "root") - - // Set access token if available - if sbx.Config.Envd.AccessToken != nil { - req.Header().Set("X-Access-Token", *sbx.Config.Envd.AccessToken) - } - - stream, err := processC.Start(ctx, req) + stream, err := sbx.StartEnvdProcess(ctx, command, "root", 0) if err != nil { return fmt.Errorf("failed to start process: %w", err) } diff --git a/packages/orchestrator/pkg/sandbox/envd_process.go b/packages/orchestrator/pkg/sandbox/envd_process.go new file mode 100644 index 0000000000..8777b974c1 --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/envd_process.go @@ -0,0 +1,43 @@ +package sandbox + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "connectrpc.com/connect" + + "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process" + "github.com/e2b-dev/infra/packages/shared/pkg/grpc/envd/process/processconnect" +) + +// StartEnvdProcess opens a streaming Process.Start call against this +// sandbox's envd, running `script` under `/bin/bash -c` as `user`. When +// timeout > 0 it sets `Connect-Timeout-Ms` so envd kills the process +// hard at the deadline. Auth/user headers are wired from sandbox config. +// Caller owns the returned stream (Close + Receive). +func (s *Sandbox) StartEnvdProcess( + ctx context.Context, + script, user string, + timeout time.Duration, +) (*connect.ServerStreamForClient[process.StartResponse], error) { + addr := fmt.Sprintf("http://%s:%d", s.Slot.HostIPString(), consts.DefaultEnvdServerPort) + pc := processconnect.NewProcessClient(&http.Client{Transport: sandboxHttpClient.Transport}, addr) + + req := connect.NewRequest(&process.StartRequest{ + Process: &process.ProcessConfig{Cmd: "/bin/bash", Args: []string{"-c", script}}, + }) + if timeout > 0 { + req.Header().Set("Connect-Timeout-Ms", strconv.FormatInt(int64(timeout/time.Millisecond), 10)) + } + if s.Config.Envd.AccessToken != nil { + req.Header().Set("X-Access-Token", *s.Config.Envd.AccessToken) + } + grpc.SetUserHeader(req.Header(), user) + + return pc.Start(ctx, req) +} diff --git a/packages/orchestrator/pkg/sandbox/reclaim.go b/packages/orchestrator/pkg/sandbox/reclaim.go new file mode 100644 index 0000000000..7ca4d3f4c3 --- /dev/null +++ b/packages/orchestrator/pkg/sandbox/reclaim.go @@ -0,0 +1,100 @@ +package sandbox + +import ( + "context" + "fmt" + "strings" + "time" + + "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" + "github.com/e2b-dev/infra/packages/shared/pkg/logger" +) + +type reclaimStep struct { + flag featureflags.IntFlag + cmd string +} + +// Order matters: sync makes drop_caches more effective; drop_caches gives +// compact_memory more headroom; fstrim wants a stable FS view. +var reclaimSteps = []reclaimStep{ + {featureflags.ReclaimSyncTimeoutMs, "sync"}, + {featureflags.ReclaimDropCachesTimeoutMs, "echo 3 > /proc/sys/vm/drop_caches"}, + {featureflags.ReclaimCompactMemoryTimeoutMs, "echo 1 > /proc/sys/vm/compact_memory"}, + {featureflags.ReclaimFstrimTimeoutMs, "fstrim -av"}, +} + +// Slack added to the sum of per-step caps to absorb shell start / +// envd round-trip overhead. +const reclaimOuterSlack = 500 * time.Millisecond + +// buildReclaimScript composes a chain where each step has its own +// `timeout --foreground -s KILL` ceiling. Steps with cap=0 are skipped. +// Returns ("", 0) when every step is disabled. +func (s *Sandbox) buildReclaimScript(ctx context.Context) (string, time.Duration) { + var ( + parts []string + sum time.Duration + ) + for _, st := range reclaimSteps { + ms := s.featureFlags.IntFlag(ctx, st.flag) + if ms <= 0 { + continue + } + // `timeout` accepts fractional seconds (s/m/h/d), not `ms`. + // `--foreground` ensures SIGKILL actually reaches the child when we + // run inside a non-interactive bash invoked from envd. Per-step + // stdout/stderr is dropped; any non-zero status is captured into + // `rc` so the script's overall exit code surfaces failures without + // short-circuiting subsequent steps. + secs := float64(ms) / 1000.0 + parts = append(parts, fmt.Sprintf("timeout --foreground -s KILL %.3f sh -c %q >/dev/null 2>&1 || rc=$?", secs, st.cmd)) + sum += time.Duration(ms) * time.Millisecond + } + if len(parts) == 0 { + return "", 0 + } + + return "rc=0; " + strings.Join(parts, "; ") + "; exit $rc", sum + reclaimOuterSlack +} + +// bestEffortReclaim asks envd to reclaim guest memory + disk before pause. +// Per-step stdout/stderr is silenced inside the guest; we only log when +// envd itself errors or the script reports a non-zero exit code. +func (s *Sandbox) bestEffortReclaim(ctx context.Context) { + ctx, span := tracer.Start(ctx, "envd-reclaim") + defer span.End() + + script, timeout := s.buildReclaimScript(ctx) + if script == "" { + return + } + + rcCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + stream, err := s.StartEnvdProcess(rcCtx, script, "root", timeout) + if err != nil { + logger.L().Warn(ctx, "envd reclaim failed", logger.WithSandboxID(s.Runtime.SandboxID), zap.Error(err)) + + return + } + defer stream.Close() + + var exitCode int32 + for stream.Receive() { + if end := stream.Msg().GetEvent().GetEnd(); end != nil { + exitCode = end.GetExitCode() + } + } + if err := stream.Err(); err != nil { + logger.L().Warn(ctx, "envd reclaim stream error", logger.WithSandboxID(s.Runtime.SandboxID), zap.Error(err)) + + return + } + if exitCode != 0 { + logger.L().Warn(ctx, "envd reclaim non-zero exit", logger.WithSandboxID(s.Runtime.SandboxID), zap.Int32("exit_code", exitCode)) + } +} diff --git a/packages/orchestrator/pkg/sandbox/sandbox.go b/packages/orchestrator/pkg/sandbox/sandbox.go index f69100015d..0ce563c9d8 100644 --- a/packages/orchestrator/pkg/sandbox/sandbox.go +++ b/packages/orchestrator/pkg/sandbox/sandbox.go @@ -217,6 +217,8 @@ type Sandbox struct { files *storage.SandboxFiles cleanup *Cleanup + featureFlags *featureflags.Client + process *fc.Process cgroupHandle *cgroup.CgroupHandle @@ -457,7 +459,8 @@ func (f *Factory) CreateSandbox( files: sandboxFiles, process: fcHandle, - cleanup: cleanup, + cleanup: cleanup, + featureFlags: f.featureFlags, APIStoredConfig: apiConfigToStore, @@ -797,7 +800,8 @@ func (f *Factory) ResumeSandbox( files: sandboxFiles, process: fcHandle, - cleanup: cleanup, + cleanup: cleanup, + featureFlags: f.featureFlags, APIStoredConfig: apiConfigToStore, CABundle: f.egressProxy.CABundle(), @@ -1051,6 +1055,11 @@ func (s *Sandbox) Pause( // Stop the health check before pausing the VM s.Checks.Stop() + // Best-effort pre-pause guest reclaim (sync, drop_caches, compact_memory, + // fstrim) on the live VM via envd. Per-step caps are LD-flag-driven; all + // default to 0 which disables the chain entirely. Non-fatal. + s.bestEffortReclaim(ctx) + if err := s.process.Pause(ctx); err != nil { return nil, fmt.Errorf("failed to pause VM: %w", err) } diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 83a4a0c11c..a1a163a28b 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -157,7 +157,16 @@ var ( BestOfKMaxOvercommit = NewIntFlag("best-of-k-max-overcommit", 400) // Default R=4 (stored as percentage, max over-commit ratio) BestOfKAlpha = NewIntFlag("best-of-k-alpha", 50) // Default Alpha=0.5 (stored as percentage for int flag, current usage weight) EnvdInitTimeoutMilliseconds = NewIntFlag("envd-init-request-timeout-milliseconds", 50) // Timeout for envd init request in milliseconds - HostStatsSamplingInterval = NewIntFlag("host-stats-sampling-interval", 5000) // Host stats sampling interval in milliseconds (default 5s) + // Per-step ceilings for the pre-pause guest reclaim chain (run via envd + // before snapshot). Each step is wrapped in `timeout -s KILL`; 0 skips it. + // All default to 0, so the feature is disabled until an operator opts in + // per step. A stuck step cannot starve the rest, so compact_memory always + // runs as long as its own cap is > 0. + ReclaimSyncTimeoutMs = NewIntFlag("reclaim-sync-timeout-ms", 0) + ReclaimDropCachesTimeoutMs = NewIntFlag("reclaim-drop-caches-timeout-ms", 0) + ReclaimCompactMemoryTimeoutMs = NewIntFlag("reclaim-compact-memory-timeout-ms", 0) + ReclaimFstrimTimeoutMs = NewIntFlag("reclaim-fstrim-timeout-ms", 0) + HostStatsSamplingInterval = NewIntFlag("host-stats-sampling-interval", 5000) // Host stats sampling interval in milliseconds (default 5s) MaxCacheWriterConcurrencyFlag = NewIntFlag("max-cache-writer-concurrency", 10) // BuildCacheMaxUsagePercentage the maximum percentage of the cache disk storage