Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<!-- internal dependencies -->
<dependencies.version>2025.6.25</dependencies.version>
<step-grid.version>0.0.0-25-SNAPSHOT</step-grid.version>
<step-framework.version>0.0.0-25-SNAPSHOT</step-framework.version>
<step-framework.version>0.0.0-SED-4451-SNAPSHOT</step-framework.version>

<!-- external, non-transitive, dependencies -->
<dep.groovy.version>3.0.23</dep.groovy.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MeasurementPlugin extends AbstractExecutionEnginePlugin {
public static final String SCHEDULE = "schedule";
public static final String TEST_CASE = "testcase";
public static final String EXECUTION_DESCRIPTION = "execution";
public static final String PROJECT = "project";
public static final String CTX_SCHEDULER_TASK_ID = "$schedulerTaskId";
public static final String CTX_SCHEDULE_NAME = "$scheduleName";
public static final String CTX_EXECUTION_DESCRIPTION = "$executionDescription";
Expand All @@ -66,7 +67,7 @@ public class MeasurementPlugin extends AbstractExecutionEnginePlugin {

// These are used by the MeasurementControllerPlugin to "reconstruct" measures from measurements, and indicate the
// "internal" fields which should NOT be added to the measure data field. Keep this in sync with the fields defined above.
static final Set<String> MEASURE_NOT_DATA_KEYS = Set.of("_id", "project", "projectName", ATTRIBUTE_EXECUTION_ID, RN_ID,
static final Set<String> MEASURE_NOT_DATA_KEYS = Set.of("_id", PROJECT, "projectName", ATTRIBUTE_EXECUTION_ID, RN_ID,
ORIGIN, RN_STATUS, PLAN_ID, PLAN, AGENT_URL, TASK_ID, SCHEDULE, TEST_CASE, EXECUTION_DESCRIPTION);
// Same use, but for defining which fields SHOULD be directly copied to the top-level fields of a measure.
static final Set<String> MEASURE_FIELDS = Set.of(NAME, BEGIN, VALUE, STATUS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.collections.*;
import step.core.collections.Collection;
import step.core.entities.EntityManager;
import step.plugins.measurements.MeasurementPlugin;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Stream;

public class MeasurementAccessor {
Expand Down Expand Up @@ -97,4 +95,8 @@ private static Document getRawMeasurement(Map o) {
document.remove(MeasurementPlugin.PLAN);
return document;
}

/**
 * Creates the given compound index on the underlying collection, or updates it if it already exists.
 * Simply delegates to the wrapped collection.
 *
 * @param indexFields the fields making up the compound index; a LinkedHashSet is required because
 *                    the iteration order defines the field order of the compound index
 */
public void createOrUpdateCompoundIndex(LinkedHashSet<IndexField> indexFields) {
coll.createOrUpdateCompoundIndex(indexFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MetricsConstants {

public static final MetricAttribute STATUS_ATTRIBUTE = new MetricAttribute()

// The static method getAllAttributeNames is placed at the end of this class (to make sure all constants are initialized).
// It is used to register all metrics as supported time-series fields.
// WARNING: Do not forget to update this method when adding new constants

public static final MetricAttribute STATUS_ATTRIBUTE = new MetricAttribute()
.setName("rnStatus")
.setType(MetricAttributeType.TEXT)
.setMetadata(Map.of("knownValues", Arrays.asList("PASSED", "FAILED", "TECHNICAL_ERROR", "INTERRUPTED")))
Expand Down Expand Up @@ -49,4 +55,21 @@ public class MetricsConstants {
.setName("result")
.setType(MetricAttributeType.TEXT)
.setDisplayName("Result");


/**
 * Returns the names of all metric attribute constants declared in this class, joined with ",".
 * Used to register every metric attribute as a supported time-series field.
 * WARNING: keep the list below in sync when adding new attribute constants to this class.
 */
public static String getAllAttributeNames() {
    Stream<MetricAttribute> allAttributes = Stream.of(
            STATUS_ATTRIBUTE,
            TYPE_ATRIBUTE,
            TASK_ATTRIBUTE,
            EXECUTION_ATTRIBUTE,
            PLAN_ATTRIBUTE,
            NAME_ATTRIBUTE,
            ERROR_CODE_ATTRIBUTE,
            EXECUTION_BOOLEAN_RESULT,
            EXECUTION_RESULT);
    return allAttributes
            .map(attribute -> attribute.getName())
            .collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import java.util.*;

import static step.core.timeseries.TimeSeriesConstants.ATTRIBUTES_PREFIX;
import static step.core.timeseries.TimeSeriesConstants.TIMESTAMP_ATTRIBUTE;
import static step.plugins.measurements.MeasurementPlugin.ATTRIBUTE_EXECUTION_ID;
import static step.plugins.timeseries.MetricsConstants.*;
import static step.plugins.timeseries.TimeSeriesExecutionPlugin.*;
Expand All @@ -45,7 +47,9 @@ public class TimeSeriesControllerPlugin extends AbstractControllerPlugin {
private static final Logger logger = LoggerFactory.getLogger(TimeSeriesControllerPlugin.class);
public static final String TIME_SERIES_MAIN_COLLECTION = "timeseries";
public static final String TIME_SERIES_ATTRIBUTES_PROPERTY = "timeseries.attributes";
public static final String TIME_SERIES_ATTRIBUTES_DEFAULT = EXECUTION_ID + "," + TASK_ID + "," + PLAN_ID + ",metricType,origin,name,rnStatus,project,type";
//We should review the usage of the following property's default value: it is technically possible to override it in the properties, but it is unclear whether that really works.
//This is used to determine whether we fall back to RAW measurements when filtering or grouping by fields that are not supported by time-series, and when re-ingesting time series from RAW measurements
public static final String TIME_SERIES_ATTRIBUTES_DEFAULT = MetricsConstants.getAllAttributeNames() + ",metricType,origin,project";

// Following properties are used by the UI. In the future we could remove the prefix 'plugins.' to align with other properties
public static final String PARAM_KEY_EXECUTION_DASHBOARD_ID = "plugins.timeseries.execution.dashboard.id";
Expand Down Expand Up @@ -123,10 +127,21 @@ public ExecutionEnginePlugin getExecutionEnginePlugin() {
public void initializeData(GlobalContext context) throws Exception {
super.initializeData(context);
timeSeries.createIndexes(new LinkedHashSet<>(List.of(new IndexField(ATTRIBUTE_EXECUTION_ID, Order.ASC, String.class))));
IndexField metricTypeIndexField = new IndexField(ATTRIBUTES_PREFIX + METRIC_TYPE, Order.ASC, String.class);
IndexField beginIndexField = new IndexField(TIMESTAMP_ATTRIBUTE, Order.ASC, Long.class);
timeSeries.createCompoundIndex(new LinkedHashSet<>(List.of(
new IndexField("attributes.taskId", Order.ASC, String.class),
new IndexField("attributes.metricType", Order.ASC, String.class),
new IndexField("begin", Order.ASC, String.class)
metricTypeIndexField,
beginIndexField
)));
timeSeries.createCompoundIndex(new LinkedHashSet<>(List.of(
new IndexField(ATTRIBUTES_PREFIX + TASK_ATTRIBUTE.getName(), Order.ASC, String.class),
metricTypeIndexField,
beginIndexField
)));
timeSeries.createCompoundIndex(new LinkedHashSet<>(List.of(
new IndexField(ATTRIBUTES_PREFIX + PLAN_ATTRIBUTE.getName(), Order.ASC, String.class),
metricTypeIndexField,
beginIndexField
)));

List<MetricType> metrics = createOrUpdateMetrics(context.require(MetricTypeAccessor.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public void initializeExecutionContext(ExecutionEngineContext executionEngineCon
super.initializeExecutionContext(executionEngineContext, executionContext);
TimeSeriesIngestionPipeline mainIngestionPipeline = timeSeries.getIngestionPipeline();
TreeMap<String, String> additionalAttributes = executionContext.getObjectEnricher().getAdditionalAttributes();
TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(null, new TimeSeriesIngestionPipelineSettings()) {
//Create a wrapper around the ingestion pipeline to automatically enrich data with execution attributes
//This approach is quite error-prone and should be refactored
TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(null,
new TimeSeriesIngestionPipelineSettings().setResolution(mainIngestionPipeline.getResolution())) {
@Override
public void ingestPoint(Map<String, Object> attributes, long timestamp, long value) {
attributes.putAll(additionalAttributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,19 @@ public TimeSeriesAPIResponse getMeasurements(@NotNull FetchBucketsRequest reques
}

private void enrichRequest(FetchBucketsRequest request) {
    // Apply the configured cap when the caller did not request a positive series limit.
    if (request.getMaxNumberOfSeries() <= 0) {
        request.setMaxNumberOfSeries(maxNumberOfSeries);
    }
    // Combine the caller's OQL filter with the session scope filter (restricted or not,
    // depending on the includeGlobalEntities flag of the request).
    request.setOqlFilter(enrichOqlFilter(request.getOqlFilter(), request.isIncludeGlobalEntities()));
}

private String enrichOqlFilter(String oqlFilter) {
String additionalOqlFilter = getObjectFilter().getOQLFilter();
private String enrichOqlFilter(String oqlFilter, boolean includeGlobalEntities) {
String additionalOqlFilter = "";
if (includeGlobalEntities) {
additionalOqlFilter = getObjectFilter().getOQLFilter();
} else {
additionalOqlFilter = getRestrictedObjectFilter().getOQLFilter();
}
if (StringUtils.isNotEmpty(additionalOqlFilter)) {
return (StringUtils.isNotEmpty(oqlFilter)) ?
oqlFilter + " and (" + additionalOqlFilter + ")" :
Expand Down Expand Up @@ -155,8 +160,9 @@ public boolean timeSeriesIsBuilt(@PathParam("executionId") String executionId) {
@Path("/measurements-fields")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Set<String> getMeasurementsAttributes(@QueryParam("filter") String oqlFilter) {
oqlFilter = enrichOqlFilter(oqlFilter);
public Set<String> getMeasurementsAttributes(@QueryParam("filter") String oqlFilter,
@DefaultValue("false") @QueryParam("includeGlobalEntities") boolean includeGlobalEntities) {
oqlFilter = enrichOqlFilter(oqlFilter, includeGlobalEntities);
return handler.getMeasurementsAttributes(oqlFilter);
}

Expand All @@ -168,9 +174,10 @@ public Set<String> getMeasurementsAttributes(@QueryParam("filter") String oqlFil
public List<Measurement> discoverMeasurements(
@QueryParam("filter") String oqlFilter,
@QueryParam("limit") int limit,
@QueryParam("skip") int skip
@QueryParam("skip") int skip,
@DefaultValue("false") @QueryParam("includeGlobalEntities") boolean includeGlobalEntities
) {
oqlFilter = enrichOqlFilter(oqlFilter);
oqlFilter = enrichOqlFilter(oqlFilter, includeGlobalEntities);
return handler.getRawMeasurements(oqlFilter, skip, limit);
}

Expand All @@ -179,8 +186,9 @@ public List<Measurement> discoverMeasurements(
@Path("/raw-measurements/stats")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public MeasurementsStats getRawMeasurementsStats(@QueryParam("filter") String oqlFilter) {
oqlFilter = enrichOqlFilter(oqlFilter);
public MeasurementsStats getRawMeasurementsStats(@QueryParam("filter") String oqlFilter,
@DefaultValue("false") @QueryParam("includeGlobalEntities") boolean includeGlobalEntities) {
oqlFilter = enrichOqlFilter(oqlFilter, includeGlobalEntities);
return handler.getRawMeasurementsStats(oqlFilter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class FetchBucketsRequest {
private Set<String> collectAttributeKeys;
private int collectAttributesValuesLimit;
private int maxNumberOfSeries;
private boolean includeGlobalEntities; //If not set default is false

public Long getStart() {
return start;
Expand Down Expand Up @@ -104,4 +105,12 @@ public int getMaxNumberOfSeries() {
public void setMaxNumberOfSeries(int maxNumberOfSeries) {
this.maxNumberOfSeries = maxNumberOfSeries;
}

// Whether the query should also include global (unscoped) entities; false when never set.
public boolean isIncludeGlobalEntities() {
return includeGlobalEntities;
}

public void setIncludeGlobalEntities(boolean includeGlobalEntities) {
this.includeGlobalEntities = includeGlobalEntities;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ protected ObjectFilter getObjectFilter() {
return objectHookRegistry.getObjectFilter(getSession());
}

/**
 * Returns the object filter computed from a restricted-scope copy of the current session,
 * i.e. the filter applied when global entities must be excluded from the query scope.
 */
protected ObjectFilter getRestrictedObjectFilter() {
    return objectHookRegistry.getObjectFilter(new RestrictedScopeSession(getSession()));
}

protected ObjectPredicate getObjectPredicate(){
return objectHookRegistry.getObjectPredicate(getSession());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void serverStart(GlobalContext context) throws Exception {
migrationManager.register(V27_4_DropResolvedPlanNodesIndexForPSQLMigrationTask.class);
migrationManager.register(V28_0_FixEmptyDefaultMavenSettingsMigrationTask.class);
migrationManager.register(V29_0_UpdateAutomationPackageModel.class);
migrationManager.register(V29_2_TimeSeriesNewIndexes.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package step.migration.tasks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.Version;

import step.core.collections.Collection;
import step.core.collections.CollectionFactory;
import step.core.collections.Document;
import step.core.collections.postgresql.PostgreSQLCollection;
import step.migration.MigrationContext;
import step.migration.MigrationTask;

import java.util.List;

/**
 * Migration task for version 3.29.2 dropping the wrongly-typed time-series compound indexes.
 */
public class V29_2_TimeSeriesNewIndexes extends MigrationTask {

    private static final Logger log = LoggerFactory.getLogger(V29_2_TimeSeriesNewIndexes.class);

    public V29_2_TimeSeriesNewIndexes(CollectionFactory collectionFactory, MigrationContext migrationContext) {
        super(new Version(3, 29, 2), collectionFactory, migrationContext);
    }

    @Override
    public void runUpgradeScript() {
        // The compound indexes created for the timeseries collections with metricType, taskId and begin
        // were wrongly using a string type for 'begin'; they are dropped here and the correct ones are
        // then created automatically by the initializeData hook, exactly as for a fresh setup.
        // The type of the field is only relevant for the PSQL indexes, hence the instanceof guard below.
        List<String> resolutionSuffixes = List.of("", "_minute", "_hour", "_day", "_week");
        for (String resolutionSuffix : resolutionSuffixes) {
            String collectionName = "timeseries" + resolutionSuffix;
            String indexName = "idx_" + collectionName + "_attributes_taskidasc_attributes_metrictypeasc_beginasc";
            Collection<Document> collection = collectionFactory.getCollection(collectionName, Document.class);
            if (collection instanceof PostgreSQLCollection) {
                collection.dropIndex(indexName);
                log.info("Time-series index migration - dropped index {}", indexName);
            }
        }
    }

    @Override
    public void runDowngradeScript() {
        // Intentionally empty: the indexes dropped above are recreated automatically at startup,
        // so there is nothing to restore on downgrade.
    }
}
2 changes: 1 addition & 1 deletion step-core/src/main/java/step/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package step.core;

public interface Constants {
String STEP_API_VERSION_STRING = "3.29.1";
String STEP_API_VERSION_STRING = "3.29.2";
Version STEP_API_VERSION = new Version(STEP_API_VERSION_STRING);

String STEP_YAML_SCHEMA_VERSION_STRING = "1.2.0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
import step.core.timeseries.bucket.BucketAttributes;
import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static step.core.timeseries.TimeSeriesConstants.ATTRIBUTES_PREFIX;
import static step.core.timeseries.TimeSeriesConstants.TIMESTAMP_ATTRIBUTE;

public class ReportNodeTimeSeries implements AutoCloseable {

public static final String CONF_KEY_REPORT_NODE_TIME_SERIES_ENABLED = "execution.engine.reportnodes.timeseries.enabled";
Expand All @@ -40,6 +44,15 @@ public ReportNodeTimeSeries(CollectionFactory collectionFactory, TimeSeriesColle
timeSeries = new TimeSeriesBuilder().registerCollections(timeSeriesCollections).build();
ingestionPipeline = timeSeries.getIngestionPipeline();
timeSeries.createIndexes(Set.of(new IndexField(EXECUTION_ID, Order.ASC, String.class)));
IndexField beginIndexField = new IndexField(TIMESTAMP_ATTRIBUTE, Order.ASC, Long.class);
timeSeries.createCompoundIndex(new LinkedHashSet<>(List.of(
new IndexField(ATTRIBUTES_PREFIX + "taskId", Order.ASC, String.class),
beginIndexField
)));
timeSeries.createCompoundIndex(new LinkedHashSet<>(List.of(
new IndexField(ATTRIBUTES_PREFIX + "planId", Order.ASC, String.class),
beginIndexField
)));
this.ingestionEnabled = ingestionEnabled;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public List<TimeSeriesCollection> getTimeSeriesCollections(String mainCollection
List<TimeSeriesCollection> enabledCollections = new ArrayList<>();
int flushSeriesQueueSize = collectionsSettings.getFlushSeriesQueueSize();
int flushAsyncQueueSize = collectionsSettings.getFlushAsyncQueueSize();
addIfEnabled(enabledCollections, mainCollectionName, Duration.ofSeconds(1), collectionsSettings.getMainFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize,null, true);
addIfEnabled(enabledCollections, mainCollectionName, Duration.ofMillis(collectionsSettings.getMainResolution()), collectionsSettings.getMainFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize,null, true);
addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_PER_MINUTE, Duration.ofMinutes(1), collectionsSettings.getPerMinuteFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize,null, collectionsSettings.isPerMinuteEnabled());
addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_HOURLY, Duration.ofHours(1), collectionsSettings.getHourlyFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize, ignoredAttributesForHighResolution, collectionsSettings.isHourlyEnabled());
addIfEnabled(enabledCollections, mainCollectionName + TIME_SERIES_SUFFIX_DAILY, Duration.ofDays(1), collectionsSettings.getDailyFlushInterval(), flushSeriesQueueSize, flushAsyncQueueSize, ignoredAttributesForHighResolution, collectionsSettings.isDailyEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public static TimeSeriesCollectionsSettings buildSingleResolutionSettings(long m
timeSeriesCollectionsSettings.setWeeklyEnabled(false);
timeSeriesCollectionsSettings.setMainResolution(mainResolution);
timeSeriesCollectionsSettings.setMainFlushInterval(mainFlushInterval);
timeSeriesCollectionsSettings.setFlushSeriesQueueSize(20000);
return timeSeriesCollectionsSettings;
}

Expand Down