Skip to content

Commit

Permalink
Fix Cassandra attributeStore performance issues
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Apr 18, 2019
1 parent c5eb74e commit 3165d27
Showing 1 changed file with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,21 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
instance.ensureKeyspaceExists(attributeKeyspace, session)
session.execute(
SchemaBuilder.createTable(attributeKeyspace, attributeTable).ifNotExists()
.addPartitionKey("layerId", text)
.addPartitionKey("layerName", text)
.addClusteringColumn("layerZoom", cint)
.addClusteringColumn("name", text)
.addColumn("value", text)
)
}

val SEP = "__.__"

def layerIdString(layerId: LayerId): String =
s"${layerId.name}${SEP}${layerId.zoom}"

private def fetch(layerId: Option[LayerId], attributeName: String): ResultSet = instance.withSessionDo { session =>
val query =
layerId match {
case Some(id) =>
QueryBuilder.select.column("value")
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(id)))
.where(eqs("layerName", id.name))
.and(eqs("layerZoom", id.zoom))
.and(eqs("name", attributeName))
case None =>
QueryBuilder.select.column("value")
Expand All @@ -80,12 +77,14 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
case Some(name) =>
QueryBuilder.delete()
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))
.and(eqs("name", name))
case None =>
QueryBuilder.delete()
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))
}

session.execute(query)
Expand All @@ -100,7 +99,8 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
val query =
QueryBuilder.select.column("value")
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))
.and(eqs("name", attributeName))

val values = session.execute(query)
Expand All @@ -117,7 +117,7 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
}

def readAll[T: JsonFormat](attributeName: String): Map[LayerId, T] = instance.withSessionDo { session =>
val query = QueryBuilder.select("value")
val query = QueryBuilder.select.column("value")
.from(attributeKeyspace, attributeTable).allowFiltering()
.where(eqs("name", QueryBuilder.bindMarker()))

Expand All @@ -133,22 +133,24 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
val update =
QueryBuilder.update(attributeKeyspace, attributeTable)
.`with`(set("value", (layerId, value).toJson.compactPrint))
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))
.and(eqs("name", attributeName))

session.execute(update)
}

def layerExists(layerId: LayerId): Boolean = instance.withSessionDo { session =>
val query =
QueryBuilder.select.column("layerId")
QueryBuilder.select("layerName", "layerZoom")
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))
.and(eqs("name", AttributeStore.Fields.metadata))

session.execute(query).asScala.exists { key =>
val List(name, zoomStr) = key.getString("layerId").split(SEP).toList
layerId == LayerId(name, zoomStr.toInt)
val (name, zoom) = key.getString("layerName") -> key.getInt("layerZoom")
layerId == LayerId(name, zoom)
}
}

Expand All @@ -157,24 +159,32 @@ class CassandraAttributeStore(val instance: CassandraInstance, val attributeKeys
def delete(layerId: LayerId, attributeName: String): Unit = delete(layerId, Some(attributeName))

def layerIds: Seq[LayerId] = instance.withSessionDo { session =>
val query =
QueryBuilder.select.column("layerId")
.from(attributeKeyspace, attributeTable)
val query = QueryBuilder.select("layerName", "layerZoom").from(attributeKeyspace, attributeTable)

session.execute(query).asScala.map { key =>
val List(name, zoomStr) = key.getString("layerId").split(SEP).toList
LayerId(name, zoomStr.toInt)
val (name, zoom) = key.getString("layerName") -> key.getInt("layerZoom")
LayerId(name, zoom)
}
.toList
.distinct
.toList
.distinct
}

def availableAttributes(layerId: LayerId): Seq[String] = instance.withSessionDo { session =>
val query =
QueryBuilder.select("name")
QueryBuilder.select.column("name")
.from(attributeKeyspace, attributeTable)
.where(eqs("layerId", layerIdString(layerId)))
.where(eqs("layerName", layerId.name))
.and(eqs("layerZoom", layerId.zoom))

session.execute(query).asScala.map(_.getString("name")).toVector
}

override def availableZoomLevels(layerName: String): Seq[Int] = instance.withSessionDo { session =>
val query =
QueryBuilder.select.column("layerZoom")
.from(attributeKeyspace, attributeTable)
.where(eqs("layerName", layerName))

session.execute(query).asScala.map { _.getInt("layerZoom") }.toList.distinct
}
}

0 comments on commit 3165d27

Please sign in to comment.