Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: reconciling schema with Apollo Meta DB #48

Merged
merged 8 commits into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
~*
target/
.idea/
.vscode

metastore_db/
scylla
Expand Down
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ matrix:
- ./hash src/test/resources/siva || travis_terminate 1
- ./query ./LICENSE
- ./report
- go get ./src/main/go/... || true
- go run ./src/main/go/query.go ./LICENSE
before_deploy:
- VERSION=$TRAVIS_TAG ./scripts/release.sh
deploy:
Expand Down
8 changes: 7 additions & 1 deletion src/main/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,11 @@ Trivial example of client application in Golang to query for same files.
Add the path to the file you want to search duplicates for and

```
go run query.go <path-to-file>
go run src/main/go/query.go <path-to-file>
```

or, to test a connection to the DB, use

```
go run -tags="gocql_debug" src/main/go/connect.go
```
29 changes: 20 additions & 9 deletions src/main/go/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ import (
"github.com/scylladb/gocqlx/qb"
)

// BlobHash is single blob inside a repository
type BlobHash struct {
BlobHash string
Repo string
FilePath string
Sha1 string
Commit string
Repo string
Path string
}

const (
defaultKeyspace = "hashes"
defaultTable = "blob_hash_files"
)

func main() {
flag.Parse()
args := flag.Args()
Expand All @@ -35,30 +42,34 @@ func main() {
session := connect()
defer session.Close()

stmt, names := qb.Select("hashes.blob_hash_files").
Where(qb.In("blob_hash")).
stmt, names := qb.Select(fmt.Sprintf("%s.%s", defaultKeyspace, defaultTable)).
Where(qb.In("sha1")).
ToCql()

q := gocqlx.Query(session.Query(stmt), names).BindMap(qb.M{
"blob_hash": []string{hash},
"sha1": []string{hash},
})
defer q.Release()

var similarHashes []BlobHash
if err := gocqlx.Select(&similarHashes, q.Query); err != nil {
log.Fatal("select:", err)
log.Fatalf("select: %v in %s", err, q.Query)
}

for _, hash := range similarHashes {
fmt.Printf("\t%+v\n", hash)
}

if len(similarHashes) == 0 {
os.Exit(2)
}
}

// connect to the cluster
func connect() *gocql.Session {
node := "127.0.0.1"
cluster := gocql.NewCluster(node)
cluster.Keyspace = "hashes"
cluster.Keyspace = defaultKeyspace
session, err := cluster.CreateSession()
if err != nil {
log.Fatalf("Can not create connection to %s, %v", node, err)
Expand All @@ -75,7 +86,7 @@ func sha1hash(file string) string {

f, err := os.Open(file)
if err != nil {
log.Fatal("Can not open a file %s", file, err)
log.Fatalf("Can not open a file %s, err:+%v", file, err)
}
defer f.Close()

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/schema.cql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CREATE KEYSPACE IF NOT EXISTS __KEYSPACE__ WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE __KEYSPACE__;
CREATE TABLE IF NOT EXISTS __KEYSPACE__.blob_hash_files (blob_hash ascii, repo text, ref_hash ascii, file_path text, PRIMARY KEY (blob_hash, repo, ref_hash, file_path));
CREATE TABLE IF NOT EXISTS __KEYSPACE__.blob_hash_files (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path));
67 changes: 39 additions & 28 deletions src/main/scala/tech/sourced/gemini/Gemini.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import scala.io.Source

class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.defautKeyspace) {

import Gemini._
import session.implicits._

def hash(reposPath: String, limit: Int = 0, format: String = "siva"): DataFrame = {
Expand Down Expand Up @@ -43,16 +44,17 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
.getTreeEntries
.getBlobs
.select("blob_id", "repository_id", "commit_hash", "path")
.withColumnRenamed("blob_id", "blob_hash")
.withColumnRenamed("repository_id", "repo")
.withColumnRenamed("commit_hash", "ref_hash")
.withColumnRenamed("path", "file_path")
.withColumnRenamed("blob_id", meta.sha)
.withColumnRenamed("repository_id", meta.repo)
.withColumnRenamed("commit_hash", meta.commit)
.withColumnRenamed("path", meta.path)

def save(files: DataFrame): Unit = {
log.info(s"Writing ${files.rdd.countApprox(10000L)} files to DB")
val approxFileCount = files.rdd.countApprox(10000L)
log.info(s"Writing $approxFileCount files to DB")
files.write
.mode("append")
.cassandraFormat("blob_hash_files", keyspace)
.cassandraFormat(defaultTable, keyspace)
.save()
}

Expand All @@ -71,11 +73,11 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
def query(inPath: String, conn: Session): ReportByLine = {
val path = new File(inPath)
if (path.isDirectory) {
ReportByLine(Gemini.findDuplicateProjects(path, conn, keyspace))
ReportByLine(findDuplicateProjects(path, conn, keyspace))
//TODO: implement based on Apolo
//findSimilarProjects(path)
} else {
ReportByLine(Gemini.findDuplicateItemForFile(path, conn, keyspace))
ReportByLine(findDuplicateItemForFile(path, conn, keyspace))
//TODO: implement based on Apolo
//findSimilarFiles(path)
}
Expand All @@ -89,7 +91,7 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
* @return
*/
def report(conn: Session): ReportExpandedGroup = {
ReportExpandedGroup(Gemini.findAllDuplicateItems(conn, keyspace))
ReportExpandedGroup(findAllDuplicateItems(conn, keyspace))
}

/**
Expand All @@ -101,7 +103,7 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
* @return
*/
def reportCassandraCondensed(conn: Session): ReportGrouped = {
ReportGrouped(Gemini.findAllDuplicateBlobHashes(conn, keyspace))
ReportGrouped(findAllDuplicateBlobHashes(conn, keyspace))
}

/**
Expand All @@ -115,15 +117,15 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
def reportCassandraGroupBy(conn: Session): ReportExpandedGroup = {
val duplicates = reportCassandraCondensed(conn).v
.map { item =>
Gemini.findDuplicateItemForBlobHash(item.sha, conn, keyspace)
findDuplicateItemForBlobHash(item.sha, conn, keyspace)
}
ReportExpandedGroup(duplicates)
}

def applySchema(session: Session): Unit = {
log.debug("CQL: creating schema")
Source
.fromFile(Gemini.defaultSchemaFile)
.fromFile(defaultSchemaFile)
.getLines
.map(_.trim)
.filter(!_.isEmpty)
Expand All @@ -142,27 +144,29 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
}

object URLFormatter {
val services = Map(
private val services = Map(
"github.com" -> "https://%s/blob/%s/%s",
"bitbucket.org" -> "https://%s/src/%s/%s",
"gitlab.com" -> "https://%s/blob/%s/%s"
)
val default = ("", "repo: %s ref_hash: %s file: %s")
private val default = ("", "repo: %s commit: %s path: %s")

def format(repo: String, ref_hash: String, file: String): String = {
def format(repo: String, commit: String, path: String): String = {
val urlTemplateByRepo = services.find { case (h, _) => repo.startsWith(h) }.getOrElse(default)._2
val repoWithoutSuffix = repo.replaceFirst("\\.git$", "")

urlTemplateByRepo.format(repoWithoutSuffix, ref_hash, file)
urlTemplateByRepo.format(repoWithoutSuffix, commit, path)
}
}

case class RepoFile(repo: String, ref_hash: String, file: String, sha: String) {
override def toString(): String = URLFormatter.format(repo, ref_hash, file)
case class Meta(sha: String, repo: String, commit: String, path: String)

case class RepoFile(repo: String, commit: String, path: String, sha: String) {
override def toString: String = URLFormatter.format(repo, commit, path)
}

case class DuplicateBlobHash(sha: String, count: Long) {
override def toString(): String = s"$sha ($count duplicates)"
override def toString: String = s"$sha ($count duplicates)"
}

object Gemini {
Expand All @@ -171,6 +175,10 @@ object Gemini {
val defaultCassandraPort: Int = 9042
val defaultSchemaFile: String = "src/main/resources/schema.cql"
val defautKeyspace: String = "hashes"
val defaultTable: String = "blob_hash_files"

//TODO(bzz): switch to `tables("meta")`
val meta = Meta("sha1", "repo", "commit", "path")

val formatter = new ObjectInserter.Formatter

Expand All @@ -193,13 +201,15 @@ object Gemini {
* @return
*/
def findAllDuplicateBlobHashes(conn: Session, keyspace: String): Iterable[DuplicateBlobHash] = {
val duplicatesCountCql = s"SELECT blob_hash, COUNT(*) as count FROM ${keyspace}.blob_hash_files GROUP BY blob_hash"
val hash = meta.sha
val dupCount = "count"
val duplicatesCountCql = s"SELECT $hash, COUNT(*) as $dupCount FROM $keyspace.$defaultTable GROUP BY $hash"
conn
.execute(new SimpleStatement(duplicatesCountCql))
.asScala
.filter(_.getLong("count") > 1)
.filter(_.getLong(dupCount) > 1)
.map { r =>
DuplicateBlobHash(r.getString("blob_hash"), r.getLong("count"))
DuplicateBlobHash(r.getString(meta.sha), r.getLong(dupCount))
}
}

Expand All @@ -211,12 +221,13 @@ object Gemini {
* @return
*/
def findAllDuplicateItems(conn: Session, keyspace: String): Iterable[Iterable[RepoFile]] = {
val distinctBlobHash = s"SELECT distinct blob_hash FROM ${keyspace}.blob_hash_files"
val hash = meta.sha
val distinctBlobHash = s"SELECT distinct $hash FROM $keyspace.$defaultTable"
conn
.execute(new SimpleStatement(distinctBlobHash))
.asScala
.flatMap { r =>
val dupes = findDuplicateItemForBlobHash(r.getString("blob_hash"), conn, keyspace)
val dupes = findDuplicateItemForBlobHash(r.getString(hash), conn, keyspace)
if (dupes.size > 1) {
List(dupes)
} else {
Expand All @@ -231,15 +242,15 @@ object Gemini {
}

def findDuplicateItemForFile(file: File, conn: Session, keyspace: String): Iterable[RepoFile] = {
findDuplicateItemForBlobHash(Gemini.computeSha1(file), conn, keyspace)
findDuplicateItemForBlobHash(computeSha1(file), conn, keyspace)
}

def findDuplicateItemForBlobHash(sha: String, conn: Session, keyspace: String): Iterable[RepoFile] = {
val query = QueryBuilder.select().all().from(keyspace, "blob_hash_files")
.where(QueryBuilder.eq("blob_hash", sha))
val query = QueryBuilder.select().all().from(keyspace, defaultTable)
.where(QueryBuilder.eq(meta.sha, sha))

conn.execute(query).asScala.map { row =>
RepoFile(row.getString("repo"), row.getString("ref_hash"), row.getString("file_path"), row.getString("blob_hash"))
RepoFile(row.getString(meta.repo), row.getString(meta.commit), row.getString(meta.path), row.getString(meta.sha))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/tech/sourced/gemini/HashSparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object HashSparkApp extends App with Logging {
.getOrCreate()

if (config.verbose) {
LogManager.getRootLogger().setLevel(Level.INFO)
LogManager.getRootLogger.setLevel(Level.INFO)
}

val repos = listRepositories(reposPath, config.format, spark.sparkContext.hadoopConfiguration, config.limit)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/tech/sourced/gemini/Logger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.slf4j.{Logger => Slf4jLogger}
object Logger {
def apply(name: String, verbose: Boolean = false): Slf4jLogger = {
if (verbose) {
LogManager.getRootLogger().setLevel(Level.INFO)
LogManager.getRootLogger.setLevel(Level.INFO)
}
LoggerFactory.getLogger(name)
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/tech/sourced/gemini/QueryApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ object QueryApp extends App {

val similar = gemini.query(file, cassandra).v

cassandra.close
cluster.close
cassandra.close()
cluster.close()

if (similar.isEmpty) {
println(s"No duplicates of $file found.")
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/tech/sourced/gemini/ReportSparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ object ReportSparkApp extends App {
case `groupByMode` => gemini.reportCassandraGroupBy(cassandra)
}

cassandra.close
cluster.close
cassandra.close()
cluster.close()

print(report)
case None =>
Expand All @@ -65,12 +65,11 @@ object ReportSparkApp extends App {
report match {
case e if e.empty() => println(s"No duplicates found.")
case ReportGrouped(v) => println(s"Duplicates found:\n\t" + (v mkString "\n\t"))
case ReportExpandedGroup(v) => {
case ReportExpandedGroup(v) =>
v.foreach { item =>
val count = item.size
println(s"$count duplicates:\n\t" + (item mkString "\n\t") + "\n")
}
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/tech/sourced/gemini/CassandraSparkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CassandraSparkSpec extends FlatSpec
sha1.v should not be empty
sha1.v.head.sha should be("097f4a292c384e002c5b5ce8e15d746849af7b37") // git hash-object -w LICENSE
sha1.v.head.repo should be("null/Users/alex/src-d/gemini")
sha1.v.head.ref_hash should be("4aa29ac236c55ebbfbef149fef7054d25832717f")
sha1.v.head.commit should be("4aa29ac236c55ebbfbef149fef7054d25832717f")
}

"Query for duplicates in single repository" should "return 2 files" in {
Expand Down Expand Up @@ -113,7 +113,7 @@ class CassandraSparkSpec extends FlatSpec
val detailedReport = gemini.reportCassandraGroupBy(session).v
println("Done")

val duplicatedFileNames = detailedReport map (_.head.file)
val duplicatedFileNames = detailedReport map (_.head.path)
duplicatedFileNames.toSeq should contain theSameElementsAs expectedDuplicateFiles
}

Expand All @@ -124,7 +124,7 @@ class CassandraSparkSpec extends FlatSpec
val detailedReport = gemini.report(session).v
println("Done")

val duplicatedFileNames = detailedReport map (_.head.file)
val duplicatedFileNames = detailedReport map (_.head.path)
duplicatedFileNames.toSeq should contain theSameElementsAs expectedDuplicateFiles
}

Expand All @@ -140,7 +140,7 @@ class CassandraSparkSpec extends FlatSpec

"Hash with limit" should "collect files only from limit repos" in {
val gemini = Gemini(sparkSession)
val repos = gemini.hash("src/test/resources/siva", 1).select("repo").distinct().count()
val repos = gemini.hash("src/test/resources/siva", 1).select(Gemini.meta.repo).distinct().count()
repos should be(1)
}

Expand Down
Loading