Skip to content

Commit

Permalink
fix: fix directory confusion between indexnode and querynode
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Jan 9, 2025
1 parent 84f8047 commit d21347e
Show file tree
Hide file tree
Showing 34 changed files with 207 additions and 98 deletions.
26 changes: 26 additions & 0 deletions internal/core/src/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;

enum class Role { QueryNode, IndexNode };

const std::unordered_map<Role, std::string> RoleToStringMap = {
{Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}};

// convert role to string
inline std::string
ToString(Role role) {
auto it = RoleToStringMap.find(role);
if (it != RoleToStringMap.end()) {
return it->second;
}
PanicInfo(UnexpectedError, "role {} not found", int(role));

Check warning on line 45 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L45

Added line #L45 was not covered by tests
}

// convert string to role
inline Role
FromString(const std::string& role_str) {
for (const auto& pair : RoleToStringMap) {
if (pair.second == role_str) {
return pair.first;

Check warning on line 53 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L50-L53

Added lines #L50 - L53 were not covered by tests
}
}
PanicInfo(UnexpectedError, "role {} not found", role_str);

Check warning on line 56 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L56

Added line #L56 was not covered by tests
}

void
SetIndexSliceSize(const int64_t size);

Expand Down
9 changes: 9 additions & 0 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,21 @@ class IndexBase {
return index_type_;
}

bool
IsLoadingIndex() const {
return is_loading_index_;
}

protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {
}

IndexType index_type_ = "";

// current index obj is used to build and load index in one class,
// this flag is used to indicate whether building index or loading index
bool is_loading_index_{false};
};

using IndexBasePtr = std::unique_ptr<IndexBase>;
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/index/InvertedIndexTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ InvertedIndexTantivy<T>::InvertedIndexTantivy(
const storage::FileManagerContext& ctx)
: ScalarIndex<T>(INVERTED_INDEX_TYPE),
schema_(ctx.fieldDataMeta.field_schema) {
this->is_loading_index_ = ctx.for_loading_index;
mem_file_manager_ = std::make_shared<MemFileManager>(ctx);
disk_file_manager_ = std::make_shared<DiskFileManager>(ctx);
// push init wrapper to load process
if (ctx.for_loading_index) {
if (this->is_loading_index_) {
return;
}
InitForBuildIndex();
Expand All @@ -107,7 +108,7 @@ InvertedIndexTantivy<T>::~InvertedIndexTantivy() {
wrapper_->free();
}
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto prefix = path_;
LOG_INFO("inverted index remove path:{}", path_);
local_chunk_manager->RemoveDir(prefix);
Expand Down
17 changes: 11 additions & 6 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
file_manager_ =
std::make_shared<storage::DiskFileManagerImpl>(file_manager_context);
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();

is_loading_index_ = file_manager_context.for_loading_index;
// As we have guarded dup-load in QueryNode,
// this assertion failed only if the Milvus rebooted in the same pod,
// need to remove these files then re-load the segment
auto local_chunk_manager =
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();

if (local_chunk_manager->Exist(local_index_path_prefix)) {
local_chunk_manager->RemoveDir(local_index_path_prefix);
}
CheckCompatible(version);
local_chunk_manager->CreateDir(local_index_path_prefix);

auto diskann_index_pack =
knowhere::Pack(std::shared_ptr<knowhere::FileManager>(file_manager_));
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
Expand Down Expand Up @@ -135,7 +138,8 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Build(const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(

Check warning on line 141 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L141

Added line #L141 was not covered by tests
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);

Expand Down Expand Up @@ -185,7 +189,8 @@ void
VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);
// set data path
Expand Down Expand Up @@ -353,7 +358,7 @@ template <typename T>
void
VectorDiskAnnIndex<T>::CleanLocalData() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 361 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L361

Added line #L361 was not covered by tests
local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix());
local_chunk_manager->RemoveDir(
file_manager_->GetLocalRawDataObjectPrefix());
Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "common/Common.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
Expand Down Expand Up @@ -156,7 +157,7 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
config["index_files"] = load_index_info->index_files;

milvus::storage::FileManagerContext fileManagerContext(
field_meta, index_meta, remote_chunk_manager);
field_meta, index_meta, remote_chunk_manager, true);
fileManagerContext.set_for_loading_index(true);

load_index_info->index =
Expand Down Expand Up @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager(milvus::Role::QueryNode);

Check warning on line 429 in internal/core/src/segcore/load_index_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/load_index_c.cpp#L428-L429

Added lines #L428 - L429 were not covered by tests
auto index_file_path_prefix =
milvus::storage::GenIndexPathPrefix(local_chunk_manager,
load_index_info->index_build_id,
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ LoadTextIndex(CSegmentInterface c_segment,

milvus::storage::FileManagerContext ctx(
field_meta, index_meta, remote_chunk_manager);
ctx.set_for_loading_index(true);

Check warning on line 469 in internal/core/src/segcore/segment_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/segment_c.cpp#L469

Added line #L469 was not covered by tests

auto index = std::make_unique<milvus::index::TextMatchIndex>(ctx);
index->Load(config);
Expand Down
40 changes: 21 additions & 19 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,24 @@ namespace milvus::storage {
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
fileManagerContext.indexMeta) {
fileManagerContext.indexMeta,
fileManagerContext.for_loading_index) {
rcm_ = fileManagerContext.chunkManagerPtr;
}

DiskFileManagerImpl::~DiskFileManagerImpl() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();
local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID(
local_chunk_manager, index_meta_.build_id));
}

LocalChunkManagerSPtr
DiskFileManagerImpl::GetLocalChunkManager() const {
auto role =
for_loading_index_ ? milvus::Role::QueryNode : milvus::Role::IndexNode;
return LocalChunkManagerFactory::GetInstance().GetChunkManager(role);
}

bool
DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
return true;
Expand All @@ -82,7 +89,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -134,7 +141,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
bool
DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 144 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L144

Added line #L144 was not covered by tests
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -190,7 +197,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<std::string>& remote_files,
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
Expand Down Expand Up @@ -239,7 +246,7 @@ void
DiskFileManagerImpl::CacheIndexToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -307,7 +314,7 @@ void
DiskFileManagerImpl::CacheTextLogToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 317 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L317

Added line #L317 was not covered by tests

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -375,8 +382,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto segment_id = GetFieldDataMeta().segment_id;
auto field_id = GetFieldDataMeta().field_id;

auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();

Check warning on line 385 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L385

Added line #L385 was not covered by tests
std::string local_data_path;
bool file_created = false;

Expand Down Expand Up @@ -618,8 +624,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {

auto segment_id = GetFieldDataMeta().segment_id;
auto vec_field_id = GetFieldDataMeta().field_id;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();
auto local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
Expand Down Expand Up @@ -695,16 +700,14 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {

std::string
DiskFileManagerImpl::GetLocalIndexObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();
return GenIndexPathPrefix(
local_chunk_manager, index_meta_.build_id, index_meta_.index_version);
}

std::string
DiskFileManagerImpl::GetLocalTextIndexPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();

Check warning on line 710 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L710

Added line #L710 was not covered by tests
return GenTextIndexPathPrefix(local_chunk_manager,
index_meta_.build_id,
index_meta_.index_version,
Expand All @@ -728,8 +731,7 @@ DiskFileManagerImpl::GetTextIndexIdentifier() {

std::string
DiskFileManagerImpl::GetLocalRawDataObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto local_chunk_manager = GetLocalChunkManager();

Check warning on line 734 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L734

Added line #L734 was not covered by tests
return GenFieldRawDataPathPrefix(
local_chunk_manager, field_meta_.segment_id, field_meta_.field_id);
}
Expand All @@ -744,7 +746,7 @@ std::optional<bool>
DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
bool isExist = false;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
try {
isExist = local_chunk_manager->Exist(file);
} catch (std::exception& e) {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/storage/DiskFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/IndexData.h"
#include "storage/FileManager.h"
#include "storage/ChunkManager.h"
#include "storage/LocalChunkManager.h"
#include "common/Consts.h"

namespace milvus::storage {
Expand Down Expand Up @@ -125,6 +126,9 @@ class DiskFileManagerImpl : public FileManagerImpl {
std::string
GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const;

LocalChunkManagerSPtr
GetLocalChunkManager() const;

private:
// local file path (abs path)
std::vector<std::string> local_paths_;
Expand Down
20 changes: 18 additions & 2 deletions internal/core/src/storage/FileManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ struct FileManagerContext {
chunkManagerPtr(chunkManagerPtr) {
}

explicit FileManagerContext(const FieldDataMeta& fieldDataMeta,
const IndexMeta& indexMeta,
const ChunkManagerPtr& chunkManagerPtr,
bool for_loading)
: fieldDataMeta(fieldDataMeta),
indexMeta(indexMeta),
chunkManagerPtr(chunkManagerPtr),
for_loading_index(for_loading) {
}

bool
Valid() const {
return chunkManagerPtr != nullptr;
Expand Down Expand Up @@ -74,8 +84,11 @@ struct FileManagerContext {
class FileManagerImpl : public knowhere::FileManager {
public:
explicit FileManagerImpl(const FieldDataMeta& field_mata,
IndexMeta index_meta)
: field_meta_(field_mata), index_meta_(std::move(index_meta)) {
IndexMeta index_meta,
bool for_loading_index)
: field_meta_(field_mata),
index_meta_(std::move(index_meta)),
for_loading_index_(for_loading_index) {
}

public:
Expand Down Expand Up @@ -176,6 +189,9 @@ class FileManagerImpl : public knowhere::FileManager {
// index meta
IndexMeta index_meta_;
ChunkManagerPtr rcm_;

// indicate whether file manager is used for building index or load index
bool for_loading_index_{false};
};

using FileManagerImplPtr = std::shared_ptr<FileManagerImpl>;
Expand Down
Loading

0 comments on commit d21347e

Please sign in to comment.