Load data using Spark connector

StarRocks provides a self-developed connector named StarRocks Connector for Apache Spark™ (Spark connector for short) to help you load data into a StarRocks table by using Spark. The basic principle is to accumulate the data and then load it into StarRocks all at once through STREAM LOAD. The Spark connector is implemented based on Spark DataSource V2. A DataSource can be created by using Spark DataFrames or Spark SQL. Both batch and structured streaming modes are supported.

NOTICE

Loading data into StarRocks tables with the Spark connector requires the SELECT and INSERT privileges. If you do not have these privileges, follow the instructions provided in GRANT to grant them to the user that you use to connect to your StarRocks cluster.

Version requirements

| Spark connector | Spark | StarRocks | Java | Scala |
| --- | --- | --- | --- | --- |
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 and later | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, or 3.4 | 2.5 and later | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, or 3.4 | 2.5 and later | 8 | 2.12 |

NOTICE

  • Please see Upgrade Spark connector for behavior changes among different versions of the Spark connector.
  • The Spark connector does not provide the MySQL JDBC driver since version 1.1.1, and you need to import the driver into the Spark classpath manually. You can find the driver on the MySQL site or Maven Central.

Obtain Spark connector

You can obtain the Spark connector JAR file in the following ways:

  • Directly download the compiled Spark connector JAR file.
  • Add the Spark connector as a dependency in your Maven project and then download the JAR file.
  • Compile the source code of the Spark connector into a JAR file by yourself.

The naming format of the Spark connector JAR file is starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar.

For example, if you install Spark 3.2 and Scala 2.12 in your environment and you want to use Spark connector 1.1.0, you can use starrocks-spark-connector-3.2_2.12-1.1.0.jar.

NOTICE

In general, the latest version of the Spark connector only maintains compatibility with the three most recent versions of Spark.

Download the compiled JAR file

Directly download the corresponding version of the Spark connector JAR from the Maven Central Repository.

Maven Dependency

  1. In your Maven project's pom.xml file, add the Spark connector as a dependency according to the following format. Replace spark_version, scala_version, and connector_version with the respective versions.

    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
        <version>${connector_version}</version>
    </dependency>
  2. For example, if the version of Spark in your environment is 3.2, the version of Scala is 2.12, and you choose Spark connector 1.1.0, you need to add the following dependency:

    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>

Compile by yourself

  1. Download the Spark connector package.

  2. Execute the following command to compile the source code of the Spark connector into a JAR file. Replace <spark_version> with the corresponding Spark version.

    sh build.sh <spark_version>

    For example, if the Spark version in your environment is 3.2, you need to execute the following command:

    sh build.sh 3.2
  3. Go to the target/ directory to find the Spark connector JAR file, such as starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar, generated upon compilation.

NOTE

The name of a Spark connector version that has not been formally released contains the SNAPSHOT suffix.

Parameters

| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| starrocks.fe.http.url | YES | None | The HTTP URL of the FE in your StarRocks cluster. You can specify multiple URLs, which must be separated by a comma (,). Format: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>. Since version 1.1.1, you can also add the http:// prefix to the URL, such as http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>. |
| starrocks.fe.jdbc.url | YES | None | The address that is used to connect to the MySQL server of the FE. Format: jdbc:mysql://<fe_host>:<fe_query_port>. |
| starrocks.table.identifier | YES | None | The name of the StarRocks table. Format: <database_name>.<table_name>. |
| starrocks.user | YES | None | The username of your StarRocks cluster account. |
| starrocks.password | YES | None | The password of your StarRocks cluster account. |
| starrocks.write.label.prefix | NO | spark- | The label prefix used by Stream Load. |
| starrocks.write.enable.transaction-stream-load | NO | TRUE | Whether to use the Stream Load transaction interface to load data. It requires StarRocks v2.5 or later. This feature can load more data in a transaction with less memory usage, and improve performance. NOTICE: Since 1.1.1, this parameter takes effect only when the value of starrocks.write.max.retries is non-positive, because the Stream Load transaction interface does not support retries. |
| starrocks.write.buffer.size | NO | 104857600 | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Setting this parameter to a larger value can improve loading performance but may increase loading latency. |
| starrocks.write.buffer.rows | NO | Integer.MAX_VALUE | Supported since version 1.1.1. The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. |
| starrocks.write.flush.interval.ms | NO | 300000 | The interval at which data is sent to StarRocks. This parameter is used to control the loading latency. |
| starrocks.write.max.retries | NO | 3 | Supported since version 1.1.1. The number of times that the connector retries the Stream Load for the same batch of data if the load fails. NOTICE: Because the Stream Load transaction interface does not support retries, if this parameter is positive, the connector always uses the Stream Load interface and ignores the value of starrocks.write.enable.transaction-stream-load. |
| starrocks.write.retry.interval.ms | NO | 10000 | Supported since version 1.1.1. The interval at which the connector retries the Stream Load for the same batch of data if the load fails. |
| starrocks.columns | NO | None | The StarRocks table columns into which you want to load data. You can specify multiple columns, which must be separated by commas (,), for example, "col0,col1,col2". |
| starrocks.column.types | NO | None | Supported since version 1.1.1. Customize the column data types for Spark instead of using the defaults inferred from the StarRocks table and the default mapping. The parameter value is a schema in DDL format, the same as the output of Spark StructType#toDDL, such as col0 INT, col1 STRING, col2 BIGINT. Note that you only need to specify columns that need customization. One use case is to load data into columns of BITMAP or HLL type. |
| starrocks.write.properties.* | NO | None | The parameters that are used to control Stream Load behavior. For example, the parameter starrocks.write.properties.format specifies the format of the data to be loaded, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD. |
| starrocks.write.properties.format | NO | CSV | The file format based on which the Spark connector transforms each batch of data before the data is sent to StarRocks. Valid values: CSV and JSON. |
| starrocks.write.properties.row_delimiter | NO | \n | The row delimiter for CSV-formatted data. |
| starrocks.write.properties.column_separator | NO | \t | The column separator for CSV-formatted data. |
| starrocks.write.num.partitions | NO | None | The number of partitions into which Spark can write data in parallel. When the data volume is small, you can reduce the number of partitions to lower the loading concurrency and frequency. The default value for this parameter is determined by Spark. However, repartitioning may incur Spark shuffle cost. |
| starrocks.write.partition.columns | NO | None | The partitioning columns in Spark. The parameter takes effect only when starrocks.write.num.partitions is specified. If this parameter is not specified, all columns being written are used for partitioning. |
| starrocks.timezone | NO | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert Spark TimestampType to StarRocks DATETIME. The default is the JVM timezone returned by ZoneId#systemDefault(). The format can be a timezone name such as Asia/Shanghai, or a zone offset such as +08:00. |
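
The following spark-shell snippet is a minimal sketch of how these parameters are passed as DataFrame write options. The FE addresses, the table test.score_board, and the credentials are placeholders, and the tuning values are only illustrative:

    // Build a small DataFrame (spark-shell imports spark.implicits._ automatically).
    val df = Seq((1, "starrocks", 100), (2, "spark", 100)).toDF("id", "name", "score")

    df.write.format("starrocks")
        // Connection parameters (required). Replace them with your own.
        .option("starrocks.fe.http.url", "127.0.0.1:8030")
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
        .option("starrocks.table.identifier", "test.score_board")
        .option("starrocks.user", "root")
        .option("starrocks.password", "")
        // Optional tuning: send data as JSON, flush every 64 MB or every 60 seconds.
        .option("starrocks.write.properties.format", "json")
        .option("starrocks.write.buffer.size", "67108864")
        .option("starrocks.write.flush.interval.ms", "60000")
        .mode("append")
        .save()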

Data type mapping between Spark and StarRocks

  • The default data type mapping is as follows:

    | Spark data type | StarRocks data type |
    | --- | --- |
    | BooleanType | BOOLEAN |
    | ByteType | TINYINT |
    | ShortType | SMALLINT |
    | IntegerType | INT |
    | LongType | BIGINT |
    | StringType | LARGEINT |
    | FloatType | FLOAT |
    | DoubleType | DOUBLE |
    | DecimalType | DECIMAL |
    | StringType | CHAR |
    | StringType | VARCHAR |
    | StringType | STRING |
    | StringType | JSON |
    | DateType | DATE |
    | TimestampType | DATETIME |
    | ArrayType | ARRAY |

    NOTE:
    The mapping of ArrayType to ARRAY is supported since version 1.1.1. For detailed steps, see Load data into columns of ARRAY type.
  • You can also customize the data type mapping.

    For example, a StarRocks table contains columns of the BITMAP and HLL data types, but Spark does not support these two data types. You need to customize the corresponding data types in Spark. For detailed steps, see load data into columns of BITMAP and HLL types; a brief DataFrame sketch also follows this list. BITMAP and HLL are supported since version 1.1.1.
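
As a brief illustration, the following spark-shell sketch assumes the test.page_uv table from the BITMAP best-practice example below (its visit_users column is of the BITMAP type) and maps that column to BIGINT on the Spark side through starrocks.column.types. The addresses and credentials are placeholders:

    import java.sql.Timestamp

    // visit_users is BITMAP in StarRocks, so declare it as BIGINT for Spark;
    // the connector converts the BIGINT values to BITMAP during Stream Load.
    val uv = Seq((1, Timestamp.valueOf("2020-06-23 01:30:30"), 13L)).toDF("page_id", "visit_date", "visit_users")

    uv.write.format("starrocks")
        .option("starrocks.fe.http.url", "127.0.0.1:8030")
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
        .option("starrocks.table.identifier", "test.page_uv")
        .option("starrocks.user", "root")
        .option("starrocks.password", "")
        .option("starrocks.column.types", "visit_users BIGINT")
        .mode("append")
        .save()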

Upgrade Spark connector

Upgrade from version 1.1.0 to 1.1.1

  • Since 1.1.1, the Spark connector does not provide mysql-connector-java, which is the official JDBC driver for MySQL, because of the limitations of the GPL license used by mysql-connector-java. However, the Spark connector still needs the MySQL JDBC driver to connect to StarRocks for the table metadata, so you need to add the driver to the Spark classpath manually. You can find the driver on the MySQL site or Maven Central.
  • Since 1.1.1, the connector uses the Stream Load interface by default, rather than the Stream Load transaction interface used in version 1.1.0. If you still want to use the Stream Load transaction interface, you can set the option starrocks.write.max.retries to 0, as shown in the sketch below. See the descriptions of starrocks.write.enable.transaction-stream-load and starrocks.write.max.retries for details.
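
A minimal spark-shell sketch of the second point, assuming you have upgraded to 1.1.1 or later and still want the transaction interface. The addresses, table, and credentials are placeholders:

    // starrocks.write.enable.transaction-stream-load defaults to TRUE, so setting
    // starrocks.write.max.retries to 0 (non-positive) is enough to keep using the
    // Stream Load transaction interface after the upgrade.
    val df = Seq((1, "starrocks", 100)).toDF("id", "name", "score")

    df.write.format("starrocks")
        .option("starrocks.fe.http.url", "127.0.0.1:8030")
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
        .option("starrocks.table.identifier", "test.score_board")
        .option("starrocks.user", "root")
        .option("starrocks.password", "")
        .option("starrocks.write.max.retries", "0")
        .mode("append")
        .save()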

Examples

The following examples show how to use the Spark connector to load data into a StarRocks table with Spark DataFrames or Spark SQL. Spark DataFrames support both batch and structured streaming modes.

For more examples, see Spark Connector Examples.

Preparations

Create a StarRocks table

Create a database test and create a Primary Key table score_board.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Set up your Spark environment

Note that the following examples are run in Spark 3.2.4 and use spark-shell, pyspark and spark-sql. Before running the examples, make sure to place the Spark connector JAR file in the $SPARK_HOME/jars directory.

Load data with Spark DataFrames

The following two examples explain how to load data with Spark DataFrames in batch or structured streaming mode.

Batch

Construct data in memory and load data into the StarRocks table.

  1. You can write the Spark job using Scala or Python.

    For Scala, run the following code in spark-shell:

    // 1. Create a DataFrame from a sequence.
    val data = Seq((1, "starrocks", 100), (2, "spark", 100))
    val df = data.toDF("id", "name", "score")
    
    // 2. Write to starrocks with the format "starrocks",
    // and replace the options with your own.
    df.write.format("starrocks")
        .option("starrocks.fe.http.url", "127.0.0.1:8030")
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
        .option("starrocks.table.identifier", "test.score_board")
        .option("starrocks.user", "root")
        .option("starrocks.password", "")
        .mode("append")
        .save()

    For Python, run the following code in pyspark:

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("StarRocks Example") \
        .getOrCreate()

    # 1. Create a DataFrame from a sequence.
    data = [(1, "starrocks", 100), (2, "spark", 100)]
    df = spark.sparkContext.parallelize(data) \
        .toDF(["id", "name", "score"])

    # 2. Write to starrocks with the format "starrocks",
    # and replace the options with your own.
    df.write.format("starrocks") \
        .option("starrocks.fe.http.url", "127.0.0.1:8038") \
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9038") \
        .option("starrocks.table.identifier", "test.score_board") \
        .option("starrocks.user", "root") \
        .option("starrocks.password", "") \
        .mode("append") \
        .save()
  2. Query data in the StarRocks table.

    MySQL [test]> SELECT * FROM `score_board`;
    +------+-----------+-------+
    | id   | name      | score |
    +------+-----------+-------+
    |    1 | starrocks |   100 |
    |    2 | spark     |   100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Structured Streaming

Construct a streaming read of data from a CSV file and load data into the StarRocks table.

  1. In the directory csv-data, create a CSV file test.csv with the following data:

    3,starrocks,100
    4,spark,100
    
  2. You can write the Spark job using Scala or Python.

    For Scala, run the following code in spark-shell:

    import org.apache.spark.sql.types.StructType
    
    // 1. Create a DataFrame from CSV.
    val schema = (new StructType()
            .add("id", "integer")
            .add("name", "string")
            .add("score", "integer")
        )
    val df = (spark.readStream
            .option("sep", ",")
            .schema(schema)
            .format("csv") 
            // Replace it with your path to the directory "csv-data".
            .load("/path/to/csv-data")
        )
    
    // 2. Write to starrocks with the format "starrocks", and replace the options with your own.
    val query = (df.writeStream.format("starrocks")
            .option("starrocks.fe.http.url", "127.0.0.1:8030")
            .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
            .option("starrocks.table.identifier", "test.score_board")
            .option("starrocks.user", "root")
            .option("starrocks.password", "")
            // replace it with your checkpoint directory
            .option("checkpointLocation", "/path/to/checkpoint")
            .outputMode("append")
            .start()
        )

    For Python, run the following code in pyspark:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, StringType, StructType, StructField

    spark = SparkSession \
        .builder \
        .appName("StarRocks SS Example") \
        .getOrCreate()

    # 1. Create a DataFrame from CSV.
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("score", IntegerType())
    ])
    # Replace the path with your path to the directory "csv-data".
    df = spark.readStream \
        .option("sep", ",") \
        .schema(schema) \
        .format("csv") \
        .load("/path/to/csv-data")

    # 2. Write to starrocks with the format "starrocks", and replace the options with your own.
    # Replace "checkpointLocation" with your checkpoint directory.
    query = df.writeStream.format("starrocks") \
        .option("starrocks.fe.http.url", "127.0.0.1:8038") \
        .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9038") \
        .option("starrocks.table.identifier", "test.score_board") \
        .option("starrocks.user", "root") \
        .option("starrocks.password", "") \
        .option("checkpointLocation", "/path/to/checkpoint") \
        .outputMode("append") \
        .start()
  3. Query data in the StarRocks table.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id   | name      | score |
    +------+-----------+-------+
    |    4 | spark     |   100 |
    |    3 | starrocks |   100 |
    +------+-----------+-------+
    2 rows in set (0.67 sec)

Load data with Spark SQL

The following example explains how to load data with Spark SQL by using the INSERT INTO statement in the Spark SQL CLI.

  1. Execute the following SQL statements in spark-sql:

    -- 1. create a table using datasource "starrocks", and replace the options with your own
    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.score_board",
    "starrocks.user"="root",
    "starrocks.password"=""
    );
    
    -- 2. insert two rows into the table
    INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100);
  2. Query data in the StarRocks table.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id   | name      | score |
    +------+-----------+-------+
    |    6 | spark     |   100 |
    |    5 | starrocks |   100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Best Practices

Load data to primary key table

This section shows how to load data into a StarRocks Primary Key table to achieve partial updates and conditional updates. See Change data through loading for an introduction to these features. These examples use Spark SQL.

Preparations

Create a database test and create a Primary Key table score_board in StarRocks.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Partial update

This example shows how to load data only into the columns id and name.

  1. Insert initial data into the StarRocks table in the MySQL client:
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

mysql> select * from score_board;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | flink     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
  2. Create a Spark table score_board in the Spark SQL client.
  • Set the option starrocks.write.properties.partial_update to true, which tells the connector to perform a partial update.

  • Set the option starrocks.columns to "id,name" to tell the connector which columns to write.

    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
        "starrocks.fe.http.url"="127.0.0.1:8030",
        "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
        "starrocks.table.identifier"="test.score_board",
        "starrocks.user"="root",
        "starrocks.password"="",
        "starrocks.write.properties.partial_update"="true",
        "starrocks.columns"="id,name"
     );
  3. Insert data into the table in the Spark SQL client, updating only the column name:
INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update');
  4. Query the StarRocks table in the MySQL client. You can see that only the values for name change, and the values for score do not change.
mysql> select * from score_board;
+------+------------------+-------+
| id   | name             | score |
+------+------------------+-------+
|    1 | starrocks-update |   100 |
|    2 | flink-update     |   100 |
+------+------------------+-------+
2 rows in set (0.02 sec)

Conditional update

This example shows how to perform a conditional update according to the value of the column score. The update for an id takes effect only when the new value of score is greater than or equal to the old value.

  1. Insert initial data into the StarRocks table in the MySQL client:
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

mysql> select * from score_board;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | flink     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
  2. Create a Spark table score_board in the Spark SQL client as follows.
  • Set the option starrocks.write.properties.merge_condition to score, which tells the connector to use the column score as the update condition.

  • Make sure that the Spark connector uses the Stream Load interface to load data, rather than the Stream Load transaction interface, because the latter does not support this feature.

    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
        "starrocks.fe.http.url"="127.0.0.1:8030",
        "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
        "starrocks.table.identifier"="test.score_board",
        "starrocks.user"="root",
        "starrocks.password"="",
        "starrocks.write.properties.merge_condition"="score"
     );
  3. Insert data into the table in the Spark SQL client, updating id 1 with a smaller score and id 2 with a larger score:
INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
  4. Query the StarRocks table in the MySQL client. You can see that only the row for id 2 changes, and the row for id 1 does not change.
 mysql> select * from score_board;
 +------+--------------+-------+
 | id   | name         | score |
 +------+--------------+-------+
 |    1 | starrocks    |   100 |
 |    2 | flink-update |   101 |
 +------+--------------+-------+
 2 rows in set (0.03 sec)

Load data into columns of BITMAP type

BITMAP is often used to accelerate count distinct, such as counting UVs. See Use Bitmap for exact Count Distinct. Here we take counting UVs as an example to show how to load data into columns of the BITMAP type.

  1. Create a StarRocks Aggregate table

    In the database test, create an Aggregate table page_uv where the column visit_users is defined as the BITMAP type and configured with the aggregate function BITMAP_UNION.

    CREATE TABLE `test`.`page_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` datetime NOT NULL COMMENT 'access time',
      `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Create a Spark table

    The schema of the Spark table is inferred from the StarRocks table, and Spark does not support the BITMAP type. So you need to customize the corresponding column data type in Spark, for example as BIGINT, by configuring the option "starrocks.column.types"="visit_users BIGINT". When using Stream Load to ingest data, the connector uses the to_bitmap function to convert the data of BIGINT type into BITMAP type.

    Run the following DDL in spark-sql:

    CREATE TABLE `page_uv`
    USING starrocks
    OPTIONS(
       "starrocks.fe.http.url"="127.0.0.1:8030",
       "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
       "starrocks.table.identifier"="test.page_uv",
       "starrocks.user"="root",
       "starrocks.password"="",
       "starrocks.column.types"="visit_users BIGINT"
    );
  3. Load data into the StarRocks table

    Run the following DML in spark-sql:

    INSERT INTO `page_uv` VALUES
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
       (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
       (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Calculate page UVs from the StarRocks table.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    |       2 |                           1 |
    |       1 |                           3 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

NOTICE:

The connector uses the to_bitmap function to convert data of the TINYINT, SMALLINT, INTEGER, and BIGINT types in Spark to the BITMAP type in StarRocks, and uses the bitmap_hash function for other Spark data types.

Load data into columns of HLL type

HLL can be used for approximate count distinct. See Use HLL for approximate count distinct.

Here we take counting UVs as an example to show how to load data into columns of the HLL type. HLL is supported since version 1.1.1.

  1. Create a StarRocks Aggregate table

    In the database test, create an Aggregate table hll_uv where the column visit_users is defined as the HLL type and configured with the aggregate function HLL_UNION.

CREATE TABLE `hll_uv` (
  `page_id` INT NOT NULL COMMENT 'page ID',
  `visit_date` datetime NOT NULL COMMENT 'access time',
  `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
  2. Create a Spark table

    The schema of the Spark table is inferred from the StarRocks table, and Spark does not support the HLL type. So you need to customize the corresponding column data type in Spark, for example as BIGINT, by configuring the option "starrocks.column.types"="visit_users BIGINT". When using Stream Load to ingest data, the connector uses the hll_hash function to convert the data of BIGINT type into HLL type.

    Run the following DDL in spark-sql:

    CREATE TABLE `hll_uv`
    USING starrocks
    OPTIONS(
       "starrocks.fe.http.url"="127.0.0.1:8030",
       "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
       "starrocks.table.identifier"="test.hll_uv",
       "starrocks.user"="root",
       "starrocks.password"="",
       "starrocks.column.types"="visit_users BIGINT"
    );
  3. Load data into the StarRocks table

    Run the following DML in spark-sql:

    INSERT INTO `hll_uv` VALUES
       (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
       (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
       (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Calculate page UVs from the StarRocks table.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    |       4 |                           1 |
    |       3 |                           2 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

Load data into columns of ARRAY type

The following example explains how to load data into columns of the ARRAY type.

  1. Create a StarRocks table

In the database test, create a Primary Key table array_tbl that includes one INT column and two ARRAY columns.

CREATE TABLE `array_tbl` (
  `id` INT NOT NULL,
  `a0` ARRAY<STRING>,
  `a1` ARRAY<ARRAY<INT>>
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
;
  2. Write data to StarRocks

Because some versions of StarRocks do not provide the metadata of the ARRAY column, the connector cannot infer the corresponding Spark data type for this column. However, you can explicitly specify the corresponding Spark data type of the column in the option starrocks.column.types. In this example, you can configure the option as a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>.

Run the following code in spark-shell:

val data = Seq(
  (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
  (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
)
val df = data.toDF("id", "a0", "a1")
df.write
     .format("starrocks")
     .option("starrocks.fe.http.url", "127.0.0.1:8030")
     .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
     .option("starrocks.table.identifier", "test.array_tbl")
     .option("starrocks.user", "root")
     .option("starrocks.password", "")
     .option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
     .mode("append")
     .save()
  3. Query data in the StarRocks table.
MySQL [test]> SELECT * FROM `array_tbl`;
+------+-----------------------+--------------------+
| id   | a0                    | a1                 |
+------+-----------------------+--------------------+
|    1 | ["hello","starrocks"] | [[1,2],[3,4]]      |
|    2 | ["hello","spark"]     | [[5,6,7],[8,9,10]] |
+------+-----------------------+--------------------+
2 rows in set (0.01 sec)