diff --git a/Framework/AnalysisSupport/src/AODReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODReaderHelpers.cxx index 40aa5a9537c7f..d4453b4176e77 100644 --- a/Framework/AnalysisSupport/src/AODReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODReaderHelpers.cxx @@ -19,7 +19,7 @@ #include "Framework/AlgorithmSpec.h" #include "Framework/DataSpecUtils.h" #include "Framework/ConfigContext.h" -#include "Framework/AnalysisContext.h" +#include "Framework/DanglingEdgesContext.h" namespace o2::framework::readers { @@ -79,10 +79,10 @@ struct Buildable { } // namespace -AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx) +AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& /*ctx*/) { - auto& ac = ctx.services().get(); - return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs](InitContext& /*ic*/) { + return AlgorithmSpec::InitCallback{[](InitContext& ic) { + auto requested = ic.services().get().requestedIDXs; std::vector buildables; for (auto& i : requested) { buildables.emplace_back(i); @@ -181,10 +181,10 @@ struct Spawnable { } // namespace -AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& ctx) +AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& /*ctx*/) { - auto& ac = ctx.services().get(); - return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) { + return AlgorithmSpec::InitCallback{[](InitContext& ic) { + auto requested = ic.services().get().spawnerInputs; std::vector spawnables; for (auto& i : requested) { spawnables.emplace_back(i); diff --git a/Framework/AnalysisSupport/src/AODReaderHelpers.h b/Framework/AnalysisSupport/src/AODReaderHelpers.h index 197907ca3ccb1..848ef6b696713 100644 --- a/Framework/AnalysisSupport/src/AODReaderHelpers.h +++ b/Framework/AnalysisSupport/src/AODReaderHelpers.h @@ -20,8 +20,8 @@ namespace o2::framework::readers struct AODReaderHelpers { static AlgorithmSpec rootFileReaderCallback(); - static AlgorithmSpec aodSpawnerCallback(ConfigContext const& ctx); - static AlgorithmSpec indexBuilderCallback(ConfigContext const& ctx); + static AlgorithmSpec aodSpawnerCallback(ConfigContext const& /*ctx*/); + static AlgorithmSpec indexBuilderCallback(ConfigContext const& /*ctx*/); }; } // namespace o2::framework::readers diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index bcf27d0be5ba3..5a43683afd364 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -8,7 +8,7 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#include "Framework/AnalysisContext.h" +#include "Framework/DanglingEdgesContext.h" #include "Framework/ConfigContext.h" #include "Framework/ControlService.h" #include "AODWriterHelpers.h" @@ -62,7 +62,7 @@ const static std::unordered_map ROOTfileNa AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) { - auto& ac = ctx.services().get(); + auto& ac = ctx.services().get(); auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); int compressionLevel = 505; if (ctx.options().hasOption("aod-writer-compression")) { @@ -245,7 +245,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) { using namespace monitoring; - auto& ac = ctx.services().get(); + auto& ac = ctx.services().get(); auto tskmap = ac.outTskMap; auto objmap = ac.outObjHistMap; diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index aba1f3ed4e13d..fcc856669cd92 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -18,7 +18,7 @@ #include "Framework/RawDeviceService.h" #include "Framework/Output.h" #include "Framework/Signpost.h" -#include "Framework/AnalysisContext.h" +#include "Framework/DanglingEdgesContext.h" #include "Framework/ConfigContext.h" #include "Framework/ConfigContext.h" #include @@ -69,7 +69,7 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector(); + auto& ac = ctx.services().get(); std::vector> schemas; auto schemaMetadata = std::make_shared(); diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index c0eeb3bd9697d..ef1d056b62f2b 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -14,7 +14,7 @@ #include "Framework/OutputSpec.h" #include "Framework/InputSpec.h" #include "Framework/DataProcessorSpec.h" -#include "Framework/AnalysisContext.h" +#include "Framework/DanglingEdgesContext.h" #include "Headers/DataHeader.h" #include diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/DanglingEdgesContext.h similarity index 92% rename from Framework/Core/include/Framework/AnalysisContext.h rename to Framework/Core/include/Framework/DanglingEdgesContext.h index 7d1544ed312a4..90a88974db038 100644 --- a/Framework/Core/include/Framework/AnalysisContext.h +++ b/Framework/Core/include/Framework/DanglingEdgesContext.h @@ -8,8 +8,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef O2_FRAMEWORK_ANALYSISCONTEXT_H_ -#define O2_FRAMEWORK_ANALYSISCONTEXT_H_ +#ifndef O2_FRAMEWORK_DANGLINGEDGESCONTEXT_H_ +#define O2_FRAMEWORK_DANGLINGEDGESCONTEXT_H_ #include #include "Framework/InputSpec.h" @@ -32,7 +32,7 @@ struct OutputObjectInfo { // This will keep track of the inputs which have // been requested and for which we will need to inject // some source device. -struct AnalysisContext { +struct DanglingEdgesContext { std::vector requestedAODs; std::vector providedAODs; std::vector requestedDYNs; @@ -63,4 +63,4 @@ struct AnalysisContext { extern template class std::vector; extern template class std::vector; -#endif // O2_FRAMEWORK_ANALYSISCONTEXT_H_ +#endif // O2_FRAMEWORK_DANGLINGEDGESCONTEXT_H_ diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index e59f36c72bdab..15b56f9afbff5 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -25,8 +25,8 @@ namespace o2::framework std::shared_ptr AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx) { auto const& options = ctx.options(); - auto const& OutputsInputs = ctx.services().get().outputsInputs; - auto const& isDangling = ctx.services().get().isDangling; + auto const& OutputsInputs = ctx.services().get().outputsInputs; + auto const& isDangling = ctx.services().get().isDangling; std::shared_ptr dod = std::make_shared(); @@ -200,7 +200,7 @@ DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext con DataProcessorSpec AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx) { - auto& ac = ctx.services().get(); + auto& ac = ctx.services().get(); // the command line options relevant for the writer are global // see runDataProcessing.h diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 26594252e888b..c0280b144e146 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -13,6 +13,7 @@ #include "Framework/ArrowContext.h" #include "Framework/ArrowTableSlicingCache.h" #include "Framework/DataProcessor.h" +#include "Framework/CommonDataProcessors.h" #include "Framework/DataProcessingStats.h" #include "Framework/ServiceRegistry.h" #include "Framework/ConfigContext.h" @@ -588,31 +589,31 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; }); auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; }); auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; }); - auto &ac = ctx.services().get(); - ac.requestedAODs.clear(); - ac.requestedDYNs.clear(); - ac.providedDYNs.clear(); - ac.providedTIMs.clear(); - ac.requestedTIMs.clear(); + auto& dec = ctx.services().get(); + dec.requestedAODs.clear(); + dec.requestedDYNs.clear(); + dec.providedDYNs.clear(); + dec.providedTIMs.clear(); + dec.requestedTIMs.clear(); auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; if (builder != workflow.end()) { // collect currently requested IDXs - ac.requestedIDXs.clear(); + dec.requestedIDXs.clear(); for (auto& d : workflow | views::exclude_by_name(builder->name)) { d.inputs | views::partial_match_filter(header::DataOrigin{"IDX"}) | - sinks::update_input_list{ac.requestedIDXs}; + sinks::update_input_list{dec.requestedIDXs}; } // recreate inputs and outputs builder->inputs.clear(); builder->outputs.clear(); - // replace AlgorithmSpec - // FIXME: it should be made more generic, so it does not need replacement... - builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx); // readers::AODReaderHelpers::indexBuilderCallback(ctx); - AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder); + + // load real AlgorithmSpec before deployment + builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx); + AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder); } if (spawner != workflow.end()) { @@ -620,45 +621,44 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (auto& d : workflow | views::exclude_by_name(spawner->name)) { d.inputs | views::partial_match_filter(header::DataOrigin{"DYN"}) | - sinks::update_input_list{ac.requestedDYNs}; + sinks::update_input_list{dec.requestedDYNs}; d.outputs | views::partial_match_filter(header::DataOrigin{"DYN"}) | - sinks::append_to{ac.providedDYNs}; + sinks::append_to{dec.providedDYNs}; } - std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); - std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); - ac.spawnerInputs.clear(); - ac.requestedDYNs | - views::filter_not_matching(ac.providedDYNs) | - sinks::append_to{ac.spawnerInputs}; + std::sort(dec.requestedDYNs.begin(), dec.requestedDYNs.end(), inputSpecLessThan); + std::sort(dec.providedDYNs.begin(), dec.providedDYNs.end(), outputSpecLessThan); + dec.spawnerInputs.clear(); + dec.requestedDYNs | + views::filter_not_matching(dec.providedDYNs) | + sinks::append_to{dec.spawnerInputs}; // recreate inputs and outputs spawner->outputs.clear(); spawner->inputs.clear(); - AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); - // replace AlgorithmSpec - // FIXME: it should be made more generic, so it does not need replacement... + + // load real AlgorithmSpec before deployment spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx); + AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner); } if (analysisCCDB != workflow.end()) { for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) { - d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs}; - d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs}; + d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{dec.requestedTIMs}; + d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{dec.providedTIMs}; } - std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); - std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan); + std::sort(dec.requestedTIMs.begin(), dec.requestedTIMs.end(), inputSpecLessThan); + std::sort(dec.providedTIMs.begin(), dec.providedTIMs.end(), outputSpecLessThan); // Use ranges::to> in C++23... - ac.analysisCCDBInputs.clear(); - ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; + dec.analysisCCDBInputs.clear(); + dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs}; // recreate inputs and outputs analysisCCDB->outputs.clear(); analysisCCDB->inputs.clear(); - // replace AlgorithmSpec - // FIXME: it should be made more generic, so it does not need replacement... + // load real AlgorithmSpec before deployment // FIXME how can I make the lookup depend on DYN tables as well?? analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); - AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); + AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB); } if (writer != workflow.end()) { @@ -671,17 +671,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (auto& d : workflow) { d.inputs | views::partial_match_filter(AODOrigins) | - sinks::update_input_list{ac.requestedAODs}; + sinks::update_input_list{dec.requestedAODs}; } // remove unmatched outputs auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) { - return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); }); + return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(dec.requestedAODs.begin(), dec.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); }); }); reader->outputs.erase(o_end, reader->outputs.end()); if (reader->outputs.empty()) { // nothing to read workflow.erase(reader); + } else { + // load reader algorithm before deployment + auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx); + reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo); } } @@ -694,22 +698,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! - ac.outputsInputsAOD.clear(); + dec.outputsInputsAOD.clear(); for (auto ii = 0u; ii < outputsInputs.size(); ii++) { if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); if (!ds.empty() || isDangling[ii]) { - ac.outputsInputsAOD.emplace_back(outputsInputs[ii]); + dec.outputsInputsAOD.emplace_back(outputsInputs[ii]); } } } // file sink for any AOD output - if (!ac.outputsInputsAOD.empty()) { + if (!dec.outputsInputsAOD.empty()) { // add TFNumber and TFFilename as input to the writer - ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); - ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); + dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); + dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); } // Move the dummy sink at the end, if needed diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 02141678fec7c..fd9099e1aa24e 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -156,6 +156,7 @@ int defaultConditionQueryRateMultiplier() void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { + int rateLimitingIPCID = std::stoi(ctx.options().get("timeframes-rate-limit-ipcid")); DataProcessorSpec ccdbBackend{ .name = "internal-dpl-ccdb-backend", .outputs = {}, @@ -230,25 +231,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}, .requiredServices = CommonServices::defaultServices("O2FrameworkAnalysisSupport:RunSummary")}; - // AOD reader can be rate limited - int rateLimitingIPCID = std::stoi(ctx.options().get("timeframes-rate-limit-ipcid")); - std::string rateLimitingChannelConfigInput; - std::string rateLimitingChannelConfigOutput; - bool internalRateLimiting = false; - - // In case we have rate-limiting requested, any device without an input will get one on the special - // "DPL/RATE" message. - if (rateLimitingIPCID >= 0) { - rateLimitingChannelConfigInput = fmt::format("name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", - ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID); - rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", - ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID); - internalRateLimiting = true; - aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}}); - } - - ctx.services().registerService(ServiceRegistryHelpers::handleForService(new AnalysisContext)); - auto& ac = ctx.services().get(); + ctx.services().registerService(ServiceRegistryHelpers::handleForService(new DanglingEdgesContext)); + auto& dec = ctx.services().get(); std::vector requestedCCDBs; std::vector providedCCDBs; @@ -257,7 +241,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto& processor = workflow[wi]; auto name = processor.name; auto hash = runtime_hash(name.c_str()); - ac.outTskMap.push_back({hash, name}); + dec.outTskMap.push_back({hash, name}); std::string prefix = "internal-dpl-"; if (processor.inputs.empty() && processor.name.compare(0, prefix.size(), prefix) != 0) { @@ -274,7 +258,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // A timeframeSink consumes timeframes without creating new // timeframe data. bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs; - if (std::stoi(ctx.options().get("timeframes-rate-limit-ipcid")) != -1) { + if (rateLimitingIPCID != -1) { if (timeframeSink && processor.name.find("internal-dpl-injected-dummy-sink") == std::string::npos) { O2_SIGNPOST_ID_GENERATE(sid, workflow_helpers); uint32_t hash = runtime_hash(processor.name.c_str()); @@ -336,16 +320,16 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext break; } if (DataSpecUtils::partialMatch(input, AODOrigins)) { - DataSpecUtils::updateInputList(ac.requestedAODs, InputSpec{input}); + DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input}); } if (DataSpecUtils::partialMatch(input, header::DataOrigin{"DYN"})) { - DataSpecUtils::updateInputList(ac.requestedDYNs, InputSpec{input}); + DataSpecUtils::updateInputList(dec.requestedDYNs, InputSpec{input}); } if (DataSpecUtils::partialMatch(input, header::DataOrigin{"IDX"})) { - DataSpecUtils::updateInputList(ac.requestedIDXs, InputSpec{input}); + DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input}); } if (DataSpecUtils::partialMatch(input, header::DataOrigin{"ATIM"})) { - DataSpecUtils::updateInputList(ac.requestedTIMs, InputSpec{input}); + DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input}); } } @@ -353,16 +337,16 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext for (auto& output : processor.outputs) { if (DataSpecUtils::partialMatch(output, AODOrigins)) { - ac.providedAODs.emplace_back(output); + dec.providedAODs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"DYN"})) { - ac.providedDYNs.emplace_back(output); + dec.providedDYNs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATIM"})) { - ac.providedTIMs.emplace_back(output); + dec.providedTIMs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) { - ac.providedOutputObjHist.emplace_back(output); - auto it = std::find_if(ac.outObjHistMap.begin(), ac.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; }); - if (it == ac.outObjHistMap.end()) { - ac.outObjHistMap.push_back({hash, {output.binding.value}}); + dec.providedOutputObjHist.emplace_back(output); + auto it = std::find_if(dec.outObjHistMap.begin(), dec.outObjHistMap.end(), [&](auto&& x) { return x.id == hash; }); + if (it == dec.outObjHistMap.end()) { + dec.outObjHistMap.push_back({hash, {output.binding.value}}); } else { it->bindings.push_back(output.binding.value); } @@ -375,36 +359,35 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; - std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); - std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan); - std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); - std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan); + std::sort(dec.requestedDYNs.begin(), dec.requestedDYNs.end(), inputSpecLessThan); + std::sort(dec.requestedTIMs.begin(), dec.requestedTIMs.end(), inputSpecLessThan); + std::sort(dec.providedDYNs.begin(), dec.providedDYNs.end(), outputSpecLessThan); + std::sort(dec.providedTIMs.begin(), dec.providedTIMs.end(), outputSpecLessThan); DataProcessorSpec indexBuilder{ "internal-dpl-aod-index-builder", {}, {}, - PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx), // readers::AODReaderHelpers::indexBuilderCallback(ctx), + AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology {}}; - AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, indexBuilder); + AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, indexBuilder); - ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; + dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs}; DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode(); if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) { - AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); + AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedTIMs, analysisCCDBBackend); } - ac.requestedDYNs | views::filter_not_matching(ac.providedDYNs) | sinks::append_to{ac.spawnerInputs}; + dec.requestedDYNs | views::filter_not_matching(dec.providedDYNs) | sinks::append_to{dec.spawnerInputs}; DataProcessorSpec aodSpawner{ "internal-dpl-aod-spawner", {}, {}, - PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx), // readers::AODReaderHelpers::aodSpawnerCallback(ctx), + AlgorithmSpec::dummyAlgorithm(), // real algorithm will be set in adjustTopology {}}; - AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner); - - AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader); + AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, aodSpawner); + AnalysisSupportHelpers::addMissingOutputsToReader(dec.providedAODs, dec.requestedAODs, aodReader); std::sort(requestedCCDBs.begin(), requestedCCDBs.end(), inputSpecLessThan); std::sort(providedCCDBs.begin(), providedCCDBs.end(), outputSpecLessThan); @@ -432,13 +415,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; }); if (mctracks2aod == workflow.end()) { // add normal reader - auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx); - aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo); aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"}); aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"}); } else { - // AODs are being injected on-the-fly, add dummy reader - auto algo = AlgorithmSpec{ + // AODs are being injected on-the-fly, add error-handler reader + aodReader.algorithm = AlgorithmSpec{ adaptStateful( [outputs = aodReader.outputs](DeviceSpec const&) { LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:"); @@ -449,7 +430,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // to ensure the output type for adaptStateful return adaptStateless([](DataAllocator&) {}); })}; - aodReader.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(algo); } auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]); timer.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration); @@ -534,9 +514,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // add the Analysys CCDB backend which reads CCDB objects using a provided table if (analysisCCDBBackend.outputs.empty() == false) { - // add normal reader - auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); - analysisCCDBBackend.algorithm = algo; extraSpecs.push_back(analysisCCDBBackend); } @@ -547,7 +524,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // This is to inject a file sink so that any dangling ATSK object is written // to a ROOT file. - if (ac.providedOutputObjHist.empty() == false) { + if (dec.providedOutputObjHist.empty() == false) { auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ctx); extraSpecs.push_back(rootSink); } @@ -557,8 +534,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext /// Analyze all ouputs auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow); - ac.isDangling = isDanglingTmp; - ac.outputsInputs = outputsInputsTmp; + dec.isDangling = isDanglingTmp; + dec.outputsInputs = outputsInputsTmp; // create DataOutputDescriptor std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); @@ -566,28 +543,28 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! - for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]); - if (ds.size() > 0 || ac.isDangling[ii]) { - ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]); + for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) { + auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]); + if (ds.size() > 0 || dec.isDangling[ii]) { + dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]); } } } // file sink for any AOD output - if (ac.outputsInputsAOD.size() > 0) { + if (dec.outputsInputsAOD.size() > 0) { // add TFNumber and TFFilename as input to the writer - ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); - ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); + dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); + dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); extraSpecs.push_back(fileSink); - auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool { + auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec& spec) -> bool { return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); - size_t ii = std::distance(ac.outputsInputs.begin(), it); - ac.isDangling[ii] = false; + size_t ii = std::distance(dec.outputsInputs.begin(), it); + dec.isDangling[ii] = false; } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); @@ -595,20 +572,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // Select dangling outputs which are not of type AOD std::vector redirectedOutputsInputs; - for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { + for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) { if (ctx.options().get("forwarding-policy") == "none") { continue; } // We forward to the output proxy all the inputs only if they are dangling // or if the forwarding policy is "proxy". - if (!ac.isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { + if (!dec.isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { continue; } // AODs are skipped in any case. - if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { + if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) { continue; } - redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]); + redirectedOutputsInputs.emplace_back(dec.outputsInputs[ii]); } std::vector unmatched; @@ -638,6 +615,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.push_back(CommonDataProcessors::getScheduledDummySink(ignored)); } else { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "injectServiceDevices", "Injecting rate limited dummy sink"); + std::string rateLimitingChannelConfigOutput; + if (rateLimitingIPCID != -1) { + rateLimitingChannelConfigOutput = fmt::format("name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID); + } extraSpecs.push_back(CommonDataProcessors::getDummySink(ignored, rateLimitingChannelConfigOutput)); } } diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index c36b1deadeefb..14bdb2d8c72d9 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include +#include "Framework/DanglingEdgesContext.h" #include "Framework/TopologyPolicyHelpers.h" #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include @@ -1016,6 +1017,7 @@ void doDefaultWorkflowTerminationHook() } int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, + DanglingEdgesContext& danglingEdgesContext, RunningWorkflowInfo const& runningWorkflow, RunningDeviceRef ref, DriverConfig const& driverConfig, @@ -1078,6 +1080,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, &spec, "aEvaluator, &serviceRegistry, + &danglingEdgesContext, &deviceState, &deviceProxy, &processingPolicies, @@ -1101,6 +1104,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, serviceRef.registerService(ServiceRegistryHelpers::handleForService(&runningWorkflow)); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceContext.get())); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&driverConfig)); + serviceRef.registerService(ServiceRegistryHelpers::handleForService(&danglingEdgesContext)); auto device = std::make_unique(ref, serviceRegistry); @@ -1953,6 +1957,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, if (runningWorkflow.devices[di].id == frameworkId) { return doChild(driverInfo.argc, driverInfo.argv, serviceRegistry, + driverInfo.configContext->services().get(), runningWorkflow, ref, driverConfig, driverInfo.processingPolicies,