Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up the Load Balancer call. #179

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 0 additions & 1 deletion DataManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ void DataManager::resetReadOnly(Parameters param, const CkCallback &cb)
verbosity = param.iVerbosity;
dExtraStore = param.dExtraStore;
dMaxBalance = param.dMaxBalance;
dFracLoadBalance = param.dFracLoadBalance;
nIOProcessor = param.nIOProcessor;
theta = param.dTheta;
thetaMono = theta*theta*theta*theta;
Expand Down
8 changes: 1 addition & 7 deletions ParallelGravity.ci
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ mainmodule ParallelGravity {
readonly DomainsDec domainDecomposition;
readonly double dExtraStore;
readonly double dMaxBalance;
readonly double dFracLoadBalance;
readonly double dGlassDamper;
readonly int bUseCkLoopPar;
readonly int peanoKey;
Expand Down Expand Up @@ -514,13 +513,8 @@ mainmodule ParallelGravity {
entry [local] void receiveParticlesFullCallback(GravityParticle *egp, int num, int chunk, int reqID, Tree::NodeKey &remoteBucket, int awi, void *source);

// jetley
entry void startlb(const CkCallback &cb, int activeRung);
entry void startlb(const CkCallback &cb, int activeRung, bool bDoLB);
entry void ResumeFromSync();
entry [reductiontarget] void getParticleInfoForLB(int64_t active_part,
int64_t total_part);
//jetley
// entry void receiveProxy(CkGroupID);
entry void doAtSync();

entry void outputASCII(CkReference<OutputParams>, int bParaWrite,
const CkCallback& cb);
Expand Down
26 changes: 17 additions & 9 deletions ParallelGravity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ unsigned int _yieldPeriod;
DomainsDec domainDecomposition;
double dExtraStore; ///< fraction of extra particle storage
double dMaxBalance; ///< Max piece imbalance for load balancing
double dFracLoadBalance; ///< Min fraction of particles active
/// for doing load balancing.
double dGlassDamper; // Damping inverse timescale for making glasses
int iGasModel; ///< For backward compatibility
int peanoKey;
Expand Down Expand Up @@ -976,7 +974,6 @@ Main::Main(CkArgMsg* m) {
thetaMono = theta*theta*theta*theta;
dExtraStore = param.dExtraStore;
dMaxBalance = param.dMaxBalance;
dFracLoadBalance = param.dFracLoadBalance;
dGlassDamper = param.dGlassDamper;
_cacheLineDepth = param.cacheLineDepth;
verbosity = param.iVerbosity;
Expand Down Expand Up @@ -1698,14 +1695,25 @@ Main::loadBalance(int iPhase)
}
else {
double startTime = CkWallTimer();
if(iPhase == PHASE_FEEDBACK) {
CkPrintf("Load balancer for star formation/feedback... ");

bool bDoLB = true;
if(iPhase != -1) {
int64_t nActivePart;
if(iPhase == PHASE_FEEDBACK) {
CkPrintf("Load balancer for star formation/feedback... ");
nActivePart = nTotalSPH + nTotalStar;
}
else {
CkPrintf("Load balancer ... ");
nActivePart = nActiveGrav;
if(nActiveSPH > nActivePart) nActivePart = nActiveSPH;
}
bDoLB = ((float)nActivePart/nTotalParticles > param.dFracLoadBalance) ?
true : false;
}
else {
else
CkPrintf("Load balancer ... ");
}

treeProxy.startlb(CkCallbackResumeThread(), iPhase);
treeProxy.startlb(CkCallbackResumeThread(), iPhase, bDoLB);
double tLB = CkWallTimer()-startTime;
timings[iPhase].tLoadB += tLB;
CkPrintf("took %g seconds.\n", tLB);
Expand Down
5 changes: 1 addition & 4 deletions ParallelGravity.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ extern unsigned int _yieldPeriod;
extern DomainsDec domainDecomposition;
extern double dExtraStore;
extern double dMaxBalance;
extern double dFracLoadBalance;
extern double dGlassDamper;
extern int bUseCkLoopPar;
extern GenericTrees useTree;
Expand Down Expand Up @@ -1972,8 +1971,7 @@ class TreePiece : public CBase_TreePiece {
void flushSmoothParticles(CkCacheFillMsg<KeyType> *msg);
void processReqSmoothParticles();

void getParticleInfoForLB(int64_t active_part, int64_t total_part);
void startlb(const CkCallback &cb, int activeRung);
void startlb(const CkCallback &cb, int activeRung, bool bDoLB);
void setTreePieceLoad(int activeRung);
void populateSavedPhaseData(int phase, double tpload, unsigned int activeparts);
bool havePhaseData(int phase);
Expand Down Expand Up @@ -2034,7 +2032,6 @@ class TreePiece : public CBase_TreePiece {
void receiveNodeCallback(GenericTreeNode *node, int chunk, int reqID, int awi, void *source);
void receiveParticlesCallback(ExternalGravityParticle *egp, int num, int chunk, int reqID, Tree::NodeKey &remoteBucket, int awi, void *source);
void receiveParticlesFullCallback(GravityParticle *egp, int num, int chunk, int reqID, Tree::NodeKey &remoteBucket, int awi, void *source);
void doAtSync();

void balanceBeforeInitialForces(const CkCallback &cb);

Expand Down
84 changes: 42 additions & 42 deletions TreePiece.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1885,14 +1885,24 @@ void TreePiece::countActive(int activeRung, const CkCallback& cb) {
int64_t nActive[2];

nActive[0] = nActive[1] = 0;
for(unsigned int i = 1; i <= myNumParticles; ++i) {
if(myParticles[i].rung >= activeRung) {
nActive[0]++;
if(TYPETest(&myParticles[i], TYPE_GAS)) {
nActive[1]++;
}
}
if(activeRung == 0){
nActive[0] = myNumParticles;
nActive[1] = myNumSPH;
}
else if(activeRung == PHASE_FEEDBACK) {
nActive[0] = myNumSPH + myNumStar;
}
else{
for(unsigned int i = 1; i <= myNumParticles; ++i) {
if(myParticles[i].rung >= activeRung) {
nActive[0]++;
if(TYPETest(&myParticles[i], TYPE_GAS)) {
nActive[1]++;
}
}
}
}
numActiveParticles = nActive[0];
contribute(2*sizeof(int64_t), nActive, CkReduction::sum_long, cb);
}

Expand Down Expand Up @@ -5413,9 +5423,11 @@ void TreePiece::setTreePieceLoad(int activeRung) {
setObjTime(dLoadExp);
}

// jetley - contribute your centroid. AtSync is now called by the load balancer (broadcast) when it has
// all centroids.
void TreePiece::startlb(const CkCallback &cb, int activeRung){
/// @brief Save piece loads and call AtSync() if we should load balance.
/// @param cb Callback for ResumeFromSync().
/// @param activeRung Rung we are load balancing for.
/// @param bDoLB Whether we should call AtSync()
void TreePiece::startlb(const CkCallback &cb, int activeRung, bool bDoLB){

if(verbosity > 1)
CkPrintf("[%d] actual load: %g\n", thisIndex, getObjTime());
Expand All @@ -5424,41 +5436,33 @@ void TreePiece::startlb(const CkCallback &cb, int activeRung){
iActiveRungLB = activeRung;
if(verbosity > 1)
CkPrintf("[%d] TreePiece %d calling AtSync()\n",CkMyPe(),thisIndex);

unsigned int i;

if(activeRung == 0){
numActiveParticles = myNumParticles;
}
else if(activeRung == PHASE_FEEDBACK) {
numActiveParticles = myNumSPH + myNumStar;
}
else{
for(numActiveParticles = 0, i = 1; i <= myNumParticles; i++)
if(myParticles[i].rung >= activeRung)
numActiveParticles++;
}

int64_t active_tp[2];
active_tp[0] = numActiveParticles;
active_tp[1] = myNumParticles;

contribute(2*sizeof(int64_t), &active_tp, CkReduction::sum_long,
CkCallback(CkReductionTarget(TreePiece,getParticleInfoForLB),thisProxy));
}

// This is called by startlb to check whether to call the load balancer
void TreePiece::getParticleInfoForLB(int64_t active_part, int64_t total_part) {
bool doLB = ((float)active_part/total_part > dFracLoadBalance) ? true : false;
// Don't do LB
if (!doLB) {
// Don't do LB; just save and reset loads.
if (!bDoLB) {
setTreePieceLoad(iActiveRungLB);
iPrevRungLB = iActiveRungLB;
setObjTime(0.0);
contribute(callback);
return;
}

// We need to recount the number of active particles since DD has
// moved particles around
if(activeRung == 0){ // Everybody is active; no need to count
numActiveParticles = myNumParticles;
}
else if(activeRung == PHASE_FEEDBACK) { // Also no need to recount
numActiveParticles = myNumSPH + myNumStar;
}
else{
numActiveParticles = 0;
for(unsigned int i = 1; i <= myNumParticles; ++i) {
if(myParticles[i].rung >= activeRung) {
numActiveParticles++;
}
}
}

LDObjHandle myHandle = myRec->getLdHandle();

TaggedVector3D tv(savedCentroid, myHandle, numActiveParticles, myNumParticles,
Expand All @@ -5474,11 +5478,8 @@ void TreePiece::getParticleInfoForLB(int64_t active_part, int64_t total_part) {
*(TaggedVector3D *) data = tv;
}
}
thisProxy[thisIndex].doAtSync();
iPrevRungLB = iActiveRungLB;
}

void TreePiece::doAtSync(){
if(verbosity > 1)
CkPrintf("[%d] TreePiece %d calling AtSync() at %g\n",CkMyPe(),thisIndex, CkWallTimer());
AtSync();
Expand Down Expand Up @@ -6577,10 +6578,9 @@ void TreePiece::balanceBeforeInitialForces(const CkCallback &cb){
*(TaggedVector3D *)data = tv;
}
}
thisProxy[thisIndex].doAtSync();

// this will be called in resumeFromSync()
callback = cb;
AtSync();
}

// Choose a piece from among the owners from which to
Expand Down
Loading