Source code for egglib.stats._ld

"""
    Copyright 2015-2023 Stephane De Mita, Mathieu Siol

    This file is part of EggLib.

    EggLib is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EggLib is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with EggLib.  If not, see <http://www.gnu.org/licenses/>.
"""

from .. import eggwrapper as _eggwrapper
from .. import _interface
from .. import _site

def _get_stats(pld, multiple_policy, min_freq):
    if pld.num_alleles1() > 2 or pld.num_alleles2() > 2:
        if multiple_policy == 'forbid':
            raise ValueError('at least one locus has more than two alleles and this is not allowed')
        elif multiple_policy == 'main':
            freq1 = [pld.freq1(i) for i in range(pld.num_alleles1())]
            freq2 = [pld.freq2(i) for i in range(pld.num_alleles2())]
            a1 = [freq1.index(max(freq1))]
            a2 = [freq2.index(max(freq2))]
        elif multiple_policy == 'average':
            a1 = [i for i in range(pld.num_alleles1()) if pld.freq1(i) >= min_freq]
            a2 = [i for i in range(pld.num_alleles2()) if pld.freq2(i) >= min_freq]
        else:
            raise ValueError('invalid value for `multiple_policy` option: {0}'.format(multiple_policy))
    else:
        a1 = [0]
        a2 = [0]

    n = 0
    D = 0.0
    Dp = 0.0
    r = 0.0
    rsq = 0.0
    for i in a1:
        for j in a2:
            n += 1
            pld.compute(i, j)
            D += pld.D()
            Dp += pld.Dp()
            r += pld.r()
            rsq += pld.rsq()
    if n == 0:
        return {'D': None, 'Dp': None, 'r': None, 'rsq': None}
    if n > 1:
        D /= n
        Dp /= n
        r /= n
        rsq /= n
    return {'D': D, 'Dp': Dp, 'r': r, 'rsq': rsq, 'n': n}

[docs]def pairwise_LD(locus1, locus2, struct=None, multiple_policy='main', min_freq=0): """ Linkage disequilibium between a pair of loci. :class:`.Site` instances must be used to describe loci. Only the order of samples is considered and both loci must have the same number of samples. :param locus1: A :class:`.Site` instance. :param locus2: A :class:`.Site` instance. :param struct: A :class:`.Structure` object used to describe the individual level. If specified, genotypes are automatically described and genotypic linkage is processed. By default, consider all samples as originating from haploid individuals. :param multiple_policy: Determine what is done if either input locus has more than two alleles. Possible values are, ``"forbid"`` (raise an exception if this occurs), ``"main"`` (take the most frequent allele of each locus) and ``"average"`` ( compute the unweighted average over all possible pair of alleles). More options might be added in future versions. This option is ignored if both loci have less than three alleles. If ``"main"`` and there are several equally most frequent alleles, the first-occurring one is used (arbitrarily). :param min_freq: Only used if at least one site has more than two alleles and *multiple_policy* is set to *average*. Set the minimum absolute frequency to consider an allele. :return: A :class:`dict` of linkage disequilibrium statistics. In case statistics cannot be computed (either site fixed, or less than two samples with non-missing data at both sites), computed values are replaced by ``None``. ``n`` gives the number of pairs of alleles considered (0 if statistics are not computed at all). """ if locus1.ns != locus2.ns: raise ValueError('the number of samples must match in the two loci') if locus1.ns < 2: raise ValueError('the number of samples must be at least 2') if struct is None: struct = _eggwrapper.StructureHolder() struct.mk_dummy_structure(locus1.ns, 1) struct2 = struct else: if struct.req_ns > locus1.ns: raise ValueError('mismatch with structure') struct2 = struct.make_auxiliary()._obj struct = struct._obj site1 = _eggwrapper.Genotypes() site2 = _eggwrapper.Genotypes() site1.process(locus1._obj, struct) site2.process(locus2._obj, struct) frq1 = _eggwrapper.FreqHolder() frq1.setup_structure(struct2) frq1.process_site(site1.site()) frq2 = _eggwrapper.FreqHolder() frq2.setup_structure(struct2) frq2.process_site(site2.site()) ld = _eggwrapper.PairwiseLD() if not ld.process(site1.site(), site2.site(), frq1, frq2, struct2, 0, 1.0): return {'D': None, 'Dp': None, 'r': None, 'rsq': None, 'n': 0} return _get_stats(ld, multiple_policy, min_freq)
[docs]def matrix_LD(align, stats, struct=None, multiple_policy='main', min_freq=0, min_n=2, max_maj=1.0, positions=None): """ Linkage disequilibrium statistics between all pairs of sites. The computed statistics are selected by an argument of this function. Return a matrix (as a nested :class:`list`) of the requested statistics. In all cases, all pairs of sites are present in the returned matrices. If statistics cannot be computed, they are replaced by :data:`None`. The available statistics are: * ``d`` -- Distance between sites of the pairs. * ``D`` -- Standard linkage disequilibrium. * ``Dp`` -- Lewontin's D'. * ``r`` -- Correlation coefficient. * ``rsq`` -- Equivalent to r\ :sup:`2`. :param align: An :class:`.Align` instance. :param stats: Requested statistic or statistics (see list of available statistics above, as a single string or as a list of one or more of these statistics (in any order). :param struct: A :class:`.Structure` object used to describe the individual level. If specified, genotypes are automatically described and genotypic linkage is processed. By default, consider all samples as originating from haploid individuals. :param multiple_policy: Specify what is done for pairs of sites for which at least one locus has only one allele. See :func:`pairwise_LD` for further description. :param min_freq: Only used if at least one site has more than two alleles and depending on the value of *multiple_policy*. See :func:`pairwise_LD` for further description. :param min_n: Minimum number of samples used (this value must always be larger than 1). Sites not fulfilling this criterion will be dropped. :param max_maj: Maximum relative frequency of the majority allele. Sites not fulfilling this criterion will be dropped. :param positions: A sequence of positions, whose length must match the number of sites of the provided alignment. Used in the return value to describe the used sites, and, if requested, to compute the distance between sites. By default, the position of sites in the original alignment is used. :return: A :class:`tuple` with two items: first is the :class:`list` of positions of sites used in the matrix (a subset of the sites of the provided alignment), with positions provided by the corresponding argument (by default, the index of sites); second is the matrix, as the nested lower half matrix. The matrix contains items for all i and j indexes with 0 <= j <= i < n where n is the number of retained sites. The content of the matrix is represented by a single value (if a single statistic has been requested) or as a list of 1 or more values (if a list of 1 or more, accordingly, statistics have been requested), or ``None`` for the diagonal or if the pairwise comparison was dropped for any reason. """ # initialize local variables mLD = _eggwrapper.MatrixLD() retained = [] if struct is None: struct = _eggwrapper.StructureHolder() struct.mk_dummy_structure(align.ns, 1) else: struct = struct._obj mLD.set_structure(struct) # check arguments min_n = int(min_n) if min_n < 2: raise ValueError('too small value for `min_n` argument: {0}'.format(min_n)) max_maj = float(max_maj) if max_maj < 0.5 or max_maj > 1.0: raise ValueError('invalid value for `max_maj` argument: {0}'.format(max_maj)) if positions is None: positions = list(map(float, range(align.ls))) elif len(positions) != (struct.get_ni() + struct.get_no()): raise ValueError('list of positions does not have the right number of items') if stats in ['D', 'Dp', 'r', 'rsq']: multi = False stats = [stats] else: multi = True # make a list of variable sites genosite = _eggwrapper.Genotypes() genosites = [] # need to be kept to perform calculations site = _site.Site() final_positions = [] sd = _eggwrapper.SiteDiversity() frq = _eggwrapper.FreqHolder() frq.setup_structure(struct) for i in range(align.ls): site.from_align(align, i) genosite.process(site._obj, struct) frq.process_site(genosite.site()) if (sd.process(frq)&2) != 0 and sd.Aing() > 1.0: final_positions.append(positions[i]) mLD.load(genosite, positions[i]) genosites.append(genosite) genosite = _eggwrapper.Genotypes() matrix = [[None for j in range(i+1)] for i in range(len(genosites))] # compute LD mLD.computeLD(min_n, max_maj) # extract requested values for i in range(mLD.num_pairs()): pld = mLD.pairLD(i) idx2 = mLD.index1(i) idx1 = mLD.index2(i) # reverse indexes computed_stats = _get_stats(pld, multiple_policy, min_freq) matrix[idx1][idx2] = [] for stat in stats: if stat == 'd': matrix[idx1][idx2].append( final_positions[idx1] - final_positions[idx2] ) elif stat == 'D': matrix[idx1][idx2].append( computed_stats['D'] ) elif stat == 'Dp': matrix[idx1][idx2].append( computed_stats['Dp'] ) elif stat == 'r': matrix[idx1][idx2].append( computed_stats['r'] ) elif stat == 'rsq': matrix[idx1][idx2].append( computed_stats['rsq'] ) else: raise ValueError('invalid statistic code: `{0}`'.format(stat)) if not multi: matrix[idx1][idx2] = matrix[idx1][idx2][0] # return return final_positions, matrix