Skip to content

Commit

Permalink
Merge pull request ClickHouse#61936 from hanfei1991/hanfei/refreshabl…
Browse files Browse the repository at this point in the history
…e_view_race

fix a race in refreshable view
  • Loading branch information
hanfei1991 authored Mar 27, 2024
2 parents 795026a + f3616f6 commit b824e94
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/Storages/MaterializedView/RefreshSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ void RefreshSet::Handle::reset()

RefreshSet::RefreshSet() = default;

RefreshSet::Handle RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
void RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
{
std::lock_guard guard(mutex);
auto [it, is_inserted] = tasks.emplace(id, task);
if (!is_inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
addDependenciesLocked(id, dependencies);

return Handle(this, id, dependencies);
task->setRefreshSetHandleUnlock(Handle(this, id, dependencies));
}

void RefreshSet::addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MaterializedView/RefreshSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RefreshSet

RefreshSet();

Handle emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
void emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);

RefreshTaskHolder getTask(const StorageID & id) const;

Expand Down
7 changes: 6 additions & 1 deletion src/Storages/MaterializedView/RefreshTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ RefreshTaskHolder RefreshTask::create(
for (auto && dependency : strategy.dependencies->children)
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());

task->set_handle = context->getRefreshSet().emplace(view.getStorageID(), deps, task);
context->getRefreshSet().emplace(view.getStorageID(), deps, task);

return task;
}
Expand Down Expand Up @@ -509,4 +509,9 @@ std::chrono::system_clock::time_point RefreshTask::currentTime() const
return std::chrono::system_clock::time_point(std::chrono::seconds(fake));
}

void RefreshTask::setRefreshSetHandleUnlock(RefreshSet::Handle && set_handle_)
{
set_handle = std::move(set_handle_);
}

}
3 changes: 3 additions & 0 deletions src/Storages/MaterializedView/RefreshTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
/// For tests
void setFakeTime(std::optional<Int64> t);

/// RefreshSet will set handle for refresh tasks, to avoid race condition.
void setRefreshSetHandleUnlock(RefreshSet::Handle && set_handle_);

private:
LoggerPtr log = nullptr;
std::weak_ptr<IStorage> view_to_refresh;
Expand Down

0 comments on commit b824e94

Please sign in to comment.