Skip to content

Commit

Permalink
Merge pull request #147 from abb3r/fixGwhisperHangOnDescDbStreamClose
Browse files Browse the repository at this point in the history
fixGwhisperHangOnDescDbStreamClose
  • Loading branch information
rainerschoe authored Nov 20, 2023
2 parents 6bacb9a + 1c7d225 commit 9d7a9d6
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 20 deletions.
14 changes: 13 additions & 1 deletion src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,19 @@ namespace cli
deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(customTimeoutMs);
}
}


//before calling the RPC, close the DescDb connection with a timeout.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline);
if (not dbDescStatus.ok())
{
std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED)
{
std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl;
}
return -1;
}

grpc::testing::CliCall call(channel, methodStr, clientMetadata, deadline);

auto messageFormatter = createMessageFormatter(parseTree);
Expand Down
18 changes: 18 additions & 0 deletions src/libCli/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ namespace cli
return m_connections[f_serverAddress].descPool;
}

grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
if (m_connections[f_serverAddress].descDbProxy == nullptr)
{
std::cerr << "Error: Unable to close DescDb connection!" << std::endl;
return grpc::Status( grpc::StatusCode::ABORTED, "descDbProxy has not been initialized.");
}

//if proxy exists close the stream with a deadline.
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline);

//delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy.
m_connections[f_serverAddress].descDbProxy.reset();

return status;
}

void ConnectionManager::ensureDescDbProxyAndDescPoolIsAvailable(std::string &f_serverAddress, ArgParse::ParsedElement &f_parseTree)
{
if (m_connections[f_serverAddress].channel)
Expand Down
8 changes: 8 additions & 0 deletions src/libCli/libCli/ConnectionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ namespace cli
/// @returns the gRpc DescriptorPool of the corresponding server address.
std::shared_ptr<grpc::protobuf::DescriptorPool> getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree);

/// @brief closes the DescDb stream with a given deadline.
/// @param f_serverAddress server addresss to lookup the assigned DescDbProxy.
/// @param deadline optional dealine for closing the stream.
/// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address,
/// otherwise grpc status as a result of stream closure.
grpc::Status closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

private:
ConnectionManager() {}
~ConnectionManager() {}
Expand Down
11 changes: 10 additions & 1 deletion src/libLocalDescriptorCache/DescDbProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,21 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress)
}
}

grpc::Status DescDbProxy::closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
if ( m_reflectionDescDb == nullptr )
{
return grpc::Status::OK;
}
return m_reflectionDescDb->closeStreamWithDeadline(deadline);
}

DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel,
ArgParse::ParsedElement &parseTree)
{
m_channel = channel;
m_parseTree = parseTree;

m_disableCache = disableCache;
if(disableCache)
{
// Get Desc directly via reflection and without touching localDB
Expand Down
7 changes: 7 additions & 0 deletions src/libLocalDescriptorCache/DescDbProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>
#include <time.h>
#include <set>
#include <optional>

#include <grpcpp/grpcpp.h>
#include <gRPC_utils/proto_reflection_descriptor_database.h>
Expand Down Expand Up @@ -56,6 +57,11 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
/// Stores DescDB acquired via sever reflection locally as a DB file in proto3 structure.
/// @param hostAdress Address to the current host
void getDescriptors(const std::string &hostAddress);

/// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the DescDb stream.
/// @return return grpc status as a result of call the finish() on the DescDb stream.
grpc::Status closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel, ArgParse::ParsedElement &parseTree);

Expand Down Expand Up @@ -112,5 +118,6 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
std::vector<const grpc::protobuf::FileDescriptor*>m_descList;
std::set<std::string> m_descNames;
std::vector<grpc::string> m_serviceList;
bool m_disableCache;
};

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
// MODIFIED by IBM (Rainer Schoenberger)
// original: #include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h"
// MODIFIED by IBM (Fabian Pfeifroth-Brumm)
// MODIFIED by IBM (Rahman Abber Tahir)
#include <optional>
#include "reflection.grpc.pb.h"
// END MODIFIED

Expand Down Expand Up @@ -80,6 +82,11 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
// Provide a list of full names of registered services
bool GetServices(std::vector<grpc::string>* output);

/// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the reflection stream.
/// @return return grpc status as a result of call the finish() on the reflection stream.
grpc::Status closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

private:
typedef ClientReaderWriter<
grpc::reflection::v1alpha::ServerReflectionRequest,
Expand All @@ -98,6 +105,8 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
const grpc::reflection::v1alpha::ServerReflectionRequest& request,
grpc::reflection::v1alpha::ServerReflectionResponse& response);

grpc::Status closeStream();

std::shared_ptr<ClientStream> stream_;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::reflection::v1alpha::ServerReflection::Stub> stub_;
Expand Down
58 changes: 40 additions & 18 deletions third_party/gRPC_utils/proto_reflection_descriptor_database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// MODIFIED by IBM (Rainer Schoenberger)
// original: #include "test/cpp/util/proto_reflection_descriptor_database.h"
#include "proto_reflection_descriptor_database.h"
// MODIFIED by IBM (Rahman Abber Tahir)
// END MODIFIED

#include <vector>
Expand All @@ -42,24 +43,7 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
: stub_(ServerReflection::NewStub(channel)) {}

ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {
if (stream_) {
stream_->WritesDone();
Status status = stream_->Finish();
if (!status.ok()) {
if (status.error_code() == StatusCode::UNIMPLEMENTED) {
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
"debug info: %s\n",
static_cast<int>(status.error_code()),
status.error_message().c_str(),
ctx_.debug_error_string().c_str());
}
}
}
closeStream();
}

bool ProtoReflectionDescriptorDatabase::FindFileByName(
Expand Down Expand Up @@ -333,4 +317,42 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest(
return success;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
stream_mutex_.lock();
if( deadline != std::nullopt )
{
ctx_.set_deadline(deadline.value());
}

auto status = closeStream();
stream_.reset();
stream_mutex_.unlock();
return status;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStream()
{
Status status;
if (stream_) {
stream_->WritesDone();
status = stream_->Finish();
if (!status.ok()) {
if (status.error_code() == StatusCode::UNIMPLEMENTED) {
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
"debug info: %s\n",
static_cast<int>(status.error_code()),
status.error_message().c_str(),
ctx_.debug_error_string().c_str());
}
}
}
return status;
}

} // namespace grpc

0 comments on commit 9d7a9d6

Please sign in to comment.