Pre-Aggregation Storage + Management #1626

@shangyian

Summary

We should promote pre-aggregations from embedded JSON blobs in materialization config to first-class database entities. This would enable sharing across cubes, independent status tracking, and better introspection.

When generating SQL for a set of metrics and dimensions, DJ computes grain groups that:

  • Source from a specific fact/parent node
  • Are defined by a specific grain (set of dimension columns)
  • Contain one or more measures (aggregations)

These grain groups are the natural unit of materialization: each can be computed once and reused by multiple metric queries. Grain groups remain abstract until a user decides to materialize them, at which point a PreAggregation record is created to track the materialization.
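As a rough illustration of the grouping described above, the sketch below buckets requested measures by their parent node at a shared grain. It is not DJ's actual implementation: `GrainGroupKey`, `build_grain_groups`, and the `orders.fact_refunds` / `sum_refunds` names are hypothetical and exist only for this example.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class GrainGroupKey:
    """Identity of a grain group: parent node plus an order-insensitive grain."""
    source_node: str
    grain: frozenset


def build_grain_groups(requests, grain):
    """Group (measure, source_node) pairs by source node at a shared grain.

    Returns a dict mapping GrainGroupKey -> sorted list of measure names.
    """
    groups = defaultdict(set)
    for measure, source_node in requests:
        groups[GrainGroupKey(source_node, frozenset(grain))].add(measure)
    return {key: sorted(measures) for key, measures in groups.items()}


groups = build_grain_groups(
    [
        ("sum_revenue", "orders.fact_orders"),
        ("count_orders", "orders.fact_orders"),
        ("sum_refunds", "orders.fact_refunds"),  # hypothetical second fact node
    ],
    grain=["date_id", "customer_id"],
)
```

Two measures on the same fact node land in one group, so a single materialization could serve both.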

How it works today

Pre-aggregations are stored as MeasuresMaterialization within a JSON blob in the cube's materialization_config field:

cube.materialization_config = {
    "measures_materialization": {
        "source_node": "orders.fact_orders",
        "grain": ["date_id", "customer_id"],
        "measures": ["sum_revenue", "count_orders"],
        "table_hash": "abc123",
        "status": "active"
    }
}

Problems with this approach include:

  • If two cubes have the same source + grain, they create separate materializations.
  • Finding all pre-aggregations requires parsing JSON blobs across all cubes.
  • Pre-agg status is tied to the cube, not tracked independently.
  • Users can't easily see what pre-aggregations exist, their status, or which cubes use them.
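To make the first two problems concrete, here is a minimal sketch of what discovering duplicates looks like when pre-aggregations live inside per-cube JSON. The cube names and the `find_duplicate_pre_aggs` helper are hypothetical; only the config shape comes from the example above.

```python
from collections import defaultdict

# Hypothetical cubes, each embedding its own copy of the same pre-aggregation.
cube_configs = {
    "sales_cube": {
        "measures_materialization": {
            "source_node": "orders.fact_orders",
            "grain": ["date_id", "customer_id"],
            "measures": ["sum_revenue", "count_orders"],
        }
    },
    "retention_cube": {
        "measures_materialization": {
            "source_node": "orders.fact_orders",
            "grain": ["customer_id", "date_id"],  # same grain, different order
            "measures": ["count_orders"],
        }
    },
}


def find_duplicate_pre_aggs(configs):
    """Scan every cube's JSON blob and report source + grain pairs used more than once."""
    by_identity = defaultdict(list)
    for cube_name, config in configs.items():
        pre_agg = config.get("measures_materialization")
        if pre_agg is None:
            continue
        identity = (pre_agg["source_node"], tuple(sorted(pre_agg["grain"])))
        by_identity[identity].append(cube_name)
    return {
        identity: sorted(names)
        for identity, names in by_identity.items()
        if len(names) > 1
    }


duplicates = find_duplicate_pre_aggs(cube_configs)
```

Every lookup of this kind has to walk all cubes, and both cubes above would still materialize their own table.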

Proposed Solution

Database Schema:

from datetime import datetime
from enum import StrEnum  # Python 3.11+

from sqlalchemy import (
    JSON, Column, DateTime, Enum, ForeignKey, Integer,
    String, Table, Text, UniqueConstraint,
)
from sqlalchemy.orm import relationship

# Base is assumed to be the project's existing declarative base.


# Defined before PreAggregation because the status column references it.
class PreAggStatus(StrEnum):
    PENDING = "pending"              # Not yet materialized
    MATERIALIZING = "materializing"  # In progress
    ACTIVE = "active"                # Materialized and ready
    FAILED = "failed"                # Materialization failed
    STALE = "stale"                  # Source node updated, needs refresh


class PreAggregation(Base):
    """
    First-class pre-aggregation entity that can be shared across cubes.
    """
    __tablename__ = "pre_aggregations"

    id = Column(Integer, primary_key=True)

    # Identity - what makes this pre-agg unique
    source_node_id = Column(Integer, ForeignKey("node.id"), nullable=False)
    grain_hash = Column(String, nullable=False)  # Hash of sorted grain columns

    # Content
    grain_columns = Column(JSON, nullable=False)  # ["date_id", "customer_id"]
    measures = Column(JSON, nullable=False)       # ["sum_revenue", "count_orders"]

    # Versioning
    source_node_version = Column(String, nullable=False)
    table_hash = Column(String, nullable=False)   # For materialization naming

    # Status
    status = Column(Enum(PreAggStatus), default=PreAggStatus.PENDING)
    materialized_at = Column(DateTime, nullable=True)
    materialization_error = Column(Text, nullable=True)

    # Metadata
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Relationships
    source_node = relationship("Node")
    cubes = relationship("Cube", secondary="cube_pre_aggregation", back_populates="pre_aggregations")

    # Unique constraint on identity
    __table_args__ = (
        UniqueConstraint('source_node_id', 'grain_hash', 'source_node_version', name='uq_pre_agg_identity'),
    )


# Join table for cube <-> pre-aggregation relationship
cube_pre_aggregation = Table(
    'cube_pre_aggregation',
    Base.metadata,
    Column('cube_id', Integer, ForeignKey('cube.id'), primary_key=True),
    # Must match PreAggregation.__tablename__ ("pre_aggregations")
    Column('pre_aggregation_id', Integer, ForeignKey('pre_aggregations.id'), primary_key=True),
)
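The schema describes `grain_hash` only as a "hash of sorted grain columns". One possible way to compute it is sketched below. The choice of SHA-256 over a JSON-encoded sorted list is an assumption, as is the helper name `compute_grain_hash`; the proposal does not specify either.

```python
import hashlib
import json


def compute_grain_hash(grain_columns):
    """Return a stable hash for a grain, independent of column order and duplicates.

    Assumption: SHA-256 over the JSON encoding of the sorted, de-duplicated
    column names. JSON encoding avoids ambiguity if a name contains a comma.
    """
    canonical = json.dumps(sorted(set(grain_columns)), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


same_a = compute_grain_hash(["date_id", "customer_id"])
same_b = compute_grain_hash(["customer_id", "date_id"])
coarser = compute_grain_hash(["date_id"])
```

Because the columns are sorted before hashing, two cubes that list the same grain in a different order map to the same identity under the unique constraint.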

  1. When a cube is created or updated, the system checks whether a matching pre-aggregation already exists and, if so, links the cube to it instead of creating a new one.
  2. When a source node is updated:
     • Existing pre-aggregations are marked STALE
     • New pre-aggregations are created for the new version
     • Old versions remain queryable until explicitly removed
  3. The query builder checks for materialized pre-aggregations when generating SQL.
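The three lifecycle steps above can be sketched with an in-memory registry. This is only an illustration under several assumptions: `PreAggRegistry` and its method names are hypothetical, cubes are assumed to be relinked to the new version's record when the source changes, and measures are left out because the proposal does not say how differing measure sets are reconciled.

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    ACTIVE = "active"
    STALE = "stale"


@dataclass
class PreAgg:
    source_node: str
    source_version: str
    grain: frozenset
    status: Status = Status.PENDING
    cubes: set = field(default_factory=set)


class PreAggRegistry:
    """Hypothetical in-memory stand-in for the pre_aggregations table."""

    def __init__(self):
        self._by_identity = {}

    def _get_or_create(self, source_node, source_version, grain):
        key = (source_node, frozenset(grain), source_version)
        if key not in self._by_identity:
            self._by_identity[key] = PreAgg(source_node, source_version, frozenset(grain))
        return self._by_identity[key]

    def attach_cube(self, cube, source_node, source_version, grain):
        """Step 1: reuse a matching pre-aggregation, or create a PENDING one."""
        pre_agg = self._get_or_create(source_node, source_version, grain)
        pre_agg.cubes.add(cube)
        return pre_agg

    def on_source_updated(self, source_node, new_version):
        """Step 2: mark older versions STALE and create records for the new version."""
        outdated = [
            p for p in self._by_identity.values()
            if p.source_node == source_node and p.source_version != new_version
        ]
        for old in outdated:
            old.status = Status.STALE  # old record is kept, not deleted
            replacement = self._get_or_create(source_node, new_version, old.grain)
            replacement.cubes |= old.cubes  # assumption: cubes follow the new version

    def lookup_for_query(self, source_node, source_version, grain):
        """Step 3: return a pre-aggregation only if it is materialized (ACTIVE)."""
        key = (source_node, frozenset(grain), source_version)
        pre_agg = self._by_identity.get(key)
        if pre_agg is not None and pre_agg.status is Status.ACTIVE:
            return pre_agg
        return None
```

In this sketch a stale record stays in the registry, mirroring the statement that old versions remain available until explicitly removed, but the query builder only picks up records that are ACTIVE for the requested version.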

API Changes

GET  /pre-aggregations/                    # List all pre-aggregations
GET  /pre-aggregations/{id}                # Get pre-aggregation details
GET  /pre-aggregations/{id}/cubes          # List cubes using this pre-agg
POST /pre-aggregations/{id}/materialize    # Trigger materialization
DELETE /pre-aggregations/{id}              # Delete (if not in use)

GET  /cubes/{name}/pre-aggregations        # List pre-aggs for a cube
GET  /nodes/{name}/pre-aggregations        # List pre-aggs sourcing from node
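
The "delete (if not in use)" rule on the DELETE endpoint could behave roughly as follows. This is a framework-free sketch: the function name, the in-memory stores, and the 404/409/204 status codes are assumptions and are not specified in this proposal.

```python
def delete_pre_aggregation(pre_aggs, cube_links, pre_agg_id):
    """Delete a pre-aggregation unless a cube still references it.

    pre_aggs:   dict of pre-aggregation id -> record
    cube_links: dict of cube name -> set of pre-aggregation ids it uses
    Returns an (http_status, message) pair; the codes are illustrative.
    """
    if pre_agg_id not in pre_aggs:
        return 404, f"pre-aggregation {pre_agg_id} not found"
    in_use_by = sorted(
        cube for cube, ids in cube_links.items() if pre_agg_id in ids
    )
    if in_use_by:
        return 409, f"in use by cubes: {', '.join(in_use_by)}"
    del pre_aggs[pre_agg_id]
    return 204, ""


pre_aggs = {1: {"source_node": "orders.fact_orders"}, 2: {"source_node": "orders.fact_orders"}}
cube_links = {"sales_cube": {1}}  # hypothetical cube name

blocked = delete_pre_aggregation(pre_aggs, cube_links, 1)
removed = delete_pre_aggregation(pre_aggs, cube_links, 2)
```

Returning the list of referencing cubes in the refusal mirrors what GET /pre-aggregations/{id}/cubes would expose.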
