enhance: [2.5] interim index supports different index types and more data types (fp16/bf16) #39180

Open · wants to merge 1 commit into base: 2.5
1 change: 1 addition & 0 deletions client/index/common.go
@@ -50,6 +50,7 @@ const (
AUTOINDEX IndexType = "AUTOINDEX"
DISKANN IndexType = "DISKANN"
SCANN IndexType = "SCANN"
SCANNDVR IndexType = "SCANN_DVR"

// Sparse
SparseInverted IndexType = "SPARSE_INVERTED_INDEX"
5 changes: 4 additions & 1 deletion configs/milvus.yaml
@@ -404,8 +404,11 @@ queryNode:
# Milvus will eventually seal and index all segments, but enabling this optimizes search performance for queries issued immediately after data insertion.
# This defaults to true, indicating that Milvus creates a temporary index for growing segments and for sealed segments that have not yet been indexed when a search arrives.
enableIndex: true
nlist: 128 # temp index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
nlist: 128 # interim index nlist, recommended to set to sqrt(chunkRows); must be smaller than chunkRows/8
nprobe: 16 # nprobe for searching the small index, based on your accuracy requirement; must be smaller than nlist
subDim: 4 # interim index sub-dimension; the vector dim should be divisible by subDim (vector dim % subDim == 0)
refineRatio: 3.5 # interim index parameter; must be set to >= 1.0
withRawData: true # Whether to keep raw data inside the interim index
Contributor:
This should be an index feature, not a configuration item.

Contributor Author:
Replace withRawData with denseVectorIndexType.

memExpansionRate: 1.15 # extra memory needed when building the interim index
buildParallelRate: 0.5 # ratio of CPU cores used to build the interim index in parallel
multipleChunkedEnable: true # Enable multiple chunked search
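For reference, the relationship between these knobs (nlist ≈ sqrt(chunkRows) but below chunkRows/8, subDim dividing the vector dim, refineRatio >= 1.0) is sketched below as standalone C++. The function and the string keys are illustrative placeholders, not the PR's actual configuration plumbing or knowhere's real parameter names.

```cpp
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <map>
#include <string>

// Minimal sketch, assuming the interim-index knobs end up in a string-keyed build
// config (as generate_interim_index does for DIM later in this PR). The key names
// are placeholders for illustration only.
std::map<std::string, std::string>
MakeInterimBuildConfig(int64_t dim, int64_t chunk_rows, int64_t sub_dim,
                       float refine_ratio) {
    // recommended nlist: sqrt(chunkRows), kept strictly below chunkRows / 8
    auto nlist = static_cast<int64_t>(std::sqrt(static_cast<double>(chunk_rows)));
    nlist = std::max<int64_t>(std::min(nlist, chunk_rows / 8 - 1), 1);

    std::map<std::string, std::string> cfg;
    cfg["dim"] = std::to_string(dim);
    cfg["nlist"] = std::to_string(nlist);
    cfg["sub_dim"] = std::to_string(sub_dim);            // dim % sub_dim should be 0
    cfg["refine_ratio"] = std::to_string(refine_ratio);  // must be >= 1.0
    return cfg;
}
```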
24 changes: 24 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
@@ -83,6 +83,30 @@ VectorMemIndex<T>::VectorMemIndex(
}
}

template <typename T>
VectorMemIndex<T>::VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data)
: VectorIndex(index_type, metric_type) {
CheckMetricTypeSupport<T>(metric_type);
AssertInfo(!is_unsupported(index_type, metric_type),
index_type + " doesn't support metric: " + metric_type);

auto view_data_pack = knowhere::Pack(view_data);
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
GetIndexType(), version, view_data_pack);
if (get_index_obj.has_value()) {
index_ = get_index_obj.value();
} else {
auto err = get_index_obj.error();
if (err == knowhere::Status::invalid_index_error) {
PanicInfo(ErrorCode::Unsupported, get_index_obj.what());
}
PanicInfo(ErrorCode::KnowhereError, get_index_obj.what());
}
}

template <typename T>
knowhere::expected<std::vector<knowhere::IndexNode::IteratorPtr>>
VectorMemIndex<T>::VectorIterators(const milvus::DatasetPtr dataset,
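The new constructor wires a knowhere "view data" callback into the index instead of a file manager: the index asks for a row id and gets back a pointer into memory the segment already owns. Below is a standalone sketch of that callback shape; the ViewDataOp alias is a simplified stand-in (the real knowhere::ViewDataOp signature may differ), and in the PR the lambda wraps the column's ValueAt(id).

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Simplified stand-in for knowhere::ViewDataOp: maps a row offset to a pointer
// into data the caller keeps alive for the lifetime of the index.
using ViewDataOp = std::function<const void*(size_t)>;

int main() {
    const size_t dim = 4;
    // Two rows of a dense float column, stored row-major.
    std::vector<float> rows = {0.f, 1.f, 2.f, 3.f,
                               4.f, 5.f, 6.f, 7.f};

    ViewDataOp view_data = [&rows, dim](size_t id) {
        return static_cast<const void*>(rows.data() + id * dim);
    };

    // The index can read vectors on demand without copying them in.
    const auto* row1 = static_cast<const float*>(view_data(1));
    return row1[0] == 4.f ? 0 : 1;
}
```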
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
@@ -39,6 +39,12 @@ class VectorMemIndex : public VectorIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

// Special constructor for the knowhere data-view index used by the interim index; it does not need to hold file_manager_ to upload or download files.
VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data);

BinarySet
Serialize(const Config& config) override;

97 changes: 71 additions & 26 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
@@ -40,6 +40,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "google/protobuf/message_lite.h"
#include "index/VectorIndex.h"
#include "index/VectorMemIndex.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Utils.h"
@@ -115,7 +116,8 @@ ChunkedSegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else if (get_bit(binlog_index_bitset_, field_id)) {
}
if (get_bit(binlog_index_bitset_, field_id)) {
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
@@ -136,8 +138,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id,
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");

if (!get_bit(index_ready_bitset_, field_id) &&
!get_bit(binlog_index_bitset_, field_id)) {
if (!get_bit(index_ready_bitset_, field_id)) {
return;
}

@@ -489,21 +490,13 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
insert_record_.seal_pks();
}

bool use_temp_index = false;
{
// update num_rows to build temperate binlog index
// update num_rows to build the temporary interim index
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}

if (generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
use_temp_index = true;
}

if (!use_temp_index) {
if (!generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
}
@@ -1744,6 +1737,10 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return fill_with_empty(field_id, count);
}

if (HasFieldData(field_id)) {
Assert(get_bit(field_data_ready_bitset_, field_id));
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
if (HasIndex(field_id)) {
// if the field has a loaded scalar index, retrieve raw data from the index
if (!IsVectorDataType(field_meta.get_data_type())) {
@@ -1757,11 +1754,9 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
} else {
return fill_with_empty(field_id, count);
}

Assert(get_bit(field_data_ready_bitset_, field_id));

return get_raw_data(field_id, field_meta, seg_offsets, count);
}

std::unique_ptr<DataArray>
@@ -1818,15 +1813,22 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (IsVectorDataType(field_meta.get_data_type())) {
if (get_bit(index_ready_bitset_, fieldID) |
get_bit(binlog_index_bitset_, fieldID)) {
if (get_bit(index_ready_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index = dynamic_cast<index::VectorIndex*>(
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else if (get_bit(binlog_index_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index =
dynamic_cast<index::VectorIndex*>(field_indexing->indexing_.get());
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
@@ -2017,6 +2019,8 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
}
// check data type
if (field_meta.get_data_type() != DataType::VECTOR_FLOAT &&
field_meta.get_data_type() != DataType::VECTOR_FLOAT16 &&
field_meta.get_data_type() != DataType::VECTOR_BFLOAT16 &&
!is_sparse) {
return false;
}
@@ -2062,16 +2066,50 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
is_sparse
? dynamic_cast<ChunkedSparseFloatColumn*>(vec_data.get())->Dim()
: field_meta.get_dim();
auto index_metric = field_binlog_config->GetMetricType();
std::unique_ptr<index::VectorIndex> vec_index = nullptr;
if (!is_sparse) {
knowhere::ViewDataOp view_data = [field_raw_data_ptr =
vec_data](size_t id) {
return field_raw_data_ptr->ValueAt(id);
};
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
vec_index =
std::make_unique<index::VectorMemIndex<knowhere::fp16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() ==
DataType::VECTOR_BFLOAT16) {
vec_index =
std::make_unique<index::VectorMemIndex<knowhere::bf16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
}
} else {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");
return false;
}

auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
auto index_metric = field_binlog_config->GetMetricType();

auto vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
auto num_chunk = vec_data->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto dataset = knowhere::GenDataSet(
@@ -2088,19 +2126,26 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {

if (enable_binlog_index()) {
std::unique_lock lck(mutex_);
if (vec_index->HasRawData()) {
// some knowhere view-data indexes do not hold raw data; keep the field data loaded in that case
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else {
set_bit(field_data_ready_bitset_, field_id, true);
}
vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index));

vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true);
LOG_INFO(
"replace binlog with binlog index in segment {}, field {}.",
"replace binlog with intermin index in segment {}, field {}.",
this->get_segment_id(),
field_id.get());
}
return true;
} catch (std::exception& e) {
LOG_WARN("fail to generate binlog index, because {}", e.what());
LOG_WARN("fail to generate intermin index, because {}", e.what());
return false;
}
}
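The retention rule introduced in generate_interim_index can be summarized as follows: after the interim index is built, the segment drops its own copy of the field data only when the index can serve raw vectors; otherwise the binlog data stays loaded so HasRawData() and bulk_subscript keep working. A standalone sketch of that decision, with the segment bitsets simplified to two booleans (an illustration, not the segcore API):

```cpp
#include <iostream>

// Simplified view of the two bits toggled after an interim-index build:
// field_data_ready (raw binlog data kept loaded) and binlog_index (interim index usable).
struct SegmentBits {
    bool field_data_ready;
    bool binlog_index;
};

SegmentBits AfterInterimIndexBuild(bool index_has_raw_data) {
    // Mirrors the logic above: keep the raw field data only when the index
    // cannot return raw vectors itself.
    return SegmentBits{/*field_data_ready=*/!index_has_raw_data,
                       /*binlog_index=*/true};
}

int main() {
    auto with_raw = AfterInterimIndexBuild(true);
    auto without_raw = AfterInterimIndexBuild(false);
    std::cout << with_raw.field_data_ready << " "
              << without_raw.field_data_ready << "\n";  // prints "0 1"
}
```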
13 changes: 13 additions & 0 deletions internal/core/src/segcore/ConcurrentVector.h
@@ -541,4 +541,17 @@ class ConcurrentVector<BFloat16Vector>
}
};

// Returns true when the concurrent column actually stores dense vectors of the
// given element type (float / float16 / bfloat16); the dynamic_cast yields
// nullptr for any other concrete column type.
static bool
ConcurrentDenseVectorCheck(const VectorBase* vec_base, DataType data_type) {
if (data_type == DataType::VECTOR_FLOAT) {
return dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base);
} else if (data_type == DataType::VECTOR_FLOAT16) {
return dynamic_cast<const ConcurrentVector<Float16Vector>*>(vec_base);
} else if (data_type == DataType::VECTOR_BFLOAT16) {
return dynamic_cast<const ConcurrentVector<BFloat16Vector>*>(vec_base);
} else {
return false;
}
}

} // namespace milvus::segcore
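ConcurrentDenseVectorCheck relies on dynamic_cast returning nullptr when the concrete column type does not match the requested element type. A standalone sketch of that pattern is below; the class names are placeholders, not the segcore types.

```cpp
#include <iostream>

struct VectorBase { virtual ~VectorBase() = default; };
struct FloatColumn : VectorBase {};
struct Float16Column : VectorBase {};

// dynamic_cast yields nullptr for a mismatched concrete type, which converts to false.
static bool IsFloatColumn(const VectorBase* base) {
    return dynamic_cast<const FloatColumn*>(base) != nullptr;
}

int main() {
    FloatColumn f;
    Float16Column h;
    std::cout << IsFloatColumn(&f) << " " << IsFloatColumn(&h) << "\n";  // prints "1 0"
}
```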