Skip to content

Commit

Permalink
send messages at every TF
Browse files Browse the repository at this point in the history
  • Loading branch information
pillot committed Nov 27, 2024
1 parent 3d3ee4d commit 64077ed
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <iostream>
#include <memory>
#include <string>
#include <stdexcept>

using namespace o2::framework;

Expand Down Expand Up @@ -63,61 +64,64 @@ class DigitSamplerTask : public io::DigitIOBaseTask

void outputAndClear(DataAllocator& out)
{
printSummary(mDigits, mROFs, "-> to output");
LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size());
out.snapshot(OutputRef{"rofs"}, mROFs);
out.snapshot(OutputRef{"digits"}, mDigits);
mDigits.clear();
mROFs.clear();
}

bool shouldEnd() const
bool shouldEnd()
{
bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames;
bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs;
return !mReadIsOk || maxTFreached || maxROFreached;
bool lastTF = mInput.peek() == EOF;
return !mReadIsOk || lastTF || maxTFreached || maxROFreached;
}

void run(ProcessingContext& pc)
{
if (shouldEnd()) {
// output remaining data if any
if (mROFs.size() > 0) {
--mTFid;
outputAndClear(pc.outputs());
}
pc.services().get<ControlService>().endOfStream();
return;
throw std::invalid_argument("process should have ended already");
}

std::vector<ROFRecord> rofs;
std::vector<Digit> digits;
mReadIsOk = mDigitSampler->read(digits, rofs);
if (!mReadIsOk) {
return;
}
while ((mReadIsOk = mDigitSampler->read(digits, rofs))) {

// process the current input TF if requested
if (shouldProcess()) {
incNofProcessedTFs();
mNofProcessedROFs += rofs.size();
// append rofs to mROFs, but shift the indices by the amount of digits
// we have read so far.
auto offset = mDigits.size();
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
[offset](ROFRecord r) {
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
return r;
});
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
printSummary(mDigits, mROFs);
printFull(mDigits, mROFs);
}

if (shouldProcess()) {
incNofProcessedTFs();
mNofProcessedROFs += rofs.size();
// append rofs to mROFs, but shift the indices by the amount of digits
// we have read so far.
auto offset = mDigits.size();
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
[offset](ROFRecord r) {
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
return r;
});
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
printSummary(mDigits, mROFs);
printFull(mDigits, mROFs);
}
// increment the input TF id for the next one
incTFid();

// output if we've accumulated enough ROFs
if (mROFs.size() >= mMinNumberOfROFsPerTF) {
outputAndClear(pc.outputs());
// stop here if we've accumulated enough ROFs or TFs
if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) {
break;
}
}

incTFid();
// output whatever has been accumulated, even if empty
outputAndClear(pc.outputs());

if (shouldEnd()) {
pc.services().get<ControlService>().endOfStream();
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
}
}
};

Expand Down

0 comments on commit 64077ed

Please sign in to comment.