Skip to content

Commit

Permalink
[vignette + bugfix with r.apply]
Browse files Browse the repository at this point in the history
- wrote a pre version of a vignette testing and showcasing the r_exec
interface with SciDB and scidbst
- fixing misbehavior of r.apply when passing no aggregation attributes
  • Loading branch information
flahn committed Jan 13, 2017
1 parent 01c3012 commit 568e75c
Show file tree
Hide file tree
Showing 6 changed files with 538 additions and 27 deletions.
57 changes: 39 additions & 18 deletions R/apply.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ if(!isGeneric("r.apply")) {
}

.log = function(s, logfile,quote=TRUE) {
if (missing(logfile)) {
return(sprintf("cat(\"%s\n\")",s))
}
if (quote) {
return(sprintf("cat(\"%s\n\", file=\"%s\",append=TRUE)",s,logfile))
} else {
Expand All @@ -64,17 +67,38 @@ if(!isGeneric("r.apply")) {

}

# aggregates: the names of the columns in the data.frame to aggregate by
# parallel: ddply option to use parallel processing when aggregating
# df.name: the name of the data.frame used to create the data chunk data.frame
# logging: boolean, whether or not to log
# logfile: character string, where to write the log information into
# output: the key-value list with the output attributes and their type
.createDDPLYCommand = function(aggregates,parallel,df.name,logging,logfile,output) {
# write aggregate expression for the rexec_script
aggregates.string = paste("c(",paste("\"",aggregates,"\"",collapse=",",sep=""),")",sep="")
if (length(aggregates > 0)) {
aggregates.string = paste("c(",paste("\"",aggregates,"\"",collapse=",",sep=""),")",sep="")
} else {
aggregates.string = "c()"
}


#TODO you can pass additional arguments to f after .fun
ddply_cmd = paste("func.result = ddply(.data=",df.name,", .variables=",aggregates.string,", .fun=f ,.parallel=",parallel,")",sep="")

#.GlobalEnv$func.result = cbind(df,default_values)
tc_exec = sprintf("tryCatch({ %s },error = function(err) { %s$df$ = $df$[,which(names($df$) %%in%% names(output))]; var = names(output)[!names(output) %%in%% names($df$)]; default_values = as.list(rep(0,length(var))); names(default_values) = var; func.result <<- cbind($df$,default_values); })",
ddply_cmd,
sprintf("%s%s%s",
if (logging) {
paste(.log("executing function on chunk",logfile),"; ",sep="")
}else {""},
ddply_cmd,
if (logging) {
paste(.log("function executed successfully",logfile=logfile),";",sep="; ")
}else {""}),
if (logging) {
paste(.log("error in a chunk",logfile),"; ",sep="")
p1 = paste(.log("error in a chunk",logfile),"; ",sep="")
p2= paste(p1,.log("err$message",logfile=logfile,quote=FALSE),"; ",sep="")
p2
} else { "" })
tc_exec = gsub(pattern="\\$df\\$",replacement = df.name, tc_exec)

Expand Down Expand Up @@ -135,6 +159,9 @@ if(!isGeneric("r.apply")) {
commands = append(commands,paste("registerDoParallel(cores=",cores,")",sep=""))
}

# create the data.frame
commands = .appendChunkDataFrameDefinition(commands,attr)

# get the code of the function as text
if (!missing(f) && is.function(f)) {
commands = .appendUserDefinedFunctionCode(commands,f)
Expand All @@ -143,8 +170,7 @@ if(!isGeneric("r.apply")) {
}


# create the data.frame
commands = .appendChunkDataFrameDefinition(commands,attr)

# TODO option to calculate coordinates or timestamps to create or work with spatial or spatio-temporal objects


Expand Down Expand Up @@ -182,9 +208,6 @@ if(!isGeneric("r.apply")) {
aggregates = c()
}



# dot.params = list(...)
if (logging) {
commands = append(commands, .log("processing a chunk",logfile))
}
Expand All @@ -198,7 +221,7 @@ if(!isGeneric("r.apply")) {

commands = append(commands,tc_exec)

#R_EXEC probably allows in scidb just double values
#R_EXEC probably allows in scidb only double values
#https://github.com/Paradigm4/r_exec/blob/master/LogicalRExec.cpp
#line 54
commands = .appendOutputTypeConversionForR(commands,output)
Expand All @@ -215,11 +238,15 @@ if(!isGeneric("r.apply")) {
paste(commands,collapse="; ",sep=""),
array)



# clean up! there might be accidentially a semicolon between ) and {
query.R = gsub(pattern="\\)\\s*(;)\\s*\\{",replacement=") {",x=query.R)
query.R = gsub(pattern="(;;)",replacement=";",x=query.R)
query.R = gsub(pattern="(\\{\\s*;)",replacement="{",x=query.R)
query.R = gsub(pattern="(;\\s*;)",replacement="; ",x=query.R)

cat(paste(commands,collapse="\n",sep=""))
return(query.R)
}

Expand Down Expand Up @@ -432,9 +459,10 @@ if(!isGeneric("r.apply")) {
return(x)
}

#' Applys a custom R function on chunks of an array
#' Apply custom R functions on scidbst array chunks
#'
#' This function applies a custom r function on each individual scidbst array chunk using r_exec.
#' This function applies a custom r function on each individual scidbst array chunk using the r_exec interface with
#' SciDB.
#'
#' @aliases r.apply
#' @details The script that is created during this function will handle the installation of required R-packages on
Expand Down Expand Up @@ -492,13 +520,6 @@ if(!isGeneric("r.apply")) {
#' @export
setMethod("r.apply",signature(x="scidbst",f="function"), .apply.scidbst.fun)


# if(!isGeneric("rexec.scidb")) {
# setGeneric("rexec.scidb",function(x,f,...){
# standardGeneric("rexec.scidb")
# })
# }

#' @rdname r-apply-scidbst-method
#' @export
setMethod("r.apply",signature(x="scidb",f="function"), function(x,f,array,packages,parallel=FALSE,cores=1,aggregates=c(),output, logfile, ...) {
Expand Down
87 changes: 87 additions & 0 deletions inst/doc/interfacing_R_EXEC_on_SciDB.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
## ----connecting----------------------------------------------------------
library(scidbst)
library(rgdal)
source("/home/lahn/GitHub/scidbst/vignettes/assets/credentials2.R")
scidbconnect(host=host,port=port,user=user,password=password,protocol="https",auth_type = "digest")

## ----creating_demo_array-------------------------------------------------
EPSG = make_EPSG()
srid = 4326
proj4args = as.character(subset(EPSG,code==4326)$prj4)

aff = matrix(c(34.5,8.0, 0.01,0, 0,-0.01),nrow=2,ncol=3)
ex = extent(34.5,35.5,7,8)
tex = textent(as.POSIXlt("2003-07-28"),as.POSIXlt("2003-11-05"))
srs = SRS(proj4args,c("y","x"))
srs@authority = "EPSG"
srs@srid = as.integer(srid)
srs@srtext = showWKT(proj4args)

trs = TRS(dimension="t",t0="2003-07-28",tres=1,tunit="days")

sizex = 100
sizey = 100
sizet = 100

# random array
gr = expand.grid(0:(sizex-1),0:(sizey-1),0:(sizet-1))
gr = cbind(gr,runif(sizex*sizey*sizet,1,100))
colnames(gr) = c("y","x","t","val")

array = as.scidb(X=gr,name="tmp_arr",chunkSize=nrow(gr))
schema = sprintf("<val:double NULL> [y=0:%i,20,0,x=0:%i,20,0,t=0:%i,100,0]",(sizey-1),(sizex-1),(sizet-1))

st.arr = new("scidbst")
st.arr@title="random_st"
st.arr@srs = srs
st.arr@extent = ex
st.arr@trs = trs
st.arr@tExtent = tex
st.arr@affine = aff
st.arr@isSpatial = TRUE
st.arr@isTemporal = TRUE
st.arr@proxy = redimension(array,schema=schema)

st.arr = scidbsteval(st.arr,"random_st")
scidbrm("tmp_arr",force=T)

## ------------------------------------------------------------------------
arr.prep = transform(st.arr, dimx="double(x)",dimy="double(y)", dimt="double(t)")
arr.prep = scidbsteval(arr.prep,name="random_st_prep")
arr.prep@proxy

scidbrm("random_st",force=T)

## ------------------------------------------------------------------------
f <- function(x) {
if (is.null(x)) {
return(c(nt=0,var=0,median=0,mean=0))
}
t = x$dimt
n = x$val

return(c(nt=length(t),var=var(n),median=median(n),mean=mean(n)))
}

## ------------------------------------------------------------------------
aggregates=c("dimy","dimx")

## ------------------------------------------------------------------------
output=list(dimy="double",dimx="double",nt="double",var="double",median="double",mean="double")

## ------------------------------------------------------------------------
dim=list(dimy="y",dimx="x")
dim.spec=list(y=c(min=0,max=99,chunk=20,overlap=0),x=c(min=0,max=99,chunk=20,overlap=0))

## ------------------------------------------------------------------------
rexec.arr = r.apply(x=arr.prep,
f=f,
array="rexec_applied",
aggregates=aggregates,
output = output,
dim=dim,
dim.spec=dim.spec)

## ------------------------------------------------------------------------
rexec.arr

152 changes: 152 additions & 0 deletions inst/doc/interfacing_R_EXEC_on_SciDB.Rmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
---
title: "Interfacing R_EXEC on SciDB"
output: rmarkdown::html_vignette
vignette: >
%\VignetteEngine{knitr::rmarkdown}
%\VignetteEncoding{UTF-8}
%\VignetteIndexEntry{Interfacing the scidbst package for using R_EXEC on SciDB}
---

## Introduction
SciDB offers a multitude of functions to store, manipulate and obtain array-based data. Following the Big-Data analysis trend to open up infrastructures to process data at its source, SciDB offers two methods to perform analysis or manipulations besides there provided functions. The first method is provided by the scidb extension for R_EXEC, where r scripts can be executed on each instance of the scidb cluster using the R_EXEC server interface. The second option is the Hadoop aligned Streaming interface, where you can run arbitrary command line commands at each instance in order to processes the data chunks.
In this document we will cover the interaction possibilities of the 'scidbst' package towards the execution of custom R functions on data chunks using R_EXEC.

## Requirements
1. Make sure that you have installed and running SciDB + Shim setup
2. install and load the [scidb4geo extension](https://github.com/appelmar/scidb4geo), the [stream extension](https://github.com/Paradigm4/stream) and the [r_exec extension] (https://github.com/Paradigm4/r_exec)

## General workflow of r.apply
r.apply is a quite comprehensive and powerful tool to analyze spatio-temporal arrays in scidb. The general workflow can be categorized as follows:
1. Preparation of the R-Script and execute the r_exec operation passed as argument to the r_exec operation
2. Redimensioning, if the dimension schema is stated or if it can be derived
3. storing the spatial and/or temporal references

### R-Script
The R-script is created dynamically during the r.apply call based on a template. The template is structured to first install required R-packages on each SciDB instance. Then the stated function is written into the the script and the input data vectors are bound to a data.frame. We further use 'ddply' to group the data by certain attributes (*aggregates*) and apply the stated function on each of those grouped sub data sets. After the ddply function has run the output needs to be created in the form the user described (*output*).

Since r_exec only operates on the attributes it is necessary to project the dimensions into attributes if they are necessary in the calculation.

### Redimensioning
The array ouput after the execution of the R-script will be a 1-dimensional array with the specified output attributes. By stating stating the dimensions and optionally renaming them (*dim*) the redimension operation will be conducted. To do that the dimension schema is also needed. If the new dimension names are the same as in the input array, that schema will be used. Otherwise the user needs to state the schema or the attributes will be simply renamed. In the latter case there will be no redimensioning.

## Demonstration
In this chapter we will briefly introduce the r.apply function of the scidbst package and its internal process. We will create and store an spatio-temporal array with random values and we will apply a custom r function on the data that was grouped by space.

### Connecting and creating Example Data
```{r connecting}
library(scidbst)
library(rgdal)
source("/home/lahn/GitHub/scidbst/vignettes/assets/credentials2.R")
scidbconnect(host=host,port=port,user=user,password=password,protocol="https",auth_type = "digest")
```

In the following example we will create a spatio-temporal array with random values for a size 100 values for each dimension. We will also give it an arbitrary spatial and temporal reference.

```{r creating_demo_array}
EPSG = make_EPSG()
srid = 4326
proj4args = as.character(subset(EPSG,code==4326)$prj4)
aff = matrix(c(34.5,8.0, 0.01,0, 0,-0.01),nrow=2,ncol=3)
ex = extent(34.5,35.5,7,8)
tex = textent(as.POSIXlt("2003-07-28"),as.POSIXlt("2003-11-05"))
srs = SRS(proj4args,c("y","x"))
srs@authority = "EPSG"
srs@srid = as.integer(srid)
srs@srtext = showWKT(proj4args)
trs = TRS(dimension="t",t0="2003-07-28",tres=1,tunit="days")
sizex = 100
sizey = 100
sizet = 100
# random array
gr = expand.grid(0:(sizex-1),0:(sizey-1),0:(sizet-1))
gr = cbind(gr,runif(sizex*sizey*sizet,1,100))
colnames(gr) = c("y","x","t","val")
array = as.scidb(X=gr,name="tmp_arr",chunkSize=nrow(gr))
schema = sprintf("<val:double NULL> [y=0:%i,20,0,x=0:%i,20,0,t=0:%i,100,0]",(sizey-1),(sizex-1),(sizet-1))
st.arr = new("scidbst")
st.arr@title="random_st"
st.arr@srs = srs
st.arr@extent = ex
st.arr@trs = trs
st.arr@tExtent = tex
st.arr@affine = aff
st.arr@isSpatial = TRUE
st.arr@isTemporal = TRUE
st.arr@proxy = redimension(array,schema=schema)
st.arr = scidbsteval(st.arr,"random_st")
scidbrm("tmp_arr",force=T)
```

### Preparing the array for R_EXEC
An important aspect when using R_EXEC is the fact that only the chunkwise attribute values are used as input for the R script. If we want to use the dimensions in the calculation, then the array needs to be prepared to have the dimensions as attributes. However, this will temporarily remove the spatial and temporal references from the scidbst array. The user needs to specify the dimension explicitly later and in case there are matching names of the dimension, the function restores the references for those.

```{r}
arr.prep = transform(st.arr, dimx="double(x)",dimy="double(y)", dimt="double(t)")
arr.prep = scidbsteval(arr.prep,name="random_st_prep")
arr.prep@proxy
scidbrm("random_st",force=T)
```
At this point we have created an array that has its dimensions additionally as attributes with new names.

### Preparing the parameter
The next step is most important, because it will decide whether the processing will be successful. The 'output' parameter is directly related to the data that is the result of the applied function 'f'. This means new attributes must be stated here as well as potential dimensions. For example, the following function will be applied during the r_exec call within the function 'ddply' of the package 'plyr'. ddply will be used to aggregate the data chunk, e.g. partioning the data set by space or time. Grouping by space would mean that the rows in the data set with the same spatial coordinate will be partioned and on each partition in the chunk the function 'f' will be applied.

```{r}
f <- function(x) {
if (is.null(x)) {
return(c(nt=0,var=0,median=0,mean=0))
}
t = x$dimt
n = x$val
return(c(nt=length(t),var=var(n),median=median(n),mean=mean(n)))
}
```

```{r}
aggregates=c("dimy","dimx")
```

As mentioned the 'output' parameter relates to the data output of the r script after the function with ddply was executed. Since we will aggregate by space, meaning that we will process time series in our function, the output will not have the former temporal dimension "dimt". But function 'f' returns a named vector and those names and their data types must be added in the same order as in the function output. Internally the r.apply function will add the results of the function into a data.frame object consisting of the used aggregate attributes and the function output attributes (here: "dimy","dimx", "nt", "var", "median", "mean").

```{r}
output=list(dimy="double",dimx="double",nt="double",var="double",median="double",mean="double")
```

In principal this would be enough to run a successful r.apply query, but the result will be a 1-dimensional array containing the stated output attributes. If you want to run an additional redimensioning on the data, then the dimensions need to be named and specified.

```{r}
dim=list(dimy="y",dimx="x")
dim.spec=list(y=c(min=0,max=99,chunk=20,overlap=0),x=c(min=0,max=99,chunk=20,overlap=0))
```

The parameter 'dim' expects a named list of the dimensions. The keys of the list refer to the attribute names for the dimensions after function 'f' was executed on each chunk. As values use either NULL to keep the name or use a character string for new dimension name. The use of "dim" as a parameter will run scidb's 'redimension' operation on the 1-dimensional array in order to create dimensions from attributes. To perform the 'redimension' operation we need to state the new dimension schema manually. Therefore you need to state the start/end/chunksizes/overlap parameter for each of the dimensions.

After the redimension call r.apply tries to naively assume that the spatial and temporal references, as well as the extents, are the same as from the input, when the dimension names match after the redimension. Only in this case a scidbst array will be returned, otherwise it will be a scidb array, where users have to set the required parameter by themselves.

### Running R_EXEC

```{r}
rexec.arr = r.apply(x=arr.prep,
f=f,
array="rexec_applied",
aggregates=aggregates,
output = output,
dim=dim,
dim.spec=dim.spec)
```
In this example we will run a minimalisitc example, meaning that we will not make use of ddply's parallelization capabilities (parallel = FALSE and cores=1) and without logging. Also the function we stated did not use any additional R packages, that is why the parameter 'packages' is also missing.

### Result
```{r}
rexec.arr
```

Loading

0 comments on commit 568e75c

Please sign in to comment.