Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/orchestrator/cmd/resume-build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,13 @@ func (r *runner) pauseOnce(ctx context.Context, opts pauseOptions, verbose bool)

// Only upload when not in benchmark mode (verbose = true means single run)
if verbose {
paths := storage.Paths{BuildID: opts.newBuildID}
if opts.isRemoteStorage {
fmt.Println("📤 Uploading snapshot...")
} else {
fmt.Println("💾 Saving snapshot to local storage...")
}

if _, _, err := snapshot.Upload(ctx, r.storage, paths, storage.CompressConfig{}, nil, ""); err != nil {
if _, _, err := snapshot.Upload(ctx, r.storage, storage.CompressConfig{}, nil, "", nil); err != nil {
return timings, fmt.Errorf("failed to upload snapshot: %w", err)
}

Expand Down
1 change: 1 addition & 0 deletions packages/orchestrator/pkg/sandbox/block/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ReadonlyDevice interface {
Slicer
BlockSize() int64
Header() *header.Header
SwapHeader(h *header.Header)
}

type Device interface {
Expand Down
22 changes: 14 additions & 8 deletions packages/orchestrator/pkg/sandbox/block/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/google/uuid"

"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

type Empty struct {
header *header.Header
header atomic.Pointer[header.Header]
}

var _ ReadonlyDevice = (*Empty)(nil)
Expand All @@ -26,9 +27,10 @@ func NewEmpty(size int64, blockSize int64, buildID uuid.UUID) (*Empty, error) {
return nil, fmt.Errorf("failed to create header: %w", err)
}

return &Empty{
header: h,
}, nil
e := &Empty{}
e.header.Store(h)

return e, nil
}

func (e *Empty) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
Expand All @@ -41,11 +43,11 @@ func (e *Empty) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
}

func (e *Empty) Size(_ context.Context) (int64, error) {
return int64(e.header.Metadata.Size), nil
return int64(e.Header().Metadata.Size), nil
}

func (e *Empty) BlockSize() int64 {
return int64(e.header.Metadata.BlockSize)
return int64(e.Header().Metadata.BlockSize)
}

func (e *Empty) Close() error {
Expand All @@ -54,7 +56,7 @@ func (e *Empty) Close() error {

func (e *Empty) Slice(_ context.Context, off, length int64) ([]byte, error) {
end := off + length
size := int64(e.header.Metadata.Size)
size := int64(e.Header().Metadata.Size)
if end > size {
end = size
length = end - off
Expand All @@ -65,7 +67,11 @@ func (e *Empty) Slice(_ context.Context, off, length int64) ([]byte, error) {
}

func (e *Empty) Header() *header.Header {
return e.header
return e.header.Load()
}

// SwapHeader replaces the header held by this Empty device with h by calling
// Store on the header field; later Header calls return the stored value.
func (e *Empty) SwapHeader(h *header.Header) {
e.header.Store(h)
}

func (e *Empty) UpdateSize() error {
Expand Down
26 changes: 15 additions & 11 deletions packages/orchestrator/pkg/sandbox/block/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"sync/atomic"

"github.com/google/uuid"

Expand All @@ -15,7 +16,7 @@ type Local struct {
f *os.File
path string

header *header.Header
header atomic.Pointer[header.Header]
}

var _ ReadonlyDevice = (*Local)(nil)
Expand Down Expand Up @@ -44,11 +45,10 @@ func NewLocal(path string, blockSize int64, buildID uuid.UUID) (*Local, error) {
return nil, fmt.Errorf("failed to create header: %w", err)
}

return &Local{
f: f,
path: path,
header: h,
}, nil
d := &Local{f: f, path: path}
d.header.Store(h)

return d, nil
}

func (d *Local) Path() string {
Expand All @@ -65,11 +65,11 @@ func (d *Local) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
}

func (d *Local) Size(_ context.Context) (int64, error) {
return int64(d.header.Metadata.Size), nil
return int64(d.Header().Metadata.Size), nil
}

func (d *Local) BlockSize() int64 {
return int64(d.header.Metadata.BlockSize)
return int64(d.Header().Metadata.BlockSize)
}

func (d *Local) Close() (e error) {
Expand All @@ -83,7 +83,7 @@ func (d *Local) Close() (e error) {

func (d *Local) Slice(_ context.Context, off, length int64) ([]byte, error) {
end := off + length
size := int64(d.header.Metadata.Size)
size := int64(d.Header().Metadata.Size)
if end > size {
end = size
length = end - off
Expand All @@ -99,7 +99,11 @@ func (d *Local) Slice(_ context.Context, off, length int64) ([]byte, error) {
}

func (d *Local) Header() *header.Header {
return d.header
return d.header.Load()
}

// SwapHeader replaces the header held by this Local device with h by calling
// Store on the header field; later Header calls return the stored value.
func (d *Local) SwapHeader(h *header.Header) {
d.header.Store(h)
}

func (d *Local) UpdateHeaderSize() error {
Expand All @@ -108,7 +112,7 @@ func (d *Local) UpdateHeaderSize() error {
return fmt.Errorf("failed to get file info: %w", err)
}

d.header.Metadata.Size = uint64(info.Size())
d.Header().Metadata.Size = uint64(info.Size())

return nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ func (o *Overlay) Close() error {
// Header returns the header reported by the overlay's underlying device.
func (o *Overlay) Header() *header.Header {
return o.device.Header()
}

// SwapHeader forwards h to the underlying device's SwapHeader; the overlay
// keeps no header of its own.
func (o *Overlay) SwapHeader(h *header.Header) {
o.device.SwapHeader(h)
}
57 changes: 16 additions & 41 deletions packages/orchestrator/pkg/sandbox/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync/atomic"

"github.com/google/uuid"
"go.uber.org/zap"

blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
Expand Down Expand Up @@ -42,21 +41,16 @@ func NewFile(
return f
}

// Header returns the current header. After a peer transition the header may
// have been atomically swapped to a V4 header containing FrameTables.
// Header returns the current header.
func (b *File) Header() *header.Header {
return b.header.Load()
}

// maxTransitionRetries caps the number of header-swap retries when the peer
// signals upload completion via PeerTransitionedError. After a successful CAS,
// subsequent swapHeader calls are no-ops, so without a limit the loop would
// retry the same failing read forever.
const maxTransitionRetries = 2
// SwapHeader stores h as the file's current header. Store overwrites whatever
// header was present, so the next b.header.Load() observes h.
func (b *File) SwapHeader(h *header.Header) {
b.header.Store(h)
}

func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
transitionRetries := 0

for n < len(p) {
h := b.header.Load()

Expand Down Expand Up @@ -109,10 +103,13 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro
ft,
)
if err != nil {
if retry, swapErr := b.retryOnTransition(ctx, err, &transitionRetries); retry {
var transErr *storage.PeerTransitionedError
Comment thread
levb marked this conversation as resolved.
Outdated
if errors.As(err, &transErr) {
if swapErr := b.swapHeader(transErr); swapErr != nil {
return 0, fmt.Errorf("failed to swap header: %w", swapErr)
}

continue
} else if swapErr != nil {
return 0, swapErr
}

return 0, fmt.Errorf("failed to read from source: %w", err)
Expand All @@ -126,8 +123,6 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro

// The slice access must be in the predefined blocksize of the build.
func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) {
transitionRetries := 0

for {
h := b.header.Load()

Expand All @@ -150,10 +145,13 @@ func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) {

result, err := diff.Slice(ctx, int64(mappedBuild.Offset), int64(h.Metadata.BlockSize), ft)
if err != nil {
if retry, swapErr := b.retryOnTransition(ctx, err, &transitionRetries); retry {
var transErr *storage.PeerTransitionedError
if errors.As(err, &transErr) {
if swapErr := b.swapHeader(transErr); swapErr != nil {
return nil, fmt.Errorf("failed to swap header: %w", swapErr)
}

continue
} else if swapErr != nil {
return nil, swapErr
}

return nil, err
Expand All @@ -163,29 +161,6 @@ func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) {
}
}

// retryOnTransition checks if err is a PeerTransitionedError and swaps the
// header if the retry budget allows.
//
// Return values:
//   - (true, nil): the header swap succeeded and the caller should continue
//     its loop to retry the read.
//   - (false, nil): err is not a PeerTransitionedError, or *retries has already
//     reached maxTransitionRetries; no swap was attempted and the caller is
//     left to handle the original err.
//   - (false, swapErr): a swap was attempted and failed; swapErr wraps the
//     failure with "failed to swap header".
//
// *retries is incremented before the swap is attempted, so a failed swap still
// consumes one unit of the budget.
func (b *File) retryOnTransition(ctx context.Context, err error, retries *int) (retry bool, swapErr error) {
var transErr *storage.PeerTransitionedError
// Not a transition error, or the budget is spent: report "no retry" without an error.
if !errors.As(err, &transErr) || *retries >= maxTransitionRetries {
return false, nil
}

*retries++

logger.L().Info(ctx, "peer transition detected, swapping header",
zap.String("file_type", string(b.fileType)),
zap.Int("retry", *retries),
)

// This swapErr shadows the named result; the wrapped error is returned explicitly.
if swapErr := b.swapHeader(transErr); swapErr != nil {
return false, fmt.Errorf("failed to swap header: %w", swapErr)
}

return true, nil
}

// swapHeader atomically replaces the header when the peer signals upload
// completion. Only the first goroutine to CAS succeeds; others just retry
// with the already-swapped header. The caller's retry counter bounds
Expand Down
Loading
Loading