Skip to content

Commit

Permalink
HPCC-32885 Add POC unittests for new eclccserver jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jan 13, 2025
1 parent 4a8ecb4 commit a542e0b
Showing 1 changed file with 155 additions and 10 deletions.
165 changes: 155 additions & 10 deletions testing/unittests/dalitests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3284,12 +3284,16 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(DaliJobQueueTester);
CPPUNIT_TEST(testInit);
CPPUNIT_TEST(testCppServer);
CPPUNIT_TEST(testSingle);
CPPUNIT_TEST(testDouble);
CPPUNIT_TEST(testMany);
CPPUNIT_TEST(testCleanup);
CPPUNIT_TEST_SUITE_END();

static constexpr const char * mainQueueName = "DaliTestJobQueue";
static constexpr const char * childQueueName = "DaliTestCppQueue";

struct JobEntry
{
unsigned delayMs;
Expand Down Expand Up @@ -3352,7 +3356,7 @@ class DaliJobQueueTester : public CppUnit::TestFixture

virtual void processAll() = 0;

protected:
public:
Semaphore & startedSem;
Semaphore & processedSem;
Linked<IJobQueue> queue;
Expand Down Expand Up @@ -3448,8 +3452,6 @@ class DaliJobQueueTester : public CppUnit::TestFixture
for (;;)
{
Owned<IJobQueueItem> item = queue->dequeuePriority(priority);
if (!item)
item.setown(queue->dequeue(0, INFINITE, 0));
bool ret = processItem(item);
if (!ret)
break;
Expand All @@ -3458,12 +3460,82 @@ class DaliJobQueueTester : public CppUnit::TestFixture
}
};

class CppJobProcessor : public JobProcessor
{
public:
CppJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id)
: JobProcessor(_startedSem, _processedSem, _queue, _id)
{
}

virtual void processAll() override
{
for (;;)
{
__uint64 priority = getTimeStampNowValue();
unsigned cppTimeout = 100 * tickScaling;
Owned<IJobQueueItem> item = queue->dequeuePriority(priority, cppTimeout);
if (!item)
{
output.append("*");
log.append("*");
break;
}
bool ret = processItem(item);
if (!ret)
{
queue->setActiveQueue(mainQueueName);
queue->enqueue(item.getClear());
break;
}
}
}
};

class CppServerJobProcessor : public JobProcessor
{
public:
CppServerJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id)
: JobProcessor(_startedSem, _processedSem, _queue, _id)
{
}

virtual void processAll() override
{
Owned<IJobQueue> childQueue = createJobQueue(childQueueName);
__uint64 priority = 0;
for (;;)
{
Owned<IJobQueueItem> item = queue->dequeue();
assertex(item);
const char * name = item->queryWUID();
if (name[0] == '!')
break;

childQueue->enqueue(item.getClear());

StringBuffer cppQueues;
cppQueues.append(mainQueueName).append(",").append(childQueueName);

Owned<IJobQueue> localQueues = createJobQueue(cppQueues);
Semaphore dummySem;
Owned<JobProcessor> child = new CppJobProcessor(dummySem, processedSem, localQueues, id);

child->start(true);
child->join();
output.append(child->output);
log.append(child->log);
}
}
};

enum JobProcessorType
{
StandardProcessor,
ThorProcessor,
NewThorProcessor,
PriorityProcessor,
CppServerProcessor,
};

void testInit()
Expand All @@ -3480,7 +3552,7 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{
try
{
Owned<IJobQueue> queue = createJobQueue("DaliJobQueueTester");
Owned<IJobQueue> queue = createJobQueue(mainQueueName);
queue->connect(true);
queue->clear();

Expand All @@ -3492,7 +3564,7 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{
JobProcessor * cur = nullptr;
//All listening threads must have a unique queue objects
Owned<IJobQueue> localQueue = createJobQueue("DaliJobQueueTester");
Owned<IJobQueue> localQueue = createJobQueue(mainQueueName);

switch (processor)
{
Expand All @@ -3508,6 +3580,9 @@ class DaliJobQueueTester : public CppUnit::TestFixture
case PriorityProcessor:
cur = new PriorityJobProcessor(startedSem, processedSem, localQueue, jobProcessors.ordinality());
break;
case CppServerProcessor:
cur = new CppServerJobProcessor(startedSem, processedSem, localQueue, jobProcessors.ordinality());
break;
default:
UNIMPLEMENTED;
}
Expand All @@ -3525,11 +3600,14 @@ class DaliJobQueueTester : public CppUnit::TestFixture
jobQueueSleep(job.delayMs);
if (traceJobQueue)
DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick());
Owned<IJobQueueItem> item = createJobQueueItem(job.name);
item->setPort(job.processingMs);
item->setPriority(job.priority);
if (job.name)
{
Owned<IJobQueueItem> item = createJobQueueItem(job.name);
item->setPort(job.processingMs);
item->setPriority(job.priority);

queue->enqueue(item.getClear());
queue->enqueue(item.getClear());
}
}

for (;;)
Expand Down Expand Up @@ -3569,9 +3647,13 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{
JobProcessor & cur = jobProcessors.item(i3);
DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog());
}

ForEachItemIn(i4, jobProcessors)
{
//If expected results are provided, check that the result matches one of them (it is undefined which
//processor will match which result)
JobProcessor & cur = jobProcessors.item(i4);
if (expectedResults.size())
{
bool matched = false;
Expand All @@ -3587,7 +3669,7 @@ class DaliJobQueueTester : public CppUnit::TestFixture
}
if (!matched)
{
DBGLOG("Test %s: No match for output %u: %s", name, i3, expectedText.str()+2);
DBGLOG("Test %s: No match for output %u: %s", name, i4, expectedText.str()+2);
CPPUNIT_ASSERT_MESSAGE("Result does not match any of the expected results", false);
}
}
Expand All @@ -3609,6 +3691,14 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{ 100, "d", 90, 0 },
};

static constexpr std::initializer_list<JobEntry> dripSingleTest = {
{ 0, "a", 10, 0 },
{ 200, "b", 10, 0 },
{ 200, "c", 10, 0 },
{ 200, "d", 10, 0 },
{ 20, nullptr, 0, 0 }, // Ensure d has completed before termination is sent
};

static constexpr std::initializer_list<JobEntry> twoWuTest = {
{ 0, "a", 90, 0 },
{ 50, "A", 90, 0},
Expand Down Expand Up @@ -3679,6 +3769,61 @@ class DaliJobQueueTester : public CppUnit::TestFixture
{ 50, "o", 60, 0},
};

static constexpr std::initializer_list<JobEntry> Cpp1Test = {
{ 0, "a", 50, 0 },
{ 60, "b", 50, 0},
{ 60, "c", 50, 0},
{ 60, "d", 50, 0},
{ 60, "e", 50, 0},
{ 10, "f", 80, 0},
{ 50, "g", 50, 0},
{ 40, "h", 50, 0},
{ 60, "i", 50, 0},
{ 60, "j", 50, 0},
{ 200, nullptr, 0, 0 },
};

static constexpr std::initializer_list<JobEntry> Cpp2Test = {
{ 0, "a", 50, 0 },
{ 60, "b", 50, 0},
{ 10, "c", 50, 0},
{ 10, "d", 50, 0},
{ 10, "e", 50, 0},
{ 10, "f", 50, 0},
{ 70, "g", 50, 0},
{ 20, "h", 50, 0},
{ 60, "i", 50, 0},
{ 200, nullptr, 0, 0 },
};

static constexpr std::initializer_list<JobEntry> Cpp3Test = {
{ 0, "a", 50, 0 },
{ 10, "b", 50, 0},
{ 40, "c", 50, 0},
{ 20, "d", 50, 0},
{ 70, "e", 50, 0},
{ 60, "f", 50, 0},
{ 60, "g", 50, 0},
{ 10, "h", 50, 0},
{ 10, "i", 50, 0},
{ 10, "j", 50, 0},
{ 100, "k", 50, 0},
{ 200, nullptr, 0, 0 },
};


//MODEL The way that cpp server should work - an agent that listens to the main queue, and jobs that listen to the main queue and a child queue
//Timings are very temperamental - and if there are a choice of threads to restart processing then it isn't well defined which one will pick it up
void testCppServer()
{
runTestCase("single, 1 c++", singleWuTest, { CppServerProcessor }, { "abcd" });
runTestCase("drip, 1 c++", dripSingleTest, { CppServerProcessor }, { "a*b*c*d" });
runTestCase("single, 2 c++", singleWuTest, { CppServerProcessor, CppServerProcessor }, { "abcd", "" });
runTestCase("cpp, 2 c++", Cpp1Test, { CppServerProcessor, CppServerProcessor }, { "abcdeg*", "fhij*" });
runTestCase("cpp, 3 c++", Cpp1Test, { CppServerProcessor, CppServerProcessor, CppServerProcessor }, { "abcdeg*", "fhij*", "" });
runTestCase("cpp2, 3 c++", Cpp2Test, { CppServerProcessor, CppServerProcessor, CppServerProcessor }, { "abeg*", "cfhi*", "d*" });
runTestCase("cpp3, 2 c++", Cpp3Test, { CppServerProcessor, CppServerProcessor }, { "ac*hjk*", "bdefgi*" });
}
void testSingle()
{
runTestCase("1 wu, 1 standard", singleWuTest, { StandardProcessor }, { "abcd" });
Expand Down

0 comments on commit a542e0b

Please sign in to comment.