From 36342598bc3e0d0654f168cf65afa41a805d1664 Mon Sep 17 00:00:00 2001 From: xhuang Date: Fri, 17 Jan 2025 20:46:40 -0800 Subject: [PATCH 01/11] basic implementation --- .../apache/iceberg/BaseViewMetadataTable.java | 173 ++++++++++++++++++ .../apache/iceberg/MetadataTableUtils.java | 31 ++++ .../apache/iceberg/ViewMetadataTableType.java | 35 ++++ .../org/apache/iceberg/ViewVersionTable.java | 108 +++++++++++ .../view/BaseMetastoreViewCatalog.java | 31 ++++ 5 files changed, 378 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java create mode 100644 core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java create mode 100644 core/src/main/java/org/apache/iceberg/ViewVersionTable.java diff --git a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java new file mode 100644 index 000000000000..ca321043add1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java @@ -0,0 +1,173 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewOperations; + +public abstract class BaseViewMetadataTable extends BaseReadOnlyTable implements Serializable { + + //TODO: refactor and share same code as BaseMetadataTable + private final PartitionSpec spec = PartitionSpec.unpartitioned(); + + private final SortOrder sortOrder = SortOrder.unsorted(); + private final BaseView view; + private final String name; + private final UUID uuid; + + protected BaseViewMetadataTable(View view, String name) { + super("metadata"); + Preconditions.checkArgument( + view instanceof BaseView, "Cannot create metadata table for view: %s", view); + this.view = (BaseView) view; + this.name = name; + this.uuid = UUID.randomUUID(); + } + + protected ViewOperations operations() { + return view.operations(); + } + + @Override + public void refresh() { + view.operations().refresh(); + } + + @Override + public PartitionSpec spec() { + return spec; + } + + @Override + public String name() { + return name; + } + + @Override + public FileIO io() { + return null; + } + + @Override + public String location() { + return view.location(); + } + + @Override + public EncryptionManager encryption() { + return null; + } + + @Override + public LocationProvider locationProvider() { + return null; + } + + @Override + public Map schemas() { + return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, schema()); + } + + + @Override + public Map specs() { + return ImmutableMap.of(spec.specId(), spec); + } + + @Override + public SortOrder sortOrder() { + return sortOrder; + } + + @Override + public Map sortOrders() { + return ImmutableMap.of(sortOrder.orderId(), sortOrder); + } + + + @Override + public Map properties() { + return ImmutableMap.of(); + } + + @Override + public Snapshot currentSnapshot() { + return null; + } + + + @Override + public Iterable snapshots() { + return ImmutableList.of(); + } + + @Override + public Snapshot snapshot(long snapshotId) { + return null; + } + + @Override + public List history() { + return ImmutableList.of(); + } + + @Override + public List statisticsFiles() { + return ImmutableList.of(); + } + + @Override + public List partitionStatisticsFiles() { + return ImmutableList.of(); + } + + @Override + public Map refs() { + return ImmutableMap.of(); + } + + @Override + public UUID uuid() { + return uuid; + } + + @Override + public String toString() { + return name(); + } + + final Object writeReplace() { + return SerializableTable.copyOf(this); + } + +} diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index adb0f18ba1ad..5a6e6ae6fe71 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -21,6 +21,9 @@ import java.util.Locale; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewOperations; public class MetadataTableUtils { private MetadataTableUtils() {} @@ -94,6 +97,34 @@ private static Table createMetadataTableInstance( } } + public static Table createViewMetadataTableInstance( + ViewOperations ops, + String catalogName, + TableIdentifier baseViewIdentifier, + TableIdentifier metadataTableIdentifier, + ViewMetadataTableType type) { + String baseTableName = BaseMetastoreCatalog.fullTableName(catalogName, baseViewIdentifier); + String metadataTableName = + BaseMetastoreCatalog.fullTableName(catalogName, metadataTableIdentifier); + return createViewMetadataTableInstance(ops, baseTableName, metadataTableName, type); + } + + public static Table createViewMetadataTableInstance( + ViewOperations ops, String baseViewName, String metadataTableName, ViewMetadataTableType type) { + View baseView = new BaseView(ops, baseViewName); + return createViewMetadataTableInstance(baseView, metadataTableName, type); + } + + private static Table createViewMetadataTableInstance(View baseView, String metadataTableName, ViewMetadataTableType type) { + switch (type) { + case VERSION: + return new ViewVersionTable(baseView); + default: + throw new NoSuchTableException( + "Unknown metadata table type: %s for %s", type, metadataTableName); + } + } + public static Table createMetadataTableInstance( TableOperations ops, String catalogName, diff --git a/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java new file mode 100644 index 000000000000..80ddd4f1a2de --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java @@ -0,0 +1,35 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg; + +import java.util.Locale; + +public enum ViewMetadataTableType { + VERSION; + public static ViewMetadataTableType from(String name) { + try { + return ViewMetadataTableType.valueOf(name.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException ignored) { + return null; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java new file mode 100644 index 000000000000..141f55d5f7ca --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -0,0 +1,108 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg; + +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewVersion; + +public class ViewVersionTable extends BaseViewMetadataTable { + + static final Schema VIEW_VERSION_SCHEMA = + new Schema( + Types.NestedField.required(1, "version-id", Types.IntegerType.get()), + Types.NestedField.required(2, "schema-id", Types.IntegerType.get()), + Types.NestedField.required(3, "timestamp-ms", Types.TimestampType.withoutZone()), + Types.NestedField.required( + 4, + "summary", + Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.StringType.get())), + Types.NestedField.required( + 8, + "representations", + Types.ListType.ofRequired( + 9, + Types.StructType.of( + Types.NestedField.required(10, "type", Types.StringType.get()), + Types.NestedField.required(11, "sql", Types.StringType.get()), + Types.NestedField.required(12, "dialect", Types.StringType.get())))), + Types.NestedField.optional(13, "default-catalog", Types.StringType.get()), + Types.NestedField.required(14, "default-namespace", Types.StringType.get())); + + ViewVersionTable(View view) { + super(view, view.name() + ".version"); + } + + @Override + public TableScan newScan() { + return new ViewVersionTableScan(this); + } + + @Override + public Schema schema() { + return VIEW_VERSION_SCHEMA; + } + + private DataTask task(BaseTableScan scan) { + return StaticDataTask.of( + io().newInputFile(location()), + schema(), + scan.schema(), + operations().current().versions(), + ViewVersionTable::viewVersionToRow); + } + + private class ViewVersionTableScan extends StaticTableScan { + ViewVersionTableScan(Table table) { + super(table, VIEW_VERSION_SCHEMA, MetadataTableType.SNAPSHOTS, ViewVersionTable.this::task); + } + + ViewVersionTableScan(Table table, TableScanContext context) { + super( + table, VIEW_VERSION_SCHEMA, MetadataTableType.SNAPSHOTS, ViewVersionTable.this::task, context); + } + + @Override + protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext context) { + return new ViewVersionTableScan(table, context); + } + + @Override + public CloseableIterable planFiles() { + // override planFiles to avoid the check for a current snapshot because this metadata table is + // for all snapshots + return CloseableIterable.withNoopClose(ViewVersionTable.this.task(this)); + } + } + + private static StaticDataTask.Row viewVersionToRow(ViewVersion version) { + return StaticDataTask.Row.of( + version.versionId(), + version.schemaId(), + version.timestampMillis(), + version.summary(), + version.representations(), + version.defaultCatalog(), + version.defaultNamespace()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java index 2f747c54556e..c67f0323809f 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java @@ -23,13 +23,19 @@ import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.EnvironmentContext; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import org.apache.iceberg.ViewMetadataTableType; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -67,6 +73,31 @@ public View loadView(TableIdentifier identifier) { throw new NoSuchViewException("Invalid view identifier: %s", identifier); } + @Override + public Table loadTable(TableIdentifier identifier) { + try { + return super.loadTable(identifier); + } catch (NoSuchTableException e) { + return loadViewMetadataTable(identifier); + } + } + + private Table loadViewMetadataTable(TableIdentifier identifier) { + String tableName = identifier.name(); + ViewMetadataTableType type = ViewMetadataTableType.from(tableName); + if (type != null) { + TableIdentifier baseViewIdentifier = TableIdentifier.of(identifier.namespace().levels()); + ViewOperations ops = newViewOps(identifier); + if (ops.current() == null) { + throw new NoSuchTableException("Table or View does not exist: %s", baseViewIdentifier); + } + return MetadataTableUtils.createViewMetadataTableInstance( + ops, name(), baseViewIdentifier, identifier, type); + } else { + throw new NoSuchTableException("Table does not exist: %s", identifier); + } + } + @Override public ViewBuilder buildView(TableIdentifier identifier) { return new BaseViewBuilder(identifier); From dfc92f47d801dafe4d8ee5fb5313309fd731c786 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 17 Jan 2025 21:29:44 -0800 Subject: [PATCH 02/11] fix bug and add 1st test --- .../view/BaseMetastoreViewCatalog.java | 2 +- .../apache/iceberg/view/ViewCatalogTests.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java index c67f0323809f..2269ec4ec833 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java @@ -87,7 +87,7 @@ private Table loadViewMetadataTable(TableIdentifier identifier) { ViewMetadataTableType type = ViewMetadataTableType.from(tableName); if (type != null) { TableIdentifier baseViewIdentifier = TableIdentifier.of(identifier.namespace().levels()); - ViewOperations ops = newViewOps(identifier); + ViewOperations ops = newViewOps(baseViewIdentifier); if (ops.current() == null) { throw new NoSuchTableException("Table or View does not exist: %s", baseViewIdentifier); } diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index 561d0cd580bf..bd809caecd89 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -27,8 +27,10 @@ import java.nio.file.Paths; import java.util.UUID; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.ViewVersionTable; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -1607,6 +1609,29 @@ public void updateViewLocationConflict() { .hasMessageContaining("View does not exist: ns.view"); } + @Test + public void loadViewMetadataTable() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("trino", "select * from ns.tbl") + .create(); + + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); + Table table = tableCatalog().loadTable(TableIdentifier.of("ns", "view", "version")); + assertThat(table).isInstanceOf(ViewVersionTable.class); + } + @Test public void concurrentReplaceViewVersion() { TableIdentifier identifier = TableIdentifier.of("ns", "view"); From 6280020d112acce5139f06f99f777811842f4933 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 17 Jan 2025 22:54:04 -0800 Subject: [PATCH 03/11] fix format --- .../apache/iceberg/BaseViewMetadataTable.java | 30 +++++++++---------- .../apache/iceberg/ViewMetadataTableType.java | 30 +++++++++---------- .../org/apache/iceberg/ViewVersionTable.java | 30 +++++++++---------- .../view/BaseMetastoreViewCatalog.java | 2 -- .../apache/iceberg/view/ViewCatalogTests.java | 2 +- 5 files changed, 43 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java index ca321043add1..39f8f0abc0fc 100644 --- a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java @@ -1,22 +1,20 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg; diff --git a/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java index 80ddd4f1a2de..f56c86fea644 100644 --- a/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java +++ b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java @@ -1,22 +1,20 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg; diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java index 141f55d5f7ca..91bfab263c67 100644 --- a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -1,22 +1,20 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg; diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java index 2269ec4ec833..79e2bfc25f0f 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java @@ -23,11 +23,9 @@ import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.EnvironmentContext; -import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.ViewMetadataTableType; import org.apache.iceberg.catalog.Namespace; diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index bd809caecd89..d0b26cc8dd03 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -1626,7 +1626,7 @@ public void loadViewMetadataTable() { .withDefaultNamespace(identifier.namespace()) .withQuery("trino", "select * from ns.tbl") .create(); - + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); Table table = tableCatalog().loadTable(TableIdentifier.of("ns", "view", "version")); assertThat(table).isInstanceOf(ViewVersionTable.class); From 8094e523f1f94a23f1f90646730462c5372cc19b Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 17 Jan 2025 23:11:20 -0800 Subject: [PATCH 04/11] fix format take 2 --- .../apache/iceberg/BaseViewMetadataTable.java | 263 +++++++++--------- .../apache/iceberg/MetadataTableUtils.java | 26 +- .../apache/iceberg/ViewMetadataTableType.java | 16 +- .../org/apache/iceberg/ViewVersionTable.java | 31 ++- .../view/BaseMetastoreViewCatalog.java | 2 +- .../apache/iceberg/view/ViewCatalogTests.java | 12 +- 6 files changed, 176 insertions(+), 174 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java index 39f8f0abc0fc..5e2dee41cabf 100644 --- a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTable.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iceberg; import java.io.Serializable; @@ -35,137 +34,133 @@ public abstract class BaseViewMetadataTable extends BaseReadOnlyTable implements Serializable { - //TODO: refactor and share same code as BaseMetadataTable - private final PartitionSpec spec = PartitionSpec.unpartitioned(); - - private final SortOrder sortOrder = SortOrder.unsorted(); - private final BaseView view; - private final String name; - private final UUID uuid; - - protected BaseViewMetadataTable(View view, String name) { - super("metadata"); - Preconditions.checkArgument( - view instanceof BaseView, "Cannot create metadata table for view: %s", view); - this.view = (BaseView) view; - this.name = name; - this.uuid = UUID.randomUUID(); - } - - protected ViewOperations operations() { - return view.operations(); - } - - @Override - public void refresh() { - view.operations().refresh(); - } - - @Override - public PartitionSpec spec() { - return spec; - } - - @Override - public String name() { - return name; - } - - @Override - public FileIO io() { - return null; - } - - @Override - public String location() { - return view.location(); - } - - @Override - public EncryptionManager encryption() { - return null; - } - - @Override - public LocationProvider locationProvider() { - return null; - } - - @Override - public Map schemas() { - return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, schema()); - } - - - @Override - public Map specs() { - return ImmutableMap.of(spec.specId(), spec); - } - - @Override - public SortOrder sortOrder() { - return sortOrder; - } - - @Override - public Map sortOrders() { - return ImmutableMap.of(sortOrder.orderId(), sortOrder); - } - - - @Override - public Map properties() { - return ImmutableMap.of(); - } - - @Override - public Snapshot currentSnapshot() { - return null; - } - - - @Override - public Iterable snapshots() { - return ImmutableList.of(); - } - - @Override - public Snapshot snapshot(long snapshotId) { - return null; - } - - @Override - public List history() { - return ImmutableList.of(); - } - - @Override - public List statisticsFiles() { - return ImmutableList.of(); - } - - @Override - public List partitionStatisticsFiles() { - return ImmutableList.of(); - } - - @Override - public Map refs() { - return ImmutableMap.of(); - } - - @Override - public UUID uuid() { - return uuid; - } - - @Override - public String toString() { - return name(); - } - - final Object writeReplace() { - return SerializableTable.copyOf(this); - } - + // TODO: refactor and share same code as BaseMetadataTable + private final PartitionSpec spec = PartitionSpec.unpartitioned(); + + private final SortOrder sortOrder = SortOrder.unsorted(); + private final BaseView view; + private final String name; + private final UUID uuid; + + protected BaseViewMetadataTable(View view, String name) { + super("metadata"); + Preconditions.checkArgument( + view instanceof BaseView, "Cannot create metadata table for view: %s", view); + this.view = (BaseView) view; + this.name = name; + this.uuid = UUID.randomUUID(); + } + + protected ViewOperations operations() { + return view.operations(); + } + + @Override + public void refresh() { + view.operations().refresh(); + } + + @Override + public PartitionSpec spec() { + return spec; + } + + @Override + public String name() { + return name; + } + + @Override + public FileIO io() { + return null; + } + + @Override + public String location() { + return view.location(); + } + + @Override + public EncryptionManager encryption() { + return null; + } + + @Override + public LocationProvider locationProvider() { + return null; + } + + @Override + public Map schemas() { + return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, schema()); + } + + @Override + public Map specs() { + return ImmutableMap.of(spec.specId(), spec); + } + + @Override + public SortOrder sortOrder() { + return sortOrder; + } + + @Override + public Map sortOrders() { + return ImmutableMap.of(sortOrder.orderId(), sortOrder); + } + + @Override + public Map properties() { + return ImmutableMap.of(); + } + + @Override + public Snapshot currentSnapshot() { + return null; + } + + @Override + public Iterable snapshots() { + return ImmutableList.of(); + } + + @Override + public Snapshot snapshot(long snapshotId) { + return null; + } + + @Override + public List history() { + return ImmutableList.of(); + } + + @Override + public List statisticsFiles() { + return ImmutableList.of(); + } + + @Override + public List partitionStatisticsFiles() { + return ImmutableList.of(); + } + + @Override + public Map refs() { + return ImmutableMap.of(); + } + + @Override + public UUID uuid() { + return uuid; + } + + @Override + public String toString() { + return name(); + } + + final Object writeReplace() { + return SerializableTable.copyOf(this); + } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index 5a6e6ae6fe71..2c523645d55f 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -98,31 +98,35 @@ private static Table createMetadataTableInstance( } public static Table createViewMetadataTableInstance( - ViewOperations ops, - String catalogName, - TableIdentifier baseViewIdentifier, - TableIdentifier metadataTableIdentifier, - ViewMetadataTableType type) { + ViewOperations ops, + String catalogName, + TableIdentifier baseViewIdentifier, + TableIdentifier metadataTableIdentifier, + ViewMetadataTableType type) { String baseTableName = BaseMetastoreCatalog.fullTableName(catalogName, baseViewIdentifier); String metadataTableName = - BaseMetastoreCatalog.fullTableName(catalogName, metadataTableIdentifier); + BaseMetastoreCatalog.fullTableName(catalogName, metadataTableIdentifier); return createViewMetadataTableInstance(ops, baseTableName, metadataTableName, type); } public static Table createViewMetadataTableInstance( - ViewOperations ops, String baseViewName, String metadataTableName, ViewMetadataTableType type) { + ViewOperations ops, + String baseViewName, + String metadataTableName, + ViewMetadataTableType type) { View baseView = new BaseView(ops, baseViewName); return createViewMetadataTableInstance(baseView, metadataTableName, type); } - private static Table createViewMetadataTableInstance(View baseView, String metadataTableName, ViewMetadataTableType type) { + private static Table createViewMetadataTableInstance( + View baseView, String metadataTableName, ViewMetadataTableType type) { switch (type) { case VERSION: return new ViewVersionTable(baseView); - default: - throw new NoSuchTableException( + default: + throw new NoSuchTableException( "Unknown metadata table type: %s for %s", type, metadataTableName); - } + } } public static Table createMetadataTableInstance( diff --git a/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java index f56c86fea644..5b82757bcce6 100644 --- a/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java +++ b/core/src/main/java/org/apache/iceberg/ViewMetadataTableType.java @@ -16,18 +16,18 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iceberg; import java.util.Locale; public enum ViewMetadataTableType { - VERSION; - public static ViewMetadataTableType from(String name) { - try { - return ViewMetadataTableType.valueOf(name.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException ignored) { - return null; - } + VERSION; + + public static ViewMetadataTableType from(String name) { + try { + return ViewMetadataTableType.valueOf(name.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException ignored) { + return null; } + } } diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java index 91bfab263c67..cbe704f1a941 100644 --- a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iceberg; import org.apache.iceberg.io.CloseableIterable; @@ -63,11 +62,11 @@ public Schema schema() { private DataTask task(BaseTableScan scan) { return StaticDataTask.of( - io().newInputFile(location()), - schema(), - scan.schema(), - operations().current().versions(), - ViewVersionTable::viewVersionToRow); + io().newInputFile(location()), + schema(), + scan.schema(), + operations().current().versions(), + ViewVersionTable::viewVersionToRow); } private class ViewVersionTableScan extends StaticTableScan { @@ -77,7 +76,11 @@ private class ViewVersionTableScan extends StaticTableScan { ViewVersionTableScan(Table table, TableScanContext context) { super( - table, VIEW_VERSION_SCHEMA, MetadataTableType.SNAPSHOTS, ViewVersionTable.this::task, context); + table, + VIEW_VERSION_SCHEMA, + MetadataTableType.SNAPSHOTS, + ViewVersionTable.this::task, + context); } @Override @@ -95,12 +98,12 @@ public CloseableIterable planFiles() { private static StaticDataTask.Row viewVersionToRow(ViewVersion version) { return StaticDataTask.Row.of( - version.versionId(), - version.schemaId(), - version.timestampMillis(), - version.summary(), - version.representations(), - version.defaultCatalog(), - version.defaultNamespace()); + version.versionId(), + version.schemaId(), + version.timestampMillis(), + version.summary(), + version.representations(), + version.defaultCatalog(), + version.defaultNamespace()); } } diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java index 79e2bfc25f0f..124856bc0af2 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java @@ -90,7 +90,7 @@ private Table loadViewMetadataTable(TableIdentifier identifier) { throw new NoSuchTableException("Table or View does not exist: %s", baseViewIdentifier); } return MetadataTableUtils.createViewMetadataTableInstance( - ops, name(), baseViewIdentifier, identifier, type); + ops, name(), baseViewIdentifier, identifier, type); } else { throw new NoSuchTableException("Table does not exist: %s", identifier); } diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index d0b26cc8dd03..247995899711 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -1620,12 +1620,12 @@ public void loadViewMetadataTable() { assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); View view = - catalog() - .buildView(identifier) - .withSchema(SCHEMA) - .withDefaultNamespace(identifier.namespace()) - .withQuery("trino", "select * from ns.tbl") - .create(); + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("trino", "select * from ns.tbl") + .create(); assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); Table table = tableCatalog().loadTable(TableIdentifier.of("ns", "view", "version")); From 737c4dc42303ecea464e59e55079739c8c2f0e69 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 18 Jan 2025 22:06:45 -0800 Subject: [PATCH 05/11] fix rest catalog --- .../apache/iceberg/MetadataTableUtils.java | 2 +- .../apache/iceberg/rest/CatalogHandlers.java | 3 ++- .../iceberg/rest/RESTSessionCatalog.java | 21 +++++++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index 2c523645d55f..b79b04339e22 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -118,7 +118,7 @@ public static Table createViewMetadataTableInstance( return createViewMetadataTableInstance(baseView, metadataTableName, type); } - private static Table createViewMetadataTableInstance( + public static Table createViewMetadataTableInstance( View baseView, String metadataTableName, ViewMetadataTableType type) { switch (type) { case VERSION: diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index aeb310854799..9a4036804f1f 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -35,6 +35,7 @@ import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.BaseViewMetadataTable; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -331,7 +332,7 @@ public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident return LoadTableResponse.builder() .withTableMetadata(((BaseTable) table).operations().current()) .build(); - } else if (table instanceof BaseMetadataTable) { + } else if (table instanceof BaseMetadataTable || table instanceof BaseViewMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now throw new NoSuchTableException("Table does not exist: %s", ident.toString()); } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index d98dc99495f6..7ce1572deeb0 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -49,6 +49,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; +import org.apache.iceberg.ViewMetadataTableType; import org.apache.iceberg.catalog.BaseViewSessionCatalog; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -461,6 +462,26 @@ private LoadTableResponse loadInternal( @Override public Table loadTable(SessionContext context, TableIdentifier identifier) { + try { + return loadTableInternal(context, identifier); + } catch (NoSuchTableException original) { + return loadViewMetadataTable(context, identifier); + } + } + + private Table loadViewMetadataTable(SessionContext context, TableIdentifier identifier) { + String tableName = identifier.name(); + ViewMetadataTableType type = ViewMetadataTableType.from(tableName); + if (type != null) { + TableIdentifier baseViewIdentifier = TableIdentifier.of(identifier.namespace().levels()); + View baseView = loadView(context, baseViewIdentifier); + return MetadataTableUtils.createViewMetadataTableInstance(baseView, tableName, type); + } else { + throw new NoSuchTableException("Table does not exist: %s", identifier); + } + } + + private Table loadTableInternal(SessionContext context, TableIdentifier identifier) { Endpoint.check( endpoints, Endpoint.V1_LOAD_TABLE, From df552e6f127bd569dc7818ab38ea002ba3b27290 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 20 Jan 2025 17:08:37 -0800 Subject: [PATCH 06/11] first runnable --- .../iceberg/BaseViewMetadataTableScan.java | 55 ++++++ .../apache/iceberg/ViewMetadataReadTask.java | 166 ++++++++++++++++++ .../org/apache/iceberg/ViewVersionTable.java | 61 ++++--- .../iceberg/rest/RESTSessionCatalog.java | 2 +- 4 files changed, 257 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/BaseViewMetadataTableScan.java create mode 100644 core/src/main/java/org/apache/iceberg/ViewMetadataReadTask.java diff --git a/core/src/main/java/org/apache/iceberg/BaseViewMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTableScan.java new file mode 100644 index 000000000000..82d0faee833d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseViewMetadataTableScan.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.util.PropertyUtil; + +abstract class BaseViewMetadataTableScan extends BaseTableScan { + + private final ViewMetadataTableType tableType; + + protected BaseViewMetadataTableScan(Table table, Schema schema, ViewMetadataTableType tableType) { + super(table, schema, TableScanContext.empty()); + this.tableType = tableType; + } + + protected BaseViewMetadataTableScan( + Table table, Schema schema, ViewMetadataTableType tableType, TableScanContext context) { + super(table, schema, context); + this.tableType = tableType; + } + + @Override + public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { + throw new UnsupportedOperationException( + String.format("Cannot incrementally scan table of type %s", tableType)); + } + + @Override + public TableScan appendsAfter(long fromSnapshotId) { + throw new UnsupportedOperationException( + String.format("Cannot incrementally scan table of type %s", tableType)); + } + + @Override + public long targetSplitSize() { + return PropertyUtil.propertyAsLong( + options(), TableProperties.SPLIT_SIZE, TableProperties.METADATA_SPLIT_SIZE_DEFAULT); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ViewMetadataReadTask.java b/core/src/main/java/org/apache/iceberg/ViewMetadataReadTask.java new file mode 100644 index 000000000000..a645c197e5d9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ViewMetadataReadTask.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructProjection; + +class ViewMetadataReadTask implements DataTask { + + static DataTask of( + String location, + Schema tableSchema, + Schema projectedSchema, + Iterable values, + Function transform) { + return new ViewMetadataReadTask( + location, + tableSchema, + projectedSchema, + Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0])); + } + + private final DataFile metadataFile; + private final StructLike[] rows; + private final Schema tableSchema; + private final Schema projectedSchema; + + private ViewMetadataReadTask( + String location, Schema tableSchema, Schema projectedSchema, StructLike[] rows) { + this.tableSchema = tableSchema; + this.projectedSchema = projectedSchema; + this.metadataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(location) + .withFileSizeInBytes(0) + .withRecordCount(rows.length) + .withFormat(FileFormat.METADATA) + .build(); + this.rows = rows; + } + + ViewMetadataReadTask( + DataFile metadataFile, Schema tableSchema, Schema projectedSchema, StructLike[] rows) { + this.tableSchema = tableSchema; + this.projectedSchema = projectedSchema; + this.metadataFile = metadataFile; + this.rows = rows; + } + + @Override + public Schema schema() { + return tableSchema; + } + + @Override + public List deletes() { + return ImmutableList.of(); + } + + @Override + public CloseableIterable rows() { + StructProjection projection = StructProjection.create(tableSchema, projectedSchema); + Iterable projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap); + return CloseableIterable.withNoopClose(projectedRows); + } + + @Override + public DataFile file() { + return metadataFile; + } + + @Override + public PartitionSpec spec() { + return PartitionSpec.unpartitioned(); + } + + @Override + public long start() { + return 0; + } + + @Override + public long length() { + return metadataFile.fileSizeInBytes(); + } + + @Override + public Expression residual() { + return Expressions.alwaysTrue(); + } + + @Override + public Iterable split(long splitSize) { + return ImmutableList.of(this); + } + + Schema projectedSchema() { + return projectedSchema; + } + + DataFile metadataFile() { + return metadataFile; + } + + /** + * Returns the table rows before projection. + * + * @return the table rows before projection + */ + StructLike[] tableRows() { + return rows; + } + + /** Implements {@link StructLike#get} for passing static rows. */ + static class Row implements StructLike, Serializable { + public static Row of(Object... values) { + return new Row(values); + } + + private final Object[] values; + + private Row(Object... values) { + this.values = values; + } + + @Override + public int size() { + return values.length; + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(values[pos]); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Setting values is not supported"); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java index cbe704f1a941..2cd5a528a211 100644 --- a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -25,6 +25,27 @@ public class ViewVersionTable extends BaseViewMetadataTable { + // static final Schema VIEW_VERSION_SCHEMA = + // new Schema( + // Types.NestedField.required(1, "version-id", Types.IntegerType.get()), + // Types.NestedField.required(2, "schema-id", Types.IntegerType.get()), + // Types.NestedField.required(3, "timestamp-ms", Types.TimestampType.withoutZone()), + // Types.NestedField.required( + // 4, + // "summary", + // Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.StringType.get())), + // Types.NestedField.required( + // 8, + // "representations", + // Types.ListType.ofRequired( + // 9, + // Types.StructType.of( + // Types.NestedField.required(10, "type", Types.StringType.get()), + // Types.NestedField.required(11, "sql", Types.StringType.get()), + // Types.NestedField.required(12, "dialect", Types.StringType.get())))), + // Types.NestedField.optional(13, "default-catalog", Types.StringType.get()), + // Types.NestedField.required(14, "default-namespace", Types.StringType.get())); + static final Schema VIEW_VERSION_SCHEMA = new Schema( Types.NestedField.required(1, "version-id", Types.IntegerType.get()), @@ -34,17 +55,7 @@ public class ViewVersionTable extends BaseViewMetadataTable { 4, "summary", Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.StringType.get())), - Types.NestedField.required( - 8, - "representations", - Types.ListType.ofRequired( - 9, - Types.StructType.of( - Types.NestedField.required(10, "type", Types.StringType.get()), - Types.NestedField.required(11, "sql", Types.StringType.get()), - Types.NestedField.required(12, "dialect", Types.StringType.get())))), - Types.NestedField.optional(13, "default-catalog", Types.StringType.get()), - Types.NestedField.required(14, "default-namespace", Types.StringType.get())); + Types.NestedField.optional(13, "default-catalog", Types.StringType.get())); ViewVersionTable(View view) { super(view, view.name() + ".version"); @@ -61,26 +72,21 @@ public Schema schema() { } private DataTask task(BaseTableScan scan) { - return StaticDataTask.of( - io().newInputFile(location()), + return ViewMetadataReadTask.of( + location(), schema(), scan.schema(), operations().current().versions(), ViewVersionTable::viewVersionToRow); } - private class ViewVersionTableScan extends StaticTableScan { + private class ViewVersionTableScan extends BaseViewMetadataTableScan { ViewVersionTableScan(Table table) { - super(table, VIEW_VERSION_SCHEMA, MetadataTableType.SNAPSHOTS, ViewVersionTable.this::task); + super(table, VIEW_VERSION_SCHEMA, ViewMetadataTableType.VERSION); } ViewVersionTableScan(Table table, TableScanContext context) { - super( - table, - VIEW_VERSION_SCHEMA, - MetadataTableType.SNAPSHOTS, - ViewVersionTable.this::task, - context); + super(table, VIEW_VERSION_SCHEMA, ViewMetadataTableType.VERSION, context); } @Override @@ -88,6 +94,11 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext return new ViewVersionTableScan(table, context); } + @Override + protected CloseableIterable doPlanFiles() { + return CloseableIterable.withNoopClose(ViewVersionTable.this.task(this)); + } + @Override public CloseableIterable planFiles() { // override planFiles to avoid the check for a current snapshot because this metadata table is @@ -96,14 +107,12 @@ public CloseableIterable planFiles() { } } - private static StaticDataTask.Row viewVersionToRow(ViewVersion version) { - return StaticDataTask.Row.of( + private static ViewMetadataReadTask.Row viewVersionToRow(ViewVersion version) { + return ViewMetadataReadTask.Row.of( version.versionId(), version.schemaId(), version.timestampMillis(), version.summary(), - version.representations(), - version.defaultCatalog(), - version.defaultNamespace()); + version.defaultCatalog()); } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 7ce1572deeb0..459746d4c656 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -267,7 +267,7 @@ public void initialize(String name, Map unresolved) { if (config.endpoints().isEmpty()) { this.endpoints = - PropertyUtil.propertyAsBoolean(mergedProps, VIEW_ENDPOINTS_SUPPORTED, false) + PropertyUtil.propertyAsBoolean(mergedProps, VIEW_ENDPOINTS_SUPPORTED, true) ? ImmutableSet.builder() .addAll(DEFAULT_ENDPOINTS) .addAll(VIEW_ENDPOINTS) From 19863e369e7ce64df8a6a6c6ced6cf32afe1c95e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 20 Jan 2025 19:35:38 -0800 Subject: [PATCH 07/11] support all fields --- .../org/apache/iceberg/ViewVersionTable.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java index 2cd5a528a211..8e62f107f49f 100644 --- a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -18,34 +18,15 @@ */ package org.apache.iceberg; +import java.util.stream.Collectors; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.SQLViewRepresentation; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewVersion; public class ViewVersionTable extends BaseViewMetadataTable { - // static final Schema VIEW_VERSION_SCHEMA = - // new Schema( - // Types.NestedField.required(1, "version-id", Types.IntegerType.get()), - // Types.NestedField.required(2, "schema-id", Types.IntegerType.get()), - // Types.NestedField.required(3, "timestamp-ms", Types.TimestampType.withoutZone()), - // Types.NestedField.required( - // 4, - // "summary", - // Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.StringType.get())), - // Types.NestedField.required( - // 8, - // "representations", - // Types.ListType.ofRequired( - // 9, - // Types.StructType.of( - // Types.NestedField.required(10, "type", Types.StringType.get()), - // Types.NestedField.required(11, "sql", Types.StringType.get()), - // Types.NestedField.required(12, "dialect", Types.StringType.get())))), - // Types.NestedField.optional(13, "default-catalog", Types.StringType.get()), - // Types.NestedField.required(14, "default-namespace", Types.StringType.get())); - static final Schema VIEW_VERSION_SCHEMA = new Schema( Types.NestedField.required(1, "version-id", Types.IntegerType.get()), @@ -55,7 +36,17 @@ public class ViewVersionTable extends BaseViewMetadataTable { 4, "summary", Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.StringType.get())), - Types.NestedField.optional(13, "default-catalog", Types.StringType.get())); + Types.NestedField.required( + 8, + "representations", + Types.ListType.ofRequired( + 9, + Types.StructType.of( + Types.NestedField.required(10, "type", Types.StringType.get()), + Types.NestedField.required(11, "sql", Types.StringType.get()), + Types.NestedField.required(12, "dialect", Types.StringType.get())))), + Types.NestedField.optional(13, "default-catalog", Types.StringType.get()), + Types.NestedField.required(14, "default-namespace", Types.StringType.get())); ViewVersionTable(View view) { super(view, view.name() + ".version"); @@ -111,8 +102,19 @@ private static ViewMetadataReadTask.Row viewVersionToRow(ViewVersion version) { return ViewMetadataReadTask.Row.of( version.versionId(), version.schemaId(), - version.timestampMillis(), + version.timestampMillis() * 1000, version.summary(), - version.defaultCatalog()); + version.representations().stream() + .map( + r -> { + SQLViewRepresentation sqlViewRepresentation = (SQLViewRepresentation) r; + return ViewMetadataReadTask.Row.of( + sqlViewRepresentation.type(), + sqlViewRepresentation.type(), + sqlViewRepresentation.dialect()); + }) + .collect(Collectors.toList()), + version.defaultCatalog(), + version.defaultNamespace().toString()); } } From 97573166a9feba2403d0c3535b2519f68ebcb79c Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 20 Jan 2025 19:36:44 -0800 Subject: [PATCH 08/11] revert testing override --- .../main/java/org/apache/iceberg/rest/RESTSessionCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 459746d4c656..7ce1572deeb0 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -267,7 +267,7 @@ public void initialize(String name, Map unresolved) { if (config.endpoints().isEmpty()) { this.endpoints = - PropertyUtil.propertyAsBoolean(mergedProps, VIEW_ENDPOINTS_SUPPORTED, true) + PropertyUtil.propertyAsBoolean(mergedProps, VIEW_ENDPOINTS_SUPPORTED, false) ? ImmutableSet.builder() .addAll(DEFAULT_ENDPOINTS) .addAll(VIEW_ENDPOINTS) From 7c1296f591f54dbf89f3d1ed3577497cc3355098 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 20 Jan 2025 23:40:58 -0800 Subject: [PATCH 09/11] add tests --- .../org/apache/iceberg/ViewVersionTable.java | 2 +- .../iceberg/TestMetadataTableScans.java | 12 ++++ .../org/apache/iceberg/view/TestViews.java | 72 +++++++++++++++++++ .../iceberg/spark/extensions/TestViews.java | 27 +++++++ .../iceberg/spark/extensions/TestViews.java | 27 +++++++ 5 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 core/src/test/java/org/apache/iceberg/view/TestViews.java diff --git a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java index 8e62f107f49f..457fd6be196a 100644 --- a/core/src/main/java/org/apache/iceberg/ViewVersionTable.java +++ b/core/src/main/java/org/apache/iceberg/ViewVersionTable.java @@ -110,7 +110,7 @@ private static ViewMetadataReadTask.Row viewVersionToRow(ViewVersion version) { SQLViewRepresentation sqlViewRepresentation = (SQLViewRepresentation) r; return ViewMetadataReadTask.Row.of( sqlViewRepresentation.type(), - sqlViewRepresentation.type(), + sqlViewRepresentation.sql(), sqlViewRepresentation.dialect()); }) .collect(Collectors.toList()), diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index 56b11009fc12..f5be904a51f4 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -45,6 +45,8 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.view.TestViews; +import org.apache.iceberg.view.View; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -1725,6 +1727,16 @@ public void testPositionDeletesManyColumns() { assertThat(scanTasks.get(1).file().location()).isEqualTo(delete2.location()); } + @TestTemplate + public void testViewVersionTable() throws Exception { + View view = TestViews.createSampleTestView("view"); + ViewVersionTable viewVersionTable = new ViewVersionTable(view); + + TableScan scan = viewVersionTable.newScan(); + + assertThat(scan.schema()).isEqualTo(ViewVersionTable.VIEW_VERSION_SCHEMA); + } + @TestTemplate public void testFilesTableEstimateSize() throws Exception { preparePartitionedTable(true); diff --git a/core/src/test/java/org/apache/iceberg/view/TestViews.java b/core/src/test/java/org/apache/iceberg/view/TestViews.java new file mode 100644 index 000000000000..4539eb4581e9 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/TestViews.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.types.Types; + +public class TestViews { + + public static TestView createSampleTestView(String name) { + return new TestView( + new TestViewOperations(/* version= */ 1, /* timestampMillis= */ 1737436503101L), name); + } + + public static class TestView extends BaseView { + + public TestView(ViewOperations ops, String name) { + super(ops, name); + } + } + + public static class TestViewOperations implements ViewOperations { + + private ViewMetadata metadata; + + public TestViewOperations(int version, long timestampMillis) { + ViewMetadata.Builder builder = ViewMetadata.builder(); + builder.addSchema(new Schema(Types.NestedField.required(1, "foo", Types.IntegerType.get()))); + builder.addVersion( + ImmutableViewVersion.builder() + .versionId(version) + .schemaId(0) + .timestampMillis(timestampMillis) + .defaultNamespace(Namespace.of("foo.bar")) + .build()); + builder.setCurrentVersionId(version).setLocation("s3://foo/bar"); + this.metadata = builder.build(); + } + + @Override + public ViewMetadata current() { + return metadata; + } + + @Override + public ViewMetadata refresh() { + return metadata; + } + + @Override + public void commit(ViewMetadata base, ViewMetadata updated) {} + } + + private TestViews() {} +} diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index cd91bfdb2067..91ae7432210b 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -2089,6 +2089,33 @@ public void createViewWithRecursiveCycleInSubqueryExpression() { String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle)); } + @TestTemplate + public void readFromViewVersionTable() throws NoSuchTableException { + insertRows(10); + String viewName = viewName("simpleView"); + String sql = String.format("SELECT id FROM %s", tableName); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + // use non-existing column name to make sure only the SQL definition for spark is loaded + .withQuery("trino", String.format("SELECT non_existing FROM %s", tableName)) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema(sql)) + .create(); + + // Similar to table's metadata table, view's metadata table requires fully qualified name. + List result = sql("SELECT * FROM %s.%s.%s.version", catalogName, NAMESPACE, viewName); + assertThat(result).hasSize(1); + assertThat(result.get(0).length).isEqualTo(7); + // representations + assertThat(result.get(0)[4].toString()) + .isEqualTo("[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 2a1f1c7eae2b..ec01916cae10 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -2093,6 +2093,33 @@ public void createViewWithRecursiveCycleInSubqueryExpression() { String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle)); } + @TestTemplate + public void readFromViewVersionTable() throws NoSuchTableException { + insertRows(10); + String viewName = viewName("simpleView"); + String sql = String.format("SELECT id FROM %s", tableName); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + // use non-existing column name to make sure only the SQL definition for spark is loaded + .withQuery("trino", String.format("SELECT non_existing FROM %s", tableName)) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema(sql)) + .create(); + + // Similar to table's metadata table, view's metadata table requires fully qualified name. + List result = sql("SELECT * FROM %s.%s.%s.version", catalogName, NAMESPACE, viewName); + assertThat(result).hasSize(1); + assertThat(result.get(0).length).isEqualTo(7); + // representations + assertThat(result.get(0)[4].toString()) + .isEqualTo("[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { From ed319fadfe8ea3cb5b2c082d227c94bd2c2bdddd Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 20 Jan 2025 23:53:32 -0800 Subject: [PATCH 10/11] fix format --- .../java/org/apache/iceberg/spark/extensions/TestViews.java | 3 ++- .../java/org/apache/iceberg/spark/extensions/TestViews.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 91ae7432210b..f21ce71c70d1 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -2113,7 +2113,8 @@ public void readFromViewVersionTable() throws NoSuchTableException { assertThat(result.get(0).length).isEqualTo(7); // representations assertThat(result.get(0)[4].toString()) - .isEqualTo("[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); + .isEqualTo( + "[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); } private void insertRows(int numRows) throws NoSuchTableException { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index ec01916cae10..5242bdd40640 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -2117,7 +2117,8 @@ public void readFromViewVersionTable() throws NoSuchTableException { assertThat(result.get(0).length).isEqualTo(7); // representations assertThat(result.get(0)[4].toString()) - .isEqualTo("[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); + .isEqualTo( + "[[sql,SELECT id FROM table,spark], [sql,SELECT non_existing FROM table,trino]]"); } private void insertRows(int numRows) throws NoSuchTableException { From b456118b6ffca2d2b2ee49b8e944ea1d8ca97b00 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 21 Jan 2025 07:22:32 -0800 Subject: [PATCH 11/11] fix spark 3.4 test --- .../java/org/apache/iceberg/spark/extensions/TestViews.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index f21ce71c70d1..34635ac54d27 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -2089,7 +2089,7 @@ public void createViewWithRecursiveCycleInSubqueryExpression() { String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle)); } - @TestTemplate + @Test public void readFromViewVersionTable() throws NoSuchTableException { insertRows(10); String viewName = viewName("simpleView");