Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runAs(Subject subject) to Client interface #16976

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
- Add runAs(Subject subject) to Client interface ([#16976](https://github.com/opensearch-project/OpenSearch/pull/16976))

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.identity.Subject;

import java.util.Map;

Expand Down Expand Up @@ -125,6 +126,8 @@ public interface Client extends OpenSearchClient, Releasable {
*/
AdminClient admin();

Client runAs(Subject subject);

/**
* Index a JSON source associated with a given index.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.identity.RunAsSubjectClient;
import org.opensearch.identity.Subject;
import org.opensearch.threadpool.ThreadPool;

import java.util.Map;
Expand Down Expand Up @@ -464,6 +466,11 @@ public final AdminClient admin() {
return admin;
}

@Override
public final Client runAs(Subject subject) {
return new RunAsSubjectClient(this, subject);
}

@Override
public final <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
ActionType<Response> action,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.identity;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.client.FilterClient;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;

/**
* Implementation of client that will run transport actions in a stashed context and inject the name of the provided
* subject into the context.
*
* @opensearch.internal
*/
@InternalApi
public class RunAsSubjectClient extends FilterClient {

private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class);

private final Subject subject;

public RunAsSubjectClient(Client delegate, Subject subject) {
super(delegate);
this.subject = subject;
}

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) {
subject.runAs(() -> {
logger.info("Running transport action with subject: {}", subject.getPrincipal().getName());
super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore));
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);

Check warning on line 53 in server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java#L52-L53

Added lines #L52 - L53 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.identity;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.junit.Before;

import java.security.Principal;

import static org.hamcrest.Matchers.equalTo;

public class RunAsSubjectClientTests extends OpenSearchSingleNodeTestCase {

private final Subject TEST_SUBJECT = new Subject() {
@Override
public Principal getPrincipal() {
return new NamedPrincipal("testSubject");
}
};

@Before
public void setup() {
client().threadPool().getThreadContext().stashContext(); // start with fresh context
}

public void testThatContextIsRestoredOnActionListenerResponse() throws Exception {
try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) {
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"testSubject",
"org.opensearch.identity.RunAsSubjectClient",
Level.INFO,
"Running transport action with subject: testSubject"
)
);

client().threadPool().getThreadContext().putHeader("test_header", "foo");

client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest(), new ActionListener<>() {
@Override
public void onResponse(ClusterHealthResponse clusterHealthResponse) {
String testHeader = client().threadPool().getThreadContext().getHeader("test_header");
assertThat(testHeader, equalTo("foo"));

mockLogAppender.assertAllExpectationsMatched();
}

@Override
public void onFailure(Exception e) {
fail("Expected cluster health action to succeed");
}
});
}
}

public void testThatContextIsRestoredOnActionListenerFailure() throws Exception {
try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) {
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"testSubject",
"org.opensearch.identity.RunAsSubjectClient",
Level.INFO,
"Running transport action with subject: testSubject"
)
);
client().threadPool().getThreadContext().putHeader("test_header", "bar");

client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest("dne"), new ActionListener<>() {
@Override
public void onResponse(ClusterHealthResponse clusterHealthResponse) {
fail("Expected cluster health action to fail");
}

@Override
public void onFailure(Exception e) {
String testHeader = client().threadPool().getThreadContext().getHeader("test_header");
assertThat(testHeader, equalTo("bar"));

mockLogAppender.assertAllExpectationsMatched();
}
});
}
}
}
Loading