Skip to content

Commit

Permalink
- Use multithreading for clean phase using --thread parameter (#108)
Browse files Browse the repository at this point in the history
- Refactored AbstractSynchronize (removed inner classes)
- Fixed ident
  • Loading branch information
soisik committed Oct 23, 2023
1 parent d8befc1 commit eacf33d
Show file tree
Hide file tree
Showing 8 changed files with 638 additions and 574 deletions.
616 changes: 101 additions & 515 deletions src/main/java/org/lsc/AbstractSynchronize.java

Large diffs are not rendered by default.

78 changes: 37 additions & 41 deletions src/main/java/org/lsc/SimpleSynchronize.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@

import org.apache.commons.lang3.ArrayUtils;
import org.lsc.beans.IBean;
import org.lsc.beans.InfoCounter;
import org.lsc.configuration.LscConfiguration;
import org.lsc.configuration.TaskType;
import org.lsc.exception.LscConfigurationException;
import org.lsc.jmx.LscServerImpl;
import org.lsc.runnable.SynchronizeEntryRunner;
import org.lsc.service.IAsynchronousService;
import org.lsc.utils.LSCStructuralLogger;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,14 +100,14 @@ public SimpleSynchronize() {
setThreads(5);
cache = new TreeMap<String, Task>();
}

public void init() throws LscConfigurationException {
Collection<TaskType> tasks = LscConfiguration.getTasks();
for(TaskType t: tasks) {
cache.put(t.getName(), new Task(t));
}
}

private void close() {
for (Task task: cache.values()) {
if (task.getSourceService() instanceof Closeable) {
Expand All @@ -128,17 +130,14 @@ private void close() {
/**
* Main method Check properties, and for each task, launch the
* synchronization and the cleaning phases.
* @param asyncTasks
* string list of the asynchronous synchronization tasks to launch
* @param asyncTasks string list of the asynchronous synchronization tasks to launch
* @param syncTasks string list of the synchronization tasks to launch
* @param cleanTasks string list of the cleaning tasks to launch
*
* @return the launch status - true if all tasks executed successfully,
* false if no tasks were executed or any failed
* @return the launch status - true if all tasks executed successfully, false if no tasks were executed or any failed
* @throws Exception
*/
public final boolean launch(final List<String> asyncTasks, final List<String> syncTasks,
final List<String> cleanTasks) throws Exception {
final List<String> cleanTasks) throws Exception {
boolean foundATask = false;
boolean canClose = true;
boolean launchResult = true;
Expand Down Expand Up @@ -215,48 +214,45 @@ private Collection<Task> userOrderedTask(List<String> userValues, boolean all) {
/**
* Launch a task. Call this for once each task type and task mode.
*
* @param taskName
* the task name (historically the LDAP object class name, but can be any string)
* @param taskMode
* the task mode (clean or sync)
*
* @param taskName the task name (historically the LDAP object class name, but can be any string)
* @param taskMode the task mode (clean or sync)
* @return boolean true on success, false if an error occurred
* @throws Exception
*/
private boolean launchTask(final Task task, final Task.Mode taskMode) throws Exception {
boolean status = true;
addScriptingContext(task);
boolean status = true;

addScriptingContext(task);

try {
LSCStructuralLogger.DESTINATION.info("Starting {} for {}", taskMode.name(), task.getName());
// Do the work!
switch (taskMode) {
case clean:
status = clean2Ldap(task);
break;
case sync:
status = synchronize2Ldap(task);
break;
case async:
if(task.getSourceService() instanceof IAsynchronousService
|| task.getDestinationService() instanceof IAsynchronousService) {
startAsynchronousSynchronize2Ldap(task);
} else {
LOGGER.error("Requested asynchronous source service does not implement IAsynchronousService ! (" + task.getSourceService().getClass().getName() + ")");
}
break;
default:
//Should not happen
LOGGER.error("Unknown task mode type {}", taskMode.toString());
throw new InvalidParameterException("Unknown task mode type " + taskMode.toString());
case clean:
status = clean2Ldap(task);
break;
case sync:
status = synchronize2Ldap(task);
break;
case async:
if(task.getSourceService() instanceof IAsynchronousService
|| task.getDestinationService() instanceof IAsynchronousService) {
startAsynchronousSynchronize2Ldap(task);
} else {
LOGGER.error("Requested asynchronous source service does not implement IAsynchronousService ! (" + task.getSourceService().getClass().getName() + ")");
}
break;
default:
//Should not happen
LOGGER.error("Unknown task mode type {}", taskMode.toString());
throw new InvalidParameterException("Unknown task mode type " + taskMode.toString());
}

// Manage exceptions
} catch (Exception e) {
Class<?>[] exceptionsCaught = {InstantiationException.class, IllegalAccessException.class,
ClassNotFoundException.class, SecurityException.class, NoSuchMethodException.class,
IllegalArgumentException.class, InvocationTargetException.class};
ClassNotFoundException.class, SecurityException.class, NoSuchMethodException.class,
IllegalArgumentException.class, InvocationTargetException.class};

if (ArrayUtils.contains(exceptionsCaught, e.getClass())) {
String errorDetail;
Expand All @@ -275,7 +271,7 @@ private boolean launchTask(final Task task, final Task.Mode taskMode) throws Exc

return status;
}

private void addScriptingContext(Task task) {
task.addScriptingVar("nocreate", nocreate);
task.addScriptingVar("noupdate", noupdate);
Expand Down Expand Up @@ -341,7 +337,7 @@ private void runPostHook(String taskName, String servicePostHook, TaskType taskT
public Set<Entry<String, Task>> getTasksName() {
return cache.entrySet();
}

public boolean isAsynchronousTask(String taskName) {
return cache.get(taskName).getSourceService() instanceof IAsynchronousService;
}
Expand All @@ -366,14 +362,14 @@ public final boolean launchById(String taskName, Map<String, LscDatasets> entrie
Task task = cache.get(taskName);
InfoCounter counter = new InfoCounter();
for(Entry<String, LscDatasets> entry : entries.entrySet()) {
new SynchronizeTask(task, counter, this, entry, true).run();
new SynchronizeEntryRunner(task, counter, this, entry, true).run();
}
return counter.getCountError() == 0;
}

public final boolean launch(String taskName, IBean bean) {
Task task = cache.get(taskName);
InfoCounter counter = new InfoCounter();
return new SynchronizeTask(task, counter, this, null, true).run(bean);
return new SynchronizeEntryRunner(task, counter, this, null, true).run(bean);
}
}
26 changes: 8 additions & 18 deletions src/main/java/org/lsc/SynchronizeThreadPoolExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.lsc.runnable.AbstractEntryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,10 +24,9 @@ public class SynchronizeThreadPoolExecutor extends ThreadPoolExecutor {
BlockingQueue<Runnable> queue;

/** Default logger */
final Logger LOGGER = LoggerFactory
.getLogger(SynchronizeThreadPoolExecutor.class);
final Logger LOGGER = LoggerFactory.getLogger(SynchronizeThreadPoolExecutor.class);

protected SynchronizeThreadPoolExecutor(int threads) {
public SynchronizeThreadPoolExecutor(int threads) {
super(threads, threads, keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full
Expand All @@ -48,19 +48,9 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
* the pool consume it as soon as it can, in a FIFO way without any priority
* @param task the runnable object
*/
protected void runTask(SynchronizeTask task) {
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("Task count.." + getTaskCount());
// LOGGER.debug("Queue Size before assigning the task.."
// + queue.size());
// }
execute(task);
public void runTask(AbstractEntryRunner task) {
this.beforeExecute(new Thread(task.getSyncName() + "-" + task.getId().getKey()), task);
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("Queue Size after assigning the task: {}", queue.size());
// LOGGER.debug("Pool Size after assigning the task: {}", getActiveCount());
// LOGGER.debug("Task count: {}", getTaskCount());
// }
execute(task);
}

/**
Expand All @@ -71,10 +61,10 @@ protected void runTask(SynchronizeTask task) {
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if(r instanceof SynchronizeTask) {
SynchronizeTask task = (SynchronizeTask) r;
if(r instanceof AbstractEntryRunner) {
AbstractEntryRunner task = (AbstractEntryRunner) r;
t.setName(task.getSyncName() + "-" + t.getId());
}
super.beforeExecute(t, r);
}
}
72 changes: 72 additions & 0 deletions src/main/java/org/lsc/beans/InfoCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.lsc.beans;

/**
* This object is storing counters across all tasks Update methods are specified
* as synchronized to avoid loosing counts of operations
*
* @author Sebastien Bahloul &lt;[email protected]&gt;
*/
public class InfoCounter {

private int countAll = 0;
private int countError = 0;
private int countModifiable = 0;
private int countCompleted = 0;

public synchronized void incrementCountAll() {
countAll++;
}

public synchronized void incrementCountError() {
countError++;
}

public synchronized void incrementCountModifiable() {
countModifiable++;
}

public synchronized void incrementCountCompleted() {
countCompleted++;
}

/**
* Return the count of all objects concerned by synchronization It does not
* include objects in data source that are not selected by requests or
* filters, but it includes any of the objects retrieved from the data
* source
*
* @return the count of all objects taken from the data source
*/
public synchronized int getCountAll() {
return countAll;
}

/**
* Return the count of all objects that have encountered an error while
* synchronizing, either for a technical or for a functional reason
*
* @return the number of objects in error
*/
public synchronized int getCountError() {
return countError;
}

/**
* Return the count of all objects that should be modify
*
* @return the count of all updates to do
*/
public synchronized int getCountModifiable() {
return countModifiable;
}

/**
* Return the count of all objects that have been embraced in a data
* modification successfully
*
* @return the count of all successful updates
*/
public synchronized int getCountCompleted() {
return countCompleted;
}
}
43 changes: 43 additions & 0 deletions src/main/java/org/lsc/runnable/AbstractEntryRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.lsc.runnable;

import java.util.Map.Entry;

import org.lsc.AbstractSynchronize;
import org.lsc.LscDatasets;
import org.lsc.Task;
import org.lsc.beans.InfoCounter;

public abstract class AbstractEntryRunner implements Runnable {

protected String syncName;
protected Entry<String, LscDatasets> id;
protected InfoCounter counter;
protected AbstractSynchronize abstractSynchronize;
protected Task task;

protected AbstractEntryRunner(final Task task, InfoCounter counter,
AbstractSynchronize abstractSynchronize,
Entry<String, LscDatasets> id) {
this.syncName = task.getName();
this.counter = counter;
this.task = task;
this.abstractSynchronize = abstractSynchronize;
this.id = id;
}

public String getSyncName() {
return syncName;
}

public InfoCounter getCounter() {
return counter;
}

public AbstractSynchronize getAbstractSynchronize() {
return abstractSynchronize;
}

public Entry<String, LscDatasets> getId() {
return id;
}
}
Loading

0 comments on commit eacf33d

Please sign in to comment.