diff --git a/pom.xml b/pom.xml
index 1bf10ad3..d7079ba3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,13 +172,13 @@
               testCompile
-
+
               ${scala.version}
@@ -280,6 +280,10 @@
                   org.eclipse.jetty.orbit:*
                   org.slf4j:*
                   org.scala-lang:scala-library
+
+                  org.scala-lang:scala-reflect
+                  org.scala-lang.modules:scala-parser-combinators_2.11
+
                   commons-httpclient:commons-httpclient
                   org.apache.curator:*
                   org.apache.commons:commons-lang3
diff --git a/src/main/scala/com/lucidworks/spark/SolrRelation.scala b/src/main/scala/com/lucidworks/spark/SolrRelation.scala
index 198e93e1..cfe13859 100644
--- a/src/main/scala/com/lucidworks/spark/SolrRelation.scala
+++ b/src/main/scala/com/lucidworks/spark/SolrRelation.scala
@@ -676,66 +676,60 @@ class SolrRelation(
 
     // Convert RDD of rows in to SolrInputDocuments
     val docs = df.rdd.map(row => {
-      val schema: StructType = row.schema
-      val doc = new SolrInputDocument
-      schema.fields.foreach(field => {
-        val fname = field.name
-        breakable {
-          if (fname.equals("_version_")) break()
-          val isChildDocument = (fname == fieldNameForChildDocuments)
-          val fieldIndex = row.fieldIndex(fname)
-          val fieldValue : Option[Any] = if (row.isNullAt(fieldIndex)) None else Some(row.get(fieldIndex))
-          if (fieldValue.isDefined) {
-            val value = fieldValue.get
-
-            if(isChildDocument) {
-              val it = value.asInstanceOf[Iterable[GenericRowWithSchema]].iterator
-              while (it.hasNext) {
-                val elem = it.next()
-                val childDoc = new SolrInputDocument
-                for (i <- 0 until elem.schema.fields.size) {
-                  childDoc.setField(elem.schema.fields(i).name, elem.get(i))
-                }
-
-                // Generate unique key if the child document doesn't have one
-                if (generateUniqChildKey) {
-                  if (!childDoc.containsKey(uniqueKey)) {
-                    childDoc.setField(uniqueKey, UUID.randomUUID().toString)
-                  }
-                }
+      val schema: StructType = row.schema
+      val doc = createDoc(schema.fields, row, fieldNameForChildDocuments, generateUniqKey)
+      doc
+    })
+    val acc: SparkSolrAccumulator = new SparkSolrAccumulator
+    val accName = if (conf.getAccumulatorName.isDefined) conf.getAccumulatorName.get else "Records Written"
+    sparkSession.sparkContext.register(acc, accName)
+    SparkSolrAccumulatorContext.add(accName, acc.id)
+    SolrSupport.indexDocs(zkHost, collectionId, batchSize, docs, conf.commitWithin, Some(acc))
+    logger.info("Written {} documents to Solr collection {}", acc.value, collectionId)
+  }
 
-                doc.addChildDocument(childDoc)
-              }
-              break()
+  // Builds a SolrInputDocument from a Row, recursing into the field that holds child documents
+  private def createDoc(schema: Array[StructField], row: Row, fieldNameForChildDocuments: String, generateUniqKey: Boolean): SolrInputDocument = {
+    val doc = new SolrInputDocument
+    schema.foreach(field => {
+      val fname = field.name
+      breakable {
+        if (fname.equals("_version_")) break()
+        val isChildDocument = (fname == fieldNameForChildDocuments)
+        val fieldIndex = row.fieldIndex(fname)
+        val fieldValue: Option[Any] = if (row.isNullAt(fieldIndex)) None else Some(row.get(fieldIndex))
+        if (fieldValue.isDefined) {
+          val value = fieldValue.get
+
+          if (isChildDocument) {
+            val it = value.asInstanceOf[Iterable[GenericRowWithSchema]].iterator
+            while (it.hasNext) {
+              val child = it.next()
+              doc.addChildDocument(createDoc(child.schema.fields, child, fieldNameForChildDocuments, generateUniqKey))
             }
+            break()
+          }
+          value match {
+            //TODO: Do we need to check explicitly for ArrayBuffer and WrappedArray
+            case v: Iterable[Any] =>
+              val it = v.iterator
+              while (it.hasNext) doc.addField(fname, it.next())
+            case bd: java.math.BigDecimal =>
+              doc.setField(fname, bd.doubleValue())
+            case _ =>
+              doc.setField(fname, value)
+          }
 
-            value match {
-              //TODO: Do we need to check explicitly for ArrayBuffer and WrappedArray
-              case v: Iterable[Any] =>
-                val it = v.iterator
-                while (it.hasNext) doc.addField(fname, it.next())
-              case bd: java.math.BigDecimal =>
-                doc.setField(fname, bd.doubleValue())
-              case _ => doc.setField(fname, value)
+          // Generate unique key if the document doesn't have one
+          if (generateUniqKey) {
+            if (!doc.containsKey(uniqueKey)) {
+              doc.setField(uniqueKey, UUID.randomUUID().toString)
             }
           }
         }
-      })
-
-      // Generate unique key if the document doesn't have one
-      if (generateUniqKey) {
-        if (!doc.containsKey(uniqueKey)) {
-          doc.setField(uniqueKey, UUID.randomUUID().toString)
-        }
       }
-      doc
     })
-    val acc: SparkSolrAccumulator = new SparkSolrAccumulator
-    val accName = if (conf.getAccumulatorName.isDefined) conf.getAccumulatorName.get else "Records Written"
-    sparkSession.sparkContext.register(acc, accName)
-    SparkSolrAccumulatorContext.add(accName, acc.id)
-    SolrSupport.indexDocs(zkHost, collectionId, batchSize, docs, conf.commitWithin, Some(acc))
-    logger.info("Written {} documents to Solr collection {}", acc.value, collectionId)
+    doc
   }
 
   private def checkRequiredParams(): Unit = {
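For context, here is a minimal, hypothetical sketch of how the refactored child-document path might be exercised from the DataFrame writer. The sample schema and the option keys (`zkhost`, `collection`, `child_doc_fieldname`, `gen_uniq_key`) are assumptions for illustration, not taken from this diff or verified against a specific spark-solr release.

```scala
// Hypothetical usage sketch: a DataFrame whose "items" column is an array of structs is
// written through the spark-solr data source, and each struct is indexed as a Solr child
// document of its parent row. All option keys below are assumed, not confirmed.
import org.apache.spark.sql.SparkSession

object ChildDocExample {
  case class Item(id: String, sku: String)
  case class Order(id: String, customer_s: String, items: Seq[Item])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("solr-child-docs").master("local[*]").getOrCreate()
    import spark.implicits._

    val orders = Seq(
      Order("order-1", "alice", Seq(Item("order-1-a", "SKU-1"), Item("order-1-b", "SKU-2")))
    ).toDF()

    orders.write
      .format("solr")
      .option("zkhost", "localhost:9983")      // ZooKeeper connect string (assumed key)
      .option("collection", "orders")          // target Solr collection (assumed key)
      .option("child_doc_fieldname", "items")  // column treated as child documents (assumed key)
      .option("gen_uniq_key", "true")          // fill in missing uniqueKey values (assumed key)
      .mode("overwrite")
      .save()

    spark.stop()
  }
}
```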