Skip to content

Commit aee31a3

Browse files
fix(grpc): resilient SwooleGrpcTransport with mutex, retry and reconnect
- Add Channel(1) mutex to serialize concurrent coroutine sends; shutdown closes the channel to unblock waiting coroutines without deadlock - Extract attemptSend() retry loop (send failure → reset + retry up to maxRetries; recv failure → ErrorFuture immediately to avoid duplicate DELTA metric exports) - Extract doRequest() to reduce cyclomatic complexity below threshold - Reset HTTP/2 client on send/recv failure so next call reconnects - Fix getClient(): close stale (disconnected) client before reconnecting - Suppress PHP 8.2 E_DEPRECATED from Swoole's dynamic $serverLastStreamId property in recv() using @ operator (Hyperf ErrorExceptionHandler converts deprecation notices to ErrorException, aborting the recv) - Validate unsupported compression types in factory (prevents compressed flag=1 with uncompressed payload) - Wire retryDelay and maxRetries from factory to transport constructor - Add endpoint fallback in Trace and Metric gRPC exporter factories - Replace 'application/x-protobuf' literal with ContentTypes::PROTOBUF - Cast grpc-status to int before comparison (more robust against whitespace) - Add tests: default endpoint fallback, retry parameters, invalid compression Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 7eeaa13 commit aee31a3

8 files changed

Lines changed: 211 additions & 41 deletions

File tree

src/Factory/Log/Exporter/OtlpGrpcLogExporterFactory.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Hyperf\Contract\ConfigInterface;
88
use Hyperf\OpenTelemetry\Transport\SwooleGrpcTransportFactory;
9+
use OpenTelemetry\Contrib\Otlp\ContentTypes;
910
use OpenTelemetry\Contrib\Otlp\LogsExporter;
1011
use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface;
1112
use OpenTelemetry\SDK\Logs\LogRecordExporterInterface;
@@ -28,7 +29,7 @@ public function make(): LogRecordExporterInterface
2829
return new LogsExporter(
2930
(new SwooleGrpcTransportFactory())->create(
3031
endpoint: $endpoint,
31-
contentType: 'application/x-protobuf',
32+
contentType: ContentTypes::PROTOBUF,
3233
headers: $options['headers'] ?? [],
3334
compression: $options['compression'] ?? TransportFactoryInterface::COMPRESSION_GZIP,
3435
timeout: $options['timeout'] ?? 10,

src/Factory/Metric/Exporter/OtlpGrpcMetricExporterFactory.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Hyperf\Contract\ConfigInterface;
88
use Hyperf\OpenTelemetry\Transport\SwooleGrpcTransportFactory;
9+
use OpenTelemetry\Contrib\Otlp\ContentTypes;
910
use OpenTelemetry\Contrib\Otlp\MetricExporter;
1011
use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface;
1112
use OpenTelemetry\SDK\Metrics\Data\Temporality;
@@ -24,12 +25,12 @@ public function make(): MetricExporterInterface
2425
{
2526
$options = $this->config->get('open-telemetry.metrics.exporters.otlp_grpc.options', []);
2627

27-
$endpoint = rtrim($options['endpoint'], '/') . self::GRPC_METHOD;
28+
$endpoint = rtrim($options['endpoint'] ?? 'http://localhost:4317', '/') . self::GRPC_METHOD;
2829

2930
return new MetricExporter(
3031
transport: (new SwooleGrpcTransportFactory())->create(
3132
endpoint: $endpoint,
32-
contentType: 'application/x-protobuf',
33+
contentType: ContentTypes::PROTOBUF,
3334
headers: $options['headers'] ?? [],
3435
compression: $options['compression'] ?? TransportFactoryInterface::COMPRESSION_GZIP,
3536
timeout: $options['timeout'] ?? 10,

src/Factory/Trace/Exporter/OtlpGrpcTraceExporterFactory.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Hyperf\Contract\ConfigInterface;
88
use Hyperf\OpenTelemetry\Transport\SwooleGrpcTransportFactory;
9+
use OpenTelemetry\Contrib\Otlp\ContentTypes;
910
use OpenTelemetry\Contrib\Otlp\SpanExporter;
1011
use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface;
1112
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
@@ -23,12 +24,12 @@ public function make(): SpanExporterInterface
2324
{
2425
$options = $this->config->get('open-telemetry.traces.exporters.otlp_grpc.options', []);
2526

26-
$endpoint = rtrim($options['endpoint'], '/') . self::GRPC_METHOD;
27+
$endpoint = rtrim($options['endpoint'] ?? 'http://localhost:4317', '/') . self::GRPC_METHOD;
2728

2829
return new SpanExporter(
2930
(new SwooleGrpcTransportFactory())->create(
3031
endpoint: $endpoint,
31-
contentType: 'application/x-protobuf',
32+
contentType: ContentTypes::PROTOBUF,
3233
headers: $options['headers'] ?? [],
3334
compression: $options['compression'] ?? TransportFactoryInterface::COMPRESSION_GZIP,
3435
timeout: $options['timeout'] ?? 10,

src/Transport/SwooleGrpcTransport.php

Lines changed: 130 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
use OpenTelemetry\SDK\Common\Future\ErrorFuture;
1313
use OpenTelemetry\SDK\Common\Future\FutureInterface;
1414
use RuntimeException;
15+
use Swoole\Coroutine;
16+
use Swoole\Coroutine\Channel;
1517
use Swoole\Coroutine\Http2\Client;
1618
use Swoole\Http2\Request;
1719
use Throwable;
@@ -22,6 +24,8 @@ final class SwooleGrpcTransport implements TransportInterface
2224

2325
private ?Client $client = null;
2426

27+
private ?Channel $mutex = null;
28+
2529
/**
2630
* @SuppressWarnings(PHPMD.BooleanArgumentFlag)
2731
*/
@@ -33,6 +37,8 @@ public function __construct(
3337
private readonly float $timeout = 10.0,
3438
private readonly bool $ssl = false,
3539
private readonly ?string $compression = null,
40+
private readonly int $retryDelay = 100,
41+
private readonly int $maxRetries = 3,
3642
) {
3743
}
3844

@@ -47,42 +53,21 @@ public function send(string $payload, ?CancellationInterface $cancellation = nul
4753
return new ErrorFuture(new RuntimeException('Transport is closed'));
4854
}
4955

50-
try {
51-
$client = $this->getClient();
52-
53-
$data = $this->compress($payload);
54-
55-
$request = new Request();
56-
$request->method = 'POST';
57-
$request->path = $this->method;
58-
$request->headers = $this->buildHeaders();
59-
$request->data = $this->packMessage($data);
60-
61-
$streamId = $client->send($request);
62-
63-
if ($streamId === false || $streamId <= 0) {
64-
return new ErrorFuture(new RuntimeException(
65-
'Failed to send gRPC request: ' . ($client->errMsg ?: 'unknown error')
66-
));
67-
}
68-
69-
$response = $client->recv($this->timeout);
70-
71-
if ($response === false) {
72-
return new ErrorFuture(new RuntimeException(
73-
'Failed to receive gRPC response: ' . ($client->errMsg ?: 'timeout')
74-
));
75-
}
56+
$mutex = $this->getMutex();
57+
if ($mutex->pop() === false) {
58+
return new ErrorFuture(new RuntimeException('Transport is closed'));
59+
}
7660

77-
$grpcStatus = $response->headers['grpc-status'] ?? '0';
78-
if ($grpcStatus !== '0') {
79-
$grpcMessage = $response->headers['grpc-message'] ?? 'Unknown error';
80-
return new ErrorFuture(new RuntimeException("gRPC error: {$grpcMessage}", (int) $grpcStatus));
61+
try {
62+
if ($this->closed) {
63+
return new ErrorFuture(new RuntimeException('Transport is closed'));
8164
}
8265

83-
return new CompletedFuture(null);
66+
return $this->attemptSend($payload);
8467
} catch (Throwable $e) {
8568
return new ErrorFuture($e);
69+
} finally {
70+
$mutex->push(true); // release — returns false silently if channel was closed by shutdown
8671
}
8772
}
8873

@@ -93,6 +78,7 @@ public function shutdown(?CancellationInterface $cancellation = null): bool
9378
}
9479

9580
$this->closed = true;
81+
$this->mutex?->close(); // unblock any coroutines waiting to acquire the mutex
9682

9783
if ($this->client !== null) {
9884
$this->client->close();
@@ -107,19 +93,126 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
10793
return ! $this->closed;
10894
}
10995

96+
/**
97+
* @SuppressWarnings(PHPMD.ErrorControlOperator)
98+
*/
99+
private function attemptSend(string $payload): FutureInterface
100+
{
101+
$data = $this->compress($payload);
102+
$lastError = null;
103+
104+
for ($attempt = 0; $attempt <= $this->maxRetries; ++$attempt) {
105+
if ($this->closed) {
106+
return new ErrorFuture(new RuntimeException('Transport is closed'));
107+
}
108+
109+
if ($attempt > 0 && $this->retryDelay > 0) {
110+
Coroutine::sleep($this->retryDelay / 1000.0);
111+
}
112+
113+
try {
114+
return $this->doRequest($this->getClient(), $data);
115+
} catch (Throwable $e) {
116+
$this->resetClient();
117+
$lastError = $e;
118+
}
119+
}
120+
121+
return new ErrorFuture($lastError ?? new RuntimeException('Unknown transport error after retries'));
122+
}
123+
124+
/**
125+
* Executes a single gRPC send+recv cycle.
126+
*
127+
* Throws RuntimeException on send failure so the caller can reset the client and retry.
128+
* Returns ErrorFuture on recv failure (delivery uncertain — no retry).
129+
*
130+
* Note: @$client->recv() suppresses E_DEPRECATED from Swoole setting $serverLastStreamId
131+
* as a dynamic property in PHP 8.2+. Hyperf's ErrorExceptionHandler would otherwise convert
132+
* the deprecation notice into an ErrorException, aborting the recv() call.
133+
*
134+
* @throws RuntimeException on retryable send failure
135+
* @SuppressWarnings(PHPMD.ErrorControlOperator)
136+
*/
137+
private function doRequest(Client $client, string $data): FutureInterface
138+
{
139+
$request = new Request();
140+
$request->method = 'POST';
141+
$request->path = $this->method;
142+
$request->headers = $this->buildHeaders();
143+
$request->data = $this->packMessage($data);
144+
145+
$streamId = $client->send($request);
146+
147+
if ($streamId === false || $streamId <= 0) {
148+
// Throw so the retry loop can reset the client and try again;
149+
// data was never transmitted so retrying is safe.
150+
throw new RuntimeException(
151+
'Failed to send gRPC request: ' . ($client->errMsg ?: 'unknown error')
152+
);
153+
}
154+
155+
$response = @$client->recv($this->timeout);
156+
157+
if ($response === false) {
158+
$this->resetClient($client);
159+
// Do not retry: send succeeded, so delivery is uncertain — retrying could duplicate exports.
160+
return new ErrorFuture(new RuntimeException(
161+
'Failed to receive gRPC response: ' . ($client->errMsg ?: 'timeout')
162+
));
163+
}
164+
165+
$grpcStatus = (int) ($response->headers['grpc-status'] ?? 0);
166+
if ($grpcStatus !== 0) {
167+
$grpcMessage = $response->headers['grpc-message'] ?? 'Unknown error';
168+
// gRPC application error — not retried
169+
return new ErrorFuture(new RuntimeException("gRPC error: {$grpcMessage}", $grpcStatus));
170+
}
171+
172+
return new CompletedFuture(null);
173+
}
174+
175+
private function getMutex(): Channel
176+
{
177+
if ($this->mutex === null) {
178+
$this->mutex = new Channel(1);
179+
$this->mutex->push(true); // initially unlocked
180+
}
181+
182+
return $this->mutex;
183+
}
184+
185+
private function resetClient(?Client $client = null): void
186+
{
187+
$target = $client ?? $this->client;
188+
$target?->close();
189+
if ($target === $this->client) {
190+
$this->client = null;
191+
}
192+
}
193+
110194
private function getClient(): Client
111195
{
112-
if ($this->client === null || ! $this->client->connected) {
113-
$this->client = new Client($this->host, $this->port, $this->ssl);
114-
$this->client->set([
196+
if ($this->client !== null && ! $this->client->connected) {
197+
$this->client->close();
198+
$this->client = null;
199+
}
200+
201+
if ($this->client === null) {
202+
$client = new Client($this->host, $this->port, $this->ssl);
203+
$client->set([
115204
'timeout' => $this->timeout,
116205
]);
117206

118-
if (! $this->client->connect()) {
207+
if (! $client->connect()) {
208+
$errMsg = $client->errMsg;
209+
$client->close();
119210
throw new RuntimeException(
120-
"Failed to connect to {$this->host}:{$this->port}: " . $this->client->errMsg
211+
"Failed to connect to {$this->host}:{$this->port}: " . $errMsg
121212
);
122213
}
214+
215+
$this->client = $client;
123216
}
124217

125218
return $this->client;
@@ -165,6 +258,7 @@ private function buildHeaders(): array
165258
private function packMessage(string $data): string
166259
{
167260
$compressed = $this->compression !== null ? 1 : 0;
261+
// gRPC message frame: [1-byte compressed flag][4-byte big-endian message length][message]
168262
return pack('CN', $compressed, strlen($data)) . $data;
169263
}
170264
}

src/Transport/SwooleGrpcTransportFactory.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111

1212
final class SwooleGrpcTransportFactory implements TransportFactoryInterface
1313
{
14+
/**
15+
* Note: $cacert, $cert, and $key are accepted to satisfy the TransportFactoryInterface
16+
* contract but are not implemented in this transport. TLS certificates are not supported;
17+
* use the https:// or grpcs:// scheme for basic TLS.
18+
*
19+
* @param null|mixed $compression
20+
*/
1421
public function create(
1522
string $endpoint,
1623
string $contentType = ContentTypes::PROTOBUF,
@@ -45,6 +52,13 @@ public function create(
4552
$compressionType = null;
4653
}
4754

55+
$supportedCompression = [null, TransportFactoryInterface::COMPRESSION_GZIP, TransportFactoryInterface::COMPRESSION_DEFLATE];
56+
if (! in_array($compressionType, $supportedCompression, true)) {
57+
throw new InvalidArgumentException(
58+
sprintf('Unsupported compression type "%s"', $compressionType)
59+
);
60+
}
61+
4862
return new SwooleGrpcTransport(
4963
host: $parsed['host'],
5064
port: $parsed['port'],
@@ -53,6 +67,8 @@ public function create(
5367
timeout: $timeout,
5468
ssl: $parsed['ssl'],
5569
compression: $compressionType,
70+
retryDelay: $retryDelay,
71+
maxRetries: $maxRetries,
5672
);
5773
}
5874

tests/Unit/Factory/Metric/Exporter/OtlpGrpcMetricExporterFactoryTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,17 @@ public function testMake(): void
3636

3737
$this->assertInstanceOf(MetricExporterInterface::class, $exporter);
3838
}
39+
40+
public function testMakeWithDefaultEndpoint(): void
41+
{
42+
$config = $this->createMock(ConfigInterface::class);
43+
$config->method('get')
44+
->with('open-telemetry.metrics.exporters.otlp_grpc.options', [])
45+
->willReturn([]);
46+
47+
$factory = new OtlpGrpcMetricExporterFactory($config);
48+
$exporter = $factory->make();
49+
50+
$this->assertInstanceOf(MetricExporterInterface::class, $exporter);
51+
}
3952
}

tests/Unit/Factory/Trace/Exporter/OtlpGrpcTraceExporterFactoryTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,17 @@ public function testMake(): void
3434

3535
$this->assertInstanceOf(SpanExporterInterface::class, $exporter);
3636
}
37+
38+
public function testMakeWithDefaultEndpoint(): void
39+
{
40+
$config = $this->createMock(ConfigInterface::class);
41+
$config->method('get')
42+
->with('open-telemetry.traces.exporters.otlp_grpc.options', [])
43+
->willReturn([]);
44+
45+
$factory = new OtlpGrpcTraceExporterFactory($config);
46+
$exporter = $factory->make();
47+
48+
$this->assertInstanceOf(SpanExporterInterface::class, $exporter);
49+
}
3750
}

tests/Unit/Transport/SwooleGrpcTransportFactoryTest.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,35 @@ public function testCreateWithCompressionNoneTreatedAsNull(): void
119119
$compressionProp = $reflection->getProperty('compression');
120120
$this->assertNull($compressionProp->getValue($transport));
121121
}
122+
123+
public function testCreateWithRetryParameters(): void
124+
{
125+
$factory = new SwooleGrpcTransportFactory();
126+
127+
$transport = $factory->create(
128+
endpoint: 'http://localhost:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export',
129+
contentType: ContentTypes::PROTOBUF,
130+
retryDelay: 200,
131+
maxRetries: 5,
132+
);
133+
134+
$reflection = new ReflectionClass($transport);
135+
136+
$this->assertSame(200, $reflection->getProperty('retryDelay')->getValue($transport));
137+
$this->assertSame(5, $reflection->getProperty('maxRetries')->getValue($transport));
138+
}
139+
140+
public function testCreateThrowsExceptionForUnsupportedCompression(): void
141+
{
142+
$factory = new SwooleGrpcTransportFactory();
143+
144+
$this->expectException(InvalidArgumentException::class);
145+
$this->expectExceptionMessage('Unsupported compression type "br"');
146+
147+
$factory->create(
148+
endpoint: 'http://localhost:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export',
149+
contentType: ContentTypes::PROTOBUF,
150+
compression: 'br',
151+
);
152+
}
122153
}

0 commit comments

Comments
 (0)