Skip to content

Commit

Permalink
refacto: Base streaming module, use it for fs2
Browse files Browse the repository at this point in the history
  • Loading branch information
Clark Andrianasolo committed Jun 21, 2023
1 parent 4173d09 commit 617e15b
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 64 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ It allows to compute indicators on a stream of data, which is a very relevant us
- [talib-core](/lib/talib-core/) provides the shared TALIB friendlier interfaces for indicator inputs and outputs. This level enforces memory-safe operations and accurate computations for each [indicator](https://ta-lib.org/function.html). It makes use of standard data-structures (e.g. `Array[Double]`) that satisfy the needs of higher-level modules.
The [talib-core-tests](/lib/talib-core-tests/) module provides tests for the core module, to guarantee the accuracy of the computations for each indicator, on any platform.

- [talib-fs2](/lib/talib-fs2/) provides streaming adapters for the [fs2](https://fs2.io/) library. It implements the essential streaming operations for the computation of indicators by streamig the indicator inputs and the indicator outputs. This is where the inputs is buffered according to the indicator computation needs : most indicators will need to buffer past values in order to output more accurate results on current values.
The [talib-fs2-tests](/lib/talib-fs2-tests/) module provides tests for the fs2 module, to test the accuracy of the computations for each indicator, on any platform. The measured accuracy is compared to the accuracy of the batch computation of the indicator, which is the reference for the accuracy of the indicator.
- [talib-streams](/lib/talib-streams/) provides streaming definitions. It contain the main definitions of the streaming operations for computating indicators in a streaming fashion. This is where the inputs is buffered according to the indicator computation needs : most indicators will need to buffer past values in order to output more accurate results on current values. It consists of buffering know-how and heuristics for streaming each of the indicators. This module serves as a basis for the streaming adapters modules that are detailed below.

- [talib-streams-fs2](/lib/talib-fs2/) provides streaming adapters for the [fs2](https://fs2.io/) library using the _fs2_ API.
The [talib-streams-fs2-tests](/lib/talib-fs2-tests/) module provides tests for the fs2 module, to test the accuracy of the computations for each indicator, on any platform. The measured accuracy is compared to the accuracy of the batch computation of the indicator, which is the reference for the accuracy of the indicator.

- [signals](/lib/signals/) provides data types for type-safety when manipulating indicator inputs and outputs. Each indicator has its associated signal type, which simply contains as many output values as the indicator has outputs. It introduces concepts from trading signals, such as 'ohlc' (open-high-low-close), for which the streaming modules may provide helpers to compute indicators on.

Expand Down
24 changes: 16 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ lazy val root = tlCrossRootProject
talibCore.native,
talibCoreTests.native,
signalsLib.native,
talibStreamFs2.native,
talibStreamsFs2.native,
tradingDomain.native,
tradingPersistenceSkunk.native,
exampleSkunkApp.native
Expand Down Expand Up @@ -172,22 +172,30 @@ lazy val signalsLib = crossProject(JVMPlatform, NativePlatform)
.settings(circeDependencies)
.nativeSettings(commonNativeSettings)

lazy val talibStreamFs2 = crossProject(JVMPlatform, NativePlatform)
lazy val talibStreams = crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("lib/talib-streams"))
.settings(moduleName := "talib-streams", name := "Talib streams")
.settings(sharedSettings)
.dependsOn(talibCore, signalsLib)
.nativeSettings(commonNativeSettings)

lazy val talibStreamsFs2 = crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("lib/talib-fs2"))
.settings(moduleName := "talib-streams-fs2", name := "Talib streams fs2")
.settings(sharedSettings)
.settings(fs2Dependencies)
.dependsOn(talibCore, signalsLib)
.nativeSettings(commonNativeSettings, testingNativeSettings)
.dependsOn(talibCore, signalsLib, talibStreams)
.nativeSettings(commonNativeSettings)

lazy val talibStreamFs2Tests = crossProject(JVMPlatform, NativePlatform)
lazy val talibStreamsFs2Tests = crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("lib/talib-fs2-tests"))
.settings(moduleName := "talib-streams-fs2-tests", name := "Talib streams fs2 tests")
.settings(sharedSettings)
.settings(fs2Dependencies, testingJUnitSettings)
.dependsOn(talibStreamFs2)
.dependsOn(talibStreamsFs2)
.nativeSettings(commonNativeSettings, testingNativeSettings)


Expand All @@ -206,7 +214,7 @@ lazy val tradingPersistenceInMemory = crossProject(JVMPlatform, NativePlatform)
.settings(moduleName := "trading-persistence-in-memory", name := "Trading persistence in memory")
.settings(sharedSettings)
.settings(loggingDependencies, catsEffectOverrideDependencies)
.dependsOn(talibCore, talibStreamFs2, tradingDomain)
.dependsOn(talibCore, talibStreamsFs2, tradingDomain)
.nativeSettings(commonNativeSettings)

lazy val tradingPersistenceSkunk = crossProject(JVMPlatform, NativePlatform)
Expand All @@ -215,7 +223,7 @@ lazy val tradingPersistenceSkunk = crossProject(JVMPlatform, NativePlatform)
.settings(moduleName := "trading-persistence-skunk", name := "Trading persistence skunk")
.settings(sharedSettings)
.settings(skunkDependencies, loggingDependencies, catsEffectOverrideDependencies)
.dependsOn(talibCore, talibStreamFs2, tradingDomain)
.dependsOn(talibCore, talibStreamsFs2, tradingDomain)
.nativeSettings(commonNativeSettings)
.enablePlugins(NoPublishPlugin)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import fs2.{Pipe, Stream}

import io.clarktsiory.signals.*
import io.clarktsiory.ta.*
import io.clarktsiory.ta.fs2.*
import io.clarktsiory.ta.streams.fs2.*

class IndicatorInMemStreamingService[F[_]: Async](
topic: Topic[F, (Symbol, Signal)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import skunk.implicits.*

import io.clarktsiory.signals.*
import io.clarktsiory.ta.*
import io.clarktsiory.ta.fs2.*
import io.clarktsiory.ta.streams.fs2.*

class IndicatorSkunkStreamingService[F[_]: Async](
signalService: SignalStreamingService[[X] =>> Stream[F, X]],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import java.time.LocalDateTime

Expand All @@ -9,6 +9,7 @@ import org.junit.Test

import io.clarktsiory.signals.*
import io.clarktsiory.ta.given
import io.clarktsiory.ta.streams.BufferedIndicator
import io.clarktsiory.ta.{ComputedIndicator, Indicator}

class IndicatorStreamingFS2Test {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import scala.scalanative.unsafe.Zone
import java.time.LocalDateTime
Expand All @@ -11,6 +11,7 @@ import org.junit.{AfterClass, Test}

import io.clarktsiory.signals.*
import io.clarktsiory.ta.given
import io.clarktsiory.ta.streams.BufferedIndicator
import io.clarktsiory.ta.{ComputedIndicator, Indicator}

class IndicatorStreamingFS2Test {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import fs2.{Pipe, Scan}

import io.clarktsiory.signals.*
import io.clarktsiory.ta.Indicator
import io.clarktsiory.ta.given
import io.clarktsiory.ta.streams.*

given BufferedIndicator[Indicator.RSI] = new RSIBufferedIndicator with MutableSignalBuffer
given BufferedIndicator[Indicator.MACD] = new MACDBufferedIndicator with MutableSignalBuffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import scala.collection.immutable.Queue
import scala.collection.mutable.ListBuffer
Expand All @@ -12,8 +12,9 @@ import cats.syntax.option.*
import fs2.{Chunk, Pipe, Scan, Stream}

import io.clarktsiory.signals.*
import io.clarktsiory.ta.fs2.given
import io.clarktsiory.ta.given
import io.clarktsiory.ta.streams.*
import io.clarktsiory.ta.streams.fs2.given
import io.clarktsiory.ta.{ComputedIndicator, Indicator, IndicatorSig}
import impl.*

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import cats.data.State
import cats.syntax.traverse.*
import fs2.{Chunk, Scan}

import io.clarktsiory.signals.{MACDSignal, ScalarSignal}
import io.clarktsiory.ta.fs2.MACDChunksState.copyBuffer
import io.clarktsiory.ta.streams.BufferedIndicator
import io.clarktsiory.ta.streams.fs2.MACDChunksState.copyBuffer
import io.clarktsiory.ta.{ComputedIndicator, Indicator}

/** Stateful computation of MACD using a buffer that keeps previous signal values.
Expand Down Expand Up @@ -72,7 +73,7 @@ object MACDChunksState:
def empty(macd: Indicator.MACD)(using b: BufferedIndicator[Indicator.MACD]): MACDChunksState =
MACDChunksState(macd, b)(b.emptyBuffer(), 0)

def scan(macd: Indicator.MACD)(using
private[fs2] def scan(macd: Indicator.MACD)(using
BufferedIndicator[Indicator.MACD],
ComputedIndicator[Indicator.MACD, Array[
Double
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import fs2.Stream

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams.fs2

import fs2.{Chunk, Scan}

import io.clarktsiory.signals.{RSISignal, ScalarSignal}
import io.clarktsiory.ta.fs2.RSIChunksState.copyBuffer
import io.clarktsiory.ta.streams.BufferedIndicator
import io.clarktsiory.ta.streams.fs2.RSIChunksState.copyBuffer
import io.clarktsiory.ta.{ComputedIndicator, Indicator}

/** Stateful computation of RSI using a buffer that keeps previous signal values.
Expand Down Expand Up @@ -63,7 +64,7 @@ object RSIChunksState:
def empty(rsi: Indicator.RSI)(using b: BufferedIndicator[Indicator.RSI]): RSIChunksState =
RSIChunksState(rsi, b)(b.emptyBuffer(), 0)

def scan(rsi: Indicator.RSI)(using
private[fs2] def scan(rsi: Indicator.RSI)(using
BufferedIndicator[Indicator.RSI],
ComputedIndicator[Indicator.RSI, Array[Double], Array[Double]],
): Scan[RSIChunksState, ScalarSignal, RSISignal] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.clarktsiory.ta.streams
import scala.collection.mutable.ListBuffer

import io.clarktsiory.signals.ScalarSignal
import io.clarktsiory.ta.Indicator
import io.clarktsiory.ta.Indicator.*

/**
* Type-class that encodes buffering needs according to the indicator computation properties.
* Most indicators will need to buffer past values in order to output more accurate results on current values.
* @tparam A the indicator type
*/
trait BufferedIndicator[A <: Indicator]:
/**
* Defines the max number of periods that need to be buffered.
* If the value is static, it may influence the precision of computations.
* If the value is 0 or purely depends on the indicator parameters, it means that the indicator computation should
* always be deterministic and computation is always fundamentally accurate (e.g. the mean of _n_ values).
*/
def bufferSize(indicator: A): Int

/**
* Defines the min number of periods that yield a computation result (e.g. _n_ for the indicator that is mean of the
* _n_ latest values).
*/
def minComputationSize(indicator: A): Int

/**
* The type of the buffer used to store past values. It allows buffer manipulation with a pre-defined number of
* operations through the extension methods.
* It should always be an opaque type, to avoid accidental buffer manipulation apart from the extension methods.
*/
type SignalBuffer

/**
* @return An empty buffer than can be further manipulated with the extension methods.
*/
def emptyBuffer(): SignalBuffer

extension (buffer: SignalBuffer)
def added(value: ScalarSignal): SignalBuffer
def dropped(n: Int): SignalBuffer
def takeRight(n: Int): SignalBuffer
def size: Int
def toArray: Array[ScalarSignal]
end extension
end BufferedIndicator

/**
* Partial implementation of the BufferedIndicator type-class for MACD, using known properties of the indicator, and
* a heuristic for the buffer size.
*/
abstract private[streams] class MACDBufferedIndicator extends BufferedIndicator[MACD]:
override def bufferSize(macd: MACD): Int = 1_000
override def minComputationSize(macd: MACD): Int = macd.slow + macd.signalPeriod - 1
end MACDBufferedIndicator

/**
* Partial implementation of the BufferedIndicator type-class for RSI, using known properties of the indicator, and
* a heuristic for the buffer size.
*/
abstract private[streams] class RSIBufferedIndicator extends BufferedIndicator[RSI]:
override def bufferSize(rsi: Indicator.RSI): Int = 500
override def minComputationSize(rsi: RSI): Int = rsi.timeperiod + 1
end RSIBufferedIndicator

object BufferedIndicator:
def apply[A <: Indicator](using BufferedIndicator[A]): BufferedIndicator[A] = summon
end BufferedIndicator
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.clarktsiory.ta.fs2
package io.clarktsiory.ta.streams

import scala.collection.mutable.ListBuffer

Expand Down

0 comments on commit 617e15b

Please sign in to comment.