Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Oct 17, 2024
1 parent 526fc5a commit 3a9a483
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ catalogs:
schema: |-
{
"schemaType": "AVRO",
"schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"city\", \"type\": \"string\"}, {\"name\": \"id\", \"type\": \"string\"}]}"
"schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}]}"
}
bindings:
app0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ private void decodeDropTopicCommand(
}
else if (server.commandsProcessed == 0)
{
Set<String> drops = parser.parseDrop(statement);
List<String> drops = parser.parseDrop(statement);
drops.stream().findFirst().ifPresent(d ->
{
final PgsqlKafkaBindingConfig binding = server.binding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ copy_generic_opt_arg_list_item
;

createstreamstmt
: CREATE STREAM stream_name OPEN_PAREN stream_columns CLOSE_PAREN opt_with_stream
: CREATE STREAM (IF_P NOT EXISTS)? stream_name OPEN_PAREN stream_columns CLOSE_PAREN opt_with_stream
;

stream_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser;

import java.util.Set;
import java.util.List;

import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
Expand Down Expand Up @@ -98,7 +98,7 @@ public ViewInfo parseCreateMaterializedView(
return createMaterializedViewListener.viewInfo();
}

public Set<String> parseDrop(
public List<String> parseDrop(
String sql)
{
parser(sql, dropListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

public class SqlCreateFunctionListener extends PostgreSqlParserBaseListener
{
private String name;
private final List<FunctionArgument> arguments = new ArrayList<>();

private String name;
private String returnType;
private String asFunction;
private String language;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser.listener;

import java.util.LinkedHashMap;
import java.util.Map;

import org.agrona.collections.Object2ObjectHashMap;

import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser;
import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.module.StreamInfo;

public class SqlCreateStreamListener extends PostgreSqlParserBaseListener
{
private String name;
private final Map<String, String> columns = new Object2ObjectHashMap<>();
private final Map<String, String> columns = new LinkedHashMap<>();

public StreamInfo streamInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser.listener;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.collections.ObjectHashSet;

import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser;
Expand All @@ -27,7 +27,7 @@
public class SqlCreateTableTopicListener extends PostgreSqlParserBaseListener
{
private String name;
private final Map<String, String> columns = new Object2ObjectHashMap<>();
private final Map<String, String> columns = new LinkedHashMap<>();
private final Set<String> primaryKeys = new ObjectHashSet<>();

public TableInfo tableInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser.listener;

import java.util.Set;

import org.agrona.collections.ObjectHashSet;
import java.util.ArrayList;
import java.util.List;

import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser;
import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener;

public class SqlDropListener extends PostgreSqlParserBaseListener
{
private final Set<String> drops = new ObjectHashSet<>();
private final List<String> drops = new ArrayList<>();

public Set<String> drops()
public List<String> drops()
{
return drops;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser.module;

public record FunctionArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Map;

public record StreamInfo(
String name,
Map<String, String> columns)
String name,
Map<String, String> columns)
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.Set;

public record TableInfo(
String name,
Map<String, String> columns,
Set<String> primaryKeys)
String name,
Map<String, String> columns,
Set<String> primaryKeys)
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Set;
import java.util.List;

import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.junit.Before;
import org.junit.Test;

import io.aklivity.zilla.runtime.binding.pgsql.parser.module.FunctionInfo;
import io.aklivity.zilla.runtime.binding.pgsql.parser.module.StreamInfo;
import io.aklivity.zilla.runtime.binding.pgsql.parser.module.TableInfo;
import io.aklivity.zilla.runtime.binding.pgsql.parser.module.ViewInfo;

Expand Down Expand Up @@ -154,7 +156,7 @@ public void shouldHandleInvalidCreateTable()
public void shouldParseDropSingleTable()
{
String sql = "DROP TABLE test_table;";
Set<String> drops = parser.parseDrop(sql);
List<String> drops = parser.parseDrop(sql);
assertEquals(1, drops.size());
assertTrue(drops.contains("test_table"));
}
Expand All @@ -163,7 +165,7 @@ public void shouldParseDropSingleTable()
public void shouldParseDropMultipleTables()
{
String sql = "DROP TABLE table1, table2;";
Set<String> drops = parser.parseDrop(sql);
List<String> drops = parser.parseDrop(sql);
assertEquals(2, drops.size());
assertTrue(drops.contains("table1"));
assertTrue(drops.contains("table2"));
Expand All @@ -173,15 +175,15 @@ public void shouldParseDropMultipleTables()
public void shouldHandleEmptyDropStatement()
{
String sql = "DROP TABLE;";
Set<String> drops = parser.parseDrop(sql);
List<String> drops = parser.parseDrop(sql);
assertEquals(0, drops.size());
}

@Test
public void shouldParseDropView()
{
String sql = "DROP VIEW test_view;";
Set<String> drops = parser.parseDrop(sql);
List<String> drops = parser.parseDrop(sql);
assertEquals(1, drops.size());
assertTrue(drops.contains("test_view"));
}
Expand All @@ -190,9 +192,57 @@ public void shouldParseDropView()
public void shouldParseDropMaterializedView()
{
String sql = "DROP MATERIALIZED VIEW test_materialized_view;";
Set<String> drops = parser.parseDrop(sql);
List<String> drops = parser.parseDrop(sql);
assertEquals(1, drops.size());
assertTrue(drops.contains("test_materialized_view"));
}

@Test
public void shouldParseCreateStream()
{
String sql = "CREATE STREAM test_stream (id INT, name VARCHAR(100));";
StreamInfo streamInfo = parser.parseCreateStream(sql);
assertNotNull(streamInfo);
assertEquals("test_stream", streamInfo.name());
assertEquals(2, streamInfo.columns().size());
assertEquals("INT", streamInfo.columns().get("id"));
assertEquals("VARCHAR(100)", streamInfo.columns().get("name"));
}

@Test
public void shouldParseCreateStreamIfNotExists()
{
String sql = "CREATE STREAM IF NOT EXISTS test_stream (id INT, name VARCHAR(100));";
StreamInfo streamInfo = parser.parseCreateStream(sql);
assertNotNull(streamInfo);
assertEquals("test_stream", streamInfo.name());
assertEquals(2, streamInfo.columns().size());
assertEquals("INT", streamInfo.columns().get("id"));
assertEquals("VARCHAR(100)", streamInfo.columns().get("name"));
}

@Test(expected = ParseCancellationException.class)
public void shouldHandleInvalidCreateStream()
{
String sql = "CREATE STREAM test_stream";
parser.parseCreateStream(sql);
}

@Test
public void shouldParseCreateFunction()
{
String sql = "CREATE FUNCTION test_function() RETURNS INT AS $$ BEGIN RETURN 1; END $$ LANGUAGE plpgsql;";
FunctionInfo functionInfo = parser.parseCreateFunction(sql);
assertNotNull(functionInfo);
assertEquals("test_function", functionInfo.name());
assertEquals("INT", functionInfo.returnType());
}

@Test(expected = ParseCancellationException.class)
public void shouldHandleInvalidCreateFunction()
{
String sql = "CREATE FUNCTION test_function()";
parser.parseCreateFunction(sql);
}

}
1 change: 0 additions & 1 deletion incubator/binding-risingwave/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

This project includes:
JSQLParser library under GNU Library or Lesser General Public License (LGPL) V2.1 or The Apache Software License, Version 2.0

0 comments on commit 3a9a483

Please sign in to comment.