diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java
index d3785ce8..fd2692ce 100644
--- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java
+++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java
@@ -47,6 +47,16 @@ public TimeSeries setTtlEnabled(boolean ttlEnabled) {
         return this;
     }
 
+    public void updateAllCollectionsTtl(boolean ttlEnabled, Map<Long, Long> resolutionsToTtl) {
+        setTtlEnabled(ttlEnabled);
+        resolutionsToTtl.forEach(this::updateCollectionTtl);
+    }
+    public void updateCollectionTtl(long resolution, long ttl) {
+        if (hasCollection(resolution)) {
+            getCollection(resolution).setTtl(ttl);
+        }
+    }
+
     /**
      * This method will ingest the data for all the resolutions which are empty.
      * Each collection is ingesting the data from the previous collection only.
@@ -54,10 +64,11 @@ public TimeSeries setTtlEnabled(boolean ttlEnabled) {
      * If this fails for any reason, the entire collection is dropped.
      */
     public void ingestDataForEmptyCollections() {
+        long controllerStart = System.currentTimeMillis();
         logger.info("Configured collections: {}", handledCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
         //This is run as an async task; since data can be flushed in between, we need to get the list of empty collections before starting to process any of them
         List<TimeSeriesCollection> emptyCollections = handledCollections.stream().filter(TimeSeriesCollection::isEmpty).collect(Collectors.toList());
-        logger.info("Empty collections detected: {}", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
+        logger.warn("Empty collections detected: {}; do not stop the controller until their re-ingestion is completed.", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
         for (int i = 1; i < handledCollections.size(); i++) {
             TimeSeriesCollection collection = handledCollections.get(i);
             if (emptyCollections.contains(collection)) {
@@ -71,7 +82,10 @@ public void ingestDataForEmptyCollections() {
                         .setFlushAsyncQueueSize(5000);
                 try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) {
                     SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1);
-                    Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty();
+                    Filter beforeControllerStart = Filters.lt("begin", controllerStart);
+                    Filter filter = collection.getTtl() > 0 ?
+                            Filters.and(List.of(beforeControllerStart, Filters.gte("begin", System.currentTimeMillis() - collection.getTtl())))
+                            : beforeControllerStart;
                     try (Stream<Bucket> bucketStream = previousCollection.findLazy(filter, searchOrder)) {
                         bucketStream.forEach(ingestionPipeline::ingestBucket);
                         ingestionPipeline.flush();
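A minimal usage sketch of the new TTL entry points may help reviewers; the TimeSeries instance and the concrete resolutions and retention values below are illustrative assumptions, only updateAllCollectionsTtl and updateCollectionTtl come from this change:

    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    void configureRetention(TimeSeries timeSeries) {
        // Enable TTL and update the retention of every registered collection in one
        // call; map keys are collection resolutions, values are TTLs, both in milliseconds.
        timeSeries.updateAllCollectionsTtl(true, Map.of(
                1_000L, TimeUnit.DAYS.toMillis(7),     // keep 1 s buckets for 7 days
                60_000L, TimeUnit.DAYS.toMillis(90))); // keep 1 min buckets for 90 days
        // Resolutions without a matching collection are skipped silently,
        // since updateCollectionTtl guards with hasCollection(resolution).
    }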
diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java
index f1f322c3..498d45f6 100644
--- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java
+++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesBuilder.java
@@ -76,7 +76,7 @@ public TimeSeriesSettings getSettings() {
 
     public TimeSeries build() {
         if (handledCollections.isEmpty()) {
-            throw new IllegalArgumentException("At least one time series collection must be registered");
+            throw new IllegalArgumentException("At least one time series collection must be registered.");
         }
         handledCollections.sort(Comparator.comparingLong(TimeSeriesCollection::getResolution));
         validateResolutions();
diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java
index fc5fd861..c46039c0 100644
--- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java
+++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationPipeline.java
@@ -64,6 +64,9 @@ private Set<String> collectAllUsedAttributes(TimeSeriesAggregationQuery query) {
      * 3. Go backward from the resolution obtained above and choose the first collection which handles all the attributes
      */
     public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Aggregation query: {}", query);
+        }
         validateQuery(query);
         Set<String> usedAttributes = collectAllUsedAttributes(query).stream().map(a -> a.replace("attributes.", "")).collect(Collectors.toSet());
         long queryFrom = query.getFrom() != null ? query.getFrom() : 0;
@@ -83,6 +86,10 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
         long sourceResolution = idealAvailableCollection.getResolution();
         TimeSeriesProcessedParams finalParams = processQueryParams(query, sourceResolution);
 
+        if (logger.isDebugEnabled()) {
+            logger.debug("Ideal resolution calculated: {}, source resolution covering the data: {}", idealResolution, sourceResolution);
+        }
+
         Map<BucketAttributes, Map<Long, BucketBuilder>> seriesBuilder = new HashMap<>();
         LongAdder bucketCount = new LongAdder();
 
@@ -113,6 +120,9 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
 
         Map<BucketAttributes, Map<Long, Bucket>> result = seriesBuilder.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                 e -> e.getValue().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, i -> i.getValue().build()))));
+        if (logger.isDebugEnabled()) {
+            logger.debug("Processed query results in {} ms. Results size: {}", System.currentTimeMillis() - t2, result.size());
+        }
         return new TimeSeriesAggregationResponseBuilder()
                 .setSeries(result)
                 .setStart(finalParams.getFrom())
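For context on the two resolutions traced by the debug lines above, a simplified sketch of the selection strategy described in the comment on collect; getHandledAttributes is a hypothetical accessor standing in for the pipeline's actual attribute check, and the list is assumed sorted by ascending resolution, as TimeSeriesBuilder.build() guarantees:

    // Sketch only: pick the coarsest collection whose resolution does not exceed
    // the ideal one, then walk back to finer resolutions until one handles all
    // attributes used by the query.
    TimeSeriesCollection pickSourceCollection(List<TimeSeriesCollection> byResolutionAsc,
                                              long idealResolution,
                                              Set<String> usedAttributes) {
        int candidate = 0;
        for (int i = 0; i < byResolutionAsc.size(); i++) {
            if (byResolutionAsc.get(i).getResolution() <= idealResolution) {
                candidate = i;
            }
        }
        for (int i = candidate; i >= 0; i--) {
            TimeSeriesCollection collection = byResolutionAsc.get(i);
            if (collection.getHandledAttributes().containsAll(usedAttributes)) { // hypothetical accessor
                return collection;
            }
        }
        return byResolutionAsc.get(0); // fall back to the finest resolution
    }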
Results size: " + result.size()); + } return new TimeSeriesAggregationResponseBuilder() .setSeries(result) .setStart(finalParams.getFrom()) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java index aca023bf..4d915257 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/aggregation/TimeSeriesAggregationQuery.java @@ -94,4 +94,17 @@ public TimeSeriesAggregationQuery setBucketsResolution(@Nullable Long bucketsRes public TimeSeriesOptimizationType getOptimizationType() { return optimizationType; } + + @Override + public String toString() { + return "TimeSeriesAggregationQuery{" + + "optimizationType=" + optimizationType + + ", groupDimensions=" + groupDimensions + + ", bucketsCount=" + bucketsCount + + ", bucketsResolution=" + bucketsResolution + + ", shrink=" + shrink + + ", collectAttributeKeys=" + collectAttributeKeys + + ", collectAttributesValuesLimit=" + collectAttributesValuesLimit + + '}'; + } } diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java b/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java index fdba0781..ad74ca4d 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/bucket/Bucket.java @@ -115,4 +115,19 @@ public long getPercentile(double percentile) { } return percentileValue; } + + @Override + public String toString() { + return "Bucket{" + + "begin=" + begin + + ", end=" + end + + ", attributes=" + attributes + + ", count=" + count + + ", sum=" + sum + + ", min=" + min + + ", max=" + max + + ", pclPrecision=" + pclPrecision + + ", distribution=" + distribution + + '}'; + } } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java index 3bb704a4..6194699a 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesAggergationQueryTest.java @@ -2,20 +2,31 @@ import org.junit.Assert; import org.junit.Test; +import step.core.collections.Filters; import step.core.ql.OQLFilterBuilder; import step.core.timeseries.aggregation.TimeSeriesAggregationPipeline; import step.core.timeseries.aggregation.TimeSeriesAggregationQuery; import step.core.timeseries.aggregation.TimeSeriesAggregationQueryBuilder; import step.core.timeseries.aggregation.TimeSeriesAggregationResponse; import step.core.timeseries.bucket.Bucket; +import step.core.timeseries.bucket.BucketAttributes; import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class TimeSeriesAggergationQueryTest extends TimeSeriesBaseTest { + public static final long DURATION_T1_PASSED = 10L; + public static final long DURATION_T1_FAILED = 20L; + public static final long DURATION_T2_PASSED = 30L; + public static final long 
     @Test(expected = IllegalArgumentException.class)
     public void lowInvalidResolutionTest() {
         TimeSeries timeSeries = getNewTimeSeries(10);
@@ -47,10 +58,10 @@ public void shrinkTest() {
         TimeSeries timeSeries = getNewTimeSeries(10);
 
         try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) {
-            ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, 10L);
-            ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, 10L);
-            ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, 10L);
-            ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, 10L);
+            ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, DURATION_T1_PASSED);
+            ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, DURATION_T1_PASSED);
+            ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, DURATION_T1_PASSED);
+            ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, DURATION_T1_PASSED);
         }
 
         TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder()
@@ -59,9 +70,9 @@ public void shrinkTest() {
         TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline();
         TimeSeriesAggregationResponse response = pipeline.collect(query);
         Assert.assertEquals(0, response.getStart());
-        Assert.assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
+        assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
         response.getSeries().values().forEach(map -> {
-            Assert.assertTrue(map.values().size() <= 1);
+            assertTrue(map.values().size() <= 1);
         });
     }
 
@@ -181,4 +192,127 @@ public void responseRangeWithCustomWindowTest() {
         Assert.assertEquals(5000, response.getResolution());
     }
 
+    /**
+     * This test aims to validate that a collection created initially with one resolution (e.g. 1 second)
+     * and with data already ingested can then be reconfigured and reused with a higher resolution (e.g. 5 seconds).
+     */
+    @Test
+    public void aggregationTestWithResolutionChanges() {
+        int resolution = 1000;
+        int iterations = 1000;
+        TimeSeries timeSeries = getNewTimeSeries(resolution);
+        Random random = new Random();
+        long end = System.currentTimeMillis();
+        long start = end - (iterations * resolution);
+        //Ingest some data
+        try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) {
+            for (int i = 0; i < iterations; i++) {
+                long relativeStart = start + (i * resolution);
+                int startOffset = resolution / 4;
+                ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), relativeStart + random.nextInt(startOffset), DURATION_T1_PASSED);
+                ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), relativeStart + startOffset + random.nextInt(startOffset), DURATION_T1_FAILED);
+                ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), relativeStart + 2 * startOffset + random.nextInt(startOffset), DURATION_T2_PASSED);
+                ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), relativeStart + 3 * startOffset + random.nextInt(startOffset), DURATION_T2_FAILED);
+            }
+        }
+        aggregationTest(timeSeries, resolution, iterations, start, end);
+
+        //Change the time series resolution on the fly to make sure this doesn't induce aggregation errors
+        int newResolution = 5000;
+        TimeSeriesCollection collection = new TimeSeriesCollection(timeSeries.getDefaultCollection().getUnderlyingCollection(), newResolution);
+        TimeSeries timeSeriesHigherResolution = new TimeSeriesBuilder().registerCollection(collection).build();
+        aggregationTest(timeSeriesHigherResolution, newResolution, iterations, start, end);
+    }
+
+    public void aggregationTest(TimeSeries timeSeries, int resolution, int iterations, long start, long end) {
+
+        //Test aggregation with a single time bucket
+        TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder()
+                .split(1)
+                .build();
+        TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline();
+        TimeSeriesAggregationResponse response = pipeline.collect(query);
+        Assert.assertEquals(0, response.getStart());
+        //The query has no time range, so the effective resolution (the range used by the aggregation) spans from 0 to now
+        assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
+        assertEquals(resolution, response.getCollectionResolution());
+        Map<BucketAttributes, Map<Long, Bucket>> series = response.getSeries();
+        assertEquals(1, series.size());
+        assertEquals(0, series.keySet().stream().findFirst().orElseThrow().size());
+        Map<Long, Bucket> longBucketMap = series.values().stream().findFirst().orElseThrow();
+        assertEquals(1, longBucketMap.size());
+        assertEquals(Long.valueOf(0), longBucketMap.keySet().stream().findFirst().orElseThrow());
+        Bucket bucket = longBucketMap.values().stream().findFirst().orElseThrow();
+        assertEquals(iterations * 4, bucket.getCount());
+        assertEquals((iterations * (DURATION_T1_FAILED + DURATION_T1_PASSED + DURATION_T2_FAILED + DURATION_T2_PASSED)), bucket.getSum());
+        assertEquals(DURATION_T1_PASSED, bucket.getMin());
+        assertEquals(DURATION_T2_FAILED, bucket.getMax());
+
+        //Test aggregation with filter, grouping and 100 time buckets (since the response resolution is 10 seconds, the same response is expected for the 1 s and 5 s source resolutions)
+        testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 100);
+
+        //Test aggregation with filter, grouping and 1000 time buckets -> the response resolution is 1 second for the 1 s source, but 5 seconds for the 5 s source resolution
+        testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 1000);
+    }
+
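The expected-resolution clamp asserted by testWithManyTimeBuckets below, worked through for this test's data (1000 points at 1 s resolution, i.e. a 1,000,000 ms range); plain arithmetic mirroring the test, no new API:

    long range = 1_000L * 1_000;                    // iterations * resolution = 1,000,000 ms
    long ideal100 = range / 100;                    // 10,000 ms: identical response for 1 s and 5 s sources
    long ideal1000 = range / 1000;                  // 1,000 ms before clamping
    long clampedFor1s = Math.max(ideal1000, 1_000); // stays 1,000 ms for the 1 s source
    long clampedFor5s = Math.max(ideal1000, 5_000); // raised to 5,000 ms for the 5 s source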
+    private void testWithManyTimeBuckets(int sourceResolution, int iterations, long start, long end, TimeSeriesAggregationPipeline pipeline, int targetBucketCount) {
+        TimeSeriesAggregationQuery query;
+        TimeSeriesAggregationResponse response;
+        Map<BucketAttributes, Map<Long, Bucket>> series;
+        query = new TimeSeriesAggregationQueryBuilder()
+                .split(targetBucketCount)
+                .withFilter(Filters.equals("attributes.name", "t1"))
+                .withGroupDimensions(Set.of("status"))
+                .range(start, end)
+                .build();
+        long expectedResponseResolution = (end - start) / targetBucketCount;
+        expectedResponseResolution = Math.max(expectedResponseResolution, sourceResolution);
+        long expectedBucketsCount = (end - start) / expectedResponseResolution;
+        response = pipeline.collect(query);
+        Assert.assertEquals(expectedResponseResolution, response.getResolution());
+        assertEquals(sourceResolution, response.getCollectionResolution());
+        series = response.getSeries();
+        assertEquals(2, series.size());
+        Map<Long, Bucket> failedSeries = series.get(Map.of("status", "FAILED"));
+        Map<Long, Bucket> passedSeries = series.get(Map.of("status", "PASSED"));
+        validateSeries(failedSeries, DURATION_T1_FAILED, iterations, expectedBucketsCount);
+        validateSeries(passedSeries, DURATION_T1_PASSED, iterations, expectedBucketsCount);
+    }
+
+    private void validateSeries(Map<Long, Bucket> series, long expectedDuration, int iterations, long targetBucketCount) {
+        assertNotNull(series);
+        long expectedCountPerBucket = iterations / targetBucketCount;
+        long expectedCountPerBucketMin = expectedCountPerBucket - 1;
+        long expectedCountPerBucketMax = expectedCountPerBucket + 1;
+        //Depending on the start and end time we might get back more buckets than requested to cover the range
+        assertTrue("Series should have at most targetBucketCount + 1 = " + (targetBucketCount + 1) + " time buckets, but size was: " + series.size() + ". Series content: " + series, series.size() <= (targetBucketCount + 1));
+        //When the response resolution equals the source resolution (i.e. 1 second, with counts between 0 and 2) we might get significantly fewer time buckets, because the ones with a count of 0 are not included
+        if (expectedCountPerBucket == 1) {
+            assertTrue("Series does not have a sufficient number of buckets, target " + targetBucketCount + ", but size was: " + series.size() + ". Series content: " + series, series.size() >= targetBucketCount * 0.7);
+        } else {
+            assertTrue("Series should have at least the target bucket count, target " + targetBucketCount + ", but size was: " + series.size() + ". Series content: " + series, series.size() >= targetBucketCount);
+        }
+        AtomicInteger bucketIdx = new AtomicInteger(0);
+        AtomicLong totalSum = new AtomicLong(0);
+        AtomicLong totalCount = new AtomicLong(0);
+        series.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(e -> {
+            Bucket bucket = e.getValue();
+            int currentIdx = bucketIdx.getAndIncrement();
+            //Edge buckets can have lower counts
+            if (currentIdx > 0 && currentIdx < targetBucketCount) {
Current idx: " + currentIdx + + "next bucket: " + series, expectedCountPerBucketMin <= bucket.getCount() && bucket.getCount() <= expectedCountPerBucketMax); + assertTrue((expectedCountPerBucketMin) * expectedDuration <= bucket.getSum() && bucket.getSum() <= (expectedCountPerBucketMax) * expectedDuration); + } + assertEquals(expectedDuration, bucket.getMin()); + assertEquals(expectedDuration, bucket.getMax()); + totalSum.addAndGet(bucket.getSum()); + totalCount.addAndGet(bucket.getCount()); + }); + assertEquals(expectedDuration*iterations, totalSum.get()); + assertEquals(iterations, totalCount.get()); + System.out.println("totalSum: " + totalSum.get() + ", totalCount: " + totalCount.get() + ", #buckets: " + series.size()); + } + }