diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java index c6ac0c9956..f80daea06a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java @@ -66,7 +66,7 @@ public void tearDown() { } } - @Test(singleThreaded = true) + @Test(singleThreaded = true, timeOut = 180000) public void testBlobP2PTransferAmongServersForBatchStore() throws Exception { String storeName = "test-store"; Consumer paramsConsumer = params -> params.setBlobTransferEnabled(true); @@ -139,7 +139,7 @@ public void testBlobP2PTransferAmongServersForBatchStore() throws Exception { * If there are no snapshots available for the store on server2, the blob transfer should throw an exception and return a 404 error. * When server1 restarts and receives the 404 error from server2, it will switch to using Kafka to ingest the data. */ - @Test(singleThreaded = true) + @Test(singleThreaded = true, timeOut = 180000) public void testBlobTransferThrowExceptionIfSnapshotNotExisted() throws Exception { String storeName = "test-store-snapshot-not-existed"; Consumer paramsConsumer = params -> params.setBlobTransferEnabled(true); @@ -286,7 +286,7 @@ private static void runVPJ(Properties vpjProperties, int expectedVersionNumber, LOGGER.info("**TIME** VPJ" + expectedVersionNumber + " takes " + (System.currentTimeMillis() - vpjStart)); } - @Test(singleThreaded = true) + @Test(singleThreaded = true, timeOut = 180000) public void testBlobP2PTransferAmongServersForHybridStore() throws Exception { ControllerClient controllerClient = new ControllerClient(cluster.getClusterName(), cluster.getAllControllersURLs()); // prepare hybrid store. @@ -306,13 +306,15 @@ public void testBlobP2PTransferAmongServersForHybridStore() throws Exception { VeniceServerWrapper server2 = cluster.getVeniceServers().get(1); // offset record should be same after the empty push - for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) { - OffsetRecord offsetRecord1 = - server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId); - OffsetRecord offsetRecord2 = - server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId); - Assert.assertEquals(offsetRecord2.getLocalVersionTopicOffset(), offsetRecord1.getLocalVersionTopicOffset()); - } + TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> { + for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) { + OffsetRecord offsetRecord1 = + server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId); + OffsetRecord offsetRecord2 = + server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId); + Assert.assertEquals(offsetRecord2.getLocalVersionTopicOffset(), offsetRecord1.getLocalVersionTopicOffset()); + } + }); // cleanup and stop server 1 cluster.stopVeniceServer(server1.getPort());