Merge branch 'develop' into develop
abhinavk96 authored Jul 7, 2023
2 parents 134140a + 22305fa commit 1725c04
Showing 117 changed files with 4,732 additions and 1,439 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/runtests.sh
@@ -1,6 +1,8 @@
#!/bin/sh
wget -q -O - https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo sh -c 'echo "deb http://www.apache.org/dist/cassandra/debian 40x main" > /etc/apt/sources.list.d/cassandra.list'
set -e
wget -q -O - https://archive.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo sh -c 'echo "deb http://archive.apache.org/dist/cassandra/debian 40x main" > /etc/apt/sources.list.d/cassandra.list'
sudo apt update
sudo apt install cassandra
set +e
sbt coverage test coverageAggregate

Large diffs are not rendered by default.

@@ -57,7 +57,7 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
def scanPartKeys(shard: Int, updateHour: Long, split: Int): Observable[PartKeyRecord] = {
session.executeAsync(readCql.bind(shard: JInt, updateHour: JLong, split: JInt))
.toObservable.handleObservableErrors
.map(PartitionKeysTable.rowToPartKeyRecord)
.map(r => PartitionKeysTable.rowToPartKeyRecord(r, shard))
}

}
@@ -81,8 +81,9 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,

private lazy val deleteCql = session.prepare(
s"DELETE FROM $tableString " +
s"WHERE partKey = ?"
).setIdempotent(true)
s"WHERE partKey = ?")
.setIdempotent(true)
.setConsistencyLevel(writeConsistencyLevel)

def writePartKey(pk: PartKeyRecord, diskTimeToLiveSeconds: Long): Future[Response] = {
if (diskTimeToLiveSeconds <= 0) {
@@ -99,7 +100,7 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
.mapParallelUnordered(scanParallelism) { range =>
val fut = session.executeAsync(scanCql.bind(range._1.toLong: JLong, range._2.toLong: JLong))
.toIterator.handleErrors
.map { rowIt => rowIt.map(PartitionKeysTable.rowToPartKeyRecord) }
.map { rowIt => rowIt.map(r => PartitionKeysTable.rowToPartKeyRecord(r, shard)) }
Task.fromFuture(fut)
}
for {
@@ -174,7 +175,7 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
startTimeLTE: java.lang.Long,
endTimeGTE: java.lang.Long,
endTimeLTE: java.lang.Long)
session.execute(stmt).iterator.asScala.map(PartitionKeysTable.rowToPartKeyRecord)
session.execute(stmt).iterator.asScala.map(r => PartitionKeysTable.rowToPartKeyRecord(r, shard))
}

/**
@@ -186,7 +187,7 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
def readPartKey(pk: Array[Byte]) : Option[PartKeyRecord] = {
val iterator = session.execute(readCql.bind().setBytes(0, toBuffer(pk))).iterator()
if (iterator.hasNext) {
Some(PartitionKeysTable.rowToPartKeyRecord(iterator.next()))
Some(PartitionKeysTable.rowToPartKeyRecord(iterator.next(), shard))
} else {
None
}
@@ -206,8 +207,8 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
}

object PartitionKeysTable {
private[columnstore] def rowToPartKeyRecord(row: Row) = {
private[columnstore] def rowToPartKeyRecord(row: Row, shard: Int) = {
PartKeyRecord(row.getBytes("partKey").array(),
row.getLong("startTime"), row.getLong("endTime"), None)
row.getLong("startTime"), row.getLong("endTime"), shard)
}
}
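The hunks above thread the shard number through PartitionKeysTable.rowToPartKeyRecord, and the last argument of PartKeyRecord changes from an optional hash (None / Some(i) elsewhere in this commit) to the shard itself. The following is a minimal, self-contained sketch of that shape; PartKeyRecordSketch and its field names are stand-ins invented for illustration and are not FiloDB's actual PartKeyRecord definition.

import java.nio.charset.StandardCharsets

// Assumed shape only: mirrors the four arguments seen in this diff
// (partKey bytes, startTime, endTime, shard), with shard now required.
final case class PartKeyRecordSketch(partKey: Array[Byte],
                                     startTime: Long,
                                     endTime: Long,
                                     shard: Int)

object PartKeyRecordSketchExample extends App {
  // Callers previously passed an Option (e.g. Some(150)); after this change the
  // shard is explicit, as the updated tests do with shard = 0.
  val rec = PartKeyRecordSketch("my-series".getBytes(StandardCharsets.UTF_8),
                                startTime = 5L, endTime = 10L, shard = 0)
  println(s"partKey bytes=${rec.partKey.length}, shard=${rec.shard}")
}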
@@ -0,0 +1,215 @@
package filodb.cassandra.columnstore

import java.lang.{Integer => JInt, Long => JLong}

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.asScalaIteratorConverter

import com.datastax.driver.core.{ConsistencyLevel, Row}
import monix.eval.Task
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core.{DatasetRef, Response}
import filodb.core.store.PartKeyRecord

sealed class PartitionKeysV2Table(val dataset: DatasetRef,
                                  val connector: FiloCassandraConnector,
                                  writeConsistencyLevel: ConsistencyLevel,
                                  readConsistencyLevel: ConsistencyLevel)
                                 (implicit ec: ExecutionContext) extends BaseDatasetTable {

  import filodb.cassandra.Util._

  val suffix = s"partitionkeysv2"

  val createCql =
    s"""CREATE TABLE IF NOT EXISTS $tableString (
       |    shard int,
       |    bucket int,
       |    partKey blob,
       |    startTime bigint,
       |    endTime bigint,
       |    PRIMARY KEY ((shard, bucket), partKey)
       |) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin

  private lazy val writePartitionCql = session.prepare(
    s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
    s"VALUES (?, ?, ?, ?, ?) USING TTL ?")
    .setConsistencyLevel(writeConsistencyLevel)

  private lazy val writePartitionCqlNoTtl = session.prepare(
    s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
    s"VALUES (?, ?, ?, ?, ?)")
    .setConsistencyLevel(writeConsistencyLevel)

  private lazy val scanCql = session.prepare(
    s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
    s"WHERE shard = ? and bucket = ?"
  ).setConsistencyLevel(readConsistencyLevel)

  private lazy val scanCqlForStartEndTime = session.prepare(
    s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
    s"WHERE TOKEN(shard, bucket) >= ? AND TOKEN(shard, bucket) < ? AND " +
    s"startTime >= ? AND startTime <= ? AND " +
    s"endTime >= ? AND endTime <= ? " +
    s"ALLOW FILTERING")
    .setConsistencyLevel(readConsistencyLevel)

  private lazy val scanCqlForStartTime = session.prepare(
    s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
    s"WHERE TOKEN(shard, bucket) >= ? AND TOKEN(shard, bucket) < ? AND startTime >= ? AND startTime <= ? " +
    s"ALLOW FILTERING")
    .setConsistencyLevel(readConsistencyLevel)

  private lazy val scanCqlForEndTime = session.prepare(
    s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
    s"WHERE TOKEN(shard, bucket) >= ? AND TOKEN(shard, bucket) < ? AND endTime >= ? AND endTime <= ? " +
    s"ALLOW FILTERING")
    .setConsistencyLevel(readConsistencyLevel)

  private lazy val readCql = session.prepare(
    s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
    s"WHERE shard = ? and bucket = ? and partKey = ?")
    .setConsistencyLevel(readConsistencyLevel)

  private lazy val deleteCql = session.prepare(
    s"DELETE FROM $tableString " +
    s"WHERE shard = ? and bucket = ? and partKey = ?"
  ).setConsistencyLevel(writeConsistencyLevel)

  def writePartKey(bucket: Int, pk: PartKeyRecord, diskTimeToLiveSeconds: Long): Future[Response] = {
    if (diskTimeToLiveSeconds <= 0) {
      connector.execStmtWithRetries(writePartitionCqlNoTtl.bind(pk.shard: JInt, bucket: JInt,
        toBuffer(pk.partKey), pk.startTime: JLong, pk.endTime: JLong))
    } else {
      connector.execStmtWithRetries(writePartitionCql.bind(pk.shard: JInt, bucket: JInt,
        toBuffer(pk.partKey), pk.startTime: JLong, pk.endTime: JLong, diskTimeToLiveSeconds.toInt: JInt))
    }
  }

  def scanPartKeys(shard: Int, scanParallelism: Int, numBuckets: Int): Observable[PartKeyRecord] = {

    /*
      TODO If this is slow, then another option to evaluate is to do token scan with shard filter.
      SELECT partKey, startTime, endTime, shard FROM $tableString " +
        s"WHERE TOKEN(shard, bucket) >= ? AND TOKEN(shard, bucket) < ? AND " +
        s"shard = ? ALLOW FILTERING")
     */

    val res: Observable[Iterator[PartKeyRecord]] = Observable.fromIterable(0 until numBuckets)
      .mapParallelUnordered(scanParallelism) { bucket =>
        val fut = session.executeAsync(scanCql.bind(shard: JInt, bucket: JInt))
          .toIterator.handleErrors
          .map { rowIt => rowIt.map(PartitionKeysV2Table.rowToPartKeyRecord) }
        Task.fromFuture(fut)
      }
    for {
      pkRecs <- res
      pk <- Observable.fromIteratorUnsafe(pkRecs)
    } yield pk
  }

  /**
   * Method used by data repair jobs.
   * Returns PartitionKey rows where the time series startTime falls within the specified repair start/end window
   * (in other words, time series that were born during the data loss period).
   * Rows consist of partKey, start and end time. Refer to the CQL used below.
   */
  def scanRowsByStartTimeRangeNoAsync(tokens: Seq[(String, String)],
                                      repairStartTime: Long,
                                      repairEndTime: Long): Set[Row] = {
    tokens.iterator.flatMap { case (start, end) =>
      /*
       * FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
       * Long based tokens. If other partitioners are used, this can potentially break.
       * Correct way is to pass Token objects around and bind tokens with stmt.bind().setPartitionKeyToken(token)
       */
      val startTimeStmt = scanCqlForStartTime.bind(start.toLong: java.lang.Long,
        end.toLong: java.lang.Long,
        repairStartTime: java.lang.Long,
        repairEndTime: java.lang.Long)
      session.execute(startTimeStmt).iterator.asScala
    }
  }.toSet

  /**
   * Method used by data repair jobs.
   * Returns PartitionKey rows where the time series endTime falls within the specified repair start/end window
   * (in other words, time series that died during the data loss period).
   * Rows consist of partKey, start and end time. Refer to the CQL used below.
   */
  def scanRowsByEndTimeRangeNoAsync(tokens: Seq[(String, String)],
                                    startTime: Long,
                                    endTime: Long): Set[Row] = {
    tokens.iterator.flatMap { case (start, end) =>
      /*
       * FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
       * Long based tokens. If other partitioners are used, this can potentially break.
       * Correct way is to pass Token objects around and bind tokens with stmt.bind().setPartitionKeyToken(token)
       */
      val endTimeStmt = scanCqlForEndTime.bind(start.toLong: java.lang.Long,
        end.toLong: java.lang.Long,
        startTime: java.lang.Long,
        endTime: java.lang.Long)
      session.execute(endTimeStmt).iterator.asScala
    }
  }.toSet

  /**
   * Method used by the Cardinality Buster job.
   * Returns PartKeyRecord objects where the time series start and end times fall within the specified window.
   */
  def scanPksByStartEndTimeRangeNoAsync(split: (String, String),
                                        startTimeGTE: Long,
                                        startTimeLTE: Long,
                                        endTimeGTE: Long,
                                        endTimeLTE: Long): Iterator[PartKeyRecord] = {
    /*
     * FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
     * Long based tokens. If other partitioners are used, this can potentially break.
     * Correct way is to pass Token objects around and bind tokens with stmt.bind().setPartitionKeyToken(token)
     */
    val stmt = scanCqlForStartEndTime.bind(split._1.toLong: java.lang.Long,
      split._2.toLong: java.lang.Long,
      startTimeGTE: java.lang.Long,
      startTimeLTE: java.lang.Long,
      endTimeGTE: java.lang.Long,
      endTimeLTE: java.lang.Long)
    session.execute(stmt).iterator.asScala.map(PartitionKeysV2Table.rowToPartKeyRecord)
  }

  /**
   * Returns the PartKeyRecord for the given partKey bytes.
   *
   * @param pk partKey bytes
   * @return Option[PartKeyRecord]
   */
  def readPartKey(shard: Int, bucket: Int, pk: Array[Byte]): Option[PartKeyRecord] = {
    val iterator = session.execute(readCql.bind(shard: JInt, bucket: JInt, toBuffer(pk))).iterator()
    if (iterator.hasNext) {
      Some(PartitionKeysV2Table.rowToPartKeyRecord(iterator.next()))
    } else {
      None
    }
  }

  /**
   * Deletes the PartKeyRecord for the given partKey bytes.
   *
   * @param pk partKey bytes
   * @return Response
   */
  def deletePartKeyNoAsync(shard: Int, bucket: Int, pk: Array[Byte]): Response = {
    val stmt = deleteCql.bind(shard: JInt, bucket: JInt, toBuffer(pk)).setConsistencyLevel(writeConsistencyLevel)
    connector.execCqlNoAsync(stmt)
  }

}

object PartitionKeysV2Table {
  private[columnstore] def rowToPartKeyRecord(row: Row) = {
    PartKeyRecord(row.getBytes("partKey").array(),
      row.getLong("startTime"), row.getLong("endTime"), row.getInt("shard"))
  }
}
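The new partitionkeysv2 table is partitioned by (shard, bucket), so a writer has to pick a bucket for every part key and scanPartKeys fans the read out over all numBuckets buckets. Below is a small, self-contained sketch of one plausible bucket-assignment scheme, assuming a simple content hash and a caller-chosen bucket count; it is an illustration only, not FiloDB's actual bucketing logic.

import java.nio.charset.StandardCharsets

object BucketingSketch {
  // Stable, non-negative bucket in [0, numBuckets) derived from the part-key bytes.
  def bucketFor(partKey: Array[Byte], numBuckets: Int): Int =
    Math.floorMod(java.util.Arrays.hashCode(partKey), numBuckets)

  def main(args: Array[String]): Unit = {
    val numBuckets = 256   // assumed bucket count, not a FiloDB constant
    val pk = "tenant1/http_requests_total".getBytes(StandardCharsets.UTF_8)
    val bucket = bucketFor(pk, numBuckets)
    // A writer would pass this bucket to writePartKey(bucket, pkRecord, ttl); a reader
    // must use the same derivation for readPartKey(shard, bucket, pk) to find the row.
    println(s"bucket=$bucket")
  }
}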
@@ -2,11 +2,14 @@ package filodb.cassandra.columnstore

import java.lang.ref.Reference
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

import com.datastax.driver.core.Row
import com.typesafe.config.ConfigFactory
import monix.reactive.Observable

import filodb.cassandra.DefaultFiloSessionProvider
import filodb.cassandra.metastore.CassandraMetaStore
import filodb.core._
@@ -16,8 +19,8 @@ import filodb.core.store.{ChunkSet, ChunkSetInfo, ColumnStoreSpec, PartKeyRecord
import filodb.memory.{BinaryRegionLarge, NativeMemoryManager}
import filodb.memory.format.{TupleRowReader, UnsafeUtils}
import filodb.memory.format.ZeroCopyUTF8String._

import java.nio.charset.StandardCharsets
import java.util.UUID

class CassandraColumnStoreSpec extends ColumnStoreSpec {
import NamesTestData._
@@ -66,20 +69,22 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
colStore.initialize(dataset, 1).futureValue
colStore.truncate(dataset, 1).futureValue

val pks = (10000 to 30000).map(_.toString.getBytes(StandardCharsets.UTF_8))
.zipWithIndex.map { case (pk, i) => PartKeyRecord(pk, 5, 10, Some(i))}.toSet
// GOTCHA: these synthetic pks are not well-formed. We just use the bytes for Schemas to read some hash
val pks = (1 to 10000)
.map(_ => UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8))
.zipWithIndex.map { case (pk, i) => PartKeyRecord(pk, 5, 10, shard = 0)}.toSet

val updateHour = 10
colStore.writePartKeys(dataset, 0, Observable.fromIterable(pks), 1.hour.toSeconds.toInt, 10, true )
.futureValue shouldEqual Success

val expectedKeys = pks.map(pk => new String(pk.partKey, StandardCharsets.UTF_8).toInt)
val expectedKeys = pks.map(pk => new String(pk.partKey, StandardCharsets.UTF_8))

val readData = colStore.getPartKeysByUpdateHour(dataset, 0, updateHour).toListL.runToFuture.futureValue.toSet
readData.map(pk => new String(pk.partKey, StandardCharsets.UTF_8).toInt) shouldEqual expectedKeys
readData.map(pk => new String(pk.partKey, StandardCharsets.UTF_8)) shouldEqual expectedKeys

val readData2 = colStore.scanPartKeys(dataset, 0).toListL.runToFuture.futureValue.toSet
readData2.map(pk => new String(pk.partKey, StandardCharsets.UTF_8).toInt) shouldEqual expectedKeys
readData2.map(pk => new String(pk.partKey, StandardCharsets.UTF_8)) shouldEqual expectedKeys

readData2.map(_.partKey).foreach(pk => colStore.deletePartKeyNoAsync(dataset, 0, pk))

@@ -88,8 +88,8 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val chunks = part.makeFlushChunks(offheapMem.blockMemFactory)

colStore.write(dataset.ref, Observable.fromIteratorUnsafe(chunks)).futureValue
val pk = PartKeyRecord(gaugePartKeyBytes, firstSampleTime, firstSampleTime + numSamples, Some(150))
colStore.writePartKeys(dataset.ref, 0, Observable.now(pk), 259200, 34).futureValue
val pk = PartKeyRecord(gaugePartKeyBytes, firstSampleTime, firstSampleTime + numSamples, shard = 0)
colStore.writePartKeys(dataset.ref, shard = 0, Observable.now(pk), 259200, 34).futureValue
}

it ("should be able to do full ODP for non concurrent queries") {
12 changes: 7 additions & 5 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -197,10 +197,10 @@ object CliMain extends StrictLogging {
printf("%40s %20s %20s %15s %15s\n", "Child", "TotalTimeSeries", "ActiveTimeSeries", "Children", "Children")
printf("%40s %20s %20s %15s %15s\n", "Name", "Count", "Count", "Count", "Quota")
println("==============================================================================================================================")
crs._2.sortBy(c => if (addInactive) c.tsCount else c.activeTsCount)(Ordering.Int.reverse)
crs._2.sortBy(c => if (addInactive) c.value.tsCount else c.value.activeTsCount)(Ordering.Int.reverse)
.foreach { cr =>
printf("%40s %20d %20d %15d %15d\n", cr.prefix, cr.tsCount,
cr.activeTsCount, cr.childrenCount, cr.childrenQuota)
printf("%40s %20d %20d %15d %15d\n", cr.prefix, cr.value.tsCount,
cr.value.activeTsCount, cr.value.childrenCount, cr.value.childrenQuota)
}
}

@@ -487,9 +487,10 @@ object CliMain extends StrictLogging {
case Some(intervalSecs) =>
val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n =>
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, _, result, stats, _, _) =>
case QueryResult(_, _, result, stats, warnings, _, _) =>
result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
println(s"QueryStats: $stats")
println(s"QueryWarnings: $warnings")
case err: QueryError => throw new ClientException(err)
}
}.recover {
@@ -500,11 +501,12 @@ object CliMain extends StrictLogging {
case None =>
try {
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, schema, result, stats, _, _) =>
case QueryResult(_, schema, result, stats, warnings, _, _) =>
println(s"Output schema: $schema")
println(s"Number of Range Vectors: ${result.size}")
result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
println(s"QueryStats: $stats")
println(s"QueryWarnings: $warnings")
case QueryError(_, stats, ex) =>
println(s"QueryError: ${ex.getClass.getSimpleName} ${ex.getMessage}")
println(s"QueryStats: $stats")
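The CliMain hunks above add a warnings field to the QueryResult pattern match, so the extractor now takes seven arguments and the CLI prints the warnings alongside the stats. The sketch below is self-contained and uses a stand-in case class with assumed field names purely to illustrate the seven-field destructuring; it is not FiloDB's real QueryResult.

final case class QueryResultSketch(id: String,
                                   schema: String,
                                   result: Seq[String],
                                   stats: String,
                                   warnings: Seq[String],
                                   mayBePartial: Boolean,
                                   partialReason: Option[String])

object QueryResultSketchExample extends App {
  val qr = QueryResultSketch("q1", "ts-schema", Seq("rv-1", "rv-2"), "cpuNanos=123",
                             Seq("result size limit reached"), false, None)
  qr match {
    case QueryResultSketch(_, _, result, stats, warnings, _, _) =>
      result.foreach(println)          // analogous to rv.prettyPrint() in the CLI
      println(s"QueryStats: $stats")
      println(s"QueryWarnings: $warnings")
  }
}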
