Skip to content

Commit 8207896

Browse files
committed
fix: RepeatedScan::execute is CPU-work not I/O
I also renamed `Handle::spawn_blocking` to `Handle::spawn_blocking_io` which better reflects what it does. Signed-off-by: Daniel King <dan@spiraldb.com>
1 parent 9fe20c5 commit 8207896

7 files changed

Lines changed: 12 additions & 13 deletions

File tree

vortex-cuda/src/pooled_read_at.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl VortexReadAt for PooledFileReadAt {
106106
async move {
107107
let mut target = pool.get(length)?;
108108
let target = handle
109-
.spawn_blocking(move || {
109+
.spawn_blocking_io(move || {
110110
read_exact_at(&file, target.as_mut_slice(), offset)?;
111111
Ok::<_, io::Error>(target)
112112
})
@@ -234,7 +234,7 @@ impl VortexReadAt for PooledObjectStoreReadAt {
234234
#[cfg(not(target_arch = "wasm32"))]
235235
GetResultPayload::File(file, _) => {
236236
target = handle
237-
.spawn_blocking(move || {
237+
.spawn_blocking_io(move || {
238238
read_exact_at(&file, target.as_mut_slice(), range.start)?;
239239
Ok::<_, io::Error>(target)
240240
})

vortex-duckdb/src/duckdb/file_system.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl VortexWrite for DuckDbFsWriter {
118118

119119
let runtime = RUNTIME.handle();
120120
let buffer = runtime
121-
.spawn_blocking(move || {
121+
.spawn_blocking_io(move || {
122122
let mut err: cpp::duckdb_vx_error = ptr::null_mut();
123123
let mut out_len: cpp::idx_t = 0;
124124
let status = unsafe {
@@ -149,7 +149,7 @@ impl VortexWrite for DuckDbFsWriter {
149149

150150
let runtime = RUNTIME.handle();
151151
runtime
152-
.spawn_blocking(move || {
152+
.spawn_blocking_io(move || {
153153
let mut err: cpp::duckdb_vx_error = ptr::null_mut();
154154
let status = unsafe { cpp::duckdb_vx_fs_sync(handle.as_ptr(), &raw mut err) };
155155
if status != cpp::duckdb_state::DuckDBSuccess {

vortex-duckdb/src/filesystem.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl FileSystem for DuckDbFileSystem {
133133
stream::once(async move {
134134
RUNTIME
135135
.handle()
136-
.spawn_blocking(move || list_recursive(ctx, &directory_url, &base_url))
136+
.spawn_blocking_io(move || list_recursive(ctx, &directory_url, &base_url))
137137
.await
138138
})
139139
.flat_map(|result| match result {
@@ -271,7 +271,7 @@ impl VortexReadAt for DuckDbFsReader {
271271

272272
let runtime = RUNTIME.handle();
273273
let size = runtime
274-
.spawn_blocking(move || {
274+
.spawn_blocking_io(move || {
275275
let mut err: cpp::duckdb_vx_error = ptr::null_mut();
276276
let mut size_out: cpp::idx_t = 0;
277277
let status = unsafe {
@@ -301,7 +301,7 @@ impl VortexReadAt for DuckDbFsReader {
301301
async move {
302302
let runtime = RUNTIME.handle();
303303
let result: VortexResult<BufferHandle> = runtime
304-
.spawn_blocking(move || -> VortexResult<BufferHandle> {
304+
.spawn_blocking_io(move || -> VortexResult<BufferHandle> {
305305
let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
306306
unsafe { buffer.set_len(length) };
307307

vortex-io/src/object_store/read_at.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl VortexReadAt for ObjectStoreReadAt {
134134
#[cfg(not(target_arch = "wasm32"))]
135135
GetResultPayload::File(file, _) => {
136136
handle
137-
.spawn_blocking(move || {
137+
.spawn_blocking_io(move || {
138138
read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
139139
Ok::<_, io::Error>(buffer)
140140
})

vortex-io/src/runtime/handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl Handle {
122122
}
123123

124124
/// Spawn a blocking I/O task for execution on the runtime.
125-
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
125+
pub fn spawn_blocking_io<F, R>(&self, f: F) -> Task<R>
126126
where
127127
F: FnOnce() -> R + Send + 'static,
128128
R: Send + 'static,

vortex-io/src/std_file/read_at.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl VortexReadAt for FileReadAt {
126126
let allocator = Arc::clone(&self.allocator);
127127
async move {
128128
handle
129-
.spawn_blocking(move || {
129+
.spawn_blocking_io(move || {
130130
let mut buffer = allocator.allocate(length, alignment)?;
131131
read_exact_at(&file, buffer.as_mut_slice(), offset)?;
132132
Ok(BufferHandle::new_host(buffer.freeze()))

vortex-layout/src/scan/scan_builder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,8 @@ impl<A: 'static + Send> Stream for LazyScanStream<A> {
372372
.unwrap_or(1);
373373
let concurrency = builder.concurrency * num_workers;
374374
let handle = builder.session.handle();
375-
let task = handle.spawn_blocking(move || {
376-
builder.prepare().and_then(|scan| scan.execute(None))
377-
});
375+
let task = handle
376+
.spawn_cpu(move || builder.prepare().and_then(|scan| scan.execute(None)));
378377
self.state = LazyScanState::Preparing(PreparingScan {
379378
ordered,
380379
concurrency,

0 commit comments

Comments
 (0)