[BUG]: a bulk request with Update operation for existing document results in 400 Bad request #590

Closed
IgorDomshchikov opened this issue Aug 2, 2023 · 6 comments · Fixed by #690
Labels: bug, good first issue

Comments

@IgorDomshchikov

Updating an existing document through the Bulk API with an update operation results in a 400 Bad Request.

Steps to reproduce the issue:

  1. Insert or find a document to update in the index.
  2. Create an update BulkRequest. The doc object is a POJO that mirrors the document in the index.
public class MyDoc implements Serializable {

    private static final long serialVersionUID = 6880667215923483985L;

    private long id;
    private String status;
    // other fields, getters, setters.

}
MyDoc doc = new MyDoc();
doc.setStatus("active");
BulkRequest request = new BulkRequest.Builder().operations(o -> o.update(u -> u.index(indexName).id(String.valueOf(id)).document(doc))).build();
BulkResponse bulkResponse = openSearchClient.bulk(request);

Actual result: 400 Bad Request.

Expected result: the document should be successfully updated if it exists in the index.

In debug mode, the serialized bulk request differs from the format given in the bulk update documentation. It lacks the "doc" parent object, which should wrap the partially updated document:

{ "update" : { "_id" : "361710", "_index" : "program_search" }}
{ "status": "draft" }   <- the "doc" wrapper is missing

Expected format, per the documentation:

{ "update": { "_index": "movies", "_id": "tt0816711" } }
{ "doc": { "title": "World War Z" } }
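
The difference between the two payloads above comes down to the second line of each pair. As a minimal sketch using only the JDK (not the opensearch-java serializer), the following builds the two NDJSON lines for one bulk update with the partial document wrapped in "doc". The class and method names (`BulkUpdatePayload`, `updateLines`) are hypothetical, and the JSON rendering handles only flat string fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class BulkUpdatePayload {

    // Quote a string as a JSON string literal.
    // Only backslash and double-quote are escaped; control characters are not handled.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Render a flat map of string fields as a JSON object.
    static String toJson(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    // Build the action line and the "doc"-wrapped source line for one bulk update.
    static String updateLines(String index, String id, Map<String, String> partialDoc) {
        Map<String, String> meta = new LinkedHashMap<>();
        meta.put("_index", index);
        meta.put("_id", id);
        String action = "{\"update\":" + toJson(meta) + "}";
        String source = "{\"doc\":" + toJson(partialDoc) + "}";
        // Every line of a bulk body, including the last one, ends with a newline.
        return action + "\n" + source + "\n";
    }

    public static void main(String[] args) {
        System.out.print(updateLines("program_search", "361710", Map.of("status", "active")));
    }
}
```

Comparing a string like this with the request body seen in the debugger is one way to confirm whether the "doc" wrapper is present.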

The issue is reproducible using the latest available OpenSearch Java client version:

<artifactId>opensearch-rest-client</artifactId>
<version>2.9.0</version>
<artifactId>opensearch-java</artifactId>
<version>2.4.0</version>

The OpenSearch cluster is provisioned by AWS.

If I replace the bulk update request with a single-document update request, it succeeds. The serialized request matches the structure shown in the documentation:

UpdateRequest<Object, Object> request = new UpdateRequest.Builder<>().id(String.valueOf(id)).doc(doc).index(indexName).build();
openSearchClient.update(request, Object.class);

{"doc":{"status":"active"}}

IgorDomshchikov added the bug and untriaged labels on Aug 2, 2023
@dblock
Member

dblock commented Aug 2, 2023

Looks like a legit bug. Want to try to turn it into a (failing) unit test?

@IgorDomshchikov
Author

@dblock, thanks for your response. I will write a unit test for it.

wbeckler added the good first issue label and removed the untriaged label on Sep 19, 2023
@karthiks3000
Contributor

Maybe this is only reproducible when using RestClientTransport. I couldn't reproduce the error with the ApacheHttpClient5Transport client.
I created a test based on the ones available under the samples folder. The sample code:

  • Creates an index if it doesn't exist
  • Adds 2 documents with the text "Text for document 1" and "Text for document 2"
  • Searches for those documents using a match on the text field
  • Loops through the results and performs the bulk update operation to change the text value to "Updated document"
  • Searches for the documents again
  • Loops through the results and prints the updated document values

The commented-out code is another way to do the bulk update, covering both documents in one request.
Here is the example:

package org.opensearch.client.samples;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch._types.mapping.IntegerNumberProperty;
import org.opensearch.client.opensearch._types.mapping.Property;
import org.opensearch.client.opensearch._types.mapping.TypeMapping;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.IndexSettings;
import org.opensearch.client.samples.util.IndexData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class MyTest {
    private static final Logger LOGGER = LogManager.getLogger(MyTest.class);

    public static void main(String[] args) {
        try {
            var client = SampleClient.create();

            var version = client.info().version();
            LOGGER.info("Server: {}@{}", version.distribution(), version.number());

            final var indexName = "my-index";

            if (!client.indices().exists(r -> r.index(indexName)).value()) {
                LOGGER.info("Creating index {}", indexName);
                IndexSettings settings = new IndexSettings.Builder().numberOfShards("2").numberOfReplicas("1").build();
                TypeMapping mapping = new TypeMapping.Builder().properties(
                        "age",
                        new Property.Builder().integer(new IntegerNumberProperty.Builder().build()).build()
                ).build();
                CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(indexName)
                        .settings(settings)
                        .mappings(mapping)
                        .build();
                client.indices().create(createIndexRequest);
            }
            var result = client.indices().getFieldMapping(m -> m.index("my-index").fields("my-field"));
            LOGGER.info("Index mapping: {}", result.result());
            LOGGER.info("Indexing documents");
            IndexData indexData = new IndexData("Document 1", "Text for document 1");
            IndexRequest<IndexData> indexRequest = new IndexRequest.Builder<IndexData>().index(indexName)
                    .id("1")
                    .document(indexData)
                    .build();
            client.index(indexRequest);
            indexData = new IndexData("Document 2", "Text for document 2");
            indexRequest = new IndexRequest.Builder<IndexData>().index(indexName).id("2").document(indexData).build();
            client.index(indexRequest);

            // wait for the documents to become searchable
            Thread.sleep(3000);

            IndexData searchedData;
            ArrayList<BulkOperation> ops = new ArrayList<>();

            for (var hit : getSearchResults(client, "text", "Text for document")) {
                LOGGER.info("Found {} with score {} and id {}", hit.source(), hit.score(), hit.id());
                searchedData = hit.source();
                IndexData finalSearchedData = searchedData;
                finalSearchedData.setText("Updated document");
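                // Queue an index operation for the alternative single-request approach (commented out below).
                ops.add(new BulkOperation.Builder().index(IndexOperation.of(io -> io.index(indexName).id(hit.id()).document(finalSearchedData))).build());
                // Send a bulk request containing one update operation for this hit.
                BulkRequest request = new BulkRequest.Builder().operations(o -> o.update(u -> u.index(indexName)
                        .id(hit.id()).document(finalSearchedData))).build();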
                BulkResponse bulkResponse = client.bulk(request);
                LOGGER.info("Bulk response items: {}", bulkResponse.items().size());
            }
//            BulkRequest.Builder bulkReq = new BulkRequest.Builder().index(indexName).operations(ops).refresh(Refresh.WaitFor);
//            BulkResponse bulkResponse = client.bulk(bulkReq.build());
//            LOGGER.info("Bulk response items: {}", bulkResponse.items().size());

            for (var hit : getSearchResults(client, "text", "Text for document")) {
                LOGGER.info("Updated record: id = {} | Title = {} | Text = {}", hit.id(), hit.source().getTitle(), hit.source().getText());
            }

            LOGGER.info("Deleting index {}", indexName);
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(indexName).build();
            client.indices().delete(deleteIndexRequest);

        } catch (Exception e) {
            LOGGER.error("Unexpected exception", e);
        }
    }

    private static List<Hit<IndexData>> getSearchResults(OpenSearchClient client, String field, String value) throws IOException {
        SearchRequest searchRequest = new SearchRequest.Builder().query(
                q -> q.match(m -> m.field(field).query(FieldValue.of(value)))
        ).build();

        SearchResponse<IndexData> searchResponse = client.search(searchRequest, IndexData.class);
        return searchResponse.hits().hits();
    }
}

@karthiks3000
Contributor

@IgorDomshchikov I couldn't get my code to create a rest client with the 2.9.0 version of the library. However, I was able to validate that the bulk update works as expected with the 3.0.0-SNAPSHOT version. Most of the code I added above stays the same for this test, except for the following:

  1. Added a dependency to the build.gradle.kts file under the samples directory
    implementation("org.opensearch.client","opensearch-rest-client","3.0.0-SNAPSHOT")
  2. Added the below method to the SampleClient.java file -
public static OpenSearchClient createRestClient()  {
        var env = System.getenv();
        var https = Boolean.parseBoolean(env.getOrDefault("HTTPS", "true"));
        var hostname = env.getOrDefault("HOST", "localhost");
        var port = Integer.parseInt(env.getOrDefault("PORT", "9200"));
        var user = env.getOrDefault("USERNAME", "admin");
        var pass = env.getOrDefault("PASSWORD", "admin");
        final var hosts = new HttpHost[] { new HttpHost(https ? "https" : "http", hostname, port) };

        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                new AuthScope(hosts[0]),
                new UsernamePasswordCredentials(user, pass.toCharArray())
        );

        final RestClient restClient = RestClient.builder(hosts)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    try {
                        final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create()
                                .setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build())
                                .setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                                .setTlsDetailsFactory(sslEngine -> new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()))
                                .build();

                        final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
                                .setTlsStrategy(tlsStrategy)
                                .build();

                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .build();

        final RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new OpenSearchClient(transport);
    }
  3. Updated my code from above to use the createRestClient method when initializing the client -
    var client = SampleClient.createRestClient();

I also added a wait after the update operations, because in some runs the search executed before the updates were visible in the index.

            // wait for the changes to reflect
            Thread.sleep(3000);
            for (var hit : getSearchResults(client, "text", "Text for document")) {
                LOGGER.info("Updated record: id = {} | Title = {} | Text = {}", hit.id(), hit.source().getTitle(), hit.source().getText());
            }
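
A fixed three-second sleep works for a sample, but it either waits longer than needed or not long enough. One alternative is to poll until the expected state is visible, with a timeout. The sketch below uses only the JDK. The `WaitUntil` class and its `waitUntil` method are hypothetical names, and the condition in `main` is a stand-in for a search that checks whether the updated text has appeared. The commented-out code in the sample above takes a different route, setting `Refresh.WaitFor` on the bulk request.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class WaitUntil {

    // Poll the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Subtraction keeps the comparison correct even if nanoTime overflows.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true after roughly 200 ms.
        boolean met = waitUntil(() -> System.currentTimeMillis() - start >= 200,
                Duration.ofSeconds(3), Duration.ofMillis(50));
        System.out.println("condition met: " + met);
    }
}
```

In the sample, the condition could wrap a call to getSearchResults and return true once every hit carries the updated text.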

Here is my console output:
[Screenshot: Screenshot 2023-10-16 at 7 05 28 PM]

@karthiks3000
Contributor

@IgorDomshchikov I was able to set up a test using the versions you mentioned. The only difference is that my OpenSearch instance was running locally in Docker. I'm unable to reproduce the error with either transport.
I've created a sample repo with my code and also tested it with the latest versions of the Java client (2.7.0) and the REST client (2.11.0).

@VachaShah
Collaborator

@IgorDomshchikov @karthiks3000 added a similar example to the samples folder in #690, and it works successfully. Can you please let us know whether this matches your use case, or whether something about your use case is different and you are still facing issues? Thank you!

5 participants