diff --git a/docs/api/metrics.rst b/docs/api/metrics.rst index 8ad31053e..761c27def 100644 --- a/docs/api/metrics.rst +++ b/docs/api/metrics.rst @@ -38,6 +38,10 @@ Basic Statistics ~lenskit.metrics.ListLength ~lenskit.metrics.TestItemCount + ~lenskit.stats.BagOfLittleBootstraps + +The Bag of Little Bootstraps (BLB) implementation provides both sequential and parallel +processing capabilities. To use the parallel implementation with Ray .. _metrics-topn: diff --git a/src/lenskit/basic/random.py b/src/lenskit/basic/random.py index 76e0719ae..6be059a81 100644 --- a/src/lenskit/basic/random.py +++ b/src/lenskit/basic/random.py @@ -11,9 +11,9 @@ from lenskit.data import ItemList from lenskit.data.query import QueryInput, RecQuery +from lenskit.data.types import argtopn from lenskit.pipeline import Component from lenskit.random import DerivableSeed, RNGFactory, derivable_rng -from lenskit.stats import argtopn class RandomConfig(BaseModel, arbitrary_types_allowed=True): diff --git a/src/lenskit/data/items.py b/src/lenskit/data/items.py index 89759ea97..0bfa652d2 100644 --- a/src/lenskit/data/items.py +++ b/src/lenskit/data/items.py @@ -28,8 +28,8 @@ ) from lenskit._accel import data as _data_accel +from lenskit.data.types import argtopn from lenskit.diagnostics import DataWarning -from lenskit.stats import argtopn from .arrow import get_indexer from .checks import check_1d diff --git a/src/lenskit/data/types.py b/src/lenskit/data/types.py index 38408f606..6f27fae5f 100644 --- a/src/lenskit/data/types.py +++ b/src/lenskit/data/types.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd import pyarrow as pa +from numpy.typing import ArrayLike from typing_extensions import Any, Generic, Literal, Sequence, TypeAlias, TypeVar FeedbackType: TypeAlias = Literal["explicit", "implicit"] @@ -79,3 +80,39 @@ class UIPair(Generic[T]): user: T item: T + + +def argtopn(xs: ArrayLike, n: int) -> NPVector[np.int64]: + """ + Compute the ordered positions of the top *n* elements. Similar to + :func:`torch.topk`, but works with NumPy arrays and only returns the + indices. + + .. deprecated:: 2025.3.0 + + This was never declared stable, but is now deprecated and will be + removed in 2026.1. + """ + if n == 0: + return np.empty(0, np.int64) + + xs = np.asarray(xs) + + N = len(xs) + invalid = np.isnan(xs) + if np.any(invalid): + mask = ~invalid + vxs = xs[mask] + remap = np.arange(N)[mask] + res = argtopn(vxs, n) + return remap[res] + + if n >= 0 and n < N: + parts = np.argpartition(-xs, n) + top_scores = xs[parts[:n]] + top_sort = np.argsort(-top_scores) + order = parts[top_sort] + else: + order = np.argsort(-xs) + + return order diff --git a/src/lenskit/stats/__init__.py b/src/lenskit/stats/__init__.py new file mode 100644 index 000000000..2fd147d1c --- /dev/null +++ b/src/lenskit/stats/__init__.py @@ -0,0 +1,14 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University. +# Copyright (C) 2023-2025 Drexel University. +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +Statistical utilities for LensKit. +""" + +from .base import gini +from .blb import BagOfLittleBootstraps + +__all__ = ["gini", "BagOfLittleBootstraps"] diff --git a/src/lenskit/stats.py b/src/lenskit/stats/base.py similarity index 66% rename from src/lenskit/stats.py rename to src/lenskit/stats/base.py index 6b66a59cc..b29a57f06 100644 --- a/src/lenskit/stats.py +++ b/src/lenskit/stats/base.py @@ -14,10 +14,13 @@ import numpy as np from numpy.typing import ArrayLike +from typing_extensions import TypeVar from lenskit.data.types import NPVector from lenskit.diagnostics import DataWarning +T = TypeVar("T") + def gini(xs: ArrayLike) -> float: """ @@ -60,39 +63,3 @@ def gini(xs: ArrayLike) -> float: "Gini coefficient is not defined for non-positive totals", DataWarning, stacklevel=2 ) return max(num / denom, 0) - - -def argtopn(xs: ArrayLike, n: int) -> NPVector[np.int64]: - """ - Compute the ordered positions of the top *n* elements. Similar to - :func:`torch.topk`, but works with NumPy arrays and only returns the - indices. - - .. deprecated:: 2025.3.0 - - This was never declared stable, but is now deprecated and will be - removed in 2026.1. - """ - if n == 0: - return np.empty(0, np.int64) - - xs = np.asarray(xs) - - N = len(xs) - invalid = np.isnan(xs) - if np.any(invalid): - mask = ~invalid - vxs = xs[mask] - remap = np.arange(N)[mask] - res = argtopn(vxs, n) - return remap[res] - - if n >= 0 and n < N: - parts = np.argpartition(-xs, n) - top_scores = xs[parts[:n]] - top_sort = np.argsort(-top_scores) - order = parts[top_sort] - else: - order = np.argsort(-xs) - - return order diff --git a/src/lenskit/stats/blb.py b/src/lenskit/stats/blb.py new file mode 100644 index 000000000..ad1be6f6e --- /dev/null +++ b/src/lenskit/stats/blb.py @@ -0,0 +1,226 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University. +# Copyright (C) 2023-2025 Drexel University. +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +Implementation of the Bag of Little Bootstraps (BLB) method. +""" + +from __future__ import annotations + +import warnings +from dataclasses import dataclass +from typing import Callable, TypeVar + +import numpy as np +import pandas as pd + +from lenskit.diagnostics import DataWarning +from lenskit.random import RNGInput, random_generator + +T = TypeVar("T") +S = TypeVar("S") + + +@dataclass +class BLBResult: + """ + Results from a BLB analysis. + """ + + mean: float + "The mean of the statistic across all bootstrap replicates." + std: float + "The standard deviation of the statistic across all bootstrap replicates." + ci_lower: float + "The lower bound of the confidence interval." + ci_upper: float + "The upper bound of the confidence interval." + replicates: np.ndarray + "The individual replicate values." + + def __str__(self): + return f"mean={self.mean:.4f} (±{self.std:.4f}, {self.ci_lower:.4f}—{self.ci_upper:.4f})" + + +def _process_subsample( + data: np.ndarray | pd.DataFrame, + statistic: Callable[[np.ndarray | pd.DataFrame], float], + subsample_idx: np.ndarray, + n: int, + subsample_n: int, + n_bootstrap: int, + seed: int | None, +) -> np.ndarray: + """ + Process a single subsample for BLB analysis. + This is separated out to facilitate parallel processing. + """ + rng = random_generator(seed) + values = np.zeros(n_bootstrap) + + subsample = data[subsample_idx] if isinstance(data, np.ndarray) else data.iloc[subsample_idx] + + for j in range(n_bootstrap): + boot_idx = rng.choice(subsample_n, size=n, replace=True) + bootstrap = ( + subsample[boot_idx] if isinstance(data, np.ndarray) else subsample.iloc[boot_idx] + ) + values[j] = statistic(bootstrap) + + return values + + +class BagOfLittleBootstraps: + """ + Implementation of the Bag of Little Bootstraps method for statistical analysis. + + The BLB method works by: + 1. Taking multiple subsamples of the data + 2. For each subsample, computing bootstrap replicates + 3. Computing the desired statistic on each replicate + 4. Aggregating the results across all subsamples and replicates + + Args: + n_subsamples: + The number of subsamples to use (default: 10) + subsample_size: + The size of each subsample as a fraction of the data (default: 0.5) + n_bootstrap: + The number of bootstrap replicates per subsample (default: 100) + confidence: + The confidence level for intervals (default: 0.95) + rng: + Random number generator or seed + """ + + def __init__( + self, + n_subsamples: int = 10, + subsample_size: float = 0.5, + n_bootstrap: int = 100, + confidence: float = 0.95, + rng: RNGInput = None, + ): + self.n_subsamples = n_subsamples + self.subsample_size = subsample_size + self.n_bootstrap = n_bootstrap + self.confidence = confidence + self._rng = random_generator(rng) + + def analyze( + self, + data: np.ndarray | pd.DataFrame, + statistic: Callable[[np.ndarray | pd.DataFrame], float], + ) -> BLBResult: + """ + Analyze a dataset using the BLB method (sequential implementation). + + Args: + data: + The input data (numpy array or pandas DataFrame) + statistic: + A function that computes the desired statistic on the data + + Returns: + The BLB analysis results + """ + n = len(data) + subsample_n = int(n * self.subsample_size) + alpha = 1 - self.confidence + all_values = np.zeros(self.n_subsamples * self.n_bootstrap) + + for i in range(self.n_subsamples): + subsample_idx = self._rng.choice(n, size=subsample_n, replace=False) + values = _process_subsample( + data, + statistic, + subsample_idx, + n, + subsample_n, + self.n_bootstrap, + self._rng.integers(0, 2**32), + ) + all_values[i * self.n_bootstrap : (i + 1) * self.n_bootstrap] = values + + return self._compute_results(all_values) + + def analyze_parallel( + self, + data: np.ndarray | pd.DataFrame, + statistic: Callable[[np.ndarray | pd.DataFrame], float], + ) -> BLBResult: + """ + Analyze a dataset using the BLB method with Ray-based parallelization. + + This method requires Ray to be installed and initialized. If Ray is not + available, it falls back to the sequential implementation with a warning. + + Args: + data: + The input data (numpy array or pandas DataFrame) + statistic: + A function that computes the desired statistic on the data + + Returns: + The BLB analysis results + """ + try: + import ray + except ImportError: + warnings.warn( + "Ray is not available - falling back to sequential implementation", + DataWarning, + stacklevel=2, + ) + return self.analyze(data, statistic) + + if not ray.is_initialized(): + warnings.warn( + "Ray is not initialized - falling back to sequential implementation", + DataWarning, + stacklevel=2, + ) + return self.analyze(data, statistic) + + n = len(data) + subsample_n = int(n * self.subsample_size) + + @ray.remote + def remote_process_subsample( + data, statistic, subsample_idx, n, subsample_n, n_bootstrap, seed + ): + return _process_subsample( + data, statistic, subsample_idx, n, subsample_n, n_bootstrap, seed + ) + + subsample_indices = [ + self._rng.choice(n, size=subsample_n, replace=False) for _ in range(self.n_subsamples) + ] + seeds = self._rng.integers(0, 2**32, size=self.n_subsamples) + + futures = [ + remote_process_subsample.remote( + data, statistic, idx, n, subsample_n, self.n_bootstrap, seed + ) + for idx, seed in zip(subsample_indices, seeds) + ] + + results = ray.get(futures) + all_values = np.concatenate(results) + + return self._compute_results(all_values) + + def _compute_results(self, all_values: np.ndarray) -> BLBResult: + """ + Compute final BLB results from replicate values. + """ + alpha = 1 - self.confidence + mean = np.mean(all_values) + std = np.std(all_values, ddof=1) + ci_lower = np.percentile(all_values, alpha * 100 / 2) + ci_upper = np.percentile(all_values, 100 - alpha * 100 / 2) + + return BLBResult(mean, std, ci_lower, ci_upper, all_values) diff --git a/tests/benchmarks/benchmark_topn.py b/tests/benchmarks/benchmark_topn.py index 314e93cf3..0f7d36c1d 100644 --- a/tests/benchmarks/benchmark_topn.py +++ b/tests/benchmarks/benchmark_topn.py @@ -15,8 +15,8 @@ from pytest import mark from lenskit._accel import data +from lenskit.data.types import argtopn from lenskit.parallel import ensure_parallel_init -from lenskit.stats import argtopn @mark.benchmark(group="all") diff --git a/tests/math/test_argtopn.py b/tests/math/test_argtopn.py index f9730402d..0b6729900 100644 --- a/tests/math/test_argtopn.py +++ b/tests/math/test_argtopn.py @@ -10,7 +10,7 @@ import hypothesis.strategies as st from hypothesis import given, settings -from lenskit.stats import argtopn +from lenskit.data.types import argtopn def test_simple_topn(): diff --git a/tests/test_stats_blb.py b/tests/test_stats_blb.py new file mode 100644 index 000000000..a33ad3af7 --- /dev/null +++ b/tests/test_stats_blb.py @@ -0,0 +1,135 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University. +# Copyright (C) 2023-2025 Drexel University. +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT + +""" +Tests for the Bag of Little Bootstraps implementation. +""" + +import sys +import warnings +from unittest.mock import patch + +import numpy as np +import pandas as pd + +import pytest +from pytest import approx + +from lenskit.diagnostics import DataWarning +from lenskit.stats import BagOfLittleBootstraps + + +def test_blb_mean(): + rng = np.random.default_rng(42) + data = rng.normal(loc=5.0, scale=2.0, size=1000) + + blb = BagOfLittleBootstraps(n_subsamples=5, n_bootstrap=50, rng=42) + + result = blb.analyze(data, np.mean) + + assert result.mean == approx(5.0, abs=0.2) + assert 0.05 < result.std < 0.5 + assert result.ci_lower < 5.0 < result.ci_upper + assert len(result.replicates) == 5 * 50 + + +def test_blb_dataframe(): + rng = np.random.default_rng(42) + df = pd.DataFrame( + { + "value": rng.normal(loc=5.0, scale=2.0, size=1000), + "group": rng.choice(["A", "B"], size=1000), + } + ) + + blb = BagOfLittleBootstraps(rng=42) + + def df_mean(data): + return data["value"].mean() + + result = blb.analyze(df, df_mean) + + assert result.mean == approx(5.0, abs=0.2) + assert result.ci_lower < 5.0 < result.ci_upper + + +def test_blb_str(): + result = BagOfLittleBootstraps(rng=42).analyze(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), np.mean) + s = str(result) + assert "mean=" in s + assert "±" in s + assert "—" in s + + +def test_blb_parallel_no_ray(): + data = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + blb = BagOfLittleBootstraps(rng=42) + with patch.dict(sys.modules, {"ray": None}): + with warnings.catch_warnings(record=True) as w: + result = blb.analyze_parallel(data, np.mean) + assert any(issubclass(warn.category, DataWarning) for warn in w) + assert any( + "Ray is not available - falling back to sequential implementation" + in str(warn.message) + for warn in w + ) + assert result.mean == approx(3.0, abs=0.5) + + +@pytest.mark.skipif( + pytest.importorskip("ray", reason="Ray not available") is None, reason="Ray not available" +) +def test_blb_parallel(): + import ray + + ray.init(num_cpus=2, ignore_reinit_error=True) + + try: + rng = np.random.default_rng(42) + data = rng.normal(loc=5.0, scale=2.0, size=1000) + + blb = BagOfLittleBootstraps(n_subsamples=5, n_bootstrap=50, rng=42) + + result = blb.analyze_parallel(data, np.mean) + + assert result.mean == approx(5.0, abs=0.2) + assert 0.05 < result.std < 0.5 + assert result.ci_lower < 5.0 < result.ci_upper + assert len(result.replicates) == 5 * 50 + + finally: + ray.shutdown() + + +@pytest.mark.skipif( + pytest.importorskip("ray", reason="Ray not available") is None, reason="Ray not available" +) +def test_blb_parallel_dataframe(): + import ray + + ray.init(num_cpus=2, ignore_reinit_error=True) + + try: + rng = np.random.default_rng(42) + df = pd.DataFrame( + { + "value": rng.normal(loc=5.0, scale=2.0, size=1000), + "group": rng.choice(["A", "B"], size=1000), + } + ) + + blb = BagOfLittleBootstraps(rng=42) + + def df_mean(data): + return data["value"].mean() + + result = blb.analyze_parallel(df, df_mean) + + assert result.mean == approx(5.0, abs=0.2) + assert result.ci_lower < 5.0 < result.ci_upper + + finally: + ray.shutdown()