Skip to content

Commit 9ce9c35

Browse files
gongxun0928jiaqizho
authored andcommitted
pax: suport index cluster
The pax table supports the cluster syntax based on the btree index and behaves the same as the aocs table.
1 parent 8cdb311 commit 9ce9c35

35 files changed

Lines changed: 738 additions & 178 deletions

contrib/pax_storage/.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ Thumbs.db
1414
# Temp files dir
1515
.tmp/**
1616
build*/**
17+
results/**
1718
clang-tidy.result
1819
# pb
1920
**/*.pb.h
2021
**/*.pb.cc
21-
22+
tools/**/*.data
23+
tools/**/*.txt
2224
# Executables
2325
/*.out
2426
!src/data/expected/*.out

contrib/pax_storage/src/cpp/access/pax_access_handle.cc

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "access/pax_dml_state.h"
66
#include "access/pax_partition.h"
77
#include "access/pax_scanner.h"
8+
#include "access/pax_table_cluster.h"
89
#include "access/pax_updater.h"
910
#include "access/paxc_rel_options.h"
1011
#include "access/paxc_scanner.h"
@@ -242,24 +243,55 @@ void CCPaxAccessMethod::RelationCopyData(Relation rel,
242243
}
243244

244245
/*
245-
* Used by rebuild_relation, like CLUSTER, VACUUM FULL, etc.
246+
* Copy data from `OldTable` into `NewTable`, as part of a CLUSTER or VACUUM
247+
* FULL.
246248
*
247249
* PAX does not have dead tuples, but the core framework requires
248250
* to implement this callback to do CLUSTER/VACUUM FULL/etc.
249251
* PAX may have re-organize semantics for this function.
250252
*
251-
* TODO: how to split the set of micro-partitions to several QE handlers.
253+
*
254+
* Additional Input parameters:
255+
* - use_sort - if true, the table contents are sorted appropriate for
256+
* `OldIndex`; if false and OldIndex is not InvalidOid, the data is copied
257+
* in that index's order; if false and OldIndex is InvalidOid, no sorting is
258+
* performed
259+
* - OldIndex - see use_sort
260+
* - OldestXmin - computed by vacuum_set_xid_limits(), even when
261+
* not needed for the relation's AM
262+
* - *xid_cutoff - ditto
263+
* - *multi_cutoff - ditto
264+
*
265+
* Output parameters:
266+
* - *xid_cutoff - rel's new relfrozenxid value, may be invalid
267+
* - *multi_cutoff - rel's new relminmxid value, may be invalid
268+
* - *tups_vacuumed - stats, for logging, if appropriate for AM
269+
* - *tups_recently_dead - stats, for logging, if appropriate for AM
252270
*/
253271
void CCPaxAccessMethod::RelationCopyForCluster(
254-
Relation old_heap, Relation new_heap, Relation /*old_index*/,
255-
bool /*use_sort*/, TransactionId /*oldest_xmin*/,
256-
TransactionId * /*xid_cutoff*/, MultiXactId * /*multi_cutoff*/,
257-
double * /*num_tuples*/, double * /*tups_vacuumed*/,
258-
double * /*tups_recently_dead*/) {
259-
Assert(RELATION_IS_PAX(old_heap));
260-
Assert(RELATION_IS_PAX(new_heap));
272+
Relation old_rel, Relation new_rel, Relation old_index, bool use_sort,
273+
TransactionId /*oldest_xmin*/, TransactionId * /*xid_cutoff*/,
274+
MultiXactId * /*multi_cutoff*/, double * /*num_tuples*/,
275+
double * /*tups_vacuumed*/, double * /* tups_recently_dead*/) {
276+
Assert(RELATION_IS_PAX(old_rel));
277+
Assert(RELATION_IS_PAX(new_rel));
261278
CBDB_TRY();
262-
{ pax::CCPaxAuxTable::PaxAuxRelationCopyDataForCluster(old_heap, new_heap); }
279+
{
280+
// if false and OldIndex is InvalidOid, no sorting is performed, just copy
281+
if (!use_sort && old_index == NULL) {
282+
pax::CCPaxAuxTable::PaxAuxRelationCopyDataForCluster(old_rel, new_rel);
283+
return;
284+
}
285+
286+
// TODO(gongxun): should we support index's order to cluster table? ao/aocs
287+
// does not support.
288+
289+
// like aocs, pax tables can only be clustered against a btree index
290+
CBDB_CHECK(old_index && (old_index->rd_rel->relam == BTREE_AM_OID),
291+
cbdb::CException::kExTypeInvalidIndexType,
292+
"PAX tables can only be clustered against a btree index");
293+
pax::IndexCluster(old_rel, new_rel, old_index, GetActiveSnapshot());
294+
}
263295
CBDB_CATCH_DEFAULT();
264296
CBDB_FINALLY({});
265297
CBDB_END_TRY();
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#include "access/pax_table_cluster.h"
2+
3+
#include "clustering/clustering.h"
4+
#include "clustering/index_clustering.h"
5+
#include "clustering/pax_clustering_reader.h"
6+
#include "clustering/pax_clustering_writer.h"
7+
#include "storage/micro_partition_iterator.h"
8+
#include "storage/micro_partition_metadata.h"
9+
10+
#define CLUSTER_SORT_MEMORY 128000
11+
12+
namespace pax {
13+
void IndexCluster(Relation old_rel, Relation new_rel, Relation index,
14+
Snapshot snapshot) {
15+
auto iter = MicroPartitionInfoIterator::New(old_rel, snapshot);
16+
17+
auto reader =
18+
PAX_NEW<clustering::PaxClusteringReader>(old_rel, std::move(iter));
19+
20+
auto writer = PAX_NEW<clustering::PaxClusteringWriter>(new_rel);
21+
22+
auto cluster = clustering::DataClustering::CreateDataClustering(
23+
clustering::DataClustering::kClusterTypeIndex);
24+
25+
clustering::IndexClustering::IndexClusteringOptions options;
26+
27+
options.tup_desc = old_rel->rd_att;
28+
options.index_rel = index;
29+
options.work_mem = CLUSTER_SORT_MEMORY;
30+
cluster->Clustering(reader, writer, &options);
31+
32+
writer->Close();
33+
PAX_DELETE(writer);
34+
reader->Close();
35+
PAX_DELETE(reader);
36+
}
37+
38+
} // namespace pax
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#pragma once
2+
3+
#include "comm/cbdb_api.h"
4+
5+
namespace pax {
6+
void IndexCluster(Relation old_rel, Relation new_rel, Relation index,
7+
Snapshot snapshot);
8+
} // namespace pax
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include "clustering/clustering.h"
2+
3+
#include "clustering/index_clustering.h"
4+
#include "comm/pax_memory.h"
5+
6+
namespace pax {
7+
namespace clustering {
8+
9+
DataClustering *DataClustering::CreateDataClustering(
10+
const DataClustering::ClusterType type) {
11+
switch (type) {
12+
case DataClustering::kClusterTypeIndex:
13+
static IndexClustering index_clustering;
14+
return &index_clustering;
15+
default:
16+
return nullptr;
17+
}
18+
}
19+
20+
} // namespace clustering
21+
} // namespace pax
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
3+
#include "clustering/clustering_reader.h"
4+
#include "clustering/clustering_writer.h"
5+
#include "clustering/sorter.h"
6+
7+
namespace pax {
8+
9+
namespace clustering {
10+
11+
class DataClustering {
12+
public:
13+
enum ClusterType {
14+
kClusterTypeIndex,
15+
};
16+
17+
struct DataClusteringOptions {
18+
ClusterType type;
19+
};
20+
21+
public:
22+
DataClustering() = default;
23+
virtual ~DataClustering() = default;
24+
virtual void Clustering(ClusteringDataReader *reader, ClusteringDataWriter *writer,
25+
const DataClusteringOptions *options) = 0;
26+
static DataClustering *CreateDataClustering(const ClusterType type);
27+
};
28+
29+
} // namespace clustering
30+
} // namespace pax
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include "comm/cbdb_api.h"
4+
namespace pax {
5+
6+
namespace clustering {
7+
class ClusteringDataReader {
8+
public:
9+
ClusteringDataReader() = default;
10+
virtual ~ClusteringDataReader() = default;
11+
// TODO(gongxun): support record batch
12+
// return false if no more tuples
13+
virtual bool GetNextTuple(TupleTableSlot *) = 0;
14+
virtual void Close() = 0;
15+
};
16+
17+
} // namespace clustering
18+
19+
} // namespace pax
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include "comm/cbdb_api.h"
4+
namespace pax {
5+
6+
namespace clustering {
7+
8+
class ClusteringDataWriter {
9+
public:
10+
ClusteringDataWriter() = default;
11+
virtual ~ClusteringDataWriter() = default;
12+
/**
13+
* slot->tts_value[tuple_desc->natts] is zorder_value
14+
*/
15+
virtual void WriteTuple(TupleTableSlot *tuple) = 0;
16+
virtual void Close() = 0;
17+
};
18+
19+
} // namespace clustering
20+
21+
} // namespace pax
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
2+
#include "clustering/index_clustering.h"
3+
4+
#include "clustering/sorter_index.h"
5+
#include "comm/cbdb_wrappers.h"
6+
7+
namespace pax {
8+
namespace clustering {
9+
10+
IndexClustering::IndexClustering() {}
11+
IndexClustering::~IndexClustering() {}
12+
void IndexClustering::Clustering(ClusteringDataReader *reader, ClusteringDataWriter *writer,
13+
const DataClusteringOptions *options) {
14+
const IndexClusteringOptions *index_options =
15+
reinterpret_cast<const IndexClusteringOptions *>(options);
16+
17+
CBDB_CHECK(index_options != nullptr && index_options->index_rel != nullptr,
18+
cbdb::CException::kExTypeInvalid, "index options is invalid");
19+
20+
TupleTableSlot *origin_slot;
21+
TupleTableSlot *sorted_slot;
22+
IndexSorter::IndexTupleSorterOptions sorter_options;
23+
24+
sorter_options.tup_desc = index_options->tup_desc;
25+
sorter_options.work_mem = index_options->work_mem;
26+
sorter_options.index_rel = index_options->index_rel;
27+
28+
IndexSorter sorter(sorter_options);
29+
30+
origin_slot =
31+
MakeSingleTupleTableSlot(index_options->tup_desc, &TTSOpsVirtual);
32+
while (reader->GetNextTuple(origin_slot)) {
33+
sorter.AppendSortData(origin_slot);
34+
}
35+
36+
ExecDropSingleTupleTableSlot(origin_slot);
37+
38+
sorter.Sort();
39+
40+
sorted_slot =
41+
MakeSingleTupleTableSlot(sorter_options.tup_desc, &TTSOpsMinimalTuple);
42+
while (sorter.GetSortedData(sorted_slot)) {
43+
slot_getallattrs(sorted_slot);
44+
writer->WriteTuple(sorted_slot);
45+
}
46+
ExecDropSingleTupleTableSlot(sorted_slot);
47+
}
48+
49+
} // namespace clustering
50+
} // namespace pax
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include "clustering/clustering.h"
4+
namespace pax {
5+
namespace clustering {
6+
class IndexClustering final : public DataClustering {
7+
public:
8+
struct IndexClusteringOptions : public DataClusteringOptions {
9+
IndexClusteringOptions() { type = kClusterTypeIndex; }
10+
TupleDesc tup_desc;
11+
Relation index_rel;
12+
int work_mem;
13+
};
14+
15+
public:
16+
IndexClustering();
17+
virtual ~IndexClustering();
18+
void Clustering(ClusteringDataReader *reader, ClusteringDataWriter *writer,
19+
const DataClusteringOptions *options) override;
20+
};
21+
22+
} // namespace clustering
23+
24+
} // namespace pax

0 commit comments

Comments
 (0)