@@ -47,17 +47,28 @@ public TimeSeries setTtlEnabled(boolean ttlEnabled) {
return this;
}

public void updateAllCollectionsTtl(boolean ttlEnabled, Map<Long, Long> resolutionsToTtl) {
setTtlEnabled(ttlEnabled);
resolutionsToTtl.forEach(this::updateCollectionTtl);
}
public void updateCollectionTtl(long resolution, long ttl) {
if (hasCollection(resolution)) {
getCollection(resolution).setTtl(ttl);
}
}
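A minimal usage sketch for the new TTL API above (hypothetical values; it assumes collections with 1-second and 1-minute resolutions are registered and that TTLs, like the bucket begin timestamps they are compared against, are expressed in milliseconds):

// Hypothetical example: enable TTL and assign a retention period per registered resolution
timeSeries.updateAllCollectionsTtl(true, Map.of(
        1_000L, 3_600_000L,     // 1 s collection -> keep 1 hour of data
        60_000L, 86_400_000L    // 1 min collection -> keep 24 hours of data
));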

/**
* This method ingests data for every resolution whose collection is empty.
* Each collection ingests its data from the previous collection only.
* <p>
* If this fails for any reason, the entire collection is dropped.
*/
public void ingestDataForEmptyCollections() {
long controllerStart = System.currentTimeMillis();
logger.info("Configured collections: {}", handledCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
//This runs as an async task; since data can be flushed in the meantime, we need to get the list of empty collections before starting to process any of them
List<TimeSeriesCollection> emptyCollections = handledCollections.stream().filter(TimeSeriesCollection::isEmpty).collect(Collectors.toList());
logger.info("Empty collections detected: {}", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
logger.warn("Empty collections detected: {}, do not stop the controller until it's re-ingestion is completed.", emptyCollections.stream().map(TimeSeriesCollection::getName).collect(Collectors.toList()));
for (int i = 1; i < handledCollections.size(); i++) {
TimeSeriesCollection collection = handledCollections.get(i);
if (emptyCollections.contains(collection)) {
@@ -71,7 +82,10 @@ public void ingestDataForEmptyCollections() {
.setFlushAsyncQueueSize(5000);
try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) {
SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1);
Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty();
Filter afterStart = Filters.lt("begin", controllerStart);
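//restrict re-ingestion to buckets created before this task started; buckets flushed while the task runs are presumably covered by the live ingestion pipeline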
Filter filter = collection.getTtl() > 0 ?
Filters.and(List.of(afterStart, Filters.gte("begin", System.currentTimeMillis() - collection.getTtl())))
: afterStart;
try (Stream<Bucket> bucketStream = previousCollection.findLazy(filter, searchOrder)) {
bucketStream.forEach(ingestionPipeline::ingestBucket);
ingestionPipeline.flush();
@@ -76,7 +76,7 @@ public TimeSeriesSettings getSettings() {

public TimeSeries build() {
if (handledCollections.isEmpty()) {
throw new IllegalArgumentException("At least one time series collection must be registered");
throw new IllegalArgumentException("At least one time series collection must be registered.");
}
handledCollections.sort(Comparator.comparingLong(TimeSeriesCollection::getResolution));
validateResolutions();
@@ -64,6 +64,9 @@ private Set<String> collectAllUsedAttributes(TimeSeriesAggregationQuery query) {
* 3. Go backward from the resolution obtained above and choose the first collection that handles all the attributes
*/
public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
if (logger.isDebugEnabled()){
logger.debug("Aggregation query: {}", query);
}
validateQuery(query);
Set<String> usedAttributes = collectAllUsedAttributes(query).stream().map(a -> a.replace("attributes.", "")).collect(Collectors.toSet());
long queryFrom = query.getFrom() != null ? query.getFrom() : 0;
@@ -83,6 +86,10 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
long sourceResolution = idealAvailableCollection.getResolution();
TimeSeriesProcessedParams finalParams = processQueryParams(query, sourceResolution);
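//note: sourceResolution is the resolution of the collection selected to serve the query (step 3 in the javadoc above); it may be coarser than the ideal resolution derived from the query range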

if (logger.isDebugEnabled()) {
logger.debug("Ideal resolution calculated: {}, source resolution covering the data: {}", idealResolution, sourceResolution);
}

Map<BucketAttributes, Map<Long, BucketBuilder>> seriesBuilder = new HashMap<>();

LongAdder bucketCount = new LongAdder();
@@ -113,6 +120,9 @@ public TimeSeriesAggregationResponse collect(TimeSeriesAggregationQuery query) {
Map<BucketAttributes, Map<Long, Bucket>> result = seriesBuilder.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, i -> i.getValue().build()))));

if (logger.isDebugEnabled()) {
logger.debug("Processed query results in " + (System.currentTimeMillis() - t2) + "ms. Results size: " + result.size());
}
return new TimeSeriesAggregationResponseBuilder()
.setSeries(result)
.setStart(finalParams.getFrom())
@@ -94,4 +94,17 @@ public TimeSeriesAggregationQuery setBucketsResolution(@Nullable Long bucketsRes
public TimeSeriesOptimizationType getOptimizationType() {
return optimizationType;
}

@Override
public String toString() {
return "TimeSeriesAggregationQuery{" +
"optimizationType=" + optimizationType +
", groupDimensions=" + groupDimensions +
", bucketsCount=" + bucketsCount +
", bucketsResolution=" + bucketsResolution +
", shrink=" + shrink +
", collectAttributeKeys=" + collectAttributeKeys +
", collectAttributesValuesLimit=" + collectAttributesValuesLimit +
'}';
}
}
@@ -115,4 +115,19 @@ public long getPercentile(double percentile) {
}
return percentileValue;
}

@Override
public String toString() {
return "Bucket{" +
"begin=" + begin +
", end=" + end +
", attributes=" + attributes +
", count=" + count +
", sum=" + sum +
", min=" + min +
", max=" + max +
", pclPrecision=" + pclPrecision +
", distribution=" + distribution +
'}';
}
}
@@ -2,20 +2,31 @@

import org.junit.Assert;
import org.junit.Test;
import step.core.collections.Filters;
import step.core.ql.OQLFilterBuilder;
import step.core.timeseries.aggregation.TimeSeriesAggregationPipeline;
import step.core.timeseries.aggregation.TimeSeriesAggregationQuery;
import step.core.timeseries.aggregation.TimeSeriesAggregationQueryBuilder;
import step.core.timeseries.aggregation.TimeSeriesAggregationResponse;
import step.core.timeseries.bucket.Bucket;
import step.core.timeseries.bucket.BucketAttributes;
import step.core.timeseries.ingestion.TimeSeriesIngestionPipeline;

import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

public class TimeSeriesAggergationQueryTest extends TimeSeriesBaseTest {

public static final long DURATION_T1_PASSED = 10L;
public static final long DURATION_T1_FAILED = 20L;
public static final long DURATION_T2_PASSED = 30L;
public static final long DURATION_T2_FAILED = 40L;

@Test(expected = IllegalArgumentException.class)
public void lowInvalidResolutionTest() {
TimeSeries timeSeries = getNewTimeSeries(10);
@@ -47,10 +58,10 @@ public void shrinkTest() {
TimeSeries timeSeries = getNewTimeSeries(10);

try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) {
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, 10L);
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, 10L);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, 10L);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, 10L);
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), 1L, DURATION_T1_PASSED);
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), 2L, DURATION_T1_PASSED);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), 1L, DURATION_T1_PASSED);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), 2L, DURATION_T1_PASSED);
}

TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder()
@@ -59,9 +70,9 @@ public void shrinkTest() {
TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline();
TimeSeriesAggregationResponse response = pipeline.collect(query);
Assert.assertEquals(0, response.getStart());
Assert.assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
response.getSeries().values().forEach(map -> {
Assert.assertTrue(map.values().size() <= 1);
assertTrue(map.values().size() <= 1);
});

}
@@ -181,4 +192,127 @@ public void responseRangeWithCustomWindowTest() {
Assert.assertEquals(5000, response.getResolution());
}

/**
* This test validates that a collection created initially with a given resolution (e.g. 1 second)
* and with data already ingested can then be reconfigured and reused with a higher resolution (e.g. 5 seconds).
*/
@Test
public void aggregationTestWithResolutionChanges() {
Reviewer comment (Contributor): The tests look good, but I haven't reviewed them in detail; a short comment describing their purpose would be useful.

int resolution = 1000;
int iterations = 1000;
TimeSeries timeSeries = getNewTimeSeries(1000);
Random random = new Random();
long end = System.currentTimeMillis();
long start = end - (iterations * resolution);
//Ingest some data
try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) {
for (int i = 0; i < iterations; i++ ) {
long relativeStart = start + (i * resolution);
int startOffset = resolution / 4;
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "PASSED"), relativeStart + random.nextInt(startOffset), DURATION_T1_PASSED);
ingestionPipeline.ingestPoint(Map.of("name", "t1", "status", "FAILED"), relativeStart + startOffset + random.nextInt(startOffset), DURATION_T1_FAILED);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "PASSED"), relativeStart + 2*startOffset + random.nextInt(startOffset), DURATION_T2_PASSED);
ingestionPipeline.ingestPoint(Map.of("name", "t2", "status", "FAILED"), relativeStart + 3*startOffset + random.nextInt(startOffset), DURATION_T2_FAILED);
}
}
aggregationTest(timeSeries, resolution, iterations, start, end);

//Change the time series resolution on the fly to make sure this doesn't induce aggregation errors
int newResolution = 5000;
TimeSeriesCollection collection = new TimeSeriesCollection(timeSeries.getDefaultCollection().getUnderlyingCollection(), newResolution);
TimeSeries timeSeriesHigherResolution = new TimeSeriesBuilder().registerCollection(collection).build();
aggregationTest(timeSeriesHigherResolution, newResolution, iterations, start, end);
}


public void aggregationTest(TimeSeries timeSeries, int resolution, int iterations, long start, long end) {

//Test aggregation with a single time bucket
TimeSeriesAggregationQuery query = new TimeSeriesAggregationQueryBuilder()
.split(1)
.build();
TimeSeriesAggregationPipeline pipeline = timeSeries.getAggregationPipeline();
TimeSeriesAggregationResponse response = pipeline.collect(query);
Assert.assertEquals(0, response.getStart());
//the query has no time range, so the effective range used by the aggregation spans from 0 to now and the response resolution covers that whole range
assertTrue(response.getResolution() > System.currentTimeMillis() - 3_000);
assertEquals(resolution, response.getCollectionResolution());
Map<BucketAttributes, Map<Long, Bucket>> series = response.getSeries();
assertEquals(1, series.size());
assertEquals(0, series.keySet().stream().findFirst().orElseThrow().size());
Map<Long, Bucket> longBucketMap = series.values().stream().findFirst().orElseThrow();
assertEquals(1, longBucketMap.size());
assertEquals(Long.valueOf(0), longBucketMap.keySet().stream().findFirst().orElseThrow());
Bucket bucket = longBucketMap.values().stream().findFirst().orElseThrow();
assertEquals(iterations * 4, bucket.getCount());
assertEquals((iterations * (DURATION_T1_FAILED + DURATION_T1_PASSED + DURATION_T2_FAILED + DURATION_T2_PASSED)), bucket.getSum());
assertEquals(DURATION_T1_PASSED, bucket.getMin());
assertEquals(DURATION_T2_FAILED, bucket.getMax());

//Test aggregation with filter, grouping and 100 time buckets (since the response resolution is 10 seconds, the same response is expected for both the 1-second and 5-second source resolutions)
testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 100);

//Test aggregation with filter, grouping and 1000 time buckets -> the response resolution is 1 second for the 1-second source, but will be 5 seconds for the 5-second source resolution
testWithManyTimeBuckets(resolution, iterations, start, end, pipeline, 1000);
}

private void testWithManyTimeBuckets(int sourceResolution, int iterations, long start, long end, TimeSeriesAggregationPipeline pipeline, int targetBucketCount) {
TimeSeriesAggregationQuery query;
TimeSeriesAggregationResponse response;
Map<BucketAttributes, Map<Long, Bucket>> series;
query = new TimeSeriesAggregationQueryBuilder()
.split(targetBucketCount)
.withFilter(Filters.equals("attributes.name", "t1"))
.withGroupDimensions(Set.of("status"))
.range(start, end)
.build();
long expectedResponseResolution = (end - start) / targetBucketCount;
expectedResponseResolution = (expectedResponseResolution < sourceResolution) ? sourceResolution : expectedResponseResolution;
long expectedBucketsCount = (end - start) / expectedResponseResolution;
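//Worked example for the values used in this test: end - start = 1000 iterations * 1000 ms = 1,000,000 ms; with 100 target buckets the response resolution is 10,000 ms for both source resolutions, while with 1000 target buckets it is 1,000 ms for the 1-second source and is raised to 5,000 ms (hence 200 buckets) for the 5-second source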
response = pipeline.collect(query);
Assert.assertEquals(expectedResponseResolution, response.getResolution());
assertEquals(sourceResolution, response.getCollectionResolution());
series = response.getSeries();
assertEquals(2, series.size());
Map<Long, Bucket> failedSeries = series.get(Map.of("status", "FAILED"));
Map<Long, Bucket> passedSeries = series.get(Map.of("status", "PASSED"));
validateSeries(failedSeries, DURATION_T1_FAILED, iterations, expectedBucketsCount);
validateSeries(passedSeries, DURATION_T1_PASSED, iterations, expectedBucketsCount);
}

private void validateSeries(Map<Long, Bucket> series, long expectedDuration, int iterations, long targetBucketCount) {
assertNotNull(series);
long expectedCountPerBucket = iterations / targetBucketCount;
long expectedCountPerBucketMin = expectedCountPerBucket - 1;
long expectedCountPerBucketMax = expectedCountPerBucket + 1;
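//the +/-1 tolerance accounts for points whose randomized ingestion timestamp falls just across a response-bucket boundary, since the response buckets are not necessarily aligned with the ingestion intervals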
//depending on the start and end time we might get back more buckets than requested to cover the range
assertTrue("Series should have max target buckets +1 " + targetBucketCount + " time buckets, but size was: " + series.size() + "Series content: " + series, series.size() <= (targetBucketCount+1));
//For response resolution equals source resolution, i.e. 1 second with count between 0 and 2 we might have significantly less time bucket because the one with 0 are not included
if (expectedCountPerBucket == 1) {
assertTrue("Series does not have sufficient number of buckets, target " + targetBucketCount + ", but size was: " + series.size() + "Series content: " + series, series.size() >= targetBucketCount*0.7);
} else {
assertTrue("Series should have at least the target buckets count, target " + targetBucketCount + ", but size was: " + series.size() + "Series content: " + series, series.size() >= targetBucketCount);
}
AtomicInteger bucketIdx = new AtomicInteger(0);
AtomicLong totalSum = new AtomicLong(0);
AtomicLong totalCount = new AtomicLong(0);
series.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(e -> {
Bucket bucket = e.getValue();
int currentIdx = bucketIdx.getAndIncrement();
//Edge buckets can have lower counts
if (currentIdx > 0 && currentIdx < targetBucketCount) {
assertTrue("Count should be between " + expectedCountPerBucketMin + " and " + expectedCountPerBucketMax + " but was " + bucket.getCount() + ". Current idx: " + currentIdx +
". Series content: " + series, expectedCountPerBucketMin <= bucket.getCount() && bucket.getCount() <= expectedCountPerBucketMax);
assertTrue((expectedCountPerBucketMin) * expectedDuration <= bucket.getSum() && bucket.getSum() <= (expectedCountPerBucketMax) * expectedDuration);
}
assertEquals(expectedDuration, bucket.getMin());
assertEquals(expectedDuration, bucket.getMax());
totalSum.addAndGet(bucket.getSum());
totalCount.addAndGet(bucket.getCount());
});
assertEquals(expectedDuration*iterations, totalSum.get());
assertEquals(iterations, totalCount.get());
System.out.println("totalSum: " + totalSum.get() + ", totalCount: " + totalCount.get() + ", #buckets: " + series.size());
}

}