From 0662759a1851086dcb93ca6fc8f481d0ecfaf3b1 Mon Sep 17 00:00:00 2001 From: David Stephan Date: Thu, 18 Dec 2025 16:33:24 +0100 Subject: [PATCH 1/2] SED-4451 The performance of the analytics view is very poor on large datasets after re-ingestion --- .../main/java/step/core/timeseries/TimeSeries.java | 3 ++- .../step/core/timeseries/TimeSeriesCollection.java | 14 +++++--------- .../ingestion/TimeSeriesIngestionPipeline.java | 10 ++++++++++ .../step/core/timeseries/TimeSeriesBaseTest.java | 6 ++++-- .../timeseries/TimeSeriesHousekeepingTest.java | 4 ++-- .../java/step/core/timeseries/TimeSeriesTest.java | 6 +++++- 6 files changed, 28 insertions(+), 15 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java index d3785ce8..714a0413 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeries.java @@ -68,7 +68,8 @@ public void ingestDataForEmptyCollections() { .setIgnoredAttributes(collection.getIgnoredAttributes()) .setResolution(collection.getResolution()) .setFlushingPeriodMs(TimeUnit.SECONDS.toMillis(30)) - .setFlushAsyncQueueSize(5000); + .setFlushAsyncQueueSize(5000) + .setFlushSeriesQueueSize(20000); try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) { SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1); Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty(); diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java index 4e46913a..50606779 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java @@ -27,24 +27,20 @@ public class TimeSeriesCollection { private long ttl; // In milliseconds. set to 0 in case deletion is never required private final Set ignoredAttributes; - public TimeSeriesCollection(Collection mainCollection, long resolution) { + protected TimeSeriesCollection(Collection mainCollection, long resolution) { this(mainCollection, new TimeSeriesCollectionSettings() .setResolution(resolution) + .setIngestionFlushSeriesQueueSize(20000) ); } - public TimeSeriesCollection(Collection mainCollection, long resolution, long flushPeriod) { - this(mainCollection, new TimeSeriesCollectionSettings() - .setResolution(resolution) - .setIngestionFlushingPeriodMs(flushPeriod) - ); - } - - public TimeSeriesCollection(Collection mainCollection, TimeSeriesCollectionSettings settings) { if (settings.getResolution() <= 0) { throw new IllegalArgumentException("The resolution parameter must be greater than zero"); } + if (settings.getIngestionFlushSeriesQueueSize() <= 1) { + throw new IllegalArgumentException("The ingestion series queue size must be greater than 1"); + } validateTtl(settings.getTtl()); this.mainCollection = mainCollection; this.resolution = settings.getResolution(); diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java index 9394db1f..67a444a3 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java @@ -42,6 +42,7 @@ public class TimeSeriesIngestionPipeline implements AutoCloseable { private final int seriesQueueSizeflush; public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIngestionPipelineSettings settings) { + validateSettings(settings); this.collection = collection; this.sourceResolution = settings.getResolution(); long flushingPeriodMs = settings.getFlushingPeriodMs(); @@ -64,6 +65,15 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn }); } + public void validateSettings(TimeSeriesIngestionPipelineSettings settings) { + if (settings.getResolution() <= 0) { + throw new IllegalArgumentException("The resolution parameter must be greater than zero"); + } + if (settings.getFlushSeriesQueueSize() <= 1) { + throw new IllegalArgumentException("The ingestion series queue size must be greater than 1"); + } + } + public long getResolution() { return sourceResolution; } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java index 507f8c81..85602c3f 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java @@ -26,7 +26,8 @@ protected TimeSeriesCollection getCollection(long resolution, Set ignore InMemoryCollection col = new InMemoryCollection<>("resolution_" + resolution); TimeSeriesCollectionSettings settings = new TimeSeriesCollectionSettings() .setResolution(resolution) - .setIgnoredAttributes(ignoredAttributes); + .setIgnoredAttributes(ignoredAttributes) + .setIngestionFlushSeriesQueueSize(20000); return new TimeSeriesCollection(col, settings); } @@ -39,7 +40,8 @@ protected TimeSeriesCollection getCollectionWithTTL(long resolution, long ttl, S TimeSeriesCollectionSettings settings = new TimeSeriesCollectionSettings() .setTtl(ttl) .setResolution(resolution) - .setIgnoredAttributes(ignoredAttributes); + .setIgnoredAttributes(ignoredAttributes) + .setIngestionFlushSeriesQueueSize(100); return new TimeSeriesCollection(col, settings); } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java index f6c6af1d..ee532d86 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java @@ -34,8 +34,8 @@ public void simpleCleanupTest() { @Test public void ttlNotCoveringTest() { - TimeSeriesCollection tsCol1 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(10).setTtl(40)); // this live longer - TimeSeriesCollection tsCol2 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(100).setTtl(10_000)); // this should not cover the request + TimeSeriesCollection tsCol1 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(10).setTtl(40).setIngestionFlushSeriesQueueSize(100)); // this live longer + TimeSeriesCollection tsCol2 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(100).setTtl(10_000).setIngestionFlushSeriesQueueSize(100)); // this should not cover the request long now = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { Bucket bucket = new Bucket(); diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java index a40d4dc5..0177226d 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesTest.java @@ -344,7 +344,11 @@ public void ingestionPipelineResolution() { private void testResolutionFor10Points(int resolutionMs, int expectedBucketCount) { // Create ingestion pipeline InMemoryCollection bucketCollection = new InMemoryCollection<>(); - TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, resolutionMs, resolutionMs); + TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings() + .setResolution(resolutionMs) + .setIngestionFlushingPeriodMs(resolutionMs) + .setIngestionFlushSeriesQueueSize(100); + TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, timeSeriesCollectionSettings); TimeSeries timeSeries = new TimeSeriesBuilder().registerCollection(collection).build(); try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) { for (int i = 0; i < 10; i++) { From c0b6a6665c66f883af7b283d5104d9d0bfe39da4 Mon Sep 17 00:00:00 2001 From: David Stephan Date: Fri, 19 Dec 2025 09:45:19 +0100 Subject: [PATCH 2/2] SED-4451 The performance of the analytics view is very poor on large datasets after re-ingestion --- .../core/timeseries/TimeSeriesCollection.java | 10 ++++------ .../ingestion/TimeSeriesIngestionPipeline.java | 13 ++++++++----- .../step/core/timeseries/TimeSeriesBaseTest.java | 6 ++---- .../timeseries/TimeSeriesHousekeepingTest.java | 4 ++-- .../core/timeseries/TimeSeriesIngestionTest.java | 16 ++++++++++++++++ 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java index 50606779..be8d88c5 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/TimeSeriesCollection.java @@ -12,6 +12,7 @@ import step.core.timeseries.query.TimeSeriesQueryBuilder; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -27,7 +28,7 @@ public class TimeSeriesCollection { private long ttl; // In milliseconds. set to 0 in case deletion is never required private final Set ignoredAttributes; - protected TimeSeriesCollection(Collection mainCollection, long resolution) { + public TimeSeriesCollection(Collection mainCollection, long resolution) { this(mainCollection, new TimeSeriesCollectionSettings() .setResolution(resolution) .setIngestionFlushSeriesQueueSize(20000) @@ -35,15 +36,12 @@ protected TimeSeriesCollection(Collection mainCollection, long resolutio } public TimeSeriesCollection(Collection mainCollection, TimeSeriesCollectionSettings settings) { + this.mainCollection = Objects.requireNonNull(mainCollection); if (settings.getResolution() <= 0) { throw new IllegalArgumentException("The resolution parameter must be greater than zero"); } - if (settings.getIngestionFlushSeriesQueueSize() <= 1) { - throw new IllegalArgumentException("The ingestion series queue size must be greater than 1"); - } - validateTtl(settings.getTtl()); - this.mainCollection = mainCollection; this.resolution = settings.getResolution(); + validateTtl(settings.getTtl()); this.ttl = settings.getTtl(); TimeSeriesIngestionPipelineSettings ingestionSettings = new TimeSeriesIngestionPipelineSettings() .setResolution(settings.getResolution()) diff --git a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java index 67a444a3..cc8be6ee 100644 --- a/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java +++ b/step-framework-timeseries/src/main/java/step/core/timeseries/ingestion/TimeSeriesIngestionPipeline.java @@ -45,6 +45,11 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn validateSettings(settings); this.collection = collection; this.sourceResolution = settings.getResolution(); + this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize(); + this.ignoredAttributes = settings.getIgnoredAttributes(); + this.nextPipeline = settings.getNextPipeline(); + + //Enable periodical flush when configured long flushingPeriodMs = settings.getFlushingPeriodMs(); if (flushingPeriodMs > 0) { scheduler = Executors.newScheduledThreadPool(1, threadFactory); @@ -52,9 +57,7 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn } else { scheduler = null; } - this.ignoredAttributes = settings.getIgnoredAttributes(); - this.nextPipeline = settings.getNextPipeline(); - this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize(); + //collection is null when overridden in TimeSeriesExecutionPlugin, in such case async processor is not required this.asyncProcessor = (collection == null) ? null : new AsyncProcessor<>(settings.getFlushAsyncQueueSize(), entity -> { try { @@ -69,8 +72,8 @@ public void validateSettings(TimeSeriesIngestionPipelineSettings settings) { if (settings.getResolution() <= 0) { throw new IllegalArgumentException("The resolution parameter must be greater than zero"); } - if (settings.getFlushSeriesQueueSize() <= 1) { - throw new IllegalArgumentException("The ingestion series queue size must be greater than 1"); + if (settings.getFlushingPeriodMs() > 0 && settings.getFlushSeriesQueueSize() <= 1) { + throw new IllegalArgumentException("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)"); } } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java index 85602c3f..507f8c81 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesBaseTest.java @@ -26,8 +26,7 @@ protected TimeSeriesCollection getCollection(long resolution, Set ignore InMemoryCollection col = new InMemoryCollection<>("resolution_" + resolution); TimeSeriesCollectionSettings settings = new TimeSeriesCollectionSettings() .setResolution(resolution) - .setIgnoredAttributes(ignoredAttributes) - .setIngestionFlushSeriesQueueSize(20000); + .setIgnoredAttributes(ignoredAttributes); return new TimeSeriesCollection(col, settings); } @@ -40,8 +39,7 @@ protected TimeSeriesCollection getCollectionWithTTL(long resolution, long ttl, S TimeSeriesCollectionSettings settings = new TimeSeriesCollectionSettings() .setTtl(ttl) .setResolution(resolution) - .setIgnoredAttributes(ignoredAttributes) - .setIngestionFlushSeriesQueueSize(100); + .setIgnoredAttributes(ignoredAttributes); return new TimeSeriesCollection(col, settings); } diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java index ee532d86..f6c6af1d 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesHousekeepingTest.java @@ -34,8 +34,8 @@ public void simpleCleanupTest() { @Test public void ttlNotCoveringTest() { - TimeSeriesCollection tsCol1 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(10).setTtl(40).setIngestionFlushSeriesQueueSize(100)); // this live longer - TimeSeriesCollection tsCol2 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(100).setTtl(10_000).setIngestionFlushSeriesQueueSize(100)); // this should not cover the request + TimeSeriesCollection tsCol1 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(10).setTtl(40)); // this live longer + TimeSeriesCollection tsCol2 = new TimeSeriesCollection(new InMemoryCollection<>(), new TimeSeriesCollectionSettings().setResolution(100).setTtl(10_000)); // this should not cover the request long now = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { Bucket bucket = new Bucket(); diff --git a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java index 8c59e79f..ba9ef963 100644 --- a/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java +++ b/step-framework-timeseries/src/test/java/step/core/timeseries/TimeSeriesIngestionTest.java @@ -10,6 +10,9 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + public class TimeSeriesIngestionTest extends TimeSeriesBaseTest { @Test @@ -202,6 +205,19 @@ public void ingestionWithManyBucketsTest() { } } + @Test + public void ingestionIncorrectSettings() throws InterruptedException { + TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings(); + timeSeriesCollectionSettings.setIngestionFlushAsyncQueueSize(500); + timeSeriesCollectionSettings.setIngestionFlushingPeriodMs(100); + timeSeriesCollectionSettings.setResolution(30_000); + try (TimeSeries timeSeries = getTimeSeriesWithSettings(timeSeriesCollectionSettings)) { + fail("Flush queue size should be mandatory"); + } catch (IllegalArgumentException e) { + assertEquals("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)", e.getMessage()); + } + } + @Test public void ingestionExceedingQueueSizeTest() throws InterruptedException { TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings();