Skip to content

Commit

Permalink
Merge pull request #185 from smacker/features_table
Browse files Browse the repository at this point in the history
move feature frequencies to separate table
Branch information: authored by smacker, Jan 30, 2019
2 parents 33a1daf + 3ae28ba commit df2042e
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ USE __KEYSPACE__;
CREATE TABLE IF NOT EXISTS __KEYSPACE__.meta (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_file (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_func (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.docfreq (id text, docs int, df map<text, int>, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_docs (id text, docs int, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_freq (id text, feature text, weight int, PRIMARY KEY (id, feature));
21 changes: 15 additions & 6 deletions src/main/scala/tech/sourced/gemini/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,31 @@ import scala.collection.JavaConverters._

case class MetaCols(sha: String, repo: String, commit: String, path: String)
case class HashtablesCols(sha: String, hashtable: String, value: String)
case class DocFreqCols(id: String, docs: String, df: String)
case class FeaturesDocsCols(id: String, docs: String)
case class FeaturesFreqCols(id: String, feature: String, weight: String)

/**
* Tables is static typed definition of DB schema
*
* @param meta name of meta table
* @param hashtables name of hashtables table
* @param metaCols
* @param hashtablesCols
* @param hashtables prefix of hashtables table
* @param featuresDocs name of features documents table
* @param featuresFreq name of features frequencies table
* @param metaCols columns of meta table
* @param hashtablesCols columns of hashtables table
* @param featuresDocsCols columns of features documents table
* @param featuresFreqCols columns of features frequencies table
*/
case class Tables(meta: String,
hashtables: String,
docFreq: String,
featuresDocs: String,
featuresFreq: String,
metaCols: MetaCols,
hashtablesCols: HashtablesCols,
docFreqCols: DocFreqCols)
featuresDocsCols: FeaturesDocsCols,
featuresFreqCols: FeaturesFreqCols) {
def hashtables(mode: String): String = s"${hashtables}_$mode"
}

/**
* Database object contains common queries to DB
Expand Down
25 changes: 16 additions & 9 deletions src/main/scala/tech/sourced/gemini/FileQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,14 @@ class FileQuery(
case Gemini.funcSimilarityMode => FeaturesHash.funcParams
}

val hashtablesTable = s"${tables.hashtables}_${mode}"
val cols = tables.hashtablesCols
val wmh = hashFile(featuresList, docFreq, sampleSize)

val bands = FeaturesHash.wmhToBands(wmh, htnum, bandSize)

log.info("Looking for similar items")
val similar = bands.zipWithIndex.foldLeft(Set[String]()) { case (sim, (band, i)) =>
val cql = s"""SELECT ${cols.sha} FROM $keyspace.${hashtablesTable}
val cql = s"""SELECT ${cols.sha} FROM $keyspace.${tables.hashtables(mode)}
WHERE ${cols.hashtable}=$i AND ${cols.value}=0x${MathUtil.bytes2hex(band)}"""
log.debug(cql)

Expand Down Expand Up @@ -186,18 +185,26 @@ class FileQuery(

protected def readDocFreqFromDB(): Option[OrderedDocFreq] = {
log.info(s"Reading docFreq from DB")
val cols = tables.docFreqCols
val row = conn.execute(s"SELECT * FROM ${tables.docFreq} WHERE ${cols.id} = '${mode}'").one()
if (row == null) {
val docsCols = tables.featuresDocsCols
val freqCols = tables.featuresFreqCols
val docsRow = conn.execute(s"SELECT * FROM ${tables.featuresDocs} WHERE ${docsCols.id} = '$mode'").one()
if (docsRow == null) {
log.warn("Document frequency table is empty.")
None
} else {
val df = row
.getMap("df", classOf[java.lang.String], classOf[java.lang.Integer])
var tokens = IndexedSeq[String]()
val df = conn
.execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode' ORDER BY ${freqCols.feature}")
.asScala
.mapValues(_.toInt)
.map { row =>
// tokens have to be sorted, df.keys isn't sorted
val name = row.getString(freqCols.feature)
tokens = tokens :+ name

Some(OrderedDocFreq(row.getInt(cols.docs), df.keys.toIndexedSeq, df))
(name, row.getInt(freqCols.weight))
}.toMap

Some(OrderedDocFreq(docsRow.getInt(docsCols.docs), tokens, df))
}
}

Expand Down
24 changes: 16 additions & 8 deletions src/main/scala/tech/sourced/gemini/Gemini.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,28 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
}

def isDBEmpty(session: Session, mode: String): Boolean = {
var row = session.execute(s"select count(*) from $keyspace.${tables.docFreq} where id='$mode' limit 1").one()
if (row.getLong(0) > 0) {
var row = session.execute(s"select * from $keyspace.${tables.featuresDocs} where id='$mode' limit 1").one()
if (row != null) {
return false
}

row = session.execute(s"select count(*) from $keyspace.${tables.hashtables}_$mode").one()
if (row.getLong(0) > 0) {
row = session.execute(s"select * from $keyspace.${tables.featuresFreq} where id='$mode' limit 1").one()
if (row != null) {
return false
}

row = session.execute(s"select * from $keyspace.${tables.hashtables}_$mode limit 1").one()
if (row != null) {
return false
}

true
}

def cleanDB(session: Session, mode: String): Unit = {
session.execute(s"delete from $keyspace.${tables.docFreq} where id='$mode'")
session.execute(s"truncate table $keyspace.${tables.hashtables}_$mode")
session.execute(s"delete from $keyspace.${tables.featuresDocs} where id='$mode'")
session.execute(s"delete from $keyspace.${tables.featuresFreq} where id='$mode'")
session.execute(s"truncate table $keyspace.${tables.hashtables(mode)}")
}

def applySchema(session: Session): Unit = {
Expand Down Expand Up @@ -198,10 +204,12 @@ object Gemini {
val tables = Tables(
"meta",
"hashtables",
"docfreq",
"features_docs",
"features_freq",
MetaCols("sha1", "repo", "commit", "path"),
HashtablesCols("sha1", "hashtable", "value"),
DocFreqCols("id", "docs", "df")
FeaturesDocsCols("id", "docs"),
FeaturesFreqCols("id", "feature", "weight")
)

val formatter = new ObjectInserter.Formatter
Expand Down
20 changes: 13 additions & 7 deletions src/main/scala/tech/sourced/gemini/Hash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,21 @@ class Hash(session: SparkSession,

protected def saveDocFreqToDB(docFreq: OrderedDocFreq, keyspace: String, tables: Tables): Unit = {
log.warn(s"save document frequencies to DB")
CassandraConnector(session.sparkContext).withSessionDo { cassandra =>
val cols = tables.docFreqCols
val javaMap = docFreq.df.asJava

CassandraConnector(session.sparkContext).withSessionDo { cassandra =>
val docsCols = tables.featuresDocsCols
cassandra.execute(
s"INSERT INTO $keyspace.${tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)",
mode, int2Integer(docFreq.docs), javaMap
s"INSERT INTO $keyspace.${tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)",
mode, int2Integer(docFreq.docs)
)

val freqCols = tables.featuresFreqCols
val prepared = cassandra.prepare(s"INSERT INTO $keyspace.${tables.featuresFreq}" +
s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)")

docFreq.df.foreach { case(feature, weight) =>
cassandra.execute(prepared.bind(mode, feature, int2Integer(weight)))
}
}
}

Expand Down Expand Up @@ -267,7 +274,6 @@ class Hash(session: SparkSession,
case Gemini.funcSimilarityMode => FeaturesHash.funcParams
}

val hashtablesTable = s"${tables.hashtables}_${mode}"
val cols = tables.hashtablesCols
rdd
.flatMap { case RDDHash(doc, wmh) =>
Expand All @@ -276,7 +282,7 @@ class Hash(session: SparkSession,
.toDF(cols.sha, cols.hashtable, cols.value)
.write
.mode("append")
.cassandraFormat(hashtablesTable, keyspace)
.cassandraFormat(tables.hashtables(mode), keyspace)
.save()
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/tech/sourced/gemini/Report.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables)
*/
def findConnectedComponents(mode: String): (Map[Int, Set[Int]], Map[Int, List[Int]], Map[String, Int]) = {
log.info(s"Finding ${mode} connected components")
val hashtablesTable = s"${tables.hashtables}_${mode}"
val cc = new DBConnectedComponents(log, conn, hashtablesTable, keyspace)
val cc = new DBConnectedComponents(log, conn, tables.hashtables(mode), keyspace)
val (buckets, elementIds) = cc.makeBuckets()
val elsToBuckets = cc.elementsToBuckets(buckets)

Expand Down
20 changes: 13 additions & 7 deletions src/test/scala/tech/sourced/gemini/BaseDBSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,30 @@ trait BaseDBSpec extends BeforeAndAfterAll {
}

def insertHashtables(items: Iterable[HashtableItem], mode: String): Unit = {
val hashtablesTable = s"${Gemini.tables.hashtables}_${mode}"
val cols = Gemini.tables.hashtablesCols
items.foreach { case HashtableItem(ht, v, sha1) =>
val cql = s"""INSERT INTO $keyspace.${hashtablesTable}
val cql = s"""INSERT INTO $keyspace.${Gemini.tables.hashtables(mode)}
(${cols.hashtable}, ${cols.value}, ${cols.sha})
VALUES ($ht, $v, '$sha1')"""
cassandra.execute(cql)
}
}

def insertDocFreq(docFreq: OrderedDocFreq, mode: String): Unit = {
val cols = Gemini.tables.docFreqCols
val javaMap = docFreq.df.asJava

val docsCols = Gemini.tables.featuresDocsCols
cassandra.execute(
s"INSERT INTO $keyspace.${Gemini.tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)",
mode, int2Integer(docFreq.docs), javaMap
s"INSERT INTO $keyspace.${Gemini.tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)",
mode, int2Integer(docFreq.docs)
)

val freqCols = Gemini.tables.featuresFreqCols
docFreq.df.foreach { case(feature, weight) =>
cassandra.execute(
s"INSERT INTO $keyspace.${Gemini.tables.featuresFreq}" +
s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)",
mode, feature, int2Integer(weight)
)
}
}

override def afterAll(): Unit = {
Expand Down

0 comments on commit df2042e

Please sign in to comment.