Skip to content

Commit

Permalink
Move MsgPack object mapping up from connection level
Browse files Browse the repository at this point in the history
This patch is a part of the global task of decoupling the MessagePack object mapping from the core driver code in the scope of discoverable mapper bindings, caching and support for different Tarantool cluster implementations
  • Loading branch information
akudiyar committed Aug 10, 2023
1 parent 167bd38 commit 423e425
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ tmp/
*.swo
.bloop/
.metals/
.vscode/
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import io.netty.channel.Channel;
import io.tarantool.driver.TarantoolVersion;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.mappers.MessagePackValueMapper;
import io.tarantool.driver.protocol.TarantoolRequest;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

import org.msgpack.value.Value;

public interface TarantoolConnection extends AutoCloseable {
/**
* Get the Tarantool server address for this connection
Expand Down Expand Up @@ -38,10 +39,9 @@ public interface TarantoolConnection extends AutoCloseable {
*
* @param request the request
* @param resultMapper the mapper for response body
* @param <T> result type
* @return result future
*/
<T> CompletableFuture<T> sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper);
CompletableFuture<Value> sendRequest(TarantoolRequest request);

/**
* Get the Netty channel baking this connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,9 @@ private <S> CompletableFuture<S> makeRequest(
}

TarantoolCallRequest request = builder.build(argumentsMapper);
return connectionManager().getConnection().thenCompose(c -> c.sendRequest(request, resultMapper));
return connectionManager().getConnection()
.thenCompose(c -> c.sendRequest(request))
.thenApply(resultMapper::fromValue);
} catch (TarantoolProtocolException e) {
throw new TarantoolClientException(e);
}
Expand Down Expand Up @@ -535,7 +537,9 @@ public CompletableFuture<List<?>> eval(
.withExpression(expression)
.withArguments(arguments)
.build(argumentsMapper);
return connectionManager().getConnection().thenCompose(c -> c.sendRequest(request, resultMapper));
return connectionManager().getConnection()
.thenCompose(c -> c.sendRequest(request))
.thenApply(resultMapper::fromValue);
} catch (TarantoolProtocolException e) {
throw new TarantoolClientException(e);
}
Expand Down
21 changes: 9 additions & 12 deletions src/main/java/io/tarantool/driver/core/RequestFutureManager.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.tarantool.driver.core;

import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.mappers.MessagePackValueMapper;
import io.tarantool.driver.protocol.TarantoolRequest;

import java.util.Map;
Expand All @@ -11,6 +10,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.msgpack.value.Value;

/**
* Keeps track of submitted requests, finishing them by timeout and allowing asynchronous request processing
*
Expand All @@ -37,12 +38,10 @@ public RequestFutureManager(TarantoolClientConfig config, ScheduledExecutorServi
* The request timeout is taken from the client configuration
*
* @param request request to Tarantool server
* @param resultMapper result message entity-to-object mapper
* @param <T> target response body type
* @return {@link CompletableFuture} that completes when a response is received from Tarantool server
*/
public <T> CompletableFuture<T> submitRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) {
return submitRequest(request, config.getRequestTimeout(), resultMapper);
public CompletableFuture<Value> submitRequest(TarantoolRequest request) {
return submitRequest(request, config.getRequestTimeout());
}

/**
Expand All @@ -51,18 +50,16 @@ public <T> CompletableFuture<T> submitRequest(TarantoolRequest request, MessageP
*
* @param request request to Tarantool server
* @param requestTimeout timeout after which the request will be automatically failed, milliseconds
* @param resultMapper result message entity-to-object mapper
* @param <T> target response body type
* @param <V> target response body MessagePack type
* @return {@link CompletableFuture} that completes when a response is received from Tarantool server
*/
public <T> CompletableFuture<T> submitRequest(
public CompletableFuture<Value> submitRequest(
TarantoolRequest request,
int requestTimeout,
MessagePackValueMapper resultMapper) {
CompletableFuture<T> requestFuture = new CompletableFuture<>();
int requestTimeout) {
CompletableFuture<Value> requestFuture = new CompletableFuture<>();
long requestId = request.getHeader().getSync();
requestFuture.whenComplete((r, e) -> requestFutures.remove(requestId));
requestFutures.put(requestId, new TarantoolRequestMetadata(requestFuture, resultMapper));
requestFutures.put(requestId, new TarantoolRequestMetadata(requestFuture));
timeoutScheduler.schedule(() -> {
if (!requestFuture.isDone()) {
requestFuture.completeExceptionally(new TimeoutException(String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
package io.tarantool.driver.core;

import io.tarantool.driver.mappers.MessagePackValueMapper;

import java.util.concurrent.CompletableFuture;

import org.msgpack.value.Value;

/**
* Intermediate request metadata holder
*
* @author Alexey Kuzin
*/
public class TarantoolRequestMetadata {
private final CompletableFuture<?> feature;
private final MessagePackValueMapper mapper;

protected TarantoolRequestMetadata(CompletableFuture<?> feature, MessagePackValueMapper mapper) {
this.feature = feature;
this.mapper = mapper;
}
private final CompletableFuture<Value> future;

public CompletableFuture<?> getFuture() {
return feature;
protected TarantoolRequestMetadata(CompletableFuture<Value> future) {
this.future = future;
}

public MessagePackValueMapper getMapper() {
return mapper;
public CompletableFuture<Value> getFuture() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener;
import io.tarantool.driver.core.RequestFutureManager;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.mappers.MessagePackValueMapper;
import io.tarantool.driver.protocol.TarantoolRequest;

import org.msgpack.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,12 +66,12 @@ public boolean isConnected() {
}

@Override
public <T> CompletableFuture<T> sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) {
public CompletableFuture<Value> sendRequest(TarantoolRequest request) {
if (!isConnected()) {
throw new TarantoolClientException("Not connected to Tarantool server");
}

CompletableFuture<T> requestFuture = requestManager.submitRequest(request, resultMapper);
CompletableFuture<Value> requestFuture = requestManager.submitRequest(request);
channel.writeAndFlush(request).addListener(f -> {
if (!f.isSuccess()) {
requestFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ private CompletableFuture<Void> truncate(MessagePackValueMapper resultMapper)
protected abstract MessagePackValueMapper arrayTupleResultMapper();

private CompletableFuture<R> sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) {
return connectionManager.getConnection().thenCompose(c -> c.sendRequest(request, resultMapper));
return connectionManager.getConnection()
.thenCompose(c -> c.sendRequest(request))
.thenApply(resultMapper::fromValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.tarantool.driver.protocol.TarantoolErrorResult;
import io.tarantool.driver.protocol.TarantoolOkResult;
import io.tarantool.driver.protocol.TarantoolResponse;

import org.msgpack.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +38,7 @@ public TarantoolResponseHandler(RequestFutureManager futureManager) {
protected void channelRead0(ChannelHandlerContext ctx, TarantoolResponse tarantoolResponse) throws Exception {
TarantoolRequestMetadata requestMeta = futureManager.getRequest(tarantoolResponse.getSyncId());
if (requestMeta != null) {
CompletableFuture<?> requestFuture = requestMeta.getFuture();
CompletableFuture<Value> requestFuture = requestMeta.getFuture();
if (!requestFuture.isDone()) {
switch (tarantoolResponse.getResponseType()) {
case IPROTO_NOT_OK:
Expand All @@ -48,7 +50,7 @@ protected void channelRead0(ChannelHandlerContext ctx, TarantoolResponse taranto
try {
TarantoolOkResult okResult = new TarantoolOkResult(tarantoolResponse.getSyncId(),
tarantoolResponse.getBody().getData());
requestFuture.complete(requestMeta.getMapper().fromValue(okResult.getData()));
requestFuture.complete(okResult.getData());
} catch (Throwable e) {
requestFuture.completeExceptionally(e);
}
Expand All @@ -65,7 +67,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
TarantoolDecoderException ex = (TarantoolDecoderException) cause.getCause();
TarantoolRequestMetadata requestMeta = futureManager.getRequest(ex.getHeader().getSync());
if (requestMeta != null) {
CompletableFuture<?> requestFuture = requestMeta.getFuture();
CompletableFuture<Value> requestFuture = requestMeta.getFuture();
if (!requestFuture.isDone()) {
requestFuture.completeExceptionally(cause);
return;
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/tarantool/driver/core/CustomConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import io.tarantool.driver.api.connection.TarantoolConnectionCloseListener;
import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.mappers.MessagePackValueMapper;
import io.tarantool.driver.protocol.TarantoolRequest;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.value.Value;

/**
* @author Alexey Kuzin
*/
Expand Down Expand Up @@ -65,7 +66,7 @@ public boolean isConnected() {
}

@Override
public <T> CompletableFuture<T> sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) {
public CompletableFuture<Value> sendRequest(TarantoolRequest request) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import io.tarantool.driver.api.connection.TarantoolConnectionCloseListener;
import io.tarantool.driver.api.connection.TarantoolConnectionFailureListener;
import io.tarantool.driver.exceptions.TarantoolClientException;
import io.tarantool.driver.mappers.MessagePackValueMapper;
import io.tarantool.driver.protocol.TarantoolRequest;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

import org.msgpack.value.Value;

/**
* @author Alexey Kuzin
*/
Expand Down Expand Up @@ -44,8 +45,8 @@ public boolean isConnected() {
}

@Override
public <T> CompletableFuture<T> sendRequest(TarantoolRequest request, MessagePackValueMapper resultMapper) {
return connection.sendRequest(request, resultMapper);
public CompletableFuture<Value> sendRequest(TarantoolRequest request) {
return connection.sendRequest(request);
}

@Override
Expand Down

0 comments on commit 423e425

Please sign in to comment.