Makes multivalued handled within ChildDocuments #241

Open · wants to merge 4 commits into base: master
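A minimal sketch of the kind of input this change targets, using hypothetical column and field names ("children", "tags"): a DataFrame whose child-documents column holds structs that themselves carry an array field. Previously each child field was copied with a single setField call, so a Scala collection inside a child row was not expanded; the recursive createDoc in the SolrRelation.scala diff below applies the same Iterable handling (one addField per element) to child rows as well.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("child-doc-sketch").getOrCreate()

// Hypothetical nested schema: "children" is the child-documents column and
// each child has a multivalued "tags" field (array of strings).
val childType = StructType(Seq(
  StructField("id", StringType),
  StructField("tags", ArrayType(StringType))
))
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("title", StringType),
  StructField("children", ArrayType(childType))
))

val rows = Seq(
  Row("p1", "parent one", Seq(Row("c1", Seq("a", "b")), Row("c2", Seq("c"))))
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.printSchema()
// Written through the connector (with "children" configured as the
// child-documents field), each child's "tags" should now index as a
// multivalued Solr field rather than a single collection-typed value.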
8 changes: 6 additions & 2 deletions pom.xml
@@ -172,13 +172,13 @@
              <goal>testCompile</goal>
            </goals>
          </execution>
-         <execution>
+         <!-- <execution>
            <id>attach-scaladocs</id>
            <phase>verify</phase>
            <goals>
              <goal>doc-jar</goal>
            </goals>
-         </execution>
+         </execution> -->
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
@@ -280,6 +280,10 @@
            <exclude>org.eclipse.jetty.orbit:*</exclude>
            <exclude>org.slf4j:*</exclude>
            <exclude>org.scala-lang:scala-library</exclude>
+           <!-- begin removed for livy-->
+           <exclude>org.scala-lang:scala-reflect</exclude>
+           <exclude>org.scala-lang.modules:scala-parser-combinators_2.11</exclude>
+           <!-- end removed for livy-->
            <exclude>commons-httpclient:commons-httpclient</exclude>
            <exclude>org.apache.curator:*</exclude>
            <exclude>org.apache.commons:commons-lang3</exclude>
98 changes: 46 additions & 52 deletions src/main/scala/com/lucidworks/spark/SolrRelation.scala
@@ -676,66 +676,60 @@ class SolrRelation(

    // Convert RDD of rows in to SolrInputDocuments
    val docs = df.rdd.map(row => {
-     val schema: StructType = row.schema
-     val doc = new SolrInputDocument
-     schema.fields.foreach(field => {
-       val fname = field.name
-       breakable {
-         if (fname.equals("_version_")) break()
-         val isChildDocument = (fname == fieldNameForChildDocuments)
-         val fieldIndex = row.fieldIndex(fname)
-         val fieldValue : Option[Any] = if (row.isNullAt(fieldIndex)) None else Some(row.get(fieldIndex))
-         if (fieldValue.isDefined) {
-           val value = fieldValue.get
-
-           if(isChildDocument) {
-             val it = value.asInstanceOf[Iterable[GenericRowWithSchema]].iterator
-             while (it.hasNext) {
-               val elem = it.next()
-               val childDoc = new SolrInputDocument
-               for (i <- 0 until elem.schema.fields.size) {
-                 childDoc.setField(elem.schema.fields(i).name, elem.get(i))
-               }
-
-               // Generate unique key if the child document doesn't have one
-               if (generateUniqChildKey) {
-                 if (!childDoc.containsKey(uniqueKey)) {
-                   childDoc.setField(uniqueKey, UUID.randomUUID().toString)
-                 }
-               }
-
-               doc.addChildDocument(childDoc)
-             }
-             break()
-           }
-
-           value match {
-             //TODO: Do we need to check explicitly for ArrayBuffer and WrappedArray
-             case v: Iterable[Any] =>
-               val it = v.iterator
-               while (it.hasNext) doc.addField(fname, it.next())
-             case bd: java.math.BigDecimal =>
-               doc.setField(fname, bd.doubleValue())
-             case _ => doc.setField(fname, value)
-           }
-         }
-       }
-     })
-
-     // Generate unique key if the document doesn't have one
-     if (generateUniqKey) {
-       if (!doc.containsKey(uniqueKey)) {
-         doc.setField(uniqueKey, UUID.randomUUID().toString)
-       }
-     }
-     doc
-   })
-   val acc: SparkSolrAccumulator = new SparkSolrAccumulator
-   val accName = if (conf.getAccumulatorName.isDefined) conf.getAccumulatorName.get else "Records Written"
-   sparkSession.sparkContext.register(acc, accName)
-   SparkSolrAccumulatorContext.add(accName, acc.id)
-   SolrSupport.indexDocs(zkHost, collectionId, batchSize, docs, conf.commitWithin, Some(acc))
-   logger.info("Written {} documents to Solr collection {}", acc.value, collectionId)
- }
+     val schema: StructType = row.schema
+     val doc = createDoc(schema.fields, row, fieldNameForChildDocuments, generateUniqKey)
+     doc
+   })
+   val acc: SparkSolrAccumulator = new SparkSolrAccumulator
+   val accName = if (conf.getAccumulatorName.isDefined) conf.getAccumulatorName.get else "Records Written"
+   sparkSession.sparkContext.register(acc, accName)
+   SparkSolrAccumulatorContext.add(accName, acc.id)
+   SolrSupport.indexDocs(zkHost, collectionId, batchSize, docs, conf.commitWithin, Some(acc))
+   logger.info("Written {} documents to Solr collection {}", acc.value, collectionId)
+ }
+
+ private def createDoc(schema: Array[StructField], row: Row, fieldNameForChildDocuments: String, generateUniqKey: Boolean): SolrInputDocument = {
+   val doc = new SolrInputDocument
+   schema.foreach(field => {
+     val fname = field.name
+     breakable {
+       if (fname.equals("_version_")) break()
+       val isChildDocument = (fname == fieldNameForChildDocuments)
+       val fieldIndex = row.fieldIndex(fname)
+       val fieldValue: Option[Any] = if (row.isNullAt(fieldIndex)) None else Some(row.get(fieldIndex))
+       if (fieldValue.isDefined) {
+         val value = fieldValue.get
+         println(fname)
+
+         if (isChildDocument) {
+           val it = value.asInstanceOf[Iterable[GenericRowWithSchema]].iterator
+           while (it.hasNext) {
+             println("recurs" + value)
+             val child = it.next()
+             doc.addChildDocument(createDoc((child.schema.toList).toArray, child, fieldNameForChildDocuments, generateUniqKey))
+           }
+           break()
+         }
+
+         value match {
+           //TODO: Do we need to check explicitly for ArrayBuffer and WrappedArray
+           case v: Iterable[Any] =>
+             val it = v.iterator
+             while (it.hasNext) doc.addField(fname, it.next())
+           case bd: java.math.BigDecimal =>
+             doc.setField(fname, bd.doubleValue())
+           case _ => doc.setField(fname, value)
+         }
+
+         // Generate unique key if the document doesn't have one
+         if (generateUniqKey) {
+           if (!doc.containsKey(uniqueKey)) {
+             doc.setField(uniqueKey, UUID.randomUUID().toString)
+           }
+         }
+       }
+     }
+   })
+   doc
+ }

  private def checkRequiredParams(): Unit = {
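On the Solr side, a rough illustration (field names made up) of what the recursive createDoc aims to produce for a child row holding a multivalued value: one addField call per element on the child SolrInputDocument, which is then attached to its parent with addChildDocument.

import org.apache.solr.common.SolrInputDocument

val childDoc = new SolrInputDocument
childDoc.setField("id", "c1")
// Multivalued field: one addField call per element, mirroring the Iterable case in createDoc.
Seq("a", "b", "c").foreach(tag => childDoc.addField("tags", tag))

val parentDoc = new SolrInputDocument
parentDoc.setField("id", "p1")
parentDoc.addChildDocument(childDoc) // "tags" stays multivalued on the child document

println(childDoc.getFieldValues("tags")) // [a, b, c]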