diff --git a/.gitignore b/.gitignore index b4111e738..2fc481985 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ tmp/ *.swo .bloop/ .metals/ +.vscode/ diff --git a/src/main/java/io/tarantool/driver/api/connection/TarantoolConnection.java b/src/main/java/io/tarantool/driver/api/connection/TarantoolConnection.java index 00915576f..d3fc26c3f 100644 --- a/src/main/java/io/tarantool/driver/api/connection/TarantoolConnection.java +++ b/src/main/java/io/tarantool/driver/api/connection/TarantoolConnection.java @@ -3,12 +3,13 @@ import io.netty.channel.Channel; import io.tarantool.driver.TarantoolVersion; import io.tarantool.driver.exceptions.TarantoolClientException; -import io.tarantool.driver.mappers.MessagePackValueMapper; import io.tarantool.driver.protocol.TarantoolRequest; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; +import org.msgpack.value.Value; + public interface TarantoolConnection extends AutoCloseable { /** * Get the Tarantool server address for this connection @@ -38,10 +39,9 @@ public interface TarantoolConnection extends AutoCloseable { * * @param request the request * @param resultMapper the mapper for response body - * @param result type * @return result future */ - CompletableFuture sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper); + CompletableFuture sendRequest(TarantoolRequest request); /** * Get the Netty channel baking this connection diff --git a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java index e7eb176d3..910b9cd2c 100644 --- a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java +++ b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java @@ -495,7 +495,9 @@ private CompletableFuture makeRequest( } TarantoolCallRequest request = builder.build(argumentsMapper); - return connectionManager().getConnection().thenCompose(c -> c.sendRequest(request, resultMapper)); + return connectionManager().getConnection() + .thenCompose(c -> c.sendRequest(request)) + .thenApply(resultMapper::fromValue); } catch (TarantoolProtocolException e) { throw new TarantoolClientException(e); } @@ -535,7 +537,9 @@ public CompletableFuture> eval( .withExpression(expression) .withArguments(arguments) .build(argumentsMapper); - return connectionManager().getConnection().thenCompose(c -> c.sendRequest(request, resultMapper)); + return connectionManager().getConnection() + .thenCompose(c -> c.sendRequest(request)) + .thenApply(resultMapper::fromValue); } catch (TarantoolProtocolException e) { throw new TarantoolClientException(e); } diff --git a/src/main/java/io/tarantool/driver/core/RequestFutureManager.java b/src/main/java/io/tarantool/driver/core/RequestFutureManager.java index 179f3a88f..ba0f1aa0c 100644 --- a/src/main/java/io/tarantool/driver/core/RequestFutureManager.java +++ b/src/main/java/io/tarantool/driver/core/RequestFutureManager.java @@ -1,7 +1,6 @@ package io.tarantool.driver.core; import io.tarantool.driver.api.TarantoolClientConfig; -import io.tarantool.driver.mappers.MessagePackValueMapper; import io.tarantool.driver.protocol.TarantoolRequest; import java.util.Map; @@ -11,6 +10,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.msgpack.value.Value; + /** * Keeps track of submitted requests, finishing them by timeout and allowing asynchronous request processing * @@ -37,12 +38,10 @@ public RequestFutureManager(TarantoolClientConfig config, ScheduledExecutorServi * The request timeout is taken from the client configuration * * @param request request to Tarantool server - * @param resultMapper result message entity-to-object mapper - * @param target response body type * @return {@link CompletableFuture} that completes when a response is received from Tarantool server */ - public CompletableFuture submitRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) { - return submitRequest(request, config.getRequestTimeout(), resultMapper); + public CompletableFuture submitRequest(TarantoolRequest request) { + return submitRequest(request, config.getRequestTimeout()); } /** @@ -51,18 +50,16 @@ public CompletableFuture submitRequest(TarantoolRequest request, MessageP * * @param request request to Tarantool server * @param requestTimeout timeout after which the request will be automatically failed, milliseconds - * @param resultMapper result message entity-to-object mapper - * @param target response body type + * @param target response body MessagePack type * @return {@link CompletableFuture} that completes when a response is received from Tarantool server */ - public CompletableFuture submitRequest( + public CompletableFuture submitRequest( TarantoolRequest request, - int requestTimeout, - MessagePackValueMapper resultMapper) { - CompletableFuture requestFuture = new CompletableFuture<>(); + int requestTimeout) { + CompletableFuture requestFuture = new CompletableFuture<>(); long requestId = request.getHeader().getSync(); requestFuture.whenComplete((r, e) -> requestFutures.remove(requestId)); - requestFutures.put(requestId, new TarantoolRequestMetadata(requestFuture, resultMapper)); + requestFutures.put(requestId, new TarantoolRequestMetadata(requestFuture)); timeoutScheduler.schedule(() -> { if (!requestFuture.isDone()) { requestFuture.completeExceptionally(new TimeoutException(String.format( diff --git a/src/main/java/io/tarantool/driver/core/TarantoolRequestMetadata.java b/src/main/java/io/tarantool/driver/core/TarantoolRequestMetadata.java index 22132bfa9..5b09f23d7 100644 --- a/src/main/java/io/tarantool/driver/core/TarantoolRequestMetadata.java +++ b/src/main/java/io/tarantool/driver/core/TarantoolRequestMetadata.java @@ -1,28 +1,22 @@ package io.tarantool.driver.core; -import io.tarantool.driver.mappers.MessagePackValueMapper; - import java.util.concurrent.CompletableFuture; +import org.msgpack.value.Value; + /** * Intermediate request metadata holder * * @author Alexey Kuzin */ public class TarantoolRequestMetadata { - private final CompletableFuture feature; - private final MessagePackValueMapper mapper; - - protected TarantoolRequestMetadata(CompletableFuture feature, MessagePackValueMapper mapper) { - this.feature = feature; - this.mapper = mapper; - } + private final CompletableFuture future; - public CompletableFuture getFuture() { - return feature; + protected TarantoolRequestMetadata(CompletableFuture future) { + this.future = future; } - public MessagePackValueMapper getMapper() { - return mapper; + public CompletableFuture getFuture() { + return future; } } diff --git a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionImpl.java b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionImpl.java index 6559172f1..48166bdac 100644 --- a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionImpl.java +++ b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionImpl.java @@ -8,8 +8,9 @@ import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener; import io.tarantool.driver.core.RequestFutureManager; import io.tarantool.driver.exceptions.TarantoolClientException; -import io.tarantool.driver.mappers.MessagePackValueMapper; import io.tarantool.driver.protocol.TarantoolRequest; + +import org.msgpack.value.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,12 +66,12 @@ public boolean isConnected() { } @Override - public CompletableFuture sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) { + public CompletableFuture sendRequest(TarantoolRequest request) { if (!isConnected()) { throw new TarantoolClientException("Not connected to Tarantool server"); } - CompletableFuture requestFuture = requestManager.submitRequest(request, resultMapper); + CompletableFuture requestFuture = requestManager.submitRequest(request); channel.writeAndFlush(request).addListener(f -> { if (!f.isSuccess()) { requestFuture.completeExceptionally( diff --git a/src/main/java/io/tarantool/driver/core/space/TarantoolSpace.java b/src/main/java/io/tarantool/driver/core/space/TarantoolSpace.java index 25ac8d9d1..a248a089b 100644 --- a/src/main/java/io/tarantool/driver/core/space/TarantoolSpace.java +++ b/src/main/java/io/tarantool/driver/core/space/TarantoolSpace.java @@ -260,7 +260,9 @@ private CompletableFuture truncate(MessagePackValueMapper resultMapper) protected abstract MessagePackValueMapper arrayTupleResultMapper(); private CompletableFuture sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) { - return connectionManager.getConnection().thenCompose(c -> c.sendRequest(request, resultMapper)); + return connectionManager.getConnection() + .thenCompose(c -> c.sendRequest(request)) + .thenApply(resultMapper::fromValue); } @Override diff --git a/src/main/java/io/tarantool/driver/handlers/TarantoolResponseHandler.java b/src/main/java/io/tarantool/driver/handlers/TarantoolResponseHandler.java index 677fc79d6..dba9e8176 100644 --- a/src/main/java/io/tarantool/driver/handlers/TarantoolResponseHandler.java +++ b/src/main/java/io/tarantool/driver/handlers/TarantoolResponseHandler.java @@ -10,6 +10,8 @@ import io.tarantool.driver.protocol.TarantoolErrorResult; import io.tarantool.driver.protocol.TarantoolOkResult; import io.tarantool.driver.protocol.TarantoolResponse; + +import org.msgpack.value.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ public TarantoolResponseHandler(RequestFutureManager futureManager) { protected void channelRead0(ChannelHandlerContext ctx, TarantoolResponse tarantoolResponse) throws Exception { TarantoolRequestMetadata requestMeta = futureManager.getRequest(tarantoolResponse.getSyncId()); if (requestMeta != null) { - CompletableFuture requestFuture = requestMeta.getFuture(); + CompletableFuture requestFuture = requestMeta.getFuture(); if (!requestFuture.isDone()) { switch (tarantoolResponse.getResponseType()) { case IPROTO_NOT_OK: @@ -48,7 +50,7 @@ protected void channelRead0(ChannelHandlerContext ctx, TarantoolResponse taranto try { TarantoolOkResult okResult = new TarantoolOkResult(tarantoolResponse.getSyncId(), tarantoolResponse.getBody().getData()); - requestFuture.complete(requestMeta.getMapper().fromValue(okResult.getData())); + requestFuture.complete(okResult.getData()); } catch (Throwable e) { requestFuture.completeExceptionally(e); } @@ -65,7 +67,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E TarantoolDecoderException ex = (TarantoolDecoderException) cause.getCause(); TarantoolRequestMetadata requestMeta = futureManager.getRequest(ex.getHeader().getSync()); if (requestMeta != null) { - CompletableFuture requestFuture = requestMeta.getFuture(); + CompletableFuture requestFuture = requestMeta.getFuture(); if (!requestFuture.isDone()) { requestFuture.completeExceptionally(cause); return; diff --git a/src/test/java/io/tarantool/driver/core/CustomConnection.java b/src/test/java/io/tarantool/driver/core/CustomConnection.java index 99c97d6da..cd743d977 100644 --- a/src/test/java/io/tarantool/driver/core/CustomConnection.java +++ b/src/test/java/io/tarantool/driver/core/CustomConnection.java @@ -6,7 +6,6 @@ import io.tarantool.driver.api.connection.TarantoolConnectionCloseListener; import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener; import io.tarantool.driver.exceptions.TarantoolClientException; -import io.tarantool.driver.mappers.MessagePackValueMapper; import io.tarantool.driver.protocol.TarantoolRequest; import java.net.InetSocketAddress; @@ -14,6 +13,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.msgpack.value.Value; + /** * @author Alexey Kuzin */ @@ -65,7 +66,7 @@ public boolean isConnected() { } @Override - public CompletableFuture sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) { + public CompletableFuture sendRequest(TarantoolRequest request) { return null; } diff --git a/src/test/java/io/tarantool/driver/integration/CustomConnection.java b/src/test/java/io/tarantool/driver/integration/CustomConnection.java index 18fcf3214..8050d4500 100644 --- a/src/test/java/io/tarantool/driver/integration/CustomConnection.java +++ b/src/test/java/io/tarantool/driver/integration/CustomConnection.java @@ -6,12 +6,13 @@ import io.tarantool.driver.api.connection.TarantoolConnectionCloseListener; import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener; import io.tarantool.driver.exceptions.TarantoolClientException; -import io.tarantool.driver.mappers.MessagePackValueMapper; import io.tarantool.driver.protocol.TarantoolRequest; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; +import org.msgpack.value.Value; + /** * @author Alexey Kuzin */ @@ -44,8 +45,8 @@ public boolean isConnected() { } @Override - public CompletableFuture sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) { - return connection.sendRequest(request, resultMapper); + public CompletableFuture sendRequest(TarantoolRequest request) { + return connection.sendRequest(request); } @Override