-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathtable_function.cpp
More file actions
477 lines (410 loc) · 19.1 KB
/
table_function.cpp
File metadata and controls
477 lines (410 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
#include "duckdb_vx/data.hpp"
#include "duckdb_vx/duckdb_diagnostics.h"
#include "duckdb_vx/error.hpp"
#include "duckdb_vx/table_function.h"
DUCKDB_INCLUDES_BEGIN
#include "duckdb.h"
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/common/insertion_order_preserving_map.hpp"
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
DUCKDB_INCLUDES_END
using namespace duckdb;
using vortex::CData;
using vortex::IntoErrString;
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
// Carries the extension-supplied vtable through DuckDB's TableFunctionInfo
// extension point. Stored in TableFunction::function_info, so it lives as
// long as the registered TableFunction itself.
struct CTableFunctionInfo final : TableFunctionInfo {
	explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
	}
	// Function-pointer table implemented on the Vortex side of the FFI boundary.
	const duckdb_vx_tfunc_vtab_t vtab;
};
// FunctionData produced by c_bind(): owns the opaque FFI bind payload and
// remembers the bound output column types for later statistics lookups.
struct CTableBindData final : FunctionData {
	CTableBindData(const CTableFunctionInfo &info,
	               unique_ptr<CData> ffi_data_p,
	               const vector<LogicalType> &types)
	    : info(info), ffi_data(std::move(ffi_data_p)), types(types) {
	}
	// Deep-copies ffi_data through the vtable's clone hook.
	unique_ptr<FunctionData> Copy() const override;
	bool Equals(const FunctionData &other_base) const override;
	// Table function info lives for as long as TableFunction is alive as it's
	// stored inside TableFunction, so it's safe to store a reference.
	const CTableFunctionInfo &info;
	// Opaque extension-owned bind state; released by CData's destructor.
	unique_ptr<CData> ffi_data;
	// Output column types produced at bind time (indexed by column_t).
	vector<LogicalType> types;
};
// Deep-copies the FFI bind payload through the extension's clone hook.
// Throws BinderException when the extension reports an error.
unique_ptr<FunctionData> CTableBindData::Copy() const {
	duckdb_vx_error clone_error = nullptr;
	auto cloned = info.vtab.bind_data_clone(ffi_data->DataPtr(), &clone_error);
	if (clone_error) {
		throw BinderException(IntoErrString(clone_error));
	}
	unique_ptr<CData> cloned_cdata(reinterpret_cast<CData *>(cloned));
	return make_uniq<CTableBindData>(info, std::move(cloned_cdata), types);
}
bool CTableBindData::Equals(const FunctionData &other_base) const {
const CTableBindData &other = other_base.Cast<CTableBindData>();
// if "types" are different, "ffi_data" would also be different as it
// contains types inside, so omit "types" from comparison.
return &info == &other.info && ffi_data.get() == other.ffi_data.get();
}
// Global scan state: wraps the extension-owned state shared by all scan threads.
struct CTableGlobalData final : GlobalTableFunctionState {
	explicit CTableGlobalData(unique_ptr<CData> ffi_data) : ffi_data(std::move(ffi_data)) {
	}
	idx_t MaxThreads() const override {
		// No thread cap from our side: let DuckDB parallelize freely.
		return GlobalTableFunctionState::MAX_THREADS;
	}
	// Opaque extension-owned global state; released by CData's destructor.
	unique_ptr<CData> ffi_data;
};
// Per-thread scan state: wraps the extension-owned local state.
struct CTableLocalData final : LocalTableFunctionState {
	explicit CTableLocalData(unique_ptr<CData> ffi_data) : ffi_data(std::move(ffi_data)) {
	}
	// Opaque extension-owned local state; released by CData's destructor.
	unique_ptr<CData> ffi_data;
};
// Forwards DuckDB's scan-progress query to the extension's vtable hook.
double c_table_scan_progress(ClientContext &context,
                             const FunctionData *bind_data,
                             const GlobalTableFunctionState *global_state) {
	const auto &bind = bind_data->Cast<CTableBindData>();
	const auto &global = global_state->Cast<CTableGlobalData>();
	auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);
	return bind.info.vtab.table_scan_progress(ffi_ctx, bind.ffi_data->DataPtr(), global.ffi_data->DataPtr());
}
// Converts an opaque C-API handle back into the duckdb::Value it wraps.
static Value &UnwrapValue(duckdb_value value) {
	auto *wrapped = reinterpret_cast<Value *>(value);
	return *wrapped;
}
// Translates C-API column statistics into DuckDB numeric statistics.
// Consumes (destroys) the duckdb_value handles held inside "stats".
//
// Fix: the statistics object must be created with NumericStats::CreateUnknown.
// The previous StringStats::CreateUnknown produced string-kind statistics,
// which is invalid to pass to NumericStats::SetMin/SetMax.
unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, LogicalType type) {
	BaseStatistics out = NumericStats::CreateUnknown(type);
	if (stats.min) {
		NumericStats::SetMin(out, UnwrapValue(stats.min));
		duckdb_destroy_value(&stats.min);
	}
	if (stats.max) {
		NumericStats::SetMax(out, UnwrapValue(stats.max));
		duckdb_destroy_value(&stats.max);
	}
	if (!stats.has_null) {
		out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
	}
	return out.ToUnique();
}
// Translates C-API column statistics into DuckDB string statistics.
// Consumes (destroys) the duckdb_value handles held inside "stats".
unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, LogicalType type) {
	BaseStatistics result = StringStats::CreateUnknown(type);
	if (stats.min) {
		StringStats::SetMin(result, StringValue::Get(UnwrapValue(stats.min)));
		duckdb_destroy_value(&stats.min);
	}
	if (stats.max) {
		StringStats::SetMax(result, StringValue::Get(UnwrapValue(stats.max)));
		duckdb_destroy_value(&stats.max);
	}
	// NOTE(review): the top bit of max_string_length appears to flag that a
	// length is present, with the actual length in the low 32 bits — confirm
	// against the producer of duckdb_column_statistics.
	if (stats.max_string_length >> 63) {
		StringStats::SetMaxStringLength(result, uint32_t(stats.max_string_length));
	}
	if (!stats.has_null) {
		result.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
	}
	return result.ToUnique();
}
// Translates C-API column statistics into type-agnostic DuckDB statistics.
// Only the null-ness flag can be represented for arbitrary types.
//
// Fix: this fallback is reached for non-numeric, non-string types, so it must
// use BaseStatistics::CreateUnknown; the previous StringStats::CreateUnknown
// creates string-specific statistics that are only valid for VARCHAR/BLOB.
unique_ptr<BaseStatistics> base_stats(duckdb_column_statistics &stats, LogicalType type) {
	BaseStatistics out = BaseStatistics::CreateUnknown(type);
	if (!stats.has_null) {
		out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
	}
	return out.ToUnique();
}
// Answers the optimizer's per-column statistics request: asks the extension
// for raw statistics and converts them according to the column's type.
unique_ptr<BaseStatistics>
c_statistics(ClientContext &context, const FunctionData *bind_data, column_t column_index) {
	// Virtual columns (file_index, file_row_number, empty) carry no statistics.
	if (IsVirtualColumn(column_index)) {
		return nullptr;
	}
	const auto &bind = bind_data->Cast<CTableBindData>();
	auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);
	duckdb_column_statistics statistics = {};
	// The extension reports whether it has statistics for this column at all.
	if (!bind.info.vtab.statistics(ffi_ctx, bind.ffi_data->DataPtr(), column_index, &statistics)) {
		return nullptr;
	}
	const LogicalType type = bind.types[column_index];
	switch (type.id()) {
	case LogicalTypeId::BOOLEAN:
	case LogicalTypeId::TINYINT:
	case LogicalTypeId::SMALLINT:
	case LogicalTypeId::INTEGER:
	case LogicalTypeId::BIGINT:
	case LogicalTypeId::FLOAT:
	case LogicalTypeId::DOUBLE:
	case LogicalTypeId::UTINYINT:
	case LogicalTypeId::USMALLINT:
	case LogicalTypeId::UINTEGER:
	case LogicalTypeId::UBIGINT:
	case LogicalTypeId::UHUGEINT:
	case LogicalTypeId::HUGEINT:
		return numeric_stats(statistics, type);
	case LogicalTypeId::VARCHAR:
	case LogicalTypeId::BLOB:
		return string_stats(statistics, type);
	case LogicalTypeId::STRUCT:
		// TODO(myrrc)
		// Duckdb's has_null has a different semantics for structs.
		// If we propagate our has_null, this breaks Duckdb optimizer.
		// You can reproduce it in struct.slt test in vortex-sqllogictests:
		return nullptr;
	default:
		return base_stats(statistics, type);
	}
}
// Mutable view over c_bind's output vectors. Passed to the extension as an
// opaque handle so it can append output columns through
// duckdb_vx_tfunc_bind_result_add_column.
struct CTableBindResult {
	vector<LogicalType> &return_types;
	vector<string> &names;
};
/**
* Called for every new query. For example, if there is a VIEW over *.vortex,
* and after a query another file is added matching the glob, for second query
* bind() will be called again.
*/
unique_ptr<FunctionData> c_bind(ClientContext &context,
                                TableFunctionBindInput &input,
                                vector<LogicalType> &return_types,
                                vector<string> &names) {
	const auto &info = input.table_function.function_info->Cast<CTableFunctionInfo>();
	// The extension appends output columns through this handle
	// (see duckdb_vx_tfunc_bind_result_add_column).
	CTableBindResult result = {return_types, names};
	auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);
	auto ffi_input = reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input);
	auto ffi_result = reinterpret_cast<duckdb_vx_tfunc_bind_result>(&result);
	duckdb_vx_error bind_error = nullptr;
	auto ffi_bind_data = info.vtab.bind(ffi_ctx, ffi_input, ffi_result, &bind_error);
	if (bind_error) {
		throw BinderException(IntoErrString(bind_error));
	}
	unique_ptr<CData> cdata(reinterpret_cast<CData *>(ffi_bind_data));
	return make_uniq<CTableBindData>(info, std::move(cdata), return_types);
}
// Packages DuckDB's init input (projection, filters, context) into the C ABI
// struct and lets the extension build its global scan state.
unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, TableFunctionInitInput &input) {
	const auto &bind = input.bind_data->Cast<CTableBindData>();
	duckdb_vx_tfunc_init_input ffi_input = {
	    .bind_data = bind.ffi_data->DataPtr(),
	    .column_ids = input.column_ids.data(),
	    .column_ids_count = input.column_ids.size(),
	    .projection_ids = input.projection_ids.data(),
	    .projection_ids_count = input.projection_ids.size(),
	    .filters = reinterpret_cast<duckdb_vx_table_filter_set>(input.filters.get()),
	    .client_context = reinterpret_cast<duckdb_client_context>(&context),
	};
	duckdb_vx_error init_error = nullptr;
	auto ffi_global = bind.info.vtab.init_global(&ffi_input, &init_error);
	if (init_error) {
		throw BinderException(IntoErrString(init_error));
	}
	unique_ptr<CData> cdata(reinterpret_cast<CData *>(ffi_global));
	return make_uniq<CTableGlobalData>(std::move(cdata));
}
// Mirrors c_init_global for the per-thread state; the global state handle is
// forwarded so the extension can attach the local state to it.
unique_ptr<LocalTableFunctionState> c_init_local(ExecutionContext &context,
                                                 TableFunctionInitInput &input,
                                                 GlobalTableFunctionState *global_state) {
	const auto &bind = input.bind_data->Cast<CTableBindData>();
	void *const ffi_global = global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
	duckdb_vx_tfunc_init_input ffi_input = {
	    .bind_data = bind.ffi_data->DataPtr(),
	    .column_ids = input.column_ids.data(),
	    .column_ids_count = input.column_ids.size(),
	    .projection_ids = input.projection_ids.data(),
	    .projection_ids_count = input.projection_ids.size(),
	    .filters = reinterpret_cast<duckdb_vx_table_filter_set>(input.filters.get()),
	    .client_context = reinterpret_cast<duckdb_client_context>(&context),
	};
	duckdb_vx_error init_error = nullptr;
	auto ffi_local = bind.info.vtab.init_local(&ffi_input, ffi_global, &init_error);
	if (init_error) {
		throw BinderException(IntoErrString(init_error));
	}
	unique_ptr<CData> cdata(reinterpret_cast<CData *>(ffi_local));
	return make_uniq<CTableLocalData>(std::move(cdata));
}
// Per-chunk scan entry point: the extension fills "output" in place through
// the opaque chunk handle.
void c_function(ClientContext &context, TableFunctionInput &input, DataChunk &output) {
	const auto &bind = input.bind_data->Cast<CTableBindData>();
	auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context);
	auto ffi_chunk = reinterpret_cast<duckdb_data_chunk>(&output);
	void *const ffi_bind = bind.ffi_data->DataPtr();
	void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
	void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
	duckdb_vx_error scan_error = nullptr;
	bind.info.vtab.function(ffi_ctx, ffi_bind, ffi_global, ffi_local, ffi_chunk, &scan_error);
	if (scan_error) {
		throw InvalidInputException(IntoErrString(scan_error));
	}
}
/*
* Table filter pushdown is used twice in duckdb:
*
* 1. Planning time: duckdb uses file metadata (filename, hive_partitioning
* options in MultiFileReader) to prune files based on filename or hive
* partition data i.e. month, year, etc. This happens before any file IO.
* We don't use this because we have own file-level pruning in
* FileStatsLayoutReader.
*
* 2. Scan time. As we have filter_pushdown = true, filter expressions are
* converted to TableFilterSet and pushed down to Vortex. We convert them to
* vortex expressions and use as filter options while initializing the scan.
*/
void c_pushdown_complex_filter(ClientContext &,
                               LogicalGet &,
                               FunctionData *bind_data,
                               vector<unique_ptr<Expression>> &filters) {
	auto &bind = bind_data->Cast<CTableBindData>();
	void *const ffi_bind = bind.ffi_data->DataPtr();
	duckdb_vx_error push_error = nullptr;
	auto iter = filters.begin();
	while (iter != filters.end()) {
		auto ffi_expr = reinterpret_cast<duckdb_vx_expr>(iter->get());
		const bool consumed = bind.info.vtab.pushdown_complex_filter(ffi_bind, ffi_expr, &push_error);
		if (push_error) {
			throw BinderException(IntoErrString(push_error));
		}
		// A filter fully absorbed by the extension is removed so DuckDB does
		// not re-evaluate it on top of the scan output.
		if (consumed) {
			iter = filters.erase(iter);
		} else {
			++iter;
		}
	}
}
// Translates the extension's cardinality estimate into the planner's
// NodeStatistics representation.
unique_ptr<NodeStatistics> c_cardinality(ClientContext &, const FunctionData *bind_data) {
	const auto &bind = bind_data->Cast<CTableBindData>();
	duckdb_vx_node_statistics ffi_stats = {};
	bind.info.vtab.cardinality(bind.ffi_data->DataPtr(), &ffi_stats);
	auto result = make_uniq<NodeStatistics>();
	result->has_estimated_cardinality = ffi_stats.has_estimated_cardinality;
	result->estimated_cardinality = ffi_stats.estimated_cardinality;
	result->has_max_cardinality = ffi_stats.has_max_cardinality;
	result->max_cardinality = ffi_stats.max_cardinality;
	return result;
}
// C entry point: returns an owning copy of the index-th positional parameter
// from the bind input. The caller is responsible for destroying the handle.
extern "C" duckdb_value duckdb_vx_tfunc_bind_input_get_parameter(duckdb_vx_tfunc_bind_input ffi_input,
                                                                 size_t index) {
	D_ASSERT(ffi_input);
	const auto &input = *reinterpret_cast<TableFunctionBindInput *>(ffi_input);
	auto *copy = new Value(input.inputs[index]);
	return reinterpret_cast<duckdb_value>(copy);
}
// C entry point: appends one output column (name + logical type) to the
// in-progress bind result built by c_bind.
extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_result ffi_result,
                                                       const char *name_str,
                                                       size_t name_len,
                                                       duckdb_logical_type ffi_type) {
	D_ASSERT(ffi_result);
	D_ASSERT(name_str);
	D_ASSERT(ffi_type);
	auto &result = *reinterpret_cast<CTableBindResult *>(ffi_result);
	const auto &logical_type = *reinterpret_cast<LogicalType *>(ffi_type);
	result.names.emplace_back(name_str, name_len);
	result.return_types.emplace_back(logical_type);
}
/**
* Called at planning time to determine whether data is partitioned by a
* given set of columns. Requested columns are GROUP BY parameters i.e. columns
* over which the query aggregates.
*/
TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
	// Our data is partitioned by array exporters. Each exporter processes a
	// single Array which belongs to a single file, so only a grouping by
	// exactly file_index yields one unique value per partition.
	const auto &partition_ids = input.partition_ids;
	if (partition_ids.size() != 1) {
		return TablePartitionInfo::NOT_PARTITIONED;
	}
	if (partition_ids[0] != COLUMN_IDENTIFIER_FILE_INDEX) {
		return TablePartitionInfo::NOT_PARTITIONED;
	}
	return TablePartitionInfo::SINGLE_VALUE_PARTITIONS;
}
/**
* Duckdb requests this function after exporting the chunk. We answer with
* partition_index we have exported as well as information about constant
* columns in this partition. As data is partitioned by array exporters, in
* each partition ~ exported array file_index is constant.
*/
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
	const auto &bind = input.bind_data->Cast<CTableBindData>();
	void *const ffi_bind = bind.ffi_data->DataPtr();
	void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
	void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
	duckdb_vx_partition_data partition_data;
	bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
	OperatorPartitionData result(partition_data.partition_index);
	// file_index_column_pos may be INVALID_IDX, but column_index will never
	// be INVALID_IDX, so we can compare directly.
	for (const column_t requested : input.partition_info.partition_columns) {
		if (requested != partition_data.file_index_column_pos) {
			// Only file_index is constant within a partition (one exported
			// array == one file); anything else is a caller contract breach.
			throw InternalException(StringUtil::Format(
			    "get_partition_data: requested column_index %d is not constant for given partition",
			    requested));
		}
		result.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
	}
	return result;
}
// C entry point: inserts one key/value pair into the
// InsertionOrderPreservingMap behind the opaque handle. Used by the extension
// while filling the EXPLAIN output map handed out in c_to_string.
extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
	D_ASSERT(map);
	D_ASSERT(key);
	D_ASSERT(value);
	reinterpret_cast<InsertionOrderPreservingMap<string> *>(map)->insert(key, value);
}
// Renders the table function's bind data into key/value pairs for EXPLAIN
// output; the extension fills the map through duckdb_vx_string_map_insert.
InsertionOrderPreservingMap<string> c_to_string(TableFunctionToStringInput &input) {
	InsertionOrderPreservingMap<string> result;
	duckdb_vx_string_map ffi_map = reinterpret_cast<duckdb_vx_string_map>(&result);
	void *const ffi_bind = input.bind_data->Cast<CTableBindData>().ffi_data->DataPtr();
	// Consistency fix: use the checked Cast<> helper, as everywhere else in
	// this file (e.g. c_bind), instead of an unchecked static_cast.
	const auto &info = input.table_function.function_info->Cast<CTableFunctionInfo>();
	info.vtab.to_string(ffi_bind, ffi_map);
	return result;
}
// C entry point: builds a TableFunction around the given Vortex vtable and
// registers it in the database's system catalog. Returns DuckDBError (after
// logging) if catalog registration throws; DuckDBSuccess otherwise.
extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const duckdb_vx_tfunc_vtab_t *vtab) {
	D_ASSERT(ffi_db);
	D_ASSERT(vtab);
	const DatabaseWrapper &wrapper = *reinterpret_cast<DatabaseWrapper *>(ffi_db);
	DatabaseInstance &db = *wrapper.database->instance;
	// Wire all scan lifecycle callbacks defined above into one TableFunction.
	TableFunction tf(vtab->name, {}, c_function, c_bind, c_init_global, c_init_local);
	tf.projection_pushdown = true;
	tf.filter_pushdown = true;
	tf.filter_prune = true;
	// Sampling is not pushed down to the extension.
	tf.sampling_pushdown = false;
	tf.pushdown_complex_filter = c_pushdown_complex_filter;
	tf.cardinality = c_cardinality;
	tf.get_partition_info = get_partition_info;
	tf.get_partition_data = get_partition_data;
	tf.to_string = c_to_string;
	tf.table_scan_progress = c_table_scan_progress;
	tf.statistics = c_statistics;
	tf.late_materialization = true;
	// Columns that uniquely identify a row for deferred re-fetch in a multi
	// file scan: (file index, row number in file).
	tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
		return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
	};
	// Virtual columns DuckDB may reference without the extension exposing
	// them as regular output columns.
	tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
		return {
		    {COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
		    {COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
		    {COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
		};
	};
	// Accept every candidate expression for pushdown. NOTE(review): presumably
	// c_pushdown_complex_filter later decides which are actually consumed —
	// confirm against DuckDB's pushdown_expression contract.
	tf.pushdown_expression = [](auto &, auto &, auto &) {
		return true;
	};
	// Positional parameter types come from the vtable as opaque handles.
	tf.arguments.resize(vtab->parameter_count);
	for (size_t i = 0; i < vtab->parameter_count; i++) {
		tf.arguments[i] = *reinterpret_cast<LogicalType *>(vtab->parameters[i]);
	}
	tf.function_info = make_shared_ptr<CTableFunctionInfo>(*vtab);
	try {
		auto &system_catalog = Catalog::GetSystemCatalog(db);
		auto data = CatalogTransaction::GetSystemTransaction(db);
		CreateTableFunctionInfo tf_info(tf);
		// Re-registration (e.g. extension reload) replaces the existing entry.
		tf_info.on_conflict = OnCreateConflict::ALTER_ON_CONFLICT;
		system_catalog.CreateFunction(data, tf_info);
	} catch (const std::exception &e) {
		ErrorData data(e);
		DUCKDB_LOG_ERROR(db, "Failed to create Vortex table function:\t" + data.Message());
		return DuckDBError;
	}
	return DuckDBSuccess;
}