Skip to content

Commit 5c18b3c

Browse files
obdevmiyuan-ljr
authored andcommitted
[CP] [fix] hybrid vector index refresh task return 4377
Co-authored-by: 884244693 <884244693@qq.com>
1 parent aed0d54 commit 5c18b3c

2 files changed

Lines changed: 88 additions & 20 deletions

File tree

src/share/vector_index/ob_hybrid_vector_refresh_task.cpp

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,7 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
919919
transaction::ObTxDesc *tx_desc = nullptr;
920920
oceanbase::transaction::ObTxReadSnapshot snapshot;
921921
storage::ObStoreCtxGuard store_ctx_guard;
922+
ObAccessService *oas = MTL(ObAccessService *);
922923
ObAccessService *tsc_service = MTL(ObAccessService *);
923924
oceanbase::transaction::ObTransService *txs = MTL(transaction::ObTransService *);
924925
uint64_t timeout_us = ObTimeUtility::current_time() + ObInsertLobColumnHelper::LOB_TX_TIMEOUT;
@@ -954,14 +955,13 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
954955
ret = OB_ALLOCATE_MEMORY_FAILED;
955956
LOG_WARN("failed to alloc mem.", K(ret), K(dim));
956957
} else {
957-
HEAP_VARS_4((blocksstable::ObDatumRow, datum_row, tenant_id_), (blocksstable::ObDatumRow, new_row, tenant_id_), (storage::ObTableScanParam, vid_rowkey_scan_param), (schema::ObTableParam, vid_rowkey_table_param, allocator_)) {
958+
HEAP_VARS_3((blocksstable::ObDatumRow, new_row, tenant_id_), (storage::ObTableScanParam, vid_rowkey_scan_param), (schema::ObTableParam, vid_rowkey_table_param, allocator_)) {
958959
ObArenaAllocator scan_allocator("VecEmbedding", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
959960
common::ObNewRowIterator *vid_rowkey_iter = nullptr;
960961
ObTableScanIterator *table_scan_iter = nullptr;
962+
blocksstable::ObDatumRow *datum_row = nullptr;
961963
int64_t loop_cnt = 0;
962-
if (OB_FAIL(datum_row.init(task_ctx->embedded_table_column_ids_.count()))) {
963-
LOG_WARN("fail to init datum row", K(ret), K(task_ctx->embedded_table_column_ids_), K(datum_row));
964-
} else if (OB_FAIL(new_row.init(task_ctx->embedded_table_column_ids_.count()))) {
964+
if (OB_FAIL(new_row.init(task_ctx->embedded_table_column_ids_.count()))) {
965965
LOG_WARN("fail to init datum row", K(ret), K(task_ctx->embedded_table_column_ids_), K(new_row));
966966
} else if (adaptor.get_is_need_vid() && OB_FAIL(ObPluginVectorIndexUtils::read_local_tablet(ls_id_,
967967
&adaptor,
@@ -975,20 +975,32 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
975975
LOG_WARN("failed to read vid rowkey tablet.", K(ret));
976976
}
977977
// col order of 6th table is rowkey vid vector or pk(vid) part_key vector
978+
const int64_t embedded_rowkey_count = task_ctx->embedded_table_column_ids_.count() - task_ctx->part_key_num_ - 1;
979+
void *buf = nullptr;
980+
ObObj *obj_ptr = nullptr;
981+
if (OB_FAIL(ret)) {
982+
} else if (embedded_rowkey_count <= 0) {
983+
ret = OB_ERR_UNEXPECTED;
984+
LOG_WARN("get embedded_rowkey_count invalid.", K(ret), K(embedded_rowkey_count));
985+
} else {
986+
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObObj) * (embedded_rowkey_count)))) {
987+
ret = OB_ALLOCATE_MEMORY_FAILED;
988+
LOG_WARN("failed to alloc mem.", K(ret), K(embedded_rowkey_count));
989+
} else {
990+
obj_ptr = new (buf) ObObj[embedded_rowkey_count];
991+
}
992+
}
978993
for (int64_t row_id = 0; row_id < task_ctx->embedding_vids_.count() && OB_SUCC(ret); row_id++) {
979-
datum_row.storage_datums_[task_ctx->embedded_table_column_ids_.count() - 2 - task_ctx->part_key_num_].set_int(task_ctx->embedding_vids_.at(row_id));
980994
for (int64_t i = 0; i < dim; i++) {
981995
vector_buf[i] = output_vector.at(row_id)[i];
982996
}
983-
datum_row.storage_datums_[task_ctx->embedded_table_column_ids_.count() - 1].set_null();
984-
for (int i = task_ctx->embedded_table_column_ids_.count() - 2; i > task_ctx->embedded_table_column_ids_.count() - 2 - task_ctx->part_key_num_; i--) {
985-
datum_row.storage_datums_[i].set_null();
986-
}
997+
obj_ptr[embedded_rowkey_count - 1].set_uint64(task_ctx->embedding_vids_.at(row_id));
987998
if (adaptor.get_is_need_vid()) {
988999
ObObj vid_obj;
9891000
vid_obj.set_int(task_ctx->embedding_vids_.at(row_id));
9901001
ObRowkey rowkey(&vid_obj, 1);
9911002
blocksstable::ObDatumRow *vid_rowkey_datum = nullptr;
1003+
obj_ptr[embedded_rowkey_count - 1].set_int(task_ctx->embedding_vids_.at(row_id));
9921004
if (OB_FAIL(ObPluginVectorIndexUtils::add_key_ranges(adaptor.get_vid_rowkey_table_id(), rowkey, vid_rowkey_scan_param))) {
9931005
LOG_WARN("failed to set vid id key", K(ret));
9941006
} else if (OB_FAIL(ObPluginVectorIndexUtils::iter_table_rescan(vid_rowkey_scan_param, vid_rowkey_iter))) {
@@ -1010,20 +1022,69 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
10101022
ret = OB_ERR_UNEXPECTED;
10111023
LOG_WARN("column count mismatch", K(ret), K(vid_rowkey_datum), K(task_ctx->embedded_table_column_ids_), K(row_id));
10121024
} else {
1013-
1025+
const ObIArray<share::schema::ObColumnParam *> *out_col_param = vid_rowkey_scan_param.table_param_->get_read_info().get_columns();
10141026
for (int64_t i = 0; OB_SUCC(ret) && i < task_ctx->embedded_table_column_ids_.count() - 2; i++) {
1015-
datum_row.storage_datums_[i].shallow_copy_from_datum(vid_rowkey_datum->storage_datums_[i + 1]); // deep copy in add_row() later.
1027+
ObObj tmp_obj;
1028+
ObObjMeta meta_type = out_col_param->at(i + 1)->get_meta_type();
1029+
if (OB_FAIL(vid_rowkey_datum->storage_datums_[i + 1].to_obj(tmp_obj, meta_type))) {
1030+
LOG_WARN("failed to convert datum to obj.", K(ret), K(i), K(vid_rowkey_datum->storage_datums_[i + 1]));
1031+
} else if (OB_FAIL(ob_write_obj(allocator_, tmp_obj, obj_ptr[i]))) {
1032+
LOG_WARN("failed to write obj.", K(ret), K(i), K(tmp_obj));
1033+
}
10161034
}
10171035
}
10181036
}
1037+
ObSEArray<uint64_t, 4> dml_column_ids;
10191038
if (OB_SUCC(ret)) {
1020-
if (OB_FAIL(new_row.deep_copy(datum_row, allocator_))) {
1021-
LOG_WARN("failed to copy row", K(ret), K(datum_row));
1022-
} else if (FALSE_IT(new_row.storage_datums_[task_ctx->embedded_table_column_ids_.count() - 1].set_string(reinterpret_cast<char *>(vector_buf), dim * sizeof(float)))) {
1023-
} else if (OB_FAIL(embedded_iter.add_row(datum_row, new_row))) {
1024-
LOG_WARN("failed to add row to index id iter", K(ret));
1039+
HEAP_VARS_2((storage::ObTableScanParam, embedded_scan_param), (schema::ObTableParam, embedded_table_param, allocator_)) {
1040+
common::ObNewRowIterator *embedded_scan_iter = nullptr;
1041+
ObTableScanIterator *embedded_table_scan_iter = nullptr;
1042+
ObArenaAllocator embedde_scan_allocator("VecEmbedding", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
1043+
ObRowkey rowkey(obj_ptr, embedded_rowkey_count);
1044+
if (OB_FAIL(ObPluginVectorIndexUtils::read_local_tablet(ls_id_,
1045+
&adaptor,
1046+
snapshot.version(),
1047+
INDEX_TYPE_HYBRID_INDEX_EMBEDDED_LOCAL,
1048+
allocator_,
1049+
embedde_scan_allocator,
1050+
embedded_scan_param,
1051+
embedded_table_param,
1052+
embedded_scan_iter,
1053+
&dml_column_ids,
1054+
true))) {
1055+
LOG_WARN("failed to read embeded vector table", K(ret));
1056+
} else if (OB_FAIL(ObPluginVectorIndexUtils::add_key_ranges(adaptor.get_embedded_table_id(), rowkey, embedded_scan_param))) {
1057+
LOG_WARN("failed to set embedded row key", K(ret));
1058+
} else if (OB_FAIL(ObPluginVectorIndexUtils::iter_table_rescan(embedded_scan_param, embedded_scan_iter))) {
1059+
LOG_WARN("failed to rescan embedded scan param.", K(ret));
1060+
} else if (OB_ISNULL(embedded_table_scan_iter = dynamic_cast<storage::ObTableScanIterator *>(embedded_scan_iter))) {
1061+
ret = OB_ERR_UNEXPECTED;
1062+
LOG_WARN("failed to get embedded scan iter.", K(ret));
1063+
} else if (OB_FAIL(embedded_table_scan_iter->get_next_row(datum_row))) {
1064+
if (OB_ITER_END != ret) {
1065+
LOG_WARN("failed to get next row from next table.", K(ret));
1066+
} else {
1067+
ret = OB_SUCCESS;
1068+
}
1069+
} else if (OB_ISNULL(datum_row) || !datum_row->is_valid()) {
1070+
ret = OB_ERR_UNEXPECTED;
1071+
LOG_WARN("get row invalid.", K(ret));
1072+
} else if (OB_FAIL(new_row.deep_copy(*datum_row, allocator_))) {
1073+
LOG_WARN("failed to copy row", K(ret), KP(datum_row));
1074+
} else if (FALSE_IT(new_row.storage_datums_[task_ctx->embedded_table_column_ids_.count() - 1].set_string(reinterpret_cast<char *>(vector_buf), dim * sizeof(float)))) {
1075+
} else if (OB_FAIL(embedded_iter.add_row(*datum_row, new_row))) {
1076+
LOG_WARN("failed to add row to index id iter", K(ret));
1077+
}
1078+
new_row.reuse();
1079+
int tmp_ret = OB_SUCCESS;
1080+
if (OB_NOT_NULL(oas) && OB_NOT_NULL(embedded_scan_iter)) {
1081+
tmp_ret = oas->revert_scan_iter(embedded_scan_iter);
1082+
if (tmp_ret != OB_SUCCESS) {
1083+
LOG_WARN("revert scan iter failed", K(tmp_ret));
1084+
}
1085+
embedded_scan_iter = nullptr;
1086+
}
10251087
}
1026-
datum_row.reuse();
10271088
}
10281089

10291090
CHECK_TASK_CANCELLED_IN_PROCESS(ret, loop_cnt, ctx_);
@@ -1044,7 +1105,6 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
10441105
int64_t affected_rows = 0;
10451106
ObDMLBaseParam dml_param;
10461107
share::schema::ObTableDMLParam table_dml_param(allocator_);
1047-
ObAccessService *oas = MTL(ObAccessService *);
10481108
if (OB_FAIL(ret)) {
10491109
} else if (OB_ISNULL(oas)) {
10501110
ret = OB_ERR_UNEXPECTED;

src/share/vector_index/ob_plugin_vector_index_utils.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,15 +288,15 @@ int ObPluginVectorIndexUtils::read_object_from_embedded_table_iter(ObObj *&input
288288
output_vec_obj.reset();
289289
ret = OB_SUCCESS;
290290
}
291-
} else if (datum_row->get_column_count() != EMBEDDED_TABLE_BASE_COLUMN_CNT + data_table_rowkey_count) {
291+
} else if (datum_row->get_column_count() < EMBEDDED_TABLE_BASE_COLUMN_CNT + data_table_rowkey_count) {
292292
ret = OB_ERR_UNEXPECTED;
293293
LOG_WARN("get row column cnt invalid.", K(ret), K(extra_column_count), K(datum_row->get_column_count()));
294294
} else {
295295
if (extra_column_count > 0 && OB_FAIL(get_extra_info_objs(scan_param, allocator, extra_column_count, datum_row, output_extra_info_objs, 0))) {
296296
LOG_WARN("failed to get extra info.", K(ret), K(extra_column_count), K(datum_row->storage_datums_[extra_column_count]));
297297
} else {
298298
char *copy_str = nullptr;
299-
ObString vector = datum_row->storage_datums_[data_table_rowkey_count + 1].get_string();
299+
ObString vector = datum_row->storage_datums_[datum_row->get_column_count() - 1].get_string();
300300
int64_t size = vector.length();
301301
if (size == 0) {
302302
output_vec_obj.reset();
@@ -1614,6 +1614,14 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
16141614
if (OB_FAIL(ret)) {
16151615
} else if (OB_FAIL(column_ids.push_back(vid_column_id))) {
16161616
LOG_WARN("failed to push column id.", K(ret), K(vid_column_id));
1617+
} else if (!adapter->get_is_need_vid()) {
1618+
for (int64_t i = 0; OB_SUCC(ret) && i < part_column_ids.count(); ++i) {
1619+
if (OB_FAIL(column_ids.push_back(part_column_ids[i]))) {
1620+
LOG_WARN("failed to push column id.", K(ret), K(i), K(part_column_ids[i]), K(vid_column_id));
1621+
}
1622+
}
1623+
}
1624+
if (OB_FAIL(ret)) {
16171625
} else if (OB_FAIL(column_ids.push_back(vector_column_id))) {
16181626
LOG_WARN("failed to push column id.", K(ret), K(vector_column_id));
16191627
}

0 commit comments

Comments
 (0)