Skip to content

Commit

Permalink
ENH: Optimize observer pattern in ctkDICOMScheduler
Browse files Browse the repository at this point in the history
This commit addresses an efficiency issue in the implementation of the
Observer pattern within the ctkDICOMScheduler and its associated UI widgets.
The system had the potential to emit a large number of signals:
The ctkDICOMScheduler functions as an intermediary between the UI and the
underlying logic, tunneling all processing signals from the logic through itself.
The UI components monitor the ctkDICOMScheduler to react to these signals.
This leads to an O(N^2) complexity problem when dealing with many
patients/studies/series.

To mitigate this, several strategies have been implemented:

1. **Batching and Throttling**: Changes are now batched together and a
throttling mechanism has been introduced. This mechanism delays the
processing of changes, reducing the number of signals by waiting a
certain amount of time since the last signal before sending a new one.
This is particularly effective when changes often occur in bursts.

2. **Filtering**: A filtering mechanism has been added to the signals,
allowing only relevant changes to be signaled to each observer.
This is achieved by adding a parameter to the signals that specifies
the type of changes the signal represents.

3. **Hierarchical Observers**: The hierarchical relationship of the
observers has been leveraged to reduce the number of signals.
Now, each object observes its nearest ancestor that has changed,
rather than observing the ctkDICOMScheduler directly.
  • Loading branch information
Punzo committed May 24, 2024
1 parent afd1fcc commit b2d5bb4
Show file tree
Hide file tree
Showing 20 changed files with 881 additions and 538 deletions.
112 changes: 90 additions & 22 deletions Libs/Core/ctkJobScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@ ctkJobSchedulerPrivate::~ctkJobSchedulerPrivate() = default;
//---------------------------------------------------------------------------
void ctkJobSchedulerPrivate::init()
{
Q_Q(ctkJobScheduler);
QObject::connect(this, SIGNAL(queueJobsInThreadPool()),
this, SLOT(onQueueJobsInThreadPool()));

this->ThreadPool = QSharedPointer<QThreadPool>(new QThreadPool());
this->ThreadPool = QSharedPointer<QThreadPool>(new QThreadPool(this));
this->ThreadPool->setMaxThreadCount(20);
this->ThrottleTimer = QSharedPointer<QTimer>(new QTimer(this));
this->ThrottleTimer->setSingleShot(true);

QObject::connect(this->ThrottleTimer.data(), SIGNAL(timeout()),
q, SLOT(emitThrottledSignals()));
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -144,15 +150,19 @@ bool ctkJobSchedulerPrivate::insertJob(QSharedPointer<ctkAbstractJob> job)
QMetaObject::Connection failedConnection = QObject::connect(job.data(), &ctkAbstractJob::failed, q, [q, job](){
q->onJobFailed(job.data());
});
QMap<QString, QMetaObject::Connection> connections = {
QMetaObject::Connection progressConnection =
QObject::connect(job.data(), SIGNAL(progressJobDetail(QVariant)),
q, SLOT(onProgressJobDetail(QVariant)));

QMap<QString, QMetaObject::Connection> connections =
{
{"started", startedConnection},
{"userStopped", userStoppedConnection},
{"finished", finishedConnection},
{"attemptFailed", attemptFailedConnection},
{"failed", failedConnection}
{"failed", failedConnection},
{"progress", progressConnection},
};
QObject::connect(job.data(), SIGNAL(progressJobDetail(QVariant)),
q, SIGNAL(progressJobDetail(QVariant)));

{
QMutexLocker locker(&this->QueueMutex);
Expand All @@ -179,8 +189,6 @@ bool ctkJobSchedulerPrivate::insertJob(QSharedPointer<ctkAbstractJob> job)
//------------------------------------------------------------------------------
bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
{
Q_Q(ctkJobScheduler);

logger.debug(QString("ctkJobScheduler: deleting job object %1 in thread %2.\n")
.arg(jobUID)
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId()), 16)));
Expand All @@ -199,8 +207,7 @@ bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
QObject::disconnect(connections.value("finished"));
QObject::disconnect(connections.value("attemptFailed"));
QObject::disconnect(connections.value("failed"));
QObject::disconnect(job.data(), SIGNAL(progressJobDetail(QVariant)),
q, SIGNAL(progressJobDetail(QVariant)));
QObject::disconnect(connections.value("progress"));

this->JobsConnections.remove(jobUID);
this->JobsQueue.remove(jobUID);
Expand Down Expand Up @@ -237,24 +244,19 @@ void ctkJobSchedulerPrivate::removeJobs(const QStringList &jobUIDs)
QObject::disconnect(connections.value("finished"));
QObject::disconnect(connections.value("attemptFailed"));
QObject::disconnect(connections.value("failed"));
QObject::disconnect(job.data(), SIGNAL(progressJobDetail(QVariant)), q, SIGNAL(progressJobDetail(QVariant)));
QObject::disconnect(connections.value("progress"));

this->JobsConnections.remove(jobUID);
this->JobsQueue.remove(jobUID);
}
}

foreach (QVariant data, datas)
{
emit q->jobUserStopped(data);
}
emit q->jobUserStopped(datas);
}

//------------------------------------------------------------------------------
void ctkJobSchedulerPrivate::removeAllJobs()
{
Q_Q(ctkJobScheduler);

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
Expand All @@ -278,7 +280,7 @@ void ctkJobSchedulerPrivate::removeAllJobs()
QObject::disconnect(connections.value("finished"));
QObject::disconnect(connections.value("attemptFailed"));
QObject::disconnect(connections.value("failed"));
QObject::disconnect(job.data(), SIGNAL(progressJobDetail(QVariant)), q, SIGNAL(progressJobDetail(QVariant)));
QObject::disconnect(connections.value("progress"));

this->JobsConnections.remove(jobUID);
this->JobsQueue.remove(jobUID);
Expand Down Expand Up @@ -314,6 +316,17 @@ QString ctkJobSchedulerPrivate::generateUniqueJobUID()
return QUuid::createUuid().toString(QUuid::StringFormat::WithoutBraces);
}

//------------------------------------------------------------------------------
void ctkJobSchedulerPrivate::clearBactchedJobsLists()
{
this->BatchedJobsStarted.clear();
this->BatchedJobsUserStopped.clear();
this->BatchedJobsFinished.clear();
this->BatchedJobsAttemptFailed.clear();
this->BatchedJobsFailed.clear();
this->BatchedJobsProgress.clear();
}

//---------------------------------------------------------------------------
// ctkJobScheduler methods

Expand Down Expand Up @@ -592,6 +605,8 @@ void ctkJobScheduler::stopAllJobs(bool stopPersistentJobs)

worker->requestCancel();
}

d->clearBactchedJobsLists();
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -722,18 +737,25 @@ QSharedPointer<QThreadPool> ctkJobScheduler::threadPoolShared() const
//----------------------------------------------------------------------------
void ctkJobScheduler::onJobStarted(ctkAbstractJob* job)
{
Q_D(ctkJobScheduler);
if (!job)
{
return;
}

logger.debug(job->loggerReport(tr("started")));
emit this->jobStarted(job->toVariant());

d->BatchedJobsStarted.append(job->toVariant());
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::onJobUserStopped(ctkAbstractJob* job)
{
Q_D(ctkJobScheduler);
if (!job)
{
return;
Expand All @@ -746,12 +768,17 @@ void ctkJobScheduler::onJobUserStopped(ctkAbstractJob* job)
this->deleteWorker(jobUID);
this->deleteJob(jobUID);

emit this->jobUserStopped(data);
d->BatchedJobsUserStopped.append(job->toVariant());
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::onJobFinished(ctkAbstractJob* job)
{
Q_D(ctkJobScheduler);
if (!job)
{
return;
Expand All @@ -764,12 +791,17 @@ void ctkJobScheduler::onJobFinished(ctkAbstractJob* job)
this->deleteWorker(jobUID);
this->deleteJob(jobUID);

emit this->jobFinished(data);
d->BatchedJobsFinished.append(job->toVariant());
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::onJobAttemptFailed(ctkAbstractJob* job)
{
Q_D(ctkJobScheduler);
if (!job)
{
return;
Expand All @@ -782,12 +814,17 @@ void ctkJobScheduler::onJobAttemptFailed(ctkAbstractJob* job)
this->deleteWorker(jobUID);
this->deleteJob(jobUID);

emit this->jobAttemptFailed(data);
d->BatchedJobsAttemptFailed.append(job->toVariant());
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::onJobFailed(ctkAbstractJob* job)
{
Q_D(ctkJobScheduler);
if (!job)
{
return;
Expand All @@ -800,5 +837,36 @@ void ctkJobScheduler::onJobFailed(ctkAbstractJob* job)
this->deleteWorker(jobUID);
this->deleteJob(jobUID);

emit this->jobFailed(data);
d->BatchedJobsFailed.append(job->toVariant());
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::onProgressJobDetail(QVariant data)
{
Q_D(ctkJobScheduler);

d->BatchedJobsProgress.append(data);
if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
void ctkJobScheduler::emitThrottledSignals()
{
Q_D(ctkJobScheduler);

emit this->jobStarted(d->BatchedJobsStarted);
emit this->jobUserStopped(d->BatchedJobsUserStopped);
emit this->jobFinished(d->BatchedJobsFinished);
emit this->jobAttemptFailed(d->BatchedJobsAttemptFailed);
emit this->jobFailed(d->BatchedJobsFailed);
emit this->progressJobDetail(d->BatchedJobsProgress);

d->clearBactchedJobsLists();
}
28 changes: 15 additions & 13 deletions Libs/Core/ctkJobScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,23 @@ class CTK_CORE_EXPORT ctkJobScheduler : public QObject
QSharedPointer<QThreadPool> threadPoolShared() const;

Q_SIGNALS:
void jobInitialized(QVariant data);
void jobQueued(QVariant data);
void jobStarted(QVariant data);
void jobUserStopped(QVariant data);
void jobFinished(QVariant data);
void jobAttemptFailed(QVariant data);
void jobFailed(QVariant data);
void progressJobDetail(QVariant data);
void jobInitialized(QVariant);
void jobQueued(QVariant);
void jobStarted(QList<QVariant>);
void jobUserStopped(QList<QVariant>);
void jobFinished(QList<QVariant>);
void jobAttemptFailed(QList<QVariant>);
void jobFailed(QList<QVariant>);
void progressJobDetail(QList<QVariant>);

public Q_SLOTS:
virtual void onJobStarted(ctkAbstractJob* job);
virtual void onJobUserStopped(ctkAbstractJob* job);
virtual void onJobFinished(ctkAbstractJob* job);
virtual void onJobAttemptFailed(ctkAbstractJob* job);
virtual void onJobFailed(ctkAbstractJob* job);
virtual void onJobStarted(ctkAbstractJob*);
virtual void onJobUserStopped(ctkAbstractJob*);
virtual void onJobFinished(ctkAbstractJob*);
virtual void onJobAttemptFailed(ctkAbstractJob*);
virtual void onJobFailed(ctkAbstractJob*);
virtual void onProgressJobDetail(QVariant);
virtual void emitThrottledSignals();

protected:
QScopedPointer<ctkJobSchedulerPrivate> d_ptr;
Expand Down
10 changes: 10 additions & 0 deletions Libs/Core/ctkJobScheduler_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// Qt includes
#include <QMutex>
#include <QSharedPointer>
#include <QTimer>
class QThreadPool;

// ctkCore includes
Expand Down Expand Up @@ -62,6 +63,7 @@ public Q_SLOTS:
virtual void removeAllJobs();
int getSameTypeJobsInThreadPoolQueueOrRunning(QSharedPointer<ctkAbstractJob> job);
QString generateUniqueJobUID();
void clearBactchedJobsLists();

QMutex QueueMutex;

Expand All @@ -73,6 +75,14 @@ public Q_SLOTS:
QMap<QString, QSharedPointer<ctkAbstractJob>> JobsQueue;
QMap<QString, QMap<QString, QMetaObject::Connection>> JobsConnections;
QMap<QString, QSharedPointer<ctkAbstractWorker>> Workers;
QList<QVariant> BatchedJobsStarted;
QList<QVariant> BatchedJobsUserStopped;
QList<QVariant> BatchedJobsFinished;
QList<QVariant> BatchedJobsAttemptFailed;
QList<QVariant> BatchedJobsFailed;
QList<QVariant> BatchedJobsProgress;
QSharedPointer<QTimer> ThrottleTimer;
int ThrottleTimeInterval{300};
};

#endif
1 change: 1 addition & 0 deletions Libs/DICOM/Core/ctkDICOMJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CTK_DICOM_CORE_EXPORT ctkDICOMJob : public ctkAbstractJob
{
Q_OBJECT
Q_ENUMS(DICOMLevel)
Q_PROPERTY(QString patientID READ patientID WRITE setPatientID);
Q_PROPERTY(QString studyInstanceUID READ studyInstanceUID WRITE setStudyInstanceUID);
Q_PROPERTY(QString seriesInstanceUID READ seriesInstanceUID WRITE setSeriesInstanceUID);
Q_PROPERTY(QString sopInstanceUID READ sopInstanceUID WRITE setSOPInstanceUID);
Expand Down
1 change: 1 addition & 0 deletions Libs/DICOM/Core/ctkDICOMRetrieveWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ void ctkDICOMRetrieveWorker::run()
newJob->setRetryCounter(0);
newJob->setServer(*proxyServer);
scheduler->addJob(newJob);
retrieveJob->setReferenceInserterJobUID("Proxy");
}
else if (d->Retrieve->jobResponseSetsShared().count() > 0 &&
server->retrieveProtocol() == ctkDICOMServer::RetrieveProtocol::CGET)
Expand Down
3 changes: 3 additions & 0 deletions Libs/DICOM/Core/ctkDICOMStorageListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ OFCondition ctkDICOMStorageListenerSCUPrivate::handleIncomingCommand(T_DIMSE_Mes
reqDataset->findAndGetOFString(DCM_SeriesInstanceUID, seriesUID);
OFString studyUID;
reqDataset->findAndGetOFString(DCM_StudyInstanceUID, studyUID);
OFString patientID;
reqDataset->findAndGetOFString(DCM_PatientID, patientID);
emit this->listener->progress(
ctkDICOMStorageListener::tr("Got STORE request for %1").arg(instanceUID.c_str()));
emit this->listener->progress(0);
Expand All @@ -131,6 +133,7 @@ OFCondition ctkDICOMStorageListenerSCUPrivate::handleIncomingCommand(T_DIMSE_Mes
QSharedPointer<ctkDICOMJobResponseSet> jobResponseSet =
QSharedPointer<ctkDICOMJobResponseSet>(new ctkDICOMJobResponseSet);
jobResponseSet->setJobType(ctkDICOMJobResponseSet::JobType::StoreSOPInstance);
jobResponseSet->setPatientID(patientID.c_str());
jobResponseSet->setStudyInstanceUID(studyUID.c_str());
jobResponseSet->setSeriesInstanceUID(seriesUID.c_str());
jobResponseSet->setSOPInstanceUID(instanceUID.c_str());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int ctkDICOMPatientItemWidgetTest1(int argc, char* argv[])
CHECK_QSTRING(widget.filteringStudyDescription(), "");
CHECK_QSTRING(widget.filteringSeriesDescription(), "");
CHECK_INT(widget.filteringDate(), ctkDICOMPatientItemWidget::DateType::Any);
CHECK_INT(widget.numberOfStudiesPerPatient(), 2);
CHECK_INT(widget.numberOfOpenedStudiesPerPatient(), 2);
CHECK_INT(widget.thumbnailSize(), ctkDICOMStudyItemWidget::ThumbnailSizeOption::Medium);

// Test setting and getting
Expand All @@ -66,8 +66,8 @@ int ctkDICOMPatientItemWidgetTest1(int argc, char* argv[])
CHECK_QSTRING(widget.filteringSeriesDescription(), "series");
widget.setFilteringDate(ctkDICOMPatientItemWidget::DateType::LastYear);
CHECK_INT(widget.filteringDate(), ctkDICOMPatientItemWidget::DateType::LastYear);
widget.setNumberOfStudiesPerPatient(6);
CHECK_INT(widget.numberOfStudiesPerPatient(), 6);
widget.setNumberOfOpenedStudiesPerPatient(6);
CHECK_INT(widget.numberOfOpenedStudiesPerPatient(), 6);
widget.setThumbnailSize(ctkDICOMStudyItemWidget::ThumbnailSizeOption::Small);
CHECK_INT(widget.thumbnailSize(), ctkDICOMStudyItemWidget::ThumbnailSizeOption::Small);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ int ctkDICOMVisualBrowserWidgetTest1(int argc, char* argv[])
CHECK_QSTRING(browser.filteringSeriesDescription(), "");
CHECK_QSTRING(browser.filteringModalities().at(0), "Any");
CHECK_INT(browser.filteringDate(), ctkDICOMPatientItemWidget::DateType::Any);
CHECK_INT(browser.numberOfStudiesPerPatient(), 2);
CHECK_INT(browser.numberOfOpenedStudiesPerPatient(), 2);
CHECK_INT(browser.thumbnailSize(), ctkDICOMStudyItemWidget::ThumbnailSizeOption::Medium);
CHECK_BOOL(browser.isSendActionVisible(), false);
CHECK_BOOL(browser.isDeleteActionVisible(), true);
Expand Down Expand Up @@ -145,8 +145,8 @@ int ctkDICOMVisualBrowserWidgetTest1(int argc, char* argv[])
CHECK_QSTRING(browser.filteringModalities().at(0), "CT");
browser.setFilteringDate(ctkDICOMPatientItemWidget::DateType::LastYear);
CHECK_INT(browser.filteringDate(), ctkDICOMPatientItemWidget::DateType::LastYear);
browser.setNumberOfStudiesPerPatient(6);
CHECK_INT(browser.numberOfStudiesPerPatient(), 6);
browser.setNumberOfOpenedStudiesPerPatient(6);
CHECK_INT(browser.numberOfOpenedStudiesPerPatient(), 6);
browser.setThumbnailSize(ctkDICOMStudyItemWidget::ThumbnailSizeOption::Small);
CHECK_INT(browser.thumbnailSize(), ctkDICOMStudyItemWidget::ThumbnailSizeOption::Small);
browser.setSendActionVisible(true);
Expand Down
Loading

0 comments on commit b2d5bb4

Please sign in to comment.