diff --git a/connector-java/src/main/java/io/zeebe/hazelcast/connect/java/ZeebeHazelcast.java b/connector-java/src/main/java/io/zeebe/hazelcast/connect/java/ZeebeHazelcast.java index 4f74fcf..9878afe 100644 --- a/connector-java/src/main/java/io/zeebe/hazelcast/connect/java/ZeebeHazelcast.java +++ b/connector-java/src/main/java/io/zeebe/hazelcast/connect/java/ZeebeHazelcast.java @@ -1,6 +1,7 @@ package io.zeebe.hazelcast.connect.java; import com.google.protobuf.InvalidProtocolBufferException; +import com.hazelcast.client.HazelcastClientNotActiveException; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.ringbuffer.Ringbuffer; import com.hazelcast.ringbuffer.StaleSequenceException; @@ -76,6 +77,10 @@ private void start() { future = executorService.submit(this::readFromBuffer); } + public boolean isClosed() { + return isClosed; + } + /** Stop reading from the ringbuffer. */ @Override public void close() throws Exception { @@ -146,6 +151,15 @@ private void readNext() { sequence = headSequence; + } catch (HazelcastClientNotActiveException e) { + LOGGER.warn("Lost connection to the Hazelcast server", e); + + try { + close(); + } catch (Exception closingFailure) { + LOGGER.debug("Failure while closing the client", closingFailure); + } + } catch (InterruptedException e) { LOGGER.debug("Interrupted while reading from ring-buffer with sequence '{}'", sequence); throw new RuntimeException("Interrupted while reading from ring-buffer", e); diff --git a/connector-java/src/test/java/io/zeebe/hazelcast/ZeebeHazelcastClientTest.java b/connector-java/src/test/java/io/zeebe/hazelcast/ZeebeHazelcastClientTest.java new file mode 100644 index 0000000..1fdae15 --- /dev/null +++ b/connector-java/src/test/java/io/zeebe/hazelcast/ZeebeHazelcastClientTest.java @@ -0,0 +1,65 @@ +package io.zeebe.hazelcast; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import io.zeebe.hazelcast.connect.java.ZeebeHazelcast; +import io.zeebe.hazelcast.exporter.ExporterConfiguration; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ZeebeHazelcastClientTest { + + private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration(); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(5); + + private ZeebeHazelcast zeebeHazelcast; + + private HazelcastInstance hzInstance; + private HazelcastInstance hzClient; + + @Before + public void init() { + final Config config = new Config(); + config.getNetworkConfig().setPort(5702); + hzInstance = Hazelcast.newHazelcastInstance(config); + + final ClientConfig clientConfig = new ClientConfig(); + + final var connectionRetryConfig = + clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig(); + connectionRetryConfig.setClusterConnectTimeoutMillis(CONNECTION_TIMEOUT.toMillis()); + + clientConfig.getNetworkConfig().addAddress("127.0.0.1:5702"); + hzClient = HazelcastClient.newHazelcastClient(clientConfig); + } + + @After + public void cleanUp() throws Exception { + zeebeHazelcast.close(); + hzClient.shutdown(); + hzInstance.shutdown(); + } + + @Test + public void shouldCloseIfHazelcastIsUnavailable() { + // given + zeebeHazelcast = ZeebeHazelcast.newBuilder(hzClient).build(); + + // when + hzInstance.shutdown(); + + // then + Awaitility.await() + .atMost(CONNECTION_TIMEOUT.multipliedBy(2)) + .untilAsserted(() -> assertThat(zeebeHazelcast.isClosed()).isTrue()); + } +}