Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for search replica to return segrep stats #16678

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Added implementation for the stats calculation for search and regular replica in shards ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -433,4 +434,103 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {

}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 2;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 3);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 3);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 1;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.SegmentReplicationState;

import java.io.IOException;
Expand All @@ -31,19 +32,31 @@
@Nullable
private final SegmentReplicationState replicaStats;

@Nullable
private final SegmentReplicationShardStats segmentReplicationShardStats;

public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException {
this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new);
this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new);
this.segmentReplicationShardStats = in.readOptionalWriteable(SegmentReplicationShardStats::new);

Check warning on line 41 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L41

Added line #L41 was not covered by tests
}

public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) {
this.primaryStats = primaryStats;
this.replicaStats = null;
this.segmentReplicationShardStats = null;

Check warning on line 47 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L47

Added line #L47 was not covered by tests
}

public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) {
this.replicaStats = replicaStats;
this.primaryStats = null;
this.segmentReplicationShardStats = null;
}

Check warning on line 54 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L53-L54

Added lines #L53 - L54 were not covered by tests

public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) {
this.primaryStats = null;
this.replicaStats = null;
this.segmentReplicationShardStats = segmentReplicationShardStats;

Check warning on line 59 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L56-L59

Added lines #L56 - L59 were not covered by tests
}

public SegmentReplicationPerGroupStats getPrimaryStats() {
Expand All @@ -54,10 +67,15 @@
return replicaStats;
}

public SegmentReplicationShardStats getSegmentReplicationShardStats() {
return segmentReplicationShardStats;

Check warning on line 71 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L71

Added line #L71 was not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(primaryStats);
out.writeOptionalWriteable(replicaStats);
out.writeOptionalWriteable(segmentReplicationShardStats);

Check warning on line 78 in server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java#L78

Added line #L78 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -101,6 +106,9 @@
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();
// search replica responses
final Set<SegmentReplicationShardStats> searchReplicaSegRepShardStats = new HashSet<>();

Check warning on line 110 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L110

Added line #L110 was not covered by tests

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +117,11 @@
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getSegmentReplicationShardStats() != null) {
searchReplicaSegRepShardStats.add(response.getSegmentReplicationShardStats());

Check warning on line 122 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L122

Added line #L122 was not covered by tests
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -134,6 +147,15 @@
}
}
}
// combine the search replica stats with the stats of other replicas
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
Set<SegmentReplicationShardStats> updatedSet = new HashSet<>(group.getReplicaStats());
updatedSet.addAll(searchReplicaSegRepShardStats);
group.setReplicaStats(updatedSet);
}
}

Check warning on line 157 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L153-L157

Added lines #L153 - L157 were not covered by tests

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);
}

Expand All @@ -154,13 +176,17 @@

if (shardRouting.primary()) {
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
} else if (shardRouting.isSearchOnly()) {
SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats(

Check warning on line 180 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L180

Added line #L180 was not covered by tests
shardRouting,
indexShard,
shardId,
request.activeOnly()

Check warning on line 184 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L184

Added line #L184 was not covered by tests
);
return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats);

Check warning on line 186 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L186

Added line #L186 was not covered by tests
} else {
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));

Check warning on line 188 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L188

Added line #L188 was not covered by tests
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
}

@Override
Expand All @@ -181,4 +207,75 @@
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationShardStats calcualteSegmentReplicationShardStats(
ShardRouting shardRouting,
IndexShard indexShard,
ShardId shardId,
boolean isActiveOnly
) {
ReplicationCheckpoint indexReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint();
SegmentReplicationState segmentReplicationState = getSegmentReplicationState(shardId, isActiveOnly);

Check warning on line 218 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L217-L218

Added lines #L217 - L218 were not covered by tests
if (segmentReplicationState != null) {
ReplicationCheckpoint latestReplicationCheckpointReceived = segmentReplicationState.getLatestReplicationCheckpoint();

Check warning on line 220 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L220

Added line #L220 was not covered by tests

SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
calculateBytesBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),

Check warning on line 225 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L222-L225

Added lines #L222 - L225 were not covered by tests
0,
calculateCurrentReplicationLag(shardId),
getLastCompletedReplicationLag(shardId)

Check warning on line 228 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L227-L228

Added lines #L227 - L228 were not covered by tests
);

segmentReplicationShardStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationShardStats;

Check warning on line 232 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L231-L232

Added lines #L231 - L232 were not covered by tests
} else {
return new SegmentReplicationShardStats(shardRouting.allocationId().getId(), 0, 0, 0, 0, 0);

Check warning on line 234 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L234

Added line #L234 was not covered by tests
}
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);

Check warning on line 240 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L240

Added line #L240 was not covered by tests
} else {
return targetService.getSegmentReplicationState(shardId);

Check warning on line 242 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L242

Added line #L242 was not covered by tests
}
}

private long calculateCheckpointsBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
return latestReplicationCheckpointReceived.getSegmentInfosVersion() - indexReplicationCheckpoint.getSegmentInfosVersion();

Check warning on line 251 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L251

Added line #L251 was not covered by tests
}
return 0;

Check warning on line 253 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L253

Added line #L253 was not covered by tests
}

private long calculateBytesBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
Store.RecoveryDiff diff = Store.segmentReplicationDiff(
latestReplicationCheckpointReceived.getMetadataMap(),
indexReplicationCheckpoint.getMetadataMap()

Check warning on line 263 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L261-L263

Added lines #L261 - L263 were not covered by tests
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();

Check warning on line 265 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L265

Added line #L265 was not covered by tests
}
return 0;

Check warning on line 267 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L267

Added line #L267 was not covered by tests
}

private long calculateCurrentReplicationLag(ShardId shardId) {
SegmentReplicationState ongoingEventSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

Check warning on line 271 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L271

Added line #L271 was not covered by tests
return ongoingEventSegmentReplicationState != null ? ongoingEventSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(ShardId shardId) {
SegmentReplicationState lastCompletedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(

Check warning on line 276 in server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java#L276

Added line #L276 was not covered by tests
shardId
);
return lastCompletedSegmentReplicationState != null ? lastCompletedSegmentReplicationState.getTimer().time() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment {

private final ShardId shardId;
private final Set<SegmentReplicationShardStats> replicaStats;
private Set<SegmentReplicationShardStats> replicaStats;
private final long rejectedRequestCount;

public SegmentReplicationPerGroupStats(ShardId shardId, Set<SegmentReplicationShardStats> replicaStats, long rejectedRequestCount) {
Expand All @@ -55,6 +55,10 @@
return shardId;
}

public void setReplicaStats(Set<SegmentReplicationShardStats> replicaStats) {
this.replicaStats = replicaStats;
}

Check warning on line 60 in server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java#L59-L60

Added lines #L59 - L60 were not covered by tests

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("rejected_requests", rejectedRequestCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
this.currentReplicationState = in.readOptionalWriteable(SegmentReplicationState::new);

Check warning on line 67 in server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java#L67

Added line #L67 was not covered by tests
}

public String getAllocationId() {
Expand Down Expand Up @@ -118,7 +119,7 @@
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
builder.startObject("current_replication_state");

Check warning on line 122 in server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java#L122

Added line #L122 was not covered by tests
currentReplicationState.toXContent(builder, params);
builder.endObject();
}
Expand All @@ -134,6 +135,7 @@
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
out.writeOptionalWriteable(currentReplicationState);

Check warning on line 138 in server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java#L138

Added line #L138 was not covered by tests
}

@Override
Expand Down
Loading
Loading