Skip to content

Commit e8915f1

Browse files
authored
fix(http): fix aborting a streaming response (#2562)
1 parent 6de61f8 commit e8915f1

10 files changed

Lines changed: 150 additions & 55 deletions

File tree

.changes/http-stream-cancel.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"http": "patch"
3+
"http-js": "patch"
4+
---
5+
6+
Fix aborting a request in the middle of a streaming response.
7+

plugins/http/api-iife.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/build.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@
66
#[allow(dead_code)]
77
mod scope;
88

9-
const COMMANDS: &[&str] = &["fetch", "fetch_cancel", "fetch_send", "fetch_read_body"];
9+
const COMMANDS: &[&str] = &[
10+
"fetch",
11+
"fetch_cancel",
12+
"fetch_send",
13+
"fetch_read_body",
14+
"fetch_cancel_body",
15+
];
1016

1117
/// HTTP scope entry.
1218
#[derive(schemars::JsonSchema)]

plugins/http/guest-js/index.ts

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* @module
2727
*/
2828

29-
import { Channel, invoke } from '@tauri-apps/api/core'
29+
import { invoke } from '@tauri-apps/api/core'
3030

3131
/**
3232
* Configuration of a proxy that a Client should pass requests to.
@@ -126,7 +126,7 @@ export async function fetch(
126126
input: URL | Request | string,
127127
init?: RequestInit & ClientOptions
128128
): Promise<Response> {
129-
// abort early here if needed
129+
// Optimistically check for abort signal and avoid doing any work
130130
const signal = init?.signal
131131
if (signal?.aborted) {
132132
throw new Error(ERROR_REQUEST_CANCELLED)
@@ -181,7 +181,7 @@ export async function fetch(
181181
]
182182
)
183183

184-
// abort early here if needed
184+
// Optimistically check for abort signal and avoid doing any work on the Rust side
185185
if (signal?.aborted) {
186186
throw new Error(ERROR_REQUEST_CANCELLED)
187187
}
@@ -201,7 +201,8 @@ export async function fetch(
201201

202202
const abort = () => invoke('plugin:http|fetch_cancel', { rid })
203203

204-
// abort early here if needed
204+
// Optimistically check for abort signal
205+
// and avoid doing any work after doing intial work on the Rust side
205206
if (signal?.aborted) {
206207
// we don't care about the result of this proimse
207208
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -229,41 +230,52 @@ export async function fetch(
229230
rid
230231
})
231232

233+
const dropBody = () => {
234+
return invoke('plugin:http|fetch_cancel_body', { rid: responseRid })
235+
}
236+
237+
const readChunk = async (
238+
controller: ReadableStreamDefaultController<Uint8Array>
239+
) => {
240+
let data: ArrayBuffer
241+
try {
242+
data = await invoke('plugin:http|fetch_read_body', {
243+
rid: responseRid
244+
})
245+
} catch (e) {
246+
// close the stream if an error occurs
247+
// and drop the body on Rust side
248+
controller.error(e)
249+
void dropBody()
250+
return
251+
}
252+
253+
const dataUint8 = new Uint8Array(data)
254+
const lastByte = dataUint8[dataUint8.byteLength - 1]
255+
const actualData = dataUint8.slice(0, dataUint8.byteLength - 1)
256+
257+
// close when the signal to close (last byte is 1) is sent from the IPC.
258+
if (lastByte === 1) {
259+
controller.close()
260+
return
261+
}
262+
263+
controller.enqueue(actualData)
264+
}
265+
232266
// no body for 101, 103, 204, 205 and 304
233267
// see https://fetch.spec.whatwg.org/#null-body-status
234268
const body = [101, 103, 204, 205, 304].includes(status)
235269
? null
236-
: new ReadableStream({
270+
: new ReadableStream<Uint8Array>({
237271
start: (controller) => {
238-
const streamChannel = new Channel<ArrayBuffer | number[]>()
239-
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
240-
// close early if aborted
241-
if (signal?.aborted) {
242-
controller.error(ERROR_REQUEST_CANCELLED)
243-
return
244-
}
245-
246-
const resUint8 = new Uint8Array(res)
247-
const lastByte = resUint8[resUint8.byteLength - 1]
248-
const actualRes = resUint8.slice(0, resUint8.byteLength - 1)
249-
250-
// close when the signal to close (last byte is 1) is sent from the IPC.
251-
if (lastByte == 1) {
252-
controller.close()
253-
return
254-
}
255-
256-
controller.enqueue(actualRes)
257-
}
258-
259-
// run a non-blocking body stream fetch
260-
invoke('plugin:http|fetch_read_body', {
261-
rid: responseRid,
262-
streamChannel
263-
}).catch((e) => {
264-
controller.error(e)
272+
// listen for abort events to cancel reading
273+
signal?.addEventListener('abort', () => {
274+
controller.error(ERROR_REQUEST_CANCELLED)
275+
void dropBody()
265276
})
266-
}
277+
},
278+
pull: (controller) => readChunk(controller)
267279
})
268280

269281
const res = new Response(body, {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Automatically generated - DO NOT EDIT!
2+
3+
"$schema" = "../../schemas/schema.json"
4+
5+
[[permission]]
6+
identifier = "allow-fetch-cancel-body"
7+
description = "Enables the fetch_cancel_body command without any pre-configured scope."
8+
commands.allow = ["fetch_cancel_body"]
9+
10+
[[permission]]
11+
identifier = "deny-fetch-cancel-body"
12+
description = "Denies the fetch_cancel_body command without any pre-configured scope."
13+
commands.deny = ["fetch_cancel_body"]

plugins/http/permissions/autogenerated/reference.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ All fetch operations are enabled.
1515

1616
- `allow-fetch`
1717
- `allow-fetch-cancel`
18-
- `allow-fetch-read-body`
1918
- `allow-fetch-send`
19+
- `allow-fetch-read-body`
20+
- `allow-fetch-cancel-body`
2021

2122
## Permission Table
2223

@@ -82,6 +83,32 @@ Denies the fetch_cancel command without any pre-configured scope.
8283
<tr>
8384
<td>
8485

86+
`http:allow-fetch-cancel-body`
87+
88+
</td>
89+
<td>
90+
91+
Enables the fetch_cancel_body command without any pre-configured scope.
92+
93+
</td>
94+
</tr>
95+
96+
<tr>
97+
<td>
98+
99+
`http:deny-fetch-cancel-body`
100+
101+
</td>
102+
<td>
103+
104+
Denies the fetch_cancel_body command without any pre-configured scope.
105+
106+
</td>
107+
</tr>
108+
109+
<tr>
110+
<td>
111+
85112
`http:allow-fetch-read-body`
86113

87114
</td>

plugins/http/permissions/default.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ All fetch operations are enabled.
1717
permissions = [
1818
"allow-fetch",
1919
"allow-fetch-cancel",
20-
"allow-fetch-read-body",
2120
"allow-fetch-send",
21+
"allow-fetch-read-body",
22+
"allow-fetch-cancel-body",
2223
]

plugins/http/permissions/schemas/schema.json

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,18 @@
318318
"const": "deny-fetch-cancel",
319319
"markdownDescription": "Denies the fetch_cancel command without any pre-configured scope."
320320
},
321+
{
322+
"description": "Enables the fetch_cancel_body command without any pre-configured scope.",
323+
"type": "string",
324+
"const": "allow-fetch-cancel-body",
325+
"markdownDescription": "Enables the fetch_cancel_body command without any pre-configured scope."
326+
},
327+
{
328+
"description": "Denies the fetch_cancel_body command without any pre-configured scope.",
329+
"type": "string",
330+
"const": "deny-fetch-cancel-body",
331+
"markdownDescription": "Denies the fetch_cancel_body command without any pre-configured scope."
332+
},
321333
{
322334
"description": "Enables the fetch_read_body command without any pre-configured scope.",
323335
"type": "string",
@@ -343,10 +355,10 @@
343355
"markdownDescription": "Denies the fetch_send command without any pre-configured scope."
344356
},
345357
{
346-
"description": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-read-body`\n- `allow-fetch-send`",
358+
"description": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-send`\n- `allow-fetch-read-body`\n- `allow-fetch-cancel-body`",
347359
"type": "string",
348360
"const": "default",
349-
"markdownDescription": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-read-body`\n- `allow-fetch-send`"
361+
"markdownDescription": "This permission set configures what kind of\nfetch operations are available from the http plugin.\n\nThis enables all fetch operations but does not\nallow explicitly any origins to be fetched. This needs to\nbe manually configured before usage.\n\n#### Granted Permissions\n\nAll fetch operations are enabled.\n\n\n#### This default permission set includes:\n\n- `allow-fetch`\n- `allow-fetch-cancel`\n- `allow-fetch-send`\n- `allow-fetch-read-body`\n- `allow-fetch-cancel-body`"
350362
}
351363
]
352364
}

plugins/http/src/commands.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
1010
use tauri::{
1111
async_runtime::Mutex,
1212
command,
13-
ipc::{Channel, CommandScope, GlobalScope},
13+
ipc::{CommandScope, GlobalScope},
1414
Manager, ResourceId, ResourceTable, Runtime, State, Webview,
1515
};
1616
use tokio::sync::oneshot::{channel, Receiver, Sender};
@@ -415,26 +415,42 @@ pub async fn fetch_send<R: Runtime>(
415415
pub async fn fetch_read_body<R: Runtime>(
416416
webview: Webview<R>,
417417
rid: ResourceId,
418-
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
419-
) -> crate::Result<()> {
418+
) -> crate::Result<tauri::ipc::Response> {
420419
let res = {
421-
let mut resources_table = webview.resources_table();
422-
resources_table.take::<ReqwestResponse>(rid)?
420+
let resources_table = webview.resources_table();
421+
resources_table.get::<ReqwestResponse>(rid)?
423422
};
424423

425-
let mut res = Arc::into_inner(res).unwrap().0;
424+
// SAFETY: we can access the inner value mutably
425+
// because we are the only ones with a reference to it
426+
// and we don't want to use `Arc::into_inner` because we want to keep the value in the table
427+
// for potential future calls to `fetch_cancel_body`
428+
let res_ptr = Arc::as_ptr(&res) as *mut ReqwestResponse;
429+
let res = unsafe { &mut *res_ptr };
430+
let res = &mut res.0;
426431

427-
// send response through IPC channel
428-
while let Some(chunk) = res.chunk().await? {
429-
let mut chunk = chunk.to_vec();
430-
// append 0 to indicate we are not done yet
431-
chunk.push(0);
432-
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk))?;
433-
}
432+
let Some(chunk) = res.chunk().await? else {
433+
let mut resources_table = webview.resources_table();
434+
resources_table.close(rid)?;
434435

435-
// send 1 to indicate we are done
436-
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(vec![1]))?;
436+
// return a response with a single byte to indicate that the body is empty
437+
return Ok(tauri::ipc::Response::new(vec![1]));
438+
};
439+
440+
let mut chunk = chunk.to_vec();
441+
// append a 0 byte to indicate that the body is not empty
442+
chunk.push(0);
437443

444+
Ok(tauri::ipc::Response::new(chunk))
445+
}
446+
447+
#[command]
448+
pub async fn fetch_cancel_body<R: Runtime>(
449+
webview: Webview<R>,
450+
rid: ResourceId,
451+
) -> crate::Result<()> {
452+
let mut resources_table = webview.resources_table();
453+
resources_table.close(rid)?;
438454
Ok(())
439455
}
440456

plugins/http/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
8484
commands::fetch,
8585
commands::fetch_cancel,
8686
commands::fetch_send,
87-
commands::fetch_read_body
87+
commands::fetch_read_body,
88+
commands::fetch_cancel_body,
8889
])
8990
.build()
9091
}

0 commit comments

Comments
 (0)