Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic search detectors tool; pull plugin deps in gradle run #39

Merged
merged 10 commits into from
Dec 22, 2023
70 changes: 53 additions & 17 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ buildscript {
opensearch_version = System.getProperty("opensearch.version", "3.0.0-SNAPSHOT")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
}
if (isSnapshot) {
opensearch_build += "-SNAPSHOT"
}
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
}

repositories {
Expand Down Expand Up @@ -77,13 +69,15 @@ apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.pluginzip'

def sqlJarDirectory = "$buildDir/dependencies/opensearch-sql-plugin"
def jsJarDirectory = "$buildDir/dependencies/opensearch-job-scheduler"
def adJarDirectory = "$buildDir/dependencies/opensearch-anomaly-detection"

configurations {
zipArchive
all {
resolutionStrategy {
force "org.mockito:mockito-core:5.8.0"
force "com.google.guava:guava:32.1.3-jre" // CVE for 31.1
force "com.google.guava:guava:32.1.2-jre" // CVE for 31.1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why we are not using the latest version? Mend will generate an auto-bump for this (if it hasn't already) and would like to include a reason when closing it to prevent future bumps...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the catch. Reverted this as it's a leftover change from my inter-plugin dependency issues.

force("org.eclipse.platform:org.eclipse.core.runtime:3.30.0") // CVE for < 3.29.0, forces JDK17 for spotless
}
}
Expand All @@ -96,28 +90,51 @@ task addJarsToClasspath(type: Copy) {
include "protocol-${version}.jar"
}
into("$buildDir/classes")

from(fileTree(dir: jsJarDirectory)) {
include "opensearch-job-scheduler-${version}.jar"
}
into("$buildDir/classes")

from(fileTree(dir: adJarDirectory)) {
include "opensearch-anomaly-detection-${version}.jar"
}
into("$buildDir/classes")
}

dependencies {
compileOnly group: 'org.opensearch', name:'opensearch-ml-client', version: "${version}"
// 3P dependencies
compileOnly group: 'com.google.code.gson', name: 'gson', version: '2.10.1'
compileOnly "org.apache.logging.log4j:log4j-slf4j-impl:2.22.0"
compileOnly group: 'org.json', name: 'json', version: '20231013'
zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${version}"
implementation("com.google.guava:guava:32.1.3-jre")
implementation("com.google.guava:guava:32.1.2-jre")
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

// Plugin dependencies
compileOnly group: 'org.opensearch', name:'opensearch-ml-client', version: "${version}"
implementation fileTree(dir: jsJarDirectory, include: ["opensearch-job-scheduler-${version}.jar"])
implementation fileTree(dir: adJarDirectory, include: ["opensearch-anomaly-detection-${version}.jar"])
implementation fileTree(dir: sqlJarDirectory, include: ["opensearch-sql-${version}.jar", "ppl-${version}.jar", "protocol-${version}.jar"])
compileOnly "org.opensearch:common-utils:${version}"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${version}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${version}"
zipArchive "org.opensearch.plugin:opensearch-anomaly-detection:${version}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${version}"

// Test dependencies
testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.mockito:mockito-core:5.8.0"
testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.8.0'
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '5.2.0'
testImplementation("net.bytebuddy:byte-buddy:1.14.7")
testImplementation("net.bytebuddy:byte-buddy-agent:1.14.7")
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.1'
testImplementation 'org.mockito:mockito-junit-jupiter:5.8.0'
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testImplementation "com.cronutils:cron-utils:9.2.1"
testImplementation "commons-validator:commons-validator:1.8.0"
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.1'

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
}

task extractSqlJar(type: Copy) {
Expand All @@ -126,7 +143,21 @@ task extractSqlJar(type: Copy) {
into sqlJarDirectory
}

task extractJsJar(type: Copy) {
mustRunAfter()
from(zipTree(configurations.zipArchive.find { it.name.startsWith("opensearch-job-scheduler")}))
into jsJarDirectory
}

task extractAdJar(type: Copy) {
mustRunAfter()
from(zipTree(configurations.zipArchive.find { it.name.startsWith("opensearch-anomaly-detection")}))
into adJarDirectory
}

tasks.addJarsToClasspath.dependsOn(extractSqlJar)
tasks.addJarsToClasspath.dependsOn(extractJsJar)
tasks.addJarsToClasspath.dependsOn(extractAdJar)
project.tasks.delombok.dependsOn(addJarsToClasspath)
tasks.publishNebulaPublicationToMavenLocal.dependsOn ':generatePomFileForPluginZipPublication'
tasks.validateNebulaPom.dependsOn ':generatePomFileForPluginZipPublication'
Expand All @@ -137,12 +168,13 @@ testingConventions.enabled = false
thirdPartyAudit.enabled = false

test {
useJUnitPlatform()
testLogging {
exceptionFormat "full"
events "skipped", "passed", "failed" // "started"
showStandardStreams true
}
include '**/*Tests.class'
systemProperty 'tests.security.manager', 'false'
}

spotless {
Expand All @@ -161,6 +193,8 @@ spotless {

compileJava {
dependsOn extractSqlJar
dependsOn extractJsJar
dependsOn extractAdJar
dependsOn delombok
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}
Expand All @@ -169,6 +203,8 @@ compileTestJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}

forbiddenApisTest.ignoreFailures = true


opensearchplugin {
name 'skills'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.agent.tools;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.client.AnomalyDetectionNodeClient;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.WildcardQueryBuilder;
import org.opensearch.ml.common.output.model.ModelTensors;
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

import lombok.Getter;
import lombok.Setter;

@ToolAnnotation(SearchAnomalyDetectorsTool.TYPE)
public class SearchAnomalyDetectorsTool implements Tool {
public static final String TYPE = "SearchAnomalyDetectorsTool";
private static final String DEFAULT_DESCRIPTION = "Use this tool to search anomaly detectors.";

@Setter
@Getter
private String name = TYPE;
@Getter
@Setter
private String description = DEFAULT_DESCRIPTION;
@Getter
private String type;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
@Getter
private String version;

private Client client;

private AnomalyDetectionNodeClient adClient;

@Setter
private Parser<?, ?> inputParser;
@Setter
private Parser<?, ?> outputParser;

public SearchAnomalyDetectorsTool(Client client) {
this.client = client;
this.adClient = new AnomalyDetectionNodeClient(client);

// probably keep this overridden output parser. need to ensure the output matches what's expected
outputParser = new Parser<>() {
@Override
public Object parse(Object o) {
@SuppressWarnings("unchecked")
List<ModelTensors> mlModelOutputs = (List<ModelTensors>) o;
return mlModelOutputs.get(0).getMlModelTensors().get(0).getDataAsMap().get("response");
}
};
}

// Response is currently in a simple string format including the list of anomaly detectors (only name and ID attached), and
// number of total detectors. The output will likely need to be updated, standardized, and include more fields in the
// future to cover a sufficient amount of potential questions the agent will need to handle.
@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
final String detectorName = parameters.getOrDefault("detectorName", null);
final String detectorNamePattern = parameters.getOrDefault("detectorNamePattern", null);
final String indices = parameters.getOrDefault("indices", null);
final Boolean highCardinality = parameters.containsKey("highCardinality")
? Boolean.parseBoolean(parameters.get("highCardinality"))
: null;
final Long lastUpdateTime = parameters.containsKey("lastUpdateTime") ? Long.parseLong(parameters.get("lastUpdateTime")) : null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long.parseLong might throw NumberFormatException if the input parameter is null or invalid number format, suggestion is to use StringUtils.isNumeric() to check if the input is a number.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I will update.

final String sortOrderStr = parameters.getOrDefault("sortOrder", "asc");
final SortOrder sortOrder = sortOrderStr == "asc" ? SortOrder.ASC : SortOrder.DESC;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
final String sortString = parameters.getOrDefault("sortString", "name.keyword");
final int size = parameters.containsKey("size") ? Integer.parseInt(parameters.get("size")) : 20;
final int startIndex = parameters.containsKey("startIndex") ? Integer.parseInt(parameters.get("startIndex")) : 0;
final Boolean running = parameters.containsKey("running") ? Boolean.parseBoolean(parameters.get("running")) : null;
final Boolean disabled = parameters.containsKey("disabled") ? Boolean.parseBoolean(parameters.get("disabled")) : null;
final Boolean failed = parameters.containsKey("failed") ? Boolean.parseBoolean(parameters.get("failed")) : null;

List<QueryBuilder> mustList = new ArrayList<QueryBuilder>();
if (detectorName != null) {
mustList.add(new TermQueryBuilder("name.keyword", detectorName));
}
if (detectorNamePattern != null) {
mustList.add(new WildcardQueryBuilder("name.keyword", detectorNamePattern));
}
if (indices != null) {
mustList.add(new TermQueryBuilder("indices", indices));
}
if (highCardinality != null) {
mustList.add(new TermQueryBuilder("detector_type", highCardinality ? "MULTI_ENTITY" : "SINGLE_ENTITY"));
}
if (lastUpdateTime != null) {
mustList.add(new BoolQueryBuilder().filter(new RangeQueryBuilder("last_update_time").gte(lastUpdateTime)));

}

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must().addAll(mustList);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder)
.size(size)
.from(startIndex)
.sort(sortString, sortOrder);

SearchRequest searchDetectorRequest = new SearchRequest().source(searchSourceBuilder);

if (running != null || disabled != null || failed != null) {
// TODO: add a listener to trigger when the first response is received, to trigger the profile API call
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this missed to address or will be addressed in future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressed in the future. I'll create a follow up issue alongside some other improvements soon.

// to fetch the detector state, etc.
// Will need AD client to onboard the profile API first.
}

ActionListener<SearchResponse> searchDetectorListener = ActionListener.<SearchResponse>wrap(response -> {
StringBuilder sb = new StringBuilder();
SearchHit[] hits = response.getHits().getHits();
sb.append("AnomalyDetectors=[");
for (SearchHit hit : hits) {
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name"));
sb.append("}");
}
sb.append("]");
sb.append("TotalAnomalyDetectors=").append(response.getHits().getTotalHits().value);
listener.onResponse((T) sb.toString());
}, e -> { listener.onFailure(e); });

adClient.searchAnomalyDetectors(searchDetectorRequest, searchDetectorListener);
}

@Override
public boolean validate(Map<String, String> parameters) {
return true;
}

@Override
public String getType() {
return TYPE;
}
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

/**
* Factory for the {@link SearchAnomalyDetectorsTool}
*/
public static class Factory implements Tool.Factory<SearchAnomalyDetectorsTool> {
private Client client;

private AnomalyDetectionNodeClient adClient;

private static Factory INSTANCE;

/**
* Create or return the singleton factory instance
*/
public static Factory getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
synchronized (SearchAnomalyDetectorsTool.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Factory();
return INSTANCE;
}
}

/**
* Initialize this factory
* @param client The OpenSearch client
*/
public void init(Client client) {
this.client = client;
this.adClient = new AnomalyDetectionNodeClient(client);
}

@Override
public SearchAnomalyDetectorsTool create(Map<String, Object> map) {
return new SearchAnomalyDetectorsTool(client);
}

@Override
public String getDefaultDescription() {
return DEFAULT_DESCRIPTION;
}
}

}
Loading
Loading