Skip to content

Commit 96d1a16

Browse files
committed
Op: remove OrcIteratorReader
OrcIteratorReader became no longer applicable after multiple version iterations. Iterators exist in the table layer, so there is no need to use iterators in the micro-partition layer.
1 parent c58e7af commit 96d1a16

10 files changed

Lines changed: 202 additions & 316 deletions

File tree

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
1616
PaxFilter *filter) {
1717
PaxScanDesc *desc;
1818
MemoryContext old_ctx;
19-
FileSystem *file_system;
20-
MicroPartitionReader *micro_partition_reader;
2119
TableReader::ReaderOptions reader_options{};
2220

2321
StaticAssertStmt(
@@ -45,12 +43,8 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
4543
old_ctx = MemoryContextSwitchTo(desc->memory_context_);
4644

4745
// build reader
48-
file_system = Singleton<LocalFileSystem>::GetInstance();
49-
50-
micro_partition_reader = new OrcIteratorReader(file_system);
51-
micro_partition_reader->SetReadBuffer(desc->reused_buffer_);
52-
5346
reader_options.build_bitmap = true;
47+
reader_options.reused_buffer = desc->reused_buffer_;
5448
reader_options.rel_oid = desc->rs_base_.rs_rd->rd_id;
5549
reader_options.filter = filter;
5650

@@ -63,8 +57,7 @@ TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
6357
});
6458
iter = std::unique_ptr<IteratorBase<MicroPartitionMetadata>>(wrap);
6559
}
66-
desc->reader_ =
67-
new TableReader(micro_partition_reader, std::move(iter), reader_options);
60+
desc->reader_ = new TableReader(std::move(iter), reader_options);
6861
desc->reader_->Open();
6962

7063
#ifdef VEC_BUILD

contrib/pax_storage/src/cpp/storage/micro_partition.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,4 @@ const std::string &MicroPartitionWriter::FileName() const {
4848
return writer_options_.file_name;
4949
}
5050

51-
MicroPartitionReader::MicroPartitionReader(FileSystem *fs) : file_system_(fs) {}
52-
53-
MicroPartitionReader::MicroPartitionReader() : file_system_(nullptr) {}
54-
5551
} // namespace pax

contrib/pax_storage/src/cpp/storage/micro_partition.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,14 @@ class MicroPartitionReader {
109109
// additioinal info to initialize a reader.
110110
std::string block_id;
111111

112+
// Optional, when reused buffer is not set, new memory will be generated for
113+
// ReadTuple
114+
DataBuffer<char> *reused_buffer = nullptr;
115+
112116
PaxFilter *filter = nullptr;
113117
};
114118

115-
explicit MicroPartitionReader(FileSystem *fs);
116-
117-
MicroPartitionReader();
119+
MicroPartitionReader() = default;
118120

119121
virtual ~MicroPartitionReader() = default;
120122

@@ -131,15 +133,6 @@ class MicroPartitionReader {
131133
// is also created during this stage, no matter the map relation is needed or
132134
// not. We may optimize to avoid creating the map relation later.
133135
virtual bool ReadTuple(CTupleSlot *slot) = 0;
134-
135-
virtual uint64 Offset() const = 0;
136-
137-
virtual size_t Length() const = 0;
138-
139-
virtual void SetReadBuffer(DataBuffer<char> *data_buffer) = 0;
140-
141-
protected:
142-
FileSystem *file_system_ = nullptr;
143136
};
144137

145138
} // namespace pax

contrib/pax_storage/src/cpp/storage/micro_partition_file_factory.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
namespace pax {
66
MicroPartitionReader *MicroPartitionFileFactory::CreateMicroPartitionReader(
77
const std::string &type, File *file,
8-
const MicroPartitionReader::ReaderOptions & /*options*/) {
8+
const MicroPartitionReader::ReaderOptions &options) {
99
if (type == MICRO_PARTITION_TYPE_PAX) {
10-
MicroPartitionReader *reader = nullptr;
11-
reader = OrcReader::CreateReader(file);
10+
MicroPartitionReader *reader = new OrcReader(file);
11+
12+
reader->Open(options);
1213
return reader;
1314
}
1415

contrib/pax_storage/src/cpp/storage/orc/orc.cc

Lines changed: 108 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,57 @@
1414
#include "storage/pax_filter.h"
1515
namespace pax {
1616

17+
std::pair<std::vector<orc::proto::Type_Kind>, std::vector<ColumnEncoding_Kind>>
18+
OrcWriter::BuildSchema(const MicroPartitionWriter::WriterOptions &options) {
19+
std::vector<orc::proto::Type_Kind> type_kinds;
20+
std::vector<ColumnEncoding_Kind> encoding_types;
21+
TupleDesc desc;
22+
23+
desc = options.desc;
24+
Assert(desc);
25+
26+
for (int i = 0; i < desc->natts; i++) {
27+
auto *attr = &desc->attrs[i];
28+
if (attr->attbyval) {
29+
switch (attr->attlen) {
30+
case 1:
31+
type_kinds.emplace_back(orc::proto::Type_Kind::Type_Kind_BYTE);
32+
encoding_types.emplace_back(
33+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
34+
break;
35+
case 2:
36+
type_kinds.emplace_back(orc::proto::Type_Kind::Type_Kind_SHORT);
37+
encoding_types.emplace_back(
38+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
39+
break;
40+
case 4:
41+
type_kinds.emplace_back(orc::proto::Type_Kind::Type_Kind_INT);
42+
encoding_types.emplace_back(
43+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
44+
break;
45+
case 8:
46+
type_kinds.emplace_back(orc::proto::Type_Kind::Type_Kind_LONG);
47+
// TODO: parse options
48+
encoding_types.emplace_back(
49+
ColumnEncoding_Kind::ColumnEncoding_Kind_ORC_RLE_V2);
50+
break;
51+
default:
52+
Assert(!"should not be here! pg_type which attbyval=true only have typlen of "
53+
"1, 2, 4, or 8");
54+
}
55+
} else {
56+
Assert(attr->attlen > 0 || attr->attlen == -1);
57+
type_kinds.emplace_back(orc::proto::Type_Kind::Type_Kind_STRING);
58+
encoding_types.emplace_back(
59+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
60+
}
61+
}
62+
63+
Assert(type_kinds.size() == encoding_types.size());
64+
65+
return std::make_pair(type_kinds, encoding_types);
66+
}
67+
1768
OrcWriter::OrcWriter(
1869
const MicroPartitionWriter::WriterOptions &orc_writer_options,
1970
const std::vector<orc::proto::Type_Kind> &column_types,
@@ -339,25 +390,8 @@ OrcReader::OrcReader(File *file)
339390
working_pax_columns_(nullptr),
340391
num_of_stripes_(0),
341392
proj_map_(nullptr),
342-
proj_len_(0) {
343-
size_t file_length = file_->FileLength();
344-
uint64 post_script_len = 0;
345-
346-
file_->PRead(&post_script_len, ORC_POST_SCRIPT_SIZE,
347-
(off_t)(file_length - ORC_POST_SCRIPT_SIZE));
348-
349-
ReadPostScript(file_length, post_script_len);
350-
351-
size_t footer_len = post_script_.footerlength();
352-
size_t tail_len = ORC_POST_SCRIPT_SIZE + post_script_len + footer_len;
353-
size_t footer_offset = file_length - tail_len;
354-
355-
ReadFooter(footer_offset, footer_len);
356-
num_of_stripes_ = file_footer_.stripes_size();
357-
358-
if (post_script_.metadatalength() != 0)
359-
ReadMetadata(file_length, post_script_len);
360-
}
393+
proj_len_(0),
394+
is_close_(true) {}
361395

362396
OrcReader::~OrcReader() { delete file_; }
363397

@@ -851,9 +885,59 @@ OrcReader::StripeInformation *OrcReader::GetStripeInfo(size_t index) const {
851885
size_t OrcReader::GetNumberOfStripes() const { return num_of_stripes_; }
852886

853887
void OrcReader::Open(const ReaderOptions &options) {
888+
size_t file_length = 0;
889+
uint64 post_script_len = 0;
890+
891+
size_t footer_offset = 0;
892+
size_t footer_len = 0;
893+
size_t tail_len = 0;
894+
895+
Assert(file_);
896+
Assert(is_close_);
897+
898+
// Must not open twice.
899+
Assert(!reused_buffer_);
900+
if (options.reused_buffer) {
901+
CBDB_CHECK(options.reused_buffer->IsMemTakeOver(),
902+
cbdb::CException::ExType::kExTypeLogicError);
903+
options.reused_buffer->BrushBackAll();
904+
905+
reused_buffer_ = options.reused_buffer;
906+
}
907+
854908
Assert(!proj_map_ && !proj_len_);
855909
if (options.filter)
856910
std::tie(proj_map_, proj_len_) = options.filter->GetColumnProjection();
911+
912+
// Begin read footer
913+
914+
// TODO(jiaqizho):
915+
// There is an optimization here, in standard ORC, A single ORC
916+
// file will read
917+
// follow these step:
918+
// - read postscript size
919+
// - read post script
920+
// - read file footer
921+
// - read meta if exist
922+
// the footer information of a single ORC file needing cost 3-4 iops
923+
// consider add a new filed after postscript size, contain the full size of
924+
// footer information
925+
file_length = file_->FileLength();
926+
file_->PRead(&post_script_len, ORC_POST_SCRIPT_SIZE,
927+
(off_t)(file_length - ORC_POST_SCRIPT_SIZE));
928+
929+
ReadPostScript(file_length, post_script_len);
930+
931+
footer_len = post_script_.footerlength();
932+
tail_len = ORC_POST_SCRIPT_SIZE + post_script_len + footer_len;
933+
footer_offset = file_length - tail_len;
934+
935+
ReadFooter(footer_offset, footer_len);
936+
num_of_stripes_ = file_footer_.stripes_size();
937+
938+
if (post_script_.metadatalength() != 0)
939+
ReadMetadata(file_length, post_script_len);
940+
is_close_ = false;
857941
}
858942

859943
void OrcReader::ResetCurrentReading() {
@@ -868,11 +952,15 @@ void OrcReader::ResetCurrentReading() {
868952
}
869953

870954
void OrcReader::Close() {
871-
Assert(current_nulls_);
955+
if (is_close_) {
956+
return;
957+
}
958+
872959
ResetCurrentReading();
873960
delete[] current_nulls_;
874961
current_nulls_ = nullptr;
875962
file_->Close();
963+
is_close_ = true;
876964
}
877965

878966
bool OrcReader::ReadTuple(CTupleSlot *cslot) {
@@ -1018,69 +1106,4 @@ bool OrcReader::ReadTuple(CTupleSlot *cslot) {
10181106
return true;
10191107
}
10201108

1021-
uint64 OrcReader::Offset() const { return current_offset_; }
1022-
1023-
void OrcReader::SetReadBuffer(DataBuffer<char> *reused_buffer) {
1024-
Assert(!reused_buffer_);
1025-
CBDB_CHECK(reused_buffer->IsMemTakeOver(),
1026-
cbdb::CException::ExType::kExTypeLogicError);
1027-
reused_buffer->BrushBackAll();
1028-
1029-
reused_buffer_ = reused_buffer;
1030-
}
1031-
1032-
size_t OrcReader::Length() const { return 0; }
1033-
1034-
OrcIteratorReader::OrcIteratorReader(FileSystem *fs)
1035-
: MicroPartitionReader(fs), reader_(nullptr), reused_buffer_(nullptr) {}
1036-
1037-
void OrcIteratorReader::Open(const ReaderOptions &options) {
1038-
File *file = file_system_->Open(options.file_name);
1039-
Assert(file != nullptr);
1040-
// last file should closed
1041-
Assert(reader_ == nullptr);
1042-
1043-
reader_ = OrcReader::CreateReader(file);
1044-
if (reused_buffer_) {
1045-
reader_->SetReadBuffer(reused_buffer_);
1046-
}
1047-
// TODO(jiaqizho): should remove OrcIteratorReader
1048-
// and create micro partition writer/reader by a builder.
1049-
// Then we won't pass the options to anywhere
1050-
reader_->Open(options);
1051-
closed_ = false;
1052-
}
1053-
1054-
void OrcIteratorReader::Close() {
1055-
if (closed_) {
1056-
return;
1057-
}
1058-
Assert(reader_);
1059-
reader_->Close();
1060-
delete reader_;
1061-
reader_ = nullptr;
1062-
closed_ = true;
1063-
}
1064-
1065-
uint64 OrcIteratorReader::Offset() const {
1066-
Assert(reader_);
1067-
return reader_->Offset();
1068-
}
1069-
1070-
bool OrcIteratorReader::ReadTuple(CTupleSlot *slot) {
1071-
Assert(reader_);
1072-
return reader_->ReadTuple(slot);
1073-
}
1074-
1075-
size_t OrcIteratorReader::Length() const {
1076-
// TODO(gongxun): get length from orc file
1077-
Assert(!"not implemented");
1078-
return 0;
1079-
}
1080-
1081-
void OrcIteratorReader::SetReadBuffer(DataBuffer<char> *reused_buffer) {
1082-
Assert(!reused_buffer_);
1083-
reused_buffer_ = reused_buffer;
1084-
}
1085-
10861109
} // namespace pax

0 commit comments

Comments
 (0)