diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java index d3785ce8..714a0413 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java @@ -68,7 +68,8 @@ public void ingestDataForEmptyCollections() { .setIgnoredAttributes(collection.getIgnoredAttributes()) .setResolution(collection.getResolution()) .setFlushingPeriodMs(TimeUnit.SECONDS.toMillis(30)) - .setFlushAsyncQueueSize(5000); + .setFlushAsyncQueueSize(5000) + .setFlushSeriesQueueSize(20000); try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) { SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1); Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty(); diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java index 4e46913a..be8d88c5 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java @@ -12,6 +12,7 @@ import step.core.timeseries.query.TimeSeriesQueryBuilder; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,24 +31,17 @@ public class TimeSeriesCollection { public TimeSeriesCollection(Collection mainCollection, long resolution) { this(mainCollection, new TimeSeriesCollectionSettings() .setResolution(resolution) + .setIngestionFlushSeriesQueueSize(20000) ); } - public TimeSeriesCollection(Collection mainCollection, long resolution, long flushPeriod) { - this(mainCollection, new TimeSeriesCollectionSettings() - .setResolution(resolution) - .setIngestionFlushingPeriodMs(flushPeriod) - ); - } - - public TimeSeriesCollection(Collection mainCollection, TimeSeriesCollectionSettings settings) { + this.mainCollection = Objects.requireNonNull(mainCollection); if (settings.getResolution() <= 0) { throw new IllegalArgumentException("The resolution parameter must be greater than zero"); } - validateTtl(settings.getTtl()); - this.mainCollection = mainCollection; this.resolution = settings.getResolution(); + validateTtl(settings.getTtl()); this.ttl = settings.getTtl(); TimeSeriesIngestionPipelineSettings ingestionSettings = new TimeSeriesIngestionPipelineSettings() .setResolution(settings.getResolution()) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java index 9394db1f..cc8be6ee 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java @@ -42,8 +42,14 @@ public class TimeSeriesIngestionPipeline implements AutoCloseable { private final int seriesQueueSizeflush; public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIngestionPipelineSettings settings) { + validateSettings(settings); this.collection = collection; this.sourceResolution = settings.getResolution(); + this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize(); + this.ignoredAttributes = settings.getIgnoredAttributes(); + this.nextPipeline = settings.getNextPipeline(); + + //Enable periodical flush when configured long flushingPeriodMs = settings.getFlushingPeriodMs(); if (flushingPeriodMs > 0) { scheduler = Executors.newScheduledThreadPool(1, threadFactory); @@ -51,9 +57,7 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn } else { scheduler = null; } - this.ignoredAttributes = settings.getIgnoredAttributes(); - this.nextPipeline = settings.getNextPipeline(); - this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize(); + //collection is null when overridden in TimeSeriesExecutionPlugin, in such case async processor is not required this.asyncProcessor = (collection == null) ? null : new AsyncProcessor<>(settings.getFlushAsyncQueueSize(), entity -> { try { @@ -64,6 +68,15 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn }); } + public void validateSettings(TimeSeriesIngestionPipelineSettings settings) { + if (settings.getResolution() <= 0) { + throw new IllegalArgumentException("The resolution parameter must be greater than zero"); + } + if (settings.getFlushingPeriodMs() > 0 && settings.getFlushSeriesQueueSize() <= 1) { + throw new IllegalArgumentException("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)"); + } + } + public long getResolution() { return sourceResolution; } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java index 8c59e79f..ba9ef963 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java @@ -10,6 +10,9 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + public class TimeSeriesIngestionTest extends TimeSeriesBaseTest { @Test @@ -202,6 +205,19 @@ public void ingestionWithManyBucketsTest() { } } + @Test + public void ingestionIncorrectSettings() throws InterruptedException { + TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings(); + timeSeriesCollectionSettings.setIngestionFlushAsyncQueueSize(500); + timeSeriesCollectionSettings.setIngestionFlushingPeriodMs(100); + timeSeriesCollectionSettings.setResolution(30_000); + try (TimeSeries timeSeries = getTimeSeriesWithSettings(timeSeriesCollectionSettings)) { + fail("Flush queue size should be mandatory"); + } catch (IllegalArgumentException e) { + assertEquals("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)", e.getMessage()); + } + } + @Test public void ingestionExceedingQueueSizeTest() throws InterruptedException { TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings(); diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java index a40d4dc5..0177226d 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java @@ -344,7 +344,11 @@ public void ingestionPipelineResolution() { private void testResolutionFor10Points(int resolutionMs, int expectedBucketCount) { // Create ingestion pipeline InMemoryCollection bucketCollection = new InMemoryCollection<>(); - TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, resolutionMs, resolutionMs); + TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings() + .setResolution(resolutionMs) + .setIngestionFlushingPeriodMs(resolutionMs) + .setIngestionFlushSeriesQueueSize(100); + TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, timeSeriesCollectionSettings); TimeSeries timeSeries = new TimeSeriesBuilder().registerCollection(collection).build(); try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) { for (int i = 0; i < 10; i++) {