Creates partiql-tpc tool to generate TPC-DS in Ion, CSV, and Parquet #1170

Closed · wants to merge 3 commits
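This PR adds a small CLI that generates TPC-DS data at a chosen scale factor and writes it as Ion, CSV, or Parquet. A hypothetical invocation, using only the options defined in Main.kt below (paths and values are illustrative):

    partiql-tpc -d TPCDS --format CSV --output ./tpcds --scale 1.0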
10 changes: 10 additions & 0 deletions buildSrc/src/main/kotlin/partiql.versions.kt
@@ -26,10 +26,12 @@ object Versions {
    // Dependencies
    const val antlr = "4.10.1"
    const val awsSdk = "1.12.344"
    const val avro = "1.11.2"
    const val csv = "1.8"
    const val dotlin = "1.0.2"
    const val gson = "2.10.1"
    const val guava = "31.1-jre"
    const val hadoop = "3.3.6"
    const val ionElement = "1.0.0"
    const val ionJava = "1.10.2"
    const val ionSchema = "1.2.1"
@@ -42,7 +44,9 @@ object Versions {
    const val picoCli = "4.7.0"
    const val kasechange = "1.3.0"
    const val ktlint = "11.5.0"
    const val parquet = "1.13.1"
    const val pig = "0.6.2"
    const val trinoTPC = "1.4"

    // Testing
    const val assertj = "3.11.0"
@@ -65,10 +69,12 @@ object Deps {
    const val awsSdkBom = "com.amazonaws:aws-java-sdk-bom:${Versions.awsSdk}"
    const val awsSdkDynamodb = "com.amazonaws:aws-java-sdk-dynamodb:${Versions.awsSdk}"
    const val awsSdkS3 = "com.amazonaws:aws-java-sdk-s3:${Versions.awsSdk}"
    const val avro = "org.apache.avro:avro:${Versions.avro}"
    const val csv = "org.apache.commons:commons-csv:${Versions.csv}"
    const val dotlin = "io.github.rchowell:dotlin:${Versions.dotlin}"
    const val gson = "com.google.code.gson:gson:${Versions.gson}"
    const val guava = "com.google.guava:guava:${Versions.guava}"
    const val hadoopCommon = "org.apache.hadoop:hadoop-common:${Versions.hadoop}"
    const val ionJava = "com.amazon.ion:ion-java:${Versions.ionJava}"
    const val ionElement = "com.amazon.ion:ion-element:${Versions.ionElement}"
    const val ionSchema = "com.amazon.ion:ion-schema-kotlin:${Versions.ionSchema}"
@@ -78,9 +84,13 @@ object Deps {
    const val kasechange = "net.pearx.kasechange:kasechange:${Versions.kasechange}"
    const val kotlinPoet = "com.squareup:kotlinpoet:${Versions.kotlinPoet}"
    const val kotlinxCollections = "org.jetbrains.kotlinx:kotlinx-collections-immutable:${Versions.kotlinxCollections}"
    const val parquet = "org.apache.parquet:parquet:${Versions.parquet}"
    const val parquetAvro = "org.apache.parquet:parquet-avro:${Versions.parquet}"
    const val parquetHadoop = "org.apache.parquet:parquet-hadoop:${Versions.parquet}"
    const val picoCli = "info.picocli:picocli:${Versions.picoCli}"
    const val pig = "org.partiql:partiql-ir-generator:${Versions.pig}"
    const val pigRuntime = "org.partiql:partiql-ir-generator-runtime:${Versions.pig}"
    const val trinoTPC = "io.trino.tpcds:tpcds:${Versions.trinoTPC}"

    // Testing
    const val assertj = "org.assertj:assertj-core:${Versions.assertj}"
31 changes: 31 additions & 0 deletions lib/partiql-tpc/build.gradle.kts
@@ -0,0 +1,31 @@
plugins {
    id(Plugins.application)
    id(Plugins.conventions)
}

dependencies {
    implementation(Deps.avro)
    implementation(Deps.csv)
    implementation(Deps.hadoopCommon)
    // parquet-hadoop references org.apache.hadoop.mapreduce classes at runtime;
    // without this dependency the tool failed with linking errors.
    implementation("org.apache.hadoop:hadoop-mapreduce-client-core:${Versions.hadoop}")
    implementation(Deps.ionElement)
    implementation(Deps.parquet)
    implementation(Deps.parquetAvro)
    implementation(Deps.parquetHadoop)
    implementation(Deps.picoCli)
    implementation(Deps.trinoTPC)
}

kotlin {
    explicitApi = null
}

ktlint {
    ignoreFailures.set(true)
}

application {
    applicationName = "partiql-tpc"
    mainClass.set("org.partiql.lib.tpc.Main")
}
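Since the Gradle application plugin is applied with applicationName = "partiql-tpc", the standard installDist task produces a distribution with a partiql-tpc launcher script. Assuming the module's Gradle project path matches its directory (an assumption, not confirmed by this diff), usage would look like:

    ./gradlew :lib:partiql-tpc:installDist
    ./lib/partiql-tpc/build/install/partiql-tpc/bin/partiql-tpc --help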
126 changes: 126 additions & 0 deletions lib/partiql-tpc/src/main/kotlin/org/partiql/lib/tpc/Main.kt
@@ -0,0 +1,126 @@
/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at:
 *
 *      http://aws.amazon.com/apache2.0/
 *
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
 * language governing permissions and limitations under the License.
 */
@file:JvmName("Main")

package org.partiql.lib.tpc

import io.trino.tpcds.Results.constructResults
import io.trino.tpcds.Session
import io.trino.tpcds.Table
import org.partiql.lib.tpc.formats.ResultSetWriterFactory
import picocli.CommandLine
import picocli.CommandLine.Option
import java.nio.file.Path
import kotlin.io.path.createDirectory
import kotlin.io.path.notExists
import kotlin.system.exitProcess

/**
 * Runs the partiql-tpc data generator CLI.
 */
fun main(args: Array<String>) {
    val command = CommandLine(MainCommand())
    val exitCode = command.execute(*args)
    exitProcess(exitCode)
}

@CommandLine.Command(
    name = "partiql-tpc",
    mixinStandardHelpOptions = true,
    version = ["0"],
    description = ["Writes a TPC dataset"],
)
class MainCommand : Runnable {

    @Option(names = ["--output"], required = true)
    lateinit var output: Path

    @Option(
        names = ["--scale"],
        description = ["Scale factor; 1.0 generates approximately 1GB of data"],
    )
    var scale: Double = 1.0

    @Option(names = ["--partitions"])
    var partitions: Int = 1

    @Option(names = ["--part"])
    var part: Int = 1

    @Option(
        names = ["--table"],
        description = ["Table to generate; if not specified, all tables are generated. https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.6.0.pdf"],
    )
    var table: String? = null

    @Option(
        names = ["-d", "-dataset"],
        required = true,
        description = ["Dataset type; valid values: \${COMPLETION-CANDIDATES}"],
    )
    lateinit var benchmark: Benchmark

    @Option(
        names = ["--format"],
        required = true,
        description = ["Output format; valid values: \${COMPLETION-CANDIDATES}"],
    )
    lateinit var format: Format

    override fun run() {
        val sNano = System.nanoTime()
        // Prepare the output directory
        if (output.notExists()) {
            output.createDirectory()
        }
        // Generate one table, or all base tables
        when (table) {
            null -> genAll()
            else -> genOne(table!!)
        }
        val eNano = System.nanoTime()
        println("Generated ${benchmark.display} data in ${(eNano - sNano) / 1e9}s")
    }

    private fun genOne(t: String) {
        try {
            val table = Table.valueOf(t.uppercase())
            write(table)
        } catch (ex: IllegalArgumentException) {
            throw IllegalArgumentException("Table $t does not exist for benchmark $benchmark", ex)
        }
    }

    private fun genAll() {
        for (table in Table.getBaseTables()) {
            write(table)
        }
    }

    private fun write(table: Table) {
        val session = Session.getDefaultSession()
            .withScale(scale)
            .withParallelism(partitions)
            .withChunkNumber(part)
            .withTable(table)
            .withNoSexism(true)
        val writer = ResultSetWriterFactory.create(table, format, output)
        val results = constructResults(table, session)
        writer.use {
            it.open(table)
            it.write(results)
            // closed automatically by use {}
        }
    }
}
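Because --partitions maps to Session.withParallelism and --part to Session.withChunkNumber, a large run can be split into independently generated chunks across processes. Since each writer emits one file per table into the output directory, parallel parts should target distinct directories; a hypothetical two-part split:

    partiql-tpc -d TPCDS --format CSV --scale 100.0 --partitions 2 --part 1 --output ./out/part1
    partiql-tpc -d TPCDS --format CSV --scale 100.0 --partitions 2 --part 2 --output ./out/part2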
54 changes: 54 additions & 0 deletions lib/partiql-tpc/src/main/kotlin/org/partiql/lib/tpc/Model.kt
@@ -0,0 +1,54 @@
package org.partiql.lib.tpc

import io.trino.tpcds.Table
import io.trino.tpcds.column.ColumnType
import java.time.LocalDate
import java.time.LocalTime

/**
 * Output formats
 */
enum class Format {
    ION, CSV, PARQUET,
}

/**
 * Benchmark type to generate for
 */
enum class Benchmark(val display: String) {
    TPCDS("TPC-DS"),
    TPCH("TPC-H"),
}

/**
 * Generated data is string-ly typed
 */
typealias Mapper<T> = (String) -> T

/**
 * Produce an array of mappers from the table schema, following Trino's consumption
 * of the generated values:
 *
 * https://github.com/trinodb/trino/blob/master/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsRecordSet.java#L133-L145
 */
fun Table.mappers(): Array<Mapper<*>> = columns.map {
    when (it.type.base) {
        ColumnType.Base.INTEGER -> ::toInt
        ColumnType.Base.DECIMAL -> ::toDecimal
        ColumnType.Base.IDENTIFIER -> ::identity
        ColumnType.Base.VARCHAR -> ::identity
        ColumnType.Base.CHAR -> ::identity
        ColumnType.Base.TIME -> ::toTime
        ColumnType.Base.DATE -> ::toDate
        null -> error("unreachable")
    }
}.toTypedArray()

fun toInt(s: String): Int = s.toInt(10)

fun toDecimal(s: String): Double = s.toDouble() // TODO consider BigDecimal to preserve exact decimal values

fun identity(s: String): String = s

fun toTime(s: String): LocalTime = LocalTime.parse(s)

fun toDate(s: String): LocalDate = LocalDate.parse(s)
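
A minimal sketch of the mappers in use; the Table value and row literals below are hypothetical stand-ins for what the generator emits (rows arrive as strings):

    // Hypothetical: convert the first three columns of one string-ly typed row
    val mappers = Table.CALL_CENTER.mappers()
    val row = listOf("6", "AAAAAAAAGAAAAAAA", "1998-01-01")
    val typed = row.mapIndexed { i, s -> mappers[i](s) } // values converted per column type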
40 changes: 40 additions & 0 deletions lib/partiql-tpc/src/main/kotlin/org/partiql/lib/tpc/formats/ResultSetWriter.kt
@@ -0,0 +1,40 @@
package org.partiql.lib.tpc.formats

import io.trino.tpcds.Results
import io.trino.tpcds.Table
import org.partiql.lib.tpc.Format
import org.partiql.lib.tpc.formats.csv.CsvResultSetWriter
import org.partiql.lib.tpc.formats.ion.IonResultSetWriter
import org.partiql.lib.tpc.formats.parquet.ParquetResultSetWriter
import java.nio.file.Path

/**
 * Simple interface for writing [Results].
 */
interface ResultSetWriter : AutoCloseable {

    /**
     * Open any resources for writing results
     */
    fun open(table: Table)

    /**
     * Write a result set
     */
    fun write(results: Results)

    /**
     * Close any resources
     */
    override fun close()
}

object ResultSetWriterFactory {

    fun create(table: Table, format: Format, output: Path) = when (format) {
        Format.ION -> IonResultSetWriter(output)
        Format.CSV -> CsvResultSetWriter(output)
        Format.PARQUET -> ParquetResultSetWriter(output)
    }
}
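
A minimal sketch of the writer lifecycle, mirroring MainCommand.write() in Main.kt (the table and path are illustrative):

    val table = Table.CALL_CENTER
    val writer = ResultSetWriterFactory.create(table, Format.CSV, Path.of("out"))
    writer.use {
        it.open(table)
        it.write(Results.constructResults(table, Session.getDefaultSession().withTable(table)))
    }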
60 changes: 60 additions & 0 deletions lib/partiql-tpc/src/main/kotlin/org/partiql/lib/tpc/formats/csv/CsvResultSetWriter.kt
@@ -0,0 +1,60 @@
package org.partiql.lib.tpc.formats.csv

import io.trino.tpcds.Results
import io.trino.tpcds.Table
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.partiql.lib.tpc.Mapper
import org.partiql.lib.tpc.formats.ResultSetWriter
import org.partiql.lib.tpc.mappers
import java.io.FileOutputStream
import java.io.OutputStreamWriter
import java.nio.file.Path

class CsvResultSetWriter(
    private val output: Path,
) : ResultSetWriter {

    private lateinit var printer: CSVPrinter
    private lateinit var mappers: Array<Mapper<*>>

    override fun open(table: Table) {
        // Initialize the output appendable
        val tableName = table.name.lowercase()
        val path = output.resolve(tableName).toString() + ".csv"
        val file = FileOutputStream(path)
        val out = OutputStreamWriter(file)
        // Build the mapper closures
        printer = CSVPrinter(out, CSVFormat.DEFAULT)
        mappers = table.mappers()
    }

    override fun write(results: Results) {
        // Process each record
        try {
            for (rows in results) {
                // TODO null chunks are skipped silently; consider logging them
                if (rows == null) continue
                for (row in rows) {
                    // TODO null rows are skipped silently; consider logging them
                    if (row == null) continue
                    // map values
                    val r = row.mapIndexed { i, s ->
                        when (s) {
                            null -> "NULL"
                            else -> mappers[i](s)
                        }
                    }
                    printer.printRecord(r)
                }
            }
        } catch (ex: NullPointerException) {
            // Known NPE at io.trino.tpcds.Results$ResultsIterator.<init>(Results.java:88);
            // as this is for rapid prototyping, catch and do nothing.
        }
    }

    override fun close() {
        printer.close()
    }
}
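
With CSVFormat.DEFAULT and the "NULL" placeholder above, each table becomes a comma-separated file named <table>.csv in the output directory; an illustrative row with one missing value:

    6,AAAAAAAAGAAAAAAA,1998-01-01,NULL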