Source code for choix.mm

"""Minorization-maximization inference algorithms."""

import itertools
from collections.abc import Callable
from typing import Any, TypeVar

import numpy as np
from numpy.typing import NDArray

from .convergence import NormOfDifferenceTest
from .typing import PairwiseData, RankingData, Top1Data
from .utils import exp_transform, log_transform

T = TypeVar("T")


def _mm(
    n_items: int,
    data: T,
    initial_params: NDArray[np.float64] | None,
    alpha: float,
    max_iter: int,
    tol: float,
    mm_fun: Callable[
        [int, T, NDArray[np.float64]], tuple[NDArray[np.float64], NDArray[np.float64]]
    ],
) -> NDArray[np.float64]:
    """
    Iteratively refine MM estimates until convergence.

    Raises
    ------
    RuntimeError
        If the algorithm does not converge after `max_iter` iterations.
    """
    if initial_params is None:
        params = np.zeros(n_items)
    else:
        params = initial_params
    converged = NormOfDifferenceTest(tol=tol, order=1)
    for _ in range(max_iter):
        nums, denoms = mm_fun(n_items, data, params)
        params = log_transform((nums + alpha) / (denoms + alpha))
        if converged(params):
            return params
    raise RuntimeError("Did not converge after {} iterations".format(max_iter))


def _mm_pairwise(
    n_items: int,
    data: PairwiseData,
    params: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Inner loop of MM algorithm for pairwise data."""
    weights = exp_transform(params)
    wins = np.zeros(n_items, dtype=float)
    denoms = np.zeros(n_items, dtype=float)
    for winner, loser in data:
        wins[winner] += 1.0
        val = 1.0 / (weights[winner] + weights[loser])
        denoms[winner] += val
        denoms[loser] += val
    return wins, denoms


[docs] def mm_pairwise( n_items: int, data: PairwiseData, initial_params: NDArray[np.float64] | None = None, alpha: float = 0.0, max_iter: int = 10000, tol: float = 1e-8, ) -> NDArray[np.float64]: """Compute the ML estimate of model parameters using the MM algorithm. This function computes the maximum-likelihood (ML) estimate of model parameters given pairwise-comparison data (see :ref:`data-pairwise`), using the minorization-maximization (MM) algorithm [Hun04]_, [CD12]_. If ``alpha > 0``, the function returns the maximum a-posteriori (MAP) estimate under a (peaked) Dirichlet prior. See :ref:`regularization` for details. Parameters ---------- n_items : int Number of distinct items. data : list of lists Pairwise-comparison data. initial_params : array_like, optional Parameters used to initialize the iterative procedure. alpha : float, optional Regularization parameter. max_iter : int, optional Maximum number of iterations allowed. tol : float, optional Maximum L1-norm of the difference between successive iterates to declare convergence. Returns ------- params : numpy.ndarray The ML estimate of model parameters. """ return _mm(n_items, data, initial_params, alpha, max_iter, tol, _mm_pairwise)
def _mm_rankings( n_items: int, data: RankingData, params: NDArray[np.float64], ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Inner loop of MM algorithm for ranking data.""" weights = exp_transform(params) wins = np.zeros(n_items, dtype=float) denoms = np.zeros(n_items, dtype=float) for ranking in data: sum_ = weights.take(ranking).sum() for i, winner in enumerate(ranking[:-1]): wins[winner] += 1 val = 1.0 / sum_ for item in ranking[i:]: denoms[item] += val sum_ -= weights[winner] return wins, denoms
[docs] def mm_rankings( n_items: int, data: RankingData, initial_params: NDArray[np.float64] | None = None, alpha: float = 0.0, max_iter: int = 10000, tol: float = 1e-8, ) -> NDArray[np.float64]: """Compute the ML estimate of model parameters using the MM algorithm. This function computes the maximum-likelihood (ML) estimate of model parameters given ranking data (see :ref:`data-rankings`), using the minorization-maximization (MM) algorithm [Hun04]_, [CD12]_. If ``alpha > 0``, the function returns the maximum a-posteriori (MAP) estimate under a (peaked) Dirichlet prior. See :ref:`regularization` for details. Parameters ---------- n_items : int Number of distinct items. data : list of lists Ranking data. initial_params : array_like, optional Parameters used to initialize the iterative procedure. alpha : float, optional Regularization parameter. max_iter : int, optional Maximum number of iterations allowed. tol : float, optional Maximum L1-norm of the difference between successive iterates to declare convergence. Returns ------- params : numpy.ndarray The ML estimate of model parameters. """ return _mm(n_items, data, initial_params, alpha, max_iter, tol, _mm_rankings)
def _mm_top1( n_items: int, data: Top1Data, params: NDArray[np.float64], ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Inner loop of MM algorithm for top1 data.""" weights = exp_transform(params) wins = np.zeros(n_items, dtype=float) denoms = np.zeros(n_items, dtype=float) for winner, losers in data: wins[winner] += 1 val = 1 / (weights.take(losers).sum() + weights[winner]) for item in itertools.chain([winner], losers): denoms[item] += val return wins, denoms
[docs] def mm_top1( n_items: int, data: Top1Data, initial_params: NDArray[np.float64] | None = None, alpha: float = 0.0, max_iter: int = 10000, tol: float = 1e-8, ) -> NDArray[np.float64]: """Compute the ML estimate of model parameters using the MM algorithm. This function computes the maximum-likelihood (ML) estimate of model parameters given top-1 data (see :ref:`data-top1`), using the minorization-maximization (MM) algorithm [Hun04]_, [CD12]_. If ``alpha > 0``, the function returns the maximum a-posteriori (MAP) estimate under a (peaked) Dirichlet prior. See :ref:`regularization` for details. Parameters ---------- n_items : int Number of distinct items. data : list of lists Top-1 data. initial_params : array_like, optional Parameters used to initialize the iterative procedure. alpha : float, optional Regularization parameter. max_iter : int, optional Maximum number of iterations allowed. tol : float, optional Maximum L1-norm of the difference between successive iterates to declare convergence. Returns ------- params : numpy.ndarray The ML estimate of model parameters. """ return _mm(n_items, data, initial_params, alpha, max_iter, tol, _mm_top1)
def _choicerank( n_items: int, data: tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]], params: NDArray[np.float64], ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Inner loop of ChoiceRank algorithm.""" weights = exp_transform(params) adj, adj_t, traffic_in, traffic_out = data # First phase of message passing. zs = adj.dot(weights) # Second phase of message passing. with np.errstate(invalid="ignore"): denoms = adj_t.dot(traffic_out / zs) return traffic_in, denoms
[docs] def choicerank( digraph: Any, traffic_in: NDArray[np.float64], traffic_out: NDArray[np.float64], weight: str | None = None, initial_params: NDArray[np.float64] | None = None, alpha: float = 1.0, max_iter: int = 10000, tol: float = 1e-8, ) -> NDArray[np.float64]: """Compute the MAP estimate of a network choice model's parameters. This function computes the maximum-a-posteriori (MAP) estimate of model parameters given a network structure and node-level traffic data (see :ref:`data-network`), using the ChoiceRank algorithm [MG17]_, [KTVV15]_. The nodes are assumed to be labeled using consecutive integers starting from 0. Parameters ---------- digraph : networkx.DiGraph Directed graph representing the network. traffic_in : array_like Number of arrivals at each node. traffic_out : array_like Number of departures at each node. weight : str, optional The edge attribute that holds the numerical value used for the edge weight. If None (default) then all edge weights are 1. initial_params : array_like, optional Parameters used to initialize the iterative procedure. alpha : float, optional Regularization parameter. max_iter : int, optional Maximum number of iterations allowed. tol : float, optional Maximum L1-norm of the difference between successive iterates to declare convergence. Returns ------- params : numpy.ndarray The MAP estimate of model parameters. Raises ------ ImportError If the NetworkX library cannot be imported. """ import networkx as nx # Compute the (sparse) adjacency matrix. n_items = len(digraph) nodes = np.arange(n_items) adj = nx.to_scipy_sparse_matrix(digraph, nodelist=nodes, weight=weight) # pyright: ignore[reportArgumentType] adj_t = adj.T.tocsr() # Process the data into a standard form. traffic_in = np.asarray(traffic_in) traffic_out = np.asarray(traffic_out) data = (adj, adj_t, traffic_in, traffic_out) return _mm(n_items, data, initial_params, alpha, max_iter, tol, _choicerank)