diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml index d4584a4b94..ed7e7b8a13 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml @@ -24,7 +24,7 @@ catalogs: schema: |- { "schemaType": "AVRO", - "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"city\", \"type\": \"string\"}, {\"name\": \"id\", \"type\": \"string\"}]}" + "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}]}" } bindings: app0: diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index f01fdf3530..afd2658690 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -1364,7 +1364,7 @@ private void decodeDropTopicCommand( } else if (server.commandsProcessed == 0) { - Set drops = parser.parseDrop(statement); + List drops = parser.parseDrop(statement); drops.stream().findFirst().ifPresent(d -> { final PgsqlKafkaBindingConfig binding = server.binding; diff --git a/incubator/binding-pgsql/src/main/antlr4/io/aklivity/zilla/runtime/binding/pgsql/parser/PostgreSqlParser.g4 b/incubator/binding-pgsql/src/main/antlr4/io/aklivity/zilla/runtime/binding/pgsql/parser/PostgreSqlParser.g4 index 99534e2007..267617c5ff 100644 --- a/incubator/binding-pgsql/src/main/antlr4/io/aklivity/zilla/runtime/binding/pgsql/parser/PostgreSqlParser.g4 +++ b/incubator/binding-pgsql/src/main/antlr4/io/aklivity/zilla/runtime/binding/pgsql/parser/PostgreSqlParser.g4 @@ -656,7 +656,7 @@ copy_generic_opt_arg_list_item ; createstreamstmt - : CREATE STREAM stream_name OPEN_PAREN stream_columns CLOSE_PAREN opt_with_stream + : CREATE STREAM (IF_P NOT EXISTS)? stream_name OPEN_PAREN stream_columns CLOSE_PAREN opt_with_stream ; stream_name diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java index 1c7b703090..8a6a60ca29 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java @@ -14,7 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.pgsql.parser; -import java.util.Set; +import java.util.List; import org.antlr.v4.runtime.BailErrorStrategy; import org.antlr.v4.runtime.CharStream; @@ -98,7 +98,7 @@ public ViewInfo parseCreateMaterializedView( return createMaterializedViewListener.viewInfo(); } - public Set parseDrop( + public List parseDrop( String sql) { parser(sql, dropListener); diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateFunctionListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateFunctionListener.java index 56f5e5dd7f..878a77f9fb 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateFunctionListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateFunctionListener.java @@ -24,8 +24,9 @@ public class SqlCreateFunctionListener extends PostgreSqlParserBaseListener { - private String name; private final List arguments = new ArrayList<>(); + + private String name; private String returnType; private String asFunction; private String language; diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateStreamListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateStreamListener.java index cb922d14f7..f0dd2306a3 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateStreamListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateStreamListener.java @@ -14,10 +14,9 @@ */ package io.aklivity.zilla.runtime.binding.pgsql.parser.listener; +import java.util.LinkedHashMap; import java.util.Map; -import org.agrona.collections.Object2ObjectHashMap; - import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser; import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener; import io.aklivity.zilla.runtime.binding.pgsql.parser.module.StreamInfo; @@ -25,7 +24,7 @@ public class SqlCreateStreamListener extends PostgreSqlParserBaseListener { private String name; - private final Map columns = new Object2ObjectHashMap<>(); + private final Map columns = new LinkedHashMap<>(); public StreamInfo streamInfo() { diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateTableTopicListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateTableTopicListener.java index 56a2cf597f..7fbc3cd6b7 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateTableTopicListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateTableTopicListener.java @@ -14,10 +14,10 @@ */ package io.aklivity.zilla.runtime.binding.pgsql.parser.listener; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; -import org.agrona.collections.Object2ObjectHashMap; import org.agrona.collections.ObjectHashSet; import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser; @@ -27,7 +27,7 @@ public class SqlCreateTableTopicListener extends PostgreSqlParserBaseListener { private String name; - private final Map columns = new Object2ObjectHashMap<>(); + private final Map columns = new LinkedHashMap<>(); private final Set primaryKeys = new ObjectHashSet<>(); public TableInfo tableInfo() diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java index ca079d0323..d4fc4b31b1 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java @@ -14,18 +14,17 @@ */ package io.aklivity.zilla.runtime.binding.pgsql.parser.listener; -import java.util.Set; - -import org.agrona.collections.ObjectHashSet; +import java.util.ArrayList; +import java.util.List; import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser; import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener; public class SqlDropListener extends PostgreSqlParserBaseListener { - private final Set drops = new ObjectHashSet<>(); + private final List drops = new ArrayList<>(); - public Set drops() + public List drops() { return drops; } diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/FunctionArgument.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/FunctionArgument.java index d9e140e804..7196c49f5a 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/FunctionArgument.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/FunctionArgument.java @@ -1,3 +1,17 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package io.aklivity.zilla.runtime.binding.pgsql.parser.module; public record FunctionArgument( diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/StreamInfo.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/StreamInfo.java index 5c8db5dc69..5a2880ea68 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/StreamInfo.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/StreamInfo.java @@ -17,7 +17,7 @@ import java.util.Map; public record StreamInfo( - String name, - Map columns) + String name, + Map columns) { } diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/TableInfo.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/TableInfo.java index 6244ae7534..8b0067d5b1 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/TableInfo.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/module/TableInfo.java @@ -18,8 +18,8 @@ import java.util.Set; public record TableInfo( - String name, - Map columns, - Set primaryKeys) + String name, + Map columns, + Set primaryKeys) { } diff --git a/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java b/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java index d25bea9fb2..f069fc8d9b 100644 --- a/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java +++ b/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java @@ -18,12 +18,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.util.Set; +import java.util.List; import org.antlr.v4.runtime.misc.ParseCancellationException; import org.junit.Before; import org.junit.Test; +import io.aklivity.zilla.runtime.binding.pgsql.parser.module.FunctionInfo; +import io.aklivity.zilla.runtime.binding.pgsql.parser.module.StreamInfo; import io.aklivity.zilla.runtime.binding.pgsql.parser.module.TableInfo; import io.aklivity.zilla.runtime.binding.pgsql.parser.module.ViewInfo; @@ -154,7 +156,7 @@ public void shouldHandleInvalidCreateTable() public void shouldParseDropSingleTable() { String sql = "DROP TABLE test_table;"; - Set drops = parser.parseDrop(sql); + List drops = parser.parseDrop(sql); assertEquals(1, drops.size()); assertTrue(drops.contains("test_table")); } @@ -163,7 +165,7 @@ public void shouldParseDropSingleTable() public void shouldParseDropMultipleTables() { String sql = "DROP TABLE table1, table2;"; - Set drops = parser.parseDrop(sql); + List drops = parser.parseDrop(sql); assertEquals(2, drops.size()); assertTrue(drops.contains("table1")); assertTrue(drops.contains("table2")); @@ -173,7 +175,7 @@ public void shouldParseDropMultipleTables() public void shouldHandleEmptyDropStatement() { String sql = "DROP TABLE;"; - Set drops = parser.parseDrop(sql); + List drops = parser.parseDrop(sql); assertEquals(0, drops.size()); } @@ -181,7 +183,7 @@ public void shouldHandleEmptyDropStatement() public void shouldParseDropView() { String sql = "DROP VIEW test_view;"; - Set drops = parser.parseDrop(sql); + List drops = parser.parseDrop(sql); assertEquals(1, drops.size()); assertTrue(drops.contains("test_view")); } @@ -190,9 +192,57 @@ public void shouldParseDropView() public void shouldParseDropMaterializedView() { String sql = "DROP MATERIALIZED VIEW test_materialized_view;"; - Set drops = parser.parseDrop(sql); + List drops = parser.parseDrop(sql); assertEquals(1, drops.size()); assertTrue(drops.contains("test_materialized_view")); } + @Test + public void shouldParseCreateStream() + { + String sql = "CREATE STREAM test_stream (id INT, name VARCHAR(100));"; + StreamInfo streamInfo = parser.parseCreateStream(sql); + assertNotNull(streamInfo); + assertEquals("test_stream", streamInfo.name()); + assertEquals(2, streamInfo.columns().size()); + assertEquals("INT", streamInfo.columns().get("id")); + assertEquals("VARCHAR(100)", streamInfo.columns().get("name")); + } + + @Test + public void shouldParseCreateStreamIfNotExists() + { + String sql = "CREATE STREAM IF NOT EXISTS test_stream (id INT, name VARCHAR(100));"; + StreamInfo streamInfo = parser.parseCreateStream(sql); + assertNotNull(streamInfo); + assertEquals("test_stream", streamInfo.name()); + assertEquals(2, streamInfo.columns().size()); + assertEquals("INT", streamInfo.columns().get("id")); + assertEquals("VARCHAR(100)", streamInfo.columns().get("name")); + } + + @Test(expected = ParseCancellationException.class) + public void shouldHandleInvalidCreateStream() + { + String sql = "CREATE STREAM test_stream"; + parser.parseCreateStream(sql); + } + + @Test + public void shouldParseCreateFunction() + { + String sql = "CREATE FUNCTION test_function() RETURNS INT AS $$ BEGIN RETURN 1; END $$ LANGUAGE plpgsql;"; + FunctionInfo functionInfo = parser.parseCreateFunction(sql); + assertNotNull(functionInfo); + assertEquals("test_function", functionInfo.name()); + assertEquals("INT", functionInfo.returnType()); + } + + @Test(expected = ParseCancellationException.class) + public void shouldHandleInvalidCreateFunction() + { + String sql = "CREATE FUNCTION test_function()"; + parser.parseCreateFunction(sql); + } + } diff --git a/incubator/binding-risingwave/NOTICE b/incubator/binding-risingwave/NOTICE index c6b7d9c015..9024d8926d 100644 --- a/incubator/binding-risingwave/NOTICE +++ b/incubator/binding-risingwave/NOTICE @@ -10,5 +10,4 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. This project includes: - JSQLParser library under GNU Library or Lesser General Public License (LGPL) V2.1 or The Apache Software License, Version 2.0