Skip to content

Commit

Permalink
fixed bug in order of the dfs which are passed to the task queue in t…
Browse files Browse the repository at this point in the history
…he batch mode
  • Loading branch information
Konrad1991 committed Sep 6, 2024
1 parent f48e807 commit 865d3c9
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 101 deletions.
56 changes: 31 additions & 25 deletions tsf/R/DBA_Server.R
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {

# NOTE: End of model specific code
# ===============================================================================



get_opti_result <- function() {
opti_result()$parameter
}
Expand Down Expand Up @@ -619,15 +620,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
}

# 3. Fill task queue
groups <- ceiling(1:size / (size / num_cores))
dfs <- df_list()
if (length(groups) > length(dfs)) {
counter <- 1
while (length(groups) > length(dfs)) {
dfs <- c(dfs, dfs[counter])
counter <- counter + 1
}
}
# TODO: add df idx and num rep info directly and not via messages
dfs <- rep(df_list(), each = num_rep)
task_queue(TaskQueue$new(
get_Model(),
lb, ub, dfs,
Expand Down Expand Up @@ -666,21 +660,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_batch_clicked(TRUE)
})

# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status <- function() {
# NOTE: check status
# (errors are not printed otherwise screen is full of errors)
stdout(task_queue()$get_status(stdout()))
Expand All @@ -701,10 +681,31 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
return("Error")
})
req(is.character(m))
progress_bar <- task_queue()$get_progress_bar()
m <- paste(m, "\n", progress_bar)
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = m)
)
}


# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status()
})

get_data <- reactive({
Expand All @@ -726,6 +727,11 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
get_data()
batch_results_created(TRUE)
stdout(NULL)
# NOTE: clear status
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = "")
)
values <- task_queue()$results
result_batch(values)
output$batch_data_plot <- renderPlotly({
Expand Down
53 changes: 29 additions & 24 deletions tsf/R/GDA_Server.R
Original file line number Diff line number Diff line change
Expand Up @@ -625,15 +625,8 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
}

# 3. Fill task queue
groups <- ceiling(1:size / (size / num_cores))
dfs <- df_list()
if (length(groups) > length(dfs)) {
counter <- 1
while (length(groups) > length(dfs)) {
dfs <- c(dfs, dfs[counter])
counter <- counter + 1
}
}
# TODO: add df idx and num rep info directly and not via messages
dfs <- rep(df_list(), each = num_rep)
task_queue(TaskQueue$new(
get_Model(),
lb, ub, dfs,
Expand Down Expand Up @@ -672,21 +665,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_batch_clicked(TRUE)
})

# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status <- function() {
# NOTE: check status
# (errors are not printed otherwise screen is full of errors)
stdout(task_queue()$get_status(stdout()))
Expand All @@ -707,10 +686,31 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
return("Error")
})
req(is.character(m))
progress_bar <- task_queue()$get_progress_bar()
m <- paste(m, "\n", progress_bar)
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = m)
)
}


# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status()
})

get_data <- reactive({
Expand All @@ -732,6 +732,11 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
get_data()
batch_results_created(TRUE)
stdout(NULL)
# NOTE: clear status
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = "")
)
values <- task_queue()$results
result_batch(values)
output$batch_data_plot <- renderPlotly({
Expand Down
58 changes: 32 additions & 26 deletions tsf/R/HG_Server.R
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {

# NOTE: End of model specific code
# ===============================================================================
get_opti_result <- function() {

get_opti_result <- function() {
opti_result()$parameter
}

Expand Down Expand Up @@ -618,15 +618,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
}

# 3. Fill task queue
groups <- ceiling(1:size / (size / num_cores))
dfs <- df_list()
if (length(groups) > length(dfs)) {
counter <- 1
while (length(groups) > length(dfs)) {
dfs <- c(dfs, dfs[counter])
counter <- counter + 1
}
}
# TODO: add df idx and num rep info directly and not via messages
dfs <- rep(df_list(), each = num_rep)
task_queue(TaskQueue$new(
get_Model(),
lb, ub, dfs,
Expand Down Expand Up @@ -665,21 +658,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_batch_clicked(TRUE)
})

# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status <- function() {
# NOTE: check status
# (errors are not printed otherwise screen is full of errors)
stdout(task_queue()$get_status(stdout()))
Expand All @@ -700,10 +679,31 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
return("Error")
})
req(is.character(m))
progress_bar <- task_queue()$get_progress_bar()
m <- paste(m, "\n", progress_bar)
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = m)
)
}


# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status()
})

get_data <- reactive({
Expand All @@ -725,6 +725,11 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
get_data()
batch_results_created(TRUE)
stdout(NULL)
# NOTE: clear status
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = "")
)
values <- task_queue()$results
result_batch(values)
output$batch_data_plot <- renderPlotly({
Expand Down Expand Up @@ -753,3 +758,4 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
)
})
}

53 changes: 29 additions & 24 deletions tsf/R/IDA_Server.R
Original file line number Diff line number Diff line change
Expand Up @@ -625,15 +625,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
}

# 3. Fill task queue
groups <- ceiling(1:size / (size / num_cores))
dfs <- df_list()
if (length(groups) > length(dfs)) {
counter <- 1
while (length(groups) > length(dfs)) {
dfs <- c(dfs, dfs[counter])
counter <- counter + 1
}
}
# TODO: add df idx and num rep info directly and not via messages
dfs <- rep(df_list(), each = num_rep)
task_queue(TaskQueue$new(
get_Model(),
lb, ub, dfs,
Expand Down Expand Up @@ -672,21 +665,7 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_batch_clicked(TRUE)
})

# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status <- function() {
# NOTE: check status
# (errors are not printed otherwise screen is full of errors)
stdout(task_queue()$get_status(stdout()))
Expand All @@ -707,10 +686,31 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
return("Error")
})
req(is.character(m))
progress_bar <- task_queue()$get_progress_bar()
m <- paste(m, "\n", progress_bar)
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = m)
)
}


# observe status
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
req(!is.null(task_queue()))
req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
update_status()
})

get_data <- reactive({
Expand All @@ -732,6 +732,11 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
get_data()
batch_results_created(TRUE)
stdout(NULL)
# NOTE: clear status
session$sendCustomMessage(
type = get_update_field_batch(),
list(message = "")
)
values <- task_queue()$results
result_batch(values)
output$batch_data_plot <- renderPlotly({
Expand Down
5 changes: 3 additions & 2 deletions tsf/R/TaskQueue.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Process <- R6::R6Class(
)
)

# TODO: test edge cases so that no indxe error occur anymore
TaskQueue <- R6::R6Class(
"TaskQueue",
public = list(
Expand Down Expand Up @@ -198,10 +199,10 @@ TaskQueue <- R6::R6Class(
get_status = function(stdout) {
status <- character(length(self$processes))
for (i in seq_len(length(self$processes))) {
# if (self$processes[[i]]$is_alive()) {
if (self$processes[[i]]$is_alive()) { # TODO: can be removed. But maybe slows down code?
status[i] <-
print_status(self$processes[[i]]$read_output(), self$case)
# }
}
}
return(format_batch_status(stdout, status))
},
Expand Down

0 comments on commit 865d3c9

Please sign in to comment.