Skip to content

Commit bba60fd

Browse files
committed
Op: Add a new vectorization MicroPartitionReader to replace ReadVecTuple
The vectorized condition in `MicroPartitionReader` will cause complicate the logic. - Remove function `ReadVecTuple` which is specialized logic - Added a new MicroPartitionReader named `PaxVecReader` to adapting read from vectorization version.
1 parent 96d1a16 commit bba60fd

9 files changed

Lines changed: 251 additions & 82 deletions

File tree

contrib/pax_storage/src/cpp/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ set(pax_catalog_src
170170
catalog/pax_aux_table.cc)
171171

172172
set(pax_vec_src
173-
storage/vec/pax_vec_adapter.cc)
173+
storage/vec/pax_vec_adapter.cc
174+
storage/vec/pax_vec_reader.cc)
174175

175176
link_directories($ENV{GPHOME}/lib)
176177

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
3636
desc->key_ = key;
3737
desc->reused_buffer_ = new DataBuffer<char>(32 * 1024 * 1024); // 32mb
3838
desc->filter_ = filter;
39+
#ifdef VEC_BUILD
40+
if (flags & (1 << 12)) {
41+
desc->vec_adapter_ = new VecAdapter(cbdb::RelationGetTupleDesc(relation));
42+
reader_options.is_vec = true;
43+
reader_options.adapter = desc->vec_adapter_;
44+
}
45+
#endif
3946

4047
// init shared memory
4148
cbdb::InitCommandResource();
@@ -60,12 +67,6 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
6067
desc->reader_ = new TableReader(std::move(iter), reader_options);
6168
desc->reader_->Open();
6269

63-
#ifdef VEC_BUILD
64-
if (flags & (1 << 12)) {
65-
desc->vec_adapter_ = new VecAdapter(cbdb::RelationGetTupleDesc(relation));
66-
}
67-
#endif
68-
6970
MemoryContextSwitchTo(old_ctx);
7071
return &desc->rs_base_;
7172
}
@@ -147,15 +148,6 @@ bool PaxScanDesc::ScanGetNextSlot(TableScanDesc scan, TupleTableSlot *slot) {
147148
CTupleSlot cslot(slot);
148149
old_ctx = MemoryContextSwitchTo(desc->memory_context_);
149150

150-
#ifdef VEC_BUILD
151-
// TODO(vec): replace (1 << 12), and check other place
152-
if (desc->rs_base_.rs_flags & (1 << 12)) {
153-
ok = desc->reader_->ReadVecTuple(&cslot, desc->vec_adapter_);
154-
MemoryContextSwitchTo(old_ctx);
155-
return ok;
156-
}
157-
#endif
158-
159151
ok = desc->reader_->ReadTuple(&cslot);
160152

161153
MemoryContextSwitchTo(old_ctx);

contrib/pax_storage/src/cpp/storage/micro_partition.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <stdexcept>
99
#include <string>
1010

11+
#include "storage/columns/pax_columns.h"
1112
#include "storage/micro_partition_metadata.h"
1213

1314
namespace pax {
@@ -133,6 +134,22 @@ class MicroPartitionReader {
133134
// is also created during this stage, no matter the map relation is needed or
134135
// not. We may optimize to avoid creating the map relation later.
135136
virtual bool ReadTuple(CTupleSlot *slot) = 0;
137+
138+
protected:
139+
// Allow different MicroPartitionReader shared columns
140+
// but should not let export columns out of micro partition
141+
//
142+
// In MicroPartition writer/reader implementation, all in-memory data should
143+
// be accessed by pax column This is because most of the common logic of
144+
// column operation is done in pax column, such as type mapping, bitwise
145+
// fetch, compression/encoding. At the same time, pax column can also be used
146+
// as a general interface for internal using, because it's zero copy from
147+
// buffer. more details in `storage/columns`
148+
virtual PaxColumns *GetAllColumns() = 0;
149+
#ifdef VEC_BUILD
150+
private:
151+
friend class PaxVecReader;
152+
#endif
136153
};
137154

138155
} // namespace pax

contrib/pax_storage/src/cpp/storage/orc/orc.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,12 @@ class OrcReader : public MicroPartitionReader {
128128

129129
bool ReadTuple(CTupleSlot *cslot) override;
130130

131-
PaxColumns *GetAllColumns();
132-
133131
#ifndef RUN_GTEST
134132
protected: // NOLINT
135133
#endif
136134

135+
PaxColumns *GetAllColumns() override;
136+
137137
orc::proto::StripeFooter ReadStripeWithProjection(
138138
DataBuffer<char> *data_buffer, OrcReader::StripeInformation *stripe_info,
139139
const bool *proj_map, size_t proj_len);

contrib/pax_storage/src/cpp/storage/pax.cc

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
#include "storage/micro_partition_file_factory.h"
1111
#include "storage/micro_partition_metadata.h"
1212

13+
#ifdef VEC_BUILD
14+
#include "storage/vec/pax_vec_reader.h"
15+
#endif
16+
1317
namespace pax {
1418

1519
static std::string GenRandomBlockId() {
@@ -182,65 +186,11 @@ bool TableReader::ReadTuple(CTupleSlot *slot) {
182186
}
183187
OpenFile();
184188
}
185-
num_tuples_++;
186-
slot->SetTableNo(table_no_);
187-
slot->SetBlockNumber(current_block_number_);
188-
slot->StoreVirtualTuple();
189-
return true;
190-
}
191-
192-
#ifdef VEC_BUILD
193-
// TODO(jiaqizho): should remove this method but provider a
194-
// new mirco partition reader which include adapter
195-
bool TableReader::ReadVecTuple(CTupleSlot *slot, VecAdapter *adapter) {
196-
size_t flush_nums_of_rows = 0;
197-
198-
if (is_empty_) {
199-
return false;
200-
}
201-
202-
// Three conditions indicate that there is no data left
203-
// - `VecAdapter` must be set
204-
// - `VecAdapter` has consumed all buffer data
205-
// - current have not next reader
206-
if (adapter->IsInitialized() && adapter->IsEnd() && !iterator_->HasNext()) {
207-
return false;
208-
}
209-
210-
Assert(reader_);
211-
212-
// `No set` means that the first reader not setup in apapter
213-
if (!adapter->IsInitialized()) {
214-
PaxColumns *pax_columns =
215-
reinterpret_cast<OrcReader *>(reader_)->GetAllColumns();
216-
adapter->SetDataSource(pax_columns);
217-
}
218-
219-
// `VecAdapter` has consumed all buffer data
220-
if (adapter->IsEnd()) {
221-
if (iterator_->HasNext()) {
222-
reader_->Close();
223-
OpenFile();
224-
225-
PaxColumns *pax_columns =
226-
reinterpret_cast<OrcReader *>(reader_)->GetAllColumns();
227-
adapter->SetDataSource(pax_columns);
228-
}
229-
}
230-
231-
auto ok = adapter->AppendToVecBuffer();
232-
CBDB_CHECK(ok, cbdb::CException::ExType::kExTypeLogicError);
233-
234-
flush_nums_of_rows = adapter->FlushVecBuffer(slot);
235-
Assert(flush_nums_of_rows);
236-
237-
num_tuples_ += flush_nums_of_rows;
238189
slot->SetTableNo(table_no_);
239190
slot->SetBlockNumber(current_block_number_);
240191
slot->StoreVirtualTuple();
241192
return true;
242193
}
243-
#endif // VEC_BUILD
244194

245195
void TableReader::OpenFile() {
246196
Assert(iterator_->HasNext());
@@ -266,6 +216,14 @@ void TableReader::OpenFile() {
266216

267217
reader_ = new OrcReader(
268218
Singleton<LocalFileSystem>::GetInstance()->Open(options.file_name));
219+
220+
#ifdef VEC_BUILD
221+
if (reader_options_.is_vec) {
222+
Assert(reader_options_.adapter);
223+
reader_ = new PaxVecReader(reader_, reader_options_.adapter);
224+
}
225+
#endif
226+
269227
reader_->Open(options);
270228
}
271229

contrib/pax_storage/src/cpp/storage/pax.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ class TableReader final {
7373
// Will not used in TableReader
7474
// But pass into micro partition reader
7575
PaxFilter *filter = nullptr;
76+
#ifdef VEC_BUILD
77+
bool is_vec = false;
78+
VecAdapter *adapter = nullptr;
79+
#endif
7680
};
7781

7882
TableReader(std::unique_ptr<IteratorBase<MicroPartitionMetadata>> &&iterator,
@@ -85,11 +89,6 @@ class TableReader final {
8589

8690
void Close();
8791

88-
#ifdef VEC_BUILD
89-
// std::unique_ptr<VecAdapter> here ?
90-
bool ReadVecTuple(CTupleSlot *slot, VecAdapter *adapter);
91-
#endif
92-
9392
bool ReadTuple(CTupleSlot *slot);
9493

9594
// deprecate:
@@ -104,7 +103,6 @@ class TableReader final {
104103
private:
105104
const std::unique_ptr<IteratorBase<MicroPartitionMetadata>> iterator_;
106105
MicroPartitionReader *reader_ = nullptr;
107-
size_t num_tuples_ = 0;
108106
bool is_empty_ = false;
109107
const ReaderOptions reader_options_;
110108
int current_block_number_ = 0;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include "storage/vec/pax_vec_reader.h"
2+
3+
#include "storage/vec/pax_vec_adapter.h"
4+
#ifdef VEC_BUILD
5+
6+
namespace pax {
7+
8+
PaxVecReader::PaxVecReader(MicroPartitionReader *reader, VecAdapter *adapter)
9+
: reader_(reader), adapter_(adapter) {}
10+
11+
PaxVecReader::~PaxVecReader() { delete reader_; }
12+
13+
void PaxVecReader::Open(const ReaderOptions &options) {
14+
reader_->Open(options);
15+
PaxColumns *pax_columns = reader_->GetAllColumns();
16+
adapter_->SetDataSource(pax_columns);
17+
}
18+
19+
void PaxVecReader::Close() { reader_->Close(); }
20+
21+
bool PaxVecReader::ReadTuple(CTupleSlot *cslot) {
22+
if (!adapter_->AppendToVecBuffer()) {
23+
return false;
24+
}
25+
26+
auto flush_nums_of_rows = adapter_->FlushVecBuffer(cslot);
27+
Assert(flush_nums_of_rows);
28+
return true;
29+
}
30+
31+
PaxColumns *PaxVecReader::GetAllColumns() {
32+
CBDB_RAISE(cbdb::CException::ExType::kExTypeLogicError);
33+
}
34+
}; // namespace pax
35+
36+
#endif // VEC_BUILD
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
#include "storage/micro_partition.h"
3+
4+
#ifdef VEC_BUILD
5+
6+
namespace pax {
7+
8+
class VecAdapter;
9+
10+
class PaxVecReader : public MicroPartitionReader {
11+
public:
12+
// If enable read tuple from vec reader,
13+
// then OrcReader will be hold by PaxVecReader,
14+
// current MicroPartitionReader lifecycle will be bound to the PaxVecReader)
15+
PaxVecReader(MicroPartitionReader *reader, VecAdapter *adapter);
16+
17+
~PaxVecReader() override;
18+
19+
void Open(const ReaderOptions &options) override;
20+
21+
void Close() override;
22+
23+
bool ReadTuple(CTupleSlot *cslot) override;
24+
25+
protected:
26+
PaxColumns *GetAllColumns() override;
27+
28+
private:
29+
MicroPartitionReader *reader_;
30+
VecAdapter *adapter_;
31+
};
32+
33+
} // namespace pax
34+
35+
#endif // VEC_BUILD

0 commit comments

Comments
 (0)