Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/worktrees/agent-a3e892705328d7738
Submodule agent-a3e892705328d7738 added at ea3676
1 change: 1 addition & 0 deletions packages/orchestrator/benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
sandboxes,
templateCache,
buildMetrics,
nil,
)

buildPath := filepath.Join(os.Getenv("LOCAL_TEMPLATE_STORAGE_BASE_PATH"), buildID, "rootfs.ext4")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func BenchmarkConcurrentResume(b *testing.B) {
config.BuilderConfig, l, featureFlags, sandboxFactory,
persistenceTemplate, persistenceBuild, artifactRegistry,
dockerhubRepository, sandboxProxy, sandboxes, templateCache, buildMetrics,
nil,
)

// build template if not cached
Expand Down
6 changes: 0 additions & 6 deletions packages/orchestrator/chunks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ message PeerAvailability {
// use_storage is true when the GCS upload has completed and the caller
// should switch to reading from GCS/NFS directly instead of this peer.
bool use_storage = 2;
// memfile_header contains the serialized V4 header (with FrameTables)
// for the memfile, included when use_storage is true and the upload was compressed.
bytes memfile_header = 3;
// rootfs_header contains the serialized V4 header (with FrameTables)
// for the rootfs, included when use_storage is true and the upload was compressed.
bytes rootfs_header = 4;
}

message GetBuildFileSizeRequest {
Expand Down
8 changes: 8 additions & 0 deletions packages/orchestrator/cmd/create-build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,18 @@ func doBuild(
buildMetrics, _ := metrics.NewBuildMetrics(noop.MeterProvider{})
sandboxFactory := sandbox.NewFactory(c.BuilderConfig, networkPool, devicePool, featureFlags, hoststats.NewNoopDelivery(), cgroup.NewNoopManager(), network.NewNoopEgressProxy(), sandboxes)

// Layered V4 builds need the upload coordinator so child layers wait on
// their parents' header finalization. Redis is nil (CLI is single-host —
// no cross-orch signaling needed); local same-orch coordination via
// futures is what matters here.
uploads := sandbox.NewUploads(templateCache, persistenceTemplate, nil)
defer uploads.Stop()

builder := build.NewBuilder(
builderConfig, l, featureFlags, sandboxFactory,
persistenceTemplate, persistenceBuild, artifactRegistry,
dockerhubRepo, sandboxProxy, sandboxes, templateCache, buildMetrics,
uploads,
)

l = l.With(zap.String("envID", templateID)).With(zap.String("buildID", buildID))
Expand Down
8 changes: 6 additions & 2 deletions packages/orchestrator/cmd/resume-build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,18 @@ func (r *runner) pauseOnce(ctx context.Context, opts pauseOptions, verbose bool)

// Only upload when not in benchmark mode (verbose = true means single run)
if verbose {
paths := storage.Paths{BuildID: opts.newBuildID}
if opts.isRemoteStorage {
fmt.Println("📤 Uploading snapshot...")
} else {
fmt.Println("💾 Saving snapshot to local storage...")
}

if _, _, err := snapshot.Upload(ctx, r.storage, paths, storage.CompressConfig{}, nil, ""); err != nil {
upload, err := sandbox.NewUpload(ctx, nil, snapshot, r.storage, storage.CompressConfig{}, nil, "")
if err != nil {
return timings, fmt.Errorf("failed to prepare upload: %w", err)
}

if err := upload.Run(ctx); err != nil {
return timings, fmt.Errorf("failed to upload snapshot: %w", err)
}

Expand Down
1 change: 1 addition & 0 deletions packages/orchestrator/cmd/smoketest/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func newTestInfra(t *testing.T, ctx context.Context) *testInfra {
builderConfig, l, flags, factory,
persistenceTemplate, persistenceBuild, artifactRegistry,
dockerhubRepo, sandboxProxy, sandboxes, templateCache, buildMetrics,
nil,
)

return ti
Expand Down
12 changes: 12 additions & 0 deletions packages/orchestrator/pkg/factories/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,13 @@ func run(config cfg.Config, opts Options) (success bool) {
builder := chrooted.NewBuilder(config)
volumeService := volumes.New(config, builder)

uploads := sandbox.NewUploads(templateCache, persistence, redisClient)
closers = append(closers, closer{"pending uploads", func(context.Context) error {
uploads.Stop()

return nil
}})

orchestratorService, err := server.New(server.ServiceConfig{
Config: config,
SandboxFactory: sandboxFactory,
Expand All @@ -561,10 +568,14 @@ func run(config cfg.Config, opts Options) (success bool) {
FeatureFlags: featureFlags,
SbxEventsService: events.NewEventsService(sbxEventsDeliveryTargets),
PeerRegistry: peerRegistry,
Uploads: uploads,
})
if err != nil {
logger.L().Fatal(ctx, "failed to create orchestrator server", zap.Error(err))
}
closers = append(closers, closer{"orchestrator server", func(context.Context) error {
return orchestratorService.Close()
}})

// template manager sandbox logger
tmplSbxLoggerExternal := sbxlogger.NewLogger(
Expand Down Expand Up @@ -639,6 +650,7 @@ func run(config cfg.Config, opts Options) (success bool) {
templateCache,
persistence,
buildPersistence,
uploads,
)
if err != nil {
logger.L().Fatal(ctx, "failed to create template manager", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions packages/orchestrator/pkg/sandbox/block/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ReadonlyDevice interface {
Slicer
BlockSize() int64
Header() *header.Header
SwapHeader(h *header.Header)
}

type Device interface {
Expand Down
22 changes: 14 additions & 8 deletions packages/orchestrator/pkg/sandbox/block/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/google/uuid"

"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

type Empty struct {
header *header.Header
header atomic.Pointer[header.Header]
}

var _ ReadonlyDevice = (*Empty)(nil)
Expand All @@ -26,9 +27,10 @@ func NewEmpty(size int64, blockSize int64, buildID uuid.UUID) (*Empty, error) {
return nil, fmt.Errorf("failed to create header: %w", err)
}

return &Empty{
header: h,
}, nil
e := &Empty{}
e.header.Store(h)

return e, nil
}

func (e *Empty) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
Expand All @@ -41,11 +43,11 @@ func (e *Empty) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
}

func (e *Empty) Size(_ context.Context) (int64, error) {
return int64(e.header.Metadata.Size), nil
return int64(e.Header().Metadata.Size), nil
}

func (e *Empty) BlockSize() int64 {
return int64(e.header.Metadata.BlockSize)
return int64(e.Header().Metadata.BlockSize)
}

func (e *Empty) Close() error {
Expand All @@ -54,7 +56,7 @@ func (e *Empty) Close() error {

func (e *Empty) Slice(_ context.Context, off, length int64) ([]byte, error) {
end := off + length
size := int64(e.header.Metadata.Size)
size := int64(e.Header().Metadata.Size)
if end > size {
end = size
length = end - off
Expand All @@ -65,7 +67,11 @@ func (e *Empty) Slice(_ context.Context, off, length int64) ([]byte, error) {
}

func (e *Empty) Header() *header.Header {
return e.header
return e.header.Load()
}

func (e *Empty) SwapHeader(h *header.Header) {
e.header.Store(h)
}

func (e *Empty) UpdateSize() error {
Expand Down
35 changes: 24 additions & 11 deletions packages/orchestrator/pkg/sandbox/block/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"sync/atomic"

"github.com/google/uuid"

Expand All @@ -15,7 +16,7 @@ type Local struct {
f *os.File
path string

header *header.Header
header atomic.Pointer[header.Header]
}

var _ ReadonlyDevice = (*Local)(nil)
Expand Down Expand Up @@ -44,11 +45,10 @@ func NewLocal(path string, blockSize int64, buildID uuid.UUID) (*Local, error) {
return nil, fmt.Errorf("failed to create header: %w", err)
}

return &Local{
f: f,
path: path,
header: h,
}, nil
d := &Local{f: f, path: path}
d.header.Store(h)

return d, nil
}

func (d *Local) Path() string {
Expand All @@ -65,11 +65,11 @@ func (d *Local) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
}

func (d *Local) Size(_ context.Context) (int64, error) {
return int64(d.header.Metadata.Size), nil
return int64(d.Header().Metadata.Size), nil
}

func (d *Local) BlockSize() int64 {
return int64(d.header.Metadata.BlockSize)
return int64(d.Header().Metadata.BlockSize)
}

func (d *Local) Close() (e error) {
Expand All @@ -83,7 +83,7 @@ func (d *Local) Close() (e error) {

func (d *Local) Slice(_ context.Context, off, length int64) ([]byte, error) {
end := off + length
size := int64(d.header.Metadata.Size)
size := int64(d.Header().Metadata.Size)
if end > size {
end = size
length = end - off
Expand All @@ -99,7 +99,11 @@ func (d *Local) Slice(_ context.Context, off, length int64) ([]byte, error) {
}

func (d *Local) Header() *header.Header {
return d.header
return d.header.Load()
}

func (d *Local) SwapHeader(h *header.Header) {
d.header.Store(h)
}

func (d *Local) UpdateHeaderSize() error {
Expand All @@ -108,7 +112,16 @@ func (d *Local) UpdateHeaderSize() error {
return fmt.Errorf("failed to get file info: %w", err)
}

d.header.Metadata.Size = uint64(info.Size())
h := d.Header()
metaCopy := *h.Metadata
metaCopy.Size = uint64(info.Size())

updated := &header.Header{
Metadata: &metaCopy,
Builds: h.Builds,
Mapping: h.Mapping,
}
d.SwapHeader(updated)

return nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ func (o *Overlay) Close() error {
func (o *Overlay) Header() *header.Header {
return o.device.Header()
}

func (o *Overlay) SwapHeader(h *header.Header) {
o.device.SwapHeader(h)
}
Loading
Loading