Skip to content

Commit

Permalink
Update workflow state without using painless script (#894)
Browse files Browse the repository at this point in the history
* Update resources without using painless script

Signed-off-by: Daniel Widdis <[email protected]>

* Remove error field without using painless script

Signed-off-by: Daniel Widdis <[email protected]>

* Remove unused script update method

Signed-off-by: Daniel Widdis <[email protected]>

* Reorder methods for logical relationships

Signed-off-by: Daniel Widdis <[email protected]>

* Add change log, more test coverage

Signed-off-by: Daniel Widdis <[email protected]>

* Do 5 retries for resource list updates

Signed-off-by: Daniel Widdis <[email protected]>

* Make retries a constant

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Oct 3, 2024
1 parent 5f87827 commit d0879d7
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Add query assist data summary agent into sample templates ([#875](https://github.com/opensearch-project/flow-framework/pull/875))
### Maintenance
### Refactoring
- Update workflow state without using painless script ([#894](https://github.com/opensearch-project/flow-framework/pull/894))
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -45,12 +46,13 @@
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.index.engine.VersionConflictEngineException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -80,6 +82,8 @@ public class FlowFrameworkIndicesHandler {
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
private final NamedXContentRegistry xContentRegistry;
// Retries in case of simultaneous updates
private static final int RETRIES = 5;

/**
* constructor
Expand Down Expand Up @@ -576,14 +580,14 @@ public <T> void canDeleteWorkflowStateDoc(
}

/**
* Updates a document in the workflow state index
* Updates a complete document in the workflow state index
* @param documentId the document ID
* @param updatedFields the fields to update the global state index with
* @param updatedDocument a complete document to update the global state index with
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDoc(
String documentId,
Map<String, Object> updatedFields,
ToXContentObject updatedDocument,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
Expand All @@ -593,11 +597,11 @@ public void updateFlowFrameworkSystemIndexDoc(
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);
updateRequest.doc(updatedContent);
XContentBuilder builder = XContentFactory.jsonBuilder();
updatedDocument.toXContent(builder, null);
updateRequest.doc(builder);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(5);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(RETRIES);
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
Expand All @@ -608,99 +612,60 @@ public void updateFlowFrameworkSystemIndexDoc(
}

/**
* Deletes a document in the workflow state index
* Updates a partial document in the workflow state index
* @param documentId the document ID
* @param updatedFields the fields to update the global state index with
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
public void updateFlowFrameworkSystemIndexDoc(
String documentId,
Map<String, Object> updatedFields,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(RETRIES);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param script the given script to update doc
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDocWithScript(
String indexName,
String documentId,
Script script,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String errorMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new Exception(errorMessage));
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to update " + indexName + " entry : " + documentId;
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
private void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(
workflowStepName,
nodeId,
getResourceByWorkflowStep(workflowStepName),
resourceId
);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource);",
Collections.singletonMap("newResource", newResource.resourceMap())
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, listener::onFailure));
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
Expand All @@ -716,26 +681,93 @@ public void addResourceToStateIndex(
String resourceId,
ActionListener<WorkflowData> listener
) {
String workflowId = currentNodeInputs.getWorkflowId();
String resourceName = getResourceByWorkflowStep(workflowStepName);
try {
updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
nodeId,
workflowStepName,
resourceId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), currentNodeInputs.getWorkflowId(), nodeId));
}, exception -> {
String errorMessage = "Failed to update new created " + nodeId + " resource " + workflowStepName + " id " + resourceId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceName, resourceId);
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}
}

/**
* Performs a get and update of a State Index document adding a new resource with strong consistency and retries
* @param workflowId The document id to update
* @param newResource The resource to add to the resources created list
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
ResourceCreated newResource,
int retries,
ActionListener<WorkflowData> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, workflowId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (!getResponse.isExists()) {
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
resourcesCreated.add(newResource);
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowId).doc(builder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setIfSeqNo(getResponse.getSeqNo())
.setIfPrimaryTerm(getResponse.getPrimaryTerm());
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, newResource, listener),
e -> handleStateUpdateException(workflowId, newResource, retries, listener, e)
)
);
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex)));
}

/**
 * Logs a successful state update and completes the listener with the data for the new resource
 * @param workflowId The workflow id, used in the log message and the returned WorkflowData
 * @param newResource The resource whose type, id and step id are reported
 * @param listener The listener to complete with a WorkflowData mapping the resource type to the resource id
 */
private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener<WorkflowData> listener) {
    final String type = newResource.resourceType();
    final String id = newResource.resourceId();
    final String stepId = newResource.workflowStepId();
    logger.info("Updated resources created for {} on step {} with {} {}", workflowId, stepId, type, id);
    // The single-entry map is the step's output: resource type -> resource id
    final WorkflowData result = new WorkflowData(Map.of(type, id), workflowId, stepId);
    listener.onResponse(result);
}

/**
 * Handles a failure while getting or updating the state document for a new resource.
 * A version conflict is retried while retries remain; any other failure, or a conflict with
 * no retries left, is logged and reported to the listener as a FlowFrameworkException.
 * @param workflowId The workflow id whose state document was being updated
 * @param newResource The resource that was being added
 * @param retries The number of retries remaining; a retry is attempted only when this is positive
 * @param listener The listener to fail when no retry is attempted
 * @param e The exception that caused the failure
 */
private void handleStateUpdateException(
    String workflowId,
    ResourceCreated newResource,
    int retries,
    ActionListener<WorkflowData> listener,
    Exception e
) {
    final boolean shouldRetry = retries > 0 && e instanceof VersionConflictEngineException;
    if (shouldRetry) {
        // Start the get-and-update again with one fewer retry available
        getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener);
    } else {
        final String errorMessage = new StringBuilder("Failed to update workflow state for ").append(workflowId)
            .append(" on step ")
            .append(newResource.workflowStepId())
            .append(" with ")
            .append(newResource.resourceType())
            .append(" ")
            .append(newResource.resourceId())
            .toString();
        logger.error(errorMessage, e);
        // Keep the REST status derived from the original exception
        listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -58,7 +57,6 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
Expand Down Expand Up @@ -210,24 +208,14 @@ private void executeReprovisionRequest(

// Remove error field if any prior to subsequent execution
if (response.getWorkflowState().getError() != null) {
Script script = new Script(
ScriptType.INLINE,
"painless",
"if(ctx._source.containsKey('error')){ctx._source.remove('error')}",
Collections.emptyMap()
);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDocWithScript(
WORKFLOW_STATE_INDEX,
workflowId,
script,
ActionListener.wrap(updateResponse -> {

}, exception -> {
String errorMessage = "Failed to update workflow state: " + workflowId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
WorkflowState newState = WorkflowState.builder(response.getWorkflowState()).error(null).build();
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(workflowId, newState, ActionListener.wrap(updateResponse -> {

}, exception -> {
String errorMessage = "Failed to update workflow state: " + workflowId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}));
}

// Update State Index, maintain resources created for subsequent execution
Expand Down
Loading

0 comments on commit d0879d7

Please sign in to comment.