Skip to content

Commit

Permalink
[Feature] Suppor††t clickhouse jdbc datasource(StarRocks#40894)
Browse files Browse the repository at this point in the history
  • Loading branch information
DataScientistSamChan committed Mar 25, 2024
1 parent 7706785 commit 9072fbf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
9 changes: 9 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,15 @@ under the License.
<artifactId>postgresql</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc -->
<!-- we need clickhouse driver for jdbc connector -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
</dependency>


<!-- https://mvnrepository.com/artifact/com.mockrunner/mockrunner-jdbc -->
<dependency>
<groupId>com.mockrunner</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
public class ClickhouseSchemaResolver extends JDBCSchemaResolver {
Map<String, String> properties;


public static final String KEY_FOR_TABLE_NAME_FOR_PARTITION_INFO = "table_name_for_partition_info";
public static final String KEY_FOR_TABLE_NAME_FOR_TABLE_INFO = "table_name_for_table_info";
private static final String DEFAULT_TABLE_NAME_FOR_PARTITION_INFO = "system.parts";
private static final String DEFAULT_TABLE_NAME_FOR_TABLE_INFO = "system.tables";
private static final Set<String> SUPPORTED_TABLE_TYPES = new HashSet<>(
Expand Down Expand Up @@ -100,23 +103,41 @@ public boolean checkAndSetSupportPartitionInformation(Connection connection) {
// The architecture of user's clickhouse is undermined, so we allow user to specify <part_into_table>
// and fall back to clickhouse's default system table for partition information, i.e. `system.parts`.
String tableNameForPartInfo = getTableNameForPartInfo();
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("DESC" + tableNameForPartInfo)) {
return this.supportPartitionInformation = true;
} catch (SQLException e) {
if (e.getMessage().toLowerCase().contains("doesn't exist")) {
return this.supportPartitionInformation = false;
String[] schemaAndTable = tableNameForPartInfo.split("\\.");
if (schemaAndTable.length != 2) {
throw new StarRocksConnectorException(String.format("Invalid table name for partition information: %s," +
"Please specify the full table name <schema>.<table_name>", tableNameForPartInfo));
}
String catalogSchema = schemaAndTable[0];
String partitionInfoTable = schemaAndTable[1];
// Different types of MySQL protocol databases have different case names for schema and table names,
// which need to be converted to lowercase for comparison
try (ResultSet catalogSet = connection.getMetaData().getCatalogs()) {
while (catalogSet.next()) {
String schemaName = catalogSet.getString("TABLE_CAT");
if (schemaName.equalsIgnoreCase(catalogSchema)) {
try (ResultSet tableSet = connection.getMetaData().getTables(catalogSchema, null, null, null)) {
while (tableSet.next()) {
String tableName = tableSet.getString("TABLE_NAME");
if (tableName.equalsIgnoreCase(partitionInfoTable)) {
return this.supportPartitionInformation = true;
}
}
}
}
}
} catch (SQLException e) {
throw new StarRocksConnectorException(e.getMessage());
}
return this.supportPartitionInformation = false;
}

private String getTableNameForPartInfo() {
return properties.getOrDefault("table_name_for_partition_info", DEFAULT_TABLE_NAME_FOR_PARTITION_INFO);
return properties.getOrDefault(KEY_FOR_TABLE_NAME_FOR_PARTITION_INFO, DEFAULT_TABLE_NAME_FOR_PARTITION_INFO);
}

private String getTableNameForTableInfo() {
return properties.getOrDefault("table_name_for_table_info", DEFAULT_TABLE_NAME_FOR_TABLE_INFO);
return properties.getOrDefault(KEY_FOR_TABLE_NAME_FOR_TABLE_INFO, DEFAULT_TABLE_NAME_FOR_TABLE_INFO);
}

@Override
Expand Down Expand Up @@ -190,7 +211,6 @@ public Type convertColumnType(int dataType, String typeName, int columnSize, int

@Override
public List<String> listPartitionNames(Connection connection, String databaseName, String tableName) {
// todo listPartitionNames是想要干什么?
String tableNameForPartInfo = getTableNameForPartInfo();
String partitionNamesQuery = "SELECT DISTINCT partition FROM " + tableNameForPartInfo + " WHERE database = ? " +
"AND table = ? AND name IS NOT NULL " + "ORDER BY name";
Expand Down Expand Up @@ -343,15 +363,19 @@ public List<Partition> getPartitions(Connection connection, Table table) {
*/
@NotNull
private String getPartitionQuery(Table table) {
String tableNameForPartInfo = getTableNameForPartInfo();
final String partitionQuery =
"SELECT partition AS NAME, max(modification_time) AS MODIFIED_TIME FROM " + tableNameForPartInfo +
" WHERE database = ? " + "AND table = ? AND name IS NOT NULL " +
"GROUP BY partition ORDER BY partition";
if (table.isUnPartitioned()) {
throw new StarRocksConnectorException("GetPartitionQuery for unpartitioned table is not implemented");
} else {
if(table.isPartitioned()){
String tableNameForPartInfo = getTableNameForPartInfo();
final String partitionQuery =
"SELECT partition AS NAME, max(modification_time) AS MODIFIED_TIME FROM " + tableNameForPartInfo +
" WHERE database = ? " + "AND table = ? AND name IS NOT NULL" +
" GROUP BY partition ORDER BY partition";
return partitionQuery;
}else{
String tableNameForTableInfo = getTableNameForTableInfo();
final String nonPartitionQuery =
" SELECT name AS NAME, max(metadata_modification_time) AS MODIFIED_TIME FROM " + tableNameForTableInfo +
" WHERE database = ? AND name = ? AND name IS NOT NULL GROUP BY name";
return nonPartitionQuery;
}
}

Expand Down

0 comments on commit 9072fbf

Please sign in to comment.