Skip to content

Commit

Permalink
eval: reduce encoding overhead (#1709)
Browse files Browse the repository at this point in the history
Update so that the set of subscriptions sent to the LWC API
servers are encoded before sending into the cluster grouping
operator. This means the encoding will only happen once per
cluster rather than once per node.
  • Loading branch information
brharrington authored Oct 18, 2024
1 parent 6a1f1db commit d661d81
Showing 1 changed file with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ private[stream] abstract class EvaluatorImpl(
// Cluster message first: need to connect before subscribe
val instances = sourcesAndGroups._2.groups.flatMap(_.instances).toSet
val exprs = toExprSet(sourcesAndGroups._1, context.interpreter)
val dataMap = instances.map(i => i -> exprs).toMap
val bytes = LwcMessages.encodeBatch(exprs.toSeq)
val dataMap = instances.map(i => i -> bytes).toMap
Source(
List(
ClusterOps.Cluster(instances),
Expand All @@ -496,7 +497,7 @@ private[stream] abstract class EvaluatorImpl(
// resent. This ensures the most recent set of subscriptions will go out at a
// regular cadence.
.via(StreamOps.repeatLastReceived(5.seconds))
.via(ClusterOps.groupBy(createGroupByContext(context)))
.via(ClusterOps.groupBy(createGroupByContext))
.mapAsync(parsingNumThreads) { msg =>
// This step is placed after merge of streams so there is a single
// use of the pool and it is easier to track the concurrent usage.
Expand All @@ -506,9 +507,7 @@ private[stream] abstract class EvaluatorImpl(
}
}

private def createGroupByContext(
context: StreamContext
): ClusterOps.GroupByContext[Instance, Set[LwcExpression], ByteString] = {
private def createGroupByContext: ClusterOps.GroupByContext[Instance, ByteString, ByteString] = {
ClusterOps.GroupByContext(
instance => createWebSocketFlow(instance),
registry,
Expand All @@ -528,16 +527,14 @@ private[stream] abstract class EvaluatorImpl(

private def createWebSocketFlow(
instance: EddaSource.Instance
): Flow[Set[LwcExpression], ByteString, NotUsed] = {
): Flow[ByteString, ByteString, NotUsed] = {
val base = instance.substitute("ws://{ip}:{port}")
val id = UUID.randomUUID().toString
val uri = s"$base/api/v$lwcapiVersion/subscribe/$id"
val webSocketFlowOrigin = Http(system).webSocketClientFlow(WebSocketRequest(uri))
Flow[Set[LwcExpression]]
Flow[ByteString]
.via(StreamOps.unique(uniqueTimeout)) // Updating subscriptions only if there's a change
.map { exprs =>
BinaryMessage(LwcMessages.encodeBatch(exprs.toSeq))
}
.map(BinaryMessage.apply)
.via(webSocketFlowOrigin)
.flatMapConcat {
case _: TextMessage =>
Expand Down

0 comments on commit d661d81

Please sign in to comment.