Skip to content

Commit a83c9b3

Browse files
authored
Try and improve the perf of natural file splits in DF (#7609)
## Summary Try to recapture some of the performance we lost in #7591 by doing the extra work only when it's actually required. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 9d59a8e commit a83c9b3

1 file changed

Lines changed: 45 additions & 21 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,7 @@ impl FileOpener for VortexOpener {
307307
}
308308
};
309309

310-
let natural_split_ranges = natural_split_ranges_for_file(
311-
natural_split_ranges.as_ref(),
312-
&file.object_meta.location,
313-
&layout_reader,
314-
)?;
315-
316-
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
310+
let mut scan_builder = ScanBuilder::new(session.clone(), Arc::clone(&layout_reader));
317311

318312
if let Some(extensions) = file.extensions
319313
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
@@ -328,16 +322,25 @@ impl FileOpener for VortexOpener {
328322
end: u64::try_from(file_range.end)
329323
.map_err(|_| exec_datafusion_err!("Vortex file range end is negative"))?,
330324
};
331-
332-
let Some(row_range) = split_aligned_row_range(
333-
byte_range,
334-
file.object_meta.size,
335-
natural_split_ranges.as_ref(),
336-
) else {
337-
return Ok(stream::empty().boxed());
338-
};
339-
340-
scan_builder = scan_builder.with_row_range(row_range);
325+
if byte_range.start != 0 || byte_range.end != file.object_meta.size {
326+
// Full-file scans already cover every natural split. Only translate the
327+
// byte range back into row boundaries when DataFusion has trimmed the file.
328+
let natural_split_ranges = natural_split_ranges_for_file(
329+
natural_split_ranges.as_ref(),
330+
&file.object_meta.location,
331+
&layout_reader,
332+
)?;
333+
334+
let Some(row_range) = split_aligned_row_range(
335+
byte_range,
336+
file.object_meta.size,
337+
natural_split_ranges.as_ref(),
338+
) else {
339+
return Ok(stream::empty().boxed());
340+
};
341+
342+
scan_builder = scan_builder.with_row_range(row_range);
343+
}
341344
}
342345

343346
let filter = filter
@@ -477,6 +480,8 @@ fn compute_natural_split_ranges(layout_reader: &dyn LayoutReader) -> DFResult<Ar
477480
}
478481

479482
/// Translate a DataFusion byte range to the contiguous natural split ranges it owns.
483+
/// Most splits are assigned by midpoint, but the leading split stays with the range that owns
484+
/// byte 0 so a tiny first byte range still claims the first rows.
480485
fn split_aligned_row_range(
481486
byte_range: Range<u64>,
482487
total_size: u64,
@@ -491,10 +496,13 @@ fn split_aligned_row_range(
491496
return None;
492497
}
493498

494-
let mut owned_splits = split_ranges.iter().filter(|split_range| {
495-
let midpoint_byte = split_midpoint_to_byte(split_range, row_count, total_size);
496-
byte_range.contains(&midpoint_byte)
497-
});
499+
let mut owned_splits = split_ranges
500+
.iter()
501+
.enumerate()
502+
.filter_map(|(idx, split_range)| {
503+
let assignment_byte = split_assignment_byte(idx, split_range, row_count, total_size);
504+
byte_range.contains(&assignment_byte).then_some(split_range)
505+
});
498506

499507
let first_split = owned_splits.next()?;
500508
let mut row_range = first_split.start..first_split.end;
@@ -505,6 +513,21 @@ fn split_aligned_row_range(
505513
Some(row_range)
506514
}
507515

516+
fn split_assignment_byte(
517+
idx: usize,
518+
split_range: &Range<u64>,
519+
row_count: u64,
520+
total_size: u64,
521+
) -> u64 {
522+
if idx == 0 && split_range.start == 0 {
523+
// Byte 0 is the only stable representative for the leading split. A midpoint can fall
524+
// into the next DataFusion byte range and leave the first range with no rows to read.
525+
0
526+
} else {
527+
split_midpoint_to_byte(split_range, row_count, total_size)
528+
}
529+
}
530+
508531
fn split_midpoint_to_byte(split_range: &Range<u64>, row_count: u64, total_size: u64) -> u64 {
509532
let midpoint_row = split_range.start + (split_range.end - split_range.start) / 2;
510533
let midpoint_byte = (u128::from(midpoint_row) * u128::from(total_size)) / u128::from(row_count);
@@ -566,6 +589,7 @@ mod tests {
566589
#[case(3..7, 10, vec![0..2, 2..5, 5..10], Some(2..5))]
567590
#[case(1..8, 10, vec![0..1, 1..9, 9..10], Some(1..9))]
568591
#[case(1..4, 16, vec![0..1, 1..2, 2..3, 3..4], None)]
592+
#[case(0..1, 10, vec![0..2, 2..10], Some(0..2))]
569593
fn test_split_aligned_row_range(
570594
#[case] byte_range: Range<u64>,
571595
#[case] total_size: u64,

0 commit comments

Comments
 (0)