diff --git a/README.md b/README.md index 2aaabe7e..6202b8f0 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,10 @@ First, set up a virtual environment (e.g. via [miniconda](https://docs.conda.io/ `pip install -e .` +3. In addition, this project requires [rdot](https://github.com/nathimel/rdot), a Python library of rate-distortion optimization tools. When a stable version is available, we will add this to the ULTK `setup.py` file; for now, install via git: + + `python3 -m pip install git+https://github.com/nathimel/rdot.git` + ## Getting started - Check out the [examples](https://github.com/CLMBRs/ultk/tree/main/src/examples), starting with a basic signaling game. The examples folder also contains a simple efficient communication analysis of [indefinites](https://github.com/CLMBRs/ultk/tree/main/src/examples/indefinites). diff --git a/src/ultk/effcomm/agent.py b/src/ultk/effcomm/agent.py index 50c39bc4..7860b012 100644 --- a/src/ultk/effcomm/agent.py +++ b/src/ultk/effcomm/agent.py @@ -296,24 +296,3 @@ def __init__( for i in range(len(self.R)): col = speaker.S[:, i] self.R[i] = col @ prior / np.sum(col @ prior) - - -class BayesianListener(Listener): - """A Bayesian reciever chooses an interpretation according to p(meaning | word), where - - # BUG: This is extremely misleading since we basically only use this function for IB, and IB assumes a DETERMINISTIC bayes-derived listener. - - $P(m | w) = \\frac{P(M | W) \cdot P(M)} { P(W) }$ - - Furthermore, we sometimes require that each word w is deterministically interpreted as meaning $\hat{m}$ as follows: - - # BUG: This says nothing about determinism. - $\hat{m}_{w}(u) = \sum_m p(m|w) \cdot m(u)$ - - See ultk.effcomm.information for more details. - """ - - def __init__(self, speaker: Speaker, prior: np.ndarray, name: str = None): - weights = bayes(speaker.normalized_weights(), prior) - # TODO: Change this whole class to DeterministicBayesOptimalListener, and implement the correct weights!
- super().__init__(speaker.language, weights=weights, name=name) diff --git a/src/ultk/effcomm/information.py b/src/ultk/effcomm/information.py deleted file mode 100644 index 0f9c0cb1..00000000 --- a/src/ultk/effcomm/information.py +++ /dev/null @@ -1,486 +0,0 @@ -"""Helper functions for Rate-Distortion based (including Information Bottleneck) efficient communication analyses.""" - -import numpy as np -from ultk.language.language import Language -from ultk.language.semantics import Universe, Referent -from ultk.effcomm.agent import LiteralSpeaker, BayesianListener -from ultk.effcomm import util -from embo import InformationBottleneck -from typing import Callable - - -def information_rate(source: np.ndarray, encoder: np.ndarray) -> float: - """Compute the information rate / complexity of the encoder q(w|m) as $I[W:M]$.""" - pXY = util.joint(pY_X=encoder, pX=source) - return util.MI(pXY=pXY) - - -############################################################################## -# Rate-Distortion Theory -############################################################################## - - -def get_rd_curve( - prior: np.ndarray, - dist_mat: np.ndarray, - betas: np.ndarray = None, -) -> list[tuple[float]]: - """Use the Blahut Arimoto algorithm to obtain a list of (rate, distortion) points.""" - if betas is None: - # B-A gets a bit sparse in low-rate regions for just one np.linspace - # betas = np.linspace(start=0, stop=2**7, num=50) - betas = np.concatenate( - [ - np.linspace(start=0, stop=0.29, num=333), - np.linspace(start=0.3, stop=0.9, num=333), - np.linspace(start=1.0, stop=2**7, num=334), - ] - ) - # TODO: unify or something - prior = np.array(prior) - dist_mat = np.array(dist_mat) - - rd = lambda beta: blahut_arimoto(dist_mat, p_x=prior, beta=beta)["final"] - pareto_points = [rd(beta) for beta in betas] - return pareto_points - - -def expected_distortion( - p_x: np.ndarray, p_xhat_x: np.ndarray, dist_mat: np.ndarray -) -> float: - """$D[X, \hat{X}] = \sum_x p(x) \sum_{\hat{x}} p(\hat{x}|x) \cdot d(x, \hat{x})$""" - # BUG to fix: you need to diagonalize the prior. - return np.sum(np.diag(p_x) @ (p_xhat_x * dist_mat)) - - -def compute_rate_distortion( - p_x, - p_xhat_x, - dist_mat, -) -> tuple[np.ndarray]: - """Compute the information rate $I(X;\hat{X})$ and total distortion $D[X, \hat{X}]$ of a joint distribution defind by $P(X)$ and $P(\hat{X}|X)$. - - Args: - p_x: array of shape `|X|` the prior probability of an input symbol (i.e., the source) - - p_xhat_x: array of shape `(|X|, |X_hat|)` the probability of an output symbol given the input - - dist_mat: array of shape `(|X|, |X_hat|)` representing the distoriton matrix between the input alphabet and the reconstruction alphabet. - - Returns: - a (rate, distortion) tuple containing the information rate (in bits) of compressing X into X_hat and the expected distortion between X, X_hat - """ - return ( - information_rate(p_x, p_xhat_x), - expected_distortion(p_x, p_xhat_x, dist_mat), - ) - - -def blahut_arimoto( - dist_mat: np.ndarray, - p_x: np.ndarray, - beta: float, - max_it: int = 200, - eps: float = 1e-5, - ignore_converge: bool = False, -) -> tuple[float]: - """Compute the rate-distortion function of an i.i.d distribution - - Args: - dist_mat: array of shape `(|X|, |X_hat|)` representing the distortion matrix between the input alphabet and the reconstruction alphabet. dist_mat[i,j] = dist(x[i],x_hat[j]). 
In this context, X is a random variable representing the a speaker's meaning (target referent), and X_hat is a random variable representing a listener's meaning (guessed referent). - - p_x: (1D array of shape `|X|`) representing the probability mass function of the source. In this context, the prior over states of nature. - - beta: (scalar) the slope of the rate-distoriton function at the point where evaluation is required - - max_it: max number of iterations - - eps: accuracy required by the algorithm: the algorithm stops if there is no change in distoriton value of more than 'eps' between consequtive iterations - - ignore_converge: whether to run the optimization until `max_it`, ignoring the stopping criterion specified by `eps`. - - Returns: - a dict of the form - - { - 'final': a tuple of (rate, distortion) values. This is the rate (in bits) of compressing X into X_hat, and distortion between X, X_hat - - 'trajectory': a list of the (rate, distortion) points discovered during optimization - } - """ - # start with iid conditional distribution, as p(x) may not be uniform - p_xhat_x = np.tile(p_x, (dist_mat.shape[1], 1)).T - - # normalize - p_x /= np.sum(p_x) - p_xhat_x /= np.sum(p_xhat_x, 1, keepdims=True) - - it = 0 - traj = [] - distortion = 2 * eps - converged = False - while not converged: - it += 1 - distortion_prev = distortion - - # p(x_hat) = sum p(x) p(x_hat | x) - p_xhat = p_x @ p_xhat_x - - # p(x_hat | x) = p(x_hat) exp(- beta * d(x_hat, x)) / Z - p_xhat_x = np.exp(-beta * dist_mat) * p_xhat - p_xhat_x /= np.expand_dims(np.sum(p_xhat_x, 1), 1) - - # update for convergence check - rate, distortion = compute_rate_distortion(p_x, p_xhat_x, dist_mat) - - # collect point - traj.append((rate, distortion)) - - # convergence check - if ignore_converge: - converged = it == max_it - else: - converged = it == max_it or np.abs(distortion - distortion_prev) < eps - - return { - "final": (rate, distortion), - "trajectory": traj, - } - - -############################################################################## -# Information Bottleneck -############################################################################## - -# === Main IB methods === - - -def get_ib_curve( - prior: np.ndarray, - meaning_dists: np.ndarray, - maxbeta: float, - minbeta: float, - numbeta: float, - processes: int = 1, - curve_type: str = "informativity", -) -> tuple[float]: - """Get a list of (complexity, accuracy) or (complexity, distortion) points. A minimal wrapper of `get_bottleneck.` - - Args: - prior: array of shape `|meanings|` - - meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. - - curve_type: {'informativity', 'comm_cost'} specifies whether to return the (classic) IB axes of informativity vs. complexity, or the more Rate-Distortion Theory aligned axes of comm_cost vs. complexity. The latter can be obtained easily from the former by subtracting each informativity value from I[M:U], which is a constant for all languages in the same domain. - - maxbeta: the maximum value of beta to use to compute the curve. - - minbeta: the minimum value of beta to use. - - numbeta: the number of (equally-spaced) beta values to consider to compute the curve. - - processes: number of cpu threads to run in parallel (default = 1) - - Returns: - an array of shape `(num_points, 2)` representing the list of (accuracy/comm_cost, complexity) points on the information plane. 
- """ - - complexity, accuracy, distortion = get_bottleneck( - prior, meaning_dists, maxbeta, minbeta, numbeta, processes - ) - if curve_type == "comm_cost": - return np.array( - list( - zip( - distortion, - complexity, - ) - ) - ) # expected kl divergence, complexity - - else: - points = np.array( - list( - zip( - accuracy, - complexity, - ) - ) - ) # informativity, complexity - return points - - -def get_bottleneck( - prior: np.ndarray, - meaning_dists: np.ndarray, - maxbeta: float, - minbeta: float, - numbeta: float, - processes: int = 1, -) -> np.ndarray: - """Compute the IB curve bound (I[M:W] vs. I[W:U]). We use the embo package, which has support for smoothing any non-monotonicity in the bound resulting from BA optimization getting stuck in local minima. - - Args: - prior: array of shape `|meanings|` - - meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. - - curve_type: {'informativity', 'comm_cost'} specifies whether to return the (classic) IB axes of informativity vs. complexity, or the more Rate-Distortion Theory aligned axes of comm_cost vs. complexity. The comm_cost can be obtained easily from informativity by subtracting each informativity value from I[M:U], which is a constant for all languages in the same domain. - - maxbeta: the maximum value of beta to use to compute the curve. - - minbeta: the minimum value of beta to use. - - numbeta: the number of (equally-spaced) beta values to consider to compute the curve. - - processes: number of cpu threads to run in parallel (default = 1) - - Returns: - a dict containing the coordinates and encoders corresponding to IB optima, of the form - - { - "encoders": an array of shape `(num_meanings, num_words)`,\n - "coordinates": a tuple of arrays `(complexity, accuracy, comm_cost)` each of shape (`numbeta`,) - "beta": an array of shape (`numbeta`,) corresponding to the actually used betas after non-monotonicity corrections. - } - """ - # N.B.: embo only uses numpy and scipy - prior = np.array(prior) - meaning_dists = np.array(meaning_dists) - - joint_pmu = util.joint(meaning_dists, prior) # P(u) = P(m) - I_mu = util.MI(joint_pmu) - - # I[M:W], I[W:U], H[W], beta, encoders - I_mw, I_wu, _, beta, encoders = InformationBottleneck( - pxy=joint_pmu, - maxbeta=maxbeta, - minbeta=minbeta, - numbeta=numbeta, - processes=processes, - ).get_bottleneck() - - def normalize_rows(mat: np.ndarray): - return mat / mat.sum(1, keepdims=True) - - # compute by hand for debug - # NOTE: I don't remember why I was doing this, maybe there's a bug - # Oh i remember, it's because the encoders computed by embo don't always sum to 1. 
- # encoders = np.array([normalize_rows(encoder) for encoder in encoders]) - # points = [ib_encoder_to_point(meaning_dists, prior, encoder)[:-1] for encoder in encoders] - # I_mw, I_wu = tuple(zip(*points)) - - coordinates = list(zip(*(I_mw, I_wu, I_mu - I_wu))) - - return { - "encoders": encoders, - "coordinates": coordinates, - "betas": beta, - } - - -############################################################################## -# Using altk.Language -############################################################################## - - -def ib_complexity( - language: Language, - prior: np.ndarray, -) -> float: - """Compute the IB encoder complexity of a language $I[M:W]$.""" - return float( - information_rate( - source=prior, - encoder=language_to_ib_encoder_decoder( - language, - prior, - )["encoder"], - ) - ) - - -def ib_informativity( - language: Language, - prior: np.ndarray, - meaning_dists: np.ndarray, -) -> float: - """Compute the expected informativity (accuracy) $I[W:U]$ of a lexicon. - - Args: - language: the Language to measure for informativity - - prior: communicative need distribution - - meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. - - Returns: - the informativity of the language I[W:U] in bits. - """ - return float( - ib_accuracy( - language_to_ib_encoder_decoder(language, prior)["encoder"], - prior, - meaning_dists, - ) - ) - - -def ib_comm_cost( - language: Language, - prior: np.ndarray, - meaning_dists: np.ndarray, -) -> float: - """Compute the IB communicative cost, i.e. expected KL-divergence betweeen speaker and listener meanings, for a language. - - Args: - language: the Language to measure for communicative cost - - prior: communicative need distribution - - meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. - - Returns: - the communicative cost, $\mathbb{E}[D_{KL}[M || \hat{M}]] = I[M:U] - I[W:U]$ in bits. - """ - return ib_distortion( - language_to_ib_encoder_decoder(language, prior)["encoder"], - prior, - meaning_dists, - ) - - -def language_to_ib_encoder_decoder( - language: Language, - prior: np.ndarray, -) -> dict[str, np.ndarray]: - """Convert a Language, a mapping of words to meanings, to IB encoder, q(w|m) and IB decoder q(m|w). - - Args: - language: the lexicon from which to infer a speaker (encoder). - - prior: communicative need distribution - - Returns: - a dict of the form - { - "encoder": np.ndarray of shape `(|meanings|, |words|)`, - "decoder": np.ndarray of shape `(|words|, |meanings|)`, - } - """ - # In the IB framework, the encoder _can_ be a literal speaker and the decoder is a bayes optimal listener. 
- speaker = LiteralSpeaker(language) - speaker.weights = util.rows_zero_to_uniform(speaker.normalized_weights()) - listener = BayesianListener(speaker, prior) - return { - "encoder": speaker.normalized_weights(), - "decoder": listener.normalized_weights(), - } - - -############################################################################## -# Without using altk.Language -############################################################################## - - -def ib_accuracy( - encoder: np.ndarray, prior: np.ndarray, meaning_dists: np.ndarray -) -> float: - """Return the accuracy of the lexicon I[W:U] - - Args: - encoder: array of shape `(|M|, |W|)` representing P(W | M) - - decoder: array of shape `(|W|, |M|)` representing P(M | W) - - meaning_dists: array of shape `(|M|, |U|)` representing P(U | M) - - prior: array of shape `|M|` representing P(M) - - Returns: - the accuracy of the lexicon I[W:U] - """ - pMW = util.joint(encoder, prior) - pWU = pMW.T @ meaning_dists - return util.MI(pWU) - - -def ib_distortion( - encoder: np.ndarray, prior: np.ndarray, meaning_dists: np.ndarray -) -> float: - """Return the IB distortion measure E[DKL[ M || M_hat ]] - - Args: - encoder: array of shape `(|M|, |W|)` representing P(W | M) - - decoder: array of shape `(|W|, |M|)` representing P(M | W) - - meaning_dists: array of shape `(|M|, |U|)` representing P(U | M) - - prior: array of shape `|M|` representing P(M) - - Returns: - the distortion E[DKL[ M || M_hat ]] = I[M:U] - I[W:U] - """ - pMU = util.joint(meaning_dists, prior) - I_mu = util.MI(pMU) - accuracy = ib_accuracy(encoder, prior, meaning_dists) - return I_mu - accuracy - - -def ib_encoder_to_point( - meaning_dists: np.ndarray, - prior: np.ndarray, - encoder: np.ndarray, - decoder: np.ndarray = None, -) -> tuple[float]: - """Return (complexity, accuracy, comm_cost) IB coordinates. - - Args: - meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. - - prior: array of shape `|M|` representing the cognitive source - - encoder: array of shape `(|M|, |W|)` representing P(W | M) - - decoder: array of shape `(|W|, |M|)` representing P(M | W). By default is None, and the Bayesian optimal decoder will be inferred. - """ - # TODO: be consistent about tensors vs arrays - encoder = np.array(encoder) - meaning_dists = np.array(meaning_dists) - prior = np.array(prior) - if decoder is not None: - decoder = np.array(decoder) - else: - decoder = ib_optimal_decoder(encoder, prior, meaning_dists) - - encoder = util.rows_zero_to_uniform(encoder) - decoder = util.rows_zero_to_uniform(decoder) - - complexity = information_rate(prior, encoder) - accuracy = ib_accuracy(encoder, prior, meaning_dists) - distortion = ib_distortion(encoder, prior, meaning_dists) - - return (complexity, accuracy, distortion) - - -def ib_optimal_decoder( - encoder: np.ndarray, - prior: np.ndarray, - meaning_dists: np.ndarray, -) -> np.ndarray: - """Compute the bayesian optimal decoder. 
See https://github.com/nogazs/ib-color-naming/blob/master/src/ib_naming_model.py#L40 - - Args: - encoder: array of shape `(|words|, |meanings|)` - - prior: array of shape `(|meanings|,)` - - meaning_dists: array of shape `(|meanings|, |meanings|)` - - Returns: - array of shape `(|words|, |meanings|)` representing the 'optimal' deterministic decoder - """ - pMW = util.joint(encoder, prior) - pW_M = pMW.T / pMW.sum(axis=0)[:, None] - return pW_M @ meaning_dists diff --git a/src/ultk/effcomm/rate_distortion.py b/src/ultk/effcomm/rate_distortion.py new file mode 100644 index 00000000..1b73edc7 --- /dev/null +++ b/src/ultk/effcomm/rate_distortion.py @@ -0,0 +1,191 @@ +"""Helper functions for Rate-Distortion based (including Information Bottleneck) efficient communication analyses.""" + +import numpy as np +from ultk.language.language import Language +from ultk.effcomm.agent import LiteralSpeaker, Listener +from rdot.ba import IBOptimizer, IBResult +from rdot.probability import joint, bayes +from rdot.information import information_cond, MI + +############################################################################## +# Measuring Languages +############################################################################## + + +def language_to_ib_coordinate( + language: Language, + prior: np.ndarray, + meaning_dists: np.ndarray, +) -> tuple[float]: + """Compute the complexity, accuracy (informativity), and distortion (communicative cost) of a language under the IB framework (Zaslavsky et al., 2018, i.a.). + + Args: + language: the Language to measure. This function will infer an IB encoder, which will be represented as a 2D array of shape `(|meanings|, |words|)` + + meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. + + prior: array of shape `|M|` representing the communicative need distribution + + Returns: + a tuple of floats `(complexity, accuracy, distortion)`, s.t. + + `complexity`: the complexity of the lexicon I[M:W], in bits + + `accuracy`: the accuracy of the lexicon I[W:U], in bits + + `distortion`: the distortion E[DKL[ M || M_hat ]] = I[M:U] - I[W:U], in bits + """ + encoder_decoder = language_to_ib_encoder_decoder(language, prior, meaning_dists) + return ib_encoder_to_point( + meaning_dists, + prior, + encoder_decoder["encoder"], + encoder_decoder["decoder"], + ) + + +def language_to_ib_encoder_decoder( + language: Language, + prior: np.ndarray, + meaning_dists: np.ndarray, +) -> dict[str, np.ndarray]: + """Convert a Language, a mapping of words to meanings, to IB encoder, q(w|m) and IB decoder q(m|w). + + A Bayesian decoder chooses an interpretation according to p(meaning | word), where + + $P(m | w) = \\frac{P(M | W) \cdot P(M)} { P(W) }$ + + Furthermore, we will require that each word w is deterministically interpreted as meaning $\hat{m}$ as follows: + + $\hat{m}_{w}(u) = \sum_m p(m|w) \cdot m(u)$ + + See https://github.com/nogazs/ib-color-naming/blob/master/src/ib_naming_model.py#L40. + + Args: + language: the lexicon from which to infer a speaker (encoder). + + prior: communicative need distribution + + Returns: + a dict of the form + { + "encoder": np.ndarray of shape `(|meanings|, |words|)`, + "decoder": np.ndarray of shape `(|words|, |meanings|)`, + } + """ + # In the IB framework, the encoder _can_ be a literal speaker and the decoder is a Bayes-optimal listener.
+ speaker = LiteralSpeaker(language) + speaker.weights = rows_zero_to_uniform(speaker.normalized_weights()) + listener = Listener(language) + listener.weights = ib_optimal_decoder(speaker.weights, prior, meaning_dists) + return { + "encoder": speaker.normalized_weights(), + "decoder": listener.normalized_weights(), + } + + +def ib_encoder_to_point( + meaning_dists: np.ndarray, + prior: np.ndarray, + encoder: np.ndarray, + decoder: np.ndarray = None, +) -> tuple[float]: + """Return (complexity, accuracy, distortion) IB coordinates, where distortion is the communicative cost. + + Args: + meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. + + prior: array of shape `|M|` representing the communicative need distribution + + encoder: array of shape `(|M|, |W|)` representing P(W | M) + + decoder: array of shape `(|W|, |M|)` representing P(M | W). By default is None, and the Bayesian optimal decoder will be inferred. + """ + if decoder is None: + decoder = ib_optimal_decoder(encoder, prior, meaning_dists) + + complexity = information_cond(prior, encoder) + accuracy = MI(meaning_dists @ joint(encoder, prior)) + distortion = MI(joint(meaning_dists, prior)) - accuracy + + return (complexity, accuracy, distortion) + + +def ib_optimal_decoder( + encoder: np.ndarray, + prior: np.ndarray, + meaning_dists: np.ndarray, +) -> np.ndarray: + """Compute the Bayesian optimal decoder. See https://github.com/nogazs/ib-color-naming/blob/master/src/ib_naming_model.py#L40 + + Args: + encoder: array of shape `(|meanings|, |words|)` representing P(W | M) + + prior: array of shape `(|meanings|,)` + + meaning_dists: array of shape `(|meanings|, |meanings|)` + + Returns: + array of shape `(|words|, |meanings|)` representing the 'optimal' deterministic decoder + """ + return bayes(encoder, prior) @ meaning_dists + + +############################################################################## +# Estimating bounds for a domain +############################################################################## + + +def get_ib_bound( + prior: np.ndarray, + meaning_dists: np.ndarray, + *args, + betas: np.ndarray = np.logspace(-2, 5, 30), + **kwargs, +) -> list[IBResult]: + """Estimate the IB theoretical bound for a domain, specified by a prior over meanings and (perceptually uncertain) meaning distributions. + + Args: + meaning_dists: array of shape `(|meanings|, |meanings|)` representing the distribution over world states given meanings. + + prior: array of shape `|M|` representing the communicative need distribution + + Returns: + a list of `rdot.ba.IBResult` namedtuples. + """ + return IBOptimizer( + joint(meaning_dists, prior), + betas, + *args, + **kwargs, + ).get_results() + + +############################################################################## +# Helper functions for working with stochastic matrices +############################################################################## + + +def rows_zero_to_uniform(mat: np.ndarray) -> np.ndarray: + """Ensure that `mat` encodes a probability distribution, i.e. each row (indexed by a meaning) is a distribution over expressions: sums to exactly 1.0. + + This is necessary when exploring mathematically possible languages (including natural languages, like Hausa in the case of modals) which sometimes have that a row of the matrix p(word|meaning) is a vector of 0s. + + Args: + mat: a 2D numpy array that should be normalized so that each row is a probability distribution. + """ + mat = np.array(mat) + + threshold = 1e-5 + + # Ensure if p(.|meaning) sums to > 0 at all, it must sum to 1.
+ for row in mat: + # less than 1.0 + if row.sum() and 1.0 - row.sum() > threshold: + print("row is nonzero and sums to less than 1.0!") + print(row, row.sum()) + raise Exception + # greater than 1.0 + if row.sum() and row.sum() - 1.0 > threshold: + print("row sums to greater than 1.0!") + print(row, row.sum()) + raise Exception + + return np.array([row if row.sum() else np.ones(len(row)) / len(row) for row in mat]) diff --git a/src/ultk/effcomm/util.py b/src/ultk/effcomm/util.py deleted file mode 100644 index 8d4e5de5..00000000 --- a/src/ultk/effcomm/util.py +++ /dev/null @@ -1,173 +0,0 @@ -"""Various helper functions for computing complexity and informativity.""" -import numpy as np -from scipy.special import logsumexp -from ultk.language.semantics import Universe, Referent -from typing import Callable - -############################################################################## -# Miscellaneous helper functions -############################################################################## - - -def rows_zero_to_uniform(mat: np.ndarray) -> np.ndarray: - """Ensure that `mat` encodes a probability distribution, i.e. each row (indexed by a meaning) is a distribution over expressions: sums to exactly 1.0. - - This is necessary when exploring mathematically possible languages (including natural languages, like Hausa in the case of modals) which sometimes have that a row of the matrix p(word|meaning) is a vector of 0s. - - Args: - mat: a 2D numpy array that should be normalized so that each row is a probability distribution. - """ - mat = np.array(mat) - - threshold = 1e-5 - - # Ensure if p(.|meaning) sums to > 0 at all, it must sum to 1. - for row in mat: - # less than 1.0 - if row.sum() and 1.0 - row.sum() > threshold: - print("row is nonzero and sums to less than 1.0!") - print(row, row.sum()) - raise Exception - # greater than 1.0 - if row.sum() and row.sum() - 1.0 > threshold: - print("row sums to greater than 1.0!") - print(row, row.sum()) - raise Exception - - return np.array([row if row.sum() else np.ones(len(row)) / len(row) for row in mat]) - - -def build_utility_matrix( - universe: Universe, utility: Callable[[Referent, Referent], float] -) -> np.ndarray: - """Construct the square matrix specifying the utility function defined for pairs of meanings, used for computing communicative success.""" - return np.array( - [ - [utility(ref, ref_) for ref_ in universe.referents] - for ref in universe.referents - ] - ) - - -############################################################################## -# Helper functions for measuring information-theoretic quantities. Code credit belongs to N. 
Zaslavsky: https://github.com/nogazs/ib-color-naming/blob/master/src/tools.py -############################################################################## - - -PRECISION = 1e-16 - -# === DISTRIBUTIONS === - - -def marginal(pXY, axis=1): - """Compute $p(x) = \sum_x p(x,y)$ - - Args: - pXY: a numpy array of shape `(|X|, |Y|)` - - Returns: - pY: (axis = 0) or pX (default, axis = 1) - """ - return pXY.sum(axis) - - -def conditional(pXY): - """Compute $p(y|x) = \\frac{p(x,y)}{p(x)}$ - - Args: - pXY: a numpy array of shape `(|X|, |Y|)` - - Returns: - pY_X: a numpy array of shape `(|X|, |Y|)` - """ - pX = pXY.sum(axis=1, keepdims=True) - return np.where(pX > PRECISION, pXY / pX, 1 / pXY.shape[1]) - - -def joint(pY_X, pX): - """Compute $p(x,y) = p(y|x) \cdot p(x) $ - - Args: - pY_X: a numpy array of shape `(|X|, |Y|)` - - pX: a numpy array `|X|` - Returns: - pXY: a numpy array of the shape `(|X|, |Y|)` - """ - # breakpoint() - return pY_X * pX[:, None] - - -def marginalize(pY_X, pX): - """Compute $p(y) = \sum_x p(y|x) \cdot p(x)$ - - Args: - pY_X: a numpy array of shape `(|X|, |Y|)` - - pX: a numpy array of shape `|X|` - - Returns: - pY: a numpy array of shape `|Y|` - """ - return pY_X.T @ pX - - -def bayes(pY_X, pX): - """Compute $p(x|y) = \\frac{p(y|x) \cdot p(x)}{p(y)}$ - Args: - pY_X: a numpy array of shape `(|X|, |Y|)` - """ - pXY = joint(pY_X, pX) - pY = marginalize(pY_X, pX) - # NOTE: original line gave shape errors for broadcasting pXY / pY, - # return np.where(pY > PRECISION, pXY.T / pY, 1 / pXY.shape[0]) - # but the below line still needs to be checked. - return np.where(pY > PRECISION, pXY / pY, 1 / pXY.shape[0]).T - - -# === INFORMATION === - - -def xlogx(p): - """Compute $x \\log p(x)$""" - with np.errstate(divide="ignore", invalid="ignore"): - return np.where(p > PRECISION, p * np.log2(p), 0) - - -def H(p, axis=None): - """Compute the entropy of p, $H(X) = - \sum_x x \\log p(x)$""" - return -xlogx(p).sum(axis=axis) - - -def MI(pXY): - """Compute mutual information, $I[X:Y]$""" - return H(pXY.sum(axis=0)) + H(pXY.sum(axis=1)) - H(pXY) - - -def DKL(p, q, axis=None): - """Compute KL divergences, $D_{KL}[p~||~q]$""" - return (xlogx(p) - np.where(p > PRECISION, p * np.log2(q + PRECISION), 0)).sum( - axis=axis - ) - - -def gNID(pW_X, pV_X, pX): - """Compute Generalized Normalized Informational Distance between two encoders. - - Args: - pW_X: first encoder of shape `(|meanings|, |words|)` - - pV_X: second encoder of shape `(|meanings|, |words|)` - - pX: prior over source variables of shape `(|meanings|,)` - """ - if len(pX.shape) == 1: - pX = pX[:, None] - elif pX.shape[0] == 1 and pX.shape[1] > 1: - pX = pX.T - pXW = pW_X * pX - pWV = pXW.T @ (pV_X) - pWW = pXW.T @ (pW_X) - pVV = (pV_X * pX).T @ (pV_X) - score = 1 - MI(pWV) / (np.max([MI(pWW), MI(pVV)])) - return score
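To make the new array-level API in `rate_distortion.py` concrete, here is a minimal usage sketch. The toy prior, meaning distributions, and encoder below are invented for illustration, and the sketch assumes that `rdot`'s `joint`, `bayes`, `MI`, and `information_cond` follow the same conventions as the helpers removed from `ultk.effcomm.util` (conditional distributions with rows indexed by the conditioning variable).

```python
import numpy as np

from ultk.effcomm.rate_distortion import ib_encoder_to_point, ib_optimal_decoder

# Toy domain: 3 meanings / world states, 2 words (all numbers invented).
prior = np.array([0.5, 0.3, 0.2])  # p(m), the communicative need distribution
meaning_dists = np.array(          # p(u|m), each row sums to 1
    [
        [0.8, 0.1, 0.1],
        [0.1, 0.8, 0.1],
        [0.1, 0.1, 0.8],
    ]
)
encoder = np.array(                # q(w|m), each row sums to 1
    [
        [1.0, 0.0],
        [0.9, 0.1],
        [0.0, 1.0],
    ]
)

# Bayes-optimal deterministic decoder inferred from the encoder and prior.
decoder = ib_optimal_decoder(encoder, prior, meaning_dists)  # shape (|W|, |U|)

# IB coordinates of this encoder: complexity I[M:W], accuracy I[W:U],
# and distortion (communicative cost) I[M:U] - I[W:U], all in bits.
complexity, accuracy, distortion = ib_encoder_to_point(
    meaning_dists, prior, encoder, decoder
)
print(complexity, accuracy, distortion)
```

The theoretical bound for the same domain would then come from `get_ib_bound(prior, meaning_dists)`, which, per its docstring, returns a list of `rdot.ba.IBResult` namedtuples, one per value of beta.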
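The Bayes' rule computation described in the `language_to_ib_encoder_decoder` docstring can also be written out directly in numpy. The sketch below (the helper name `bayes_optimal_decoder` is just for illustration) mirrors the implementation that was removed from `information.py`; like that implementation, it divides by p(w) and so assumes every word has nonzero probability of being used (the removed `util.bayes` guarded that case with a `PRECISION` threshold).

```python
import numpy as np


def bayes_optimal_decoder(
    encoder: np.ndarray, prior: np.ndarray, meaning_dists: np.ndarray
) -> np.ndarray:
    """Plain-numpy sketch of the Bayes-optimal deterministic decoder.

    Args:
        encoder: q(w|m), shape (|M|, |W|)
        prior: p(m), shape (|M|,)
        meaning_dists: p(u|m), shape (|M|, |U|)

    Returns:
        array of shape (|W|, |U|) whose rows are the interpreted meanings m_hat_w
    """
    pMW = encoder * prior[:, None]                 # joint p(m, w)
    pM_given_W = pMW.T / pMW.sum(axis=0)[:, None]  # Bayes' rule: p(m|w) = p(m, w) / p(w)
    return pM_given_W @ meaning_dists              # m_hat_w(u) = sum_m p(m|w) p(u|m)
```

Each row of the result is itself a distribution over world states, which is the sense in which the decoder is "deterministic": every word w is mapped to a single interpreted meaning m̂_w.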
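Finally, the `complexity = information_cond(prior, encoder)` line in `ib_encoder_to_point` is assumed here to compute the mutual information I[M:W] from the marginal p(m) and the conditional q(w|m), i.e. the same quantity the removed `information_rate` computed. A self-contained sketch of that quantity, using the entropy helpers copied from the removed `util.py` (the wrapper name `complexity` is illustrative):

```python
import numpy as np

PRECISION = 1e-16


def xlogx(p):
    """x * log2(x), with 0 log 0 = 0."""
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.where(p > PRECISION, p * np.log2(p), 0)


def H(p, axis=None):
    """Entropy in bits."""
    return -xlogx(p).sum(axis=axis)


def MI(pXY):
    """Mutual information I[X:Y] of a joint distribution."""
    return H(pXY.sum(axis=0)) + H(pXY.sum(axis=1)) - H(pXY)


def complexity(prior: np.ndarray, encoder: np.ndarray) -> float:
    """I[M:W] in bits, from p(m) of shape (|M|,) and q(w|m) of shape (|M|, |W|)."""
    pMW = encoder * prior[:, None]  # joint p(m, w)
    return MI(pMW)
```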