Skip to content

Commit

Permalink
Incoporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 22, 2024
1 parent 5f28f61 commit a57054b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -30,7 +29,7 @@ public interface IndexMetadataUploadListener {
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException;
void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);

String getThreadpoolName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
}

/**
* Invokes the index metadata upload listener.
* Invokes the index metadata upload listener but does not wait for the execution to complete.
*/
private void invokeIndexMetadataUploadListeners(
List<IndexMetadata> newIndexMetadataList,
Expand All @@ -537,19 +537,10 @@ private void invokeIndexMetadataUploadListeners(
String threadPoolName = listener.getThreadpoolName();
assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false;
threadpool.executor(threadPoolName).execute(() -> {
try {
listener.beforeNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
);
} catch (IOException e) {
exceptionList.add(
new RemoteStateTransferException(
"Exception occurred while running invokeIndexMetadataUploadListeners in " + listenerName,
e
)
);
}
listener.beforeNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
/**
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
* and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index.
*
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteIndexPathUploader implements IndexMetadataUploadListener {
Expand Down Expand Up @@ -94,7 +96,7 @@ public String getThreadpoolName() {
}

@Override
public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException {
public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
if (isRemoteDataAttributePresent == false) {
logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes");
actionListener.onResponse(null);
Expand All @@ -109,7 +111,16 @@ public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionLi
CountDownLatch latch = new CountDownLatch(latchCount);
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
for (IndexMetadata indexMetadata : eligibleList) {
writeIndexPathAsync(indexMetadata, latch, exceptionList);
try {
writeIndexPathAsync(indexMetadata, latch, exceptionList);
} catch (IOException exception) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(indexMetadata.getIndex().getName()))
);
exceptionList.forEach(ex::addSuppressed);
actionListener.onFailure(ex);
return;
}
}
String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(","));
logger.trace(new ParameterizedMessage("Remote index path upload started for {}", indexNames));
Expand Down Expand Up @@ -142,6 +153,9 @@ public void beforeNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionLi
}
success = true;
actionListener.onResponse(null);
} catch (Exception ex) {
actionListener.onFailure(ex);
throw ex;
} finally {
long tookTimeNs = System.nanoTime() - startTime;
logger.trace(new ParameterizedMessage("executed beforeNewIndexUpload status={} tookTimeNs={}", success, tookTimeNs));
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1472,9 +1472,9 @@ public Node start() throws NodeValidationException {
if (remoteClusterStateService != null) {
remoteClusterStateService.start();
}
final RemoteIndexPathUploader indexCreationListener = injector.getInstance(RemoteIndexPathUploader.class);
if (indexCreationListener != null) {
indexCreationListener.start();
final RemoteIndexPathUploader remoteIndexPathUploader = injector.getInstance(RemoteIndexPathUploader.class);
if (remoteIndexPathUploader != null) {
remoteIndexPathUploader.start();
}
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ public BytesReference serialize(final T obj, final String blobName, final Compre
)
) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
XContentType xContentType = XContentType.SMILE;
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
Expand All @@ -309,7 +308,7 @@ public void close() throws IOException {
}
};
XContentBuilder builder = MediaTypeRegistry.contentBuilder(
xContentType,
XContentType.SMILE,
compressor.threadLocalOutputStream(indexOutputOutputStream)
)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep
Thread thread = new Thread(() -> {
try {
remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener);
} catch (IOException e) {
throw new AssertionError(e);
} catch (Exception e) {
assertTrue(e instanceof InterruptedException);
assertEquals("sleep interrupted", e.getMessage());
Expand Down

0 comments on commit a57054b

Please sign in to comment.