diff --git a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx index 7f3819f110ba3..0184e1c78c0c6 100644 --- a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx +++ b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx @@ -27,6 +27,7 @@ #include #include #include +#include using namespace o2::framework; @@ -63,61 +64,64 @@ class DigitSamplerTask : public io::DigitIOBaseTask void outputAndClear(DataAllocator& out) { - printSummary(mDigits, mROFs, "-> to output"); + LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size()); out.snapshot(OutputRef{"rofs"}, mROFs); out.snapshot(OutputRef{"digits"}, mDigits); mDigits.clear(); mROFs.clear(); } - bool shouldEnd() const + bool shouldEnd() { bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames; bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs; - return !mReadIsOk || maxTFreached || maxROFreached; + bool lastTF = mInput.peek() == EOF; + return !mReadIsOk || lastTF || maxTFreached || maxROFreached; } void run(ProcessingContext& pc) { if (shouldEnd()) { - // output remaining data if any - if (mROFs.size() > 0) { - --mTFid; - outputAndClear(pc.outputs()); - } - pc.services().get().endOfStream(); - return; + throw std::invalid_argument("process should have ended already"); } std::vector rofs; std::vector digits; - mReadIsOk = mDigitSampler->read(digits, rofs); - if (!mReadIsOk) { - return; - } + while ((mReadIsOk = mDigitSampler->read(digits, rofs))) { + + // process the current input TF if requested + if (shouldProcess()) { + incNofProcessedTFs(); + mNofProcessedROFs += rofs.size(); + // append rofs to mROFs, but shift the indices by the amount of digits + // we have read so far. + auto offset = mDigits.size(); + std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs), + [offset](ROFRecord r) { + r.setDataRef(r.getFirstIdx() + offset, r.getNEntries()); + return r; + }); + mDigits.insert(mDigits.end(), digits.begin(), digits.end()); + printSummary(mDigits, mROFs); + printFull(mDigits, mROFs); + } - if (shouldProcess()) { - incNofProcessedTFs(); - mNofProcessedROFs += rofs.size(); - // append rofs to mROFs, but shift the indices by the amount of digits - // we have read so far. - auto offset = mDigits.size(); - std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs), - [offset](ROFRecord r) { - r.setDataRef(r.getFirstIdx() + offset, r.getNEntries()); - return r; - }); - mDigits.insert(mDigits.end(), digits.begin(), digits.end()); - printSummary(mDigits, mROFs); - printFull(mDigits, mROFs); - } + // increment the input TF id for the next one + incTFid(); - // output if we've accumulated enough ROFs - if (mROFs.size() >= mMinNumberOfROFsPerTF) { - outputAndClear(pc.outputs()); + // stop here if we've accumulated enough ROFs or TFs + if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) { + break; + } } - incTFid(); + // output whatever has been accumulated, even if empty + outputAndClear(pc.outputs()); + + if (shouldEnd()) { + pc.services().get().endOfStream(); + pc.services().get().readyToQuit(QuitRequest::Me); + } } };