From aad93d339dbc541445d6d4580b231504a71fbd33 Mon Sep 17 00:00:00 2001 From: David Stephan Date: Mon, 1 Dec 2025 13:29:37 +0100 Subject: [PATCH 1/7] SED-4415 optimize-reporting-request-and-query-performance --- .../core/timeseries/TimeSeriesCollection.java | 4 + .../TimeSeriesAggregationPipeline.java | 10 ++ .../TimeSeriesAggregationQuery.java | 13 ++ .../step/core/timeseries/bucket/Bucket.java | 15 ++ .../TimeSeriesAggergationQueryTest.java | 144 +++++++++++++++++- 5 files changed, 179 insertions(+), 7 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java index 0a1bcf40..d67c59d3 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java @@ -188,4 +188,8 @@ public void save(Iterable entities) { protected void drop() { mainCollection.drop(); } + + protected Collection getMainCollection() { + return mainCollection; + } } diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java index fc5fd861..c46039c0 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java @@ -64,6 +64,9 @@ private Set collectAllUsedAttributes(TimeSeriesAggregationQuery query) { * 3. Go backward from the resolution obtained above and choose the first collection which handle all the attributes */ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { + if (logger.isDebugEnabled()){ + logger.debug("Aggregation query: {}", query); + } validateQuery(query); Set usedAttributes = collectAllUsedAttributes(query).stream().map(a -> a.replace("attributes.", "")).collect(Collectors.toSet()); long queryFrom = query.getFrom() != null ? query.getFrom() : 0; @@ -83,6 +86,10 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { long sourceResolution = idealAvailableCollection.getResolution(); TimeSeriesProcessedParams finalParams = processQueryParams(query, sourceResolution); + if (logger.isDebugEnabled()) { + logger.debug("Ideal resolution calculated: {}, source resolution covering the data: {}", idealResolution, sourceResolution); + } + Map> seriesBuilder = new HashMap<>(); LongAdder bucketCount = new LongAdder(); @@ -113,6 +120,9 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { Map> result = seriesBuilder.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, i -> i.getValue().build())))); + if (logger.isDebugEnabled()) { + logger.debug("Processed query results in " + (System.currentTimeMillis() - t2) + "ms. Results size: " + result.size()); + } return new TimeSeriesAggregationResponseBuilder() .setSeries(result) .setStart(finalParams.getFrom()) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java index aca023bf..4d915257 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java @@ -94,4 +94,17 @@ public TimeSeriesAggregationQuery setBucketsResolution(@Nullable Long bucketsRes public TimeSeriesOptimizationType getOptimizationType() { return optimizationType; } + + @Override + public String toString() { + return "TimeSeriesAggregationQuery{" + + "optimizationType=" + optimizationType + + ", groupDimensions=" + groupDimensions + + ", bucketsCount=" + bucketsCount + + ", bucketsResolution=" + bucketsResolution + + ", shrink=" + shrink + + ", collectAttributeKeys=" + collectAttributeKeys + + ", collectAttributesValuesLimit=" + collectAttributesValuesLimit + + '}'; + } } diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java b/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java index fdba0781..ad74ca4d 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java @@ -115,4 +115,19 @@ public long getPercentile(double percentile) { } return percentileValue; } + + @Override + public String toString() { + return "Bucket{" + + "begin=" + begin + + ", end=" + end + + ", attributes=" + attributes + + ", count=" + count + + ", sum=" + sum + + ", min=" + min + + ", max=" + max + + ", pclPrecision=" + pclPrecision + + ", distribution=" + distribution + + '}'; + } } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java index 3bb704a4..85405104 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java @@ -2,20 +2,31 @@ import org.junit.Assert; import org.junit.Test; +import step.core.collections.Filters; import step.core.ql.OQLFilterBuilder; import step.core.timeseries.aggregation.TimeSeriesAggregationPipeline; import step.core.timeseries.aggregation.TimeSeriesAggregationQuery; import step.core.timeseries.aggregation.TimeSeriesAggregationQueryBuilder; import step.core.timeseries.aggregation.TimeSeriesAggregationResponse; import step.core.timeseries.bucket.Bucket; +import step.core.timeseries.bucket.BucketAttributes; import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class TimeSeriesAggergationQueryTest extends TimeSeriesBaseTest { + public static final long DURATION_T1_PASSED = 10L; + public static final long DURATION_T1_FAILED = 20L; + public static final long DURATION_T2_PASSED = 30L; + public static final long DURATION_T2_FAILED = 40L; + @Test(expected = IllegalArgumentException.class) public void lowInvalidResolutionTest() { TimeSeries timeSeries = getNewTimeSeries(10); @@ -47,10 +58,10 @@ public void shrinkTest() { TimeSeries timeSeries = getNewTimeSeries(10); try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) { - ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, 10L); - ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, 10L); - ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, 10L); - ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, 10L); + ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, DURATION_T1_PASSED); + ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, DURATION_T1_PASSED); + ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, DURATION_T1_PASSED); + ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, DURATION_T1_PASSED); } TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder() @@ -59,9 +70,9 @@ public void shrinkTest() { TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline(); TimeSeriesAggregationResponse response = pipeline.collect(query); Assert.assertEquals(0, response.getStart()); - Assert.assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000); + assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000); response.getSeries().values().forEach(map -> { - Assert.assertTrue(map.values().size() <= 1); + assertTrue(map.values().size() <= 1); }); } @@ -181,4 +192,123 @@ public void responseRangeWithCustomWindowTest() { Assert.assertEquals(5000, response.getResolution()); } + @Test + public void aggregationTestWithResolutionChanges() { + int resolution = 1000; + int iterations = 1000; + TimeSeries timeSeries = getNewTimeSeries(1000); + Random random = new Random(); + long end = System.currentTimeMillis(); + long start = end - (iterations * resolution); + //Ingest some data + try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) { + for (int i = 0; i < iterations; i++ ) { + long relativeStart = start + (i * resolution); + int startOffset = resolution / 4; + ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), relativeStart + random.nextInt(startOffset), DURATION_T1_PASSED); + ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), relativeStart + startOffset + random.nextInt(startOffset), DURATION_T1_FAILED); + ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), relativeStart + 2*startOffset + random.nextInt(startOffset), DURATION_T2_PASSED); + ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), relativeStart + 3*startOffset + random.nextInt(startOffset), DURATION_T2_FAILED); + } + } + aggregationTest(timeSeries, resolution, iterations, start, end); + + //Change timeSerie resolution on the fly to make sure this doesn't induce aggregation errors + int newResolution = 5000; + TimeSeriesCollection collection = new TimeSeriesCollection(timeSeries.getDefaultCollection().getMainCollection(), newResolution); + TimeSeries timeSeriesHigherResolution = new TimeSeriesBuilder().registerCollection(collection).build(); + aggregationTest(timeSeriesHigherResolution, newResolution, iterations, start, end); + } + + + public void aggregationTest(TimeSeries timeSeries, int resolution, int iterations, long start, long end) { + + //Test aggregation with a single time bucket + TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder() + .split(1) + .build(); + TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline(); + TimeSeriesAggregationResponse response = pipeline.collect(query); + Assert.assertEquals(0, response.getStart()); + //query has no time range, so the effective resolution (range used by the aggregation) is from 0 to now + assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000); + assertEquals(resolution, response.getCollectionResolution()); + Map> series = response.getSeries(); + assertEquals(1, series.size()); + assertEquals(0, series.keySet().stream().findFirst().orElseThrow().size()); + Map longBucketMap = series.values().stream().findFirst().orElseThrow(); + assertEquals(1, longBucketMap.size()); + assertEquals(Long.valueOf(0), longBucketMap.keySet().stream().findFirst().orElseThrow()); + Bucket bucket = longBucketMap.values().stream().findFirst().orElseThrow(); + assertEquals(iterations * 4, bucket.getCount()); + assertEquals((iterations * (DURATION_T1_FAILED + DURATION_T1_PASSED + DURATION_T2_FAILED + DURATION_T2_PASSED)), bucket.getSum()); + assertEquals(DURATION_T1_PASSED, bucket.getMin()); + assertEquals(DURATION_T2_FAILED, bucket.getMax()); + + //Test aggregation with filter, grouping and 100 time buckets (since the response resolution it 10 seconds, same response is expected with source 1 and 5 seconds + testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 100); + + //Test aggregation with filter, grouping and 1000 time buckets -> response resolution is 1 second for source 1 second, but will be 5s for the 5s source resolution + testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 1000); + } + + private void testWithManyTimeBuckets(int sourceResolution, int iterations, long start, long end, TimeSeriesAggregationPipeline pipeline, int targetBucketCount) { + TimeSeriesAggregationQuery query; + TimeSeriesAggregationResponse response; + Map> series; + query = new TimeSeriesAggregationQueryBuilder() + .split(targetBucketCount) + .withFilter(Filters.equals("attributes.name", "t1")) + .withGroupDimensions(Set.of("status")) + .range(start, end) + .build(); + long expectedResponseResolution = (end - start) / targetBucketCount; + expectedResponseResolution = (expectedResponseResolution < sourceResolution) ? sourceResolution : expectedResponseResolution; + long expectedBucketsCount = (end - start) / expectedResponseResolution; + response = pipeline.collect(query); + Assert.assertEquals(expectedResponseResolution, response.getResolution()); + assertEquals(sourceResolution, response.getCollectionResolution()); + series = response.getSeries(); + assertEquals(2, series.size()); + Map failedSeries = series.get(Map.of("status", "FAILED")); + Map passedSeries = series.get(Map.of("status", "PASSED")); + validateSeries(failedSeries, DURATION_T1_FAILED, iterations, expectedBucketsCount); + validateSeries(passedSeries, DURATION_T1_PASSED, iterations, expectedBucketsCount); + } + + private void validateSeries(Map series, long expectedDuration, int iterations, long targetBucketCount) { + assertNotNull(series); + long expectedCountPerBucket = iterations / targetBucketCount; + long expectedCountPerBucketMin = expectedCountPerBucket - 1; + long expectedCountPerBucketMax = expectedCountPerBucket + 1; + //depending on the start and end time we might get back more buckets than requested to cover the range + assertTrue("Series should have max target buckets +1 " + targetBucketCount + " time buckets, but size was: " + series.size() + "Series content: " + series, series.size() <= (targetBucketCount+1)); + //For response resolution equals source resolution, i.e. 1 second with count between 0 and 2 we might have significantly less time bucket because the one with 0 are not included + if (expectedCountPerBucket == 1) { + assertTrue("Series does not have sufficient number of buckets, target " + targetBucketCount + ", but size was: " + series.size() + "Series content: " + series, series.size() >= targetBucketCount*0.7); + } else { + assertTrue("Series should have at least the target buckets count, target " + targetBucketCount + ", but size was: " + series.size() + "Series content: " + series, series.size() >= targetBucketCount); + } + AtomicInteger bucketIdx = new AtomicInteger(0); + AtomicLong totalSum = new AtomicLong(0); + AtomicLong totalCount = new AtomicLong(0); + series.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(e -> { + Bucket bucket = e.getValue(); + int currentIdx = bucketIdx.getAndIncrement(); + //Edge bucket can have lower c counts + if (currentIdx > 0 && currentIdx < targetBucketCount ) { + assertTrue("Count should be between " + expectedCountPerBucketMin + " and " + expectedCountPerBucketMax + " but was " + bucket.getCount() + ". Current idx: " + currentIdx + + "next bucket: " + series, expectedCountPerBucketMin <= bucket.getCount() && bucket.getCount() <= expectedCountPerBucketMax); + assertTrue((expectedCountPerBucketMin) * expectedDuration <= bucket.getSum() && bucket.getSum() <= (expectedCountPerBucketMax) * expectedDuration); + } + assertEquals(expectedDuration, bucket.getMin()); + assertEquals(expectedDuration, bucket.getMax()); + totalSum.addAndGet(bucket.getSum()); + totalCount.addAndGet(bucket.getCount()); + }); + assertEquals(expectedDuration*iterations, totalSum.get()); + assertEquals(iterations, totalCount.get()); + System.out.println("totalSum: " + totalSum.get() + ", totalCount: " + totalCount.get() + ", #buckets: " + series.size()); + } + } From b28d93bda424f5c573ef8f919cab5c44a8a0a80f Mon Sep 17 00:00:00 2001 From: David Stephan Date: Tue, 2 Dec 2025 16:48:35 +0100 Subject: [PATCH 2/7] SED-4415 changing ideal resolution selection --- .../TimeSeriesAggregationPipeline.java | 64 +++++++++++++------ ...eriesMultipleCollectionsAggregateTest.java | 14 ++-- 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java index c46039c0..305e76db 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java @@ -75,7 +75,7 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { if (query.getOptimizationType() == TimeSeriesOptimizationType.MOST_ACCURATE) { idealResolution = collections.get(0).getResolution(); // first collection with the best resolution } else { // most efficient - idealResolution = this.roundDownToAvailableResolution(getIdealResolution(query)); + idealResolution = getIdealResolution(query); } TimeSeriesCollection idealAvailableCollection = ttlEnabled ? chooseFirstAvailableCollectionBasedOnTTL(idealResolution, query) : this.collections.get(this.resolutionsIndexes.get(idealResolution)); idealAvailableCollection = chooseLastCollectionWhichHandleAttributes(idealAvailableCollection.getResolution(), usedAttributes); @@ -135,16 +135,6 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { .build(); } - private long roundDownToAvailableResolution(long targetResolution) { - List availableResolutions = getAvailableResolutions(); - for (int i = 1; i < availableResolutions.size(); i++) { - if (availableResolutions.get(i) > targetResolution) { - return availableResolutions.get(i - 1); - } - } - return availableResolutions.get(availableResolutions.size() - 1); // return last resolution - } - private TimeSeriesProcessedParams processQueryParams(TimeSeriesAggregationQuery query, long sourceResolution) { if (query.getFrom() == null) { throw new IllegalArgumentException("From parameters must be specified"); @@ -266,17 +256,51 @@ private long getIdealResolution(TimeSeriesAggregationQuery query) { Long bucketsResolution = query.getBucketsResolution(); long queryTo = query.getTo() != null ? query.getTo() : System.currentTimeMillis(); long requestedRange = queryTo - query.getFrom(); - long idealResolution; - if (query.isShrink()) { - idealResolution = requestedRange / idealResponseIntervals; - } else if (bucketsCount != null && bucketsCount > 0) { - idealResolution = requestedRange / bucketsCount; + // First determine how many time buckets should ideally be returned + long idealBucketCount = idealResponseIntervals; + if (bucketsCount != null && bucketsCount > 0) { + idealBucketCount = bucketsCount; } else if (bucketsResolution != null && bucketsResolution > 0) { - idealResolution = bucketsResolution; - } else { - idealResolution = requestedRange / idealResponseIntervals; + idealBucketCount = requestedRange / bucketsResolution; + if (idealBucketCount <= 0) { + idealBucketCount = 1; // at least one bucket + } + } + // Then find the resolution which would be closest to that number of bucket + return findClosestResolution(requestedRange, idealBucketCount); + } + + private long findClosestResolution(long requestedRange, long idealBucketCount) { + List availableResolutions = getAvailableResolutions(); + if (availableResolutions == null || availableResolutions.isEmpty()) { + throw new IllegalStateException("No available resolutions configured"); + } + long bestResolution = availableResolutions.get(0); + long bestDiff = Long.MAX_VALUE; + long bestBuckets = -1; + + for (Long resolution : availableResolutions) { + if (resolution == null || resolution <= 0) { + continue; + } + + long buckets = requestedRange / resolution; + if (buckets <= 0) { + buckets = 1; // clamp to at least one bucket + } + + long diff = Math.abs(buckets - idealBucketCount); + + // Pick the resolution whose bucket count is closest to idealBucketCount. + // Tie-breaker: prefer fewer buckets (i.e. coarser resolution) to reduce load. + if (diff < bestDiff || (diff == bestDiff && buckets < bestBuckets)) { + bestDiff = diff; + bestBuckets = buckets; + bestResolution = resolution; + } } - return idealResolution; + + return bestResolution; } private boolean collectionTtlCoverInterval(TimeSeriesCollection collection, long from, long to) { diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java index aa6c74b7..3f0d6de4 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java @@ -19,36 +19,42 @@ public void testChosenResolutionBasedOnRange() { timeSeries.getDefaultCollection().save(b1); timeSeries.ingestDataForEmptyCollections(); TimeSeriesAggregationPipeline aggregationPipeline = timeSeries.getAggregationPipeline(); + //Requested range 10_000_000 -> select coarser resolution TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * 1000, now) .build(); TimeSeriesAggregationResponse response = aggregationPipeline.collect(query); - Assert.assertEquals(10000, response.getCollectionResolution()); + Assert.assertEquals(10_000, response.getCollectionResolution()); + //Requested range 5000 -> select finest resolution query = new TimeSeriesAggregationQueryBuilder() .range(now - 5000, now) .build(); response = aggregationPipeline.collect(query); + //Requested range 1000 -> select finest resolution Assert.assertEquals(1000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() .range(now - 1000, now) .build(); response = aggregationPipeline.collect(query); Assert.assertEquals(1000, response.getCollectionResolution()); + //Requested range 500_001 -> select resolution closed to 100 buckets (1000 -> 500 buckets, 5000 -> 100 buckest (winner)) query = new TimeSeriesAggregationQueryBuilder() .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA + 1, now) .build(); response = aggregationPipeline.collect(query); - Assert.assertEquals(1000, response.getCollectionResolution()); + Assert.assertEquals(5000, response.getCollectionResolution()); + //Requested range 499_999 -> select resolution closed to 100 buckets (1000 -> 500 buckets, 5000 -> 100 buckest (winner)) query = new TimeSeriesAggregationQueryBuilder() - .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA, now) + .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA-1, now) .build(); response = aggregationPipeline.collect(query); Assert.assertEquals(5000, response.getCollectionResolution()); + //Requested range 1_000_001 -> select resolution closed to 100 buckets (5000 -> 200 buckets, 10000 -> 100 buckest (winner)) query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * AGGREGATION_RESOLUTION_LAMBDA + 1, now) .build(); response = aggregationPipeline.collect(query); - Assert.assertEquals(5000, response.getCollectionResolution()); + Assert.assertEquals(10_000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * AGGREGATION_RESOLUTION_LAMBDA, now) .build(); From 69457073bd7455f8e2960b39062d512dbc131892 Mon Sep 17 00:00:00 2001 From: David Stephan Date: Wed, 3 Dec 2025 12:04:21 +0100 Subject: [PATCH 3/7] Revert "SED-4415 changing ideal resolution selection" This reverts commit b28d93bda424f5c573ef8f919cab5c44a8a0a80f. --- .../TimeSeriesAggregationPipeline.java | 64 ++++++------------- ...eriesMultipleCollectionsAggregateTest.java | 14 ++-- 2 files changed, 24 insertions(+), 54 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java index 305e76db..c46039c0 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java @@ -75,7 +75,7 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { if (query.getOptimizationType() == TimeSeriesOptimizationType.MOST_ACCURATE) { idealResolution = collections.get(0).getResolution(); // first collection with the best resolution } else { // most efficient - idealResolution = getIdealResolution(query); + idealResolution = this.roundDownToAvailableResolution(getIdealResolution(query)); } TimeSeriesCollection idealAvailableCollection = ttlEnabled ? chooseFirstAvailableCollectionBasedOnTTL(idealResolution, query) : this.collections.get(this.resolutionsIndexes.get(idealResolution)); idealAvailableCollection = chooseLastCollectionWhichHandleAttributes(idealAvailableCollection.getResolution(), usedAttributes); @@ -135,6 +135,16 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) { .build(); } + private long roundDownToAvailableResolution(long targetResolution) { + List availableResolutions = getAvailableResolutions(); + for (int i = 1; i < availableResolutions.size(); i++) { + if (availableResolutions.get(i) > targetResolution) { + return availableResolutions.get(i - 1); + } + } + return availableResolutions.get(availableResolutions.size() - 1); // return last resolution + } + private TimeSeriesProcessedParams processQueryParams(TimeSeriesAggregationQuery query, long sourceResolution) { if (query.getFrom() == null) { throw new IllegalArgumentException("From parameters must be specified"); @@ -256,51 +266,17 @@ private long getIdealResolution(TimeSeriesAggregationQuery query) { Long bucketsResolution = query.getBucketsResolution(); long queryTo = query.getTo() != null ? query.getTo() : System.currentTimeMillis(); long requestedRange = queryTo - query.getFrom(); - // First determine how many time buckets should ideally be returned - long idealBucketCount = idealResponseIntervals; - if (bucketsCount != null && bucketsCount > 0) { - idealBucketCount = bucketsCount; + long idealResolution; + if (query.isShrink()) { + idealResolution = requestedRange / idealResponseIntervals; + } else if (bucketsCount != null && bucketsCount > 0) { + idealResolution = requestedRange / bucketsCount; } else if (bucketsResolution != null && bucketsResolution > 0) { - idealBucketCount = requestedRange / bucketsResolution; - if (idealBucketCount <= 0) { - idealBucketCount = 1; // at least one bucket - } - } - // Then find the resolution which would be closest to that number of bucket - return findClosestResolution(requestedRange, idealBucketCount); - } - - private long findClosestResolution(long requestedRange, long idealBucketCount) { - List availableResolutions = getAvailableResolutions(); - if (availableResolutions == null || availableResolutions.isEmpty()) { - throw new IllegalStateException("No available resolutions configured"); - } - long bestResolution = availableResolutions.get(0); - long bestDiff = Long.MAX_VALUE; - long bestBuckets = -1; - - for (Long resolution : availableResolutions) { - if (resolution == null || resolution <= 0) { - continue; - } - - long buckets = requestedRange / resolution; - if (buckets <= 0) { - buckets = 1; // clamp to at least one bucket - } - - long diff = Math.abs(buckets - idealBucketCount); - - // Pick the resolution whose bucket count is closest to idealBucketCount. - // Tie-breaker: prefer fewer buckets (i.e. coarser resolution) to reduce load. - if (diff < bestDiff || (diff == bestDiff && buckets < bestBuckets)) { - bestDiff = diff; - bestBuckets = buckets; - bestResolution = resolution; - } + idealResolution = bucketsResolution; + } else { + idealResolution = requestedRange / idealResponseIntervals; } - - return bestResolution; + return idealResolution; } private boolean collectionTtlCoverInterval(TimeSeriesCollection collection, long from, long to) { diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java index 3f0d6de4..aa6c74b7 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesMultipleCollectionsAggregateTest.java @@ -19,42 +19,36 @@ public void testChosenResolutionBasedOnRange() { timeSeries.getDefaultCollection().save(b1); timeSeries.ingestDataForEmptyCollections(); TimeSeriesAggregationPipeline aggregationPipeline = timeSeries.getAggregationPipeline(); - //Requested range 10_000_000 -> select coarser resolution TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * 1000, now) .build(); TimeSeriesAggregationResponse response = aggregationPipeline.collect(query); - Assert.assertEquals(10_000, response.getCollectionResolution()); - //Requested range 5000 -> select finest resolution + Assert.assertEquals(10000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() .range(now - 5000, now) .build(); response = aggregationPipeline.collect(query); - //Requested range 1000 -> select finest resolution Assert.assertEquals(1000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() .range(now - 1000, now) .build(); response = aggregationPipeline.collect(query); Assert.assertEquals(1000, response.getCollectionResolution()); - //Requested range 500_001 -> select resolution closed to 100 buckets (1000 -> 500 buckets, 5000 -> 100 buckest (winner)) query = new TimeSeriesAggregationQueryBuilder() .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA + 1, now) .build(); response = aggregationPipeline.collect(query); - Assert.assertEquals(5000, response.getCollectionResolution()); - //Requested range 499_999 -> select resolution closed to 100 buckets (1000 -> 500 buckets, 5000 -> 100 buckest (winner)) + Assert.assertEquals(1000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() - .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA-1, now) + .range(now - 5000 * AGGREGATION_RESOLUTION_LAMBDA, now) .build(); response = aggregationPipeline.collect(query); Assert.assertEquals(5000, response.getCollectionResolution()); - //Requested range 1_000_001 -> select resolution closed to 100 buckets (5000 -> 200 buckets, 10000 -> 100 buckest (winner)) query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * AGGREGATION_RESOLUTION_LAMBDA + 1, now) .build(); response = aggregationPipeline.collect(query); - Assert.assertEquals(10_000, response.getCollectionResolution()); + Assert.assertEquals(5000, response.getCollectionResolution()); query = new TimeSeriesAggregationQueryBuilder() .range(now - 10_000 * AGGREGATION_RESOLUTION_LAMBDA, now) .build(); From d2fe3858119a6633fad4f5a1130baf5fc781d92e Mon Sep 17 00:00:00 2001 From: David Stephan Date: Fri, 5 Dec 2025 16:17:00 +0100 Subject: [PATCH 4/7] SED-4415 adding generic support for TS TTL settings --- .../main/java/step/core/timeseries/TimeSeries.java | 14 ++++++++++++++ .../step/core/timeseries/TimeSeriesBuilder.java | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java index d3785ce8..831cdbce 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java @@ -47,6 +47,20 @@ public TimeSeries setTtlEnabled(boolean ttlEnabled) { return this; } + public void updateAllCollectionsTtl(boolean ttlEnabled, Map resolutionsToTtl) { + setTtlEnabled(ttlEnabled); + resolutionsToTtl.forEach((resolution, ttl) -> { + if (hasCollection(resolution)) { + getCollection(resolution).setTtl(ttl); + } + }); + } + public void updateCollectionTtl(long resolution, long ttl) { + if (hasCollection(resolution)) { + getCollection(resolution).setTtl(ttl); + } + } + /** * This method will ingest the data for all the resolutions which are empty. * Each collection is ingesting the data from the previous collection only. diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java index f1f322c3..4330097a 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java @@ -75,8 +75,8 @@ public TimeSeriesSettings getSettings() { } public TimeSeries build() { - if (handledCollections.isEmpty()) { - throw new IllegalArgumentException("At least one time series collection must be registered"); + if (handledCollections.isEmpty() || (handledCollections.get(0).getIgnoredAttributes() != null && !handledCollections.get(0).getIgnoredAttributes().isEmpty())) { + throw new IllegalArgumentException("At least one time series collection must be registered with a resolution interval lower than 1 hour."); } handledCollections.sort(Comparator.comparingLong(TimeSeriesCollection::getResolution)); validateResolutions(); From 898f5ee105f46f9a7f12c2d295df8ef0dd0846db Mon Sep 17 00:00:00 2001 From: David Stephan Date: Fri, 5 Dec 2025 16:33:32 +0100 Subject: [PATCH 5/7] SED-4415 adding generic support for TS TTL settings --- .../src/main/java/step/core/timeseries/TimeSeriesBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java index 4330097a..498d45f6 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java @@ -75,8 +75,8 @@ public TimeSeriesSettings getSettings() { } public TimeSeries build() { - if (handledCollections.isEmpty() || (handledCollections.get(0).getIgnoredAttributes() != null && !handledCollections.get(0).getIgnoredAttributes().isEmpty())) { - throw new IllegalArgumentException("At least one time series collection must be registered with a resolution interval lower than 1 hour."); + if (handledCollections.isEmpty()) { + throw new IllegalArgumentException("At least one time series collection must be registered."); } handledCollections.sort(Comparator.comparingLong(TimeSeriesCollection::getResolution)); validateResolutions(); From 78cdd04d68fdffab2a77138c3099deb6973f278a Mon Sep 17 00:00:00 2001 From: David Stephan Date: Tue, 9 Dec 2025 10:55:37 +0100 Subject: [PATCH 6/7] SED-4415 PR feedbacks --- .../src/main/java/step/core/timeseries/TimeSeries.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java index 831cdbce..a05a09a9 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java @@ -68,10 +68,11 @@ public void updateCollectionTtl(long resolution, long ttl) { * If this fails by any reason, the entire collection is dropped. */ public void ingestDataForEmptyCollections() { + long controllerStart = System.currentTimeMillis(); logger.info("Configured collections: {}", handledCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList())); //This is run in as an async task; since data can be flushed in between, we need to get the list of empty collections before starting processing any of them List emptyCollections = handledCollections.stream().filter(TimeSeriesCollection::isEmpty).collect(Collectors.toList()); - logger.info("Empty collections detected: {}", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList())); + logger.warn("Empty collections detected: {}, do not stop the controller until it's re-ingestion is completed.", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList())); for (int i = 1; i < handledCollections.size(); i++) { TimeSeriesCollection collection = handledCollections.get(i); if (emptyCollections.contains(collection)) { @@ -85,7 +86,10 @@ public void ingestDataForEmptyCollections() { .setFlushAsyncQueueSize(5000); try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) { SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1); - Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty(); + Filter afterStart = Filters.lt("begin", controllerStart); + Filter filter = collection.getTtl() > 0 ? + Filters.and(List.of(afterStart, Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()))) + : afterStart; try (Stream bucketStream = previousCollection.findLazy(filter, searchOrder)) { bucketStream.forEach(ingestionPipeline::ingestBucket); ingestionPipeline.flush(); From 3904198eaad441a2fafc289451941af4b1cf573d Mon Sep 17 00:00:00 2001 From: David Stephan Date: Wed, 10 Dec 2025 15:56:45 +0100 Subject: [PATCH 7/7] SED-4415 PR feedbacks --- .../src/main/java/step/core/timeseries/TimeSeries.java | 6 +----- .../java/step/core/timeseries/TimeSeriesCollection.java | 4 ---- .../core/timeseries/TimeSeriesAggergationQueryTest.java | 6 +++++- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java index a05a09a9..fd2692ce 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java @@ -49,11 +49,7 @@ public TimeSeries setTtlEnabled(boolean ttlEnabled) { public void updateAllCollectionsTtl(boolean ttlEnabled, Map resolutionsToTtl) { setTtlEnabled(ttlEnabled); - resolutionsToTtl.forEach((resolution, ttl) -> { - if (hasCollection(resolution)) { - getCollection(resolution).setTtl(ttl); - } - }); + resolutionsToTtl.forEach(this::updateCollectionTtl); } public void updateCollectionTtl(long resolution, long ttl) { if (hasCollection(resolution)) { diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java index 09877816..df439abd 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java @@ -192,8 +192,4 @@ public void save(Iterable entities) { protected void drop() { mainCollection.drop(); } - - protected Collection getMainCollection() { - return mainCollection; - } } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java index 85405104..6194699a 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java @@ -192,6 +192,10 @@ public void responseRangeWithCustomWindowTest() { Assert.assertEquals(5000, response.getResolution()); } + /** + * This test aims to validate that a collection created initially with a resolution (i.e. 1 seconds) + * and with data already ingested, can then be reconfigured and resued with a higher resolution (i.e. 5 seconds). + */ @Test public void aggregationTestWithResolutionChanges() { int resolution = 1000; @@ -215,7 +219,7 @@ public void aggregationTestWithResolutionChanges() { //Change timeSerie resolution on the fly to make sure this doesn't induce aggregation errors int newResolution = 5000; - TimeSeriesCollection collection = new TimeSeriesCollection(timeSeries.getDefaultCollection().getMainCollection(), newResolution); + TimeSeriesCollection collection = new TimeSeriesCollection(timeSeries.getDefaultCollection().getUnderlyingCollection(), newResolution); TimeSeries timeSeriesHigherResolution = new TimeSeriesBuilder().registerCollection(collection).build(); aggregationTest(timeSeriesHigherResolution, newResolution, iterations, start, end); }