Skip to content

Commit

Permalink
[SPARK-45827][SQL] Add Variant data type in Spark
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR adds Variant data type in Spark. It doesn't actually introduce any binary encoding, but just has the `value` and `metadata` binaries.

This PR includes:
- The in-memory Variant representation in different types of Spark rows. All rows except `UnsafeRow` use the `VariantVal` object to store a Variant value. In the `UnsafeRow`, the two binaries are stored contiguously.
- Spark parquet writer and reader support for the Variant type. This is agnostic to the detailed binary encoding but just transparently reads the two binaries.
- A dummy Spark `parse_json` implementation so that I can manually test the writer and reader. It currently returns a `VariantVal` with value being the raw bytes of the input string and empty metadata. This is **not** a valid Variant value in the final binary encoding.

## How was this patch tested?

Manual testing. Some supported usages:

```
> sql("create table T using parquet as select parse_json('1') as o")
> sql("select * from T").show
+---+
|  o|
+---+
|  1|
+---+
> sql("insert into T select parse_json('[2]') as o")
> sql("select * from T").show
+---+
|  o|
+---+
|[2]|
|  1|
+---+
```

Closes apache#43707 from chenhao-db/variant-type.

Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
chenhao-db authored and HyukjinKwon committed Nov 14, 2023
1 parent cd19d6c commit aa10ac7
Show file tree
Hide file tree
Showing 56 changed files with 545 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.spark.unsafe.Platform;

/**
* The physical data representation of {@link org.apache.spark.sql.types.VariantType} that
* represents a semi-structured value. It consists of two binary values: {@link VariantVal#value}
* and {@link VariantVal#metadata}. The value encodes types and values, but not field names. The
* metadata currently contains a version flag and a list of field names. We can extend/modify the
* detailed binary format given the version flag.
* <p>
* A {@link VariantVal} can be produced by casting another value into the Variant type or parsing a
* JSON string in the {@link org.apache.spark.sql.catalyst.expressions.variant.ParseJson}
* expression. We can extract a path consisting of field names and array indices from it, cast it
* into a concrete data type, or rebuild a JSON string from it.
* <p>
* The storage layout of this class in {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow}
* and {@link org.apache.spark.sql.catalyst.expressions.UnsafeArrayData} is: the fixed-size part is
* a long value "offsetAndSize". The upper 32 bits are the offset that points to the start position
* of the actual binary content. The lower 32 bits are the total length of the binary content. The
* binary content contains: 4 bytes representing the length of {@link VariantVal#value}, content of
* {@link VariantVal#value}, content of {@link VariantVal#metadata}. This is an internal and
* transient format and can be modified at any time.
*/
public class VariantVal implements Serializable {
  /** Binary that encodes the types and values of the variant, but not its field names. */
  protected final byte[] value;

  /** Binary that currently holds a version flag and the list of field names. */
  protected final byte[] metadata;

  /**
   * Creates a variant from its two binaries. The arrays are stored as given, without copying.
   *
   * @param value the value binary
   * @param metadata the metadata binary
   */
  public VariantVal(byte[] value, byte[] metadata) {
    this.value = value;
    this.metadata = metadata;
  }

  /**
   * @return the value binary. This is the internal array itself, not a copy.
   */
  public byte[] getValue() {
    return value;
  }

  /**
   * @return the metadata binary. This is the internal array itself, not a copy.
   */
  public byte[] getMetadata() {
    return metadata;
  }

  /**
   * Reads a variant from the contiguous binary layout described in the class documentation:
   * a 4-byte length of the value binary, followed by the value bytes, followed by the metadata
   * bytes. The content starts at {@code baseOffset} plus the offset stored in the upper 32 bits
   * of {@code offsetAndSize}.
   *
   * @param offsetAndSize upper 32 bits: offset of the binary content relative to
   *                      {@code baseOffset}; lower 32 bits: total length of the binary content
   * @param baseObject the object passed to {@link Platform} as the base of the memory region
   * @param baseOffset the offset passed to {@link Platform} as the base of the memory region
   * @return a new {@link VariantVal} holding freshly allocated copies of the two binaries
   */
  public static VariantVal readFromUnsafeRow(
      long offsetAndSize,
      Object baseObject,
      long baseOffset) {
    // offset and totalSize are the upper and lower 32 bits of offsetAndSize, respectively.
    int offset = (int) (offsetAndSize >> 32);
    int totalSize = (int) offsetAndSize;
    // The first 4 bytes of the content hold the length of the value binary.
    int valueSize = Platform.getInt(baseObject, baseOffset + offset);
    // Whatever remains after the 4-byte length prefix and the value bytes is the metadata.
    int metadataSize = totalSize - 4 - valueSize;
    byte[] value = new byte[valueSize];
    byte[] metadata = new byte[metadataSize];
    Platform.copyMemory(
      baseObject,
      baseOffset + offset + 4,
      value,
      Platform.BYTE_ARRAY_OFFSET,
      valueSize
    );
    Platform.copyMemory(
      baseObject,
      baseOffset + offset + 4 + valueSize,
      metadata,
      Platform.BYTE_ARRAY_OFFSET,
      metadataSize
    );
    return new VariantVal(value, metadata);
  }

  /**
   * @return a debugging representation that lists the raw bytes of both binaries.
   */
  public String debugString() {
    return "VariantVal{" +
        "value=" + Arrays.toString(value) +
        ", metadata=" + Arrays.toString(metadata) +
        '}';
  }

  /**
   * @return A human-readable representation of the Variant value. It is always a JSON string at
   * this moment.
   */
  @Override
  public String toString() {
    // NOTE: the encoding is not yet implemented, this is not the final implementation.
    // Decode with an explicit charset: `new String(byte[])` would use the platform default
    // charset, making the result depend on JVM configuration.
    // NOTE(review): UTF-8 is assumed because the value currently carries the raw bytes of a
    // Spark string -- confirm against the producer of `value`.
    return new String(value, StandardCharsets.UTF_8);
  }
}
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@
],
"sqlState" : "58030"
},
"CANNOT_SAVE_VARIANT" : {
"message" : [
"Cannot save variant data type into external storage."
],
"sqlState" : "0A000"
},
"CANNOT_UPDATE_FIELD" : {
"message" : [
"Cannot update <table> field <fieldName> type:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ private[sql] object AvroUtils extends Logging {
}

def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: VariantType => false

case _: AtomicType => true

case st: StructType => st.forall { f => supportsDataType(f.dataType) }
Expand Down
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ SQLSTATE: 58030

Failed to set permissions on created path `<path>` back to `<permission>`.

### CANNOT_SAVE_VARIANT

[SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported)

Cannot save variant data type into external storage.

### [CANNOT_UPDATE_FIELD](sql-error-conditions-cannot-update-field-error-class.html)

[SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported)
Expand Down
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ Below is a list of all the keywords in Spark SQL.
|VARCHAR|non-reserved|non-reserved|reserved|
|VAR|non-reserved|non-reserved|non-reserved|
|VARIABLE|non-reserved|non-reserved|non-reserved|
|VARIANT|non-reserved|non-reserved|reserved|
|VERSION|non-reserved|non-reserved|non-reserved|
|VIEW|non-reserved|non-reserved|non-reserved|
|VIEWS|non-reserved|non-reserved|non-reserved|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ VALUES: 'VALUES';
VARCHAR: 'VARCHAR';
VAR: 'VAR';
VARIABLE: 'VARIABLE';
VARIANT: 'VARIANT';
VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,7 @@ type
| DECIMAL | DEC | NUMERIC
| VOID
| INTERVAL
| VARIANT
| ARRAY | STRUCT | MAP
| unsupportedType=identifier
;
Expand Down Expand Up @@ -1545,6 +1546,7 @@ ansiNonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
Expand Down Expand Up @@ -1893,6 +1895,7 @@ nonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.reflect.{classTag, ClassTag}

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.{CalendarInterval, VariantVal}
import org.apache.spark.util.SparkClassUtils

/**
Expand Down Expand Up @@ -216,6 +216,7 @@ object AgnosticEncoders {
case object CalendarIntervalEncoder extends LeafEncoder[CalendarInterval](CalendarIntervalType)
case object DayTimeIntervalEncoder extends LeafEncoder[Duration](DayTimeIntervalType())
case object YearMonthIntervalEncoder extends LeafEncoder[Period](YearMonthIntervalType())
case object VariantEncoder extends LeafEncoder[VariantVal](VariantType)
case class DateEncoder(override val lenientSerialization: Boolean)
extends LeafEncoder[jsql.Date](DateType)
case class LocalDateEncoder(override val lenientSerialization: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.reflect.classTag

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VariantEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -90,6 +90,7 @@ object RowEncoder {
case CalendarIntervalType => CalendarIntervalEncoder
case _: DayTimeIntervalType => DayTimeIntervalEncoder
case _: YearMonthIntervalType => YearMonthIntervalEncoder
case _: VariantType => VariantEncoder
case p: PythonUserDefinedType =>
// TODO check if this works.
encoderForDataType(p.sqlType, lenient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.util.SparkParserUtils.{string, withOrigin}
import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, MetadataBuilder, NullType, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, VarcharType, YearMonthIntervalType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, MetadataBuilder, NullType, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, VarcharType, VariantType, YearMonthIntervalType}

class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
protected def typedVisit[T](ctx: ParseTree): T = {
Expand Down Expand Up @@ -82,6 +82,7 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
DecimalType(precision.getText.toInt, scale.getText.toInt)
case (VOID, Nil) => NullType
case (INTERVAL, Nil) => CalendarIntervalType
case (VARIANT, Nil) => VariantType
case (CHARACTER | CHAR | VARCHAR, Nil) =>
throw QueryParsingErrors.charTypeMissingLengthError(ctx.`type`.getText, ctx)
case (ARRAY | STRUCT | MAP, Nil) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ object DataType {
YearMonthIntervalType(YEAR),
YearMonthIntervalType(MONTH),
YearMonthIntervalType(YEAR, MONTH),
TimestampNTZType)
TimestampNTZType,
VariantType)
.map(t => t.typeName -> t).toMap
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

import org.apache.spark.annotation.Unstable

/**
* The data type representing semi-structured values with arbitrary hierarchical data structures. It
* is intended to store parsed JSON values and most, but not all, other data types in the system
* (for example, it cannot store a map with a non-string key type).
*
* @since 4.0.0
*/
@Unstable
class VariantType private () extends AtomicType {
  /**
   * Returns this very instance: values of VARIANT type are always nullable, so there is nothing
   * to convert.
   */
  private[spark] override def asNullable: VariantType = this

  /**
   * Size estimate consumed by query planning to drive optimization decisions. The figure 2048
   * was picked arbitrarily, with no data backing it yet, and may need to be revisited later.
   */
  override def defaultSize: Int = 2048
}

/**
 * Companion case object of the [[VariantType]] class. The class constructor is private, so this
 * object is the instance that code outside the class refers to as `VariantType`.
 *
 * @since 4.0.0
 */
@Unstable
case object VariantType extends VariantType
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ExpressionInfo {
"collection_funcs", "predicate_funcs", "conditional_funcs", "conversion_funcs",
"csv_funcs", "datetime_funcs", "generator_funcs", "hash_funcs", "json_funcs",
"lambda_funcs", "map_funcs", "math_funcs", "misc_funcs", "string_funcs", "struct_funcs",
"window_funcs", "xml_funcs", "table_funcs", "url_funcs"));
"window_funcs", "xml_funcs", "table_funcs", "url_funcs", "variant_funcs"));

private static final Set<String> validSources =
new HashSet<>(Arrays.asList("built-in", "hive", "python_udf", "scala_udf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

public interface SpecializedGetters {

Expand Down Expand Up @@ -51,6 +52,8 @@ public interface SpecializedGetters {

CalendarInterval getInterval(int ordinal);

VariantVal getVariant(int ordinal);

InternalRow getStruct(int ordinal, int numFields);

ArrayData getArray(int ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public static Object read(
if (physicalDataType instanceof PhysicalBinaryType) {
return obj.getBinary(ordinal);
}
if (physicalDataType instanceof PhysicalVariantType) {
return obj.getVariant(ordinal);
}
if (physicalDataType instanceof PhysicalStructType) {
return obj.getStruct(ordinal, ((PhysicalStructType) physicalDataType).fields().length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

Expand Down Expand Up @@ -231,6 +232,12 @@ public CalendarInterval getInterval(int ordinal) {
return new CalendarInterval(months, days, microseconds);
}

/**
 * Returns the variant stored at {@code ordinal}, or {@code null} when that slot is null. The
 * long stored at the slot is handed to {@link VariantVal#readFromUnsafeRow} as the packed
 * offset-and-size word, together with this object's base object and base offset.
 */
@Override
public VariantVal getVariant(int ordinal) {
  return isNullAt(ordinal)
    ? null
    : VariantVal.readFromUnsafeRow(getLong(ordinal), baseObject, baseOffset);
}

@Override
public UnsafeRow getStruct(int ordinal, int numFields) {
if (isNullAt(ordinal)) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

Expand Down Expand Up @@ -417,6 +418,12 @@ public CalendarInterval getInterval(int ordinal) {
}
}

/**
 * Returns the variant stored at {@code ordinal}, or {@code null} when that field is null.
 * Otherwise the long stored at the field is passed to {@link VariantVal#readFromUnsafeRow} as
 * the packed offset-and-size word, along with this object's base object and base offset.
 */
@Override
public VariantVal getVariant(int ordinal) {
  if (isNullAt(ordinal)) {
    return null;
  }
  final long offsetAndSize = getLong(ordinal);
  return VariantVal.readFromUnsafeRow(offsetAndSize, baseObject, baseOffset);
}

@Override
public UnsafeRow getStruct(int ordinal, int numFields) {
if (isNullAt(ordinal)) {
Expand Down
Loading

0 comments on commit aa10ac7

Please sign in to comment.