2222import sysconfig
2323import select
2424import shutil
25+ import socket
2526import threading
2627import gc
2728import textwrap
@@ -1044,19 +1045,49 @@ def test_communicate_timeout_large_input(self):
10441045 # On Windows, stdin writing must also honor the timeout rather than
10451046 # blocking indefinitely when the pipe buffer fills.
10461047
1047- # Input larger than typical pipe buffer (4-64KB on Windows)
1048- input_data = b"x" * (128 * 1024 )
1048+ input_data = b"x" * (128 * 1024 ) # > typical pipe buffer
1049+
1050+ # Cross-platform wake mechanism: the slow reader connects to a
1051+ # loopback TCP socket and blocks in select() on it (capped at 9s
1052+ # as a safety net we don't expect to hit). After phase 1 raises
1053+ # TimeoutExpired, the parent sends a byte to release the child so
1054+ # it drains stdin. A socket (rather than a raw pipe) is required
1055+ # because Windows select() only supports sockets, not arbitrary
1056+ # file descriptors.
1057+ server = socket .create_server (('127.0.0.1' , 0 ), backlog = 1 )
1058+ server .settimeout (10 ) # bound the accept() if the child fails to start
1059+ port = server .getsockname ()[1 ]
1060+ # The child sends one byte (low byte of its PID) first so the parent
1061+ # can detect the rare case of an unrelated process on the same host
1062+ # connecting to our ephemeral port before our child does. A single
1063+ # byte gives 1/256 collision odds, which is plenty for flake-prevention.
1064+ slow_reader = (
1065+ "import os, socket, sys, select; "
1066+ f"s = socket.create_connection(('127.0.0.1', { port } ), timeout=9); "
1067+ "s.sendall(bytes([os.getpid() & 0xff])); "
1068+ "select.select([s], [], [], 9); "
1069+ "sys.stdout.buffer.write(sys.stdin.buffer.read())"
1070+ )
10491071
10501072 p = subprocess .Popen (
1051- [sys .executable , "-c" ,
1052- "import sys, time; "
1053- "time.sleep(30); " # Don't read stdin for a long time
1054- "sys.stdout.buffer.write(sys.stdin.buffer.read())" ],
1073+ [sys .executable , "-c" , slow_reader ],
10551074 stdin = subprocess .PIPE ,
10561075 stdout = subprocess .PIPE ,
10571076 stderr = subprocess .PIPE )
10581077
1078+ conn = None
10591079 try :
1080+ conn , _ = server .accept ()
1081+ server .close ()
1082+ server = None
1083+
1084+ conn .settimeout (5 )
1085+ peer_byte = conn .recv (1 )
1086+ conn .settimeout (None )
1087+ self .assertEqual (peer_byte , bytes ([p .pid & 0xff ]),
1088+ f"loopback handshake byte { peer_byte !r} != "
1089+ f"low byte of child PID { p .pid } ({ p .pid & 0xff :#x} )" )
1090+
10601091 timeout = 0.2
10611092 start = time .monotonic ()
10621093 try :
@@ -1065,19 +1096,24 @@ def test_communicate_timeout_large_input(self):
10651096 elapsed = time .monotonic () - start
10661097 self .fail (
10671098 f"TimeoutExpired not raised. communicate() completed in "
1068- f"{ elapsed :.2f} s, but subprocess sleeps for 30s . "
1099+ f"{ elapsed :.2f} s, but slow reader stalls for up to 9s . "
10691100 "Stdin writing blocked without enforcing timeout." )
10701101 except subprocess .TimeoutExpired :
10711102 elapsed = time .monotonic () - start
10721103
10731104 # Timeout should occur close to the specified timeout value,
10741105 # not after waiting for the subprocess to finish sleeping.
10751106 # Allow generous margin for slow CI, but must be well under
1076- # the subprocess sleep time .
1107+ # the slow-reader's stall cap .
10771108 self .assertLess (elapsed , 5.0 ,
10781109 f"TimeoutExpired raised after { elapsed :.2f} s; expected ~{ timeout } s. "
10791110 "Stdin writing blocked without checking timeout." )
10801111
1112+ # Release the slow reader so it stops blocking and drains stdin.
1113+ conn .sendall (b'go' )
1114+ conn .close ()
1115+ conn = None
1116+
10811117 # After timeout, continue communication. The remaining input
10821118 # should be sent and we should receive all data back.
10831119 stdout , stderr = p .communicate ()
@@ -1087,6 +1123,10 @@ def test_communicate_timeout_large_input(self):
10871123 f"Expected { len (input_data )} bytes output but got { len (stdout )} " )
10881124 self .assertEqual (stdout , input_data )
10891125 finally :
1126+ if conn is not None :
1127+ conn .close ()
1128+ if server is not None :
1129+ server .close ()
10901130 p .kill ()
10911131 p .wait ()
10921132
0 commit comments