Skip to content

Commit

Permalink
Resolve the conflict between thor and roxie semantics
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Dec 13, 2024
1 parent 4c88a61 commit f384cc6
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 31 deletions.
6 changes: 3 additions & 3 deletions dali/base/dafdesc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3794,16 +3794,16 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree
setupContainerizedStorageLocations();
}

void initializeStorageGroups(bool createPlanesFromGroups)
void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe)
{
auto updateFunc = [createPlanesFromGroups](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration)
{
//MORE: createPlanesFromGroup will never be updated....
PROGLOG("initializeStorageGroups update");
PROGLOG("initializeStoragePlanes update");
doInitializeStorageGroups(createPlanesFromGroups, newGlobalConfiguration);
};

configUpdateHook.installOnce(updateFunc);
configUpdateHook.installModifierOnce(updateFunc, threadSafe);
}

bool getDefaultStoragePlane(StringBuffer &ret)
Expand Down
2 changes: 1 addition & 1 deletion dali/base/dafdesc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ extern da_decl StringBuffer &getPartMask(StringBuffer &ret,const char *lname=NUL
extern da_decl void setPartMask(const char * mask);
extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes directory of name passed to backup directory

extern da_decl void initializeStorageGroups(bool createPlanesFromGroups);
extern da_decl void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe); // threadSafe should be true if no other threads will be accessing the global config
extern da_decl bool getDefaultStoragePlane(StringBuffer &ret);
extern da_decl bool getDefaultSpillPlane(StringBuffer &ret);
extern da_decl bool getDefaultIndexBuildStoragePlane(StringBuffer &ret);
Expand Down
2 changes: 1 addition & 1 deletion dali/daliadmin/daadmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ bool dfspart(const char *lname, IUserDescriptor *userDesc, unsigned partnum, Str
void dfsmeta(const char *filename,IUserDescriptor *userDesc, bool includeStorage)
{
//This function isn't going to work on a container system because it won't have access to the storage planes
initializeStorageGroups(true);
initializeStoragePlanes(true, true);
ResolveOptions options = ROpartinfo|ROdiskinfo|ROsizes;
if (includeStorage)
options = options | ROincludeLocation;
Expand Down
2 changes: 1 addition & 1 deletion dali/dfu/dfuserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ int main(int argc, const char *argv[])
IPropertyTree * config = nullptr;
installDefaultFileHooks(config);

initializeStorageGroups(true);
initializeStoragePlanes(true, true);
}
StringBuffer queue, monitorQueue;
#ifndef _CONTAINERIZED
Expand Down
2 changes: 1 addition & 1 deletion dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ class CFileCloner
dstfdesc->setDefaultDir(dstdir.str());

Owned<IStoragePlane> plane = getDataStoragePlane(cluster1, false);
if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStorageGroups has been called)
if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStoragePlanes has been called)
{
DBGLOG("cloneSubFile: destfilename='%s', plane='%s', dirPerPart=%s", destfilename, cluster1.get(), boolToStr(plane->queryDirPerPart()));

Expand Down
2 changes: 1 addition & 1 deletion dali/dfuxref/dfuxrefmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main(int argc, char* argv[])
try
{
initClientProcess(group, DCR_XRef);
initializeStorageGroups(true);
initializeStoragePlanes(true, true);
StringArray args, clusters;
bool backupcheck = false;
unsigned mode = PMtextoutput|PMcsvoutput|PMtreeoutput;
Expand Down
2 changes: 1 addition & 1 deletion dali/sasha/saserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ int main(int argc, const char* argv[])
else
{
addAbortHandler(actionOnAbort);
initializeStorageGroups(true);
initializeStoragePlanes(true, true);
#ifdef _CONTAINERIZED
service = serverConfig->queryProp("@service");
if (isEmptyString(service))
Expand Down
2 changes: 1 addition & 1 deletion ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3785,7 +3785,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], Owned<ILocalWor
}
}

initializeStorageGroups(daliClientActive());
initializeStoragePlanes(daliClientActive(), true);

if (!standAloneWorkUnit)
{
Expand Down
2 changes: 1 addition & 1 deletion esp/platform/espcfg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ CEspConfig::CEspConfig(IProperties* inputs, IPropertyTree* envpt, IPropertyTree*
if (sdsSessionNeeded && !daliservers.isEmpty())
initSDSSessionCleaner(isDetachedFromDali());

initializeStorageGroups(daliClientActive());
initializeStoragePlanes(daliClientActive(), true);

const unsigned dafilesrvConnectTimeout = m_cfg->getPropInt("@dafilesrvConnectTimeout", 10)*1000;
const unsigned dafilesrvReadTimeout = m_cfg->getPropInt("@dafilesrvReadTimeout", 10)*1000;
Expand Down
2 changes: 1 addition & 1 deletion esp/platform/espp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ int init_main(int argc, const char* argv[])
config->checkESPCache(*server.get());

initializeMetrics(config);
initializeStorageGroups(daliClientActive());
initializeStoragePlanes(daliClientActive(), true);
}
catch(IException* e)
{
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccddali.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface
waitToConnect -= delay;
}
}
initializeStorageGroups(daliHelper->connected());
initializeStoragePlanes(daliHelper->connected(), false); // This can be called while queries are running - so is not thread safe
return daliHelper;
}

Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
#endif

//MORE: I'm not sure where this should go, or how it fits in. Possibly the function needs to be split in two.
initializeStorageGroups(false);
initializeStoragePlanes(false, true);

EnableSEHtoExceptionMapping();
setSEHtoExceptionHandler(&abortHandler);
Expand Down
21 changes: 8 additions & 13 deletions system/jlib/jptree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8821,22 +8821,20 @@ class CConfigUpdater : public CInterface
}
else if (avoidClone)
{
//This is added during the initialiation phase, so no danger of other threads accesing the global information
//while it is updated. Thor currently relies on the pointer not changing.
Owned<IPropertyTree> newComponentConfiguration = getComponentConfigSP();
Owned<IPropertyTree> newGlobalConfiguration = getGlobalConfigSP();
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);
}
else
{
DBGLOG("Refresh %p,", getComponentConfigSP().get());
dbglogXML(getComponentConfigSP());
// File monitor is disabled - no updates to the configuration files are supported.
//So clone the existing configuration and use that to refresh the config - update fucntions may perform differently.
Owned<IPropertyTree> newComponentConfiguration = createPTreeFromIPT(getComponentConfigSP());
Owned<IPropertyTree> newGlobalConfiguration = createPTreeFromIPT(getGlobalConfigSP());
refreshConfiguration(newComponentConfiguration, newGlobalConfiguration);
}
DBGLOG("Refreshed %p,", getComponentConfigSP().get());
dbglogXML(getComponentConfigSP());
}

void executeCallbacks()
Expand Down Expand Up @@ -8873,7 +8871,7 @@ class CConfigUpdater : public CInterface
}
return notifyFuncId;
}
unsigned addModifyFunc(ConfigModifyFunc notifyFunc)
unsigned addModifyFunc(ConfigModifyFunc notifyFunc, bool threadSafe)
{
CriticalBlock b(notifyFuncCS);
notifyFuncId++;
Expand All @@ -8883,8 +8881,7 @@ class CConfigUpdater : public CInterface
//Force all cached values to be recalculated, do not reload the config
//This is only legal if no other threads are accessing the config yet - otherwise the reading thread
//could crash when the global configuration is updated.
//MORE: This function needs an extra parameter to indicate whether or not threads have already started/avoid clone
refreshConfiguration(false, true);
refreshConfiguration(false, threadSafe);
DBGLOG("Modify functions should be registered before the configuration is loaded");
}
return notifyFuncId;
Expand Down Expand Up @@ -8925,11 +8922,11 @@ unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInsta
return configFileUpdater->addNotifyFunc(notifyFunc, callWhenInstalled);
}

jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc) // This function must be called before the configuration is loaded.
jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc, bool threadSafe) // This function must be called before the configuration is loaded.
{
if (!configFileUpdater) // NB: installConfigUpdateHook should always be called after configFileUpdater is initialized
return 0;
return configFileUpdater->addModifyFunc(notifyFunc);
return configFileUpdater->addModifyFunc(notifyFunc, threadSafe);
}

void removeConfigUpdateHook(unsigned notifyFuncId)
Expand Down Expand Up @@ -8973,7 +8970,7 @@ void CConfigUpdateHook::installOnce(ConfigUpdateFunc callbackFunc, bool callWhen
}


void CConfigUpdateHook::installOnce(ConfigModifyFunc callbackFunc)
void CConfigUpdateHook::installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe)
{
unsigned id = configCBId.load(std::memory_order_acquire);
if ((unsigned)-1 == id) // avoid CS in common case
Expand All @@ -8983,7 +8980,7 @@ void CConfigUpdateHook::installOnce(ConfigModifyFunc callbackFunc)
id = configCBId.load(std::memory_order_acquire);
if ((unsigned)-1 == id)
{
id = installConfigUpdateHook(callbackFunc);
id = installConfigUpdateHook(callbackFunc, threadSafe);
configCBId.store(id, std::memory_order_release);
}
}
Expand Down Expand Up @@ -10231,8 +10228,6 @@ void setExpertOpt(const char *opt, const char *value)
config->setPropTree(xpath);
getExpertOptPath(opt, xpath.clear());
config->setProp(xpath, value);
DBGLOG("*** SET *** %p", config.get());
dbglogXML(config);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jptree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class jlib_decl CConfigUpdateHook
~CConfigUpdateHook() { clear(); }
void clear();
void installOnce(ConfigUpdateFunc callbackFunc, bool callWhenInstalled);
void installOnce(ConfigModifyFunc callbackFunc);
void installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe);
};

/*
Expand Down
4 changes: 1 addition & 3 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,6 @@ class CRegistryServer : public CSimpleInterface
processGroup->serialize(msg);
globals->serialize(msg);
getGlobalConfigSP()->serialize(msg);
DBGLOG("**** CONFIG ****");
dbglogXML(globals);
msg.append(managerWorkerMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
Expand Down Expand Up @@ -762,7 +760,7 @@ int main( int argc, const char *argv[] )
}

//This can only be called once dali is initialised
initializeStorageGroups(true);
initializeStoragePlanes(true, true);

if (globals->getPropBool("@MPChannelReconnect"))
getMPServer()->setOpt(mpsopt_channelreopen, "true");
Expand Down

0 comments on commit f384cc6

Please sign in to comment.