Skip to content

Commit a08be4d

Browse files
committed
Fix: pax group footer does not record compress level
The compress level in the group footer is always 0, because pax never sets it. We should record this field, even if it is not used during reading. Without recording the compress level, the written file is unaware of the compress level of the group.
1 parent 33a0bea commit a08be4d

11 files changed

Lines changed: 96 additions & 56 deletions

contrib/pax_storage/src/cpp/storage/columns/pax_column.cc

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@ PaxColumn::PaxColumn()
1616
total_rows_(0),
1717
non_null_rows_(0),
1818
encoded_type_(ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED),
19+
compress_level_(0),
1920
type_align_size_(PAX_DATA_NO_ALIGN) {}
2021

21-
PaxColumn::~PaxColumn() {
22-
PAX_DELETE(null_bitmap_);
23-
}
22+
PaxColumn::~PaxColumn() { PAX_DELETE(null_bitmap_); }
2423

2524
PaxColumnTypeInMem PaxColumn::GetPaxColumnTypeInMem() const {
2625
return PaxColumnTypeInMem::kTypeInvalid;
@@ -80,13 +79,6 @@ void PaxColumn::SetAlignSize(size_t align_size) {
8079
type_align_size_ = align_size;
8180
}
8281

83-
PaxColumn *PaxColumn::SetColumnEncodeType(ColumnEncoding_Kind encoding_type) {
84-
encoded_type_ = encoding_type;
85-
return this;
86-
}
87-
88-
ColumnEncoding_Kind PaxColumn::GetEncodingType() const { return encoded_type_; }
89-
9082
template <typename T>
9183
PaxCommColumn<T>::PaxCommColumn(uint32 capacity) {
9284
data_ = PAX_NEW<DataBuffer<T>>(capacity * sizeof(T));

contrib/pax_storage/src/cpp/storage/columns/pax_column.h

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ class PaxColumn {
3131

3232
virtual ~PaxColumn();
3333

34-
virtual PaxColumn *SetColumnEncodeType(ColumnEncoding_Kind encoding_type);
35-
3634
// Get the column in memory type
3735
virtual PaxColumnTypeInMem GetPaxColumnTypeInMem() const;
3836

@@ -150,9 +148,6 @@ class PaxColumn {
150148
// Estimated memory size from current column
151149
virtual size_t PhysicalSize() const = 0;
152150

153-
// Get current encoding type
154-
virtual ColumnEncoding_Kind GetEncodingType() const;
155-
156151
// Get current storage type
157152
virtual PaxStorageFormat GetStorageFormat() const = 0;
158153

@@ -182,6 +177,22 @@ class PaxColumn {
182177

183178
virtual void SetAlignSize(size_t align_size);
184179

180+
// Get current encoding type
181+
inline ColumnEncoding_Kind GetEncodingType() const { return encoded_type_; }
182+
183+
// Get current compress level
184+
inline int GetCompressLevel() const { return compress_level_; }
185+
186+
protected:
187+
// The encoding option should pass in sub-class
188+
inline void SetEncodeType(ColumnEncoding_Kind encoding_type) {
189+
encoded_type_ = encoding_type;
190+
}
191+
192+
inline void SetCompressLevel(int compress_level) {
193+
compress_level_ = compress_level;
194+
}
195+
185196
private:
186197
void CreateNulls(size_t cap);
187198

@@ -197,9 +208,12 @@ class PaxColumn {
197208
// but can direct get not null rows by data part.
198209
size_t non_null_rows_;
199210

200-
// the column is encoded type
211+
// the column encoded type
201212
ColumnEncoding_Kind encoded_type_;
202213

214+
// the column compress level
215+
int compress_level_;
216+
203217
// data part align size.
204218
// This field only takes effect when current column is no encoding/compress.
205219
//

contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ TEST_P(PaxColumnEncodingTest, GetRangeEncodingColumnTest) {
417417
bits, origin_len, origin_rows, std::move(decoding_option), encoded_buff,
418418
encoded_len, storage_type, 100);
419419

420+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 0);
420421
char *verify_buff;
421422
size_t verify_len;
422423
std::tie(verify_buff, verify_len) =
@@ -471,11 +472,13 @@ TEST_P(PaxColumnCompressTest, FixedCompressColumnGetRangeTest) {
471472
PaxDecoder::DecodingOption decoding_option;
472473
decoding_option.column_encode_type = kind;
473474
decoding_option.is_sign = true;
475+
decoding_option.compress_level = 5;
474476

475477
auto int_column_for_read =
476478
CreateDecodeColumn(bits, (100) * bits / 8, origin_rows,
477479
std::move(decoding_option), encoded_buff, encoded_len);
478480

481+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 5);
479482
char *verify_buff;
480483
size_t verify_len;
481484
std::tie(verify_buff, verify_len) =
@@ -534,6 +537,7 @@ TEST_P(PaxColumnEncodingTest, PaxEncodingColumnDefault) {
534537
bits, origin_len, origin_rows, std::move(decoding_option), encoded_buff,
535538
encoded_len, storage_type);
536539

540+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 0);
537541
char *verify_buff;
538542
size_t verify_len;
539543
std::tie(verify_buff, verify_len) = int_column_for_read->GetBuffer();
@@ -577,6 +581,7 @@ TEST_P(PaxColumnEncodingTest, PaxEncodingColumnSpecType) {
577581
auto int_column_for_read = CreateDecodeColumn(
578582
bits, origin_len, origin_rows, std::move(decoding_option), encoded_buff,
579583
encoded_len, storage_type);
584+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 0);
580585

581586
char *verify_buff;
582587
size_t verify_len;
@@ -620,7 +625,7 @@ TEST_P(PaxColumnEncodingTest, PaxEncodingColumnNoEncoding) {
620625
auto int_column_for_read = CreateDecodeColumn(
621626
bits, encoded_len, origin_rows, std::move(decoding_option), encoded_buff,
622627
encoded_len, storage_type);
623-
628+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 0);
624629
char *verify_buff;
625630
size_t verify_len;
626631
std::tie(verify_buff, verify_len) = int_column_for_read->GetBuffer();
@@ -659,11 +664,13 @@ TEST_P(PaxColumnCompressTest, PaxEncodingColumnCompressDecompress) {
659664
PaxDecoder::DecodingOption decoding_option;
660665
decoding_option.column_encode_type = kind;
661666
decoding_option.is_sign = true;
667+
decoding_option.compress_level = 5;
662668

663669
auto int_column_for_read =
664670
CreateDecodeColumn(bits, (UINT16_MAX + 1) * bits / 8, origin_rows,
665671
std::move(decoding_option), encoded_buff, encoded_len);
666672

673+
ASSERT_EQ(int_column_for_read->GetCompressLevel(), 5);
667674
char *verify_buff;
668675
size_t verify_len;
669676
std::tie(verify_buff, verify_len) = int_column_for_read->GetBuffer();
@@ -718,6 +725,7 @@ TEST_P(PaxNonFixedColumnCompressTest,
718725
PaxDecoder::DecodingOption decoding_option;
719726
decoding_option.column_encode_type = kind;
720727
decoding_option.is_sign = true;
728+
decoding_option.compress_level = 5;
721729

722730
auto non_fixed_column_for_read = new PaxNonFixedEncodingColumn(
723731
buffer_len * number, std::move(decoding_option));
@@ -727,7 +735,7 @@ TEST_P(PaxNonFixedColumnCompressTest,
727735
auto length_buffer_cpy = new DataBuffer<int64>(*length_buffer);
728736
non_fixed_column_for_read->Set(data_buffer_for_read, length_buffer_cpy,
729737
origin_len);
730-
738+
ASSERT_EQ(non_fixed_column_for_read->GetCompressLevel(), 5);
731739
char *verify_buff;
732740
size_t verify_len;
733741

contrib/pax_storage/src/cpp/storage/columns/pax_columns.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ size_t PaxColumns::MeasureVecDataBuffer(
302302
column->GetOriginLength() >= 0);
303303

304304
column_encoding_func(
305-
column->GetEncodingType(),
305+
column->GetEncodingType(), column->GetCompressLevel(),
306306
(column->GetEncodingType() !=
307307
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED)
308308
? TYPEALIGN(MEMORY_ALIGN_SIZE, column->GetOriginLength())
@@ -327,8 +327,8 @@ size_t PaxColumns::MeasureOrcDataBuffer(
327327
Assert(bm);
328328
size_t bm_length = bm->MinimalStoredBytes(column->GetRows());
329329
buffer_len += bm_length;
330-
column_streams_func(pax::orc::proto::Stream_Kind_PRESENT, column->GetRows(),
331-
bm_length);
330+
column_streams_func(pax::orc::proto::Stream_Kind_PRESENT,
331+
column->GetRows(), bm_length);
332332
}
333333

334334
size_t column_size = column->GetNonNullRows();
@@ -371,7 +371,8 @@ size_t PaxColumns::MeasureOrcDataBuffer(
371371
}
372372
}
373373

374-
column_encoding_func(column->GetEncodingType(), column->GetOriginLength());
374+
column_encoding_func(column->GetEncodingType(), column->GetCompressLevel(),
375+
column->GetOriginLength());
375376
}
376377
return buffer_len;
377378
}

contrib/pax_storage/src/cpp/storage/columns/pax_columns.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class PaxColumns : public PaxColumn {
6565
using ColumnStreamsFunc =
6666
std::function<void(const pax::orc::proto::Stream_Kind &, size_t, size_t)>;
6767

68-
using ColumnEncodingFunc =
69-
std::function<void(const ColumnEncoding_Kind &, size_t)>;
68+
using ColumnEncodingFunc = std::function<void(
69+
const ColumnEncoding_Kind &, const uint64 compress_lvl, size_t)>;
7070

7171
// Get the combined data buffer of all columns
7272
// TODO(jiaqizho): consider add a new api which support split IO from

contrib/pax_storage/src/cpp/storage/columns/pax_decoding.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ class PaxDecoder {
1414
struct DecodingOption {
1515
ColumnEncoding_Kind column_encode_type;
1616
bool is_sign;
17+
int compress_level;
1718

1819
DecodingOption()
1920
: column_encode_type(
2021
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED),
21-
is_sign(true) {}
22+
is_sign(true),
23+
compress_level(0) {}
2224
};
2325

2426
explicit PaxDecoder(const DecodingOption &decoder_options);

contrib/pax_storage/src/cpp/storage/columns/pax_encoding_column.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ void PaxEncodingColumn<T>::InitEncoder() {
4545
encoder_options_.column_encode_type = GetDefaultColumnType();
4646
}
4747

48-
PaxColumn::encoded_type_ = encoder_options_.column_encode_type;
48+
PaxColumn::SetEncodeType(encoder_options_.column_encode_type);
49+
PaxColumn::SetCompressLevel(encoder_options_.compress_level);
4950

5051
// Create a streaming encoder
5152
// If current `encoded_type_` can not create a streaming encoder,
@@ -65,23 +66,26 @@ void PaxEncodingColumn<T>::InitEncoder() {
6566
// Create a block compressor
6667
// Compressor have a different interface with pax encoder
6768
// If no pax encoder no provided, then try to create a compressor.
68-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
69+
compressor_ =
70+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
6971
if (compressor_) {
7072
return;
7173
}
7274

7375
// can't find any encoder or compressor
7476
// then should reset encode type
7577
// or will got origin length is -1 but still have encode type
76-
PaxColumn::encoded_type_ =
77-
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED;
78+
PaxColumn::SetEncodeType(ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
79+
PaxColumn::SetCompressLevel(0);
7880
}
7981

8082
template <typename T>
8183
void PaxEncodingColumn<T>::InitDecoder() {
8284
Assert(decoder_options_.column_encode_type !=
8385
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
84-
PaxColumn::encoded_type_ = decoder_options_.column_encode_type;
86+
87+
PaxColumn::SetEncodeType(decoder_options_.column_encode_type);
88+
PaxColumn::SetCompressLevel(decoder_options_.compress_level);
8589

8690
decoder_ = PaxDecoder::CreateDecoder<T>(decoder_options_);
8791
if (decoder_) {
@@ -92,7 +96,8 @@ void PaxEncodingColumn<T>::InitDecoder() {
9296
return;
9397
}
9498

95-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
99+
compressor_ =
100+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
96101
}
97102

98103
template <typename T>

contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ PaxNonFixedEncodingColumn::PaxNonFixedEncodingColumn(
1616
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED) {
1717
encoder_options_.column_encode_type = ColumnEncoding_Kind_COMPRESS_ZSTD;
1818
}
19+
PaxColumn::SetEncodeType(encoder_options_.column_encode_type);
20+
PaxColumn::SetCompressLevel(encoder_options_.compress_level);
1921

20-
PaxColumn::encoded_type_ = encoder_options_.column_encode_type;
21-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
22+
compressor_ =
23+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
2224
if (!compressor_) {
23-
PaxColumn::encoded_type_ =
24-
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED;
25+
PaxColumn::SetEncodeType(
26+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
27+
PaxColumn::SetCompressLevel(0);
2528
}
2629
}
2730

@@ -34,8 +37,10 @@ PaxNonFixedEncodingColumn::PaxNonFixedEncodingColumn(
3437
shared_data_(nullptr) {
3538
Assert(decoder_options_.column_encode_type !=
3639
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
37-
PaxColumn::encoded_type_ = decoder_options_.column_encode_type;
38-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
40+
PaxColumn::SetEncodeType(decoder_options_.column_encode_type);
41+
PaxColumn::SetCompressLevel(decoder_options_.compress_level);
42+
compressor_ =
43+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
3944
}
4045

4146
PaxNonFixedEncodingColumn::~PaxNonFixedEncodingColumn() {

contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,29 @@ void PaxVecEncodingColumn<T>::InitEncoder() {
4545
encoder_options_.column_encode_type = GetDefaultColumnType();
4646
}
4747

48-
PaxColumn::encoded_type_ = encoder_options_.column_encode_type;
48+
PaxColumn::SetEncodeType(encoder_options_.column_encode_type);
49+
PaxColumn::SetCompressLevel(encoder_options_.compress_level);
4950

5051
encoder_ = PaxEncoder::CreateStreamingEncoder(encoder_options_);
5152
if (encoder_) {
5253
return;
5354
}
5455

55-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
56-
if (compressor_) {
57-
return;
56+
compressor_ =
57+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
58+
if (!compressor_) {
59+
PaxColumn::SetEncodeType(
60+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
61+
PaxColumn::SetCompressLevel(0);
5862
}
59-
60-
PaxColumn::encoded_type_ =
61-
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED;
6263
}
6364

6465
template <typename T>
6566
void PaxVecEncodingColumn<T>::InitDecoder() {
6667
Assert(decoder_options_.column_encode_type !=
6768
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
68-
PaxColumn::encoded_type_ = decoder_options_.column_encode_type;
69+
PaxColumn::SetEncodeType(decoder_options_.column_encode_type);
70+
PaxColumn::SetCompressLevel(decoder_options_.compress_level);
6971

7072
decoder_ = PaxDecoder::CreateDecoder<T>(decoder_options_);
7173
if (decoder_) {
@@ -76,7 +78,8 @@ void PaxVecEncodingColumn<T>::InitDecoder() {
7678
return;
7779
}
7880

79-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
81+
compressor_ =
82+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
8083
}
8184

8285
template <typename T>
@@ -218,11 +221,15 @@ PaxVecNonFixedEncodingColumn::PaxVecNonFixedEncodingColumn(
218221
encoder_options_.column_encode_type = ColumnEncoding_Kind_COMPRESS_ZSTD;
219222
}
220223

221-
PaxColumn::encoded_type_ = encoder_options_.column_encode_type;
222-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
224+
PaxColumn::SetEncodeType(encoder_options_.column_encode_type);
225+
PaxColumn::SetCompressLevel(encoder_options_.compress_level);
226+
227+
compressor_ =
228+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
223229
if (!compressor_) {
224-
PaxColumn::encoded_type_ =
225-
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED;
230+
PaxColumn::SetEncodeType(
231+
ColumnEncoding_Kind::ColumnEncoding_Kind_NO_ENCODED);
232+
PaxColumn::SetCompressLevel(0);
226233
}
227234
}
228235

@@ -235,8 +242,10 @@ PaxVecNonFixedEncodingColumn::PaxVecNonFixedEncodingColumn(
235242
shared_data_(nullptr) {
236243
Assert(decoder_options_.column_encode_type !=
237244
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
238-
PaxColumn::encoded_type_ = decoder_options_.column_encode_type;
239-
compressor_ = PaxCompressor::CreateBlockCompressor(PaxColumn::encoded_type_);
245+
PaxColumn::SetEncodeType(decoder_options_.column_encode_type);
246+
PaxColumn::SetCompressLevel(decoder_options_.compress_level);
247+
compressor_ =
248+
PaxCompressor::CreateBlockCompressor(PaxColumn::GetEncodingType());
240249
}
241250

242251
PaxVecNonFixedEncodingColumn::~PaxVecNonFixedEncodingColumn() {

0 commit comments

Comments
 (0)