Skip to content

Commit f05d897

Browse files
authored
Attach team_id metadata to uploaded storage objects (#2533)
* Attach team / build lineage labels to uploaded storage objects

  Adds custom object metadata to GCS/S3 uploads from snapshot/template builds so artifacts can be filtered after upload. Local filesystem backend is a no-op.

  Per-object labels (every artifact in {buildID}/):
  - team_id
  - root_build_id

  Per-build labels (only on {buildID}/metadata.json):
  - build_kind (template_layer | sandbox_pause | sandbox_checkpoint)
  - parent_build_id

  The Blob.Put / Seekable.StoreFile interfaces gain a variadic PutOption, with WithMetadata as the first option. Snapshot.Upload and TemplateBuild take a SnapshotUploadMetadata{Common, MetadataOnly} to control what goes on every object vs. only on metadata.json. Option types live in a new storageopts subpackage so generated mocks can reference them without an import cycle.

* Address review: BaseBuildId for snapshot root, log source-meta error, fix nlreturn

  - root_build_id for sandbox pause/checkpoint now uses snapshot.MemfileDiffHeader.Metadata.BaseBuildId (the diff-chain root / originating template), preserved across pause -> resume -> pause. Falls back to the snapshot's own build ID if the header is missing. Layer builds keep BuildContext.Template.BuildID since it's the user-requested top-level build (not redundant with the path).
  - Log a warning when localTemplate.Metadata() fails in BuildLayer instead of silently dropping parent_build_id.
  - Restore blank lines before returns in template_build.go to satisfy the nlreturn linter.

* Use BaseBuildId from header for root_build_id everywhere; omit when unknown

  root_build_id should be the very first build ID in the lineage chain. Now derived consistently from the memfile header's Metadata.BaseBuildId in all three upload paths:
  - Sandbox pause/checkpoint: read from snapshot.MemfileDiffHeader
  - Template layer build: read from snapshot.MemfileDiffHeader after pause
  - Optimize phase rewrite: read from the cached template's memfile header

  If the header doesn't carry a usable base build ID, root_build_id is omitted from the labels rather than falling back to a wrong value.

* Strip lineage labels: keep only team_id on uploaded objects
1 parent 9eeb211 commit f05d897

20 files changed

Lines changed: 195 additions & 63 deletions

File tree

packages/orchestrator/cmd/copy-build/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (o *osFileBlob) Exists(_ context.Context) (bool, error) {
108108
return true, nil
109109
}
110110

111-
func (o *osFileBlob) Put(_ context.Context, _ []byte) error {
111+
func (o *osFileBlob) Put(_ context.Context, _ []byte, _ ...storage.PutOption) error {
112112
return errors.New("not implemented")
113113
}
114114

packages/orchestrator/cmd/resume-build/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,13 +641,13 @@ func (r *runner) pauseOnce(ctx context.Context, opts pauseOptions, verbose bool)
641641
paths := storage.Paths{BuildID: opts.newBuildID}
642642
if opts.isRemoteStorage {
643643
fmt.Println("📤 Uploading snapshot...")
644-
if err := snapshot.Upload(ctx, r.storage, paths); err != nil {
644+
if err := snapshot.Upload(ctx, r.storage, paths, nil); err != nil {
645645
return timings, fmt.Errorf("failed to upload snapshot: %w", err)
646646
}
647647
fmt.Println("✅ Snapshot uploaded successfully")
648648
} else {
649649
fmt.Println("💾 Saving snapshot to local storage...")
650-
if err := snapshot.Upload(ctx, r.storage, paths); err != nil {
650+
if err := snapshot.Upload(ctx, r.storage, paths, nil); err != nil {
651651
return timings, fmt.Errorf("failed to save snapshot: %w", err)
652652
}
653653
fmt.Println("✅ Snapshot saved successfully")
@@ -864,7 +864,7 @@ func (r *runner) collectAndUploadPrefetch(ctx context.Context, opts pauseOptions
864864
Memory: mapping,
865865
})
866866

867-
if err := metadata.UploadMetadata(ctx, r.storage, updatedMeta); err != nil {
867+
if err := metadata.UploadMetadata(ctx, r.storage, updatedMeta, nil); err != nil {
868868
return fmt.Errorf("upload metadata: %w", err)
869869
}
870870

packages/orchestrator/pkg/sandbox/snapshot.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ type Snapshot struct {
2121
cleanup *Cleanup
2222
}
2323

24+
// Upload writes snapshot artifacts to persistence under paths. objectMetadata
25+
// is attached to every uploaded object; pass nil to skip.
2426
func (s *Snapshot) Upload(
2527
ctx context.Context,
2628
persistence storage.StorageProvider,
2729
paths storage.Paths,
30+
objectMetadata storage.ObjectMetadata,
2831
) error {
2932
var memfilePath *string
3033
switch r := s.MemfileDiff.(type) {
@@ -55,6 +58,7 @@ func (s *Snapshot) Upload(
5558
s.RootfsDiffHeader,
5659
persistence,
5760
paths,
61+
objectMetadata,
5862
)
5963

6064
if err := templateBuild.Upload(

packages/orchestrator/pkg/sandbox/template/peerclient/blob.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ func (b *peerBlob) Exists(ctx context.Context) (bool, error) {
7676
)
7777
}
7878

79-
func (b *peerBlob) Put(ctx context.Context, data []byte) error {
79+
func (b *peerBlob) Put(ctx context.Context, data []byte, opts ...storage.PutOption) error {
8080
// Writes always go to the base provider (GCS/S3); the peer is read-only.
8181
fallback, err := b.getOrOpenBase(ctx)
8282
if err != nil {
8383
return err
8484
}
8585

86-
return fallback.Put(ctx, data)
86+
return fallback.Put(ctx, data, opts...)
8787
}
8888

8989
// openPeerBlobStream opens a GetBuildBlob stream, checks peer availability,

packages/orchestrator/pkg/sandbox/template/peerclient/seekable.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,14 @@ func (s *peerSeekable) OpenRangeReader(ctx context.Context, off, length int64) (
120120
)
121121
}
122122

123-
func (s *peerSeekable) StoreFile(ctx context.Context, path string) error {
123+
func (s *peerSeekable) StoreFile(ctx context.Context, path string, opts ...storage.PutOption) error {
124124
// Writes always go to the base provider (GCS/S3); the peer is read-only.
125125
fallback, err := s.getOrOpenBase(ctx)
126126
if err != nil {
127127
return err
128128
}
129129

130-
return fallback.StoreFile(ctx, path)
130+
return fallback.StoreFile(ctx, path, opts...)
131131
}
132132

133133
// openPeerSeekableStream opens a ReadAtBuildSeekable stream, checks peer availability,

packages/orchestrator/pkg/sandbox/template_build.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,25 @@ import (
1313
)
1414

1515
type TemplateBuild struct {
16-
paths storage.Paths
17-
persistence storage.StorageProvider
16+
paths storage.Paths
17+
persistence storage.StorageProvider
18+
objectMetadata storage.ObjectMetadata
1819

1920
memfileHeader *headers.Header
2021
rootfsHeader *headers.Header
2122
}
2223

23-
func NewTemplateBuild(memfileHeader *headers.Header, rootfsHeader *headers.Header, persistence storage.StorageProvider, paths storage.Paths) *TemplateBuild {
24+
func NewTemplateBuild(
25+
memfileHeader *headers.Header,
26+
rootfsHeader *headers.Header,
27+
persistence storage.StorageProvider,
28+
paths storage.Paths,
29+
objectMetadata storage.ObjectMetadata,
30+
) *TemplateBuild {
2431
return &TemplateBuild{
25-
persistence: persistence,
26-
paths: paths,
32+
persistence: persistence,
33+
paths: paths,
34+
objectMetadata: objectMetadata,
2735

2836
memfileHeader: memfileHeader,
2937
rootfsHeader: rootfsHeader,
@@ -39,6 +47,14 @@ func (t *TemplateBuild) Remove(ctx context.Context) error {
3947
return nil
4048
}
4149

50+
func (t *TemplateBuild) putOpts() []storage.PutOption {
51+
if len(t.objectMetadata) == 0 {
52+
return nil
53+
}
54+
55+
return []storage.PutOption{storage.WithMetadata(t.objectMetadata)}
56+
}
57+
4258
func (t *TemplateBuild) uploadMemfileHeader(ctx context.Context, h *headers.Header) error {
4359
object, err := t.persistence.OpenBlob(ctx, t.paths.MemfileHeader(), storage.MemfileHeaderObjectType)
4460
if err != nil {
@@ -50,7 +66,7 @@ func (t *TemplateBuild) uploadMemfileHeader(ctx context.Context, h *headers.Head
5066
return fmt.Errorf("error when serializing memfile header: %w", err)
5167
}
5268

53-
err = object.Put(ctx, serialized)
69+
err = object.Put(ctx, serialized, t.putOpts()...)
5470
if err != nil {
5571
return fmt.Errorf("error when uploading memfile header: %w", err)
5672
}
@@ -64,7 +80,7 @@ func (t *TemplateBuild) uploadMemfile(ctx context.Context, memfilePath string) e
6480
return err
6581
}
6682

67-
err = object.StoreFile(ctx, memfilePath)
83+
err = object.StoreFile(ctx, memfilePath, t.putOpts()...)
6884
if err != nil {
6985
return fmt.Errorf("error when uploading memfile: %w", err)
7086
}
@@ -83,7 +99,7 @@ func (t *TemplateBuild) uploadRootfsHeader(ctx context.Context, h *headers.Heade
8399
return fmt.Errorf("error when serializing memfile header: %w", err)
84100
}
85101

86-
err = object.Put(ctx, serialized)
102+
err = object.Put(ctx, serialized, t.putOpts()...)
87103
if err != nil {
88104
return fmt.Errorf("error when uploading memfile header: %w", err)
89105
}
@@ -97,7 +113,7 @@ func (t *TemplateBuild) uploadRootfs(ctx context.Context, rootfsPath string) err
97113
return err
98114
}
99115

100-
err = object.StoreFile(ctx, rootfsPath)
116+
err = object.StoreFile(ctx, rootfsPath, t.putOpts()...)
101117
if err != nil {
102118
return fmt.Errorf("error when uploading rootfs: %w", err)
103119
}
@@ -112,7 +128,7 @@ func (t *TemplateBuild) uploadSnapfile(ctx context.Context, path string) error {
112128
return err
113129
}
114130

115-
if err = uploadFileAsBlob(ctx, object, path); err != nil {
131+
if err = uploadFileAsBlob(ctx, object, path, t.putOpts()...); err != nil {
116132
return fmt.Errorf("error when uploading snapfile: %w", err)
117133
}
118134

@@ -126,14 +142,14 @@ func (t *TemplateBuild) uploadMetadata(ctx context.Context, path string) error {
126142
return err
127143
}
128144

129-
if err := uploadFileAsBlob(ctx, object, path); err != nil {
145+
if err := uploadFileAsBlob(ctx, object, path, t.putOpts()...); err != nil {
130146
return fmt.Errorf("error when uploading metadata: %w", err)
131147
}
132148

133149
return nil
134150
}
135151

136-
func uploadFileAsBlob(ctx context.Context, b storage.Blob, path string) error {
152+
func uploadFileAsBlob(ctx context.Context, b storage.Blob, path string, opts ...storage.PutOption) error {
137153
f, err := os.Open(path)
138154
if err != nil {
139155
return fmt.Errorf("failed to open file %s: %w", path, err)
@@ -145,7 +161,7 @@ func uploadFileAsBlob(ctx context.Context, b storage.Blob, path string) error {
145161
return fmt.Errorf("failed to read file %s: %w", path, err)
146162
}
147163

148-
err = b.Put(ctx, data)
164+
err = b.Put(ctx, data, opts...)
149165
if err != nil {
150166
return fmt.Errorf("failed to write data to object: %w", err)
151167
}

packages/orchestrator/pkg/server/sandboxes.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ func (s *Server) Checkpoint(ctx context.Context, in *orchestrator.SandboxCheckpo
667667
defer cancel()
668668
defer res.completeUpload(uploadCtx)
669669

670-
if err := res.snapshot.Upload(uploadCtx, s.persistence, res.paths); err != nil {
670+
if err := res.snapshot.Upload(uploadCtx, s.persistence, res.paths, res.objectMetadata); err != nil {
671671
telemetry.ReportCriticalError(ctx, "error uploading snapshot for checkpoint", err, telemetry.WithSandboxID(in.GetSandboxId()))
672672

673673
s.sandboxFactory.Sandboxes.MarkStopping(ctx, resumedSbx.Runtime.SandboxID, resumedSbx.LifecycleID)
@@ -721,6 +721,7 @@ type snapshotResult struct {
721721
meta metadata.Template
722722
snapshot *sandbox.Snapshot
723723
paths storage.Paths
724+
objectMetadata storage.ObjectMetadata
724725
completeUpload func(ctx context.Context)
725726
}
726727

@@ -765,6 +766,9 @@ func (s *Server) snapshotAndCacheSandbox(
765766
telemetry.ReportEvent(ctx, "added snapshot to template cache")
766767

767768
paths := storage.Paths{BuildID: meta.Template.BuildID}
769+
objectMetadata := storage.ObjectMetadata{
770+
storage.ObjectMetadataTeamID: sbx.Runtime.TeamID,
771+
}
768772

769773
// Register in Redis so other orchestrators can find us for peer routing.
770774
if s.featureFlags.BoolFlag(ctx, featureflags.PeerToPeerChunkTransferFlag) {
@@ -786,6 +790,7 @@ func (s *Server) snapshotAndCacheSandbox(
786790
meta: meta,
787791
snapshot: snapshot,
788792
paths: paths,
793+
objectMetadata: objectMetadata,
789794
completeUpload: completeUpload,
790795
}, nil
791796
}
@@ -794,6 +799,7 @@ func (s *Server) snapshotAndCacheSandbox(
794799
meta: meta,
795800
snapshot: snapshot,
796801
paths: paths,
802+
objectMetadata: objectMetadata,
797803
completeUpload: func(context.Context) {},
798804
}, nil
799805
}
@@ -808,7 +814,7 @@ func (s *Server) uploadSnapshotAsync(ctx context.Context, sbx *sandbox.Sandbox,
808814
defer cancel()
809815
defer res.completeUpload(ctx)
810816

811-
err := res.snapshot.Upload(ctx, s.persistence, res.paths)
817+
err := res.snapshot.Upload(ctx, s.persistence, res.paths, res.objectMetadata)
812818
if err != nil {
813819
sbxlogger.I(sbx).Error(ctx, "error uploading snapshot files", zap.Error(err))
814820

packages/orchestrator/pkg/template/build/layer/layer_executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ func (lb *LayerExecutor) PauseAndUpload(
293293
ctx,
294294
lb.templateStorage,
295295
storage.Paths{BuildID: meta.Template.BuildID},
296+
storage.ObjectMetadata{
297+
storage.ObjectMetadataTeamID: lb.BuildContext.Config.TeamID,
298+
},
296299
)
297300
if err != nil {
298301
return fmt.Errorf("error uploading snapshot: %w", err)

packages/orchestrator/pkg/template/build/phases/optimize/builder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ func (pb *OptimizeBuilder) runSandboxAndCollectPrefetch(
256256

257257
// updateMetadata updates the template metadata in storages.
258258
func (pb *OptimizeBuilder) updateMetadata(ctx context.Context, t metadata.Template) error {
259-
err := metadata.UploadMetadata(ctx, pb.templateStorage, t)
259+
err := metadata.UploadMetadata(ctx, pb.templateStorage, t, storage.ObjectMetadata{
260+
storage.ObjectMetadataTeamID: pb.BuildContext.Config.TeamID,
261+
})
260262
if err != nil {
261263
return err
262264
}

packages/orchestrator/pkg/template/metadata/prefetch.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ func PrefetchEntriesToMapping(entries []block.PrefetchBlockEntry, blockSize int6
4343
}
4444
}
4545

46-
// UploadMetadata uploads the template metadata to storage.
47-
func UploadMetadata(ctx context.Context, persistence storage.StorageProvider, t Template) error {
46+
// UploadMetadata uploads the template metadata to storage. objectMetadata
47+
// is attached to the object; pass nil to skip.
48+
func UploadMetadata(ctx context.Context, persistence storage.StorageProvider, t Template, objectMetadata storage.ObjectMetadata) error {
4849
ctx, span := tracer.Start(ctx, "upload-metadata")
4950
defer span.End()
5051

@@ -60,7 +61,12 @@ func UploadMetadata(ctx context.Context, persistence storage.StorageProvider, t
6061
return fmt.Errorf("failed to serialize metadata: %w", err)
6162
}
6263

63-
err = object.Put(ctx, metaBytes)
64+
var opts []storage.PutOption
65+
if len(objectMetadata) > 0 {
66+
opts = append(opts, storage.WithMetadata(objectMetadata))
67+
}
68+
69+
err = object.Put(ctx, metaBytes, opts...)
6470
if err != nil {
6571
return fmt.Errorf("failed to write metadata: %w", err)
6672
}

0 commit comments

Comments
 (0)