Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions Detectors/MUON/MCH/DevIO/Digits/digits-sampler-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <iostream>
#include <memory>
#include <string>
#include <stdexcept>

using namespace o2::framework;

Expand Down Expand Up @@ -63,61 +64,64 @@ class DigitSamplerTask : public io::DigitIOBaseTask

void outputAndClear(DataAllocator& out)
{
printSummary(mDigits, mROFs, "-> to output");
LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size());
out.snapshot(OutputRef{"rofs"}, mROFs);
out.snapshot(OutputRef{"digits"}, mDigits);
mDigits.clear();
mROFs.clear();
}

bool shouldEnd() const
bool shouldEnd()
{
bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames;
bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs;
return !mReadIsOk || maxTFreached || maxROFreached;
bool lastTF = mInput.peek() == EOF;
return !mReadIsOk || lastTF || maxTFreached || maxROFreached;
}

void run(ProcessingContext& pc)
{
if (shouldEnd()) {
// output remaining data if any
if (mROFs.size() > 0) {
--mTFid;
outputAndClear(pc.outputs());
}
pc.services().get<ControlService>().endOfStream();
return;
throw std::invalid_argument("process should have ended already");
}

std::vector<ROFRecord> rofs;
std::vector<Digit> digits;
mReadIsOk = mDigitSampler->read(digits, rofs);
if (!mReadIsOk) {
return;
}
while ((mReadIsOk = mDigitSampler->read(digits, rofs))) {

// process the current input TF if requested
if (shouldProcess()) {
incNofProcessedTFs();
mNofProcessedROFs += rofs.size();
// append rofs to mROFs, but shift the indices by the amount of digits
// we have read so far.
auto offset = mDigits.size();
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
[offset](ROFRecord r) {
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
return r;
});
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
printSummary(mDigits, mROFs);
printFull(mDigits, mROFs);
}

if (shouldProcess()) {
incNofProcessedTFs();
mNofProcessedROFs += rofs.size();
// append rofs to mROFs, but shift the indices by the amount of digits
// we have read so far.
auto offset = mDigits.size();
std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
[offset](ROFRecord r) {
r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
return r;
});
mDigits.insert(mDigits.end(), digits.begin(), digits.end());
printSummary(mDigits, mROFs);
printFull(mDigits, mROFs);
}
// increment the input TF id for the next one
incTFid();

// output if we've accumulated enough ROFs
if (mROFs.size() >= mMinNumberOfROFsPerTF) {
outputAndClear(pc.outputs());
// stop here if we've accumulated enough ROFs or TFs
if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) {
break;
}
}

incTFid();
// output whatever has been accumulated, even if empty
outputAndClear(pc.outputs());

if (shouldEnd()) {
pc.services().get<ControlService>().endOfStream();
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
}
}
};

Expand Down