Skip to content

Commit

Permalink
Adding MapperServiceProvider for plugins
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Sep 30, 2024
1 parent 0b96565 commit de087da
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add MapperServiceProvider for plugins ([#16110](https://github.com/opensearch-project/OpenSearch/pull/16110))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,9 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
searchRequestContext.setSuccessfulSearchShardIndices(
results.getSuccessfulResults().map(result -> result.getSearchShardTarget().getIndex()).collect(Collectors.toSet())
results.getSuccessfulResults()
.map(result -> result.getSearchShardTarget().getShardId().getIndex())
.collect(Collectors.toSet())
);
onPhaseEnd(searchRequestContext);
onRequestEnd(searchRequestContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.index.Index;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;

import java.util.ArrayList;
Expand All @@ -37,7 +38,7 @@ public class SearchRequestContext {
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
private Set<String> successfulSearchShardIndices;
private Set<Index> successfulSearchShardIndices;

private final SearchRequest searchRequest;
private final LinkedBlockingQueue<TaskResourceInfo> phaseResourceUsage;
Expand Down Expand Up @@ -144,15 +145,15 @@ public SearchRequest getRequest() {
return searchRequest;
}

void setSuccessfulSearchShardIndices(Set<String> successfulSearchShardIndices) {
void setSuccessfulSearchShardIndices(Set<Index> successfulSearchShardIndices) {
this.successfulSearchShardIndices = successfulSearchShardIndices;
}

/**
* @return A {@link List} of {@link String} representing the names of the indices that were
* @return A {@link Set} of {@link Index} representing the indices that were
* successfully queried at the shard level.
*/
public Set<String> getSuccessfulSearchShardIndices() {
public Set<Index> getSuccessfulSearchShardIndices() {
return successfulSearchShardIndices;
}
}
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,19 @@ public boolean hasIndex(Index index) {
return indices.containsKey(index.getUUID());
}

/**
* Returns a MapperService for the specified index if exists otherwise returns <code>null</code>.
*/
@Nullable
public MapperService getMapperService(Index index) {
IndexService indexService = indexService(index);
if (indexService == null) {
return null;
} else {
return indexService.mapperService();
}
}

/**
* Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.
*/
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.MappingAwarePlugin;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.PersistentTaskPlugin;
Expand Down Expand Up @@ -1023,6 +1024,30 @@ protected Node(
// Add the telemetryAwarePlugin components to the existing pluginComponents collection.
pluginComponents.addAll(telemetryAwarePluginComponents);

Collection<Object> mappingAwarePluginComponents = pluginsService.filterPlugins(MappingAwarePlugin.class)
.stream()
.flatMap(
p -> p.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
clusterModule.getIndexNameExpressionResolver(),
repositoriesServiceReference::get,
metricsRegistry,
indicesService::getMapperService
).stream()
)
.collect(Collectors.toList());

// Add the mappingAwarePluginComponents components to the existing pluginComponents collection.
pluginComponents.addAll(mappingAwarePluginComponents);

List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.core.index.Index;
import org.opensearch.index.mapper.MapperService;

@FunctionalInterface
public interface MapperServiceProvider {
MapperService getMapperService(Index index);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
* Plugin that uses {@link IndicesService}
*
* @opensearch.internal
*/
@ExperimentalApi
public interface MappingAwarePlugin {
/**
* Returns components added by this plugin.
* <p>
* Any components returned that implement {@link LifecycleComponent} will have their lifecycle managed.
* Note: To aid in the migration away from guice, all objects returned as components will be bound in guice
* to themselves.
*
* @param client A client to make requests to the system
* @param clusterService A service to allow watching and updating cluster state
* @param threadPool A service to allow retrieving an executor to run an async action
* @param resourceWatcherService A service to watch for changes to node local files
* @param scriptService A service to allow running scripts on the local node
* @param xContentRegistry the registry for extensible xContent parsing
* @param environment the environment for path and setting configurations
* @param nodeEnvironment the node environment used coordinate access to the data paths
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
* @param indexNameExpressionResolver A service that resolves expression to index and alias names
* @param repositoriesServiceSupplier A supplier for the service that manages snapshot repositories; will return null when this method
* is called, but will return the repositories service once the node is initialized.
* @param metricsRegistry the registry for metrics instrumentation.
* @param mapperServiceProvider A mapping service provider to get index mappings
*/
default Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier,
MetricsRegistry metricsRegistry,
MapperServiceProvider mapperServiceProvider
) {
return Collections.emptyList();
}
}

0 comments on commit de087da

Please sign in to comment.