From 28cec062fd76d120f271d1569b4a3c94350be6af Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 22 May 2026 22:20:53 -0500 Subject: [PATCH 1/4] Bound streaming gRPC frame sizes Reject oversized server-streaming gRPC messages as soon as their frame header is available. Also reject compressed stream frames consistently and avoid unchecked frame length arithmetic before slicing buffers. --- ldk-server-client/src/client.rs | 80 +++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index 4c08d738..6dbdf9d3 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -71,6 +71,12 @@ use crate::error::LdkServerErrorCode::{ type StreamingClient = HyperClient, HyperBody>; +const GRPC_FRAME_HEADER_LEN: usize = 5; + +// Applies to each server-streaming gRPC message. Graph RPCs use the unary client path and are not +// constrained by this limit. +const MAX_GRPC_STREAM_MESSAGE_LEN: usize = 4 * 1024 * 1024; + /// Client to access a hosted instance of LDK Server via gRPC. /// /// The client requires the server's TLS certificate to be provided for verification. @@ -568,19 +574,43 @@ impl GrpcStream { pub async fn next_message(&mut self) -> Option> { loop { // Try to decode a complete gRPC frame from the buffer - if self.buf.len() >= 5 { + if self.buf.len() >= GRPC_FRAME_HEADER_LEN { + if self.buf[0] != 0 { + return Some(Err(LdkServerError::new( + InternalError, + "gRPC stream compression is not supported", + ))); + } let msg_len = u32::from_be_bytes([self.buf[1], self.buf[2], self.buf[3], self.buf[4]]) as usize; - if self.buf.len() >= 5 + msg_len { - let proto_bytes = &self.buf[5..5 + msg_len]; + if msg_len > MAX_GRPC_STREAM_MESSAGE_LEN { + return Some(Err(LdkServerError::new( + InternalError, + format!( + "gRPC stream message exceeds maximum size of {} bytes", + MAX_GRPC_STREAM_MESSAGE_LEN + ), + ))); + } + let frame_len = match GRPC_FRAME_HEADER_LEN.checked_add(msg_len) { + Some(frame_len) => frame_len, + None => { + return Some(Err(LdkServerError::new( + InternalError, + "gRPC stream frame length overflow", + ))); + }, + }; + if self.buf.len() >= frame_len { + let proto_bytes = &self.buf[GRPC_FRAME_HEADER_LEN..frame_len]; let result = M::decode(proto_bytes).map_err(|e| { LdkServerError::new( InternalError, format!("Failed to decode gRPC stream message: {}", e), ) }); - self.buf.drain(..5 + msg_len); + self.buf.drain(..frame_len); return Some(result); } } @@ -713,6 +743,48 @@ mod tests { assert!(stream.next_message().await.is_none()); } + #[tokio::test] + async fn test_event_stream_rejects_oversized_frame_header() { + let (mut sender, body) = Body::channel(); + sender.send_data(vec![0u8, 0xff, 0xff, 0xff, 0xff].into()).await.unwrap(); + drop(sender); + + let mut stream: EventStream = GrpcStream { + body, + buf: Vec::new(), + trailers_checked: false, + _marker: std::marker::PhantomData, + }; + + let result = stream.next_message().await.unwrap().unwrap_err(); + assert_eq!(result.error_code, InternalError); + assert_eq!( + result.message, + format!( + "gRPC stream message exceeds maximum size of {} bytes", + MAX_GRPC_STREAM_MESSAGE_LEN + ) + ); + } + + #[tokio::test] + async fn test_event_stream_rejects_compressed_frame() { + let (mut sender, body) = Body::channel(); + sender.send_data(vec![1u8, 0, 0, 0, 0].into()).await.unwrap(); + drop(sender); + + let mut stream: EventStream = GrpcStream { + body, + buf: Vec::new(), + trailers_checked: false, + _marker: std::marker::PhantomData, + }; + + let result = stream.next_message().await.unwrap().unwrap_err(); + assert_eq!(result.error_code, InternalError); + assert_eq!(result.message, "gRPC stream compression is not supported"); + } + #[test] fn test_grpc_code_to_error_all_known_codes() { let cases = [ From 1ce6b292c2987d2cffd0078c2939a2e9b1111530 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Fri, 22 May 2026 22:29:45 -0500 Subject: [PATCH 2/4] Bound unary gRPC response reads Read unary client responses incrementally and reject bodies above the configured limit before protobuf decoding. Preallocate only after a valid Content-Length has been checked. --- ldk-server-client/src/client.rs | 64 ++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index 6dbdf9d3..3e9f53e1 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -73,6 +73,10 @@ type StreamingClient = HyperClient, const GRPC_FRAME_HEADER_LEN: usize = 5; +// Applies to complete unary gRPC responses. The server applies the same cap to unary request +// bodies before protobuf decoding. +const MAX_GRPC_UNARY_RESPONSE_LEN: usize = 10 * 1024 * 1024; + // Applies to each server-streaming gRPC message. Graph RPCs use the unary client path and are not // constrained by this limit. const MAX_GRPC_STREAM_MESSAGE_LEN: usize = 4 * 1024 * 1024; @@ -456,10 +460,7 @@ impl LdkServerClient { return Err(error); } - // Read the response body - let payload = response.bytes().await.map_err(|e| { - LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) - })?; + let payload = read_grpc_unary_response_body(response).await?; let proto_bytes = decode_grpc_body(&payload) .map_err(|e| LdkServerError::new(InternalError, e.message))?; @@ -514,6 +515,42 @@ impl LdkServerClient { } } +async fn read_grpc_unary_response_body( + mut response: reqwest::Response, +) -> Result, LdkServerError> { + let capacity = if let Some(content_length) = response.content_length() { + check_grpc_unary_response_len(content_length)?; + content_length as usize + } else { + 0 + }; + + let mut payload = Vec::with_capacity(capacity); + while let Some(chunk) = response.chunk().await.map_err(|e| { + LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) + })? { + let len = payload.len().checked_add(chunk.len()).ok_or_else(|| { + LdkServerError::new(InternalError, "gRPC unary response body length overflow") + })?; + check_grpc_unary_response_len(len as u64)?; + payload.extend_from_slice(&chunk); + } + Ok(payload) +} + +fn check_grpc_unary_response_len(len: u64) -> Result<(), LdkServerError> { + if len > MAX_GRPC_UNARY_RESPONSE_LEN as u64 { + return Err(LdkServerError::new( + InternalError, + format!( + "gRPC unary response exceeds maximum size of {} bytes", + MAX_GRPC_UNARY_RESPONSE_LEN + ), + )); + } + Ok(()) +} + /// Map a gRPC status code to an LdkServerError. fn grpc_code_to_error(code: u32, message: String) -> LdkServerError { match code { @@ -721,6 +758,25 @@ mod tests { assert_eq!(err.message, "gRPC stream became unavailable: server shutting down"); } + #[test] + fn test_grpc_unary_response_len_allows_limit() { + assert!(check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64).is_ok()); + } + + #[test] + fn test_grpc_unary_response_len_rejects_oversized() { + let err = + check_grpc_unary_response_len(MAX_GRPC_UNARY_RESPONSE_LEN as u64 + 1).unwrap_err(); + assert_eq!(err.error_code, InternalError); + assert_eq!( + err.message, + format!( + "gRPC unary response exceeds maximum size of {} bytes", + MAX_GRPC_UNARY_RESPONSE_LEN + ) + ); + } + #[tokio::test] async fn test_event_stream_surfaces_terminal_grpc_status() { let (mut sender, body) = Body::channel(); From 66fdafe1b46ed58f3eeed8909171bd0e122efc13 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 27 May 2026 13:30:33 +0200 Subject: [PATCH 3/4] Set gRPC response content length --- ldk-server-grpc/src/grpc.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/ldk-server-grpc/src/grpc.rs b/ldk-server-grpc/src/grpc.rs index bf65e4b2..59d15764 100644 --- a/ldk-server-grpc/src/grpc.rs +++ b/ldk-server-grpc/src/grpc.rs @@ -191,6 +191,7 @@ pub fn grpc_error_response(status: GrpcStatus) -> http::Response { .status(200) .header("content-type", "application/grpc+proto") .header("grpc-accept-encoding", "identity") + .header("content-length", "0") .header("grpc-status", status.code.to_string()); if !status.message.is_empty() { let encoded = percent_encode(&status.message); @@ -203,12 +204,15 @@ pub fn grpc_error_response(status: GrpcStatus) -> http::Response { /// Build an HTTP 200 response with gRPC content-type and the given body. pub fn grpc_response(body: GrpcBody) -> http::Response { - http::Response::builder() + let mut builder = http::Response::builder() .status(200) .header("content-type", "application/grpc+proto") - .header("grpc-accept-encoding", "identity") - .body(body) - .unwrap() + .header("grpc-accept-encoding", "identity"); + if let GrpcBody::Unary { data, .. } = &body { + let len = data.as_ref().map_or(0, Bytes::len); + builder = builder.header("content-length", len.to_string()); + } + builder.body(body).unwrap() } /// Validate that the request looks like a gRPC call. @@ -322,6 +326,30 @@ mod tests { assert!(decoded.is_empty()); } + #[test] + fn test_grpc_response_sets_unary_content_length() { + let data = encode_grpc_frame(b"hello"); + let expected_len = data.len().to_string(); + let response = grpc_response(GrpcBody::Unary { data: Some(data), trailers_sent: false }); + + assert_eq!(response.headers().get("content-length").unwrap(), expected_len.as_str()); + } + + #[test] + fn test_grpc_response_omits_stream_content_length() { + let (_tx, rx) = tokio::sync::mpsc::channel(1); + let response = grpc_response(GrpcBody::Stream { rx, done: false }); + + assert!(response.headers().get("content-length").is_none()); + } + + #[test] + fn test_grpc_error_response_sets_zero_content_length() { + let response = grpc_error_response(GrpcStatus::new(GRPC_STATUS_INVALID_ARGUMENT, "bad")); + + assert_eq!(response.headers().get("content-length").unwrap(), "0"); + } + #[test] fn test_decode_too_short() { assert!(decode_grpc_body(&[0, 0, 0]).is_err()); From f9d1f4642747cce19cc66c142d183e5ca55f46e2 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 27 May 2026 13:30:48 +0200 Subject: [PATCH 4/4] Validate gRPC request content length --- ldk-server-client/src/client.rs | 4 ++ ldk-server/src/service.rs | 94 +++++++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index 3e9f53e1..b672ad0f 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -436,6 +436,7 @@ impl LdkServerClient { &self, request: &Rq, method: &str, ) -> Result { let grpc_body = encode_grpc_frame(&request.encode_to_vec()).to_vec(); + let content_length = grpc_body.len().to_string(); let url = format!("https://{}{}{}", self.base_url, GRPC_SERVICE_PREFIX, method); let auth_header = self.compute_auth_header(&grpc_body); @@ -444,6 +445,7 @@ impl LdkServerClient { .client .post(&url) .header("content-type", "application/grpc+proto") + .header("content-length", content_length) .header("te", "trailers") .header("x-auth", auth_header) .body(grpc_body) @@ -476,6 +478,7 @@ impl LdkServerClient { &self, request: &Rq, method: &str, ) -> Result, LdkServerError> { let grpc_body = encode_grpc_frame(&request.encode_to_vec()).to_vec(); + let content_length = grpc_body.len().to_string(); let url = format!("https://{}{}{}", self.base_url, GRPC_SERVICE_PREFIX, method); let auth_header = self.compute_auth_header(&grpc_body); @@ -486,6 +489,7 @@ impl LdkServerClient { HyperRequest::post(&url) .version(Version::HTTP_2) .header("content-type", "application/grpc+proto") + .header("content-length", content_length) .header("te", "trailers") .header("x-auth", auth_header) .body(HyperBody::from(grpc_body)) diff --git a/ldk-server/src/service.rs b/ldk-server/src/service.rs index 2358bc9e..8706b379 100644 --- a/ldk-server/src/service.rs +++ b/ldk-server/src/service.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use http_body_util::{BodyExt, Limited}; use hyper::body::Incoming; use hyper::service::Service; -use hyper::{Request, Response}; +use hyper::{HeaderMap, Request, Response}; use ldk_node::bitcoin::hashes::hmac::{Hmac, HmacEngine}; use ldk_node::bitcoin::hashes::{sha256, Hash, HashEngine}; use ldk_node::Node; @@ -256,7 +256,11 @@ impl Service> for NodeService { let shutdown_rx = self.shutdown_rx.clone(); let (request_parts, request_body) = req.into_parts(); let future: Self::Future = Box::pin(async move { - let body_bytes = match read_request_body(request_body).await { + let content_length = match request_content_length(&request_parts.headers) { + Ok(content_length) => content_length, + Err(status) => return Ok(grpc_error_response(status)), + }; + let body_bytes = match read_request_body(request_body, content_length).await { Ok(bytes) => bytes, Err(status) => return Ok(grpc_error_response(status)), }; @@ -499,7 +503,39 @@ async fn handle_grpc_unary< } } -async fn read_request_body(body: Incoming) -> Result { +fn request_content_length(headers: &HeaderMap) -> Result, GrpcStatus> { + let Some(content_length) = headers.get("content-length") else { + return Ok(None); + }; + let len = content_length.to_str().ok().and_then(|value| value.parse::().ok()).ok_or_else( + || GrpcStatus::new(GRPC_STATUS_INVALID_ARGUMENT, "Invalid content-length header"), + )?; + if len > MAX_BODY_SIZE as u64 { + return Err(GrpcStatus::new( + GRPC_STATUS_INVALID_ARGUMENT, + "Request body too large or failed to read", + )); + } + Ok(Some(len)) +} + +fn validate_request_body_len( + content_length: Option, actual_len: usize, +) -> Result<(), GrpcStatus> { + if let Some(expected_len) = content_length { + if expected_len != actual_len as u64 { + return Err(GrpcStatus::new( + GRPC_STATUS_INVALID_ARGUMENT, + "Request body length does not match content-length", + )); + } + } + Ok(()) +} + +async fn read_request_body( + body: Incoming, content_length: Option, +) -> Result { let limited_body = Limited::new(body, MAX_BODY_SIZE); let bytes = match limited_body.collect().await { Ok(collected) => collected.to_bytes(), @@ -510,6 +546,7 @@ async fn read_request_body(body: Incoming) -> Result { )); }, }; + validate_request_body_len(content_length, bytes.len())?; Ok(bytes) } @@ -606,4 +643,55 @@ mod tests { assert!(result.is_err()); assert_eq!(result.unwrap_err().error_code, LdkServerErrorCode::AuthError); } + + #[test] + fn test_request_content_length_missing() { + let headers = HeaderMap::new(); + assert_eq!(request_content_length(&headers).unwrap(), None); + } + + #[test] + fn test_request_content_length_parses_value() { + let mut headers = HeaderMap::new(); + headers.insert("content-length", "42".parse().unwrap()); + + assert_eq!(request_content_length(&headers).unwrap(), Some(42)); + } + + #[test] + fn test_request_content_length_rejects_invalid_value() { + let mut headers = HeaderMap::new(); + headers.insert("content-length", "not-a-number".parse().unwrap()); + + let err = request_content_length(&headers).unwrap_err(); + assert_eq!(err.code, GRPC_STATUS_INVALID_ARGUMENT); + assert_eq!(err.message, "Invalid content-length header"); + } + + #[test] + fn test_request_content_length_rejects_oversized_value() { + let mut headers = HeaderMap::new(); + headers.insert("content-length", (MAX_BODY_SIZE as u64 + 1).to_string().parse().unwrap()); + + let err = request_content_length(&headers).unwrap_err(); + assert_eq!(err.code, GRPC_STATUS_INVALID_ARGUMENT); + assert_eq!(err.message, "Request body too large or failed to read"); + } + + #[test] + fn test_validate_request_body_len_allows_matching_length() { + assert!(validate_request_body_len(Some(5), 5).is_ok()); + } + + #[test] + fn test_validate_request_body_len_allows_missing_length() { + assert!(validate_request_body_len(None, 5).is_ok()); + } + + #[test] + fn test_validate_request_body_len_rejects_mismatch() { + let err = validate_request_body_len(Some(6), 5).unwrap_err(); + assert_eq!(err.code, GRPC_STATUS_INVALID_ARGUMENT); + assert_eq!(err.message, "Request body length does not match content-length"); + } }