Skip to content

Commit 6ee2ebb

Browse files
authored
Modify UFFD tests to run serve loop in a separate process (#1450)
1 parent 1e6afef commit 6ee2ebb

7 files changed

Lines changed: 476 additions & 173 deletions

File tree

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
package uffd
2+
3+
// This tests is creating uffd in the main process and handling the page faults in another process.
4+
// It prevents problems with Go mmap during testing (https://pojntfx.github.io/networked-linux-memsync/main.html#limitations) and also more accurately simulates what we do with Firecracker.
5+
// These problems are not affecting Firecracker, because:
6+
// 1. It is a different process that handles the page faults
7+
// 2. Does not use garbage collection
8+
9+
import (
10+
"context"
11+
"encoding/binary"
12+
"errors"
13+
"fmt"
14+
"io"
15+
"os"
16+
"os/exec"
17+
"os/signal"
18+
"strconv"
19+
"strings"
20+
"sync"
21+
"syscall"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
"go.uber.org/zap"
27+
"golang.org/x/sys/unix"
28+
29+
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/fdexit"
30+
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/mapping"
31+
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/testutils"
32+
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/userfaultfd"
33+
)
34+
35+
// Main process, FC in our case
36+
func configureCrossProcessTest(t *testing.T, tt testConfig) (*testHandler, error) {
37+
t.Helper()
38+
39+
data := testutils.RandomPages(tt.pagesize, tt.numberOfPages)
40+
41+
size, err := data.Size()
42+
require.NoError(t, err)
43+
44+
memoryArea, memoryStart, err := testutils.NewPageMmap(t, uint64(size), tt.pagesize)
45+
require.NoError(t, err)
46+
47+
uffd, err := userfaultfd.NewUserfaultfd(syscall.O_CLOEXEC | syscall.O_NONBLOCK)
48+
require.NoError(t, err)
49+
50+
t.Cleanup(func() {
51+
userfaultfd.Close(uffd)
52+
})
53+
54+
err = userfaultfd.ConfigureApi(uffd, tt.pagesize)
55+
require.NoError(t, err)
56+
57+
err = userfaultfd.Register(uffd, memoryStart, uint64(size), userfaultfd.UFFDIO_REGISTER_MODE_MISSING)
58+
require.NoError(t, err)
59+
60+
cmd := exec.CommandContext(t.Context(), os.Args[0], "-test.run=TestHelperServingProcess")
61+
cmd.Env = append(os.Environ(), "GO_TEST_HELPER_PROCESS=1")
62+
cmd.Env = append(cmd.Env, fmt.Sprintf("GO_MMAP_START=%d", memoryStart))
63+
cmd.Env = append(cmd.Env, fmt.Sprintf("GO_MMAP_PAGE_SIZE=%d", tt.pagesize))
64+
65+
dup, err := syscall.Dup(int(uffd))
66+
require.NoError(t, err)
67+
68+
// clear FD_CLOEXEC on the dup we pass across exec
69+
_, err = unix.FcntlInt(uintptr(dup), unix.F_SETFD, 0)
70+
require.NoError(t, err)
71+
72+
uffdFile := os.NewFile(uintptr(dup), "uffd")
73+
74+
contentReader, contentWriter, err := os.Pipe()
75+
require.NoError(t, err)
76+
77+
go func() {
78+
_, writeErr := contentWriter.Write(data.Content())
79+
assert.NoError(t, writeErr)
80+
81+
closeErr := contentWriter.Close()
82+
assert.NoError(t, closeErr)
83+
}()
84+
85+
offsetsReader, offsetsWriter, err := os.Pipe()
86+
require.NoError(t, err)
87+
88+
t.Cleanup(func() {
89+
offsetsReader.Close()
90+
})
91+
92+
readyReader, readyWriter, err := os.Pipe()
93+
require.NoError(t, err)
94+
95+
t.Cleanup(func() {
96+
readyReader.Close()
97+
})
98+
99+
readySignal := make(chan struct{}, 1)
100+
go func() {
101+
_, err := io.ReadAll(readyReader)
102+
assert.NoError(t, err)
103+
104+
readySignal <- struct{}{}
105+
}()
106+
107+
cmd.ExtraFiles = []*os.File{
108+
uffdFile,
109+
contentReader,
110+
offsetsWriter,
111+
readyWriter,
112+
}
113+
cmd.Stdout = os.Stdout
114+
cmd.Stderr = os.Stderr
115+
116+
err = cmd.Start()
117+
require.NoError(t, err)
118+
119+
contentReader.Close()
120+
offsetsWriter.Close()
121+
readyWriter.Close()
122+
uffdFile.Close()
123+
124+
t.Cleanup(func() {
125+
signalErr := cmd.Process.Signal(syscall.SIGUSR1)
126+
assert.NoError(t, signalErr)
127+
128+
waitErr := cmd.Wait()
129+
// It can be either nil, an ExitError, a context.Canceled error, or "signal: killed"
130+
assert.True(t,
131+
(waitErr != nil && func(err error) bool {
132+
var exitErr *exec.ExitError
133+
134+
return errors.As(err, &exitErr)
135+
}(waitErr)) ||
136+
errors.Is(waitErr, context.Canceled) ||
137+
(waitErr != nil && strings.Contains(waitErr.Error(), "signal: killed")) ||
138+
waitErr == nil,
139+
"unexpected error: %v", waitErr,
140+
)
141+
})
142+
143+
offsetsOnce := func() ([]uint, error) {
144+
err := cmd.Process.Signal(syscall.SIGUSR2)
145+
if err != nil {
146+
return nil, err
147+
}
148+
149+
offsetsBytes, err := io.ReadAll(offsetsReader)
150+
if err != nil {
151+
return nil, err
152+
}
153+
154+
var offsetList []uint
155+
156+
if len(offsetsBytes)%8 != 0 {
157+
return nil, fmt.Errorf("invalid offsets bytes length: %d", len(offsetsBytes))
158+
}
159+
160+
for i := 0; i < len(offsetsBytes); i += 8 {
161+
offsetList = append(offsetList, uint(binary.LittleEndian.Uint64(offsetsBytes[i:i+8])))
162+
}
163+
164+
return offsetList, nil
165+
}
166+
167+
select {
168+
case <-t.Context().Done():
169+
return nil, t.Context().Err()
170+
case <-readySignal:
171+
}
172+
173+
return &testHandler{
174+
memoryArea: &memoryArea,
175+
pagesize: tt.pagesize,
176+
data: data,
177+
uffd: uffd,
178+
offsetsOnce: offsetsOnce,
179+
}, nil
180+
}
181+
182+
// Secondary process, orchestrator in our case
183+
func TestHelperServingProcess(t *testing.T) {
184+
if os.Getenv("GO_TEST_HELPER_PROCESS") != "1" {
185+
t.Skip("this is a helper process, skipping direct execution")
186+
}
187+
188+
err := crossProcessServe()
189+
if err != nil {
190+
fmt.Println("exit serving process", err)
191+
os.Exit(1)
192+
}
193+
194+
os.Exit(0)
195+
}
196+
197+
func crossProcessServe() error {
198+
ctx, cancel := context.WithCancelCause(context.Background())
199+
defer cancel(nil)
200+
201+
startRaw, err := strconv.Atoi(os.Getenv("GO_MMAP_START"))
202+
if err != nil {
203+
return fmt.Errorf("exit parsing mmap start: %w", err)
204+
}
205+
206+
memoryStart := uintptr(startRaw)
207+
208+
uffdFile := os.NewFile(uintptr(3), os.Getenv("GO_UFFD_FILE"))
209+
defer uffdFile.Close()
210+
211+
uffd := uffdFile.Fd()
212+
213+
contentFile := os.NewFile(uintptr(4), "content")
214+
defer contentFile.Close()
215+
216+
offsetsFile := os.NewFile(uintptr(5), "offsets")
217+
218+
readyFile := os.NewFile(uintptr(6), "ready")
219+
220+
missingRequests := &sync.Map{}
221+
222+
offsetsSignal := make(chan os.Signal, 1)
223+
signal.Notify(offsetsSignal, syscall.SIGUSR2)
224+
defer signal.Stop(offsetsSignal)
225+
226+
go func() {
227+
defer offsetsFile.Close()
228+
229+
for {
230+
select {
231+
case <-ctx.Done():
232+
return
233+
case <-offsetsSignal:
234+
offsets, err := getAccessedOffsets(missingRequests)
235+
if err != nil {
236+
msg := fmt.Errorf("error getting accessed offsets from cross process: %w", err)
237+
238+
fmt.Fprint(os.Stderr, msg.Error())
239+
240+
cancel(msg)
241+
242+
return
243+
}
244+
245+
for _, offset := range offsets {
246+
writeErr := binary.Write(offsetsFile, binary.LittleEndian, uint64(offset))
247+
if writeErr != nil {
248+
msg := fmt.Errorf("error writing offsets to file: %w", writeErr)
249+
250+
fmt.Fprint(os.Stderr, msg.Error())
251+
252+
cancel(msg)
253+
254+
return
255+
}
256+
}
257+
258+
return
259+
}
260+
}
261+
}()
262+
263+
content, err := io.ReadAll(contentFile)
264+
if err != nil {
265+
return fmt.Errorf("exit reading content: %w", err)
266+
}
267+
268+
pageSize, err := strconv.Atoi(os.Getenv("GO_MMAP_PAGE_SIZE"))
269+
if err != nil {
270+
return fmt.Errorf("exit parsing page size: %w", err)
271+
}
272+
273+
data := testutils.NewMemorySlicer(content, int64(pageSize))
274+
275+
m := mapping.FcMappings([]mapping.GuestRegionUffdMapping{
276+
{
277+
BaseHostVirtAddr: memoryStart,
278+
Size: uintptr(len(content)),
279+
Offset: 0,
280+
PageSize: uintptr(pageSize),
281+
},
282+
})
283+
284+
exitUffd := make(chan struct{}, 1)
285+
286+
logger, err := zap.NewDevelopment()
287+
if err != nil {
288+
return fmt.Errorf("exit creating logger: %w", err)
289+
}
290+
291+
fdExit, err := fdexit.New()
292+
if err != nil {
293+
return fmt.Errorf("exit creating fd exit: %w", err)
294+
}
295+
defer fdExit.Close()
296+
297+
go func() {
298+
defer func() {
299+
exitUffd <- struct{}{}
300+
}()
301+
302+
serverErr := Serve(ctx, int(uffd), m, data, fdExit, missingRequests, logger)
303+
if serverErr != nil {
304+
msg := fmt.Errorf("error serving: %w", serverErr)
305+
306+
fmt.Fprint(os.Stderr, msg.Error())
307+
308+
cancel(msg)
309+
310+
return
311+
}
312+
}()
313+
314+
cleanup := func() {
315+
err := fdExit.SignalExit()
316+
if err != nil {
317+
msg := fmt.Errorf("error signaling exit: %w", err)
318+
319+
fmt.Fprint(os.Stderr, msg.Error())
320+
321+
cancel(msg)
322+
323+
return
324+
}
325+
326+
<-exitUffd
327+
}
328+
329+
defer cleanup()
330+
331+
exitSignal := make(chan os.Signal, 1)
332+
signal.Notify(exitSignal, syscall.SIGUSR1)
333+
defer signal.Stop(exitSignal)
334+
335+
closeErr := readyFile.Close()
336+
if closeErr != nil {
337+
return fmt.Errorf("error closing ready file: %w", closeErr)
338+
}
339+
340+
select {
341+
case <-ctx.Done():
342+
return fmt.Errorf("context done: %w: %w", ctx.Err(), context.Cause(ctx))
343+
case <-exitSignal:
344+
return nil
345+
}
346+
}
347+
348+
func getAccessedOffsets(missingRequests *sync.Map) ([]uint, error) {
349+
var offsets []uint
350+
351+
missingRequests.Range(func(key, _ any) bool {
352+
offsets = append(offsets, uint(key.(int64)))
353+
354+
return true
355+
})
356+
357+
return offsets, nil
358+
}

0 commit comments

Comments
 (0)