Skip to content

Commit 5102339

Browse files
committed
Feature: PAX support VEC executor
In the vectorized executor, we need to return a batch of rows (for a single column). In PAX, data needs to be transformed into a record batch, because the way PAX organizes column data differs from the record batch layout. - Fixed-length columns: PAX does not pad null datums to the fixed length, but a record batch requires null entries to be padded by length. - Non-fixed-length columns: PAX stores the datum header and uses a length stream, whereas a record batch requires removing the header and using an offset array.
1 parent 7b86282 commit 5102339

13 files changed

Lines changed: 1793 additions & 11 deletions

File tree

contrib/pax_storage/CMakeLists.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,17 @@ else()
5656
SET(BUILD_GTEST OFF)
5757
endif(ENBALE_DEBUG)
5858

59+
# Vec options
60+
option(VEC_BUILD "Build pax vectorization version" OFF)
61+
set(VEC_HOME "" CACHE STRING "Path to vectorization home")
62+
if (VEC_BUILD)
63+
64+
if("${VEC_HOME}" STREQUAL "")
65+
message(FATAL_ERROR "No found vectorization home setting. Using -DVEC_HOME to spec vectorization home")
66+
endif()
67+
68+
set(CBDB_ROOT_INCLUDE_DIR ${PG_INCLUDEDIR})
69+
ADD_DEFINITIONS(-DVEC_BUILD)
70+
71+
endif(VEC_BUILD)
5972
add_subdirectory(src/cpp)

contrib/pax_storage/src/cpp/CMakeLists.txt

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ set(pax_catalog_src
173173
catalog/micro_partition_stats.cc
174174
catalog/pax_aux_table.cc)
175175

176+
set(pax_vec_src
177+
storage/vec/pax_vec_adapter.cc)
178+
176179
link_directories($ENV{GPHOME}/lib)
177180

178181
if(BUILD_PAX_FORMAT)
@@ -185,14 +188,11 @@ set_target_properties(paxformat PROPERTIES
185188
OUTPUT_NAME paxformat)
186189
add_dependencies(paxformat generate_protobuf)
187190

188-
189191
# export headers
190192
set(PAX_COMM_HEADERS
191193
comm/cbdb_api.h
192194
)
193195

194-
195-
196196
## install dynamic libraray
197197
install(TARGETS paxformat
198198
LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/lib)
@@ -212,10 +212,10 @@ install(FILES ${PAX_COMM_HEADERS}
212212
)
213213

214214
else()
215-
add_library(pax SHARED ${orc_proto_src} ${pax_proto_src} ${pax_storage_src} ${stats_proto_src} ${pax_exceptions_src} ${pax_access_src} ${pax_comm_src} ${pax_catalog_src})
215+
add_library(pax SHARED ${orc_proto_src} ${pax_proto_src} ${pax_storage_src} ${stats_proto_src} ${pax_exceptions_src} ${pax_access_src} ${pax_comm_src} ${pax_catalog_src} ${pax_vec_src})
216216
set_target_properties(pax PROPERTIES
217217
OUTPUT_NAME pax)
218-
target_include_directories(pax PUBLIC ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR} "${CBDB_INCLUDE_DIR}")
218+
target_include_directories(pax PUBLIC ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR} ${CBDB_INCLUDE_DIR})
219219
target_link_libraries(pax PUBLIC uuid orc_protobuf zstd z postgres)
220220
add_dependencies(pax generate_protobuf)
221221
add_dependencies(pax stats_generate_protobuf)
@@ -224,3 +224,20 @@ add_custom_command(TARGET pax POST_BUILD
224224
copy_if_different $<TARGET_FILE:pax> ${CMAKE_CURRENT_SOURCE_DIR}/../data/pax.so)
225225
endif(BUILD_PAX_FORMAT)
226226

227+
# vec build
228+
if (VEC_BUILD)
229+
set(VEC_HEADER ${VEC_HOME}/src/include/)
230+
231+
find_package(PkgConfig REQUIRED)
232+
pkg_check_modules(GLIB REQUIRED glib-2.0)
233+
234+
target_include_directories(pax PRIVATE
235+
${VEC_HEADER} # for utils/tuptable_vec.h
236+
${CBDB_ROOT_INCLUDE_DIR} # for arrow-glib/arrow-glib.h and other arrow interfaces
237+
${GLIB_INCLUDE_DIRS} # for glib-object.h
238+
)
239+
if(BUILD_GTEST)
240+
target_include_directories(test_main PRIVATE ${VEC_HEADER} ${CBDB_ROOT_INCLUDE_DIR} ${GLIB_INCLUDE_DIRS})
241+
endif(BUILD_GTEST)
242+
target_link_libraries(pax PRIVATE arrow)
243+
endif(VEC_BUILD)

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
6767
new TableReader(micro_partition_reader, std::move(iter), reader_options);
6868
desc->reader_->Open();
6969

70+
#ifdef VEC_BUILD
71+
if (flags & (1 << 12)) {
72+
desc->vec_adapter_ = new VecAdapter(cbdb::RelationGetTupleDesc(relation));
73+
}
74+
#endif
75+
7076
MemoryContextSwitchTo(old_ctx);
7177
return &desc->rs_base_;
7278
}
@@ -81,6 +87,9 @@ void PaxScanDesc::EndScan(TableScanDesc scan) {
8187
delete desc->reader_;
8288
if (desc->filter_) delete desc->filter_;
8389

90+
#ifdef VEC_BUILD
91+
delete desc->vec_adapter_;
92+
#endif
8493
// TODO(jiaqizho): please double check with abort transaction @gongxun
8594
Assert(desc->memory_context_);
8695
cbdb::MemoryCtxDelete(desc->memory_context_);
@@ -144,7 +153,18 @@ bool PaxScanDesc::ScanGetNextSlot(TableScanDesc scan, TupleTableSlot *slot) {
144153

145154
CTupleSlot cslot(slot);
146155
old_ctx = MemoryContextSwitchTo(desc->memory_context_);
156+
157+
#ifdef VEC_BUILD
158+
// TODO(vec): replace (1 << 12), and check other place
159+
if (desc->rs_base_.rs_flags & (1 << 12)) {
160+
ok = desc->reader_->ReadVecTuple(&cslot, desc->vec_adapter_);
161+
MemoryContextSwitchTo(old_ctx);
162+
return ok;
163+
}
164+
#endif
165+
147166
ok = desc->reader_->ReadTuple(&cslot);
167+
148168
MemoryContextSwitchTo(old_ctx);
149169
return ok;
150170
}

contrib/pax_storage/src/cpp/access/pax_scanner.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
#include "storage/pax.h"
66
#include "storage/pax_filter.h"
7-
7+
#ifdef VEC_BUILD
8+
#include "storage/vec/pax_vec_adapter.h"
9+
#endif
810
namespace pax {
911

1012
class PaxScanDesc {
@@ -62,6 +64,9 @@ class PaxScanDesc {
6264

6365
// filter used to do column projection
6466
PaxFilter *filter_ = nullptr;
67+
#ifdef VEC_BUILD
68+
VecAdapter *vec_adapter_ = nullptr;
69+
#endif
6570
}; // class PaxScanDesc
6671

6772
} // namespace pax

contrib/pax_storage/src/cpp/comm/cbdb_wrappers.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,12 @@ int cbdb::RelationGetAttributesNumber(Relation rel) {
317317
CBDB_WRAP_END;
318318
}
319319

320+
TupleDesc cbdb::RelationGetTupleDesc(Relation rel) {
321+
CBDB_WRAP_START;
322+
{ return RelationGetDescr(rel); }
323+
CBDB_WRAP_END;
324+
}
325+
320326
bool cbdb::ExtractcolumnsFromNode(Node *expr, bool *cols, AttrNumber natts) {
321327
CBDB_WRAP_START;
322328
{ return extractcolumns_from_node(expr, cols, natts); }

contrib/pax_storage/src/cpp/comm/cbdb_wrappers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ std::string BuildPaxDirectoryPath(RelFileNode rd_node, BackendId rd_backend);
139139

140140
int RelationGetAttributesNumber(Relation rel);
141141

142+
TupleDesc RelationGetTupleDesc(Relation rel);
143+
142144
bool ExtractcolumnsFromNode(Node *expr, bool *cols, AttrNumber natts);
143145

144146
std::string BuildPaxFilePath(Relation rel, const std::string &block_id);

contrib/pax_storage/src/cpp/storage/columns/pax_column.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ std::pair<char *, size_t> PaxNonFixedColumn::GetBuffer(size_t position) {
333333

334334
std::pair<char *, size_t> PaxNonFixedColumn::GetRangeBuffer(size_t start_pos,
335335
size_t len) {
336-
CBDB_CHECK((start_pos + len) <= GetNonNullRows() && len > 0,
336+
CBDB_CHECK((start_pos + len) <= GetNonNullRows() && len >= 0,
337337
cbdb::CException::ExType::kExTypeOutOfRange);
338338
size_t range_len = 0;
339339

contrib/pax_storage/src/cpp/storage/pax.cc

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ TableReader::TableReader(
127127
is_empty_(true),
128128
reader_options_(options),
129129
table_no_(0),
130-
table_index_(0) {
131-
}
130+
table_index_(0) {}
132131

133132
TableReader::~TableReader() {
134133
if (reader_) {
@@ -163,8 +162,11 @@ void TableReader::Close() {
163162
if (is_empty_) {
164163
return;
165164
}
166-
Assert(reader_);
167-
reader_->Close();
165+
166+
if (reader_) {
167+
reader_->Close();
168+
reader_ = nullptr;
169+
}
168170
}
169171

170172
bool TableReader::ReadTuple(CTupleSlot *slot) {
@@ -188,6 +190,59 @@ bool TableReader::ReadTuple(CTupleSlot *slot) {
188190
return true;
189191
}
190192

193+
#ifdef VEC_BUILD
194+
// TODO(jiaqizho): should remove this method but provide a
195+
// new micro partition reader which includes the adapter
196+
bool TableReader::ReadVecTuple(CTupleSlot *slot, VecAdapter *adapter) {
197+
size_t flush_nums_of_rows = 0;
198+
199+
if (is_empty_) {
200+
return false;
201+
}
202+
203+
// Three conditions indicate that there is no data left
204+
// - `VecAdapter` must be set
205+
// - `VecAdapter` has consumed all buffer data
206+
// - there is no next reader
207+
if (adapter->IsInitialized() && adapter->IsEnd() && !iterator_->HasNext()) {
208+
return false;
209+
}
210+
211+
Assert(reader_);
212+
213+
// `No set` means that the first reader has not been set up in the adapter
214+
if (!adapter->IsInitialized()) {
215+
PaxColumns *pax_columns =
216+
reinterpret_cast<OrcIteratorReader *>(reader_)->GetAllColumns();
217+
adapter->SetDataSource(pax_columns);
218+
}
219+
220+
// `VecAdapter` has consumed all buffer data
221+
if (adapter->IsEnd()) {
222+
if (iterator_->HasNext()) {
223+
reader_->Close();
224+
OpenFile();
225+
226+
PaxColumns *pax_columns =
227+
reinterpret_cast<OrcIteratorReader *>(reader_)->GetAllColumns();
228+
adapter->SetDataSource(pax_columns);
229+
}
230+
}
231+
232+
auto ok = adapter->AppendToVecBuffer();
233+
CBDB_CHECK(ok, cbdb::CException::ExType::kExTypeLogicError);
234+
235+
flush_nums_of_rows = adapter->FlushVecBuffer(slot);
236+
Assert(flush_nums_of_rows);
237+
238+
num_tuples_ += flush_nums_of_rows;
239+
slot->SetTableNo(table_no_);
240+
slot->SetBlockNumber(current_block_number_);
241+
slot->StoreVirtualTuple();
242+
return true;
243+
}
244+
#endif // VEC_BUILD
245+
191246
void TableReader::OpenFile() {
192247
Assert(iterator_->HasNext());
193248
auto it = iterator_->Next();

contrib/pax_storage/src/cpp/storage/pax.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
#include "storage/paxc_block_map_manager.h"
2020
#include "storage/strategy.h"
2121

22+
#ifdef VEC_BUILD
23+
#include "storage/vec/pax_vec_adapter.h"
24+
#endif
25+
2226
namespace pax {
2327

2428
class TableWriter {
@@ -80,6 +84,11 @@ class TableReader final {
8084

8185
void Close();
8286

87+
#ifdef VEC_BUILD
88+
// std::unique_ptr<VecAdapter> here ?
89+
bool ReadVecTuple(CTupleSlot *slot, VecAdapter *adapter);
90+
#endif
91+
8392
bool ReadTuple(CTupleSlot *slot);
8493

8594
// deprecate:
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#ifdef VEC_BUILD
4+
5+
// FIXME(jiaqizho): These macros are defined in datetime.h,
6+
// which is included by `cbdb_api.h`. In pax, we always need to
7+
// include `cbdb_api.h`.
8+
9+
#undef RESERV
10+
#undef MONTH
11+
#undef YEAR
12+
#undef DAY
13+
#undef JULIAN
14+
#undef TZ
15+
#undef DTZ
16+
#undef DYNTZ
17+
#undef IGNORE_DTF
18+
#undef AMPM
19+
#undef HOUR
20+
#undef MINUTE
21+
#undef SECOND
22+
#undef MILLISECOND
23+
#undef MICROSECOND
24+
#undef IsPowerOf2
25+
26+
#pragma GCC diagnostic push
27+
#pragma GCC diagnostic ignored "-Wunused-variable"
28+
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
29+
30+
#include <arrow/array/array_binary.h>
31+
#include <arrow/array/array_nested.h>
32+
#include <arrow/array/array_primitive.h>
33+
#include <arrow/c/abi.h>
34+
#include <arrow/c/bridge.h>
35+
#include <arrow/util/bit_util.h>
36+
37+
#pragma GCC diagnostic pop
38+
39+
#define RESERV 0
40+
#define MONTH 1
41+
#define YEAR 2
42+
#define DAY 3
43+
#define JULIAN 4
44+
#define TZ 5
45+
#define DTZ 6
46+
#define DYNTZ 7
47+
#define IGNORE_DTF 8
48+
#define AMPM 9
49+
#define HOUR 10
50+
#define MINUTE 11
51+
#define SECOND 12
52+
#define MILLISECOND 13
53+
#define MICROSECOND 14
54+
55+
// NOLINTNEXTLINE
56+
#define IsPowerOf2(x) (x > 0 && ((x) & ((x)-1)) == 0)
57+
58+
#endif // VEC_BUILD

0 commit comments

Comments
 (0)