Skip to content

Commit

Permalink
[server] fix flaky test in blob transfer integration test (#1255)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingy-li authored Oct 22, 2024
1 parent 5a6fb94 commit a698f3e
Showing 1 changed file with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void tearDown() {
}
}

@Test(singleThreaded = true)
@Test(singleThreaded = true, timeOut = 180000)
public void testBlobP2PTransferAmongServersForBatchStore() throws Exception {
String storeName = "test-store";
Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setBlobTransferEnabled(true);
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testBlobP2PTransferAmongServersForBatchStore() throws Exception {
* If there are no snapshots available for the store on server2, the blob transfer should throw an exception and return a 404 error.
* When server1 restarts and receives the 404 error from server2, it will switch to using Kafka to ingest the data.
*/
@Test(singleThreaded = true)
@Test(singleThreaded = true, timeOut = 180000)
public void testBlobTransferThrowExceptionIfSnapshotNotExisted() throws Exception {
String storeName = "test-store-snapshot-not-existed";
Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setBlobTransferEnabled(true);
Expand Down Expand Up @@ -286,7 +286,7 @@ private static void runVPJ(Properties vpjProperties, int expectedVersionNumber,
LOGGER.info("**TIME** VPJ" + expectedVersionNumber + " takes " + (System.currentTimeMillis() - vpjStart));
}

@Test(singleThreaded = true)
@Test(singleThreaded = true, timeOut = 180000)
public void testBlobP2PTransferAmongServersForHybridStore() throws Exception {
ControllerClient controllerClient = new ControllerClient(cluster.getClusterName(), cluster.getAllControllersURLs());
// prepare hybrid store.
Expand All @@ -306,13 +306,15 @@ public void testBlobP2PTransferAmongServersForHybridStore() throws Exception {
VeniceServerWrapper server2 = cluster.getVeniceServers().get(1);

// offset record should be same after the empty push
for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
OffsetRecord offsetRecord1 =
server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
OffsetRecord offsetRecord2 =
server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
Assert.assertEquals(offsetRecord2.getLocalVersionTopicOffset(), offsetRecord1.getLocalVersionTopicOffset());
}
TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
OffsetRecord offsetRecord1 =
server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
OffsetRecord offsetRecord2 =
server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
Assert.assertEquals(offsetRecord2.getLocalVersionTopicOffset(), offsetRecord1.getLocalVersionTopicOffset());
}
});

// cleanup and stop server 1
cluster.stopVeniceServer(server1.getPort());
Expand Down

0 comments on commit a698f3e

Please sign in to comment.