Skip to content

Commit

Permalink
Working with thor master/slave
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Dec 13, 2024
1 parent f9b1dda commit 4c88a61
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 16 deletions.
3 changes: 2 additions & 1 deletion dali/base/daclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport,
// causes any config update hooks (installed by installConfigUpdateHook()) to trigger on an env. change
switch (role)
{
case DCR_ThorMaster:
case DCR_EclServer:
case DCR_EclAgent:
case DCR_SashaServer:
Expand All @@ -159,6 +158,8 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport,
case DCR_EclCCServer:
installEnvConfigMonitor();
break;
// Thor does not monitor because a fixed configuration is serialized to the slaves
case DCR_ThorMaster:
default:
break;
}
Expand Down
60 changes: 47 additions & 13 deletions system/jlib/jptree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8707,7 +8707,7 @@ class CConfigUpdater : public CInterface
StringAttr componentTag, envPrefix, legacyFilename;
IPropertyTree * (*mapper)(IPropertyTree *);
StringAttr altNameAttribute;
Owned<IFileEventWatcher> fileWatcher;
Owned<IFileEventWatcher> fileWatcher; // null if updates to the config file are not allowed
CriticalSection notifyFuncCS;
unsigned notifyFuncId = 0;
std::unordered_map<unsigned, ConfigUpdateFunc> notifyConfigUpdates;
Expand Down Expand Up @@ -8735,7 +8735,7 @@ class CConfigUpdater : public CInterface
args.append(arg);
args.append(nullptr);

refreshConfiguration(true);
refreshConfiguration(true, false);
}
bool startMonitoring()
{
Expand All @@ -8754,7 +8754,7 @@ class CConfigUpdater : public CInterface
changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data");

if (changed)
refreshConfiguration(false);
refreshConfiguration(false, false);
};

try
Expand Down Expand Up @@ -8804,18 +8804,46 @@ class CConfigUpdater : public CInterface
}
}

void refreshConfiguration(bool firstTime)
void refreshConfiguration(bool firstTime, bool avoidClone)
{
auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute);
IPropertyTree * newComponentConfiguration = std::get<1>(result);
IPropertyTree * newGlobalConfiguration = std::get<2>(result);
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);
if (firstTime || fileWatcher)
{
auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute);
IPropertyTree * newComponentConfiguration = std::get<1>(result);
IPropertyTree * newGlobalConfiguration = std::get<2>(result);
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);

if (firstTime)
if (firstTime)
{
absoluteConfigFilename.set(std::get<0>(result).c_str());
newGlobalConfiguration->getProp("@name", componentName);
}
}
else if (avoidClone)
{
Owned<IPropertyTree> newComponentConfiguration = getComponentConfigSP();
Owned<IPropertyTree> newGlobalConfiguration = getGlobalConfigSP();
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);
}
else
{
absoluteConfigFilename.set(std::get<0>(result).c_str());
newGlobalConfiguration->getProp("@name", componentName);
DBGLOG("Refresh %p,", getComponentConfigSP().get());
dbglogXML(getComponentConfigSP());
// File monitor is disabled - no updates to the configuration files are supported.
//So clone the existing configuration and use that to refresh the config - update functions may perform differently.
Owned<IPropertyTree> newComponentConfiguration = createPTreeFromIPT(getComponentConfigSP());
Owned<IPropertyTree> newGlobalConfiguration = createPTreeFromIPT(getGlobalConfigSP());
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);
}
DBGLOG("Refreshed %p,", getComponentConfigSP().get());
dbglogXML(getComponentConfigSP());
}

// Convenience overload: delegates to the two-argument executeCallbacks(), passing the
// componentConfiguration and globalConfiguration members currently held by this object.
// NOTE(review): CriticalBlock is presumably a scoped lock, so notifyFuncCS and then configCS
// would be held (in that order) for the whole call - confirm against the jlib definition.
void executeCallbacks()
{
CriticalBlock notifyBlock(notifyFuncCS);
CriticalBlock configBlock(configCS);
// The current trees are supplied in the overload's "old" configuration parameters.
executeCallbacks(componentConfiguration, globalConfiguration);
}

void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration)
Expand Down Expand Up @@ -8852,7 +8880,11 @@ class CConfigUpdater : public CInterface
modifyConfigUpdates[notifyFuncId] = notifyFunc;
if (isInitialized())
{
refreshConfiguration(false); // force all cached values to be recalculated
//Force all cached values to be recalculated, do not reload the config
//This is only legal if no other threads are accessing the config yet - otherwise the reading thread
//could crash when the global configuration is updated.
//MORE: This function needs an extra parameter to indicate whether or not threads have already started/avoid clone
refreshConfiguration(false, true);
DBGLOG("Modify functions should be registered before the configuration is loaded");
}
return notifyFuncId;
Expand Down Expand Up @@ -8914,7 +8946,7 @@ void refreshConfiguration()
{
if (!configFileUpdater) // NB: refreshConfiguration() should always be called after configFileUpdater is initialized
return;
configFileUpdater->refreshConfiguration(false);
configFileUpdater->refreshConfiguration(false, false);
}

void CConfigUpdateHook::clear()
Expand Down Expand Up @@ -10199,6 +10231,8 @@ void setExpertOpt(const char *opt, const char *value)
config->setPropTree(xpath);
getExpertOptPath(opt, xpath.clear());
config->setProp(xpath, value);
DBGLOG("*** SET *** %p", config.get());
dbglogXML(config);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jptree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ typedef std::function<void (IPropertyTree * newComponentConfiguration, IProperty
jlib_decl unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInstalled);
jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc); // This function must be called before the configuration is loaded.
jlib_decl void removeConfigUpdateHook(unsigned notifyFuncId);
jlib_decl void refreshConfiguration();
jlib_decl void refreshConfiguration(); // (Optionally) reload the configuration file, reapply changes, and derive cached information

class jlib_decl CConfigUpdateHook
{
Expand Down
10 changes: 9 additions & 1 deletion thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,17 @@ class CRegistryServer : public CSimpleInterface
publishPodNames(workunit, graphName, &connectedWorkers);
}

//Check that nothing has caused the global configuration to be refreshed - otherwise inconsistent values may be used by the slave
assertex(globals == getComponentConfigSP());

PROGLOG("Workers connected, initializing..");
msg.clear();
msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
processGroup->serialize(msg);
globals->serialize(msg);
getGlobalConfigSP()->serialize(msg);
DBGLOG("**** CONFIG ****");
dbglogXML(globals);
msg.append(managerWorkerMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
Expand Down Expand Up @@ -636,7 +641,9 @@ int main( int argc, const char *argv[] )
InitModuleObjects();
NoQuickEditSection xxx;
{
globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, false));
bool monitorConfig = false; // Do not allow updates to the config file, otherwise the slave may not be in sync.
//MORE: What about updates to storage planes - they will not be passed through to the slaves
globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, monitorConfig));
}
#ifdef _DEBUG
unsigned holdWorker = globals->getPropInt("@holdSlave", NotFound);
Expand Down Expand Up @@ -754,6 +761,7 @@ int main( int argc, const char *argv[] )
}
}

//This can only be called once dali is initialised
initializeStorageGroups(true);

if (globals->getPropBool("@MPChannelReconnect"))
Expand Down

0 comments on commit 4c88a61

Please sign in to comment.