Skip to content

Commit

Permalink
JsonRpcConnection: Log message processing stats
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Aug 30, 2024
1 parent f85f9a3 commit c8832b5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
55 changes: 44 additions & 11 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ void JsonRpcConnection::Start()

void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
namespace ch = std::chrono;

m_Stream->next_layer().SetSeen(&m_Seen);

for (;;) {
String message;
String jsonString;

try {
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
Expand All @@ -76,20 +78,55 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
}

m_Seen = Utility::GetTime();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
}

auto start (ch::steady_clock::now());
ch::steady_clock::duration cpuBoundDuration, totalDuration;

String rpcMethod("UNKNOWN");
String diagnosticInfo; // Contains the diagnostic/debug information in case of an error.

try {
Defer extractTotalDuration ([&start, &totalDuration]() {
totalDuration = ch::steady_clock::now() - start;
});

CpuBoundWork handleMessage (yc);

// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (auto method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = method;
}

MessageHandler(message);

l_TaskStats.InsertValue(Utility::GetTime(), 1);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
diagnosticInfo = DiagnosticInformation(ex);
}

break;
auto severity = LogDebug;
if (totalDuration >= ch::seconds(5) || (!m_ShuttingDown && !diagnosticInfo.IsEmpty())) {
// Processing that RPC message seems to take an unexpectedly long time,
// so promote the log entry from debug to warning.
severity = LogWarning;
}

Log statsLog(severity, "JsonRpcConnection");
statsLog << (!diagnosticInfo.IsEmpty() ? "Error while processing" : "Processing") << " JSON-RPC '"
<< rpcMethod << "' message for identity '" << m_Identity << "' ";

if (cpuBoundDuration >= ch::seconds(1)) {
statsLog << "waited " << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count() << "ms on semaphore and";
}

statsLog << "took total " << ch::duration_cast<ch::milliseconds>(totalDuration).count()
<< "ms" << (diagnosticInfo.IsEmpty() ? "." : ": "+diagnosticInfo);
}

Disconnect();
Expand Down Expand Up @@ -245,10 +282,8 @@ void JsonRpcConnection::Disconnect()
});
}

void JsonRpcConnection::MessageHandler(const String& jsonString)
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);

if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");

Expand All @@ -267,8 +302,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
origin->FromZone = m_Endpoint->GetZone();
else
origin->FromZone = Zone::GetByName(message->Get("originZone"));

m_Endpoint->AddMessageReceived(jsonString.GetLength());
}

Value vmethod;
Expand Down
14 changes: 13 additions & 1 deletion lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,19 @@ class JsonRpcConnection final : public Object
void CheckLiveness(boost::asio::yield_context yc);

bool ProcessMessage();
void MessageHandler(const String& jsonString);

/**
* MessageHandler routes the provided message to its corresponding handler (if any).
*
* This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose
* timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log
* position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or
* referring to a non-existent one is also discarded. Afterwards, the RPC handler is then called for that message
* and sends it's result back to the sender if the message contains an ID.
*
* @param message Dictionary::Ptr The RPC message you want to process.
*/
void MessageHandler(const Dictionary::Ptr& message);

void CertificateRequestResponseHandler(const Dictionary::Ptr& message);

Expand Down

0 comments on commit c8832b5

Please sign in to comment.