From 865d3c9d973ad186bb5b67dbc2cbde9dc2aabefe Mon Sep 17 00:00:00 2001 From: Konrad1991 Date: Fri, 6 Sep 2024 16:31:00 +0200 Subject: [PATCH] fixed bug in order of the dfs which are passed to the task queue in the batch mode --- tsf/R/DBA_Server.R | 56 ++++++++++++++++++++++++-------------------- tsf/R/GDA_Server.R | 53 +++++++++++++++++++++++------------------- tsf/R/HG_Server.R | 58 +++++++++++++++++++++++++--------------------- tsf/R/IDA_Server.R | 53 +++++++++++++++++++++++------------------- tsf/R/TaskQueue.R | 5 ++-- 5 files changed, 124 insertions(+), 101 deletions(-) diff --git a/tsf/R/DBA_Server.R b/tsf/R/DBA_Server.R index 4e4e413..0327cdb 100644 --- a/tsf/R/DBA_Server.R +++ b/tsf/R/DBA_Server.R @@ -143,7 +143,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { # NOTE: End of model specific code # =============================================================================== - + + get_opti_result <- function() { opti_result()$parameter } @@ -619,15 +620,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { } # 3. Fill task queue - groups <- ceiling(1:size / (size / num_cores)) - dfs <- df_list() - if (length(groups) > length(dfs)) { - counter <- 1 - while (length(groups) > length(dfs)) { - dfs <- c(dfs, dfs[counter]) - counter <- counter + 1 - } - } + # TODO: add df idx and num rep info directly and not via messages + dfs <- rep(df_list(), each = num_rep) task_queue(TaskQueue$new( get_Model(), lb, ub, dfs, @@ -666,21 +660,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_batch_clicked(TRUE) }) - # observe status - observe({ - invalidateLater(invalid_time()) - req(nclicks() != 0) - req(!is.null(task_queue())) - req(task_queue()$filled) - # is cancel_batch_clicked - if (cancel_batch_clicked()) { - task_queue()$interrupt() - setup_batch_done(TRUE) - cancel_batch_clicked(FALSE) - nclicks(0) - send_and_read_info(paste0("release: ", session$token)) - return(NULL) - } + update_status <- function() { # NOTE: check status # (errors are not printed otherwise screen is full of errors) stdout(task_queue()$get_status(stdout())) @@ -701,10 +681,31 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { return("Error") }) req(is.character(m)) + progress_bar <- task_queue()$get_progress_bar() + m <- paste(m, "\n", progress_bar) session$sendCustomMessage( type = get_update_field_batch(), list(message = m) ) + } + + + # observe status + observe({ + invalidateLater(invalid_time()) + req(nclicks() != 0) + req(!is.null(task_queue())) + req(task_queue()$filled) + # is cancel_batch_clicked + if (cancel_batch_clicked()) { + task_queue()$interrupt() + setup_batch_done(TRUE) + cancel_batch_clicked(FALSE) + nclicks(0) + send_and_read_info(paste0("release: ", session$token)) + return(NULL) + } + update_status() }) get_data <- reactive({ @@ -726,6 +727,11 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { get_data() batch_results_created(TRUE) stdout(NULL) + # NOTE: clear status + session$sendCustomMessage( + type = get_update_field_batch(), + list(message = "") + ) values <- task_queue()$results result_batch(values) output$batch_data_plot <- renderPlotly({ diff --git a/tsf/R/GDA_Server.R b/tsf/R/GDA_Server.R index d1a3c39..80af5fa 100644 --- a/tsf/R/GDA_Server.R +++ b/tsf/R/GDA_Server.R @@ -625,15 +625,8 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { } # 3. Fill task queue - groups <- ceiling(1:size / (size / num_cores)) - dfs <- df_list() - if (length(groups) > length(dfs)) { - counter <- 1 - while (length(groups) > length(dfs)) { - dfs <- c(dfs, dfs[counter]) - counter <- counter + 1 - } - } + # TODO: add df idx and num rep info directly and not via messages + dfs <- rep(df_list(), each = num_rep) task_queue(TaskQueue$new( get_Model(), lb, ub, dfs, @@ -672,21 +665,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_batch_clicked(TRUE) }) - # observe status - observe({ - invalidateLater(invalid_time()) - req(nclicks() != 0) - req(!is.null(task_queue())) - req(task_queue()$filled) - # is cancel_batch_clicked - if (cancel_batch_clicked()) { - task_queue()$interrupt() - setup_batch_done(TRUE) - cancel_batch_clicked(FALSE) - nclicks(0) - send_and_read_info(paste0("release: ", session$token)) - return(NULL) - } + update_status <- function() { # NOTE: check status # (errors are not printed otherwise screen is full of errors) stdout(task_queue()$get_status(stdout())) @@ -707,10 +686,31 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { return("Error") }) req(is.character(m)) + progress_bar <- task_queue()$get_progress_bar() + m <- paste(m, "\n", progress_bar) session$sendCustomMessage( type = get_update_field_batch(), list(message = m) ) + } + + + # observe status + observe({ + invalidateLater(invalid_time()) + req(nclicks() != 0) + req(!is.null(task_queue())) + req(task_queue()$filled) + # is cancel_batch_clicked + if (cancel_batch_clicked()) { + task_queue()$interrupt() + setup_batch_done(TRUE) + cancel_batch_clicked(FALSE) + nclicks(0) + send_and_read_info(paste0("release: ", session$token)) + return(NULL) + } + update_status() }) get_data <- reactive({ @@ -732,6 +732,11 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { get_data() batch_results_created(TRUE) stdout(NULL) + # NOTE: clear status + session$sendCustomMessage( + type = get_update_field_batch(), + list(message = "") + ) values <- task_queue()$results result_batch(values) output$batch_data_plot <- renderPlotly({ diff --git a/tsf/R/HG_Server.R b/tsf/R/HG_Server.R index 897c547..d7a1479 100644 --- a/tsf/R/HG_Server.R +++ b/tsf/R/HG_Server.R @@ -142,8 +142,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { # NOTE: End of model specific code # =============================================================================== - - get_opti_result <- function() { + + get_opti_result <- function() { opti_result()$parameter } @@ -618,15 +618,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { } # 3. Fill task queue - groups <- ceiling(1:size / (size / num_cores)) - dfs <- df_list() - if (length(groups) > length(dfs)) { - counter <- 1 - while (length(groups) > length(dfs)) { - dfs <- c(dfs, dfs[counter]) - counter <- counter + 1 - } - } + # TODO: add df idx and num rep info directly and not via messages + dfs <- rep(df_list(), each = num_rep) task_queue(TaskQueue$new( get_Model(), lb, ub, dfs, @@ -665,21 +658,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_batch_clicked(TRUE) }) - # observe status - observe({ - invalidateLater(invalid_time()) - req(nclicks() != 0) - req(!is.null(task_queue())) - req(task_queue()$filled) - # is cancel_batch_clicked - if (cancel_batch_clicked()) { - task_queue()$interrupt() - setup_batch_done(TRUE) - cancel_batch_clicked(FALSE) - nclicks(0) - send_and_read_info(paste0("release: ", session$token)) - return(NULL) - } + update_status <- function() { # NOTE: check status # (errors are not printed otherwise screen is full of errors) stdout(task_queue()$get_status(stdout())) @@ -700,10 +679,31 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { return("Error") }) req(is.character(m)) + progress_bar <- task_queue()$get_progress_bar() + m <- paste(m, "\n", progress_bar) session$sendCustomMessage( type = get_update_field_batch(), list(message = m) ) + } + + + # observe status + observe({ + invalidateLater(invalid_time()) + req(nclicks() != 0) + req(!is.null(task_queue())) + req(task_queue()$filled) + # is cancel_batch_clicked + if (cancel_batch_clicked()) { + task_queue()$interrupt() + setup_batch_done(TRUE) + cancel_batch_clicked(FALSE) + nclicks(0) + send_and_read_info(paste0("release: ", session$token)) + return(NULL) + } + update_status() }) get_data <- reactive({ @@ -725,6 +725,11 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { get_data() batch_results_created(TRUE) stdout(NULL) + # NOTE: clear status + session$sendCustomMessage( + type = get_update_field_batch(), + list(message = "") + ) values <- task_queue()$results result_batch(values) output$batch_data_plot <- renderPlotly({ @@ -753,3 +758,4 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { ) }) } + diff --git a/tsf/R/IDA_Server.R b/tsf/R/IDA_Server.R index e5ae98a..7dd8d90 100644 --- a/tsf/R/IDA_Server.R +++ b/tsf/R/IDA_Server.R @@ -625,15 +625,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { } # 3. Fill task queue - groups <- ceiling(1:size / (size / num_cores)) - dfs <- df_list() - if (length(groups) > length(dfs)) { - counter <- 1 - while (length(groups) > length(dfs)) { - dfs <- c(dfs, dfs[counter]) - counter <- counter + 1 - } - } + # TODO: add df idx and num rep info directly and not via messages + dfs <- rep(df_list(), each = num_rep) task_queue(TaskQueue$new( get_Model(), lb, ub, dfs, @@ -672,21 +665,7 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_batch_clicked(TRUE) }) - # observe status - observe({ - invalidateLater(invalid_time()) - req(nclicks() != 0) - req(!is.null(task_queue())) - req(task_queue()$filled) - # is cancel_batch_clicked - if (cancel_batch_clicked()) { - task_queue()$interrupt() - setup_batch_done(TRUE) - cancel_batch_clicked(FALSE) - nclicks(0) - send_and_read_info(paste0("release: ", session$token)) - return(NULL) - } + update_status <- function() { # NOTE: check status # (errors are not printed otherwise screen is full of errors) stdout(task_queue()$get_status(stdout())) @@ -707,10 +686,31 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { return("Error") }) req(is.character(m)) + progress_bar <- task_queue()$get_progress_bar() + m <- paste(m, "\n", progress_bar) session$sendCustomMessage( type = get_update_field_batch(), list(message = m) ) + } + + + # observe status + observe({ + invalidateLater(invalid_time()) + req(nclicks() != 0) + req(!is.null(task_queue())) + req(task_queue()$filled) + # is cancel_batch_clicked + if (cancel_batch_clicked()) { + task_queue()$interrupt() + setup_batch_done(TRUE) + cancel_batch_clicked(FALSE) + nclicks(0) + send_and_read_info(paste0("release: ", session$token)) + return(NULL) + } + update_status() }) get_data <- reactive({ @@ -732,6 +732,11 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { get_data() batch_results_created(TRUE) stdout(NULL) + # NOTE: clear status + session$sendCustomMessage( + type = get_update_field_batch(), + list(message = "") + ) values <- task_queue()$results result_batch(values) output$batch_data_plot <- renderPlotly({ diff --git a/tsf/R/TaskQueue.R b/tsf/R/TaskQueue.R index 892aa88..6f3a094 100644 --- a/tsf/R/TaskQueue.R +++ b/tsf/R/TaskQueue.R @@ -46,6 +46,7 @@ Process <- R6::R6Class( ) ) +# TODO: test edge cases so that no indxe error occur anymore TaskQueue <- R6::R6Class( "TaskQueue", public = list( @@ -198,10 +199,10 @@ TaskQueue <- R6::R6Class( get_status = function(stdout) { status <- character(length(self$processes)) for (i in seq_len(length(self$processes))) { - # if (self$processes[[i]]$is_alive()) { + if (self$processes[[i]]$is_alive()) { # TODO: can be removed. But maybe slows down code? status[i] <- print_status(self$processes[[i]]$read_output(), self$case) - # } + } } return(format_batch_status(stdout, status)) },