"""Information-theoretic measures (e.g. entropy and divergence) and analyses (e.g. Minimum Description Length Principle for feature selection)."""
import numpy as np
from scipy.stats import entropy
from . import utility
import multiprocessing
import logging
#logger = multiprocessing.log_to_stderr()
#logger.setLevel(multiprocessing.SUBDEBUG)
[docs]def Entropy(distributions):
"""Calculate information entropy (in bits) for a set of distributions.
Wrapper for scipy.stats.entropy(distributions.T) with base=2.
Args:
distributions (numpy.ndarray): shape (n,p) representing relative frequencies of the p features across n specimens.
Returns:
numpy.ndarray(): n information entropy values, one for each specimen.
"""
return entropy(np.asarray(distributions).T, base=2)
[docs]class Divergence(object):
"""Calculate different divergence measures, D(P||Q), to describe the 'statistical distance' between a set of probability distributions and a reference.
Utilises scipy.stats.entropy() under the hood but can accept multiple values for Q simultaneously.
See `Wikipedia: Statistical distance <https://en.wikipedia.org/wiki/Statistical_distance>`_ as well as links in individual measures.
Args:
P (Optional[numpy.ndarray]): shape (1,p) defining a reference sequence against which divergence is calculated. If not provided then a p-dimensional discrete uniform is used.
Qs (numpy.ndarray): shape (n,p) defining the relative frequencies of features in the n specimens. NOTE: Although it has a default value of None, this is not optional (the ordering of P and Qs parameters is a sane choice given scipy.stats.entropy and this requires that Qs have a default if P does).
Raises:
Exception: If no value is provided for Qs.
Exception: If P is provided and it has a different number of features to Qs (i.e. not P.shape[1]==Qs.shape[1]).
Exception: If P is provided and it contains more than one reference (i.e. P.shape[0]>1).
"""
def __init__(self, P=None, Qs=None):
"""Perform sanity checks on the sizes of the distributions of P and Q."""
if Qs is None:
raise Exception("No default value is possible for Qs when calculating divergence")
Qs = utility.VectorToMatrix(Qs)
if P is None:
P = utility.DiscreteUniform(Qs.shape[1])
else:
P = utility.VectorToMatrix(P)
if not Qs.shape[1]==P.shape[1]:
raise Exception("Can not calculate divergence for distributions of different sizes")
if not P.shape[0]==1:
raise Exception("Only one reference distribution should be provided for calculation of divergence")
self.P = P
self.Qs = Qs
self.nQs = Qs.shape[0]
[docs] def KL(self):
"""Return the Kullback-Leibler divergence (in bits).
*Note that this is not symmetrical.*
See `Wikipedia: Kullback-Leibler divergence <https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence>`_.
Returns:
numpy.ndarray: n divergence values, one for each specimen.
"""
return np.asarray([entropy(self.P[0], self.Qs[i], base=2) for i in range(self.nQs)])
[docs] def JS(self):
"""Return the Jensen-Shannon divergence (in bits).
Unlike Kullback-Leibler divergence (KLD), this is symmetrical. The square-root of the value is also a metric so it will satisfy the triangle inequality.
See `Wikipedia: Jensen-Shannon divergence <https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence>`_.
.. note::
JSD(P||Q) is calculated from KLD(P||Q) as follows::
let M = (P+Q) / 2
JSD(P||Q) = (KLD(P||M) + KLD(Q||M)) / 2
Returns:
numpy.ndarray: n divergence values, one for each specimen.
"""
Ms = (self.Qs + self.P) / 2
D_PM = Divergence(self.P, Ms).KL()
D_QM = [Divergence(self.Qs[i], Ms[i]).KL()[0] for i in range(self.nQs)]
return (D_PM + D_QM) / 2
[docs]def Complexity(distributions, reference=None):
"""For each of a group of n specimens, return the specimen entropy multiplied by the Jensen-Shannon divergence (each in bits).
See `Berretta et al. Cancer biomarker discovery: The entropic hallmark <https://dx.doi.org/10.1371/journal.pone.0012262>`_.
Args:
distributions (numpy.ndarray): shape (n,p) defining the relative frequencies of features in the n specimens.
reference (Optional[numpy.ndarray]): shape(1,p) defining a reference sequence against which complexity is calculated. If not provided then the average of all distributions is used.
Returns:
numpy.ndarray: n complexity values, one for each specimen.
"""
if reference is None:
reference = np.sum(distributions, axis=0)
assert len(reference)==distributions.shape[1], "Calculated average reference distribution across wrong axis"
E = Entropy(distributions)
D = Divergence(P=reference, Qs=distributions)
return np.multiply(E, D.JS()) # element-wise multiplication
def _ParallelFeatureDiscretization(tupleArguments):
"""Determine threshold value for a single feature and decide if the MDLP criterion is met.
This is not a method of class information.Discretize as we require a function defined at the top of the module to allow for pickling prior to use with multiprocessing.Pool.
We are also limited to a single argument so the tuple is unwrapped as (feature, classes, baseEntropy).
Args:
tupleArguments (numpy.ndarray, numpy.ndarray, float): [0] feature: shape (n,1); feature values for all specimens; [1] classes: shape (n,) defining categorical designation of each specimen; [2] baseEntropy: entropy value pre-calculated for classes
Returns:
tuple(bool, float, numpy.ndarray, float, float): [0] for the best threshold, has the MDLP criterion been met (i.e should we include this feature); [1] best threshold; [2] for each specimen, does it exceed the best threshold; [3] entropy gain as defined in Fayyad and Irani paper; [4] MDLP criterion which entropy gain must exceed
"""
(feature, classes, baseEntropy) = tupleArguments
thresholds = sorted(np.unique(feature))
nSamples = len(classes)
# if all values are the same then it is impossible to improve entropy so bail out early with baseEntropy + 1 as the MDLP criterion as it is impossible to reach
if len(thresholds)==1:
return (False, thresholds[0], np.asarray([True]*nSamples), 0, baseEntropy + 1)
minEntropy = baseEntropy
bestThreshold = None
bestSeparationEntropies = None
for t, threshold in enumerate(thresholds[:-1]): # therefore MUST use > in splitting and not >=
"""
This loop is O(n) if we consider its working to be constant. Can we find the optimal threshold faster? I thought that a binary partitioning O(logn) would work but the entropy is non-convex with respect to the threshold.
The point of this function is to determine entropy-reducing ability of each feature so we can't make assumptions; thus I think O(n) is best possible as we must check every threshold.
"""
separation = Discretize.getSeparation(feature, threshold) # boolean values indicating if each specimen surpasses the threshold for this feature
negated = [separation, ~separation]
n = [np.count_nonzero(s) for s in negated]
ent = [Discretize.groupClassEntropy(utility.BooleanListIndexing(classes, s)) for s in negated]
thresholdEntropy = np.dot(n, ent) / nSamples
if thresholdEntropy<minEntropy:
minEntropy = thresholdEntropy
bestThreshold = (thresholds[t] + thresholds[t+1])/2 # note that we only enumerate up to thresholds[-1]
bestSeparationEntropies = ent[:]
# used to have else: break here but I have demonstrated that the function is non-convex so this is wrong!
bestSeparation = Discretize.getSeparation(feature, bestThreshold)
delta = Discretize.deltaMDLP(classes, baseEntropy, bestSeparation, bestSeparationEntropies) # see Fayyad and Irani paper
mdlpCriterion = (np.log2(nSamples-1) + delta) / nSamples
gain = baseEntropy - minEntropy
return (gain>mdlpCriterion, bestThreshold, bestSeparation, gain, mdlpCriterion)
[docs]class Discretize:
"""Information-theoretic method for discrete feature selection based on Minimum Description Length Principle (MDLP).
This method additionally provides an entropy-minimising means of defining a context-dependent threshold for over- / under-expression of genes across categorical designations. This is the (binary) discretisation step.
Along with gene-expression values, all specimens are labelled by class (i.e. context-dependency) such as cancer vs normal tissue, and a threshold is produced such that the entropy of classes within the groups (above and below said threshold) is minimised. Any number of classes (>=2) can be used.
Features are only included if the reduction in entropy, compared to no separation by a threshold, is sufficiently great (the MDLP criterion).
See `Fayyad and Irani. Multi-interval discretization of continuous-valued attributes for classification learning <http://ourmine.googlecode.com/svn-history/r1513/trunk/share/pdf/fayyad93.pdf>`_.
Attributes:
baseEntropy (float): class entropy for the whole set of specimens.
bestThresholds (numpy.ndarray): float; shape(p,) optimal, entropy-minimising, threshold for each of the p features.
discretizedFeatures (numpy.ndarray): bool; shape (n,p); whether or not each specimen-feature value exceeds the optimal threshold for said feature.
includeFeatures (numpy.ndarray): bool; shape(p,); whether or not the optimal threshold for each feature is sufficient such that the decrease in entropy meets the MDLP criterion.
gains (numpy.ndarray): float; shape(p,); entropy improvement based on best threshold for each feature.
mdlpCriteria (numpy.ndarray): float; shape(p,); minimum gain required for inclusion of each feature.
"""
def __init__(self):
pass
[docs] def fit(self, distributions, classes):
"""Determine threshold values for each feature.
Args:
distributions (numpy.ndarray): shape (n,p) defining the relative frequencies of features in the n specimens.
classes (numpy.ndarray): shape (n,) defining categorical designation of each specimen.
Raises:
Exception: if number of samples is different in distributions and classes arguments.
"""
if not len(classes)==distributions.shape[0]:
raise Exception("Number of class indicators for descritization must equal number of samples provided.")
self.classes = classes
self.baseEntropy = self.groupClassEntropy(self.classes)
self.distributions = distributions
logging.info("Initializing multiprocessing pool for feature discretization")
pool = multiprocessing.Pool()
# pool.map() will only allow a single argument so combine them as a tuple
mappingArguments = [(self.distributions[:,featureIdx], self.classes, self.baseEntropy) for featureIdx in range(distributions.shape[1])]
logging.debug("Mapping feature-discretization function to %d features" % len(mappingArguments))
featureDecisions = pool.map(_ParallelFeatureDiscretization, mappingArguments)
logging.debug("Parallel feature discretization completed")
pool.close()
decisionTuples = zip(*featureDecisions)
(self.includeFeatures, self.bestThresholds, self.discretizedFeatures, self.gains, self.mdlpCriteria) = (np.asarray(t).T for t in decisionTuples)
logging.info("Completed feature discretization")
@staticmethod
[docs] def deltaMDLP(classes, baseEntropy, bestSeparation, bestSeparationEntropies):
"""Calculate the delta value for a particular feature, as defined in Fayyad and Irani paper for calculating MDLP criterion."""
assert bestSeparation.dtype=='bool', "Expecting boolean separation values when calculating delta value."
assert len(bestSeparationEntropies)==2, "Only two separations can be used for calculating MDLP criterion."
# Fayyad and Irani use k for number of classes prior to separation, and k1 / k2 for number of classes in above/below threshold separations
k = len(np.unique(classes))
k12 = [len(np.unique(utility.BooleanListIndexing(classes, s))) for s in [bestSeparation, ~bestSeparation]]
logging.debug("Calculating MDLP delta: %d classes prior to separation and %d / %d post." % (k, k12[0], k12[1]))
for i in range(2):
if k12[i]==1:
assert bestSeparationEntropies[i]==0, "Separation with only one class must have zero entropy"
return np.log2(3**k - 2) - (k*baseEntropy - np.dot(k12, bestSeparationEntropies))
@staticmethod
[docs] def getSeparation(feature, threshold):
"""Ensure that thresholding is performed identically at all times. Threshold candidates are simply the values of the features so we MUST use > and not >=.
Args:
feature (numpy.ndarray): shape (n,) vector defining relative frequency of feature across all specimens.
threshold (float): value for separation of specimens into groups based on their feature value.
Returns:
numpy.ndarray: boolean array defining if a specimen is greater than the threshold.
"""
return feature>threshold
@staticmethod
[docs] def groupClassEntropy(classes):
"""Calculate the entropy of classes a group of specimens. Note that this may be the full set of features or separations based on a threshold."""
(__, freq) = np.unique(classes, return_counts=True)
return Entropy(freq)