Skip to content

Commit

Permalink
Modify optimisation approach without Cluster Settings
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 10, 2024
1 parent af0ab18 commit a267aef
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
Expand All @@ -31,10 +36,13 @@
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -246,47 +254,140 @@ public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() {
}
}
}
public void testNodeIndicesStatsOptimisedResponse() {

/**
* Default behavior - without consideration of request level param on level, the NodeStatsRequest always
* returns ShardStats which is aggregated on the coordinator node when creating the XContent.
*/
public void testNodeIndicesStatsDefaultResponse() {
String testLevel = randomFrom("null", "node", "indices", "shards", "unknown");
internalCluster().startNode();
ensureGreen();
String indexName = "test1";
index(indexName, "type", "1", "f", "f");
refresh();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

NodesStatsResponse response = client().admin().cluster().prepareNodesStats().get();
NodesStatsResponse response;
if (!testLevel.equals("null")) {
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel);

CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
} else {
response = client().admin().cluster().prepareNodesStats().get();
}

response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
try {
// Without any param - default is level = nodes
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices().toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();

Map<String, Object> xContentMap = xContentBuilderToMap(builder);
LinkedHashMap indicesStatsMap = (LinkedHashMap) xContentMap.get("indices");
assertFalse(indicesStatsMap.containsKey("indices"));
assertFalse(indicesStatsMap.containsKey("shards"));

// With param containing level as 'indices', the indices stats are returned
builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices()
.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("level", "indices")));
builder.endObject();

xContentMap = xContentBuilderToMap(builder);
indicesStatsMap = (LinkedHashMap) xContentMap.get("indices");
assertTrue(indicesStatsMap.containsKey("indices"));
assertFalse(indicesStatsMap.containsKey("shards"));

LinkedHashMap indexLevelStats = (LinkedHashMap) indicesStatsMap.get("indices");
assertTrue(indexLevelStats.containsKey(indexName));

// With param containing level as 'shards', the shard stats are returned
builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices().toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("level", "shards")));
builder.endObject();

xContentMap = xContentBuilderToMap(builder);
indicesStatsMap = (LinkedHashMap) xContentMap.get("indices");
assertFalse(indicesStatsMap.containsKey("indices"));
assertTrue(indicesStatsMap.containsKey("shards"));

LinkedHashMap shardLevelStats = (LinkedHashMap) indicesStatsMap.get("shards");
assertTrue(shardLevelStats.containsKey(indexName));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

/**
* Optimized behavior - to avoid unnecessary IO in the form of shard-stats when not required, we not honor the levels on the
* individual data nodes instead and pre-compute information as required.
*/
public void testNodeIndicesStatsOptimizedResponse() {
String testLevel = randomFrom("null", "node", "indices", "shards", "unknown");
internalCluster().startNode();
ensureGreen();
String indexName = "test1";
index(indexName, "type", "1", "f", "f");
refresh();

NodesStatsResponse response;
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.optimizeNodeIndicesStatsOnLevel(true);
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
if (!testLevel.equals("null")) {
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel);

});
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add("indices");
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
}
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
});

level_arg.clear();
level_arg.add("shards");
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices().toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("level", "shards")));
builder.endObject();

Map<String, Object> xContentMap = xContentBuilderToMap(builder);
LinkedHashMap indicesStatsMap = (LinkedHashMap) xContentMap.get("indices");
LinkedHashMap indicesStats = (LinkedHashMap) indicesStatsMap.get("indices");
LinkedHashMap shardStats = (LinkedHashMap) indicesStatsMap.get("shards");

switch (testLevel) {
case "shards":
assertFalse(shardStats.isEmpty());
assertFalse(indicesStats.isEmpty());
break;
case "indices":
assertTrue(shardStats.isEmpty());
assertFalse(indicesStats.isEmpty());
break;
case "node":
case "null":
case "unknown":
assertTrue(shardStats.isEmpty());
assertTrue(indicesStats.isEmpty());
break;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}


private Map<String, Object> xContentBuilderToMap(XContentBuilder xContentBuilder) {
return XContentHelper.convertToMap(BytesReference.bytes(xContentBuilder), true, xContentBuilder.contentType()).v2();
}

private void assertDocStatusStats() {
DocStatusStats docStatusStats = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public NodesStatsRequest indices(boolean indices) {
/**
* Use Optimized Response filtered based on level
*/
public NodesStatsRequest useOptimizedNodeIndicesStats(boolean useOptimizedNodeIndicesStats){
if (this.indices!=null) {
public NodesStatsRequest useOptimizedNodeIndicesStats(boolean useOptimizedNodeIndicesStats) {
if (this.indices != null) {
this.indices.optimizeNodeIndicesStatsOnLevel(true);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class CommonStatsFlags implements Writeable, Cloneable {
private String[] levels = new String[0];
private boolean optimizeNodeIndicesStatsOnLevel = false;


/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
*/
Expand Down Expand Up @@ -102,7 +101,7 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
optimizeNodeIndicesStatsOnLevel = in.readBoolean();
}
}
Expand All @@ -129,7 +128,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_2_15_0)){
if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
out.writeBoolean(optimizeNodeIndicesStatsOnLevel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,6 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
}
}
if (flags.optimizeNodeIndicesStatsOnLevel()) {
logger.info("Picked NodeIndicesStats");
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels());
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
Expand Down
101 changes: 39 additions & 62 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

package org.opensearch.indices;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
Expand Down Expand Up @@ -78,13 +76,13 @@
*/
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, CommonStats> statsByIndex;
private Map<Index, List<IndexShardStats>> statsByShard;
protected CommonStats stats;
protected Map<Index, CommonStats> statsByIndex;
protected Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = new HashMap<>();
Expand Down Expand Up @@ -136,11 +134,17 @@ public NodeIndicesStats(
}

if (levels != null) {
if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.indices::equals)) {
this.statsByIndex = createStatsByIndex(statsByShard);
} else if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.shards::equals)) {
this.statsByShard = statsByShard;
}
Arrays.stream(levels).anyMatch(level -> {
switch (level) {
case Fields.INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
return true;
case Fields.SHARDS:
this.statsByShard = statsByShard;
return true;
}
return false;
});
}
}

Expand Down Expand Up @@ -250,7 +254,7 @@ public RecoveryStats getRecoveryStats() {
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_2_15_0)) {
if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
Expand Down Expand Up @@ -300,29 +304,33 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(Fields.INDICES);
stats.toXContent(builder, params);

if (levels.indices.equals(level)) {
builder.startObject(Fields.INDICES);
if (statsByIndex == null && statsByShard!=null) {
if (Fields.INDICES.equals(level)) {
if (statsByIndex == null && statsByShard != null) {
statsByIndex = createStatsByIndex(statsByShard);
}
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.INDICES);
if (statsByIndex != null) {
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
}
builder.endObject();
} else if (levels.shards.equals(level)) {
} else if (Fields.SHARDS.equals(level)) {
builder.startObject("shards");
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
builder.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId()));
for (ShardStats shardStats : indexShardStats.getShards()) {
shardStats.toXContent(builder, params);
if (statsByShard != null) {
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
builder.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId()));
for (ShardStats shardStats : indexShardStats.getShards()) {
shardStats.toXContent(builder, params);
}
builder.endObject().endObject();
}
builder.endObject().endObject();
builder.endArray();
}
builder.endArray();
}
builder.endObject();
}
Expand Down Expand Up @@ -356,44 +364,13 @@ public List<IndexShardStats> getShardStats(Index index) {
}
}

public CommonStats getIndexStats(Index index) {
if (statsByIndex == null) {
return null;
} else {
return statsByIndex.get(index);
}
}

/**
* Fields used for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String INDICES = "indices";
}

/**
* Levels for the NodeIndicesStats
*/
public enum levels {
node("node"),
indices("indices"),
shards("shards");

private final String name;

levels(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}

public boolean equals(String value) {
return this.name.equals(value);
}
public static final class Fields {
public static final String INDICES = "indices";
public static final String SHARDS = "shards";
}
}
Loading

0 comments on commit a267aef

Please sign in to comment.