Skip to content

Commit

Permalink
Merge pull request #105 from datafuselabs/fix/impl-preparestmt-method
Browse files Browse the repository at this point in the history
feat: impl prepareStatement method
  • Loading branch information
hantmac authored Nov 16, 2023
2 parents 16374ff + 582a03b commit 9854931
Show file tree
Hide file tree
Showing 20 changed files with 1,030 additions and 16 deletions.
34 changes: 34 additions & 0 deletions databend-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@
<artifactId>okhttp</artifactId>
<version>${dep.okhttp.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.13.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>



<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->
<dependency>
<groupId>com.squareup.okio</groupId>
Expand Down Expand Up @@ -74,6 +97,17 @@
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.databend.jdbc.cloud.DatabendStage;
import com.databend.jdbc.parser.BatchInsertUtils;
import com.solidfire.gson.Gson;
import lombok.NonNull;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
Expand Down Expand Up @@ -42,6 +43,7 @@
import java.util.*;
import java.util.function.Consumer;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static com.databend.jdbc.ObjectCasts.castToBigDecimal;
import static com.databend.jdbc.ObjectCasts.castToBinary;
Expand All @@ -52,6 +54,7 @@
import static com.databend.jdbc.ObjectCasts.castToInt;
import static com.databend.jdbc.ObjectCasts.castToLong;
import static com.databend.jdbc.ObjectCasts.castToShort;
import static com.databend.jdbc.StatementUtil.replaceParameterMarksWithValues;
import static java.lang.String.format;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
Expand All @@ -60,6 +63,7 @@
public class DatabendPreparedStatement extends DatabendStatement implements PreparedStatement {
private static final Logger logger = Logger.getLogger(DatabendPreparedStatement.class.getPackage().getName());
static final DateTimeFormatter DATE_FORMATTER = ISODateTimeFormat.date();
private final RawStatementWrapper rawStatement;
static final DateTimeFormatter TIME_FORMATTER = DateTimeFormat.forPattern("HH:mm:ss.SSS");
static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
private static final java.time.format.DateTimeFormatter LOCAL_DATE_TIME_FORMATTER =
Expand All @@ -85,6 +89,7 @@ public class DatabendPreparedStatement extends DatabendStatement implements Prep
this.originalSql = requireNonNull(sql, "sql is null");
this.batchValues = new ArrayList<>();
this.batchInsertUtils = BatchInsertUtils.tryParseInsertSql(sql);
this.rawStatement = StatementUtil.parseToRawStatementWrapper(sql);
}

private static String formatBooleanLiteral(boolean x) {
Expand Down Expand Up @@ -346,14 +351,41 @@ public int[] executeBatch() throws SQLException {
@Override
public ResultSet executeQuery()
throws SQLException {
this.executeBatch();
String sql = replaceParameterMarksWithValues(batchInsertUtils.get().getProvideParams(), this.originalSql).get(0).getSql();
internalExecute(sql, null);
return getResultSet();
}

private List<StatementInfoWrapper> prepareSQL(@NonNull Map<Integer, String> params) {
return replaceParameterMarksWithValues(params, this.rawStatement);
}

@Override
public int executeUpdate()
public boolean execute()
throws SQLException {
return 0;
return this.execute(prepareSQL(batchInsertUtils.get().getProvideParams())).isPresent();
}

protected Optional<ResultSet> execute(List<StatementInfoWrapper> statements) throws SQLException {
Optional<ResultSet> resultSet = Optional.empty();
try {
for (int i = 0; i < statements.size(); i++) {
if (i == 0) {
internalExecute(statements.get(i).getSql(), null);
resultSet = Optional.ofNullable(getResultSet());
} else {
internalExecute(statements.get(i).getSql(), null);
}
}
} finally {
}
return resultSet;
}

@Override
public int executeUpdate() throws SQLException {
this.execute(prepareSQL(batchInsertUtils.get().getProvideParams())).isPresent();
return batchInsertUtils.get().getProvideParams().size();
}

@Override
Expand Down Expand Up @@ -630,11 +662,6 @@ public static String convertArrayListToString(ArrayList<?> arrayList) {
return builder.toString();
}

@Override
public boolean execute()
throws SQLException {
return false;
}

@Override
public void addBatch()
Expand Down
74 changes: 74 additions & 0 deletions databend-jdbc/src/main/java/com/databend/jdbc/LoggerUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.databend.jdbc;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

import com.databend.jdbc.log.DatabendLogger;
import com.databend.jdbc.log.JDKLogger;
import com.databend.jdbc.log.SLF4JLogger;

import lombok.CustomLog;
import lombok.experimental.UtilityClass;

@UtilityClass
@CustomLog
public class LoggerUtil {

private static Boolean slf4jAvailable;

/**
* Provides a {@link DatabendLogger} based on whether SLF4J is available or not.
*
* @param name logger name
* @return a {@link DatabendLogger}
*/
public static DatabendLogger getLogger(String name) {
if (slf4jAvailable == null) {
slf4jAvailable = isSlf4jJAvailable();
}

if (slf4jAvailable) {
return new SLF4JLogger(name);
} else {
return new JDKLogger(name);
}
}

/**
* Logs the {@link InputStream}
*
* @param is the {@link InputStream}
* @return a copy of the {@link InputStream} provided
*/
public InputStream logInputStream(InputStream is) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > -1) {
baos.write(buffer, 0, len);
}
baos.flush();
InputStream streamToLog = new ByteArrayInputStream(baos.toByteArray());
String text = new BufferedReader(new InputStreamReader(streamToLog, StandardCharsets.UTF_8)).lines()
.collect(Collectors.joining("\n"));
log.info("======================================");
log.info(text);
log.info("======================================");
return new ByteArrayInputStream(baos.toByteArray());
} catch (Exception ex) {
log.warn("Could not log the stream", ex);
}
return new ByteArrayInputStream(baos.toByteArray());
}

private static boolean isSlf4jJAvailable() {
try {
Class.forName("org.slf4j.Logger");
return true;
} catch (ClassNotFoundException ex) {
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.databend.jdbc;

import static com.databend.jdbc.StatementType.NON_QUERY;

import java.util.List;


import lombok.EqualsAndHashCode;

/**
* A non query statement is a statement that does not return data (such as
* INSERT)
*/
@EqualsAndHashCode(callSuper = true)
public class NonQueryRawStatement extends RawStatement {

public NonQueryRawStatement(String sql, String cleanSql, List<ParamMarker> paramPositions) {
super(sql, cleanSql, paramPositions);
}

@Override
public StatementType getStatementType() {
return NON_QUERY;
}
}
10 changes: 10 additions & 0 deletions databend-jdbc/src/main/java/com/databend/jdbc/ParamMarker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.databend.jdbc;
import lombok.AllArgsConstructor;
import lombok.Value;

@AllArgsConstructor
@Value
public class ParamMarker {
int id; // Id / index of the param marker in the SQL statement
int position; // Position in the SQL subStatement
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.databend.jdbc;

import static com.databend.jdbc.StatementType.QUERY;

import java.util.List;
import java.util.Optional;

import org.apache.commons.lang3.tuple.Pair;


import lombok.EqualsAndHashCode;
import lombok.Getter;

/**
* A query statement is a statement that returns data (Typically starts with
* SELECT, SHOW, etc)
*/
@Getter
@EqualsAndHashCode(callSuper = true)
public class QueryRawStatement extends RawStatement {

private final String database;

private final String table;

public QueryRawStatement(String sql, String cleanSql, List<ParamMarker> paramPositions) {
super(sql, cleanSql, paramPositions);
Pair<Optional<String>, Optional<String>> databaseAndTablePair = StatementUtil
.extractDbNameAndTableNamePairFromCleanQuery(this.getCleanSql());
this.database = databaseAndTablePair.getLeft().orElse(null);
this.table = databaseAndTablePair.getRight().orElse(null);
}

@Override
public StatementType getStatementType() {
return QUERY;
}

}
57 changes: 57 additions & 0 deletions databend-jdbc/src/main/java/com/databend/jdbc/RawStatement.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.databend.jdbc;

import java.util.List;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import com.databend.jdbc.ParamMarker;
import com.databend.jdbc.StatementType;

import lombok.Data;

@Data
public abstract class RawStatement {

private final String sql;
private final String cleanSql;
private final List<ParamMarker> paramMarkers;

protected RawStatement(String sql, String cleanSql, List<ParamMarker> paramPositions) {
this.sql = sql;
this.cleanSql = cleanSql;
this.paramMarkers = paramPositions;
}

public static RawStatement of(String sql, List<ParamMarker> paramPositions, String cleanSql) {
Optional<Pair<String, String>> additionalProperties = StatementUtil.extractParamFromSetStatement(cleanSql, sql);
if (additionalProperties.isPresent()) {
return new SetParamRawStatement(sql, cleanSql, paramPositions, additionalProperties.get());
} else if (StatementUtil.isQuery(cleanSql)) {
return new QueryRawStatement(sql, cleanSql, paramPositions);
} else {
return new NonQueryRawStatement(sql, cleanSql, paramPositions);
}
}

@Override
public String toString() {
return "RawSqlStatement{" + "sql='" + sql + '\'' + ", cleanSql='" + cleanSql + '\'' + ", paramMarkers="
+ StringUtils.join(paramMarkers, "|") + '}';
}

public List<ParamMarker> getParamMarkers() {
return paramMarkers;
}

public String getSql() {
return sql;
}

public String getCleanSql() {
return cleanSql;
}

public abstract StatementType getStatementType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.databend.jdbc;
import java.util.Collection;
import java.util.List;

import org.apache.commons.lang3.StringUtils;

import lombok.CustomLog;
import lombok.Value;

@CustomLog
@Value
public class RawStatementWrapper {

List<RawStatement> subStatements;

long totalParams;

public RawStatementWrapper(List<RawStatement> subStatements) {
this.subStatements = subStatements;
this.totalParams = subStatements.stream().map(RawStatement::getParamMarkers).mapToLong(Collection::size).sum();
}

@Override
public String toString() {
return "SqlQueryWrapper{" + "subQueries=" + StringUtils.join(subStatements, "|") + ", totalParams="
+ totalParams + '}';
}

}
Loading

0 comments on commit 9854931

Please sign in to comment.