From c542b1ff040d4aceef07b5811c7e09c111f03fe8 Mon Sep 17 00:00:00 2001 From: Filip Hrisafov Date: Tue, 14 May 2024 22:30:05 +0200 Subject: [PATCH] Add support for bulk updates --- .../engine/impl/db/BulkUpdateOperation.java | 56 +++++++++++++++++ .../common/engine/impl/db/DbSqlSession.java | 61 ++++++++++++++----- .../test/profiler/ProfilingDbSqlSession.java | 17 ++++-- 3 files changed, 113 insertions(+), 21 deletions(-) create mode 100644 modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/BulkUpdateOperation.java diff --git a/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/BulkUpdateOperation.java b/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/BulkUpdateOperation.java new file mode 100644 index 00000000000..146d43d31dd --- /dev/null +++ b/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/BulkUpdateOperation.java @@ -0,0 +1,56 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.flowable.common.engine.impl.db; + +import org.apache.ibatis.session.SqlSession; + +/** + * Use this to execute a dedicated update statement. It is important to note there won't be any optimistic locking checks done for these kind of update operations! + * + * @author Filip Hrisafov + */ +public class BulkUpdateOperation { + + protected String statement; + protected Object parameter; + + public BulkUpdateOperation(String statement, Object parameter) { + this.statement = statement; + this.parameter = parameter; + } + + public void execute(SqlSession sqlSession) { + sqlSession.update(statement, parameter); + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + this.statement = statement; + } + + public Object getParameter() { + return parameter; + } + + public void setParameter(Object parameter) { + this.parameter = parameter; + } + + @Override + public String toString() { + return "bulk update: " + statement + "(" + parameter + ")"; + } +} diff --git a/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/DbSqlSession.java b/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/DbSqlSession.java index a1b375dda37..043c0dc3747 100755 --- a/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/DbSqlSession.java +++ b/modules/flowable-engine-common/src/main/java/org/flowable/common/engine/impl/db/DbSqlSession.java @@ -58,6 +58,7 @@ public class DbSqlSession implements Session { protected Map, Map> deletedObjects = new HashMap<>(); protected Map, List> bulkDeleteOperations = new HashMap<>(); protected List updatedObjects = new ArrayList<>(); + protected List bulkUpdateOperations = new ArrayList<>(); public DbSqlSession(DbSqlSessionFactory dbSqlSessionFactory, EntityCache entityCache) { this.dbSqlSessionFactory = dbSqlSessionFactory; @@ -107,6 +108,13 @@ public void update(Entity entity) { entity.setUpdated(true); } + /** + * Executes a {@link BulkUpdateOperation}, with the sql in the statement parameter. + */ + public void update(String statement, Object parameter) { + bulkUpdateOperations.add(new BulkUpdateOperation(statement, parameter)); + } + public int directUpdate(String statement, Object parameters) { String updateStatement = dbSqlSessionFactory.mapStatement(statement); return getSqlSession().update(updateStatement, parameters); @@ -567,28 +575,49 @@ protected void incrementRevision(Entity insertedObject) { } protected void flushUpdates() { - for (Entity updatedObject : updatedObjects) { - String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject); - updateStatement = dbSqlSessionFactory.mapStatement(updateStatement); + if (updatedObjects.isEmpty() && bulkUpdateOperations.isEmpty()) { + return; + } - if (updateStatement == null) { - throw new FlowableException("no update statement for " + updatedObject.getClass() + " in the ibatis mapping files"); - } + // Unlike bulk deletes, bulk updates are executed before the regular updates. + // The reason for that, is due to the fact that regular updates might change something that would lead to an invalid bulk update. + + if (!bulkUpdateOperations.isEmpty()) { + bulkUpdateOperations.forEach(this::flushBulkUpdate); + } - LOGGER.debug("updating: {}", updatedObject); + if (!updatedObjects.isEmpty()) { + updatedObjects.forEach(this::flushUpdateEntity); + } - int updatedRecords = sqlSession.update(updateStatement, updatedObject); - if (updatedRecords == 0) { - throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently"); - } + updatedObjects.clear(); + bulkUpdateOperations.clear(); + } - // See https://activiti.atlassian.net/browse/ACT-1290 - if (updatedObject instanceof HasRevision) { - ((HasRevision) updatedObject).setRevision(((HasRevision) updatedObject).getRevisionNext()); - } + protected void flushUpdateEntity(Entity updatedObject) { + String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject); + updateStatement = dbSqlSessionFactory.mapStatement(updateStatement); + if (updateStatement == null) { + throw new FlowableException("no update statement for " + updatedObject.getClass() + " in the ibatis mapping files"); } - updatedObjects.clear(); + + LOGGER.debug("updating: {}", updatedObject); + + int updatedRecords = sqlSession.update(updateStatement, updatedObject); + if (updatedRecords == 0) { + throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently"); + } + + // See https://activiti.atlassian.net/browse/ACT-1290 + if (updatedObject instanceof HasRevision) { + ((HasRevision) updatedObject).setRevision(((HasRevision) updatedObject).getRevisionNext()); + } + } + + protected void flushBulkUpdate(BulkUpdateOperation bulkUpdateOperation) { + // Bulk update + bulkUpdateOperation.execute(sqlSession); } protected void flushDeletes() { diff --git a/modules/flowable-engine/src/main/java/org/flowable/engine/test/profiler/ProfilingDbSqlSession.java b/modules/flowable-engine/src/main/java/org/flowable/engine/test/profiler/ProfilingDbSqlSession.java index c55b660cd95..eee25eda236 100644 --- a/modules/flowable-engine/src/main/java/org/flowable/engine/test/profiler/ProfilingDbSqlSession.java +++ b/modules/flowable-engine/src/main/java/org/flowable/engine/test/profiler/ProfilingDbSqlSession.java @@ -17,6 +17,7 @@ import java.util.List; import org.flowable.common.engine.impl.db.BulkDeleteOperation; +import org.flowable.common.engine.impl.db.BulkUpdateOperation; import org.flowable.common.engine.impl.db.DbSqlSession; import org.flowable.common.engine.impl.db.DbSqlSessionFactory; import org.flowable.common.engine.impl.persistence.cache.EntityCache; @@ -119,14 +120,20 @@ protected void flushBulkInsert(Collection entities, Class