Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 60 additions & 93 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -488,116 +488,83 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
}

ConcreteDataMatcher dstf{"FLP", "DISTSUBTIMEFRAME", 0xccdb};
if (ccdbBackend.outputs.empty() == false) {
ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
bool providesDISTSTF = false;
// Check if any of the provided outputs is a DISTSTF
// Check if any of the requested inputs is for a 0xccdb message
for (auto& dp : workflow) {
for (auto& output : dp.outputs) {
if (DataSpecUtils::match(matcher, output)) {
providesDISTSTF = true;
dstf = DataSpecUtils::asConcreteDataMatcher(output);
break;
}
}
if (providesDISTSTF) {
break;
}
InputSpec matcher{"dstf", "FLP", "DISTSUBTIMEFRAME", 0xccdb};
auto& dstf = std::get<ConcreteDataMatcher>(matcher.matcher);
// Check if any of the provided outputs is a DISTSTF
// Check if any of the requested inputs is for a 0xccdb message
bool providesDISTSTF = std::any_of(workflow.begin(), workflow.end(),
[&matcher](auto const& dp) {
return std::any_of(dp.outputs.begin(), dp.outputs.end(), [&matcher](auto const& output) {
return DataSpecUtils::match(matcher, output);
});
});

// If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
// we add to the first data processor which has no inputs (apart from
// enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
bool requiresDISTSUBTIMEFRAME = std::any_of(workflow.begin(), workflow.end(),
[&dstf](auto const& dp) {
return std::any_of(dp.inputs.begin(), dp.inputs.end(), [&dstf](auto const& input) {
return DataSpecUtils::match(input, dstf);
});
});

// We find the first device which has either just enumerations or
// just timers, and we will add the DISTSUBTIMEFRAME to it.
// Notice how we do so in a stable manner by sorting the devices
// by name.
int enumCandidate = -1;
int timerCandidate = -1;
for (auto wi = 0U; wi < workflow.size(); ++wi) {
auto& dp = workflow[wi];
if (dp.inputs.size() != 1) {
continue;
}
auto lifetime = dp.inputs[0].lifetime;
if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
enumCandidate = wi;
}
// * If there are AOD outputs we use TFNumber as the CCDB clock
// * If one device provides a DISTSTF we use that as the CCDB clock
// * If one of the devices provides a timer we use that as the CCDB clock
// * If none of the above apply add to the first data processor
// which has no inputs apart from enumerations the responsibility
// to provide the DISTSUBTIMEFRAME.
if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
timerCandidate = wi;
}
}

// * If there are AOD outputs we use TFNumber as the CCDB clock
// * If one device provides a DISTSTF we use that as the CCDB clock
// * If one of the devices provides a timer we use that as the CCDB clock
// * If none of the above apply, add to the first data processor
// which has no inputs apart from enumerations the responsibility
// to provide the DISTSUBTIMEFRAME.
if (ccdbBackend.outputs.empty() == false) {
if (aodReader.outputs.empty() == false) {
// fetcher clock follows AOD source (TFNumber)
ccdbBackend.inputs.push_back(InputSpec{"tfn", "TFN", "TFNumber"});
} else if (providesDISTSTF) {
// fetcher clock follows DSTF/ccdb source (DISTSUBTIMEFRAME)
ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
} else {
// We find the first device which has either just enumerations or
// just timers, and we add the DISTSUBTIMEFRAME to it.
// Notice how we do so in a stable manner by sorting the devices
// by name.
int enumCandidate = -1;
int timerCandidate = -1;
for (size_t wi = 0; wi < workflow.size(); wi++) {
auto& dp = workflow[wi];
if (dp.inputs.size() != 1) {
continue;
}
auto lifetime = dp.inputs[0].lifetime;
if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
enumCandidate = wi;
}
if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
timerCandidate = wi;
}
}
if (enumCandidate != -1) {
auto& dp = workflow[enumCandidate];
DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
// add DSTF/ccdb source to the enumeration-driven source explicitly
// fetcher clock is provided by enumeration-driven source (DISTSUBTIMEFRAME)
DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
} else if (timerCandidate != -1) {
auto& dp = workflow[timerCandidate];
dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
// fetcher clock is proived by timer source
auto timer_dstf = DataSpecUtils::asConcreteDataMatcher(workflow[timerCandidate].outputs[0]);
ccdbBackend.inputs.push_back(InputSpec{"tfn", timer_dstf, Lifetime::Timeframe});
}
}

ccdbBackend.outputs.push_back(OutputSpec{"CTP", "OrbitReset", 0});
// Load the CCDB backend from the plugin
ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx);
extraSpecs.push_back(ccdbBackend);
} else {
// If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb
// we add to the first data processor which has no inputs (apart from
// enumerations / timers) the responsibility to provide the DISTSUBTIMEFRAME
bool requiresDISTSUBTIMEFRAME = false;
for (auto& dp : workflow) {
for (auto& input : dp.inputs) {
if (DataSpecUtils::match(input, dstf)) {
requiresDISTSUBTIMEFRAME = true;
break;
}
}
}
if (requiresDISTSUBTIMEFRAME) {
// We find the first device which has either just enumerations or
// just timers, and we add the DISTSUBTIMEFRAME to it.
// Notice how we do so in a stable manner by sorting the devices
// by name.
int enumCandidate = -1;
int timerCandidate = -1;
for (size_t wi = 0; wi < workflow.size(); wi++) {
auto& dp = workflow[wi];
if (dp.inputs.size() != 1) {
continue;
}
auto lifetime = dp.inputs[0].lifetime;
if (lifetime == Lifetime::Enumeration && (enumCandidate == -1 || workflow[enumCandidate].name > dp.name)) {
enumCandidate = wi;
}
if (lifetime == Lifetime::Timer && (timerCandidate == -1 || workflow[timerCandidate].name > dp.name)) {
timerCandidate = wi;
}
}
if (enumCandidate != -1) {
auto& dp = workflow[enumCandidate];
DataSpecUtils::updateOutputList(dp.outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
ccdbBackend.inputs.push_back(InputSpec{"tfn", dstf, Lifetime::Timeframe});
} else if (timerCandidate != -1) {
auto& dp = workflow[timerCandidate];
dstf = DataSpecUtils::asConcreteDataMatcher(dp.outputs[0]);
ccdbBackend.inputs.push_back(InputSpec{{"tfn"}, dstf, Lifetime::Timeframe});
}
}
} else if (requiresDISTSUBTIMEFRAME && enumCandidate != -1) {
// add DSTF/ccdb source to the enumeration-driven source explicitly if it is required in the workflow
DataSpecUtils::updateOutputList(workflow[enumCandidate].outputs, OutputSpec{{"ccdb-diststf"}, dstf, Lifetime::Timeframe});
}

// add the Analysys CCDB backend which reads CCDB objects using a provided
// table
// add the Analysys CCDB backend which reads CCDB objects using a provided table
if (analysisCCDBBackend.outputs.empty() == false) {
// add normal reader
auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
Expand Down