
Commit dd91ea8

test(uffd): add cross-process scaffolding for gated and async ops (#2475)
* test(uffd): add cross-process scaffolding for gated and async ops

  Pre-work for the upcoming REMOVE event handling PR. Lands as a runtime no-op on main: no production code changes, no new UFFD features enabled.

  Test-helper additions (all *_test.go):

  - testConfig.gated and per-op `async bool`
  - operationModeRemove / ServePause / ServeResume / Sleep
  - testHandler.servePause / serveResume hooks driven by control pipes (fd 7/8) into the helper subprocess
  - a helper-subprocess pause/resume goroutine that tears down and restarts the UFFD serve loop on demand
  - an executeAll() async path that fans out goroutines and joins on the test context
  - expectRemoved + checkDirtiness handling for read/write/remove ordering
  - executeRemove via unix.MADV_DONTNEED
  - an unregister() helper around UFFDIO_UNREGISTER for cleanup
  - pageStateEntries() now drains settleRequests and then takes the pageTracker RLock directly, so the snapshot stays correct for future writers (e.g. REMOVE) that don't go through settleRequests

  operationModeRemove and the gated/async paths are exercised only by the follow-up PR; here they are dead code at runtime, kept to minimise the diff of that PR.

* test(uffd): restore early uffd close cleanup, keep unregister late

* test(uffd): make gated cleanup idempotent to avoid a pause-then-exit hang

  Codex review caught a deadlock: in GO_GATED mode, handling 'P' replaced the stop closure with one that drained `exitUffd`, but the deferred final cleanup still pointed at the original closure. If the helper exited between 'P' and 'R' (e.g. parent test failure, ctx cancel, SIGUSR1), the defer ran the original cleanup again and blocked forever on `<-exitUffd`: the channel had already been drained and the original Serve goroutine had already exited. The hung helper would in turn hang the parent's cmd.Wait().

  Replace the two-variable (cleanup / stopServe) layout with a single mutex-guarded `stopFn` that is reset to a no-op after firing, and route both the gated 'P' handler and the deferred final cleanup through the same `stopServe` wrapper. 'R' now just installs a fresh `stopFn` that drains the new goroutine.

* test(uffd): drop REMOVE-specific bits, keep only the gated/async scaffolding

  Pull out everything that's not strictly needed for "gated pause/resume + async ops", so PR T is the smallest possible scaffolding diff and the follow-up REMOVE PR carries those pieces alongside the production code that actually needs them:

  - helpers_test.go: drop operationModeRemove, executeRemove, expectRemoved, and the REMOVE-aware checkDirtiness branch. None of the PR T tests call them, and they only become meaningful once UFFD_FEATURE_EVENT_REMOVE is set in the follow-up.
  - fd_helpers_test.go: revert; drop the unregister() helper. Without UFFD_FEATURE_EVENT_REMOVE, MADV_DONTNEED never queues UFFD events, so munmap doesn't block on un-acked events and there's nothing to unregister around.
  - cross_process_helpers_test.go:
    - drop the unregister(uffdFd, ...) call in late cleanup (same reason as above).
    - revert pageStateEntries() to the simple settleRequests.Lock pattern; the rework was forward-looking for a REMOVE handler that doesn't go through settleRequests and is dead code on main.
    - keep the cleanup → stopFn/stopServe restructure (mandatory for the gated tear-down/respawn path), but restore the original fdExit.SignalExit error handling that the previous version had silently dropped.
    - revert incidental cosmetic reformats (exitUffd placement, the multi-line defer) so the diff is purely additive in the unchanged paths.
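The idempotent stop described in the third message above, reduced to a minimal self-contained sketch. This is illustrative rather than the commit's exact code: serveDone stands in for the helper's exitUffd channel, and the Serve loop itself is elided.

package main

import "sync"

func main() {
	serveDone := make(chan struct{})
	close(serveDone) // pretend the Serve goroutine has already exited

	var (
		stopMu  sync.Mutex
		running = true
		stopFn  = func() { <-serveDone } // drains the current Serve goroutine
	)

	stopServe := func() {
		stopMu.Lock()
		if !running {
			stopMu.Unlock()

			return // already stopped: a second caller no-ops instead of blocking
		}
		fn := stopFn
		stopFn = func() {} // reset so the drained channel is never waited on twice
		running = false
		stopMu.Unlock()

		fn()
	}

	defer stopServe() // the final cleanup goes through the same wrapper...
	stopServe()       // ...as the gated 'P' handler, so pause-then-exit can't hang
}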
* test(uffd): address review comments on cross-process scaffolding

  - Guard executeOperation against nil servePause/serveResume hooks when a non-gated test misuses operationModeServePause/Resume: return a clear error instead of panicking on a nil function call (cursor + codex).
  - Make pageStateEntries() also hold pageTracker.mu.RLock(), so the snapshot stays correct for any future writer that doesn't go through settleRequests (claude; matches the PR description).
  - Make the gated 'P' handler check the gateSyncFile.Write error and propagate it via cancel(), so a write failure doesn't leave the parent's servePause closure blocked indefinitely on Read (claude).
  - Make the gated command goroutine reject 'P'-when-paused and 'R'-when-running, so a stray or duplicate resume can't leak an untracked Serve goroutine and break later pauses (codex).

* test(uffd): serialize executeRead with executeWrite under a shared RWMutex

  Previously executeWrite took h.mutex.Lock() but executeRead held no lock, so an async write plus an async read on the same page would race on memoryArea under go test -race, even though the bytes happen to match (data.Slice is deterministic per offset). Switch h.mutex to sync.RWMutex: reads take RLock, so concurrent reads still trigger UFFD faults in parallel; writes take Lock, so they exclude both other writers and any in-flight reads.

  No active test in this PR mixes async read and write yet; this is preemptive against the upcoming REMOVE PR's matrix tests.
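A compact sketch of the locking scheme in the last message above: readers share RLock and run in parallel, while a writer takes the full Lock and excludes everyone. The page slice and goroutines are illustrative stand-ins for the uffd-managed memoryArea.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu   sync.RWMutex
		page = make([]byte, 4096)
		wg   sync.WaitGroup
	)

	// Readers share RLock, so two concurrent reads (and the page faults
	// they would trigger against a uffd-managed mapping) still overlap.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.RLock()
			defer mu.RUnlock()
			_ = page[0]
		}()
	}

	// A writer takes the full Lock, excluding other writers and any
	// in-flight readers, which keeps go test -race clean on the slice.
	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		page[0] = 0xAB
	}()

	wg.Wait()
	fmt.Printf("page[0] = %#x\n", page[0])
}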
1 parent f05d897 commit dd91ea8

2 files changed

Lines changed: 222 additions & 16 deletions

File tree

packages/orchestrator/pkg/sandbox/uffd/userfaultfd/cross_process_helpers_test.go

Lines changed: 156 additions & 13 deletions
@@ -19,6 +19,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"testing"
 
@@ -110,6 +111,9 @@ func configureCrossProcessTest(t *testing.T, tt testConfig) (*testHandler, error
 	if tt.alwaysWP {
 		cmd.Env = append(cmd.Env, "GO_ALWAYS_WP=1")
 	}
+	if tt.gated {
+		cmd.Env = append(cmd.Env, "GO_GATED=1")
+	}
 
 	dup, err := syscall.Dup(int(uffdFd))
 	require.NoError(t, err)
@@ -153,12 +157,34 @@ func configureCrossProcessTest(t *testing.T, tt testConfig) (*testHandler, error
 		readySignal <- struct{}{}
 	}()
 
-	cmd.ExtraFiles = []*os.File{
+	extraFiles := []*os.File{
 		uffdFile,
 		contentReader,
 		offsetsWriter,
 		readyWriter,
 	}
+
+	var gateCmdWriter *os.File
+	var gateSyncReader *os.File
+	if tt.gated {
+		var gateCmdReader *os.File
+		gateCmdReader, gateCmdWriter, err = os.Pipe()
+		require.NoError(t, err)
+
+		var gateSyncWriter *os.File
+		gateSyncReader, gateSyncWriter, err = os.Pipe()
+		require.NoError(t, err)
+
+		t.Cleanup(func() {
+			gateCmdWriter.Close()
+			gateSyncReader.Close()
+		})
+
+		extraFiles = append(extraFiles, gateCmdReader)  // fd 7
+		extraFiles = append(extraFiles, gateSyncWriter) // fd 8
+	}
+
+	cmd.ExtraFiles = extraFiles
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 
@@ -169,6 +195,10 @@ func configureCrossProcessTest(t *testing.T, tt testConfig) (*testHandler, error
 	offsetsWriter.Close()
 	readyWriter.Close()
 	uffdFile.Close()
+	if tt.gated {
+		extraFiles[4].Close() // gateCmdReader
+		extraFiles[5].Close() // gateSyncWriter
+	}
 
 	t.Cleanup(func() {
 		signalErr := cmd.Process.Signal(syscall.SIGUSR1)
@@ -238,12 +268,31 @@ func configureCrossProcessTest(t *testing.T, tt testConfig) (*testHandler, error
 	case <-readySignal:
 	}
 
-	return &testHandler{
+	h := &testHandler{
 		memoryArea:     &memoryArea,
 		pagesize:       tt.pagesize,
 		data:           data,
 		pageStatesOnce: pageStatesOnce,
-	}, nil
+	}
+
+	if tt.gated {
+		h.servePause = func() error {
+			if _, err := gateCmdWriter.Write([]byte{'P'}); err != nil {
+				return err
+			}
+			var buf [1]byte
+			_, err := gateSyncReader.Read(buf[:])
+
+			return err
+		}
+		h.serveResume = func() error {
+			_, err := gateCmdWriter.Write([]byte{'R'})
+
+			return err
+		}
+	}
+
+	return h, nil
 }
 
 // Secondary process, orchestrator in our case
@@ -378,22 +427,111 @@ func crossProcessServe() error {
 		}
 	}()
 
-	cleanup := func() {
-		err := fdExit.SignalExit()
-		if err != nil {
-			msg := fmt.Errorf("error signaling exit: %w", err)
+	// stopFn drains whichever Serve goroutine is currently running. The
+	// running flag plus stopMu makes both pause-then-exit (no resume in
+	// between) and pause-resume-pause-exit safe, and rejects nonsensical
+	// command sequences from the gated channel: 'P' when already paused
+	// is a no-op, 'R' when already running is a no-op so a stray or
+	// duplicate resume can't leak an untracked Serve goroutine and break
+	// later pauses.
+	var (
+		stopMu  sync.Mutex
+		running = true
+		stopFn  = func() {
+			err := fdExit.SignalExit()
+			if err != nil {
+				msg := fmt.Errorf("error signaling exit: %w", err)
 
-			fmt.Fprint(os.Stderr, msg.Error())
+				fmt.Fprint(os.Stderr, msg.Error())
 
-			cancel(msg)
+				cancel(msg)
+
+				return
+			}
+
+			<-exitUffd
+		}
+	)
+
+	stopServe := func() {
+		stopMu.Lock()
+		if !running {
+			stopMu.Unlock()
 
 			return
 		}
+		fn := stopFn
+		stopFn = func() {}
+		running = false
+		stopMu.Unlock()
 
-		<-exitUffd
+		fn()
 	}
 
-	defer cleanup()
+	defer stopServe()
+
+	if os.Getenv("GO_GATED") == "1" {
+		gateCmdFile := os.NewFile(uintptr(7), "gate-cmd")
+		defer gateCmdFile.Close()
+
+		gateSyncFile := os.NewFile(uintptr(8), "gate-sync")
+		defer gateSyncFile.Close()
+
+		startServe := func() {
+			stopMu.Lock()
+			if running {
+				stopMu.Unlock()
+
+				return
+			}
+			stopMu.Unlock()
+
+			newExit, fdErr := fdexit.New()
+			if fdErr != nil {
+				cancel(fmt.Errorf("error creating fd exit: %w", fdErr))
+
+				return
+			}
+
+			done := make(chan struct{})
+			go func() {
+				defer close(done)
+				if err := uffd.Serve(ctx, newExit); err != nil {
+					cancel(fmt.Errorf("error serving: %w", err))
+				}
+			}()
+
+			stopMu.Lock()
+			stopFn = func() {
+				newExit.SignalExit()
+				<-done
+				newExit.Close()
+			}
+			running = true
+			stopMu.Unlock()
+		}
+
+		go func() {
+			var buf [1]byte
+			for {
+				if _, err := gateCmdFile.Read(buf[:]); err != nil {
+					return
+				}
+
+				switch buf[0] {
+				case 'P':
+					stopServe()
+					if _, err := gateSyncFile.Write([]byte{1}); err != nil {
+						cancel(fmt.Errorf("writing gate sync: %w", err))
+
+						return
+					}
+				case 'R':
+					startServe()
+				}
+			}
+		}()
+	}
 
 	exitSignal := make(chan os.Signal, 1)
 	signal.Notify(exitSignal, syscall.SIGUSR1)
@@ -423,12 +561,17 @@ type pageStateEntry struct {
 }
 
 // pageStateEntries returns a snapshot of every tracked page and its state.
-// It holds the settleRequests write lock so no in-flight faultPage worker
-// can mutate the pageTracker while we iterate.
+// It first drains in-flight faultPage workers via settleRequests.Lock(), then
+// holds the pageTracker RLock while iterating so any future writer that
+// doesn't go through settleRequests (e.g. the upcoming REMOVE handler)
+// still can't mutate the map under us.
 func (u *Userfaultfd) pageStateEntries() ([]pageStateEntry, error) {
 	u.settleRequests.Lock()
 	defer u.settleRequests.Unlock()
 
+	u.pageTracker.mu.RLock()
+	defer u.pageTracker.mu.RUnlock()
+
 	entries := make([]pageStateEntry, 0, len(u.pageTracker.m))
 	for addr, state := range u.pageTracker.m {
 		offset, err := u.ma.GetOffset(addr)
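For orientation, here is a hypothetical test plan that would exercise this scaffolding (no such test exists in this commit; the follow-up REMOVE PR adds the real matrix tests). It leans on configureCrossProcessTest, testConfig, and the operation modes from the diffs, so it is a sketch rather than a standalone program:

func TestGatedAsyncReadsSketch(t *testing.T) {
	tt := testConfig{
		pagesize: 4096,
		gated:    true,
		operations: []operation{
			{mode: operationModeServePause},                      // stop the serve loop
			{offset: 0, mode: operationModeRead, async: true},    // blocks on its page fault
			{offset: 4096, mode: operationModeRead, async: true}, // blocks in parallel (RLock)
			{mode: operationModeSleep},                           // let readers reach their faults
			{mode: operationModeServeResume},                     // restart the loop; faults settle
		},
	}

	h, err := configureCrossProcessTest(t, tt)
	require.NoError(t, err)

	h.executeAll(t, tt.operations)
}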

packages/orchestrator/pkg/sandbox/uffd/userfaultfd/helpers_test.go

Lines changed: 66 additions & 3 deletions
@@ -3,10 +3,12 @@ package userfaultfd
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"slices"
 	"sync"
 	"testing"
+	"time"
 	"unsafe"
 
 	"github.com/RoaringBitmap/roaring/v2"
@@ -26,19 +28,28 @@ type testConfig struct {
 	operations []operation
 	// alwaysWP makes the handler copy with UFFDIO_COPY_MODE_WP for all faults.
 	alwaysWP bool
+	// gated enables pause/resume control over the handler's serve loop.
+	gated bool
 }
 
 type operationMode uint32
 
 const (
 	operationModeRead operationMode = 1 << iota
 	operationModeWrite
+	operationModeServePause
+	operationModeServeResume
+	// operationModeSleep pauses for a short duration to let async goroutines
+	// enter their blocking syscalls before proceeding.
+	operationModeSleep
 )
 
 type operation struct {
 	// Offset in bytes. Must be smaller than the (numberOfPages-1) * pagesize as it reads a page and it must be aligned to the pagesize from the testConfig.
 	offset int64
 	mode   operationMode
+	// async runs the operation in a background goroutine.
+	async bool
 }
 
 // handlerPageStates is a snapshot of the pageTracker grouped by state. It
@@ -71,16 +82,43 @@ type testHandler struct {
 	// pageStatesOnce returns a per-state snapshot of the handler's pageTracker.
 	// It can only be called once.
 	pageStatesOnce func() (handlerPageStates, error)
-	mutex sync.Mutex
+	// servePause and serveResume gate the UFFD event loop in the child process.
+	// Tests use them to deterministically batch a sequence of UFFD events
+	// before more faults are processed.
+	servePause  func() error
+	serveResume func() error
+	mutex       sync.RWMutex
 }
 
 func (h *testHandler) executeAll(t *testing.T, operations []operation) {
 	t.Helper()
 
+	var asyncErrors []chan error
+
 	for i, op := range operations {
+		if op.async {
+			errCh := make(chan error, 1)
+			asyncErrors = append(asyncErrors, errCh)
+
+			go func() {
+				errCh <- h.executeOperation(t.Context(), op)
+			}()
+
+			continue
+		}
+
 		err := h.executeOperation(t.Context(), op)
 		require.NoError(t, err, "step %d: %v at offset %d", i, op.mode, op.offset)
 	}
+
+	for _, errCh := range asyncErrors {
+		select {
+		case err := <-errCh:
+			require.NoError(t, err, "async operation")
+		case <-t.Context().Done():
+			t.Fatal("timed out waiting for async operation")
+		}
+	}
 }
 
 type pageExpectation uint8
@@ -135,19 +173,43 @@ func (h *testHandler) executeOperation(ctx context.Context, op operation) error
 		return h.executeRead(ctx, op)
 	case operationModeWrite:
 		return h.executeWrite(ctx, op)
+	case operationModeServePause:
+		if h.servePause == nil {
+			return errors.New("operationModeServePause requires testConfig.gated = true")
+		}
+
+		return h.servePause()
+	case operationModeServeResume:
+		if h.serveResume == nil {
+			return errors.New("operationModeServeResume requires testConfig.gated = true")
+		}
+
+		return h.serveResume()
+	case operationModeSleep:
+		time.Sleep(50 * time.Millisecond)
+
+		return nil
 	default:
 		return fmt.Errorf("invalid operation mode: %d", op.mode)
 	}
 }
 
 func (h *testHandler) executeRead(ctx context.Context, op operation) error {
-	readBytes := (*h.memoryArea)[op.offset : op.offset+int64(h.pagesize)]
-
 	expectedBytes, err := h.data.Slice(ctx, op.offset, int64(h.pagesize))
 	if err != nil {
 		return err
 	}
 
+	// Hold the read side of the memoryArea mutex while we touch the page.
+	// Reads can run concurrently with each other (RLock), but a parallel
+	// async write to the same page (executeWrite, write lock) is excluded
+	// so go test -race stays clean when a test plan mixes async read and
+	// async write at overlapping offsets.
+	h.mutex.RLock()
+	defer h.mutex.RUnlock()
+
+	readBytes := (*h.memoryArea)[op.offset : op.offset+int64(h.pagesize)]
+
 	// The bytes.Equal is the first place in this flow that actually touches the uffd managed memory and triggers the pagefault, so any deadlocks will manifest here.
 	if !bytes.Equal(readBytes, expectedBytes) {
 		idx, want, got := testutils.FirstDifferentByte(readBytes, expectedBytes)
@@ -165,6 +227,7 @@ func (h *testHandler) executeWrite(ctx context.Context, op operation) error {
 	}
 
 	// An unprotected parallel write to map might result in an undefined behavior.
+	// Lock excludes both other writers and any concurrent executeRead.
 	h.mutex.Lock()
 	defer h.mutex.Unlock()
 