Make Tachyon WriteType configurable. #327

Open · wants to merge 2 commits into master
10 changes: 10 additions & 0 deletions src/main/scala/shark/SharkConfVars.scala
@@ -56,6 +56,16 @@ object SharkConfVars {

// Number of mappers to force for table scan jobs
val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)

// WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
// "CACHE_THROUGH", or "THROUGH".
// For reliability we strongly recommend the default, "CACHE_THROUGH", which writes
// the table synchronously to the underlying file system while also caching it.
// "TRY_CACHE" and "MUST_CACHE" only cache the table, which gives better write
// performance, but use them with care: if the entire table cannot be fully
// cached, any evicted part of the data is lost forever.
// "THROUGH" only writes the table to the underlying file system, with no caching.
val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")
A Contributor commented:

Could you add a comment here to both explain what this option is and to warn against the dangers of setting the WriteType without THROUGH?


// Add Shark configuration variables and their default values to the given conf,
// so default values show up in 'set'.
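The comment on the new ConfVar describes four write types. As a rough mental model (plain Scala standing in for the real `tachyon.client.WriteType` enum, so all names below are hypothetical), each type combines a cache flag with a write-through flag, and only written-through data survives cache eviction:

```scala
// Hypothetical model of the four Tachyon write types; the real enum lives
// in tachyon.client.WriteType. "cache" keeps a copy in Tachyon memory,
// "through" persists synchronously to the underlying file system.
case class WriteTypeModel(cache: Boolean, through: Boolean) {
  // Only data that was also written through survives eviction.
  def durable: Boolean = through
}

val writeTypes = Map(
  "TRY_CACHE"     -> WriteTypeModel(cache = true,  through = false),
  "MUST_CACHE"    -> WriteTypeModel(cache = true,  through = false),
  "CACHE_THROUGH" -> WriteTypeModel(cache = true,  through = true),
  "THROUGH"       -> WriteTypeModel(cache = false, through = true)
)
```

Under this model the recommended default, "CACHE_THROUGH", is the only option that is both cached and durable, which matches the warning in the comment.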
@@ -132,6 +132,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.setLocalHconf(op.getLocalHconf)
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
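The reason `setLocalHconf` is called inside the partition closure, rather than once on the driver, is that the writer's conf field is declared `@transient`: it is dropped when Spark serializes the task, so it is null by the time the closure runs on an executor. A minimal self-contained sketch of that behavior (hypothetical `WriterSketch` class, simulating Spark's closure shipping with plain Java serialization):

```scala
import java.io._

// Hypothetical stand-in for the off-heap writer: a @transient field is
// dropped by Java serialization, which is how Spark ships task closures
// to executors.
class WriterSketch extends Serializable {
  @transient var localHconf: String = _
}

// Serialize and deserialize, as happens between driver and executor.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(obj)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val onDriver = new WriterSketch
onDriver.localHconf = "driver-side-conf"
// After the round trip, i.e. on the executor, the field is null again.
val onExecutor = roundTrip(onDriver)
```

Re-setting the conf inside `mapPartitionsWithIndex` runs the assignment on the executor, after deserialization, just before `writeColumnPartition` needs it.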
1 change: 1 addition & 0 deletions src/main/scala/shark/execution/SparkLoadTask.scala
@@ -234,6 +234,7 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
4 changes: 4 additions & 0 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
import java.util
import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.rdd.RDD

import shark.LogHelper
@@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
}

abstract class OffHeapTableWriter extends Serializable {
@transient @BeanProperty var localHconf: HiveConf = _

/** Creates this table. Called only on the driver node. */
def createTable()
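`@BeanProperty` is what makes the `setLocalHconf(...)` calls in the two operators compile: the annotation generates Java-style getter/setter pairs for the field. A minimal sketch of that effect (using `scala.beans.BeanProperty`, the current home of the annotation the patch imports from the older `scala.reflect` location; `ConfHolder` is a hypothetical name):

```scala
import scala.beans.BeanProperty

class ConfHolder {
  // Generates getLocalHconf() and setLocalHconf(...) alongside the field.
  @BeanProperty var localHconf: String = _
}

val holder = new ConfHolder
holder.setLocalHconf("hive-site.xml") // generated setter
```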
@@ -19,9 +19,11 @@ package shark.tachyon

import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import tachyon.client.WriteType

-import shark.LogHelper
+import shark.{LogHelper, SharkConfVars}
import shark.execution.serialization.JavaSerializer
import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

@@ -51,7 +53,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
val rawColumn = rawTable.getRawColumn(column)
rawColumn.createPartition(part)
val file = rawColumn.getPartition(part)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+    val writeType: WriteType = WriteType.valueOf(
+      SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
+    val outStream = file.getOutStream(writeType)
outStream.write(data.array(), 0, data.limit())
outStream.close()
}
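One behavioral note on the lookup above, sketched here with `scala.Enumeration` standing in for the real `tachyon.client.WriteType` (so `WriteTypes` and `resolveWriteType` are hypothetical names, and the `Map` stands in for `HiveConf`): the ConfVar default applies only when `shark.tachyon.writetype` is unset, whereas an invalid value makes the enum lookup throw instead of falling back to "CACHE_THROUGH".

```scala
// Model of the write-type resolution performed in the patch.
object WriteTypes extends Enumeration {
  val TRY_CACHE, MUST_CACHE, CACHE_THROUGH, THROUGH = Value
}

def resolveWriteType(conf: Map[String, String]): WriteTypes.Value =
  // Fall back to the ConfVar default only when the key is absent.
  WriteTypes.withName(conf.getOrElse("shark.tachyon.writetype", "CACHE_THROUGH"))
```

`withName`, like Java's `Enum.valueOf`, throws on an unknown name, so a typo in the setting surfaces as an error at write time rather than silently using the default.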