Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void ingestDataForEmptyCollections() {
.setIgnoredAttributes(collection.getIgnoredAttributes())
.setResolution(collection.getResolution())
.setFlushingPeriodMs(TimeUnit.SECONDS.toMillis(30))
.setFlushAsyncQueueSize(5000);
.setFlushAsyncQueueSize(5000)
.setFlushSeriesQueueSize(20000);
try (TimeSeriesIngestionPipeline ingestionPipeline = new TimeSeriesIngestionPipeline(collection, ingestionSettings)) {
SearchOrder searchOrder = new SearchOrder(TIMESTAMP_ATTRIBUTE, 1);
Filter filter = collection.getTtl() > 0 ? Filters.gte("begin", System.currentTimeMillis() - collection.getTtl()): Filters.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import step.core.timeseries.query.TimeSeriesQueryBuilder;

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -30,24 +31,17 @@ public class TimeSeriesCollection {
public TimeSeriesCollection(Collection<Bucket> mainCollection, long resolution) {
this(mainCollection, new TimeSeriesCollectionSettings()
.setResolution(resolution)
.setIngestionFlushSeriesQueueSize(20000)
);
}

public TimeSeriesCollection(Collection<Bucket> mainCollection, long resolution, long flushPeriod) {
this(mainCollection, new TimeSeriesCollectionSettings()
.setResolution(resolution)
.setIngestionFlushingPeriodMs(flushPeriod)
);
}


public TimeSeriesCollection(Collection<Bucket> mainCollection, TimeSeriesCollectionSettings settings) {
this.mainCollection = Objects.requireNonNull(mainCollection);
if (settings.getResolution() <= 0) {
throw new IllegalArgumentException("The resolution parameter must be greater than zero");
}
validateTtl(settings.getTtl());
this.mainCollection = mainCollection;
this.resolution = settings.getResolution();
validateTtl(settings.getTtl());
this.ttl = settings.getTtl();
TimeSeriesIngestionPipelineSettings ingestionSettings = new TimeSeriesIngestionPipelineSettings()
.setResolution(settings.getResolution())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ public class TimeSeriesIngestionPipeline implements AutoCloseable {
private final int seriesQueueSizeflush;

public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIngestionPipelineSettings settings) {
validateSettings(settings);
this.collection = collection;
this.sourceResolution = settings.getResolution();
this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize();
this.ignoredAttributes = settings.getIgnoredAttributes();
this.nextPipeline = settings.getNextPipeline();

//Enable periodical flush when configured
long flushingPeriodMs = settings.getFlushingPeriodMs();
if (flushingPeriodMs > 0) {
scheduler = Executors.newScheduledThreadPool(1, threadFactory);
scheduler.scheduleAtFixedRate(() -> flush(false), flushingPeriodMs, flushingPeriodMs, TimeUnit.MILLISECONDS);
} else {
scheduler = null;
}
this.ignoredAttributes = settings.getIgnoredAttributes();
this.nextPipeline = settings.getNextPipeline();
this.seriesQueueSizeflush = settings.getFlushSeriesQueueSize();

//collection is null when overridden in TimeSeriesExecutionPlugin, in such case async processor is not required
this.asyncProcessor = (collection == null) ? null : new AsyncProcessor<>(settings.getFlushAsyncQueueSize(), entity -> {
try {
Expand All @@ -64,6 +68,15 @@ public TimeSeriesIngestionPipeline(TimeSeriesCollection collection, TimeSeriesIn
});
}

public void validateSettings(TimeSeriesIngestionPipelineSettings settings) {
if (settings.getResolution() <= 0) {
throw new IllegalArgumentException("The resolution parameter must be greater than zero");
}
if (settings.getFlushingPeriodMs() > 0 && settings.getFlushSeriesQueueSize() <= 1) {
throw new IllegalArgumentException("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)");
}
}

public long getResolution() {
return sourceResolution;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class TimeSeriesIngestionTest extends TimeSeriesBaseTest {

@Test
Expand Down Expand Up @@ -202,6 +205,19 @@ public void ingestionWithManyBucketsTest() {
}
}

@Test
public void ingestionIncorrectSettings() throws InterruptedException {
TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings();
timeSeriesCollectionSettings.setIngestionFlushAsyncQueueSize(500);
timeSeriesCollectionSettings.setIngestionFlushingPeriodMs(100);
timeSeriesCollectionSettings.setResolution(30_000);
try (TimeSeries timeSeries = getTimeSeriesWithSettings(timeSeriesCollectionSettings)) {
fail("Flush queue size should be mandatory");
} catch (IllegalArgumentException e) {
assertEquals("The ingestion series queue size must be greater than 1 when flushing periodically (flushing period greater than 0)", e.getMessage());
}
}

@Test
public void ingestionExceedingQueueSizeTest() throws InterruptedException {
TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,11 @@ public void ingestionPipelineResolution() {
private void testResolutionFor10Points(int resolutionMs, int expectedBucketCount) {
// Create ingestion pipeline
InMemoryCollection<Bucket> bucketCollection = new InMemoryCollection<>();
TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, resolutionMs, resolutionMs);
TimeSeriesCollectionSettings timeSeriesCollectionSettings = new TimeSeriesCollectionSettings()
.setResolution(resolutionMs)
.setIngestionFlushingPeriodMs(resolutionMs)
.setIngestionFlushSeriesQueueSize(100);
TimeSeriesCollection collection = new TimeSeriesCollection(bucketCollection, timeSeriesCollectionSettings);
TimeSeries timeSeries = new TimeSeriesBuilder().registerCollection(collection).build();
try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline()) {
for (int i = 0; i < 10; i++) {
Expand Down