Sometimes you may want to process the output in a way which is not provided by the framework, e.g. custom file format, or remote network endpoint, or just store it in memory. For that purpose you may implement the output as a function. It is as easy as call .out()
on the stream of any type:
stream.out { /* handle the samples your own way */ }
The argument of the function is of type io.wavebeans.lib.io.WriteFunctionArgument
, which has the following fields:
sampleClazz
(KClass<T>
) -- the class of the sample for convenience.sampleIndex
(Long
) -- the 0-based global index of the sample.sampleRate
(Float
) -- the sample rate the output is being evaluated with.sample
(T?
) -- the nullable depending on the phasephase
sample value.phase
(WriteFunctionPhase
) -- the phase of the writing routine:WRITE
,END
,CLOSE
. Phase describes where is the writer currently is:WRITE
tells that the writer is currently getting the input signal, and expect it to process. Thesample
field is nevernull
in this case.END
tells that the writer has reached the end of the input stream, but the writer has been called. May not be called in some cases (i.e. the writer's write function is stopped calling before the writer hit on the end of the stream, or the stream is endless), or be called more than once (in case that the writer's write function is called after the previous call returnedfalse
), but during regular execution is being called only once. Thesample
field isnull
in this case.CLOSE
tells that the writer is being closed. Thesample
field isnull
in this case.
The function expects to return the value of Boolean
type, that controls the output writer behavior:
- In the
WRITE
phase if the function returnstrue
the writer will continue processing the input, if it returnsfalse
the writer will stop processing, but anywayCLOSE
phase will be initiated. - It doesn't affect anything in other phases.
Here is some example writing into a shared memory storage, it writes the 1 second of 440Hz sine:
/**
* It's not a proper storage, just to provide an idea.
* It is an object to be able to use function as lambda.
*/
object Storage {
private val list = ArrayList<Sample>()
fun add(sample: Sample) { list += sample }
fun list(): List<Sample> = list
}
440.sine() // the stream is infinite, but we'll limit it in the output function
.out {
// write only samples within WRITE phase
if (it.phase == WRITE) Storage.add(it.sample!!)
// limit with one second of data
it.sampleIndex / it.sampleRate < 1.0f
}
Running in multi-threaded or distributed mode: by default outputs are evaluated as a single bean and are not parallelized, the function as an output is not exception. That means it is safe to say the output function may have some state in it, though it is not guaranteed that it will be launched in the very same thread every time. One more thing, if the stream is evaluated sequentially a few times in a row within the same process routine, the function is created only once, so the state should take this into account.