Skip to content

Latest commit

 

History

History
1286 lines (1128 loc) · 54.2 KB

README.md

File metadata and controls

1286 lines (1128 loc) · 54.2 KB

Apache Spark SQL connector for Google BigQuery

The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.

BigQuery Storage API

The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.

It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:

Direct Streaming

It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using the Arrow or Avro wire formats.

Filtering

The new API allows column and predicate filtering to only read the data you are interested in.

Column Filtering

Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.

Predicate Filtering

The Storage API supports arbitrary pushdown of predicate filters. Connector version 0.8.0-beta and above support pushdown of arbitrary filters to Bigquery.

There is a known issue in Spark that does not allow pushdown of filters on nested fields. For example - filters like address.city = "Sunnyvale" will not get pushdown to Bigquery.

Dynamic Sharding

The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.

See Configuring Partitioning for more details.

Requirements

Enable the BigQuery Storage API

Follow these instructions.

Create a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use spark-submit on any cluster.

Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work by default e.g.

MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"

Downloading and Using the Connector

The latest version of the connector is publicly available in the following links:

version Link
Spark 3.3 gs://spark-lib/bigquery/spark-3.3-bigquery-0.30.0.jar(HTTP link)
Spark 3.2 gs://spark-lib/bigquery/spark-3.2-bigquery-0.30.0.jar(HTTP link)
Spark 3.1 gs://spark-lib/bigquery/spark-3.1-bigquery-0.30.0.jar(HTTP link)
Spark 2.4 gs://spark-lib/bigquery/spark-2.4-bigquery-0.30.0.jar(HTTP link)
Scala 2.13 gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.30.0.jar (HTTP link)
Scala 2.12 gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar (HTTP link)
Scala 2.11 gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP link)

The first four versions are Java based connectors targeting Spark 2.4/3.1/3.2/3.3 of all Scala versions built on the new Data Source APIs (Data Source API v2) of Spark.

The final two connectors are Scala based connectors, please use the jar relevant to your Spark installation as outlined below.

Connector to Spark Compatibility Matrix

Connector \ Spark 2.3 2.4
(Scala 2.11)
2.4
(Scala 2.12)
3.0 3.1 3.2 3.3
spark-3.3-bigquery
spark-3.2-bigquery
spark-3.1-bigquery
spark-2.4-bigquery
spark-bigquery-with-dependencies_2.13
spark-bigquery-with-dependencies_2.12
spark-bigquery-with-dependencies_2.11

Connector to Dataproc Image Compatibility Matrix

Connector \ Dataproc Image 1.3 1.4 1.5 2.0 2.1 Serverless
Image 1.0
Serverless
Image 2.0
spark-3.3-bigquery
spark-3.2-bigquery
spark-3.1-bigquery
spark-2.4-bigquery
spark-bigquery-with-dependencies_2.13
spark-bigquery-with-dependencies_2.12
spark-bigquery-with-dependencies_2.11

Maven / Ivy Package Usage

The connector is also available from the Maven Central repository. It can be used using the --packages option or the spark.jars.packages configuration property. Use the following value

version Connector Artifact
Spark 3.3 com.google.cloud.spark:spark-3.3-bigquery:0.30.0
Spark 3.2 com.google.cloud.spark:spark-3.2-bigquery:0.30.0
Spark 3.1 com.google.cloud.spark:spark-3.1-bigquery:0.30.0
Spark 2.4 com.google.cloud.spark:spark-2.4-bigquery:0.30.0
Scala 2.13 com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.30.0
Scala 2.12 com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0
Scala 2.11 com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0

Hello World Example

You can run a simple PySpark wordcount against the API without compilation by running

Dataproc image 1.5 and above

gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
  --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar \
  examples/python/shakespeare.py

Dataproc image 1.4 and below

gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
  --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar \
  examples/python/shakespeare.py

Example Codelab

https://codelabs.developers.google.com/codelabs/pyspark-bigquery

Usage

The connector uses the cross language Spark SQL Data Source API:

Reading data from a BigQuery table

df = spark.read \
  .format("bigquery") \
  .load("bigquery-public-data.samples.shakespeare")

or the Scala only implicit API:

import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")

For more information, see additional code samples in Python, Scala and Java.

Reading data from a BigQuery query

The connector allows you to run any Standard SQL SELECT query on BigQuery and fetch its results directly to a Spark Dataframe. This is easily done as described in the following code sample:

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")

sql = """
  SELECT tag, COUNT(*) c
  FROM (
    SELECT SPLIT(tags, '|') tags
    FROM `bigquery-public-data.stackoverflow.posts_questions` a
    WHERE EXTRACT(YEAR FROM creation_date)>=2014
  ), UNNEST(tags) tag
  GROUP BY 1
  ORDER BY 2 DESC
  LIMIT 10
  """
df = spark.read.format("bigquery").load(sql)
df.show()

Which yields the result

+----------+-------+
|       tag|      c|
+----------+-------+
|javascript|1643617|
|    python|1352904|
|      java|1218220|
|   android| 913638|
|       php| 911806|
|        c#| 905331|
|      html| 769499|
|    jquery| 608071|
|       css| 510343|
|       c++| 458938|
+----------+-------+

A second option is to use the query option like this:

df = spark.read.format("bigquery").option("query", sql).load()

Notice that the execution should be faster as only the result is transmitted over the wire. In a similar fashion the queries can include JOINs more efficiently then running joins on Spark or use other BigQuery features such as subqueries, BigQuery user defined functions, wildcard tables, BigQuery ML and more.

In order to use this feature the following configurations MUST be set:

  • viewsEnabled must be set to true.
  • materializationDataset must be set to a dataset where the GCP user has table creation permission. materializationProject is optional.

Note: As mentioned in the BigQuery documentation, the queried tables must be in the same location as the materializationDataset. Also, if the tables in the SQL statement are from projects other than the parentProject then use the fully qualified table name i.e. [project].[dataset].[table].

Important: This feature is implemented by running the query on BigQuery and saving the result into a temporary table, of which Spark will read the results from. This may add additional costs on your BigQuery account.

Reading From Views

The connector has a preliminary support for reading from BigQuery views. Please note there are a few caveats:

  • BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. This process affects the read performance, even before running any collect() or count() action.
  • The materialization process can also incur additional costs to your BigQuery bill.
  • By default, the materialized views are created in the same project and dataset. Those can be configured by the optional materializationProject and materializationDataset options, respectively. These options can also be globally set by calling spark.conf.set(...) before reading the views.
  • Reading from views is disabled by default. In order to enable it, either set the viewsEnabled option when reading the specific view (.option("viewsEnabled", "true")) or set it globally by calling spark.conf.set("viewsEnabled", "true").
  • As mentioned in the BigQuery documentation, the materializationDataset should be in same location as the view.

Writing data to BigQuery

Writing DataFrames to BigQuery can be done using two methods: Direct and Indirect.

Direct write using the BigQuery Storage Write API

In this method the data is written directly to BigQuery using the BigQuery Storage Write API. In order to enable this option, please set the writeMethod option to direct, as shown below:

df.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("dataset.table")

Writing to existing partitioned tables (date partitioned, ingestion time partitioned and range partitioned) in APPEND save mode is fully supported by the connector and the BigQuery Storage Write API. Partition overwrite and the use of datePartition, partitionField and partitionType as described below is not supported at this moment by the direct write method.

Important: Please refer to the data ingestion pricing page regarding the BigQuery Storage Write API pricing.

Important: Please use version 0.24.2 and above for direct writes, as previous versions have a bug that may cause a table deletion in certain cases.

Indirect write

In this method the data is written first to GCS, and then it is loaded it to BigQuery. A GCS bucket must be configured to indicate the temporary data location.

df.write \
  .format("bigquery") \
  .option("temporaryGcsBucket","some-bucket") \
  .save("dataset.table")

The data is temporarily stored using the Apache Parquet, Apache ORC or Apache Avro formats.

The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:

spark.conf.set("temporaryGcsBucket","some-bucket")
df.write \
  .format("bigquery") \
  .save("dataset.table")

When streaming a DataFrame to BigQuery, each batch is written in the same manner as a non-streaming DataFrame. Note that a HDFS compatible checkpoint location (eg: path/to/HDFS/dir or gs://checkpoint-bucket/checkpointDir) must be specified.

df.writeStream \
  .format("bigquery") \
  .option("temporaryGcsBucket","some-bucket") \
  .option("checkpointLocation", "some-location") \
  .option("table", "dataset.table")

Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.

Properties

The API Supports a number of options to configure the read

Property Meaning Usage
table The BigQuery table in the format [[project:]dataset.]table. It is recommended to use the path parameter of load()/save() instead. This option has been deprecated and will be removed in a future version.
(Deprecated)
Read/Write
dataset The dataset containing the table. This option should be used with standard table and views, but not when loading query results.
(Optional unless omitted in table)
Read/Write
project The Google Cloud Project ID of the table. This option should be used with standard table and views, but not when loading query results.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
parentProject The Google Cloud Project ID of the table to bill for the export.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
maxParallelism The maximal number of partitions to split the data into. Actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
Important: The old parameter (parallelism) is still supported but in deprecated mode. It will ve removed in version 1.0 of the connector.
(Optional. Defaults to the larger of the preferredMinParallelism and 20,000).)
Read
preferredMinParallelism The preferred minimal number of partitions to split the data into. Actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
(Optional. Defaults to 3 times the application's default parallelism).)
Read
viewsEnabled Enables the connector to read from views and not only tables. Please read the relevant section before activating this option.
(Optional. Defaults to false)
Read
materializationProject The project id where the materialized view is going to be created
(Optional. Defaults to view's project id)
Read
materializationDataset The dataset where the materialized view is going to be created. This dataset should be in same location as the view or the queried tables.
(Optional. Defaults to view's dataset)
Read
materializationExpirationTimeInMinutes The expiration time of the temporary table holding the materialized data of a view or a query, in minutes. Notice that the connector may re-use the temporary table due to the use of local cache and in order to reduce BigQuery computation, so very low values may cause errors. The value must be a positive integer.
(Optional. Defaults to 1440, or 24 hours)
Read
readDataFormat Data Format for reading from BigQuery. Options : ARROW, AVRO Unsupported Arrow filters are not pushed down and results are filtered later by Spark. (Currently Arrow does not suport disjunction across columns).
(Optional. Defaults to ARROW)
Read
optimizedEmptyProjection The connector uses an optimized empty projection (select without any columns) logic, used for count() execution. This logic takes the data directly from the table metadata or performs a much efficient `SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the use of this logic by setting this option to false.
(Optional, defaults to true)
Read
pushAllFilters If set to true, the connector pushes all the filters Spark can delegate to BigQuery Storage API. This reduces amount of data that needs to be sent from BigQuery Storage API servers to Spark clients.
(Optional, defaults to true)
Read
bigQueryJobLabel Can be used to add labels to the connector initiated query and load BigQuery jobs. Multiple labels can be set.
(Optional)
Read
bigQueryTableLabel Can be used to add labels to the table while writing to a table. Multiple labels can be set.
(Optional)
Write
traceApplicationName Application name used to trace BigQuery Storage read and write sessions. Setting the application name is required to set the trace ID on the sessions.
(Optional)
Read
traceJobId Job ID used to trace BigQuery Storage read and write sessions.
(Optional, defaults to the Dataproc job ID is exists, otherwise uses the Spark application ID)
Read
createDisposition Specifies whether the job is allowed to create new tables. The permitted values are:
  • CREATE_IF_NEEDED - Configures the job to create the table if it does not exist.
  • CREATE_NEVER - Configures the job to fail if the table does not exist.
This option takes place only in case Spark has decided to write data to the table based on the SaveMode.
(Optional. Default to CREATE_IF_NEEDED).
Write
writeMethod Controls the method in which the data is written to BigQuery. Available values are direct to use the BigQuery Storage Write API and indirect which writes the data first to GCS and then triggers a BigQuery load operation. See more here
(Optional, defaults to indirect)
Write
temporaryGcsBucket The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (spark.conf.set(...)).
Not supported by the `DIRECT` write method.
Write
persistentGcsBucket The GCS bucket that holds the data before it is loaded to BigQuery. If informed, the data won't be deleted after write data into BigQuery.
Not supported by the `DIRECT` write method.
Write
persistentGcsPath The GCS path that holds the data before it is loaded to BigQuery. Used only with persistentGcsBucket.
Not supported by the `DIRECT` write method.
Write
intermediateFormat The format of the data before it is loaded to BigQuery, values can be either "parquet","orc" or "avro". In order to use the Avro format, the spark-avro package must be added in runtime.
(Optional. Defaults to parquet). On write only.
Supported only by the `spark-bigquery-with-dependencies_2.XX` connectors and just for the `INDIRECT` write method. The `spark-X.Y-bigquery` connectors use only AVRO as an intermediate format.
Write
useAvroLogicalTypes When loading from Avro (`.option("intermediateFormat", "avro")`), BigQuery uses the underlying Avro types instead of the logical types [by default](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types). Supplying this option converts Avro logical types to their corresponding BigQuery data types.
(Optional. Defaults to false). On write only.
Write
datePartition The date partition the data is going to be written to. Should be a date string given in the format YYYYMMDD. Can be used to overwrite the data of a single partition, like this:
df.write.format("bigquery")
  .option("datePartition", "20220331")
  .mode("overwrite")
  .save("table")

(Optional). On write only.
Can also be used with different partition types like:
HOUR: YYYYMMDDHH
MONTH: YYYYMM
YEAR: YYYY
Not supported by the `DIRECT` write method.
Write
partitionField If field is specified together with `partitionType`, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED. If the option is not set for a partitioned table, then the table will be partitioned by pseudo column, referenced via either'_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type.
(Optional).
Not supported by the `DIRECT` write method.
Write
partitionExpirationMs Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value.
(Optional).
Not supported by the `DIRECT` write method.
Write
partitionType Supported types are: HOUR, DAY, MONTH, YEAR
This option is mandatory for a target table to be partitioned.
(Optional. Defaults to DAY if PartitionField is specified).
Not supported by the `DIRECT` write method.
Write
clusteredFields A string of non-repeated, top level columns seperated by comma.
(Optional).
Write
allowFieldAddition Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
(Optional. Default to false).
Write
allowFieldRelaxation Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
(Optional. Default to false).
Write
proxyAddress Address of the proxy server. The proxy must be a HTTP proxy and address should be in the `host:port` format. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.address).
(Optional. Required only if connecting to GCP via proxy.)
Read/Write
proxyUsername The userName used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.username).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
proxyPassword The password used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in Hadoop Configuration (fs.gs.proxy.password).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
httpMaxRetry The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in Hadoop Configuration (fs.gs.http.max.retry).
(Optional. Default is 10)
Read/Write
httpConnectTimeout The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in Hadoop Configuration (fs.gs.http.connect-timeout).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read/Write
httpReadTimeout The timeout in milliseconds to read data from an established connection. Can be alternatively set in the Spark configuration (spark.conf.set("httpReadTimeout", ...)) or in Hadoop Configuration (fs.gs.http.read-timeout).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read
arrowCompressionCodec Compression codec while reading from a BigQuery table when using Arrow format. Options : ZSTD (Zstandard compression), LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md), COMPRESSION_UNSPECIFIED. The recommended compression codec is ZSTD while using Java.
(Optional. Defaults to COMPRESSION_UNSPECIFIED which means no compression will be used)
Read
cacheExpirationTimeInMinutes The expiration time of the in-memory cache storing query information.
To disable caching, set the value to 0.
(Optional. Defaults to 15 minutes)
Read
enableModeCheckForSchemaFields Checks the mode of every field in destination schema to be equal to the mode in corresponding source field schema, during DIRECT write.
Default value is true i.e., the check is done by default. If set to false the mode check is ignored.
Write
enableListInference Indicates whether to use schema inference specifically when the mode is Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions).
Defaults to false.
Write
createReadSessionTimeoutInSeconds The timeout in seconds to create a ReadSession when reading a table.
For Extremely large table this value should be increased.
(Optional. Defaults to 600 seconds)
Read
datetimeZoneId The time zone ID used to convert BigQuery's DATETIME into Spark's Timestamp, and vice versa.
The value should be a legal time zone name, that appears is accepted by Java's java.time.ZoneId. The full list can be seen by running java.time.ZoneId.getAvailableZoneIds() in Java/Scala, or sc._jvm.java.time.ZoneId.getAvailableZoneIds() in pyspark.
(Optional. Defaults to UTC)
Read/Write
queryJobPriority Priority levels set for the job while reading data from BigQuery query. The permitted values are:
  • BATCH - Query is queued and started as soon as idle resources are available, usually within a few minutes. If the query hasn't started within 3 hours, its priority is changed to INTERACTIVE.
  • INTERACTIVE - Query is executed as soon as possible and count towards the concurrent rate limit and the daily rate limit.
For WRITE, this option will be effective when DIRECT write is used with OVERWRITE mode, where the connector overwrites the destination table using MERGE statement.
(Optional. Defaults to INTERACTIVE)
Read/Write

Options can also be set outside of the code, using the --conf parameter of spark-submit or --properties parameter of the gcloud dataproc submit spark. In order to use this, prepend the prefix spark.datasource.bigquery. to any of the options, for example spark.conf.set("temporaryGcsBucket", "some-bucket") can also be set as --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket.

Data types

With the exception of DATETIME and TIME all BigQuery data types directed map into the corresponding Spark SQL data type. Here are all of the mappings:

BigQuery Standard SQL Data Type Spark SQL

Data Type

Notes
BOOL BooleanType
INT64 LongType
FLOAT64 DoubleType
NUMERIC DecimalType This preserves NUMERIC's full 38 digits of precision and 9 digits of scope.
BIGNUMERIC BigNumericUDT (UserDefinedType) Scala/Java: BigNumericUDT DataType internally uses java.math.BigDecimal to hold the BigNumeric data.

Python: BigNumericUDT DataType internally used python's Decimal class to hold the BigNumeric data.

STRING StringType
BYTES BinaryType
STRUCT StructType
ARRAY ArrayType
TIMESTAMP TimestampType
DATE DateType
DATETIME TimestampType Spark has no DATETIME type. The value is casted as a local time in the `datetimeZoneId` time zone.
TIME LongType Spark has no TIME type. The generated longs, which indicate microseconds since midnight can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and user can cast if they like.

When casting to Timestamp TIME have the same TimeZone issues as DATETIME

JSON StringType Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED:
  • Use the INDIRECT write method
  • Use the AVRO intermediate format
  • The DataFrame field MUST be of type String and has an entry of sqlType=JSON in its metadata
ARRAY<STRUCT<key,value>> MapType BigQuery has no MAP type, therefore similar to other conversions like Apache Avro and BigQuery Load jobs, the connector converts a Spark Map to a REPEATED STRUCT<key,value>. This means that while writing and reading of maps is available, running a SQL on BigQuery that uses map semantics is not supported. To refer to the map's values using BigQuery SQL, please check the BigQuery documentation. Due to these incompatibilities, a few restrictions apply:
  • Keys can be Strings only
  • Values can be simple types (not structs)
  • For INDIRECT write, use the AVRO intermediate format. DIRECT write is supported as well

Spark ML Data Types Support

The Spark ML Vector and Matrix are supported, including their dense and sparse versions. The data is saved as a BigQuery RECORD. Notice that a suffix is added to the field's description which includes the spark type of the field.

In order to write those types to BigQuery, use the ORC or Avro intermediate format, and have them as column of the Row (i.e. not a field in a struct).

BigNumeric support

BigQuery's BigNumeric has a precision of 76.76 (the 77th digit is partial) and scale of 38. Since this precision and scale is beyond spark's DecimalType (38 scale and 38 precision) support, the BigNumeric DataType is converted into spark's UserDefinedType. The BigNumeric data can be accessed via BigNumericUDT DataType which internally uses java.math.BigDecimal to hold the BigNumeric data. The data can be read in either AVRO or ARROW formats.

In order to write BigNumericUDT to BigQuery, use either ORC or PARQUET intermediate formats (currently we do not support AVRO). Notice that the data gets written to BigQuery as String.

Code examples:

Scala:

import org.apache.spark.bigquery.BigNumeric

val df = spark.read
  .format("bigquery")
  .load("PROJECT.DATASET.TABLE")

val rows: Array[java.math.BigDecimal] = df
  .collect()
  .map(row => row.get("BIG_NUMERIC_COLUMN").asInstanceOf[BigNumeric].getNumber)

rows.foreach(value => System.out.println("BigNumeric value  " + value.toPlainString))

Python: Spark's UserDefinedType needs a separate implementation for Python. Corresponding python class(s) should be provided as config params while creating the job or added during runtime. See examples below:

  1. Adding python files while launching pyspark
# use appropriate version for jar depending on the scala version
pyspark --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar
  --py-files gs://spark-lib/bigquery/spark-bigquery-support-0.30.0.zip
  --files gs://spark-lib/bigquery/spark-bigquery-support-0.30.0.zip
  1. Adding python files in Jupyter Notebook
from pyspark.sql import SparkSession
from pyspark import SparkFiles
# use appropriate version for jar depending on the scala version
spark = SparkSession.builder\
  .appName('BigNumeric')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar')\
  .config('spark.submit.pyFiles', 'gs://spark-lib/bigquery/spark-bigquery-support-0.30.0.zip')\
  .config('spark.files', 'gs://spark-lib/bigquery/spark-bigquery-support-0.30.0.zip')\
  .getOrCreate()

# extract the spark-bigquery-support zip file
import zipfile
with zipfile.ZipFile(SparkFiles.get("spark-bigquery-support-0.30.0.zip")) as zf:
  zf.extractall()
  1. Adding Python files during runtime
# use appropriate version for jar depending on the scala version
spark = SparkSession.builder\
  .appName('BigNumeric')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar')\
  .getOrCreate()

spark.sparkContext.addPyFile("gs://spark-lib/bigquery/spark-bigquery-support-0.30.0.zip")

Usage Example:

df = spark.read.format("bigquery").load({project}.{dataset}.{table_name})
data = df.select({big_numeric_column_name}).collect()

for row in data:
  bigNumeric = row[{big_numeric_column_name}]
  # bigNumeric.number is instance of python's Decimal class
  print(str(bigNumeric.number))

In case the above code throws ModuleNotFoundError, please add the following code before reading the BigNumeric data.

try:
    import pkg_resources

    pkg_resources.declare_namespace(__name__)
except ImportError:
    import pkgutil

    __path__ = pkgutil.extend_path(__path__, __name__)

Filtering

The connector automatically computes column and pushdown filters the DataFrame's SELECT statement e.g.

spark.read.bigquery("bigquery-public-data:samples.shakespeare")
  .select("word")
  .where("word = 'Hamlet' or word = 'Claudius'")
  .collect()

filters to the column word and pushed down the predicate filter word = 'hamlet' or word = 'Claudius'.

If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering e.g.:

val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
  .where("word = 'Hamlet'")
  .collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
  .where("word = 'Romeo'")
  .collect()

You can also manually specify the filter option, which will override automatic pushdown and Spark will do the rest of the filtering in the client.

Partitioned Tables

The pseudo columns _PARTITIONDATE and _PARTITIONTIME are not part of the table schema. Therefore in order to query by the partitions of partitioned tables do not use the where() method shown above. Instead, add a filter option in the following manner:

val df = spark.read.format("bigquery")
  .option("filter", "_PARTITIONDATE > '2019-01-01'")
  ...
  .load(TABLE)

Configuring Partitioning

By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API. This can be configured explicitly with the maxParallelism property. BigQuery may limit the number of partitions based on server constraints.

Tagging BigQuery Resources

In order to support tracking the usage of BigQuery resources the connectors offers the following options to tag BigQuery resources:

Adding BigQuery Jobs Labels

The connector can launch BigQuery load and query jobs. Adding labels to the jobs is done in the following manner:

spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")

This will create labels cost_center=analytics and usage=nightly_etl.

Adding BigQuery Storage Trace ID

Used to annotate the read and write sessions. The trace ID is of the format Spark:ApplicationName:JobID. This is an opt-in option, and to use it the user need to set the traceApplicationName property. JobID is auto generated by the Dataproc job ID, with a fallback to the Spark application ID (such as application_1648082975639_0001). The Job ID can be overridden by setting the traceJobId option. Notice that the total length of the trace ID cannot be over 256 characters.

Using in Jupyter Notebooks

The connector can be used in Jupyter notebooks even if it is not installed on the Spark cluster. It can be added as an external jar in using the following code:

Python:

from pyspark.sql import SparkSession
spark = SparkSession.builder
  .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0")
  .getOrCreate()
df = spark.read.format("bigquery")
  .load("dataset.table")

Scala:

val spark = SparkSession.builder
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0")
.getOrCreate()
val df = spark.read.format("bigquery")
.load("dataset.table")

In case Spark cluster is using Scala 2.12 (it's optional for Spark 2.4.x, mandatory in 3.0.x), then the relevant package is com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0. In order to know which Scala version is used, please run the following code:

Python:

spark.sparkContext._jvm.scala.util.Properties.versionString()

Scala:

scala.util.Properties.versionString

Compiling against the connector

Unless you wish to use the implicit Scala API spark.read.bigquery("TABLE_ID"), there is no need to compile against the connector.

To include the connector in your project:

Maven

<dependency>
  <groupId>com.google.cloud.spark</groupId>
  <artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
  <version>0.30.0</version>
</dependency>

SBT

libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.30.0"

FAQ

What is the Pricing for the Storage API?

See the BigQuery pricing documentation.

I have very few partitions

You can manually set the number of partitions with the maxParallelism property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.

You can also always repartition after reading in Spark.

How do I authenticate outside GCE / Dataproc?

The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple options to provide it:

  • The default is to load the JSON key from the GOOGLE_APPLICATION_CREDENTIALS environment variable, as described here.
  • In case the environment variable cannot be changed, the credentials file can be configured as as a spark option. The file should reside on the same path on all the nodes of the cluster.
// Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
  • Credentials can also be provided explicitly, either as a parameter or from Spark runtime configuration. They should be passed in as a base64-encoded string directly.
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
  • In cases where the user has an internal service providing the Google AccessToken, a custom implementation can be done, creating only the AccessToken and providing its TTL. Token refresh will re-generate a new token. In order to use this, implement the com.google.cloud.bigquery.connector.common.AccessTokenProvider interface. The fully qualified class name of the implementation should be provided in the gcpAccessTokenProvider option. AccessTokenProvider must be implemented in Java or other JVM language such as Scala or Kotlin. It must either have a no-arg constructor or a constructor accepting a single java.util.String argument. This configuration parameter can be supplied using the gcpAccessTokenProviderConfig option. If this is not provided then the no-arg constructor wil be called. The jar containing the implementation should be on the cluster's classpath.
// Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
  • For a simpler application, where access token refresh is not required, another alternative is to pass the access token as the gcpAccessToken configuration option. You can get the access token by running gcloud auth application-default print-access-token.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")

Important: The CredentialsProvider and AccessTokenProvider need to be implemented in Java or other JVM language such as Scala or Kotlin. The jar containing the implementation should be on the cluster's classpath.

Notice: Only one of the above options should be provided.

How do I connect to GCP/BigQuery via Proxy?

To connect to a forward proxy and to authenticate the user credentials, configure the following options.

proxyAddress: Address of the proxy server. The proxy must be an HTTP proxy and address should be in the host:port format.

proxyUsername: The userName used to connect to the proxy.

proxyPassword: The password used to connect to the proxy.

val df = spark.read.format("bigquery")
  .option("proxyAddress", "http://my-proxy:1234")
  .option("proxyUsername", "my-username")
  .option("proxyPassword", "my-password")
  .load("some-table")

The same proxy parameters can also be set globally using Spark's RuntimeConfig like this:

spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")

val df = spark.read.format("bigquery")
  .load("some-table")

You can set the following in the hadoop configuration as well.

fs.gs.proxy.address(similar to "proxyAddress"), fs.gs.proxy.username(similar to "proxyUsername") and fs.gs.proxy.password(similar to "proxyPassword").

If the same parameter is set at multiple places the order of priority is as follows:

option("key", "value") > spark.conf > hadoop configuration