Skip to content

Commit 1af207c

Browse files
authored
refactor(uffd-tests): replace SIGUSR/env-var/pipe IPC with net/rpc/jsonrpc over unix socket (#2519)
Replace the cross-process userfaultfd test harness's pipes and signals (`SIGUSR1` shutdown, `SIGUSR2` page-state snapshot, ready/offsets/gate-cmd/gate-sync pipes) with a single Unix socket carrying stdlib `net/rpc` + `net/rpc/jsonrpc`. The userfaultfd and one half of the RPC socketpair are passed to the child via `ExtraFiles`.

Production change: one `atomic.Pointer[func(uintptr, faultPhase)]` field on `Userfaultfd` and three nil-checked inline call sites. Test builds install the hook via `SetTestFaultHook`, which is defined in a `_test.go` file.

Stacked follow-ups:
- `UFFD_EVENT_REMOVE` handling + matrix tests — #2520
- Stale-source / madvise-deadlock / faulted-short-circuit race tests — #2521
- Stale-source race fix — #2512
1 parent 866f389 commit 1af207c

13 files changed

Lines changed: 842 additions & 645 deletions

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package testharness
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
)
8+
9+
// Point names the worker hook location at which a fault should be parked.
//
// The numeric values must stay in step with the parent package's faultPhase
// iota, because the hook converts between the two types by plain cast.
type Point uint8
12+
13+
const (
14+
// BeforeRLock parks before settleRequests.RLock.
15+
BeforeRLock Point = iota
16+
// BeforeFaultPage parks after settleRequests.RLock, before UFFDIO_COPY.
17+
BeforeFaultPage
18+
)
19+
20+
// Registry is the child-side barrier store consulted by the per-fault hook.
21+
type Registry struct {
22+
mu sync.Mutex
23+
next uint64
24+
tokens map[uint64]*slot
25+
byKey map[key]uint64
26+
}
27+
28+
type key struct {
29+
addr uintptr
30+
point Point
31+
}
32+
33+
type slot struct {
34+
addr uintptr
35+
point Point
36+
arrived chan struct{}
37+
release chan struct{}
38+
arrivedOnce sync.Once
39+
}
40+
41+
func NewRegistry() *Registry {
42+
return &Registry{
43+
tokens: make(map[uint64]*slot),
44+
byKey: make(map[key]uint64),
45+
}
46+
}
47+
48+
func (r *Registry) Install(addr uintptr, point Point) uint64 {
49+
r.mu.Lock()
50+
defer r.mu.Unlock()
51+
52+
r.next++
53+
token := r.next
54+
s := &slot{
55+
addr: addr,
56+
point: point,
57+
arrived: make(chan struct{}),
58+
release: make(chan struct{}),
59+
}
60+
r.tokens[token] = s
61+
r.byKey[key{addr, point}] = token
62+
63+
return token
64+
}
65+
66+
func (r *Registry) lookupByAddr(addr uintptr, point Point) *slot {
67+
r.mu.Lock()
68+
defer r.mu.Unlock()
69+
70+
token, ok := r.byKey[key{addr, point}]
71+
if !ok {
72+
return nil
73+
}
74+
75+
return r.tokens[token]
76+
}
77+
78+
func (r *Registry) WaitArrived(ctx context.Context, token uint64) error {
79+
r.mu.Lock()
80+
s, ok := r.tokens[token]
81+
r.mu.Unlock()
82+
if !ok {
83+
return fmt.Errorf("unknown barrier token %d", token)
84+
}
85+
86+
select {
87+
case <-s.arrived:
88+
return nil
89+
case <-ctx.Done():
90+
return ctx.Err()
91+
}
92+
}
93+
94+
// Release frees the barrier; unknown token is a no-op.
95+
func (r *Registry) Release(token uint64) {
96+
r.mu.Lock()
97+
s, ok := r.tokens[token]
98+
delete(r.tokens, token)
99+
if ok {
100+
// A later Install at this key overwrites byKey; only delete if
101+
// it still maps to this token.
102+
k := key{s.addr, s.point}
103+
if r.byKey[k] == token {
104+
delete(r.byKey, k)
105+
}
106+
}
107+
r.mu.Unlock()
108+
109+
if !ok {
110+
return
111+
}
112+
113+
select {
114+
case <-s.release:
115+
default:
116+
close(s.release)
117+
}
118+
}
119+
120+
// ReleaseAll releases every still-installed barrier.
121+
func (r *Registry) ReleaseAll() {
122+
r.mu.Lock()
123+
tokens := make([]uint64, 0, len(r.tokens))
124+
for t := range r.tokens {
125+
tokens = append(tokens, t)
126+
}
127+
r.mu.Unlock()
128+
129+
for _, t := range tokens {
130+
r.Release(t)
131+
}
132+
}
133+
134+
// Hook returns the per-fault hook to install on *Userfaultfd; faults
135+
// without an installed slot are no-ops.
136+
func (r *Registry) Hook() func(addr uintptr, point Point) {
137+
return func(addr uintptr, point Point) {
138+
s := r.lookupByAddr(addr, point)
139+
if s == nil {
140+
return
141+
}
142+
143+
s.arrivedOnce.Do(func() {
144+
close(s.arrived)
145+
})
146+
147+
<-s.release
148+
}
149+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package testharness
2+
3+
import (
4+
"io"
5+
"net/rpc"
6+
"net/rpc/jsonrpc"
7+
)
8+
9+
// Client is the parent-side, typed wrapper over the JSON-RPC channel to the
// child helper process.
type Client struct {
	// rpc carries every call made through the Client's methods.
	rpc *rpc.Client

	// conn is the underlying stream; Close closes it.
	conn io.Closer
}
15+
16+
// NewClient wraps an already-connected duplex stream. Closing the
17+
// returned Client closes the underlying conn.
18+
func NewClient(conn io.ReadWriteCloser) *Client {
19+
return &Client{
20+
rpc: jsonrpc.NewClient(conn),
21+
conn: conn,
22+
}
23+
}
24+
25+
func (c *Client) Bootstrap(args BootstrapArgs) error {
26+
return c.rpc.Call("Lifecycle.Bootstrap", &args, &BootstrapReply{})
27+
}
28+
29+
func (c *Client) WaitReady() error {
30+
return c.rpc.Call("Lifecycle.WaitReady", &Empty{}, &Empty{})
31+
}
32+
33+
func (c *Client) Shutdown() error {
34+
return c.rpc.Call("Lifecycle.Shutdown", &Empty{}, &Empty{})
35+
}
36+
37+
func (c *Client) Pause() error {
38+
return c.rpc.Call("Paging.Pause", &Empty{}, &Empty{})
39+
}
40+
41+
func (c *Client) Resume() error {
42+
return c.rpc.Call("Paging.Resume", &Empty{}, &Empty{})
43+
}
44+
45+
func (c *Client) PageStates() ([]PageStateEntry, error) {
46+
var reply PageStatesReply
47+
if err := c.rpc.Call("Paging.States", &Empty{}, &reply); err != nil {
48+
return nil, err
49+
}
50+
51+
return reply.Entries, nil
52+
}
53+
54+
func (c *Client) InstallBarrier(addr uintptr, point Point) (uint64, error) {
55+
var reply FaultBarrierReply
56+
if err := c.rpc.Call("Barriers.Install", &FaultBarrierArgs{Addr: uint64(addr), Point: uint8(point)}, &reply); err != nil {
57+
return 0, err
58+
}
59+
60+
return reply.Token, nil
61+
}
62+
63+
func (c *Client) WaitFaultHeld(token uint64) error {
64+
return c.rpc.Call("Barriers.WaitHeld", &TokenArgs{Token: token}, &Empty{})
65+
}
66+
67+
func (c *Client) ReleaseFault(token uint64) error {
68+
return c.rpc.Call("Barriers.Release", &TokenArgs{Token: token}, &Empty{})
69+
}
70+
71+
func (c *Client) Close() error {
72+
return c.conn.Close()
73+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Package testharness provides the wire types, typed RPC client, and
2+
// barrier registry shared between the parent and child halves of the
3+
// userfaultfd test harness.
4+
package testharness
5+
6+
// Empty stands in for the argument or reply of a net/rpc method that takes
// or returns nothing. net/rpc requires both an args and a reply pointer, so
// such methods still need a type to pass.
type Empty struct{}
9+
10+
// BootstrapArgs is the request payload for the Lifecycle.Bootstrap RPC.
type BootstrapArgs struct {
	MmapStart uint64
	Pagesize  int64
	TotalSize int64
	AlwaysWP  bool

	// Barriers gates the test-only worker hooks (off by default).
	Barriers bool

	Content []byte
}
19+
20+
// BootstrapReply is the reply type for the Lifecycle.Bootstrap RPC; it has
// no fields.
type BootstrapReply struct{}
21+
22+
// PageStateEntry is the wire form of the parent package's pageState enum:
// a State value paired with an Offset.
type PageStateEntry struct {
	State  uint8
	Offset uint64
}
27+
28+
type PageStatesReply struct {
29+
Entries []PageStateEntry
30+
}
31+
32+
// FaultBarrierArgs is the request payload for the Barriers.Install RPC.
type FaultBarrierArgs struct {
	// Addr is the barrier address as a fixed-width integer;
	// Client.InstallBarrier fills it from a uintptr.
	Addr uint64

	// Point is the park point as a raw uint8; Client.InstallBarrier fills
	// it from a Point.
	Point uint8
}
36+
37+
// FaultBarrierReply is the reply of the Barriers.Install RPC; it carries the
// token of the installed barrier.
type FaultBarrierReply struct {
	Token uint64
}
40+
41+
// TokenArgs is the request payload for RPCs that act on an existing barrier
// by token (Barriers.WaitHeld and Barriers.Release).
type TokenArgs struct {
	Token uint64
}

packages/orchestrator/pkg/sandbox/uffd/userfaultfd/async_wp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func TestAsyncWriteProtection(t *testing.T) {
289289
t.Run(tt.name, func(t *testing.T) {
290290
t.Parallel()
291291

292-
h, err := configureCrossProcessTest(t, testConfig{
292+
h, err := configureCrossProcessTest(t.Context(), t, testConfig{
293293
pagesize: tt.pagesize,
294294
numberOfPages: tt.numberOfPages,
295295
alwaysWP: tt.alwaysWP,

0 commit comments

Comments
 (0)