Skip to content

Commit

Permalink
[Backport 2.19] Modified Rest Handlers to stash context before modify…
Browse files Browse the repository at this point in the history
…ing system indices #126 (#127)

Signed-off-by: rithin-pullela-aws <[email protected]>
  • Loading branch information
rithin-pullela-aws authored Feb 1, 2025
1 parent 2f584bc commit 45ae58d
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 45 deletions.
19 changes: 18 additions & 1 deletion src/main/java/com/o19s/es/ltr/rest/RestAddFeatureToSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
import java.util.List;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ObjectParser;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.ltr.settings.LTRSettings;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestStatusToXContentListener;

import com.o19s.es.ltr.action.AddFeaturesToSetAction;
import com.o19s.es.ltr.action.AddFeaturesToSetAction.AddFeaturesToSetRequestBuilder;
import com.o19s.es.ltr.feature.FeatureValidation;
import com.o19s.es.ltr.feature.store.StoredFeature;
Expand Down Expand Up @@ -96,7 +101,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
builder.request().setFeatures(features);
builder.request().setMerge(merge);
builder.request().setValidation(validation);
return (channel) -> builder.execute(new RestStatusToXContentListener<>(channel, (r) -> r.getResponse().getLocation(routing)));
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<AddFeaturesToSetAction.AddFeaturesToSetResponse> wrappedListener = ActionListener
.runBefore(
new RestStatusToXContentListener<>(channel, (r) -> r.getResponse().getLocation(routing)),
threadContext::restore
);

builder.execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}

static class FeaturesParserState {
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/com/o19s/es/ltr/rest/RestCreateModelFromSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.ParsingException;
Expand Down Expand Up @@ -83,15 +84,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}
builder.request().setValidation(state.validation);
builder.routing(routing);
return (channel) -> builder
.execute(
ActionListener
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<CreateModelFromSetAction.CreateModelFromSetResponse> wrappedListener = ActionListener
.wrap(
response -> new RestStatusToXContentListener<CreateModelFromSetAction.CreateModelFromSetResponse>(
channel,
(r) -> r.getResponse().getLocation(routing)
r -> r.getResponse().getLocation(routing)
).onResponse(response),
(e) -> {
e -> {
final Exception exc;
final RestStatus status;
if (ExceptionsHelper.unwrap(e, VersionConflictEngineException.class) != null) {
Expand All @@ -112,8 +113,17 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
logger.error("failed to send failure response", inner);
}
}
)
);
);

ActionListener<CreateModelFromSetAction.CreateModelFromSetResponse> contextRestoringListener = ActionListener
.runBefore(wrappedListener, () -> threadContext.restore());

builder.execute(contextRestoringListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};

}

private static class ParserState {
Expand Down
86 changes: 53 additions & 33 deletions src/main/java/com/o19s/es/ltr/rest/RestFeatureManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.io.IOException;
import java.util.List;

import org.opensearch.action.delete.DeleteRequestBuilder;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.ltr.settings.LTRSettings;
Expand Down Expand Up @@ -97,33 +99,34 @@ RestChannelConsumer delete(NodeClient client, String type, String indexName, Res
String routing = request.param("routing");
return (channel) -> {
RestStatusToXContentListener<DeleteResponse> restR = new RestStatusToXContentListener<>(channel, (r) -> r.getLocation(routing));
client.prepareDelete(indexName, id).setRouting(routing).execute(ActionListener.wrap((deleteResponse) -> {
// wrap the response so we can send another request to clear the cache
// usually we send only one transport request from the rest layer
// it's still unclear which direction we should take (thick or thin REST layer?)
ClearCachesAction.ClearCachesNodesRequest clearCache = new ClearCachesAction.ClearCachesNodesRequest();
switch (type) {
case StoredFeature.TYPE:
clearCache.clearFeature(indexName, name);
break;
case StoredFeatureSet.TYPE:
clearCache.clearFeatureSet(indexName, name);
break;
case StoredLtrModel.TYPE:
clearCache.clearModel(indexName, name);
break;
ClearCachesAction.ClearCachesNodesRequest clearCache = new ClearCachesAction.ClearCachesNodesRequest();
switch (type) {
case StoredFeature.TYPE:
clearCache.clearFeature(indexName, name);
break;
case StoredFeatureSet.TYPE:
clearCache.clearFeatureSet(indexName, name);
break;
case StoredLtrModel.TYPE:
clearCache.clearModel(indexName, name);
break;
}
DeleteRequestBuilder deleteRequest = client.prepareDelete(indexName, id).setRouting(routing);
// clearing cache first and deleting next
// if cache clearning fails, we do not attempt to delete and return with an error
// need to evaluate this strategy
client.execute(ClearCachesAction.INSTANCE, clearCache, ActionListener.wrap((r) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
deleteRequest.execute(ActionListener.wrap((deleteResponse) -> {
restR.onResponse(deleteResponse);
threadContext.restore();
}, (e) -> {
restR.onFailure(e);
threadContext.restore();
}));
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
client
.execute(
ClearCachesAction.INSTANCE,
clearCache,
ActionListener
.wrap(
(r) -> restR.onResponse(deleteResponse),
// Is it good to fail the whole request if cache invalidation failed?
restR::onFailure
)
);
}, restR::onFailure));
};
}
Expand All @@ -133,12 +136,17 @@ RestChannelConsumer get(NodeClient client, String type, String indexName, RestRe
String name = request.param("name");
String routing = request.param("routing");
String id = generateId(type, name);
return (channel) -> client.prepareGet(indexName, id).setRouting(routing).execute(new RestToXContentListener<GetResponse>(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
// refresh index before performing get
return (channel) -> {
client.admin().indices().prepareRefresh(indexName).execute(ActionListener.wrap(refreshResponse -> {
client.prepareGet(indexName, id).setRouting(routing).execute(new RestToXContentListener<GetResponse>(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
}, e -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()))));
};
}

RestChannelConsumer addOrUpdate(NodeClient client, String type, String indexName, RestRequest request) throws IOException {
Expand Down Expand Up @@ -181,6 +189,18 @@ RestChannelConsumer addOrUpdate(NodeClient client, String type, String indexName
builder.request().setRouting(routing);
builder.request().setStore(indexName);
builder.request().setValidation(parserState.getValidation());
return (channel) -> builder.execute(new RestStatusToXContentListener<>(channel, (r) -> r.getResponse().getLocation(routing)));
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<FeatureStoreAction.FeatureStoreResponse> wrappedListener = ActionListener
.runBefore(
new RestStatusToXContentListener<>(channel, (r) -> r.getResponse().getLocation(routing)),
() -> threadContext.restore()
);

builder.execute(wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/o19s/es/ltr/rest/RestStoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ltr.settings.LTRSettings;
Expand Down Expand Up @@ -128,6 +131,15 @@ RestChannelConsumer createIndex(NodeClient client, String indexName) {

RestChannelConsumer deleteIndex(NodeClient client, String indexName) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
return (channel) -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel));
return (channel) -> {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener
.runBefore(new RestToXContentListener<>(channel), () -> threadContext.restore());

client.admin().indices().delete(deleteIndexRequest, wrappedListener);
} catch (Exception e) {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
};
}
}
9 changes: 7 additions & 2 deletions src/test/resources/rest-api-spec/test/fstore/10_manage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore_mystore], but in a future major version, direct access to system indices will be prevented by default"
- "this request accesses system indices: [.ltrstore], but in a future major version, direct access to system indices will be prevented by default"
ltr.list_stores: {}

- match: { stores._default_.version: 2 }
Expand Down Expand Up @@ -179,6 +180,8 @@
index: test

- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore_custom], but in a future major version, direct access to system indices will be prevented by default"
search:
index: test
body: { query: { "sltr": { "params": {"query_string": "rambo"}, "model": "my_model", "store": "custom" } } }
Expand All @@ -191,8 +194,8 @@
- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore_custom], but in a future major version, direct access to system indices will be prevented by default"
indices.delete:
index: .ltrstore_custom
ltr.delete_store:
store: custom

- do:
ltr.cache_stats: {}
Expand Down Expand Up @@ -241,6 +244,8 @@
index: test

- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore_custom], but in a future major version, direct access to system indices will be prevented by default"
search:
index: test
body: { query: { "sltr": { "params": {"query_string": "rambo"}, "model": "my_model", "store": "custom" } } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@
- skip:
features: allowed_warnings
- do:
ltr.create_store: {}
allowed_warnings:
- "this request accesses system indices: [.ltrstore], but in a future major version, direct access to system indices will be prevented by default"
ltr.create_store: {}
- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ setup:

---
"single feature ranklib model":
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore], but in a future major version, direct access to system indices will be prevented by default"
search:
index: test
body: { query: { "sltr": { "params": {"query_string": "rambo"}, "model": "single_feature_ranklib_model" } } }
Expand All @@ -148,7 +152,11 @@ setup:

---
"single feature linear model":
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore], but in a future major version, direct access to system indices will be prevented by default"
search:
index: test
body: { query: { "sltr": { "params": {"query_string": "rambo"}, "model": "single_feature_linear_model" } } }
Expand All @@ -157,7 +165,11 @@ setup:

---
"three feature linear model using one active feature":
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "this request accesses system indices: [.ltrstore], but in a future major version, direct access to system indices will be prevented by default"
search:
index: test
body: { query: { "sltr": { "params": {}, "model": "three_feature_linear_model", "active_features": ["no_param_feature"]} } }
Expand Down

0 comments on commit 45ae58d

Please sign in to comment.