import math
import random
import warnings
from collections.abc import Sequence
from typing import Any, cast
import numpy as np
import scipy.linalg as spl
from numpy.typing import ArrayLike, NDArray
from scipy.linalg import solve_triangular
from scipy.special import logsumexp
from scipy.stats import kendalltau, rankdata
from .typing import PairwiseData, RankingData, Top1Data
SQRT2 = math.sqrt(2.0)
SQRT2PI = math.sqrt(2.0 * math.pi)
def log_transform(weights: ArrayLike) -> NDArray[np.float64]:
"""Transform weights into centered log-scale parameters."""
params = np.log(weights)
return params - params.mean()
def exp_transform(params: ArrayLike) -> NDArray[np.float64]:
"""Transform parameters into exp-scale weights."""
params = np.asarray(params)
weights = np.exp(params - np.mean(params))
return (len(weights) / weights.sum()) * weights
def softmax(xs: NDArray[np.float64]) -> NDArray[np.float64]:
"""Stable implementation of the softmax function."""
ys = xs - np.max(xs)
exps = np.exp(ys)
return exps / exps.sum(axis=0)
def normal_cdf(x: float) -> float:
"""Normal cumulative density function."""
# If X ~ N(0,1), returns P(X < x).
return math.erfc(-x / SQRT2) / 2.0
def normal_pdf(x: float) -> float:
"""Normal probability density function."""
return math.exp(-x * x / 2.0) / SQRT2PI
def inv_posdef(mat: NDArray[np.float64]) -> NDArray[np.float64]:
"""Stable inverse of a positive definite matrix."""
# See:
# - http://www.seas.ucla.edu/~vandenbe/103/lectures/chol.pdf
# - http://scicomp.stackexchange.com/questions/3188
chol = np.linalg.cholesky(mat)
ident = np.eye(mat.shape[0])
res = solve_triangular(chol, ident, lower=True, overwrite_b=True)
return np.transpose(res).dot(res)
[docs]
def kendalltau_dist(
params1: NDArray[np.float64],
params2: NDArray[np.float64] | None = None,
) -> float:
r"""Compute the Kendall tau distance between two models.
This function computes the Kendall tau distance between the rankings
induced by two parameter vectors. Let :math:`\sigma_i` be the rank of item
``i`` in the model described by ``params1``, and :math:`\tau_i` be its rank
in the model described by ``params2``. The Kendall tau distance is defined
as the number of pairwise disagreements between the two rankings, i.e.,
.. math::
\sum_{i=1}^N \sum_{j=1}^N
\mathbf{1} \{ \sigma_i > \sigma_j \wedge \tau_i < \tau_j \}
By convention, items with the lowest parameters are ranked first (i.e.,
sorted using the natural order).
If the argument ``params2`` is ``None``, the second model is assumed to
rank the items by their index: item ``0`` has rank 1, item ``1`` has rank
2, etc.
If some values are equal within a parameter vector, all items are given a
distinct rank, corresponding to the order in which the values occur.
Parameters
----------
params1 : array_like
Parameters of the first model.
params2 : array_like, optional
Parameters of the second model.
Returns
-------
dist : float
Kendall tau distance.
"""
assert params2 is None or len(params1) == len(params2)
ranks1 = rankdata(params1, method="ordinal")
if params2 is None:
ranks2 = np.arange(1, len(params1) + 1, dtype=float)
else:
ranks2 = rankdata(params2, method="ordinal")
tau, _ = kendalltau(ranks1, ranks2)
n_items = len(params1)
n_pairs = n_items * (n_items - 1) / 2
return round((n_pairs - n_pairs * cast(float, tau)) / 2)
def rmse(params1: NDArray[np.float64], params2: NDArray[np.float64]) -> float:
r"""Compute the root-mean-squared error between two models.
Parameters
----------
params1 : array_like
Parameters of the first model.
params2 : array_like
Parameters of the second model.
Returns
-------
error : float
Root-mean-squared error.
"""
assert len(params1) == len(params2)
params1 = np.asarray(params1) - np.mean(params1)
params2 = np.asarray(params2) - np.mean(params2)
sqrt_n = math.sqrt(len(params1))
return float(np.linalg.norm(params1 - params2, ord=2) / sqrt_n)
[docs]
def log_likelihood_pairwise(data: PairwiseData, params: NDArray[np.float64]) -> float:
"""Compute the log-likelihood of model parameters."""
loglik = 0
for winner, loser in data:
loglik -= np.logaddexp(0, -(params[winner] - params[loser]))
return loglik
[docs]
def log_likelihood_rankings(data: RankingData, params: NDArray[np.float64]) -> float:
"""Compute the log-likelihood of model parameters."""
loglik: float = 0.0
params = np.asarray(params)
for ranking in data:
for i, winner in enumerate(ranking[:-1]):
loglik -= logsumexp(params.take(ranking[i:]) - params[winner]) # pyright: ignore[reportOperatorIssue]
return loglik
[docs]
def log_likelihood_top1(data: Top1Data, params: NDArray[np.float64]) -> float:
"""Compute the log-likelihood of model parameters."""
loglik: float = 0.0
params = np.asarray(params)
for winner, losers in data:
idx = np.append(winner, losers)
loglik -= logsumexp(params.take(idx) - params[winner]) # pyright: ignore[reportOperatorIssue]
return loglik
[docs]
def log_likelihood_network(
digraph: Any,
traffic_in: Sequence,
traffic_out: Sequence,
params: NDArray[np.float64],
weight: str | None = None,
) -> float:
"""
Compute the log-likelihood of model parameters.
If ``weight`` is not ``None``, the log-likelihood is correct only up to a
constant (independent of the parameters).
"""
loglik: float = 0.0
for i in range(len(traffic_in)):
loglik += traffic_in[i] * params[i]
if digraph.out_degree(i) > 0:
neighbors = list(digraph.successors(i))
if weight is None:
loglik -= traffic_out[i] * logsumexp(params.take(neighbors))
else:
weights = [digraph[i][j][weight] for j in neighbors]
loglik -= traffic_out[i] * logsumexp(params.take(neighbors), b=weights)
return loglik
def statdist(generator: ArrayLike) -> NDArray[np.float64]:
"""Compute the stationary distribution of a Markov chain.
Parameters
----------
generator : array_like
Infinitesimal generator matrix of the Markov chain.
Returns
-------
dist : numpy.ndarray
The unnormalized stationary distribution of the Markov chain.
Raises
------
ValueError
If the Markov chain does not have a unique stationary distribution.
"""
generator = np.asarray(generator)
n = generator.shape[0]
with warnings.catch_warnings():
# The LU decomposition raises a warning when the generator matrix is
# singular (which it, by construction, is!).
warnings.filterwarnings("ignore")
lu, piv = spl.lu_factor(generator.T, check_finite=False)
# The last row contains 0's only.
left = lu[:-1, :-1]
right = -lu[:-1, -1]
# Solves system `left * x = right`. Assumes that `left` is
# upper-triangular (ignores lower triangle).
try:
res = spl.solve_triangular(left, right, check_finite=False)
except: # noqa: E722
# Ideally we would like to catch `spl.LinAlgError` only, but there seems
# to be a bug in scipy, in the code that raises the LinAlgError (!!).
raise ValueError(
"stationary distribution could not be computed. "
"Perhaps the Markov chain has more than one absorbing class?"
)
res = np.append(res, 1.0)
return (n / res.sum()) * res
[docs]
def generate_params(
n_items: int, interval: float = 5.0, ordered: bool = False
) -> NDArray[np.float64]:
r"""Generate random model parameters.
This function samples a parameter independently and uniformly for each
item. ``interval`` defines the width of the uniform distribution.
Parameters
----------
n_items : int
Number of distinct items.
interval : float
Sampling interval.
ordered : bool, optional
If true, the parameters are ordered from lowest to highest.
Returns
-------
params : numpy.ndarray
Model parameters.
"""
params = np.random.uniform(low=0, high=interval, size=n_items)
if ordered:
params.sort()
return params - params.mean()
[docs]
def generate_pairwise(params: ArrayLike, n_comparisons: int = 10) -> PairwiseData:
"""Generate pairwise comparisons from a Bradley--Terry model.
This function samples comparisons pairs independently and uniformly at
random over the ``len(params)`` choose 2 possibilities, and samples the
corresponding comparison outcomes from a Bradley--Terry model parametrized
by ``params``.
Parameters
----------
params : array_like
Model parameters.
n_comparisons : int
Number of comparisons to be returned.
Returns
-------
data : list of (int, int)
Pairwise-comparison samples (see :ref:`data-pairwise`).
"""
params = np.asarray(params)
n = len(params)
items = tuple(range(n))
data = list()
for _ in range(n_comparisons):
# Pick the pair uniformly at random.
a, b = random.sample(items, 2)
if compare((a, b), params) == a:
data.append((a, b))
else:
data.append((b, a))
return tuple(data)
[docs]
def generate_rankings(params: ArrayLike, n_rankings: int, size: int = 3) -> RankingData:
"""Generate rankings according to a Plackett--Luce model.
This function samples subsets of items (of size ``size``) independently and
uniformly at random, and samples the correspoding partial ranking from a
Plackett--Luce model parametrized by ``params``.
Parameters
----------
params : array_like
Model parameters.
n_rankings : int
Number of rankings to generate.
size : int, optional
Number of items to include in each ranking.
Returns
-------
data : list of numpy.ndarray
A list of (partial) rankings generated according to a Plackett--Luce
model with the specified model parameters.
"""
params = np.asarray(params)
n = len(params)
items = tuple(range(n))
data = list()
for _ in range(n_rankings):
# Pick the alternatives uniformly at random.
alts = random.sample(items, size)
ranking = compare(alts, params, rank=True)
data.append(ranking)
return tuple(data)
[docs]
def compare(items: Sequence[int], params: ArrayLike, rank: bool = False) -> int | NDArray[np.int64]:
"""Generate a comparison outcome that follows Luce's axiom.
This function samples an outcome for the comparison of a subset of items,
from a model parametrized by ``params``. If ``rank`` is True, it returns a
ranking over the items, otherwise it returns a single item.
Parameters
----------
items : list
Subset of items to compare.
params : array_like
Model parameters.
rank : bool, optional
If true, returns a ranking over the items instead of a single item.
Returns
-------
outcome : int or list of int
The chosen item, or a ranking over ``items``.
"""
probs = probabilities(items, params)
if rank:
return np.random.choice(items, size=len(items), replace=False, p=probs)
else:
return np.random.choice(items, p=probs)
[docs]
def probabilities(items: Sequence[int], params: ArrayLike) -> NDArray[np.float64]:
"""Compute the comparison outcome probabilities given a subset of items.
This function computes, for each item in ``items``, the probability that it
would win (i.e., be chosen) in a comparison involving the items, given
model parameters.
Parameters
----------
items : list
Subset of items to compare.
params : array_like
Model parameters.
Returns
-------
probs : numpy.ndarray
A probability distribution over ``items``.
"""
params = np.asarray(params)
return softmax(params.take(items))