3 changes: 3 additions & 0 deletions Cargo.toml
@@ -42,3 +42,6 @@ opt-level = 2
[profile.release]
lto = true
debug = "line-tables-only"

[features]
fuse-parallel = []
8 changes: 8 additions & 0 deletions pipelines/slim.toml
@@ -0,0 +1,8 @@
[meta]
name = "slim"

[options]
base = "std:topn"

[components.scorer]
class = "lenskit.knn.SLIMScorer"
5 changes: 2 additions & 3 deletions src/accel/als/explicit.rs
@@ -17,6 +17,7 @@ use rayon::prelude::*;
use log::*;

use crate::{
parallel::maybe_fuse,
progress::ProgressHandle,
sparse::{CSRMatrix, CSR},
};
@@ -46,9 +47,7 @@ pub(super) fn train_explicit_matrix<'py>(
);

let frob: f32 = py.allow_threads(|| {
this.outer_iter_mut()
.into_par_iter()
.enumerate()
maybe_fuse(this.outer_iter_mut().into_par_iter().enumerate())
.map(|(i, row)| {
let f = train_row_solve(&matrix, i, row, &other, reg);
progress.tick();
5 changes: 2 additions & 3 deletions src/accel/als/implicit.rs
@@ -17,6 +17,7 @@ use rayon::prelude::*;
use log::*;

use crate::{
parallel::maybe_fuse,
progress::ProgressHandle,
sparse::{CSRMatrix, CSR},
};
@@ -48,9 +49,7 @@ pub(super) fn train_implicit_matrix<'py>(
other.nrows()
);
let frob: f32 = py.allow_threads(|| {
this.outer_iter_mut()
.into_par_iter()
.enumerate()
maybe_fuse(this.outer_iter_mut().into_par_iter().enumerate())
.map(|(i, row)| {
let f = train_row_solve(&matrix, i, row, &other, &otor);
progress.tick();
27 changes: 5 additions & 22 deletions src/accel/lib.rs
@@ -4,10 +4,7 @@
// Licensed under the MIT license, see LICENSE.md for details.
// SPDX-License-Identifier: MIT

use log::*;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use rayon::{current_num_threads, ThreadPoolBuilder};

mod als;
mod arrow;
@@ -16,7 +13,9 @@ mod data;
mod funksvd;
mod indirect_hashing;
mod knn;
mod parallel;
mod progress;
mod slim;
mod sparse;

/// Entry point for LensKit accelerator module.
@@ -26,29 +25,13 @@ fn _accel(m: &Bound<'_, PyModule>) -> PyResult<()> {
knn::register_knn(m)?;
als::register_als(m)?;
data::register_data(m)?;
slim::register_slim(m)?;

m.add_class::<funksvd::FunkSVDTrainer>()?;
m.add_function(wrap_pyfunction!(init_accel_pool, m)?)?;
m.add_function(wrap_pyfunction!(thread_count, m)?)?;
m.add_function(wrap_pyfunction!(parallel::init_accel_pool, m)?)?;
m.add_function(wrap_pyfunction!(parallel::thread_count, m)?)?;
m.add_function(wrap_pyfunction!(sparse::sparse_row_debug_type, m)?)?;
m.add_function(wrap_pyfunction!(sparse::sparse_structure_debug_large, m)?)?;

Ok(())
}

#[pyfunction]
fn init_accel_pool(n_threads: usize) -> PyResult<()> {
debug!(
"initializing accelerator thread pool with {} threads",
n_threads
);
ThreadPoolBuilder::new()
.num_threads(n_threads)
.build_global()
.map_err(|_| PyErr::new::<PyRuntimeError, _>("Rayon initialization error"))
}

#[pyfunction]
fn thread_count() -> PyResult<usize> {
Ok(current_num_threads())
}
39 changes: 39 additions & 0 deletions src/accel/parallel.rs
@@ -0,0 +1,39 @@
// This file is part of LensKit.
// Copyright (C) 2018-2023 Boise State University.
// Copyright (C) 2023-2025 Drexel University.
// Licensed under the MIT license, see LICENSE.md for details.
// SPDX-License-Identifier: MIT

use log::*;
use pyo3::{exceptions::PyRuntimeError, prelude::*};

#[cfg(feature = "fuse-parallel")]
use rayon::iter::PanicFuse;
use rayon::{current_num_threads, iter::ParallelIterator, ThreadPoolBuilder};

#[pyfunction]
pub fn init_accel_pool(n_threads: usize) -> PyResult<()> {
debug!(
"initializing accelerator thread pool with {} threads",
n_threads
);
ThreadPoolBuilder::new()
.num_threads(n_threads)
.build_global()
.map_err(|_| PyErr::new::<PyRuntimeError, _>("Rayon initialization error"))
}

#[pyfunction]
pub fn thread_count() -> PyResult<usize> {
Ok(current_num_threads())
}

#[cfg(not(feature = "fuse-parallel"))]
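/// Pass the parallel iterator through unchanged when the fuse-parallel feature is disabled.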
pub fn maybe_fuse<I: ParallelIterator>(iter: I) -> I {
iter
}

#[cfg(feature = "fuse-parallel")]
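/// Wrap the parallel iterator in rayon's PanicFuse when the fuse-parallel feature is
/// enabled, so that a panic in one task makes the other worker threads stop picking up
/// new items as soon as they observe it.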
pub fn maybe_fuse<I: ParallelIterator>(iter: I) -> PanicFuse<I> {
iter.panic_fuse()
}
204 changes: 204 additions & 0 deletions src/accel/slim/mod.rs
@@ -0,0 +1,204 @@
// This file is part of LensKit.
// Copyright (C) 2018-2023 Boise State University.
// Copyright (C) 2023-2025 Drexel University.
// Licensed under the MIT license, see LICENSE.md for details.
// SPDX-License-Identifier: MIT

//! Sparse Linear Methods for recommendation.

use log::*;
use pyo3::{exceptions::PyValueError, prelude::*};
use rayon::prelude::*;

use arrow::{
array::{make_array, Array, ArrayData},
pyarrow::PyArrowType,
};

use crate::{
parallel::maybe_fuse,
sparse::{ArrowCSRConsumer, CSRStructure, CSR},
};

const EPSILON: f64 = 1.0e-12;
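// weights below EPSILON are dropped when sparsifying each finished column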
// default value from Karypis code
const OPT_TOLERANCE: f64 = 1e-7;

#[derive(Debug, Clone, Copy)]
struct SLIMOptions {
l1_reg: f64,
l2_reg: f64,
max_iters: u32,
}

struct SLIMWorkspace<'a> {
options: SLIMOptions,
ui_matrix: &'a CSRStructure<i64>,
iu_matrix: &'a CSRStructure<i64>,
n_users: usize,
n_items: usize,
}

/// Register the lenskit._accel.slim module
pub fn register_slim(parent: &Bound<'_, PyModule>) -> PyResult<()> {
let slim = PyModule::new(parent.py(), "slim")?;
parent.add_submodule(&slim)?;
slim.add_function(wrap_pyfunction!(train_slim, &slim)?)?;

Ok(())
}

/// Learn SLIM regression weights.
///
/// This returns the **transpose** of the weight matrix, for convenient
/// implementation.
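///
/// Row i of the returned (chunked) sparse matrix holds the learned weights for scoring
/// item i, i.e. column i of the SLIM weight matrix W.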
#[pyfunction]
fn train_slim<'py>(
py: Python<'py>,
ui_matrix: PyArrowType<ArrayData>,
iu_matrix: PyArrowType<ArrayData>,
l1_reg: f64,
l2_reg: f64,
max_iters: u32,
progress: Bound<'py, PyAny>,
) -> PyResult<Vec<PyArrowType<ArrayData>>> {
let ui_matrix = make_array(ui_matrix.0);
let ui_matrix = CSRStructure::<i64>::from_arrow(ui_matrix)?;
let iu_matrix = make_array(iu_matrix.0);
let iu_matrix = CSRStructure::<i64>::from_arrow(iu_matrix)?;

if ui_matrix.n_rows != iu_matrix.n_cols {
return Err(PyValueError::new_err("user count mismatch"));
}
if ui_matrix.n_cols != iu_matrix.n_rows {
return Err(PyValueError::new_err("item count mismatch"));
}
if ui_matrix.nnz() != iu_matrix.nnz() {
return Err(PyValueError::new_err("rating count mismatch"));
}

let progress = if progress.is_none() {
None
} else {
Some(progress.unbind())
};
let options = SLIMOptions {
l1_reg,
l2_reg,
max_iters,
};

debug!("computing similarity rows");
let collector = if let Some(pb) = progress {
ArrowCSRConsumer::with_progress(ui_matrix.n_cols, pb)
} else {
ArrowCSRConsumer::new(ui_matrix.n_cols)
};

let result = py.allow_threads(move || {
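// Learn the columns in parallel: map_init gives each parallel split a reusable
// workspace, and the Arrow consumer assembles the finished rows into CSR chunks.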
let range = 0..ui_matrix.n_cols;
let chunks = maybe_fuse(range.into_par_iter())
.map_init(
|| SLIMWorkspace::create(&ui_matrix, &iu_matrix, &options),
SLIMWorkspace::compute_column,
)
.drive(collector);
chunks.into_iter().map(|a| a.into_data().into()).collect()
});

Ok(result)
}

impl<'a> SLIMWorkspace<'a> {
fn create(
ui_matrix: &'a CSRStructure<i64>,
iu_matrix: &'a CSRStructure<i64>,
options: &SLIMOptions,
) -> Self {
let n_items = ui_matrix.n_cols;
let n_users = ui_matrix.n_rows;
SLIMWorkspace {
options: *options,
ui_matrix,
iu_matrix,
n_users,
n_items,
}
}

/// Train a single column of the SLIM weight matrix.
///
/// This code was written from the papers, referencing Karypis's LIBSLIM for
/// ideas on implementation details. The relevant LIBSLIM source code
/// is at <https://github.com/KarypisLab/SLIM/tree/master/src/libslim>.
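///
/// Each coordinate update is the soft-thresholded least-squares step
/// w_i <- max(a_i^T r - l1_reg, 0) / (||a_i||^2 + l2_reg), where a_i is the (binary)
/// rating column of item i and r is the residual with item i's own contribution removed.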
fn compute_column(&mut self, item: usize) -> Vec<(i32, f32)> {
    // users who rated the target item; the implicit target vector is 1 at
    // these positions and 0 everywhere else
    let t_users = self.iu_matrix.row_cols(item);
    let mut target = vec![0.0; self.n_users];
    for u in t_users {
        target[*u as usize] = 1.0;
    }

    let mut weights = vec![0.0; self.n_items];
    // current estimate of A·w for every user
    let mut estimates = vec![0.0; self.n_users];

    for iter in 0..self.options.max_iters {
        let mut sqdelta = 0.0;
        // coordinate descent - loop over predictor items, updating each weight in turn
        for i in 0..self.n_items {
            if i == item {
                // SLIM constrains the diagonal of W to zero
                continue;
            }
            // users who rated predictor item i; since the ratings are all 1s,
            // the number of such users is the squared norm of column i
            let i_users = self.iu_matrix.row_cols(i);
            if i_users.is_empty() {
                continue;
            }
            let sq_cnorm = i_users.len() as f64;

            let old_w = weights[i];
            // remove this item's contribution to the estimates
            if old_w != 0.0 {
                for u in i_users {
                    estimates[*u as usize] -= old_w;
                }
            }

            // numerator of the update: column i dotted with the partial residual
            let mut update = 0.0;
            for u in i_users {
                let u = *u as usize;
                update += target[u] - estimates[u];
            }

            // soft-threshold; the non-negativity constraint drops negative updates
            let new = if update > self.options.l1_reg {
                (update - self.options.l1_reg) / (sq_cnorm + self.options.l2_reg)
            } else {
                0.0
            };
            let delta = new - old_w;
            sqdelta += delta * delta;
            weights[i] = new;

            // add the updated contribution back to the estimates
            if new != 0.0 {
                for u in i_users {
                    estimates[*u as usize] += new;
                }
            }
        }
        if sqdelta <= OPT_TOLERANCE {
            debug!("finished column {} after {} iters", item, iter + 1);
            break;
        }
    }

    // sparsify weights for the final result
    weights
        .into_iter()
        .enumerate()
        .filter_map(|(i, v)| {
            if v >= EPSILON {
                Some((i as i32, v as f32))
            } else {
                None
            }
        })
        .collect()
}
}
1 change: 1 addition & 0 deletions src/accel/sparse/consumer.rs
@@ -58,6 +58,7 @@ impl ArrowCSRConsumer {
val_bld: Float32Builder::new(),
}
}

pub(crate) fn new(dim: usize) -> Self {
Self::from_state(CSRState::new(dim, None))
}
3 changes: 2 additions & 1 deletion src/lenskit/_accel/__init__.pyi
@@ -9,11 +9,12 @@ import pyarrow as pa
from lenskit.data.types import NPMatrix, NPVector
from lenskit.funksvd import FunkSVDTrainingData, FunkSVDTrainingParams

from . import als, data
from . import als, data, slim

__all__ = [
"als",
"data",
"slim",
"init_accel_pool",
"thread_count",
"FunkSVDTrainer",
13 changes: 13 additions & 0 deletions src/lenskit/_accel/slim.pyi
@@ -0,0 +1,13 @@
from collections.abc import Sequence

from lenskit.data.matrix import SparseRowArray
from lenskit.logging import Progress

def train_slim(
ui_matrix: SparseRowArray,
iu_matrix: SparseRowArray,
l1_reg: float,
l2_reg: float,
max_iters: int,
progress: Progress | None,
) -> Sequence[SparseRowArray]: ...
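For orientation, a hypothetical call matching this stub; ui and iu stand in for SparseRowArray matrices already built in user-item and item-user orientation, and the regularization strengths are placeholder values rather than recommended settings:

from lenskit._accel import slim

wt_chunks = slim.train_slim(ui, iu, 0.5, 1.0, 100, None)  # chunks of the transposed weight matrix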
4 changes: 4 additions & 0 deletions src/lenskit/data/matrix.py
@@ -483,6 +483,10 @@ def values(self) -> pa.Array | None:
else:
return None

@property
def nnz(self) -> int:
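"""Number of stored entries in the matrix (the final row offset)."""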
return self.offsets[len(self)].as_py()

def structure(self) -> SparseRowArray:
"""
Get the structure of this matrix (without values).