EH: CS-611 add option to the parallel environment to ignore slave requests for slave tasks on the master host

EH: CS-608 add options to the parallel environment for multithreaded or multiprocess applications
jgabler-hpc committed Sep 26, 2024
1 parent 3d37b87 commit 33833bd
Showing 16 changed files with 377 additions and 123 deletions.
77 changes: 77 additions & 0 deletions doc/markdown/man/man5/sge_pe.md
@@ -257,6 +257,83 @@ usage records will be sent from execd to qmaster, which can
significantly reduce load on qmaster when running large tightly
integrated parallel jobs.

## **ign_sreq_on_mhost**

This parameter can be used to define the scheduling behavior for parallel jobs
which are submitted with the **-scope** submit option, see *qsub*(1).

By default, **ign_sreq_on_mhost** is set to FALSE. This means that slave tasks
running alongside the master task on the master host must fulfill the resource
requirements specified in the slave scope request. For consumable slave requests,
enough capacity must be available, and the slave tasks consume the requested
amount of the consumable resources.

There are situations where the master task requires multiple slots, its resource
requirements are well known, and either no slave tasks are started on the master
host or they are started as part of the master task, as subprocesses or as threads.
In this case **ign_sreq_on_mhost** can be set to TRUE. Then only global and master
requests need to be fulfilled on the master host; slave requests are ignored.
Slave tasks on the master host will not consume any consumable resources except
for slots.

For queue limits to be applied correctly to the master task, setting
**ign_sreq_on_mhost** to TRUE
should be combined with setting **master_forks_slaves** to TRUE.
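
As an illustration, assume a hybrid application whose master task forks or starts
as threads all of its local workers (the PE name, resource values, and file names
below are hypothetical; see *qsub*(1) for the exact **-scope** syntax):

    # relevant part of the parallel environment configuration
    ign_sreq_on_mhost    TRUE
    master_forks_slaves  TRUE

    # submission requesting 8G for the master task and 2G per slave task
    qsub -pe mpi.pe 16 -scope master -l h_vmem=8G -scope slave -l h_vmem=2G job.sh

On the master host only the global and master requests are checked and consumed;
the 2G slave request applies on the slave hosts only.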

## **master_forks_slaves**

This parameter is only checked if **control_slaves** (see above) is set
to TRUE and thus xxQS_NAMExx is the creator of the slave tasks of a
parallel application via *xxqs_name_sxx_execd*(8) and
*xxqs_name_sxx_shepherd*(8).

Slave tasks of tightly integrated parallel jobs are usually started by calling
`qrsh -inherit <slave_host> <command>` on the master host.

Applications usually either start every slave task individually with a separate
`qrsh -inherit` call,
or the master task starts slave tasks on the master host via fork/exec or as threads
of the master process.

If slave tasks on the master host are started individually, keep **master_forks_slaves**
at its default of FALSE.
If slave tasks on the master host are started via fork/exec or as threads,
set **master_forks_slaves** to TRUE.

The setting of **master_forks_slaves** influences the behavior of the *xxqs_name_sxx_execd*(8):
if **master_forks_slaves** is set to TRUE, no slave tasks can be started with `qrsh -inherit`
on the master host,
and the limits set for the master task (the job script) are multiplied by the
number of slots allocated for the job on the master host.
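
For example (values are illustrative): if the master queue defines an **h_vmem** limit of 2G
and the job has been granted 8 slots on the master host, the *xxqs_name_sxx_shepherd*(8) of
the master task enforces 8 * 2G = 16G, covering the master task and the slave tasks it
forked or started as threads.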

## **daemon_forks_slaves**

This parameter is only checked if **control_slaves** (see above) is set
to TRUE and thus xxQS_NAMExx is the creator of the slave tasks of a
parallel application via *xxqs_name_sxx_execd*(8) and
*xxqs_name_sxx_shepherd*(8).

Slave tasks of tightly integrated parallel jobs are usually started by calling
`qrsh -inherit <slave_host> <command>` on the master host.

Depending on the application, slave tasks are either started individually on the slave hosts
with a separate `qrsh -inherit` call per task,
or a single process is started per slave host which then forks/execs the slave tasks
or starts them as threads.

The default setting of **daemon_forks_slaves** is FALSE. Use this setting if slave tasks are
started individually on the slave hosts.
If a single process is started per slave host which then forks/execs the slave tasks or starts
them as threads, set **daemon_forks_slaves** to TRUE.

The setting of **daemon_forks_slaves** influences the behavior of the *xxqs_name_sxx_execd*(8):
if **daemon_forks_slaves** is set to TRUE, only a single task (the daemon)
can be started with `qrsh -inherit` per slave host,
and the limits set for this one task (the daemon) are multiplied by the number of slots
allocated for the job on the slave host.
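
A typical case (program names below are hypothetical) is an MPI implementation whose job
script starts one launcher daemon per slave host, which then forks/execs the actual ranks:

    # issued from the job script on the master host, one call per slave host
    qrsh -inherit slave_host_1 /path/to/mpi_daemon
    qrsh -inherit slave_host_2 /path/to/mpi_daemon

With **daemon_forks_slaves** set to TRUE, exactly one such `qrsh -inherit` task is accepted
per slave host, and its limits are multiplied by the number of slots granted on that host.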

# RESTRICTIONS

**Note** that the functionality of the start-up, shutdown and signaling
106 changes: 88 additions & 18 deletions source/daemons/execd/exec_job.cc
@@ -83,6 +83,7 @@
#include "sgeobj/sge_grantedres.h"
#include "sgeobj/sge_mailrec.h"
#include "sgeobj/sge_path_alias.h"
#include "sgeobj/sge_ulong.h"

#include "comm/commlib.h"

@@ -261,6 +262,68 @@ static int addgrpid_already_in_use(long add_grp_id) {
#endif
#endif

static const char *
sge_exec_job_get_limit(dstring *dstr, int limit_nm, const char *limit_name, u_long32 type,
const lListElem *master_q, const lListElem *jatep, const lListElem *petep,
const char *qualified_hostname) {
DENTER(TOP_LAYER);

const char *ret = lGetString(master_q, limit_nm);
// cannot really happen, but just to be sure
if (ret == nullptr) {
ret = "INFINITY";
}

DPRINTF("sge_exec_job_get_limit: limit_name=%s, limit=%s\n", limit_name, ret);

// for sequential jobs we are done, for tightly integrated parallel jobs we need to check the pe settings
// if the limit in the master queue is infinity there is no need to do calculations
// if it is not infinity we need to check the pe settings master_forks_slaves and daemon_forks_slaves
// and sum up the limits of the (possibly multiple) queues
const lListElem *pe = lGetObject(jatep, JAT_pe_object);
if (pe != nullptr && lGetBool(pe, PE_control_slaves) &&
strcasecmp(ret, "INFINITY") != 0) {
DPRINTF("sge_exec_job_get_limit: we have a tightly integrated parallel job and limit is not infinity\n");
if ((petep == nullptr && lGetBool(pe, PE_master_forks_slaves)) || // master task forks slaves
lGetBool(pe, PE_daemon_forks_slaves)) { // one slave task forks slaves
DPRINTF("sge_exec_job_get_limit: we need to sum up the limits\n");
double limit = 0;
const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list);
const void *iterator = nullptr;
const lListElem *gdil_ep;
const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator);
while ((gdil_ep = next_gdil_ep) != nullptr) {
next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator);

const lListElem *queue = lGetObject(gdil_ep, JG_queue);
if (queue != nullptr) {
const char *limit_str = lGetString(queue, limit_nm);
if (limit_str != nullptr) {
// if one of the queue instances has a limit of infinity the sum is infinity
if (strcasecmp(limit_str, "INFINITY") == 0) {
DPRINTF("sge_exec_job_get_limit: qinstance %s has infinity\n", lGetString(gdil_ep, JG_qname));
ret = "INFINITY";
break;
} else {
u_long32 slots = lGetUlong(gdil_ep, JG_slots);
double dbl;
parse_ulong_val(&dbl, nullptr, type, limit_str, nullptr, 0);
limit += dbl * slots;
DPRINTF("sge_exec_job_get_limit: qinstance %s has limit %s, slots " sge_u32 ", sum %f\n",
lGetString(gdil_ep, JG_qname), limit_str, slots, dbl * slots);
}
}
}
} // end: loop over all gdil elements on this host
double_print_to_dstring(limit, dstr, type);
ret = sge_dstring_get_string(dstr);
DPRINTF("sge_exec_job_get_limit: sum of limits %s\n", ret);
} // end: we need to sum up the limits
} // end: we have a tightly integrated pe job

DRETURN(ret);
}
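// Illustrative example (hypothetical values) of what the function above computes:
// a tightly integrated parallel job with daemon_forks_slaves=TRUE spans two queue
// instances on this host, all.q with 4 slots and h_vmem=2G and batch.q with 2 slots
// and h_vmem=4G. The effective h_vmem limit written to the shepherd config is then
// 4 * 2G + 2 * 4G = 16G; if any of these queue instances defined h_vmem as INFINITY,
// the result would be INFINITY.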

/************************************************************************
part of execd. Setup job environment then start shepherd.
@@ -1223,35 +1286,42 @@ int sge_exec_job(lListElem *jep, lListElem *jatep, lListElem *petep, char *err_s
} else {
fprintf(fp, "ckpt_job=0\n");
}

/*
* Shorthand for this code sequence:
* - obtain limit A (from the master queue; for tightly integrated parallel jobs possibly summed over all queue instances on this host) and write it to the shepherd config file
* - check if resource is job consumable and indicate to shepherd
*/
#define WRITE_COMPLEX_AND_CONSUMABLE_ATTR(A) \
fprintf(fp, #A"=%s\n", lGetString(master_q, QU_##A)); \
job_is_requesting_consumable(jep, #A) ? fprintf(fp, #A"_is_consumable_job=1\n") : fprintf(fp, #A"_is_consumable_job=0\n");

WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_vmem);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_vmem);
#define WRITE_COMPLEX_AND_CONSUMABLE_ATTR(A, T) \
fprintf(fp, #A"=%s\n", sge_exec_job_get_limit(&dstr_limit, QU_##A, #A, T, master_q, jatep, petep, qualified_hostname)); \
job_is_requesting_consumable(jep, #A) ? fprintf(fp, #A"_is_consumable_job=1\n") : fprintf(fp, #A"_is_consumable_job=0\n")
#define WRITE_COMPLEX_ATTR(A, T) \
fprintf(fp, #A"=%s\n", sge_exec_job_get_limit(&dstr_limit, QU_##A, #A, T, master_q, jatep, petep, qualified_hostname))
\
{
DSTRING_STATIC(dstr_limit, 64);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_vmem, TYPE_MEM);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_vmem, TYPE_MEM);

WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_cpu);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_cpu);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_cpu, TYPE_TIM);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_cpu, TYPE_TIM);

WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_stack);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_stack);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_stack, TYPE_MEM);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_stack, TYPE_MEM);

WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_data);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_data);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_data, TYPE_MEM);
WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_data, TYPE_MEM);

fprintf(fp, "h_core=%s\n", lGetString(master_q, QU_h_core));
fprintf(fp, "s_core=%s\n", lGetString(master_q, QU_s_core));
// @todo why not use WRITE_COMPLEX_AND_CONSUMABLE_ATTR? Can't they be made consumable?
WRITE_COMPLEX_ATTR(h_core, TYPE_MEM);
WRITE_COMPLEX_ATTR(s_core, TYPE_MEM);

fprintf(fp, "h_rss=%s\n", lGetString(master_q, QU_h_rss));
fprintf(fp, "s_rss=%s\n", lGetString(master_q, QU_s_rss));
WRITE_COMPLEX_ATTR(h_rss, TYPE_MEM);
WRITE_COMPLEX_ATTR(s_rss, TYPE_MEM);

fprintf(fp, "h_fsize=%s\n", lGetString(master_q, QU_h_fsize));
fprintf(fp, "s_fsize=%s\n", lGetString(master_q, QU_s_fsize));
WRITE_COMPLEX_ATTR(h_fsize, TYPE_MEM);
WRITE_COMPLEX_ATTR(s_fsize, TYPE_MEM);
}

{
char *s;
80 changes: 61 additions & 19 deletions source/daemons/execd/execd_job_exec.cc
@@ -34,6 +34,7 @@
************************************************************************/
/*___INFO__MARK_END__*/
#include <cerrno>
#include <climits>
#include <pwd.h>
#include <fcntl.h>
#include <unistd.h>
@@ -550,37 +551,78 @@ static lList *
job_get_queue_for_task(lListElem *jatep, lListElem *petep,
const char *qualified_hostname, const char *queuename)
{
const lListElem *this_q, *gdil_ep;

DENTER(TOP_LAYER);

for_each_ep(gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) {
/* if a certain queuename is requested, check only this queue */
if (queuename != nullptr &&
strcmp(queuename, lGetString(gdil_ep, JG_qname)) != 0) {
DTRACE;
continue;
}
bool on_master_host = lGetUlong(jatep, JAT_status) != JSLAVE;
const lListElem *pe = lGetObject(jatep, JAT_pe_object);
if (pe == nullptr) {
// cannot really happen, we are starting a task of a tightly integrated parallel job
DRETURN(nullptr);
}

this_q = lGetObject(gdil_ep, JG_queue);
// if we are on the master host and master_forks_slaves is set, not a single qrsh -inherit is allowed
if (on_master_host && lGetBool(pe, PE_master_forks_slaves)) {
DRETURN(nullptr);
}

const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list);

// in case of daemon_forks_slaves we have to check if we may start more tasks
if (lGetBool(pe, PE_daemon_forks_slaves)) {
int max_slots = INT_MAX;
if (on_master_host) {
// on the master host we can have the job script itself + one daemon starting slave tasks
max_slots = 2;
} else {
// on a slave host we can only have the daemon starting slave tasks
max_slots = 1;
}

DTRACE;
// we can have multiple queue instances on the host
// count how many slots are used in total
// and check if we may still start more
int total_slots_used = 0;
const void *iterator = nullptr;
const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator);
const lListElem *gdil_ep;
while ((gdil_ep = next_gdil_ep) != nullptr) {
next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator);

const lListElem *queue = lGetObject(gdil_ep, JG_queue);
if (queue != nullptr) {
total_slots_used += qinstance_slots_used(queue);
if (total_slots_used >= max_slots) {
// all possible tasks are already running
DRETURN(nullptr);
}
}
}
}

/* Queue must exist and be on this host */
if (this_q != nullptr &&
sge_hostcmp(lGetHost(gdil_ep, JG_qhostname),
qualified_hostname) == 0) {
// if we get here we may still start tasks
// select a queue
const void *iterator = nullptr;
const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator);
const lListElem *gdil_ep;
while ((gdil_ep = next_gdil_ep) != nullptr) {
next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator);

DTRACE;
// if a specific queue is requested, skip non matching queues
if (queuename != nullptr && sge_strnullcmp(queuename, lGetString(gdil_ep, JG_qname)) != 0) {
continue;
}

const lListElem *queue = lGetObject(gdil_ep, JG_queue);
if (queue != nullptr) {
/* Queue must have free slots */
if (qinstance_slots_used(this_q) < (int)lGetUlong(this_q, QU_job_slots)) {
lList *jat_gdil = job_set_queue_info_in_task(qualified_hostname, lGetString(gdil_ep, JG_qname),
petep);
if (qinstance_slots_used(queue) < (int)lGetUlong(queue, QU_job_slots)) {
lList *jat_gdil = job_set_queue_info_in_task(qualified_hostname, lGetString(gdil_ep, JG_qname), petep);
DRETURN(jat_gdil);
}
}
}

// if we get here we didn't find a queue instance with free slots
DRETURN(nullptr);
}

2 changes: 1 addition & 1 deletion source/daemons/qmaster/sge_give_jobs.cc
@@ -186,7 +186,7 @@ static int queue_field[] = {QU_qhostname,
/************************************************************************
Master function to give job to the execd.
We make asynchron sends and implement a retry mechanism.
We make asynchronous sends and implement a retry mechanism.
Do everything to make sure the execd is prepared for receiving the job.
************************************************************************/
/* pe = is nullptr for serial jobs*/
6 changes: 6 additions & 0 deletions source/daemons/qmaster/sge_pe_qmaster.cc
@@ -184,6 +184,12 @@ pe_mod(lList **alpp, lListElem *new_pe, lListElem *pe, /* reduced */

/* ---- PE_accounting_summary */
attr_mod_bool(pe, new_pe, PE_accounting_summary, "accounting_summary");
/* ---- PE_ignore_slave_requests_on_master_host */
attr_mod_bool(pe, new_pe, PE_ignore_slave_requests_on_master_host, "ign_sreq_on_mhost");
/* ---- PE_master_forks_slaves */
attr_mod_bool(pe, new_pe, PE_master_forks_slaves, "master_forks_slaves");
/* ---- PE_daemon_forks_slaves */
attr_mod_bool(pe, new_pe, PE_daemon_forks_slaves, "daemon_forks_slaves");

/* -------- PE_resource_utilization */
if (add) {
1 change: 1 addition & 0 deletions source/daemons/qmaster/sge_sched_prepare_data.cc
@@ -262,6 +262,7 @@ static const int pe_nm[] = {
PE_job_is_first_task,
PE_resource_utilization,
PE_urgency_slots,
PE_ignore_slave_requests_on_master_host,
NoName
};

2 changes: 1 addition & 1 deletion source/daemons/qmaster/sge_sched_thread_rsmap.cc
@@ -121,7 +121,7 @@ gru_list_add_request(lList **granted_resources_list, const char *name, u_long32
DRETURN(false);
}
}
DPRINTF(" ==> gru_list_add_request: booking %s: " sge_u32 " * %f from %host %s\n", name, slots, amount, host_name);
DPRINTF(" ==> gru_list_add_request: booking %s: " sge_u32 " * %f from host %s\n", name, slots, amount, host_name);
lListElem *gru = gru_list_search(*granted_resources_list, name, host_name);
if (gru == nullptr) {
DPRINTF(" -> adding new GRU\n");
24 changes: 14 additions & 10 deletions source/dist/mpi/mpi.template
@@ -1,10 +1,14 @@
pe_name mpi
slots <the_number_of_slots>
user_lists NONE
xuser_lists NONE
start_proc_args <your_sge_root>/mpi/startmpi.sh $pe_hostfile
stop_proc_args <your_sge_root>/mpi/stopmpi.sh
allocation_rule $round_robin
control_slaves FALSE
job_is_first_task TRUE
urgency_slots min
pe_name mpi
slots <the_number_of_slots>
user_lists NONE
xuser_lists NONE
start_proc_args <your_sge_root>/mpi/startmpi.sh $pe_hostfile
stop_proc_args <your_sge_root>/mpi/stopmpi.sh
allocation_rule $round_robin
control_slaves FALSE
job_is_first_task TRUE
urgency_slots min
accounting_summary FALSE
ign_sreq_on_mhost FALSE
master_forks_slaves FALSE
daemon_forks_slaves FALSE