Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Qup42 committed Jan 17, 2025
1 parent 6861b5a commit 5b9be96
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 49 deletions.
51 changes: 25 additions & 26 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,11 +883,15 @@ Awaitable<void> Server::processQuery(
co_return;
}

json Server::createResponseMetadata(
json Server::createResponseMetadataForUpdate(
const ad_utility::Timer& requestTimer, const Index& index,
const DeltaTriples& deltaTriples, const PlannedQuery& plannedQuery,
const QueryExecutionTree& qet, const DeltaTriplesCount& countBefore,
const UpdateMetadata& updateMetadata, const DeltaTriplesCount& countAfter) {
auto formatTime = [](std::chrono::milliseconds time) {
return absl::StrCat(time.count(), "ms");
};

json response;
response["update"] = plannedQuery.parsedQuery_._originalString;
response["status"] = "OK";
Expand All @@ -907,20 +911,19 @@ json Server::createResponseMetadata(
response["delta-triples"]["difference"] =
nlohmann::json(countAfter - countBefore);
response["time"]["planing"] =
absl::StrCat(runtimeInfoWholeOp.timeQueryPlanning.count(), "ms");
formatTime(runtimeInfoWholeOp.timeQueryPlanning);
response["time"]["where"] =
absl::StrCat(runtimeInfo.totalTime_.count() / 1000, "ms");
formatTime(std::chrono::duration_cast<std::chrono::milliseconds>(
runtimeInfo.totalTime_));
json updateTime{
{"total", absl::StrCat(updateMetadata.triplePreparationTime_.count() +
updateMetadata.deletionTime_.count() +
updateMetadata.insertionTime_.count(),
"ms")},
{"triplePreparation",
absl::StrCat(updateMetadata.triplePreparationTime_.count(), "ms")},
{"deletion", absl::StrCat(updateMetadata.deletionTime_.count(), "ms")},
{"insertion", absl::StrCat(updateMetadata.insertionTime_.count(), "ms")}};
{"total", formatTime(updateMetadata.triplePreparationTime_ +
updateMetadata.deletionTime_ +
updateMetadata.insertionTime_)},
{"triplePreparation", formatTime(updateMetadata.triplePreparationTime_)},
{"deletion", formatTime(updateMetadata.deletionTime_)},
{"insertion", formatTime(updateMetadata.insertionTime_)}};
response["time"]["update"] = updateTime;
response["time"]["total"] = absl::StrCat(requestTimer.msecs().count(), "ms");
response["time"]["total"] = formatTime(requestTimer.msecs());
for (auto permutation : Permutation::ALL) {
response["located-triples"][Permutation::toString(
permutation)]["blocks-affected"] =
Expand Down Expand Up @@ -976,10 +979,9 @@ json Server::processUpdateImpl(
// cache key).
cache_.clearAll();

auto response =
createResponseMetadata(requestTimer, index_, deltaTriples, plannedQuery,
qet, countBefore, updateMetadata, countAfter);
return response;
return createResponseMetadataForUpdate(requestTimer, index_, deltaTriples,
plannedQuery, qet, countBefore,
updateMetadata, countAfter);
}

// ____________________________________________________________________________
Expand All @@ -999,27 +1001,24 @@ Awaitable<void> Server::processUpdate(
updateThreadPool_,
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle] {
std::optional<nlohmann::json> response;
// Update the delta triples.
index_.deltaTriplesManager().modify(
return index_.deltaTriplesManager().modify<nlohmann::json>(
[this, &params, &update, &requestTimer, &timeLimit, &messageSender,
&cancellationHandle, &response](auto& deltaTriples) {
&cancellationHandle](auto& deltaTriples) {
// Use `this` explicitly to silence false-positive errors on
// captured `this` being unused.
response = this->processUpdateImpl(
params, update, requestTimer, timeLimit, messageSender,
cancellationHandle, deltaTriples);
return this->processUpdateImpl(params, update, requestTimer,
timeLimit, messageSender,
cancellationHandle, deltaTriples);
});
return response;
},
cancellationHandle);
auto response = co_await std::move(coroutine);

// SPARQL 1.1 Protocol 2.2.4 Successful Responses: "The response body of a
// successful update request is implementation defined."
AD_CORRECTNESS_CHECK(response.has_value());
co_await send(ad_utility::httpUtils::createJsonResponse(
std::move(response.value()), request));
co_await send(
ad_utility::httpUtils::createJsonResponse(std::move(response), request));
co_return;
}

Expand Down
14 changes: 6 additions & 8 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,12 @@ class Server {
TimeLimit timeLimit);
// For an executed update create a json with some stats on the update (timing,
// number of changed triples, etc.).
static json createResponseMetadata(const ad_utility::Timer& requestTimer,
const Index& index,
const DeltaTriples& deltaTriples,
const PlannedQuery& plannedQuery,
const QueryExecutionTree& qet,
const DeltaTriplesCount& countBefore,
const UpdateMetadata& updateMetadata,
const DeltaTriplesCount& countAfter);
static json createResponseMetadataForUpdate(
const ad_utility::Timer& requestTimer, const Index& index,
const DeltaTriples& deltaTriples, const PlannedQuery& plannedQuery,
const QueryExecutionTree& qet, const DeltaTriplesCount& countBefore,
const UpdateMetadata& updateMetadata,
const DeltaTriplesCount& countAfter);
FRIEND_TEST(ServerTest, createResponseMetadata);
// Do the actual execution of an update.
Awaitable<void> processUpdate(
Expand Down
39 changes: 28 additions & 11 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,24 +216,41 @@ DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
currentLocatedTriplesSnapshot_{deltaTriples_.wlock()->getSnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(
const std::function<void(DeltaTriples&)>& function) {
template <typename ReturnType>
ReturnType DeltaTriplesManager::modify(
const std::function<ReturnType(DeltaTriples&)>& function) {
// While holding the lock for the underlying `DeltaTriples`, perform the
// actual `function` (typically some combination of insert and delete
// operations) and (while still holding the lock) update the
// `currentLocatedTriplesSnapshot_`.
deltaTriples_.withWriteLock([this, &function](DeltaTriples& deltaTriples) {
function(deltaTriples);
auto newSnapshot = deltaTriples.getSnapshot();
currentLocatedTriplesSnapshot_.withWriteLock(
[&newSnapshot](auto& currentSnapshot) {
currentSnapshot = std::move(newSnapshot);
});
});
return deltaTriples_.withWriteLock(
[this, &function](DeltaTriples& deltaTriples) {
auto updateSnapshot = [this, &deltaTriples] {
auto newSnapshot = deltaTriples.getSnapshot();
currentLocatedTriplesSnapshot_.withWriteLock(
[&newSnapshot](auto& currentSnapshot) {
currentSnapshot = std::move(newSnapshot);
});
};

if constexpr (std::is_void_v<ReturnType>) {
function(deltaTriples);
updateSnapshot();
} else {
ReturnType returnValue = function(deltaTriples);
updateSnapshot();
return returnValue;
}
});
}
// Explicit instantions
template void DeltaTriplesManager::modify<void>(
std::function<void(DeltaTriples&)> const&);
template nlohmann::json DeltaTriplesManager::modify<nlohmann::json>(
const std::function<nlohmann::json(DeltaTriples&)>&);

// _____________________________________________________________________________
void DeltaTriplesManager::clear() { modify(&DeltaTriples::clear); }
void DeltaTriplesManager::clear() { modify<void>(&DeltaTriples::clear); }

// _____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriplesManager::getCurrentSnapshot() const {
Expand Down
3 changes: 2 additions & 1 deletion src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class DeltaTriplesManager {
// serialized, and each call to `getCurrentSnapshot` will either return the
// snapshot before or after a modification, but never one of an ongoing
// modification.
void modify(const std::function<void(DeltaTriples&)>& function);
template <typename ReturnType>
ReturnType modify(const std::function<ReturnType(DeltaTriples&)>& function);

// Reset the updates represented by the underlying `DeltaTriples` and then
// update the current snapshot.
Expand Down
2 changes: 1 addition & 1 deletion src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ void IndexImpl::createFromOnDiskIndex(const string& onDiskBase) {
// `Permutation`class, but we first have to deal with The delta triples for
// the additional permutations.
auto setMetadata = [this](const Permutation& p) {
deltaTriplesManager().modify([&p](DeltaTriples& deltaTriples) {
deltaTriplesManager().modify<void>([&p](DeltaTriples& deltaTriples) {
deltaTriples.setOriginalMetadata(p.permutation(),
p.metaData().blockData());
});
Expand Down
1 change: 0 additions & 1 deletion test/DeltaTriplesCountTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// 2025 Julian Mundhahs <[email protected]>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "index/DeltaTriples.h"

Expand Down
2 changes: 1 addition & 1 deletion test/ServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ TEST(ServerTest, createResponseMetadata) {
DeltaTriplesCount countAfter = deltaTriples.getCounts();

// Assertions
json metadata = Server::createResponseMetadata(
json metadata = Server::createResponseMetadataForUpdate(
requestTimer, index, deltaTriples, plannedQuery,
plannedQuery.queryExecutionTree_, countBefore, updateMetadata,
countAfter);
Expand Down

0 comments on commit 5b9be96

Please sign in to comment.