@@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
 #include "duckdb.h"
 #include "duckdb/catalog/catalog.hpp"
 #include "duckdb/common/insertion_order_preserving_map.hpp"
+#include "duckdb/common/multi_file/multi_file_reader.hpp"
 #include "duckdb/function/table_function.hpp"
 #include "duckdb/main/capi/capi_internal.hpp"
 #include "duckdb/main/connection.hpp"
@@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END
 using namespace duckdb;
 using vortex::CData;
 using vortex::IntoErrString;
+constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
+constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
 
 struct CTableFunctionInfo final : TableFunctionInfo {
 	explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
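The two constexpr aliases added above pull DuckDB's reserved multi-file column identifiers into local scope. As a rough sketch of how such identifiers are typically consumed (not part of this change; the helper name and its surrounding shape are assumptions), a scan can compare the column_ids it is handed at init time against these constants to recognize requests for the virtual file_index / file_row_number columns:

```cpp
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"

using namespace duckdb;

// Hypothetical helper, for illustration only: classify the columns a query
// requested from the scan. Only the COLUMN_IDENTIFIER_* constants and the
// column_ids field come from DuckDB; everything else is made up here.
static void InspectRequestedColumns(const TableFunctionInitInput &input) {
	for (const column_t col : input.column_ids) {
		if (col == MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX) {
			// virtual column: index of the file the row came from
		} else if (col == MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER) {
			// virtual column: row position within its file
		} else {
			// regular column from the table schema
		}
	}
}
```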
@@ -334,21 +337,50 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
 	result.return_types.emplace_back(logical_type);
 }
 
-OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
-	if (input.partition_info.RequiresPartitionColumns()) {
-		throw InternalException("TableScan::GetPartitionData: partition columns not supported");
-	}
+/**
+ * Called at planning time to determine whether the data is partitioned by a
+ * given set of columns. The requested columns are the GROUP BY keys, i.e. the
+ * columns over which the query aggregates.
+ */
+TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
+	const vector<column_t> &ids = input.partition_ids;
+	// Our data is partitioned by array exporters. Each exporter processes a
+	// single Array, which belongs to a single file. If the data is partitioned
+	// only by file_index, there is one unique value per Array; otherwise there
+	// may be multiple values.
+	return (ids.size() == 1 && ids[0] == COLUMN_IDENTIFIER_FILE_INDEX)
+	           ? TablePartitionInfo::SINGLE_VALUE_PARTITIONS
+	           : TablePartitionInfo::NOT_PARTITIONED;
+}
+
+/**
+ * DuckDB calls this after exporting a chunk. We answer with the
+ * partition_index we have just exported, as well as information about the
+ * constant columns in this partition. Since data is partitioned by array
+ * exporters, file_index is constant within each partition (exported array).
+ */
+OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
 	auto &bind = input.bind_data->Cast<CTableBindData>();
 	void *const ffi_bind = bind.ffi_data->DataPtr();
 	void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
 	void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
-
-	duckdb_vx_error error_out = nullptr;
-	const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
-	if (error_out) {
-		throw InvalidInputException(IntoErrString(error_out));
+	duckdb_vx_partition_data partition_data;
+	bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
+
+	OperatorPartitionData out(partition_data.partition_index);
+
+	// file_index_column_pos may be INVALID_IDX, but column_index will never
+	// be INVALID_IDX, so we can compare directly.
+	for (const column_t column_index : input.partition_info.partition_columns) {
+		if (column_index == partition_data.file_index_column_pos) {
+			out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
+		} else {
+			throw InternalException(StringUtil::Format(
+			    "get_partition_data: requested column_index %d is not constant for the given partition",
+			    column_index));
+		}
 	}
-	return OperatorPartitionData(batch_index);
+	return out;
 }
 
 extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
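For orientation, the duckdb_vx_partition_data value filled through the C vtable above is defined in the Vortex C API header, which this diff does not show. The sketch below is only inferred from the three fields the callback reads (partition_index, file_index_column_pos, file_index); treat the layout and the integer types as assumptions rather than the actual definition.

```cpp
#include <cstdint>

// Assumed shape of duckdb_vx_partition_data, reconstructed from how the
// callback above uses it; the real definition lives in the Vortex C API.
struct duckdb_vx_partition_data {
	// partition index (one partition per exported array) of the chunk just produced
	uint64_t partition_index;
	// position of the file_index column in the scan output, or an invalid
	// index if the query did not request file_index
	uint64_t file_index_column_pos;
	// index of the file the exported array belongs to; constant within the partition
	uint64_t file_index;
};
```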
@@ -377,21 +409,30 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
 
 	tf.projection_pushdown = true;
 	tf.filter_pushdown = true;
-	// We can prune out filter columns that are unused in the remainder of the query plan.
-	// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
 	tf.filter_prune = true;
 	tf.sampling_pushdown = false;
-	tf.late_materialization = false;
 
 	tf.pushdown_complex_filter = c_pushdown_complex_filter;
 	tf.cardinality = c_cardinality;
-	tf.get_partition_data = c_get_partition_data;
+	tf.get_partition_info = get_partition_info;
+	tf.get_partition_data = get_partition_data;
 	tf.to_string = c_to_string;
 	tf.table_scan_progress = c_table_scan_progress;
 	tf.statistics = c_statistics;
 
+	tf.late_materialization = true;
+	// Columns that uniquely identify a row for deferred re-fetch in a
+	// multi-file scan: (file index, row number in file).
+	tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
+		return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
+	};
+
 	tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
-		return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
+		return {
+		    {COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
+		    {COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
+		    {COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
+		};
 	};
 
 	tf.arguments.resize(vtab->parameter_count);
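Taken together, the new callbacks let DuckDB treat each exported array as a single-value partition on file_index, defer fetching payload columns via the (file_index, file_row_number) row id, and expose both as virtual columns. A minimal usage sketch with DuckDB's C++ client API follows; the table function name vortex_scan and the glob path are placeholders, not taken from this change.

```cpp
#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// Assumes the Vortex extension is loaded and registered its scan under a
	// name like "vortex_scan" (placeholder). Grouping on the virtual
	// file_index column can then be answered with one value per exported array.
	auto result = con.Query("SELECT file_index, count(*) "
	                        "FROM vortex_scan('data/*.vortex') "
	                        "GROUP BY file_index");
	result->Print();
	return 0;
}
```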