From c3b9e3483c1fa2d59edadd0246af121331bf9ed0 Mon Sep 17 00:00:00 2001 From: pillot Date: Mon, 25 Nov 2024 16:24:40 +0100 Subject: [PATCH 1/2] send messages at every TF --- .../DevIO/Digits/digits-sampler-workflow.cxx | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx index 7f3819f110ba3..c5815536bcad9 100644 --- a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx +++ b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx @@ -27,6 +27,7 @@ #include #include #include +#include using namespace o2::framework; @@ -63,61 +64,64 @@ class DigitSamplerTask : public io::DigitIOBaseTask void outputAndClear(DataAllocator& out) { - printSummary(mDigits, mROFs, "-> to output"); + LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size()); out.snapshot(OutputRef{"rofs"}, mROFs); out.snapshot(OutputRef{"digits"}, mDigits); mDigits.clear(); mROFs.clear(); } - bool shouldEnd() const + bool shouldEnd() { bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames; bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs; - return !mReadIsOk || maxTFreached || maxROFreached; + bool lastTF = mInput.peek() == EOF; + return !mReadIsOk || lastTF || maxTFreached || maxROFreached; } void run(ProcessingContext& pc) { if (shouldEnd()) { - // output remaining data if any - if (mROFs.size() > 0) { - --mTFid; - outputAndClear(pc.outputs()); - } - pc.services().get().endOfStream(); - return; + throw std::invalid_argument("process should have ended already"); } std::vector rofs; std::vector digits; - mReadIsOk = mDigitSampler->read(digits, rofs); - if (!mReadIsOk) { - return; - } + while((mReadIsOk = mDigitSampler->read(digits, rofs))) { + + // process the current input TF if requested + if (shouldProcess()) { + incNofProcessedTFs(); + mNofProcessedROFs += rofs.size(); + // append rofs to mROFs, but shift the indices by the amount of digits + // we have read so far. + auto offset = mDigits.size(); + std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs), + [offset](ROFRecord r) { + r.setDataRef(r.getFirstIdx() + offset, r.getNEntries()); + return r; + }); + mDigits.insert(mDigits.end(), digits.begin(), digits.end()); + printSummary(mDigits, mROFs); + printFull(mDigits, mROFs); + } - if (shouldProcess()) { - incNofProcessedTFs(); - mNofProcessedROFs += rofs.size(); - // append rofs to mROFs, but shift the indices by the amount of digits - // we have read so far. - auto offset = mDigits.size(); - std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs), - [offset](ROFRecord r) { - r.setDataRef(r.getFirstIdx() + offset, r.getNEntries()); - return r; - }); - mDigits.insert(mDigits.end(), digits.begin(), digits.end()); - printSummary(mDigits, mROFs); - printFull(mDigits, mROFs); - } + // increment the input TF id for the next one + incTFid(); - // output if we've accumulated enough ROFs - if (mROFs.size() >= mMinNumberOfROFsPerTF) { - outputAndClear(pc.outputs()); + // stop here if we've accumulated enough ROFs or TFs + if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) { + break; + } } - incTFid(); + // output whatever has been accumulated, even if empty + outputAndClear(pc.outputs()); + + if (shouldEnd()) { + pc.services().get().endOfStream(); + pc.services().get().readyToQuit(QuitRequest::Me); + } } }; From 7931ac769c241a744ca44132c4c78a06366cb93a Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Tue, 26 Nov 2024 15:47:17 +0000 Subject: [PATCH 2/2] Please consider the following formatting changes --- Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx index c5815536bcad9..0184e1c78c0c6 100644 --- a/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx +++ b/Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx @@ -87,7 +87,7 @@ class DigitSamplerTask : public io::DigitIOBaseTask std::vector rofs; std::vector digits; - while((mReadIsOk = mDigitSampler->read(digits, rofs))) { + while ((mReadIsOk = mDigitSampler->read(digits, rofs))) { // process the current input TF if requested if (shouldProcess()) {