Merge pull request #48 from bzz/apollo-schema
Refactoring: reconciling schema with Apollo Meta DB
bzz authored Mar 9, 2018
2 parents 5ec8b38 + 8f734c5 commit 17b4c93
Showing 12 changed files with 86 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@
~*
target/
.idea/
.vscode

metastore_db/
scylla
2 changes: 2 additions & 0 deletions .travis.yml
@@ -28,6 +28,8 @@ matrix:
- ./hash src/test/resources/siva || travis_terminate 1
- ./query ./LICENSE
- ./report
- go get ./src/main/go/... || true
- go run ./src/main/go/query.go ./LICENSE
before_deploy:
- VERSION=$TRAVIS_TAG ./scripts/release.sh
deploy:
8 changes: 7 additions & 1 deletion src/main/go/README.md
@@ -8,5 +8,11 @@ Trivial example of client application in Golang to query for same files.
Add the path to the file you want to search duplicates for and run

```
go run query.go <path-to-file>
go run src/main/go/query.go <path-to-file>
```

or, to test a connection to the DB, use

```
go run -tags="gocql_debug" src/main/go/connect.go
```
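
Note: connect.go itself is not part of this diff. A minimal sketch of what such a connection check presumably looks like, mirroring the connect() helper in query.go below (the node address and keyspace are that helper's defaults, not values confirmed by this commit):

```
// Sketch of a standalone connection check in the spirit of connect.go
// (not shown in this diff); defaults mirror connect() in query.go.
package main

import (
	"fmt"
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1") // default node used by query.go
	cluster.Keyspace = "hashes"              // defaultKeyspace in query.go
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatalf("can not create connection to 127.0.0.1: %v", err)
	}
	defer session.Close()
	fmt.Println("connection OK")
}
```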
29 changes: 20 additions & 9 deletions src/main/go/query.go
@@ -14,12 +14,19 @@ import (
"github.com/scylladb/gocqlx/qb"
)

// BlobHash is a single blob inside a repository
type BlobHash struct {
BlobHash string
Repo string
FilePath string
Sha1 string
Commit string
Repo string
Path string
}

const (
defaultKeyspace = "hashes"
defaultTable = "blob_hash_files"
)

func main() {
flag.Parse()
args := flag.Args()
@@ -35,30 +42,34 @@ func main() {
session := connect()
defer session.Close()

stmt, names := qb.Select("hashes.blob_hash_files").
Where(qb.In("blob_hash")).
stmt, names := qb.Select(fmt.Sprintf("%s.%s", defaultKeyspace, defaultTable)).
Where(qb.In("sha1")).
ToCql()

q := gocqlx.Query(session.Query(stmt), names).BindMap(qb.M{
"blob_hash": []string{hash},
"sha1": []string{hash},
})
defer q.Release()

var similarHashes []BlobHash
if err := gocqlx.Select(&similarHashes, q.Query); err != nil {
log.Fatal("select:", err)
log.Fatalf("select: %v in %s", err, q.Query)
}

for _, hash := range similarHashes {
fmt.Printf("\t%+v\n", hash)
}

if len(similarHashes) == 0 {
os.Exit(2)
}
}

// connect to the cluster
func connect() *gocql.Session {
node := "127.0.0.1"
cluster := gocql.NewCluster(node)
cluster.Keyspace = "hashes"
cluster.Keyspace = defaultKeyspace
session, err := cluster.CreateSession()
if err != nil {
log.Fatalf("Can not create connection to %s, %v", node, err)
@@ -75,7 +86,7 @@ func sha1hash(file string) string {

f, err := os.Open(file)
if err != nil {
log.Fatal("Can not open a file %s", file, err)
log.Fatalf("Can not open a file %s, err:+%v", file, err)
}
defer f.Close()

2 changes: 1 addition & 1 deletion src/main/resources/schema.cql
@@ -1,3 +1,3 @@
CREATE KEYSPACE IF NOT EXISTS __KEYSPACE__ WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE __KEYSPACE__;
CREATE TABLE IF NOT EXISTS __KEYSPACE__.blob_hash_files (blob_hash ascii, repo text, ref_hash ascii, file_path text, PRIMARY KEY (blob_hash, repo, ref_hash, file_path));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.blob_hash_files (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path));
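
The column renames here (blob_hash → sha1, ref_hash → commit, file_path → path) are what the rest of this diff propagates. A minimal sketch of querying the renamed columns directly with gocql, assuming a session opened as in the sketch above; findDuplicates is a hypothetical helper, not part of this commit:

```
// findDuplicates prints every file sharing the given blob sha1,
// using the renamed columns. A sketch only; assumes keyspace "hashes".
func findDuplicates(session *gocql.Session, sha string) {
	iter := session.Query(
		`SELECT sha1, repo, commit, path FROM hashes.blob_hash_files WHERE sha1 = ?`,
		sha,
	).Iter()

	var sha1, repo, commit, path string
	for iter.Scan(&sha1, &repo, &commit, &path) {
		fmt.Printf("%s %s %s %s\n", sha1, repo, commit, path)
	}
	if err := iter.Close(); err != nil {
		log.Fatalf("select: %v", err)
	}
}
```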
67 changes: 39 additions & 28 deletions src/main/scala/tech/sourced/gemini/Gemini.scala
@@ -16,6 +16,7 @@ import scala.io.Source

class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.defautKeyspace) {

import Gemini._
import session.implicits._

def hash(reposPath: String, limit: Int = 0, format: String = "siva"): DataFrame = {
@@ -43,16 +44,17 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
.getTreeEntries
.getBlobs
.select("blob_id", "repository_id", "commit_hash", "path")
.withColumnRenamed("blob_id", "blob_hash")
.withColumnRenamed("repository_id", "repo")
.withColumnRenamed("commit_hash", "ref_hash")
.withColumnRenamed("path", "file_path")
.withColumnRenamed("blob_id", meta.sha)
.withColumnRenamed("repository_id", meta.repo)
.withColumnRenamed("commit_hash", meta.commit)
.withColumnRenamed("path", meta.path)

def save(files: DataFrame): Unit = {
log.info(s"Writing ${files.rdd.countApprox(10000L)} files to DB")
val approxFileCount = files.rdd.countApprox(10000L)
log.info(s"Writing $approxFileCount files to DB")
files.write
.mode("append")
.cassandraFormat("blob_hash_files", keyspace)
.cassandraFormat(defaultTable, keyspace)
.save()
}

@@ -71,11 +73,11 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
def query(inPath: String, conn: Session): ReportByLine = {
val path = new File(inPath)
if (path.isDirectory) {
ReportByLine(Gemini.findDuplicateProjects(path, conn, keyspace))
ReportByLine(findDuplicateProjects(path, conn, keyspace))
//TODO: implement based on Apollo
//findSimilarProjects(path)
} else {
ReportByLine(Gemini.findDuplicateItemForFile(path, conn, keyspace))
ReportByLine(findDuplicateItemForFile(path, conn, keyspace))
//TODO: implement based on Apollo
//findSimilarFiles(path)
}
@@ -89,7 +91,7 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
* @return
*/
def report(conn: Session): ReportExpandedGroup = {
ReportExpandedGroup(Gemini.findAllDuplicateItems(conn, keyspace))
ReportExpandedGroup(findAllDuplicateItems(conn, keyspace))
}

/**
Expand All @@ -101,7 +103,7 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
* @return
*/
def reportCassandraCondensed(conn: Session): ReportGrouped = {
ReportGrouped(Gemini.findAllDuplicateBlobHashes(conn, keyspace))
ReportGrouped(findAllDuplicateBlobHashes(conn, keyspace))
}

/**
Expand All @@ -115,15 +117,15 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
def reportCassandraGroupBy(conn: Session): ReportExpandedGroup = {
val duplicates = reportCassandraCondensed(conn).v
.map { item =>
Gemini.findDuplicateItemForBlobHash(item.sha, conn, keyspace)
findDuplicateItemForBlobHash(item.sha, conn, keyspace)
}
ReportExpandedGroup(duplicates)
}

def applySchema(session: Session): Unit = {
log.debug("CQL: creating schema")
Source
.fromFile(Gemini.defaultSchemaFile)
.fromFile(defaultSchemaFile)
.getLines
.map(_.trim)
.filter(!_.isEmpty)
@@ -142,27 +144,29 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
}

object URLFormatter {
val services = Map(
private val services = Map(
"github.com" -> "https://%s/blob/%s/%s",
"bitbucket.org" -> "https://%s/src/%s/%s",
"gitlab.com" -> "https://%s/blob/%s/%s"
)
val default = ("", "repo: %s ref_hash: %s file: %s")
private val default = ("", "repo: %s commit: %s path: %s")

def format(repo: String, ref_hash: String, file: String): String = {
def format(repo: String, commit: String, path: String): String = {
val urlTemplateByRepo = services.find { case (h, _) => repo.startsWith(h) }.getOrElse(default)._2
val repoWithoutSuffix = repo.replaceFirst("\\.git$", "")

urlTemplateByRepo.format(repoWithoutSuffix, ref_hash, file)
urlTemplateByRepo.format(repoWithoutSuffix, commit, path)
}
}

case class RepoFile(repo: String, ref_hash: String, file: String, sha: String) {
override def toString(): String = URLFormatter.format(repo, ref_hash, file)
case class Meta(sha: String, repo: String, commit: String, path: String)

case class RepoFile(repo: String, commit: String, path: String, sha: String) {
override def toString: String = URLFormatter.format(repo, commit, path)
}

case class DuplicateBlobHash(sha: String, count: Long) {
override def toString(): String = s"$sha ($count duplicates)"
override def toString: String = s"$sha ($count duplicates)"
}

object Gemini {
@@ -171,6 +175,10 @@
val defaultCassandraPort: Int = 9042
val defaultSchemaFile: String = "src/main/resources/schema.cql"
val defautKeyspace: String = "hashes"
val defaultTable: String = "blob_hash_files"

//TODO(bzz): switch to `tables("meta")`
val meta = Meta("sha1", "repo", "commit", "path")

val formatter = new ObjectInserter.Formatter

@@ -193,13 +201,15 @@ object Gemini {
* @return
*/
def findAllDuplicateBlobHashes(conn: Session, keyspace: String): Iterable[DuplicateBlobHash] = {
val duplicatesCountCql = s"SELECT blob_hash, COUNT(*) as count FROM ${keyspace}.blob_hash_files GROUP BY blob_hash"
val hash = meta.sha
val dupCount = "count"
val duplicatesCountCql = s"SELECT $hash, COUNT(*) as $dupCount FROM $keyspace.$defaultTable GROUP BY $hash"
conn
.execute(new SimpleStatement(duplicatesCountCql))
.asScala
.filter(_.getLong("count") > 1)
.filter(_.getLong(dupCount) > 1)
.map { r =>
DuplicateBlobHash(r.getString("blob_hash"), r.getLong("count"))
DuplicateBlobHash(r.getString(meta.sha), r.getLong(dupCount))
}
}

@@ -211,12 +221,13 @@ object Gemini {
* @return
*/
def findAllDuplicateItems(conn: Session, keyspace: String): Iterable[Iterable[RepoFile]] = {
val distinctBlobHash = s"SELECT distinct blob_hash FROM ${keyspace}.blob_hash_files"
val hash = meta.sha
val distinctBlobHash = s"SELECT distinct $hash FROM $keyspace.$defaultTable"
conn
.execute(new SimpleStatement(distinctBlobHash))
.asScala
.flatMap { r =>
val dupes = findDuplicateItemForBlobHash(r.getString("blob_hash"), conn, keyspace)
val dupes = findDuplicateItemForBlobHash(r.getString(hash), conn, keyspace)
if (dupes.size > 1) {
List(dupes)
} else {
@@ -231,15 +242,15 @@ object Gemini {
}

def findDuplicateItemForFile(file: File, conn: Session, keyspace: String): Iterable[RepoFile] = {
findDuplicateItemForBlobHash(Gemini.computeSha1(file), conn, keyspace)
findDuplicateItemForBlobHash(computeSha1(file), conn, keyspace)
}

def findDuplicateItemForBlobHash(sha: String, conn: Session, keyspace: String): Iterable[RepoFile] = {
val query = QueryBuilder.select().all().from(keyspace, "blob_hash_files")
.where(QueryBuilder.eq("blob_hash", sha))
val query = QueryBuilder.select().all().from(keyspace, defaultTable)
.where(QueryBuilder.eq(meta.sha, sha))

conn.execute(query).asScala.map { row =>
RepoFile(row.getString("repo"), row.getString("ref_hash"), row.getString("file_path"), row.getString("blob_hash"))
RepoFile(row.getString(meta.repo), row.getString(meta.commit), row.getString(meta.path), row.getString(meta.sha))
}
}

2 changes: 1 addition & 1 deletion src/main/scala/tech/sourced/gemini/HashSparkApp.scala
@@ -66,7 +66,7 @@ object HashSparkApp extends App with Logging {
.getOrCreate()

if (config.verbose) {
LogManager.getRootLogger().setLevel(Level.INFO)
LogManager.getRootLogger.setLevel(Level.INFO)
}

val repos = listRepositories(reposPath, config.format, spark.sparkContext.hadoopConfiguration, config.limit)
2 changes: 1 addition & 1 deletion src/main/scala/tech/sourced/gemini/Logger.scala
@@ -7,7 +7,7 @@ import org.slf4j.{Logger => Slf4jLogger}
object Logger {
def apply(name: String, verbose: Boolean = false): Slf4jLogger = {
if (verbose) {
LogManager.getRootLogger().setLevel(Level.INFO)
LogManager.getRootLogger.setLevel(Level.INFO)
}
LoggerFactory.getLogger(name)
}
4 changes: 2 additions & 2 deletions src/main/scala/tech/sourced/gemini/QueryApp.scala
@@ -48,8 +48,8 @@ object QueryApp extends App {

val similar = gemini.query(file, cassandra).v

cassandra.close
cluster.close
cassandra.close()
cluster.close()

if (similar.isEmpty) {
println(s"No duplicates of $file found.")
7 changes: 3 additions & 4 deletions src/main/scala/tech/sourced/gemini/ReportSparkApp.scala
@@ -53,8 +53,8 @@ object ReportSparkApp extends App {
case `groupByMode` => gemini.reportCassandraGroupBy(cassandra)
}

cassandra.close
cluster.close
cassandra.close()
cluster.close()

print(report)
case None =>
@@ -65,12 +65,11 @@
report match {
case e if e.empty() => println(s"No duplicates found.")
case ReportGrouped(v) => println(s"Duplicates found:\n\t" + (v mkString "\n\t"))
case ReportExpandedGroup(v) => {
case ReportExpandedGroup(v) =>
v.foreach { item =>
val count = item.size
println(s"$count duplicates:\n\t" + (item mkString "\n\t") + "\n")
}
}
}
}

8 changes: 4 additions & 4 deletions src/test/scala/tech/sourced/gemini/CassandraSparkSpec.scala
@@ -82,7 +82,7 @@ class CassandraSparkSpec extends FlatSpec
sha1.v should not be empty
sha1.v.head.sha should be("097f4a292c384e002c5b5ce8e15d746849af7b37") // git hash-object -w LICENSE
sha1.v.head.repo should be("null/Users/alex/src-d/gemini")
sha1.v.head.ref_hash should be("4aa29ac236c55ebbfbef149fef7054d25832717f")
sha1.v.head.commit should be("4aa29ac236c55ebbfbef149fef7054d25832717f")
}

"Query for duplicates in single repository" should "return 2 files" in {
@@ -113,7 +113,7 @@
val detailedReport = gemini.reportCassandraGroupBy(session).v
println("Done")

val duplicatedFileNames = detailedReport map (_.head.file)
val duplicatedFileNames = detailedReport map (_.head.path)
duplicatedFileNames.toSeq should contain theSameElementsAs expectedDuplicateFiles
}

@@ -124,7 +124,7 @@
val detailedReport = gemini.report(session).v
println("Done")

val duplicatedFileNames = detailedReport map (_.head.file)
val duplicatedFileNames = detailedReport map (_.head.path)
duplicatedFileNames.toSeq should contain theSameElementsAs expectedDuplicateFiles
}

@@ -140,7 +140,7 @@

"Hash with limit" should "collect files only from limit repos" in {
val gemini = Gemini(sparkSession)
val repos = gemini.hash("src/test/resources/siva", 1).select("repo").distinct().count()
val repos = gemini.hash("src/test/resources/siva", 1).select(Gemini.meta.repo).distinct().count()
repos should be(1)
}
