Skip to content

Conversation

@bplatz
Copy link
Contributor

@bplatz bplatz commented Dec 16, 2025

Summary

Implements incremental streaming aggregation for queries, reducing memory overhead for aggregate queries over large result sets. Instead of collecting all solutions into groups before computing aggregates, this approach updates aggregate state incrementally as solutions flow through the query pipeline.

Key Changes

  • Streaming aggregate descriptors (src/fluree/db/query/exec/eval.cljc) - Registry of {:init :step! :final} functions for count, count-distinct, sum, avg, min, max
  • Eligibility detection (src/fluree/db/query/exec/group.cljc) - Automatically uses streaming path when query structure permits (no HAVING clause, all selectors are group vars or streamable aggregates)
  • Selector updates (src/fluree/db/query/exec/select.cljc) - AggregateSelector and AsSelector now carry optional streaming-agg descriptor
  • Parse-time detection (src/fluree/db/query/fql/parse.cljc) - Builds streaming descriptors for simple aggregate forms like (count ?x), (sum ?y)

How It Works

Traditional grouping collects all solutions, then applies aggregate functions:
solutions → [collect all] → group → apply agg-fn → results

Streaming aggregation updates state incrementally:
solutions → step!(state, value) per solution → final(state) → results

Supported Aggregates

Function Streaming Support
count
count with *
count-distinct
sum
avg
min
max

Limitations

Falls back to traditional grouping when:

  • Query has a HAVING clause
  • Aggregate uses complex expressions (e.g., (sum (+ ?x ?y)))
  • Non-streamable aggregate functions (e.g., sample, group_concat)

@bplatz bplatz requested a review from a team December 16, 2025 13:38
Copy link
Contributor

@zonotope zonotope left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to start a discussion about the approach and whether or not my suggestions are feasible.


(declare compare*)

(def streaming-aggregate-registry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having these grouped in a map like this implies that they are dynamic and will change, but these are a static, closed list as far as I can tell.

I think we're reinventing facilities the language already has. I think we could implement the same functionality with a StreamingAggregator protocol whose methods are initialize!, step!, and finalize, and then records that implement that protocol for all the keys in this registry map. Those records could also encapsulate their state .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zonotope take a look at commit 22d94e6 -- let me know if you prefer that.

@bplatz bplatz requested a review from zonotope January 7, 2026 16:05
Comment on lines 110 to 127
#?(:clj (defmulti ->offset-date-time
#(when-let [t (#{OffsetDateTime LocalDateTime LocalDate} (type %))]
t)))
#?(:clj (defmethod ->offset-date-time OffsetDateTime
[^OffsetDateTime datetime]
datetime))
#?(:clj (defmethod ->offset-date-time LocalDateTime
[^LocalDateTime datetime]
(.atOffset datetime ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-date-time LocalDate
[^LocalDate date]
(.atOffset (.atStartOfDay date) ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-date-time :default
[x]
(throw (ex-info "Cannot convert value to OffsetDateTime."
{:value x
:status 400
:error :db/invalid-fn-call}))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this fits better as a protocol since we're dealing with predefined types directly. I like to reserve multimethods for flat clojure data like maps and vectors, but protocols get much better performance if we have the option to let the jvm dispatch on type.

Here's how I would define and implement a protocol to convert to an offset datetime. I haven't run this code, so there might be some typos.

Suggested change
#?(:clj (defmulti ->offset-date-time
#(when-let [t (#{OffsetDateTime LocalDateTime LocalDate} (type %))]
t)))
#?(:clj (defmethod ->offset-date-time OffsetDateTime
[^OffsetDateTime datetime]
datetime))
#?(:clj (defmethod ->offset-date-time LocalDateTime
[^LocalDateTime datetime]
(.atOffset datetime ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-date-time LocalDate
[^LocalDate date]
(.atOffset (.atStartOfDay date) ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-date-time :default
[x]
(throw (ex-info "Cannot convert value to OffsetDateTime."
{:value x
:status 400
:error :db/invalid-fn-call}))))
#?(:clj
(defprotocol OffsetDateTimeConverter
(->offset-date-time [this])))
#?(:clj
(extend-protocol OffsetDateTimeConverter
OffsetDateTime
(->offset-date-time
[^OffsetDateTime this]
this)
LocalDateTime
(->offset-date-time
[^LocalDateTime this]
(.atOffset this ZoneOffset/UTC))
LocalDate
(->offset-date-time
[^LocalDate this]
(-> this .atStartOfDay (.atOffset ZoneOffset/UTC)))
Object
(->offset-date-time
[this]
(throw (ex-info "Cannot convert value to OffsetDateTime."
{:value this
:status 400
:error :db/invalid-fn-call})))))

Comment on lines 129 to 143
#?(:clj (defmulti ->offset-time
#(when-let [t (#{OffsetTime LocalTime} (type %))]
t)))
#?(:clj (defmethod ->offset-time OffsetTime
[^OffsetTime time]
time))
#?(:clj (defmethod ->offset-time LocalTime
[^LocalTime time]
(.atOffset time ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-time :default
[x]
(throw (ex-info "Cannot convert value to OffsetTime."
{:value x
:status 400
:error :db/invalid-fn-call}))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, here's how I would define and implement an offset time converter protocol.

Suggested change
#?(:clj (defmulti ->offset-time
#(when-let [t (#{OffsetTime LocalTime} (type %))]
t)))
#?(:clj (defmethod ->offset-time OffsetTime
[^OffsetTime time]
time))
#?(:clj (defmethod ->offset-time LocalTime
[^LocalTime time]
(.atOffset time ZoneOffset/UTC)))
#?(:clj (defmethod ->offset-time :default
[x]
(throw (ex-info "Cannot convert value to OffsetTime."
{:value x
:status 400
:error :db/invalid-fn-call}))))
#?(:clj
(defprotocol OffsetTimeConverter
(->offset-time [this])))
#?(:clj
(extend-protocol OffsetTimeConverter
OffsetTime
(->offset-time
[^OffsetTime this]
this)
LocalTime
(->offset-time
[^LocalTime this]
(.atOffset time ZoneOffset/UTC))
Object
(->offset-time
[this]
(throw (ex-info "Cannot convert value to OffsetTime."
{:value this
:status 400
:error :db/invalid-fn-call})))))

(defn- streaming-agg-selector?
"Returns true if selector supports streaming aggregation."
[sel]
(or (instance? fluree.db.query.exec.select.StreamingAggregateSelector sel)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could require this type to avoid this long line

(every? streaming-agg-selector? selectors)
(and (seq group-vars)
(every? (fn [sel]
(or (and (instance? fluree.db.query.exec.select.VariableSelector sel)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could require this type to avoid this long line

selectors)))))

(defn- update-streaming-groups
"Reducer function that updates streaming aggregate states for each solution.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I read "reducer function", I think a function you pass to reduce. Then the function signature with four arguments is immediately confusing. Perhaps the docstring could be tweaked so that the first sentence is "Updates streaming aggregae states for a solution". I think that's clearer.

Comment on lines 19 to 27
(defrecord CountAggregator []
StreamingAggregator
(initialize [_] 0)
(step [_ state tv]
(if (some-> tv :value some?)
(inc state)
state))
(finalize [_ state]
(where/->typed-val state)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code would be simpler if the aggregator records encapsulated their state. Then you could use the aggregators themselves to keep track of the running tally. I also would remove the initialize method in favor of a constructor. Here's how I'd change the CountAggregator:

Suggested change
(defrecord CountAggregator []
StreamingAggregator
(initialize [_] 0)
(step [_ state tv]
(if (some-> tv :value some?)
(inc state)
state))
(finalize [_ state]
(where/->typed-val state)))
(defrecord CountAggregator [tally]
StreamingAggregator
(step [this tv]
(if (some-> tv :value some?)
(update this :tally inc)))
(finalize [_]
(where/->typed-val tally)))
(defn count-aggregator
[]
(->CountAggregator 0))

I'd change the other records similarly.

(cond-> (->AggregateSelector agg-function)
(->AggregateSelector agg-function))
([agg-function streaming-agg]
(if streaming-agg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this conditional should be lifted to the caller of this constructor function, and this function should be replaced with two different functions. In general I think conditionals should be evaluated as soon as possible. This simplifies the code by making clear execution branches instead of putting everything together and transmitting conditionals without context. If there's shared functionality between the two branches, then that could still be implemented with helper functions.

Comment on lines 724 to 731
(let [code (parse-code f)
fn-name (when (seq? code) (first code))
agg-vars (variables code)
agg-fn (eval/compile code context)
streaming-agg (build-streaming-agg code)
agg-info {:fn-name fn-name
:vars agg-vars}]
(select/aggregate-selector agg-fn streaming-agg agg-info)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With respect to my previous comment about lifting the conditional, I would rewrite this function this way:

Suggested change
(let [code (parse-code f)
fn-name (when (seq? code) (first code))
agg-vars (variables code)
agg-fn (eval/compile code context)
streaming-agg (build-streaming-agg code)
agg-info {:fn-name fn-name
:vars agg-vars}]
(select/aggregate-selector agg-fn streaming-agg agg-info)))
(let [code (parse-code f)
fn-name (when (seq? code) (first code))
agg-vars (variables code)
agg-fn (eval/compile code context)
agg-info {:fn-name fn-name
:vars agg-vars}]
(if-let [streaming-agg (build-streaming-agg code)]
(select/streaming-aggregate-selector agg-fn streaming-agg agg-info)
(select/aggregate-selector agg-fn agg-info))))

I'd do the same thing to parse-select-as-fn too, although a lot of what I'd change was here before this pr.

streaming-aggs (->> selectors
(filter streaming-agg-selector?)
(mapv :streaming-agg))
implicit? (and (empty? group-vars)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is necessary, then I think it's a bug unless I'm missing something. select/implicit-grouping?. If I remember correctly, select/implicit-grouping? should only return a truthy value if there are no grouped variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select/implicit-grouping? only checks if a selector is an aggregate type, not whether there are group variables. Both conditions are needed for implicit grouping detection

(mapv :streaming-agg))
implicit? (and (empty? group-vars)
(some select/implicit-grouping? selectors))
streaming? (streaming-eligible? having streaming-aggs implicit?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a separate binding. I think (if (streaming-eligible? having ...) is just as if not more readable.

@bplatz
Copy link
Contributor Author

bplatz commented Jan 15, 2026

@zonotope ready for re-review:

Changes Made

  1. aggregate.cljc - Protocol conversions
    Converted ->offset-date-time multimethod to OffsetDateTimeConverter protocol for better JVM dispatch performance
    Converted ->offset-time multimethod to OffsetTimeConverter protocol
  2. aggregate.cljc - Aggregator state encapsulation
    Refactored StreamingAggregator protocol to encapsulate state in records
    Removed initialize method; aggregators now start with initial state via constructor functions
    Changed step signature from (step this state tv) to (step this tv) - returns new aggregator
    Renamed finalize to complete (to avoid conflict with Java's Object.finalize())
    Updated all aggregator records: CountAggregator, CountStarAggregator, CountDistinctAggregator, SumAggregator, AvgAggregator, MinAggregator, MaxAggregator
    streaming-aggregators map now stores constructor functions
  3. group.cljc
    Added imports for AsSelector, StreamingAggregateSelector, VariableSelector to avoid long fully-qualified names
    Updated streaming-agg-selector? and streaming-eligible? to use short names
    Updated docstring for update-streaming-groups
    Updated update-streaming-groups and finalize-streaming-groups to work with new aggregator interface
    Inlined the streaming? binding into the if condition
  4. select.cljc
    Split aggregate-selector into two separate functions:
    aggregate-selector - for non-streaming aggregates
    streaming-aggregate-selector - for streaming aggregates
    Removed conditional logic from the function (lifted to caller)
  5. parse.cljc
    Updated parse-select-aggregate to explicitly choose between aggregate-selector and streaming-aggregate-selector

@bplatz
Copy link
Contributor Author

bplatz commented Jan 15, 2026

Note on ci/cd errors, this is fixed with #1190 -- we use 'latest' version of clj-kondo and I think a new version is flagging these.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants