Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process config::update/delete cluster events gracefully #9980

Merged
merged 3 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lib/remote/apilistener-configsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "base/configtype.hpp"
#include "base/json.hpp"
#include "base/convert.hpp"
#include "base/defer.hpp"
#include "config/vmops.hpp"
#include <fstream>

Expand Down Expand Up @@ -104,6 +105,15 @@ Value ApiListener::ConfigUpdateObjectAPIHandler(const MessageOrigin::Ptr& origin
return Empty;
}

// Wait for the object name to become available for processing and block it immediately.
yhabteab marked this conversation as resolved.
Show resolved Hide resolved
// Doing so guarantees that only one cluster event (create/update/delete) of a given
// object is being processed at any given time.
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);

Defer unlockAndNotify([&listener, &ptype, &objName]{
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just screams for a RAII style class like CpuBoundWork.

ConfigObject::Ptr object = ctype->GetObject(objName);

String config = params->Get("config");
Expand Down Expand Up @@ -258,6 +268,15 @@ Value ApiListener::ConfigDeleteObjectAPIHandler(const MessageOrigin::Ptr& origin
return Empty;
}

// Wait for the object name to become available for processing and block it immediately.
// Doing so guarantees that only one cluster event (create/update/delete) of a given
// object is being processed at any given time.
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);

Defer unlockAndNotify([&listener, &ptype, &objName]{
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Especially as there's >1 usage...

ConfigObject::Ptr object = ctype->GetObject(objName);

if (!object) {
Expand Down Expand Up @@ -462,3 +481,37 @@ void ApiListener::SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient
Log(LogInformation, "ApiListener")
<< "Finished syncing runtime objects to endpoint '" << endpoint->GetName() << "'.";
}

/**
* Locks the specified object name of the given type. If it is already locked, the call blocks until the lock is released.
*
* @param Type::Ptr ptype The type of the object you want to lock
* @param String objName The object name you want to lock
*/
void ObjectNameMutex::Lock(const Type::Ptr& ptype, const String& objName)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_CV.wait(lock, [this, &ptype, &objName]{
auto& locked = m_LockedObjectNames[ptype.get()];
return locked.find(objName) == locked.end();
});

// Add object name to the locked list again to block all other threads that try
// to process a message affecting the same object.
m_LockedObjectNames[ptype.get()].emplace(objName);
}

/**
* Unlocks the specified object name of the given type.
*
* @param Type::Ptr ptype The type of the object you want to unlock
* @param String objName The name of the object you want to unlock
*/
void ObjectNameMutex::Unlock(const Type::Ptr& ptype, const String& objName)
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_LockedObjectNames[ptype.get()].erase(objName);
}
m_CV.notify_all();
Al2Klimov marked this conversation as resolved.
Show resolved Hide resolved
}
23 changes: 23 additions & 0 deletions lib/remote/apilistener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ enum class ApiCapabilities : uint_fast64_t
IfwApiCheckCommand = 1u << 1u,
};

/**
* Allows you to easily lock/unlock a specific object of a given type by its name.
*
* That way, locking an object "this" of type Host does not affect an object "this" of
* type "Service" nor an object "other" of type "Host".
*
* @ingroup remote
*/
class ObjectNameMutex
julianbrost marked this conversation as resolved.
Show resolved Hide resolved
{
public:
void Lock(const Type::Ptr& ptype, const String& objName);
void Unlock(const Type::Ptr& ptype, const String& objName);

private:
std::mutex m_Mutex;
std::condition_variable m_CV;
std::map<Type*, std::set<String>> m_LockedObjectNames;
};

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and even a dedicated class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have absolutely no idea what you're talking about! This class literally locks and unlocks Mutex and you want another Mutex on top of that?

Copy link
Member

@Al2Klimov Al2Klimov Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your class is like std::mutex. And I'd like something like std::unique_lock on top. Btw. depending on how you name your methods, you could actually use std::unique_lock itself! 💡

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just screams for a RAII style class like CpuBoundWork.

I mean that would be nice to have.

Especially as there's >1 usage...

Well, the number is 2, so IMHO that's also fine with Defer.

Btw. depending on how you name your methods, you could actually use std::unique_lock itself!

I also thought about that before. Names aren't the only issue here. You can't pass extra parameters and std::unique_lock takes a reference to the underlying mutex, whereas with this class, if a name is not locked, there's no memory behind it so nothing that could be referenced.

/**
* @ingroup remote
*/
Expand Down Expand Up @@ -257,6 +277,9 @@ class ApiListener final : public ObjectImpl<ApiListener>
mutable std::mutex m_ActivePackageStagesLock;
std::map<String, String> m_ActivePackageStages;

/* ensures that at most one create/update/delete is being processed per object at each time */
mutable ObjectNameMutex m_ObjectConfigChangeLock;

void UpdateActivePackageStagesCache();
};

Expand Down
28 changes: 17 additions & 11 deletions lib/remote/configobjectutility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "remote/apilistener.hpp"
#include "config/configcompiler.hpp"
#include "config/configitem.hpp"
#include "base/atomic-file.hpp"
#include "base/configwriter.hpp"
#include "base/exception.hpp"
#include "base/dependencygraph.hpp"
Expand Down Expand Up @@ -198,13 +199,21 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
return false;
}

// AtomicFile doesn't create not yet existing directories, so we have to do it by ourselves.
Utility::MkDirP(Utility::DirName(path), 0700);

std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc);
// Using AtomicFile guarantees that two different threads simultaneously creating and loading the same
// configuration file do not interfere with each other, as the configuration is stored in a unique temp file.
// When one thread fails to pass object validation, it only deletes its temporary file and does not affect
// the other thread in any way.
AtomicFile fp(path, 0644);
fp << config;
fp.close();
// Flush the output buffer to catch any errors ASAP and handle them accordingly!
// Note: AtomicFile places these configs in a temp file and will be automatically
// discarded when it is not committed before going out of scope.
fp.flush();

std::unique_ptr<Expression> expr = ConfigCompiler::CompileFile(path, String(), "_api");
std::unique_ptr<Expression> expr = ConfigCompiler::CompileText(path, config, String(), "_api");

try {
ActivationScope ascope;
Expand All @@ -225,9 +234,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
if (!ConfigItem::CommitItems(ascope.GetContext(), upq, newItems, true)) {
if (errors) {
Log(LogNotice, "ConfigObjectUtility")
<< "Failed to commit config item '" << fullName << "'. Aborting and removing config path '" << path << "'.";

Utility::Remove(path);
<< "Failed to commit config item '" << fullName << "'.";

for (const boost::exception_ptr& ex : upq.GetExceptions()) {
errors->Add(DiagnosticInformation(ex, false));
Expand All @@ -248,9 +255,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
if (!ConfigItem::ActivateItems(newItems, true, false, false, cookie)) {
if (errors) {
Log(LogNotice, "ConfigObjectUtility")
<< "Failed to activate config object '" << fullName << "'. Aborting and removing config path '" << path << "'.";

Utility::Remove(path);
<< "Failed to activate config object '" << fullName << "'.";

for (const boost::exception_ptr& ex : upq.GetExceptions()) {
errors->Add(DiagnosticInformation(ex, false));
Expand All @@ -275,6 +280,9 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
ConfigObject::Ptr obj = ctype->GetObject(fullName);

if (obj) {
// Object has surpassed the compiling/validation processes, we can safely commit the file!
fp.Commit();

yhabteab marked this conversation as resolved.
Show resolved Hide resolved
Log(LogInformation, "ConfigObjectUtility")
<< "Created and activated object '" << fullName << "' of type '" << type->GetName() << "'.";
} else {
Expand All @@ -283,8 +291,6 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
}

} catch (const std::exception& ex) {
Utility::Remove(path);

if (errors)
errors->Add(DiagnosticInformation(ex, false));

Expand Down
Loading