-
Notifications
You must be signed in to change notification settings - Fork 21
Pluggable flow aggregation function functionality added #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for flowmapgl canceled.
|
| result.push(aggregateFlow); | ||
| aggFlowsByKey.set(key, aggregateFlow); | ||
| } else { | ||
| aggregateFlow.values.push(flowCountsMapReduce.map(flow)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you add values to flow here? It will likely significantly increase the memory use for the resulting flows data structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I try to explain why I picked this approach. Previously, you applied a map-reduce approach which means summing up every new counts added to a cluster in each iteration. This approach works fine when our aggregation function is 'sum'.
The same can not be achieved if we wanna 'average'. To explain, say we have three edges with values of 10,5 and 8. If we apply the same approach for sum and just average them every time 'reduce' function is called, we get a different number than a real average.
(((10+5)/2) + 8)/2 != (10+5+8)/3
This approach, despite your concern, gives the developer full flexibility on what aggregation function to be used (e.x. weighted sum, logarithmic, exponential, etc.).
To avoid any performance loss with normal 'sum' aggregation, I change the code to use the previous 'reduce' function (and stop pushing values into the array) if getFlowAggFunc is not defined.
I also consider replacing arrays with a better performing function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I get that. I was more concerned with the memory footprint of the resulting data structure. In line 209 we add the new values property to the flow object, so the resulting data structure will keep more data than necessary. It appears to me that values is only used as a temporary accumulator for the flow counts, so we don't need to keep it forever.
Another issue with your proposed approach is that the reduce function is called every time a new value is added. This might slow the calculations down unnecessarily esp. if the aggregation function is costly.
Maybe we can instead accumulate the values in a separate temporary map similar to aggFlowsByKey, e.g.:
const aggFlowCountsByKey = new Map<string, number[]>();
After iterating over all flows we can call flowCountsMapReduce.reduce once for each of them and save the results to the flow counts. Then, we can leave aggFlowCountsByKey behind to be garbage collected.
Or alternatively, we can use your accumulation approach, but give the resulting array another pass at the end in which we calculate the averages from the values (calling the agg function only once per flow), add them as counts to the flows and delete the values property from the results. Maybe that's simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made the changes following the proposed approach. Added node aggregation feature as well. Please let me know if they work better now.
To be able to allow weighted aggregation I added another argument that allows to determine the weight attribute. Example would be we have a performance metric and we wanna weigthed sum it considering the volume(flow).
getFlowMagnitude: (flow) => flow.metric_1,
getFlowAggWeight: (flow) => flow.count,
getFlowAggFunc: (values) => values.reduce((accumulator, curr:any) => accumulator + curr.aggvalue*curr.aggweight, 0)
/values.reduce((acc,cur:any)=>acc + cur.aggweight,0),
The example app seem to be working quite responsive as the memory footprint dropped dramatically.
|
I think we also need to apply the aggregation function to the location totals, at least here: |
| result.push(aggregateFlow); | ||
| aggFlowsByKey.set(key, aggregateFlow); | ||
| } else { | ||
| aggregateFlow.values.push(flowCountsMapReduce.map(flow)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the latest commit, I made the required changes to apply the plugged aggregation function on the node clustering and volume function.
| getFlowTime?: FlowAccessor<F, Date>; // TODO: use number instead of Date | ||
| // getFlowColor?: FlowAccessor<string | undefined>; | ||
| getFlowAggFunc: FlowAggregatorFunc<number[], number>; | ||
| getFlowAggWeight: FlowAccessor<F, number>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the new weight function that similar to the magnitude function is fed into flowmap. getFlowAggWeight provides the weight for each edge volume/count provided via getFlowMagnitude.

As discussed this will allow the users to plug their own function to the flowmap layer via getFlowAggFunc.
Example would be below that averages rather than summing:
new FlowmapLayer<LocationDatum, FlowDatum>({
id: 'my-flowmap-layer',
data,
opacity: config.opacity,
pickable: true,
darkMode: config.darkMode,
colorScheme: config.colorScheme,
fadeAmount: config.fadeAmount,
fadeEnabled: config.fadeEnabled,
fadeOpacityEnabled: config.fadeOpacityEnabled,
locationTotalsEnabled: config.locationTotalsEnabled,
locationLabelsEnabled: config.locationLabelsEnabled,
animationEnabled: config.animationEnabled,
clusteringEnabled: config.clusteringEnabled,
clusteringAuto: config.clusteringAuto,
clusteringLevel: config.clusteringLevel,
adaptiveScalesEnabled: config.adaptiveScalesEnabled,
highlightColor: config.highlightColor,
maxTopFlowsDisplayNum: config.maxTopFlowsDisplayNum,
getLocationId: (loc) => loc.id,
getLocationLat: (loc) => loc.lat,
getLocationLon: (loc) => loc.lon,
getFlowOriginId: (flow) => flow.origin,
getLocationName: (loc) => loc.name,
getFlowDestId: (flow) => flow.dest,
getFlowMagnitude: (flow) => flow.count,
getFlowAggFunc: (values) => values.reduce((a, b) => a + b, 0)/values.length,
onHover: (info) => setTooltip(getTooltipState(info)),
onClick: (info) =>
console.log('clicked', info.object?.type, info.object, info),
});
Also, it defaults to sum when the function is not provided!