diff --git a/sampling/reservoir_items_sketch.go b/sampling/reservoir_items_sketch.go index b571cae..f9abaca 100644 --- a/sampling/reservoir_items_sketch.go +++ b/sampling/reservoir_items_sketch.go @@ -20,8 +20,11 @@ package sampling import ( "encoding/binary" "errors" + "math" "math/rand" + "slices" + "github.com/apache/datasketches-go/common" "github.com/apache/datasketches-go/internal" ) @@ -38,7 +41,10 @@ const ( ResizeX8 ResizeFactor = 8 defaultResizeFactor = ResizeX8 - minK = 1 + minK = 2 + + // smallest sampling array allocated: 16 + minLgArrItems = 4 ) // ReservoirItemsSketch provides a uniform random sample of items @@ -52,19 +58,48 @@ const ( type ReservoirItemsSketch[T any] struct { k int // maximum reservoir size n int64 // total items seen - data []T // reservoir storage + rf ResizeFactor + data []T // reservoir storage +} + +type reservoirItemsSketchOptions struct { + resizeFactor ResizeFactor +} + +// ReservoirItemsSketchOptionFunc defines a functional option for configuring reservoirItemsSketchOptions. +type ReservoirItemsSketchOptionFunc func(*reservoirItemsSketchOptions) + +// WithReservoirItemsSketchResizeFactor sets the resize factor for the internal array. +func WithReservoirItemsSketchResizeFactor(rf ResizeFactor) ReservoirItemsSketchOptionFunc { + return func(r *reservoirItemsSketchOptions) { + r.resizeFactor = rf + } } // NewReservoirItemsSketch creates a new reservoir sketch with the given capacity k. -func NewReservoirItemsSketch[T any](k int) (*ReservoirItemsSketch[T], error) { +func NewReservoirItemsSketch[T any]( + k int, opts ...ReservoirItemsSketchOptionFunc, +) (*ReservoirItemsSketch[T], error) { if k < minK { - return nil, errors.New("k must be at least 1") + return nil, errors.New("k must be at least 2") } + options := &reservoirItemsSketchOptions{ + resizeFactor: defaultResizeFactor, + } + for _, opt := range opts { + opt(options) + } + + ceilingLgK, _ := internal.ExactLog2(common.CeilingPowerOf2(k)) + initialLgSize := startingSubMultiple( + ceilingLgK, int(math.Log2(float64(options.resizeFactor))), minLgArrItems, + ) return &ReservoirItemsSketch[T]{ k: k, n: 0, - data: make([]T, 0, min(k, int(defaultResizeFactor))), + rf: options.resizeFactor, + data: make([]T, 0, adjustedSamplingAllocationSize(k, 1<= int64(cap(s.data)) { + s.growReservoir() + } + s.data = append(s.data, item) } else { // Steady state: replace with probability k/n @@ -83,6 +122,11 @@ func (s *ReservoirItemsSketch[T]) Update(item T) { s.n++ } +func (s *ReservoirItemsSketch[T]) growReservoir() { + adjustedSize := adjustedSamplingAllocationSize(s.k, cap(s.data)<= 1 && sample <= int64(total)) + _, exists := seen[sample] + assert.False(t, exists) + seen[sample] = struct{}{} + } + }) } -func TestReservoirItemsSketchUpdateAboveK(t *testing.T) { - sketch, _ := NewReservoirItemsSketch[int64](10) - - for i := int64(1); i <= 1000; i++ { - sketch.Update(i) - } +func TestReservoirItemsSketchReset(t *testing.T) { + k := 1024 - assert.Equal(t, int64(1000), sketch.N()) - assert.Equal(t, 10, sketch.NumSamples()) -} + sketch, err := NewReservoirItemsSketch[int64](k) + assert.NoError(t, err) -func TestReservoirItemsSketchReset(t *testing.T) { - sketch, _ := NewReservoirItemsSketch[int64](10) + ceilingLgK, _ := internal.ExactLog2(common.CeilingPowerOf2(k)) + initialLgSize := startingSubMultiple( + ceilingLgK, int(math.Log2(float64(defaultResizeFactor))), minLgArrItems, + ) + expectedInitialCap := adjustedSamplingAllocationSize(k, 1<