diff --git a/doc/markdown/man/man5/sge_pe.md b/doc/markdown/man/man5/sge_pe.md index b77d707304..afdfbec34c 100644 --- a/doc/markdown/man/man5/sge_pe.md +++ b/doc/markdown/man/man5/sge_pe.md @@ -257,6 +257,83 @@ usage records will be sent from execd to qmaster, which can significantly reduce load on qmaster when running large tightly integrated parallel jobs. +## **ign_sreq_on_mhost** + +This parameter can be used to define the scheduling behavior for parallel jobs +which are submitted with the **-scope** submit option, see *qsub*(1). + +By default, **ign_sreq_on_mhost** is set to FALSE. This means that slave tasks +which are running alongside the master task on the master host must fulfill the +resource requirements specified in the slave scope request. For consumable slave +requests, enough capacity needs to be available, and the slave tasks consume the +requested amount from consumable resources. + +There are situations where the master task requires multiple slots and its +resource requirements are well known, and either no slave tasks are started +on the master host or they are started as part of the master task, +as subprocesses or as threads. +In this case **ign_sreq_on_mhost** can be set to TRUE. This means that +on the master host only global and master requests need to be fulfilled; +slave requests are ignored. +Slave tasks on the master host will not consume any consumable resources except +for slots. + +In order for queue limits to be correctly applied to the master task, +**ign_sreq_on_mhost** set to TRUE +should be combined with **master_forks_slaves** set to TRUE. + +## **master_forks_slaves** + +This parameter is only checked if **control_slaves** (see above) is set +to TRUE and thus xxQS_NAMExx is the creator of the slave tasks of a +parallel application via *xxqs_name_sxx_execd*(8) and +*xxqs_name_sxx_shepherd*(8). + +Slave tasks of tightly integrated parallel jobs are usually started by calling +`qrsh -inherit ` on the master host. + +Usually applications either start every slave task individually with a separate +`qrsh -inherit` call, +or the master task starts slave tasks on the master host via fork/exec or as threads +of the master process. + +If slave tasks on the master host are started individually, then keep the setting of +**master_forks_slaves** as FALSE (the default). +If slave tasks on the master host are started via fork/exec or as threads, +then set **master_forks_slaves** to TRUE. + +The setting of **master_forks_slaves** influences the behavior of the *xxqs_name_sxx_execd*(8): +If **master_forks_slaves** is set to TRUE, no slave tasks can be started with `qrsh -inherit` +on the master host, +and limits set for the master task (the job script) will be multiplied by the +number of slots allocated for the job on the master host. + +## **daemon_forks_slaves** + +This parameter is only checked if **control_slaves** (see above) is set +to TRUE and thus xxQS_NAMExx is the creator of the slave tasks of a +parallel application via *xxqs_name_sxx_execd*(8) and +*xxqs_name_sxx_shepherd*(8). + +Slave tasks of tightly integrated parallel jobs are usually started by calling +`qrsh -inherit ` on the master host. + +Depending on the application, slave tasks are either started individually on the slave hosts +with a separate `qrsh -inherit` call per task, +or a single process is started per slave host which then forks/execs the slave tasks +or starts them as threads. + +The default setting of **daemon_forks_slaves** is FALSE. 
Keep this setting if slave tasks are started +individually on the slave hosts. +If a single process is started per slave host which then forks/execs the slave tasks or starts them as threads, +then set **daemon_forks_slaves** to TRUE. + +The setting of **daemon_forks_slaves** influences the behavior of the *xxqs_name_sxx_execd*(8): +If **daemon_forks_slaves** is set to TRUE, only a single task (the daemon) +can be started with `qrsh -inherit`, +and limits set for this one task (the daemon) will be multiplied by the number of slots allocated for +the job on the slave host. + # RESTRICTIONS **Note**, that the functionality of the start-up, shutdown and signaling diff --git a/source/daemons/execd/exec_job.cc b/source/daemons/execd/exec_job.cc index d6ae8096f6..069dcc96f1 100644 --- a/source/daemons/execd/exec_job.cc +++ b/source/daemons/execd/exec_job.cc @@ -83,6 +83,7 @@ #include "sgeobj/sge_grantedres.h" #include "sgeobj/sge_mailrec.h" #include "sgeobj/sge_path_alias.h" +#include "sgeobj/sge_ulong.h" #include "comm/commlib.h" @@ -261,6 +262,68 @@ static int addgrpid_already_in_use(long add_grp_id) { #endif #endif +static const char * +sge_exec_job_get_limit(dstring *dstr, int limit_nm, const char *limit_name, u_long32 type, + const lListElem *master_q, const lListElem *jatep, const lListElem *petep, + const char *qualified_hostname) { + DENTER(TOP_LAYER); + + const char *ret = lGetString(master_q, limit_nm); + // cannot really happen, but just to be sure + if (ret == nullptr) { + ret = "INFINITY"; + } + + DPRINTF("sge_exec_job_get_limit: limit_name=%s, limit=%s\n", limit_name, ret); + + // for sequential jobs we are done, for tightly integrated parallel jobs we need to check the pe settings + // if the limit in the master queue is infinity there is no need to do calculations + // if it is not infinity we need to check the pe settings master_forks_slaves and daemon_forks_slaves + // and sum up the limits of the (possibly multiple) queues + const lListElem *pe = lGetObject(jatep, JAT_pe_object); + if (pe != nullptr && lGetBool(pe, PE_control_slaves) && + strcasecmp(ret, "INFINITY") != 0) { + DPRINTF("sge_exec_job_get_limit: we have a tightly integrated parallel job and limit is not infinity\n"); + if ((petep == nullptr && lGetBool(pe, PE_master_forks_slaves)) || // master task forks slaves + lGetBool(pe, PE_daemon_forks_slaves)) { // one slave task forks slaves + DPRINTF("sge_exec_job_get_limit: we need to sum up the limits\n"); + double limit = 0; + const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list); + const void *iterator = nullptr; + const lListElem *gdil_ep; + const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator); + while ((gdil_ep = next_gdil_ep) != nullptr) { + next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator); + + const lListElem *queue = lGetObject(gdil_ep, JG_queue); + if (queue != nullptr) { + const char *limit_str = lGetString(queue, limit_nm); + if (limit_str != nullptr) { + // if one of the queue instances has a limit of infinity the sum is infinity + if (strcasecmp(limit_str, "INFINITY") == 0) { + DPRINTF("sge_exec_job_get_limit: qinstance %s has infinity\n", lGetString(gdil_ep, JG_qname)); + ret = "INFINITY"; + break; + } else { + u_long32 slots = lGetUlong(gdil_ep, JG_slots); + double dbl; + parse_ulong_val(&dbl, nullptr, type, limit_str, nullptr, 0); + limit += dbl * slots; + DPRINTF("sge_exec_job_get_limit: qinstance %s has limit %s, slots " sge_u32 ", sum %f\n", + lGetString(gdil_ep, 
JG_qname), limit_str, slots, dbl * slots); + } + } + } + } // end: loop over all gdil elements on this host + double_print_to_dstring(limit, dstr, type); + ret = sge_dstring_get_string(dstr); + DPRINTF("sge_exec_job_get_limit: sum of limits %s\n", ret); + } // end: we need to sum up the limits + } // end: we have a tightly integrated pe job + + DRETURN(ret); +} + /************************************************************************ part of execd. Setup job environment then start shepherd. @@ -1223,35 +1286,42 @@ int sge_exec_job(lListElem *jep, lListElem *jatep, lListElem *petep, char *err_s } else { fprintf(fp, "ckpt_job=0\n"); } + /* * Shorthand for this code sequence: * - obtain resource A from master queue and write out to shepherd config file * - check if resource is job consumable and indicate to shepherd */ -#define WRITE_COMPLEX_AND_CONSUMABLE_ATTR(A) \ - fprintf(fp, #A"=%s\n", lGetString(master_q, QU_##A)); \ - job_is_requesting_consumable(jep, #A) ? fprintf(fp, #A"_is_consumable_job=1\n") : fprintf(fp, #A"_is_consumable_job=0\n"); - - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_vmem); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_vmem); +#define WRITE_COMPLEX_AND_CONSUMABLE_ATTR(A, T) \ + fprintf(fp, #A"=%s\n", sge_exec_job_get_limit(&dstr_limit, QU_##A, #A, T, master_q, jatep, petep, qualified_hostname)); \ + job_is_requesting_consumable(jep, #A) ? fprintf(fp, #A"_is_consumable_job=1\n") : fprintf(fp, #A"_is_consumable_job=0\n") +#define WRITE_COMPLEX_ATTR(A, T) \ + fprintf(fp, #A"=%s\n", sge_exec_job_get_limit(&dstr_limit, QU_##A, #A, T, master_q, jatep, petep, qualified_hostname)) + \ + { + DSTRING_STATIC(dstr_limit, 64); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_vmem, TYPE_MEM); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_vmem, TYPE_MEM); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_cpu); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_cpu); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_cpu, TYPE_TIM); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_cpu, TYPE_TIM); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_stack); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_stack); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_stack, TYPE_MEM); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_stack, TYPE_MEM); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_data); - WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_data); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(h_data, TYPE_MEM); + WRITE_COMPLEX_AND_CONSUMABLE_ATTR(s_data, TYPE_MEM); - fprintf(fp, "h_core=%s\n", lGetString(master_q, QU_h_core)); - fprintf(fp, "s_core=%s\n", lGetString(master_q, QU_s_core)); + // @todo why not use WRITE_COMPLEX_AND_CONSUMABLE_ATTR? Can't they be made consumable? 
+ WRITE_COMPLEX_ATTR(h_core, TYPE_MEM); + WRITE_COMPLEX_ATTR(s_core, TYPE_MEM); - fprintf(fp, "h_rss=%s\n", lGetString(master_q, QU_h_rss)); - fprintf(fp, "s_rss=%s\n", lGetString(master_q, QU_s_rss)); + WRITE_COMPLEX_ATTR(h_rss, TYPE_MEM); + WRITE_COMPLEX_ATTR(s_rss, TYPE_MEM); - fprintf(fp, "h_fsize=%s\n", lGetString(master_q, QU_h_fsize)); - fprintf(fp, "s_fsize=%s\n", lGetString(master_q, QU_s_fsize)); + WRITE_COMPLEX_ATTR(h_fsize, TYPE_MEM); + WRITE_COMPLEX_ATTR(s_fsize, TYPE_MEM); + } { char *s; diff --git a/source/daemons/execd/execd_job_exec.cc b/source/daemons/execd/execd_job_exec.cc index 751d39e49f..0b00888ad3 100644 --- a/source/daemons/execd/execd_job_exec.cc +++ b/source/daemons/execd/execd_job_exec.cc @@ -34,6 +34,7 @@ ************************************************************************/ /*___INFO__MARK_END__*/ #include +#include #include #include #include @@ -550,37 +551,78 @@ static lList * job_get_queue_for_task(lListElem *jatep, lListElem *petep, const char *qualified_hostname, const char *queuename) { - const lListElem *this_q, *gdil_ep; - DENTER(TOP_LAYER); - for_each_ep(gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) { - /* if a certain queuename is requested, check only this queue */ - if (queuename != nullptr && - strcmp(queuename, lGetString(gdil_ep, JG_qname)) != 0) { - DTRACE; - continue; - } + bool on_master_host = lGetUlong(jatep, JAT_status) == JSLAVE ? false : true; + const lListElem *pe = lGetObject(jatep, JAT_pe_object); + if (pe == nullptr) { + // can not really happen, we start a task for a tightly integrated parallel job + DRETURN(nullptr); + } - this_q = lGetObject(gdil_ep, JG_queue); + // if we are on the master host and master_forks_slaves is set, not a single qrsh -inherit is allowed + if (on_master_host && lGetBool(pe, PE_master_forks_slaves)) { + DRETURN(nullptr); + } + + const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list); + + // in case of daemon_forks_slaves we have to check if we may start more tasks + if (lGetBool(pe, PE_daemon_forks_slaves)) { + int max_slots = INT_MAX; + if (on_master_host) { + // on the master host we can have the job script itself + one daemon starting slave tasks + max_slots = 2; + } else { + // on a slave host we can only have the daemon starting slave task + max_slots = 1; + } - DTRACE; + // we can have multiple queue instances on the host + // count how many slots are used in total + // and check if we may still start more + int total_slots_used = 0; + const void *iterator = nullptr; + const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator); + const lListElem *gdil_ep; + while ((gdil_ep = next_gdil_ep) != nullptr) { + next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator); + + const lListElem *queue = lGetObject(gdil_ep, JG_queue); + if (queue != nullptr) { + total_slots_used += qinstance_slots_used(queue); + if (total_slots_used >= max_slots) { + // all possible tasks are already running + DRETURN(nullptr); + } + } + } + } - /* Queue must exist and be on this host */ - if (this_q != nullptr && - sge_hostcmp(lGetHost(gdil_ep, JG_qhostname), - qualified_hostname) == 0) { + // if we get here we may still start tasks + // select a queue + const void *iterator = nullptr; + const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator); + const lListElem *gdil_ep; + while ((gdil_ep = next_gdil_ep) != nullptr) { + next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, 
qualified_hostname, &iterator); - DTRACE; + // if a specific queue is requested, skip non matching queues + if (queuename != nullptr && sge_strnullcmp(queuename, lGetString(gdil_ep, JG_qname)) != 0) { + continue; + } + const lListElem *queue = lGetObject(gdil_ep, JG_queue); + if (queue != nullptr) { /* Queue must have free slots */ - if (qinstance_slots_used(this_q) < (int)lGetUlong(this_q, QU_job_slots)) { - lList *jat_gdil = job_set_queue_info_in_task(qualified_hostname, lGetString(gdil_ep, JG_qname), - petep); + if (qinstance_slots_used(queue) < (int)lGetUlong(queue, QU_job_slots)) { + lList *jat_gdil = job_set_queue_info_in_task(qualified_hostname, lGetString(gdil_ep, JG_qname), petep); DRETURN(jat_gdil); } } } + + // if we get here we didn't find a queue instance with free slots DRETURN(nullptr); } diff --git a/source/daemons/qmaster/sge_give_jobs.cc b/source/daemons/qmaster/sge_give_jobs.cc index 319de7fd54..9ad4bac6ff 100644 --- a/source/daemons/qmaster/sge_give_jobs.cc +++ b/source/daemons/qmaster/sge_give_jobs.cc @@ -186,7 +186,7 @@ static int queue_field[] = {QU_qhostname, /************************************************************************ Master function to give job to the execd. - We make asynchron sends and implement a retry mechanism. + We make asynchronous sends and implement a retry mechanism. Do everything to make sure the execd is prepared for receiving the job. ************************************************************************/ /* pe = is nullptr for serial jobs*/ diff --git a/source/daemons/qmaster/sge_pe_qmaster.cc b/source/daemons/qmaster/sge_pe_qmaster.cc index da3363db55..25fd972440 100644 --- a/source/daemons/qmaster/sge_pe_qmaster.cc +++ b/source/daemons/qmaster/sge_pe_qmaster.cc @@ -184,6 +184,12 @@ pe_mod(lList **alpp, lListElem *new_pe, lListElem *pe, /* reduced */ /* ---- PE_accounting_summary */ attr_mod_bool(pe, new_pe, PE_accounting_summary, "accounting_summary"); + /* ---- PE_ignore_slave_requests_on_master_host */ + attr_mod_bool(pe, new_pe, PE_ignore_slave_requests_on_master_host, "ign_sreq_on_mhost"); + /* ---- PE_master_forks_slaves */ + attr_mod_bool(pe, new_pe, PE_master_forks_slaves, "master_forks_slaves"); + /* ---- PE_daemon_forks_slaves */ + attr_mod_bool(pe, new_pe, PE_daemon_forks_slaves, "daemon_forks_slaves"); /* -------- PE_resource_utilization */ if (add) { diff --git a/source/daemons/qmaster/sge_sched_prepare_data.cc b/source/daemons/qmaster/sge_sched_prepare_data.cc index 3a2bbdbcd2..c291bfbd9f 100644 --- a/source/daemons/qmaster/sge_sched_prepare_data.cc +++ b/source/daemons/qmaster/sge_sched_prepare_data.cc @@ -262,6 +262,7 @@ static const int pe_nm[] = { PE_job_is_first_task, PE_resource_utilization, PE_urgency_slots, + PE_ignore_slave_requests_on_master_host, NoName }; diff --git a/source/daemons/qmaster/sge_sched_thread_rsmap.cc b/source/daemons/qmaster/sge_sched_thread_rsmap.cc index d43327119e..45ff9b0c0d 100644 --- a/source/daemons/qmaster/sge_sched_thread_rsmap.cc +++ b/source/daemons/qmaster/sge_sched_thread_rsmap.cc @@ -121,7 +121,7 @@ gru_list_add_request(lList **granted_resources_list, const char *name, u_long32 DRETURN(false); } } - DPRINTF(" ==> gru_list_add_request: booking %s: " sge_u32 " * %f from %host %s\n", name, slots, amount, host_name); + DPRINTF(" ==> gru_list_add_request: booking %s: " sge_u32 " * %f from host %s\n", name, slots, amount, host_name); lListElem *gru = gru_list_search(*granted_resources_list, name, host_name); if (gru == nullptr) { DPRINTF(" -> adding new GRU\n"); diff --git 
a/source/dist/mpi/mpi.template b/source/dist/mpi/mpi.template index 016f47039b..21168467ff 100644 --- a/source/dist/mpi/mpi.template +++ b/source/dist/mpi/mpi.template @@ -1,10 +1,14 @@ -pe_name mpi -slots -user_lists NONE -xuser_lists NONE -start_proc_args /mpi/startmpi.sh $pe_hostfile -stop_proc_args /mpi/stopmpi.sh -allocation_rule $round_robin -control_slaves FALSE -job_is_first_task TRUE -urgency_slots min +pe_name mpi +slots +user_lists NONE +xuser_lists NONE +start_proc_args /mpi/startmpi.sh $pe_hostfile +stop_proc_args /mpi/stopmpi.sh +allocation_rule $round_robin +control_slaves FALSE +job_is_first_task TRUE +urgency_slots min +accounting_summary FALSE +ign_sreq_on_mhost FALSE +master_forks_slaves FALSE +daemon_forks_slaves FALSE \ No newline at end of file diff --git a/source/dist/mpi/mpich.template b/source/dist/mpi/mpich.template index f85b882b34..4ff169e4dd 100644 --- a/source/dist/mpi/mpich.template +++ b/source/dist/mpi/mpich.template @@ -1,10 +1,14 @@ -pe_name mpich -slots -user_lists NONE -xuser_lists NONE -start_proc_args /mpi/startmpi.sh -catch_rsh $pe_hostfile -stop_proc_args /mpi/stopmpi.sh -allocation_rule $round_robin -control_slaves TRUE -job_is_first_task FALSE -urgency_slots min +pe_name mpich +slots +user_lists NONE +xuser_lists NONE +start_proc_args /mpi/startmpi.sh -catch_rsh $pe_hostfile +stop_proc_args /mpi/stopmpi.sh +allocation_rule $round_robin +control_slaves TRUE +job_is_first_task FALSE +urgency_slots min +accounting_summary FALSE +ign_sreq_on_mhost FALSE +master_forks_slaves FALSE +daemon_forks_slaves FALSE \ No newline at end of file diff --git a/source/dist/util/resources/pe/make b/source/dist/util/resources/pe/make index 40959b5417..ec4a6aa61f 100644 --- a/source/dist/util/resources/pe/make +++ b/source/dist/util/resources/pe/make @@ -1,15 +1,18 @@ -# Version: 6.2 +# Version: 9.0.0 # # DO NOT MODIFY THIS FILE MANUALLY! # -pe_name make -slots 999 -user_lists NONE -xuser_lists NONE -start_proc_args NONE -stop_proc_args NONE -allocation_rule $round_robin -control_slaves TRUE -job_is_first_task FALSE -urgency_slots min -accounting_summary TRUE +pe_name make +slots 999 +user_lists NONE +xuser_lists NONE +start_proc_args NONE +stop_proc_args NONE +allocation_rule $round_robin +control_slaves TRUE +job_is_first_task FALSE +urgency_slots min +accounting_summary TRUE +ign_sreq_on_mhost FALSE +master_forks_slaves FALSE +daemon_forks_slaves FALSE \ No newline at end of file diff --git a/source/dist/util/resources/pe/make.sge_pqs_api b/source/dist/util/resources/pe/make.sge_pqs_api deleted file mode 100644 index 1cda19d1a9..0000000000 --- a/source/dist/util/resources/pe/make.sge_pqs_api +++ /dev/null @@ -1,16 +0,0 @@ -# Version: 6.2 -# -# DO NOT MODIFY THIS FILE MANUALLY! 
-# -pe_name make -slots 999 -user_lists NONE -xuser_lists NONE -start_proc_args NONE -stop_proc_args NONE -allocation_rule $round_robin -control_slaves TRUE -job_is_first_task FALSE -urgency_slots min -qsort_args NONE -accounting_summary TRUE diff --git a/source/libs/sched/sge_select_queue.cc b/source/libs/sched/sge_select_queue.cc index 5354b69588..8c37e9ce2c 100755 --- a/source/libs/sched/sge_select_queue.cc +++ b/source/libs/sched/sge_select_queue.cc @@ -6434,6 +6434,12 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend // when calculating the max_slots after matching the slave requests we need to add the one slot we might // have got for the master task int master_slot = 0, master_slot_qend = 0; + // we might not have global or master requests at all + // so lets assume that a master task can run, + // if global or master request matching fails then revert master_slot / master_slot_qend to 0 + if ((need_master || is_master_host) && lGetBool (a->pe, PE_job_is_first_task)) { + master_slot = master_slot_qend = 1; // we have a master slot now and later + } lListElem *jrs; for_each_rw (jrs, lGetListRW(a->job, JB_request_set_list)) { u_long32 scope = lGetUlong(jrs, JRS_scope); @@ -6450,8 +6456,6 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend // need to do it per request? Or only if there is some overlap between the master and slave requests? // We have such scenarios in the scope_basic test (-scope master -l int=1 -scope slave -l dbl=2, // and they work fine. - master_slot = 1; // already have the master task, need to add it to the possible slave tasks - // @todo anything to do about master_slot_qend? } continue; } else { @@ -6475,9 +6479,25 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend continue; } } - // @todo do the same check as above also for the global requests + // @todo do the same check as above also for the global request + + // consider PE setting ign_sreq_on_mhost if we are on the (potential) master host + if (scope == JRS_SCOPE_SLAVE && + (need_master || is_master_host) && + lGetBool(a->pe, PE_ignore_slave_requests_on_master_host)) { + if (master_slot != 0) { + // we can run the master task here, ignore slave requests + DPRINTF("%s: parallel_rc_slots_by_time() ign_sreq_on_mhost TRUE\n", object_name); + continue; + } else { + // we cannot run the master task here, try to use the host/the queue for slave tasks + DPRINTF("%s: parallel_rc_slots_by_time() ign_sreq_on_mhost TRUE but no master task, potential slave host\n", + object_name); + } + } - DPRINTF("%s: parallel_rc_slots_by_time() testing %s requests\n", object_name, job_scope_name(scope)); + DPRINTF("%s: parallel_rc_slots_by_time() testing %s requests, master_slot = %d\n", object_name, + job_scope_name(scope), master_slot); lList *requests = lGetListRW(jrs, JRS_hard_resource_list); for_each_rw (req, requests) { @@ -6486,6 +6506,7 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend result = ri_slots_by_time(a, &avail, &avail_qend, rue_list, req, load_attr, total_list, master_usage, queue, layer, lc_factor, &reason, allow_non_requestable, false, object_name); + DPRINTF(" -> ri_slots_by_time returned %d\n", result); if (result == DISPATCH_NEVER_CAT || result == DISPATCH_NEVER_JOB) { // if we are checking global requests, and it is a CONSUMABLE_JOB: // special handling: the queue can still be used as slave queue @@ -6529,19 +6550,27 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int 
*slots, int *slots_qend switch (result) { case DISPATCH_OK: master_usage = requests; // for slave matching need to consider what the master task would consume - if (lGetBool (a->pe, PE_job_is_first_task)) { - master_slot = master_slot_qend = 1; // we have a master slot now and later + // @todo sometimes ri_slots_by_time() seems to return 0 (DISPATCH_OK) instead of DISPATCH_NOT_AT_TIME + if (avail == 0) { + master_slot = 0; + master_usage = nullptr; + host_or_queue_clear_tags(object_name, queue, a->queue_list, TAG4SCHED_MASTER); } break; case DISPATCH_NOT_AT_TIME: // not suitable now, but later host_or_queue_clear_tags(object_name, queue, a->queue_list, TAG4SCHED_MASTER); master_usage = requests; // for slave matching need to consider what the master task would consume - if (lGetBool (a->pe, PE_job_is_first_task)) { - master_slot = 0; - master_slot_qend = 1; // for later we have a master slot - } + DPRINTF(" --> we matched the master requests, -> not at time, clearing master tags and master_slot\n"); + master_slot = 0; break; + case DISPATCH_MISSING_ATTR: + // we are e.g. on queue level, e.g. a memory consumable does not exist here + // but has already been matched on higher levels - fine, we can use the queue as master queue + if (tag < lGetUlong(req, CE_tagged)) { + DPRINTF(" --> we matched the master requests, -> missing attr, but already satisfied\n"); + } + break; case DISPATCH_NEVER_CAT: case DISPATCH_NEVER_JOB: DPRINTF(" --> we matched the master requests, -> never_cat or never_job, clearing master tags\n"); @@ -6573,12 +6602,15 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend if (lGetUlong(req, CE_tagged) < tag && tag != RQS_TAG) lSetUlong(req, CE_tagged, tag); + // slave requests matched we might have to add one slot for the master task if (scope != JRS_SCOPE_MASTER) { - if (avail < INT_MAX) { - avail += master_slot; - } - if (avail_qend < INT_MAX) { - avail_qend += master_slot_qend; + if (scope == JRS_SCOPE_SLAVE) { + if (avail < INT_MAX) { + avail += master_slot; + } + if (avail_qend < INT_MAX) { + avail_qend += master_slot_qend; + } } max_slots = MIN(max_slots, avail); max_slots_qend = MIN(max_slots_qend, avail_qend); @@ -6630,26 +6662,30 @@ parallel_rc_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend // if we still search for a master host/queue, then need to remember that this host/queue is suitable switch (result) { case DISPATCH_OK: - // if the global requests have matched (and there might not be any master specific requests) - // the host / the queue might be suited for the master task - need to consider its slot - if (need_master && lGetBool (a->pe, PE_job_is_first_task)) { - master_slot = master_slot_qend = 1; // we have a master slot now and later - } break; case DISPATCH_NOT_AT_TIME: // if the global requests have matched (and there might not be any master specific requests) // the host / the queue might be suited for the master task - need to consider its slot if (need_master && lGetBool (a->pe, PE_job_is_first_task)) { master_slot = 0; - master_slot_qend = 1; // we have a master slot later + } + break; + case DISPATCH_MISSING_ATTR: + // we are e.g. on queue level, e.g. 
a memory consumable does not exist here + // but has already been matched on higher levels - fine, we can use the queue as master queue + if (tag < lGetUlong(req, CE_tagged)) { + DPRINTF(" --> we matched the global requests, -> missing attr, but already satisfied\n"); } break; default: - // just fall through to result handling below + // cannot be master host - actually cannot run anything at all, see below + master_slot = master_slot_qend = 0; break; } } + // @todo if one request of a scope has not matched, we can break out of the request loop (?) } // end for each request per scope + // @todo if global requests have not matched, no need to check other scopes } // end for each scope *slots = max_slots; diff --git a/source/libs/sgeobj/sge_qinstance.cc b/source/libs/sgeobj/sge_qinstance.cc index 5959ce23a7..9afc5c89d3 100644 --- a/source/libs/sgeobj/sge_qinstance.cc +++ b/source/libs/sgeobj/sge_qinstance.cc @@ -37,6 +37,7 @@ #include #include "uti/sge_dstring.h" +#include "uti/sge_string.h" #include "uti/sge_log.h" #include "uti/sge_rmon_macros.h" @@ -1143,7 +1144,16 @@ rc_debit_consumable(const lListElem *jep, const lListElem *pe, lListElem *ep, co // now do booking for the (remaining) slave tasks, if any if (slave_debit_slots != 0) { double slave_dval = 0.0; - tmp_ret = job_get_contribution_by_scope(jep, nullptr, name, &slave_dval, dcep, JRS_SCOPE_SLAVE); + // if we are on the master host, and we shall ignore slave requests to booking only for slots + if (is_master_task && lGetBool(pe, PE_ignore_slave_requests_on_master_host)) { + tmp_ret = true; + if (sge_strnullcmp(name, SGE_ATTR_SLOTS) == 0) { + slave_dval = 1.0; + } + } else { + // we consider the slave requests + tmp_ret = job_get_contribution_by_scope(jep, nullptr, name, &slave_dval, dcep, JRS_SCOPE_SLAVE); + } if (tmp_ret && slave_dval != 0.0) { // we have a slave request slave_debit_slots = consumable_get_debit_slots(consumable, slave_debit_slots); diff --git a/source/libs/sgeobj/sge_ulong.cc b/source/libs/sgeobj/sge_ulong.cc index 68e4825acb..acaea376ca 100644 --- a/source/libs/sgeobj/sge_ulong.cc +++ b/source/libs/sgeobj/sge_ulong.cc @@ -232,6 +232,17 @@ bool double_print_to_dstring(double value, dstring *string) DRETURN(ret); } +bool double_print_to_dstring(double value, dstring *string, u_long32 type) { + switch (type) { + case TYPE_TIM: + return double_print_time_to_dstring(value, string); + case TYPE_MEM: + return double_print_memory_to_dstring(value, string); + default: + return double_print_to_dstring(value, string); + } +} + /****** sge_ulong/ulong_parse_date_time_from_string() ************************** * NAME * ulong_parse_date_time_from_string() -- Parse string into date/time ulong diff --git a/source/libs/sgeobj/sge_ulong.h b/source/libs/sgeobj/sge_ulong.h index 9eae8600f6..b83d6d9cb4 100644 --- a/source/libs/sgeobj/sge_ulong.h +++ b/source/libs/sgeobj/sge_ulong.h @@ -49,9 +49,12 @@ double_print_memory_to_dstring(double value, dstring *string); bool double_print_int_to_dstring(double value, dstring *string); -bool +bool double_print_to_dstring(double value, dstring *string); +bool +double_print_to_dstring(double value, dstring *string, u_long32 type); + bool ulong_parse_date_time_from_string(u_long32 *this_ulong, lList **alpp, const char *date_str); diff --git a/source/libs/spool/flatfile/sge_flatfile_obj.cc b/source/libs/spool/flatfile/sge_flatfile_obj.cc index 103345dd57..214b127e7a 100644 --- a/source/libs/spool/flatfile/sge_flatfile_obj.cc +++ b/source/libs/spool/flatfile/sge_flatfile_obj.cc @@ -538,18 +538,21 @@ 
spooling_field AR_fields[] = { }; spooling_field PE_fields[] = { - { PE_name, 18, "pe_name", false, nullptr, false, nullptr, nullptr}, - { PE_slots, 18, "slots", false, nullptr, false, nullptr, nullptr}, - { PE_user_list, 18, "user_lists", false, US_sub_fields, false, nullptr, nullptr}, - { PE_xuser_list, 18, "xuser_lists", false, US_sub_fields, false, nullptr, nullptr}, - { PE_start_proc_args, 18, "start_proc_args", false, nullptr, false, nullptr, nullptr}, - { PE_stop_proc_args, 18, "stop_proc_args", false, nullptr, false, nullptr, nullptr}, - { PE_allocation_rule, 18, "allocation_rule", false, nullptr, false, nullptr, nullptr}, - { PE_control_slaves, 18, "control_slaves", false, nullptr, false, nullptr, nullptr}, - { PE_job_is_first_task, 18, "job_is_first_task",false, nullptr, false, nullptr, nullptr}, - { PE_urgency_slots, 18, "urgency_slots", false, nullptr, false, nullptr, nullptr}, - { PE_accounting_summary, 18, "accounting_summary", false, nullptr, false, nullptr, nullptr}, - { NoName, 18, nullptr, false, nullptr, false, nullptr, nullptr} + { PE_name, 20, "pe_name", false, nullptr, false, nullptr, nullptr}, + { PE_slots, 20, "slots", false, nullptr, false, nullptr, nullptr}, + { PE_user_list, 20, "user_lists", false, US_sub_fields, false, nullptr, nullptr}, + { PE_xuser_list, 20, "xuser_lists", false, US_sub_fields, false, nullptr, nullptr}, + { PE_start_proc_args, 20, "start_proc_args", false, nullptr, false, nullptr, nullptr}, + { PE_stop_proc_args, 20, "stop_proc_args", false, nullptr, false, nullptr, nullptr}, + { PE_allocation_rule, 20, "allocation_rule", false, nullptr, false, nullptr, nullptr}, + { PE_control_slaves, 20, "control_slaves", false, nullptr, false, nullptr, nullptr}, + { PE_job_is_first_task, 20, "job_is_first_task", false, nullptr, false, nullptr, nullptr}, + { PE_urgency_slots, 20, "urgency_slots", false, nullptr, false, nullptr, nullptr}, + { PE_accounting_summary, 20, "accounting_summary", false, nullptr, false, nullptr, nullptr}, + { PE_ignore_slave_requests_on_master_host, 20, "ign_sreq_on_mhost", false, nullptr, false, nullptr, nullptr}, + { PE_master_forks_slaves, 20, "master_forks_slaves", false, nullptr, false, nullptr, nullptr}, + { PE_daemon_forks_slaves, 20, "daemon_forks_slaves", false, nullptr, false, nullptr, nullptr}, + { NoName, 20, nullptr, false, nullptr, false, nullptr, nullptr} }; spooling_field RQS_fields[] = {
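
For illustration only (not part of the patch above): with the three new PE attributes documented in sge_pe.md, a tightly integrated parallel environment whose master process forks or spawns its slave tasks on the master host could be configured roughly as sketched below. The PE name mpi_tight and all values are hypothetical; the layout follows the template format already shipped in source/dist/mpi/mpi.template, and only ign_sreq_on_mhost, master_forks_slaves and daemon_forks_slaves are new.

    pe_name              mpi_tight
    slots                999
    user_lists           NONE
    xuser_lists          NONE
    start_proc_args      NONE
    stop_proc_args       NONE
    allocation_rule      $round_robin
    control_slaves       TRUE
    job_is_first_task    TRUE
    urgency_slots        min
    accounting_summary   FALSE
    ign_sreq_on_mhost    TRUE
    master_forks_slaves  TRUE
    daemon_forks_slaves  FALSE

With such a configuration the execd multiplies the queue limits written for the master task by the number of slots granted on the master host, refuses `qrsh -inherit` slave tasks on the master host, and the scheduler ignores slave scope requests there for everything except slots; jobs would request master and slave resources separately via the **-scope** submit option described in *qsub*(1).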