Skip to content

Commit

Permalink
#48: clear registered regions when resetting a context
Browse files Browse the repository at this point in the history
  • Loading branch information
nmm0 committed Jun 8, 2023
1 parent 5e719fb commit 55a7fcb
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 36 deletions.
15 changes: 8 additions & 7 deletions src/resilience/backend/AutomaticBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,24 @@ class AutomaticBackendBase {
explicit AutomaticBackendBase(ContextBase& ctx) : m_context(ctx) {};

virtual ~AutomaticBackendBase() = default;

//All members should be registered before being checkpointed or restarted
virtual void register_member(Registration& member) = 0;
virtual void unregister_member(const Registration &member) = 0;

//as_global to checkpoint indepently of PID
virtual bool checkpoint(const std::string& label, int version,
const std::unordered_set<Registration> &members,
bool as_global = false) = 0;
//Get the highest version available which is still less than max

//Get the highest version available which is still less than max
// (or just the highest, if max=0)
virtual int latest_version(const std::string& label, int max = 0, bool as_global = false) const noexcept = 0;

//Returns failure flag for recovering the specified members.
//as_global to restart independently of PID
virtual bool restart(const std::string& label, int version,
const std::unordered_set<Registration> &members,
const std::unordered_set<Registration> &members,
bool as_global = false) = 0;

//Reset any state, useful for online-recovery.
Expand All @@ -88,15 +89,15 @@ class AutomaticBackendBase {
};

ContextBase& m_context;


//Delete potentially problematic functions for maintaining consistent state
AutomaticBackendBase(const AutomaticBackendBase&) = delete;
AutomaticBackendBase(AutomaticBackendBase&&) noexcept = delete;
AutomaticBackendBase &operator=( const AutomaticBackendBase & ) = delete;
AutomaticBackendBase &operator=( AutomaticBackendBase && ) = default;
};

using AutomaticBackend = std::shared_ptr<AutomaticBackendBase>;
}

Expand Down
1 change: 1 addition & 0 deletions src/resilience/backend/stdfile/StdFileBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class StdFileBackend : public AutomaticBackendBase {

//No state to manage
void register_member(Registration&) override {};
void unregister_member(const Registration &) override {}

bool checkpoint(const std::string &label, int version,
const std::unordered_set<Registration>& members, bool as_global) override;
Expand Down
37 changes: 33 additions & 4 deletions src/resilience/backend/veloc/VelocBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

#include "resilience/AutomaticCheckpoint.hpp"

#include "resilience/registration/Registration.hpp"
#include "resilience/util/Trace.hpp"

#define VELOC_SAFE_CALL( call ) KokkosResilience::veloc_internal_safe_call( call, #call, __FILE__, __LINE__ )
Expand Down Expand Up @@ -103,7 +104,15 @@ namespace KokkosResilience
if(success) {
std::set<int> ids;
for(auto member : _members) ids.insert(static_cast<int>(member.hash()));

std::cout << "checkpointing ids (";
std::size_t count = 0;
for ( auto &&id : ids )
{
std::cout << id;
if ( ++count != ids.size() )
std::cout << id << ", ";
}
std::cout << ")\n";
success = VELOC_SAFE_CALL( veloc_client->checkpoint_mem(VELOC_CKPT_SOME, ids) );
}

Expand Down Expand Up @@ -144,7 +153,15 @@ namespace KokkosResilience
if(success){
std::set<int> ids;
for(auto member : _members) ids.insert(static_cast<int>(member.hash()));

std::cout << "restarting ids (";
std::size_t count = 0;
for ( auto &&id : ids )
{
std::cout << id;
if ( ++count != ids.size() )
std::cout << ", ";
}
std::cout << ")\n";
success = VELOC_SAFE_CALL( veloc_client->recover_mem(VELOC_RECOVER_SOME, ids) );
}

Expand All @@ -165,13 +182,25 @@ namespace KokkosResilience
void
VeloCMemoryBackend::register_member(KokkosResilience::Registration &member)
{
auto sfun = member->serializer();
if ( !sfun )
throw std::runtime_error( "invalid member serializer" );
auto dfun = member->deserializer();
if ( !dfun )
throw std::runtime_error( "invalid member deserializer" );
veloc_client->mem_protect(
static_cast<int>(member.hash()),
member->serializer(),
member->deserializer()
std::move(sfun),
std::move(dfun)
);
}

void
VeloCMemoryBackend::unregister_member(const Registration &member)
{
veloc_client->mem_unprotect(static_cast<int>(member.hash()));
}

VeloCFileBackend::VeloCFileBackend(ContextBase& ctx)
: AutomaticBackendBase(ctx) {
const auto &vconf = m_context.config()["backends"]["veloc"]["config"].as< std::string >();
Expand Down
1 change: 1 addition & 0 deletions src/resilience/backend/veloc/VelocBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace KokkosResilience
void clear_checkpoints();

void register_member(KokkosResilience::Registration& member) override;
void unregister_member(const Registration &member) override;

void reset() override;
void register_alias( const std::string &original, const std::string &alias );
Expand Down
15 changes: 15 additions & 0 deletions src/resilience/context/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,19 @@ namespace KokkosResilience
}
return buffer.data();
}

void
ContextBase::reset()
{
// Unregister all regions
for ( auto &&[name, members] : regions )
{
for ( const auto &m : members)
{
m_backend->unregister_member(m);
}
}

reset_impl();
}
}
4 changes: 3 additions & 1 deletion src/resilience/context/ContextBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ namespace KokkosResilience
//TODO
};

virtual void reset() = 0;
void reset();

virtual void register_member(KokkosResilience::Registration member){
m_backend->register_member(member);
Expand Down Expand Up @@ -191,6 +191,8 @@ namespace KokkosResilience
template<typename... Traits, typename RegionFunc, typename... T>
void detect_and_register(RegionFunc&& fun, Detail::RegInfo<T>... explicit_members);

virtual void reset_impl() = 0;

//Hold onto a buffer per context for de/serializing non-contiguous or non-host views.
std::vector<char> buffer;

Expand Down
5 changes: 3 additions & 2 deletions src/resilience/context/MPIContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ class MPIContext : public ContextBase {
return latest;
}

void reset() override { m_backend->reset(); }

private:

void reset_impl() override { m_backend->reset(); }

MPI_Comm m_comm;
};

Expand Down
9 changes: 5 additions & 4 deletions src/resilience/context/StdFileContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ class StdFileContext : public ContextBase {
return m_backend.latest_version(label);
}

void reset() override {
m_backend.reset();
}

void register_alias( const std::string &original, const std::string &alias ) override {

}

private:

void reset_impl() override {
m_backend.reset();
}

std::string m_filename;
Backend m_backend;
};
Expand Down
37 changes: 19 additions & 18 deletions src/resilience/context/VTContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,34 +114,34 @@ namespace KokkosResilience {
class VTContext : public ContextBase {
public:
explicit VTContext(const std::string& config_file)
: ContextBase(config_file, vt::theContext()->getNode()),
: ContextBase(config_file, vt::theContext()->getNode()),
contexts_proxy(vt::theObjGroup()->makeCollective(this, "kr::VTContext")) {
next_epoch = vt::theTerm()->makeEpochCollective("kr::VTContext initial checkpoint epoch");
}

using ProxyID = KokkosResilience::Util::VT::ProxyID;
using VTProxyHolder = Detail::VTProxyHolder;
using VTAction = Detail::VTAction;
using VTActionMsg = Detail::VTActionMsg;

VTContext(const VTContext &) = delete;
VTContext(VTContext &&) noexcept = default;

VTContext &operator=(const VTContext &) = delete;
VTContext &operator=(VTContext &&) noexcept = default;

virtual ~VTContext() {
//Wait for next_epoch to finish being created, then finish it.
vt::theSched()->runSchedulerWhile([&]{return next_epoch == vt::no_epoch;});
vt::theTerm()->finishedEpoch(next_epoch);

vt::theObjGroup()->destroyCollective(contexts_proxy);
}

bool restart_available(const std::string &label, int version) override {
return m_backend->restart_available(label, version);
}

void restart(const std::string &label, int version,
const std::unordered_set<Registration> &members) override {
begin_operation();
Expand All @@ -151,7 +151,7 @@ namespace KokkosResilience {

end_operation();
}

void checkpoint(const std::string &label, int version,
const std::unordered_set<Registration> &members) override {
begin_operation();
Expand All @@ -161,18 +161,19 @@ namespace KokkosResilience {

end_operation();
}

int latest_version(const std::string &label) const noexcept override {
return m_backend->latest_version(label);
}

void reset() override { m_backend->reset(); }

//Handles initialization for new holders
template<typename T>
VTProxyHolder& get_holder(T proxy, bool mark_modified = true);

private:

void reset_impl() override { m_backend->reset(); }

//Start a new resilience operation
// Manages the epochs, waiting for previous, beginning new, etc.
void begin_operation();
Expand All @@ -182,26 +183,26 @@ namespace KokkosResilience {
void checkpoint_proxies();

//Recursive restart which manages potentially non-registered proxies
// assume_collective to assume non-local proxies are already known by their
// assume_collective to assume non-local proxies are already known by their
// local context to need to recover
void restart_proxy(ProxyID proxy, int version, bool is_remote_request = false);
void restart_proxies();

//Handle marking element or group as modified and the possible
//remote operations required.
template<typename ProxyT>
void mark_modified(ProxyT& proxy, VTProxyHolder& holder,
void mark_modified(ProxyT& proxy, VTProxyHolder& holder,
bool is_remote_request = false);


//Send an action to be executed before the next checkpoint/recovery function
//can begin.
void msg_before_checkpoint(VTProxyHolder& holder, VTAction action, int arg = 0);
template<auto func, typename... Args>
void msg_before_checkpoint(VTContextElmProxy dest, Args... args);
template<auto func, typename... Args>
void msg_before_checkpoint(VTContextProxy dest, Args... args);

template<typename ProxyT, typename GroupProxyT>
void action_handler(ProxyT elm, GroupProxyT group, const VTActionMsg& msg);

Expand All @@ -210,13 +211,13 @@ namespace KokkosResilience {
template<typename GroupProxyT>
void register_group(GroupProxyT group, bool should_mark_modified);
void restart_group(ProxyID group, int version);

//For recovering unregistered entities, manually generate an element
//registration from a reference to the group (potentially gathered from
//some other registered element).
template<typename GroupProxyT>
void register_element(GroupProxyT group, uint64_t index_bits);

//Remote context notifiying us
template<typename ProxyT>
void remotely_modified(ProxyT proxy);
Expand All @@ -238,7 +239,7 @@ namespace KokkosResilience {
//Map from a collection/objgroup to some registered proxy within it,
//for reconstructing as needed.
std::unordered_map<ProxyID, ProxyID> groups;

//Local proxies known to have been changed since last checkpoint
std::unordered_set<ProxyID> modified_proxies;

Expand Down

0 comments on commit 55a7fcb

Please sign in to comment.