Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read support for S3 Tables in Iceberg #24815

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.TRINO_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ vars.TRINO_AWS_REGION }}
S3_BUCKET: ${{ vars.TRINO_S3_BUCKET }}
S3_TABLES_BUCKET: ${{ vars.TRINO_S3_TABLES_BUCKET }}
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }}
ABFS_CONTAINER: ${{ vars.AZURE_ABFS_HIERARCHICAL_CONTAINER }}
Expand Down
30 changes: 30 additions & 0 deletions plugin/trino-iceberg/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Iceberg Connector Developer Notes

Steps to create TPCH tables on S3 Tables:
1. Set `AWS_REGION`, `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables.
2. Replace the `{region}`, `{account-id}` and `{bucket-name}` placeholders in the following command and run it:
```sh
./spark-sql \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.awssdk:bundle:2.20.10,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3,org.apache.kyuubi:kyuubi-spark-connector-tpch_2.12:1.8.0 \
--conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
--conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:{region}:{account-id}:bucket/{bucket-name} \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.tpch=org.apache.kyuubi.spark.connector.tpch.TPCHCatalog
```

3. Run the following SQL statements in the Spark SQL shell to create the TPCH `nation` and `region` tables:
```sql
CREATE TABLE s3tablesbucket.tpch.nation AS SELECT
n_nationkey AS nationkey,
n_name AS name,
n_regionkey AS regionkey,
n_comment AS comment
FROM tpch.tiny.nation;

CREATE TABLE s3tablesbucket.tpch.region AS SELECT
r_regionkey AS regionkey,
r_name AS name,
r_comment AS comment
FROM tpch.tiny.region;
```
18 changes: 12 additions & 6 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
<artifactId>trino-filesystem-manager</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
Expand Down Expand Up @@ -218,6 +223,11 @@
<artifactId>iceberg-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down Expand Up @@ -359,12 +369,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
Expand Down Expand Up @@ -744,6 +748,7 @@
<exclude>**/TestIcebergGlueTableOperationsInsertFailure.java</exclude>
<exclude>**/TestIcebergGlueCatalogSkipArchive.java</exclude>
<exclude>**/TestIcebergS3AndGlueMetastoreTest.java</exclude>
<exclude>**/TestIcebergS3TablesConnectorSmokeTest.java</exclude>
<exclude>**/TestIcebergGcsConnectorSmokeTest.java</exclude>
<exclude>**/TestIcebergAbfsConnectorSmokeTest.java</exclude>
<exclude>**/Test*FailureRecoveryTest.java</exclude>
Expand Down Expand Up @@ -809,6 +814,7 @@
<include>**/TestIcebergGlueTableOperationsInsertFailure.java</include>
<include>**/TestIcebergGlueCatalogSkipArchive.java</include>
<include>**/TestIcebergS3AndGlueMetastoreTest.java</include>
<include>**/TestIcebergS3TablesConnectorSmokeTest.java</include>
<include>**/TestIcebergGcsConnectorSmokeTest.java</include>
<include>**/TestIcebergAbfsConnectorSmokeTest.java</include>
<include>**/TestIcebergSnowflakeCatalogConnectorSmokeTest.java</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.security.TrinoPrincipal;
import jakarta.annotation.Nullable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -159,6 +160,7 @@ Transaction newCreateOrReplaceTableTransaction(

void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment);

@Nullable
String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName);

void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.rest;

import java.util.Map;

/**
 * Supplies AWS-specific Iceberg REST catalog properties. Implementations are bound by
 * {@code IcebergRestCatalogModule} depending on whether SigV4 signing is enabled, and the
 * returned map is merged into the catalog properties by {@code TrinoIcebergRestCatalogFactory}.
 */
public interface AwsProperties
{
    /**
     * Returns the AWS-related catalog properties to merge into the REST catalog configuration;
     * may be empty, never null.
     */
    Map<String, String> get();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public enum SessionType
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
private boolean viewEndpointsEnabled = true;
private boolean sigv4Enabled;
private boolean caseInsensitiveNameMatching;
private Duration caseInsensitiveNameMatchingCacheTtl = new Duration(1, MINUTES);

Expand Down Expand Up @@ -146,6 +148,32 @@ public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCreden
return this;
}

// Whether the REST catalog client should advertise support for view endpoints;
// defaults to true and is used to populate the "view-endpoints-supported" catalog property
public boolean isViewEndpointsEnabled()
{
    return viewEndpointsEnabled;
}

// Allows disabling view endpoints for REST catalogs that do not implement them
@Config("iceberg.rest-catalog.view-endpoints-enabled")
@ConfigDescription("Enable view endpoints")
public IcebergRestCatalogConfig setViewEndpointsEnabled(boolean viewEndpointsEnabled)
{
    this.viewEndpointsEnabled = viewEndpointsEnabled;
    return this;
}

// Whether requests to the REST catalog are signed with AWS Signature Version 4; defaults to false
public boolean isSigv4Enabled()
{
    return sigv4Enabled;
}

// Enables SigV4 request signing, as required by e.g. the AWS S3 Tables REST catalog endpoint
@Config("iceberg.rest-catalog.sigv4-enabled")
@ConfigDescription("Enable AWS Signature Version 4 (SigV4)")
public IcebergRestCatalogConfig setSigv4Enabled(boolean sigv4Enabled)
{
    this.sigv4Enabled = sigv4Enabled;
    return this;
}

public boolean isCaseInsensitiveNameMatching()
{
return caseInsensitiveNameMatching;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergFileSystemFactory;
import io.trino.plugin.iceberg.IcebergSecurityConfig;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.Security;
import io.trino.spi.TrinoException;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.READ_ONLY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;

public class IcebergRestCatalogModule
Expand All @@ -39,6 +41,15 @@ protected void setup(Binder binder)
config -> config.getSecurity() == Security.OAUTH2,
new OAuth2SecurityModule(),
new NoneSecurityModule()));
install(conditionalModule(
IcebergRestCatalogConfig.class,
IcebergRestCatalogConfig::isSigv4Enabled,
internalBinder -> {
configBinder(internalBinder).bindConfig(IcebergRestCatalogSigv4Config.class);
configBinder(internalBinder).bindConfigDefaults(IcebergSecurityConfig.class, config -> config.setSecuritySystem(READ_ONLY));
internalBinder.bind(AwsProperties.class).to(SigV4AwsProperties.class).in(Scopes.SINGLETON);
},
internalBinder -> internalBinder.bind(AwsProperties.class).to(NoneAwsProperties.class)));

binder.bind(TrinoCatalogFactory.class).to(TrinoIcebergRestCatalogFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, IcebergFileSystemFactory.class).setBinding().to(IcebergRestCatalogFileSystemFactory.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.rest;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;
import org.apache.iceberg.aws.AwsProperties;

/**
 * Configuration for AWS Signature Version 4 (SigV4) signing of Iceberg REST catalog requests.
 * Only bound when {@code iceberg.rest-catalog.sigv4-enabled} is true.
 */
public class IcebergRestCatalogSigv4Config
{
    // Defaults to Iceberg's standard REST signing name (AwsProperties.REST_SIGNING_NAME_DEFAULT)
    private String signingName = AwsProperties.REST_SIGNING_NAME_DEFAULT;

    @NotNull
    public String getSigningName()
    {
        return signingName;
    }

    @Config("iceberg.rest-catalog.signing-name")
    @ConfigDescription("The service name to be used by the SigV4 protocol for signing requests")
    public IcebergRestCatalogSigv4Config setSigningName(String signingName)
    {
        this.signingName = signingName;
        return this;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

/**
 * {@link AwsProperties} implementation used when SigV4 signing is disabled;
 * contributes no additional catalog properties.
 */
public class NoneAwsProperties
        implements AwsProperties
{
    // Shared empty map — no AWS-specific properties apply in this mode
    private static final Map<String, String> NO_PROPERTIES = ImmutableMap.of();

    @Override
    public Map<String, String> get()
    {
        return NO_PROPERTIES;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.trino.filesystem.s3.S3FileSystemConfig;

import java.util.Map;

import static java.util.Objects.requireNonNull;

/**
 * Contributes Iceberg REST catalog properties that enable AWS Signature Version 4 (SigV4)
 * request signing, using the signing name from {@link IcebergRestCatalogSigv4Config} and
 * the static credentials and region from {@link S3FileSystemConfig}.
 */
public class SigV4AwsProperties
        implements AwsProperties
{
    private final Map<String, String> properties;

    @Inject
    public SigV4AwsProperties(IcebergRestCatalogSigv4Config sigv4Config, S3FileSystemConfig s3Config)
    {
        // Fail fast with an explicit message when SigV4 is enabled but the S3 credentials or
        // region are not configured; ImmutableMap.Builder.put would otherwise throw a bare
        // NullPointerException with no indication of which setting is missing
        this.properties = ImmutableMap.<String, String>builder()
                .put("rest.sigv4-enabled", "true")
                .put("rest.signing-name", sigv4Config.getSigningName())
                .put("rest.access-key-id", requireNonNull(s3Config.getAwsAccessKey(), "S3 AWS access key is not configured"))
                .put("rest.secret-access-key", requireNonNull(s3Config.getAwsSecretKey(), "S3 AWS secret key is not configured"))
                .put("rest.signing-region", requireNonNull(s3Config.getRegion(), "S3 region is not configured"))
                // NOTE(review): disables Iceberg's client-side REST metrics reporting — confirm intended
                .put("rest-metrics-reporting-enabled", "false")
                .buildOrThrow();
    }

    /**
     * Returns the SigV4-related catalog properties to merge into the REST catalog configuration.
     */
    @Override
    public Map<String, String> get()
    {
        return properties;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public class TrinoIcebergRestCatalogFactory
private final boolean nestedNamespaceEnabled;
private final SessionType sessionType;
private final boolean vendedCredentialsEnabled;
private final boolean viewEndpointsEnabled;
private final SecurityProperties securityProperties;
private final AwsProperties awsProperties;
private final boolean uniqueTableLocation;
private final TypeManager typeManager;
private final boolean caseInsensitiveNameMatching;
Expand All @@ -73,6 +75,7 @@ public TrinoIcebergRestCatalogFactory(
CatalogName catalogName,
IcebergRestCatalogConfig restConfig,
SecurityProperties securityProperties,
AwsProperties awsProperties,
IcebergConfig icebergConfig,
TypeManager typeManager,
NodeVersion nodeVersion)
Expand All @@ -87,7 +90,9 @@ public TrinoIcebergRestCatalogFactory(
this.nestedNamespaceEnabled = restConfig.isNestedNamespaceEnabled();
this.sessionType = restConfig.getSessionType();
this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
this.viewEndpointsEnabled = restConfig.isViewEndpointsEnabled();
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
this.awsProperties = requireNonNull(awsProperties, "awsProperties is null");
requireNonNull(icebergConfig, "icebergConfig is null");
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -112,9 +117,10 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
properties.put(CatalogProperties.URI, serverUri.toString());
warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location));
prefix.ifPresent(prefix -> properties.put("prefix", prefix));
properties.put("view-endpoints-supported", "true");
properties.put("view-endpoints-supported", Boolean.toString(viewEndpointsEnabled));
properties.put("trino-version", trinoVersion);
properties.putAll(securityProperties.get());
properties.putAll(awsProperties.get());

if (vendedCredentialsEnabled) {
properties.put("header.X-Iceberg-Access-Delegation", "vended-credentials");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
import static io.trino.filesystem.Locations.appendPath;
Expand Down Expand Up @@ -477,7 +476,10 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch

Map<String, Object> properties = loadNamespaceMetadata(session, schemaTableName.getSchemaName());
String databaseLocation = (String) properties.get(IcebergSchemaProperties.LOCATION_PROPERTY);
checkArgument(databaseLocation != null, "location must be set for %s", schemaTableName.getSchemaName());
if (databaseLocation == null) {
// REST catalog spec doesn't ensure that a location property is present in the namespace metadata
return null;
}

return appendPath(databaseLocation, tableName);
}
Expand Down
Loading