Source code for enigmatoolbox.utils.parcellation

"""Utility functions for parcellations/labelings."""

# Author: Oualid Benkarim <oualid.benkarim@mcgill.ca>
# License: BSD 3 clause

# Last modifications:
#   Sara Lariviere <Aug2020>

import os
import numpy as np
from scipy.stats import mode
from scipy.optimize import linear_sum_assignment

from sklearn.utils.extmath import weighted_mode


def relabel_consecutive(lab, start_from=0):
    """Relabel array with consecutive values.

    Parameters
    ----------
    lab : ndarray
        Array to relabel.
    start_from : int, optional
        Initial label. The default is 0.

    Returns
    -------
    new_lab : ndarray
        Array with consecutive labels.

    """
    new_lab = np.empty_like(lab)
    new_lab[:] = np.unique(lab, return_inverse=True)[1]
    new_lab += start_from
    return new_lab


def relabel(lab, new_labels=None):
    """Relabel array.

    Parameters
    ----------
    lab : array_like
        Array to relabel.
    new_labels: array_like or dict, optional
        New labels. If dict, provide new label for each label in input array.
        If array_like, mapping is performed in ascending order. If None,
        relabel consecutively, starting from 0. Default is None.

    Returns
    -------
    new_lab : ndarray
        Array with new labels.

    """
    if isinstance(new_labels, dict):
        new_lab = lab.copy()
        for l1, l2 in new_labels.items():
            new_lab[lab == l1] = l2
        return new_lab

    if new_labels is None:
        return relabel_consecutive(lab)

    keys = np.unique(lab)[:new_labels.size]
    return relabel(lab, dict(zip(keys, new_labels)))


def find_label_correspondence(lab1, lab2):
    """Find label correspondences.


    Parameters
    ----------
    lab1 : ndarray, shape = (n_lab,)
        First array of labels.
    lab2 : ndarray, shape = (n_lab,)
        Second array of labels.

    Returns
    -------
    dict
        Dictionary with label correspondences between first and second arrays.

    Notes
    -----
    Correspondences are based on largest overlap using the Hungarian algorithm.
    """
    u1, idx1 = np.unique(lab1, return_inverse=True)
    u2, idx2 = np.unique(lab2, return_inverse=True)

    upairs, n_overlap = np.unique(list(zip(idx1, idx2)), axis=0,
                                  return_counts=True)

    cost = np.full((u1.size, u2.size), max(lab1.size, lab2.size),
                   dtype=np.float32)
    cost[tuple([*upairs.T])] -= n_overlap
    ridx, cidx = linear_sum_assignment(cost)

    return dict(zip(u1[ridx], u2[cidx]))


def relabel_by_overlap(lab, ref_lab):
    """Relabel according to overlap with reference.

    Parameters
    ----------
    lab : ndarray, shape = (n_lab,)
        Array of labels.
    ref_lab : ndarray, shape = (n_lab,)
        Reference array of labels.

    Returns
    -------
    new_lab : ndarray, shape = (n_lab,)
        Array relabeled using the reference array.

    Notes
    -----
    Correspondences between labels are based on largest overlap using the
    Hungarian algorithm.
    """
    u1 = np.unique(lab)
    u2 = np.unique(ref_lab)
    if u1.size > u2.size:
        thresh = lab.max() + 1
        lab_shifted = lab + thresh

        lab_corr = find_label_correspondence(lab_shifted, ref_lab)
        lab_shifted = relabel(lab_shifted, new_labels=lab_corr)

        ulab = np.unique(lab_shifted)
        ulab = ulab[ulab >= thresh]
        map_seq = dict(zip(ulab, np.arange(ulab.size) + ref_lab.max() + 1))
        return relabel(lab_shifted, new_labels=map_seq)

    lab_corr = find_label_correspondence(lab, ref_lab)
    return relabel(lab, new_labels=lab_corr)


def map_to_mask(values, mask, fill=0, axis=0):
    """Assign data to mask.

    Parameters
    ----------
    values : ndarray, shape = (n_rows, n_cols) or (n_cols,)
        Source array of values.
    mask : ndarray, shape = (n_mask,)
        Mask of boolean values. Data is mapped to mask.
        If `values` is 2D, the mask is applied according to `axis`.
    fill : float, optional
        Value used to fill elements outside the mask. Default is 0.
    axis : {0, 1}, optional
        If ``axis == 0`` map rows. Otherwise, map columns. Default is 0.

    Returns
    -------
    output : ndarray
        Values mapped to mask. If `values` is 1D, shape (n_mask,).
        When `values` is 2D, shape (n_rows, n_mask) if ``axis == 0`` and
        (n_mask, n_cols) otherwise.

    """
    if np.issubdtype(values.dtype, np.integer) and not np.isfinite(fill):
        raise ValueError("Cannot use non-finite 'fill' with integer arrays.")

    if values.ndim == 1:
        axis = 0

    values2d = np.atleast_2d(values)
    n = values2d.shape[axis]
    mapped = np.full((n, mask.size), fill, dtype=values.dtype)
    mapped[:, mask] = values2d if axis == 0 else values2d.T

    if values.ndim == 1:
        return mapped[0]
    if axis == 1:
        return mapped.T
    return mapped


[docs]def parcel_to_surface(source_val, target_lab, mask=None, fill=0, source_lab=None): """Map data in source to target according to their labels (authors: @OualidBenkarim, @saratheriver) Target labels are sorted in ascending order, such that the smallest label indexes the value at position 0 in `source_val`. If `source_lab` is specified, any label in `target_lab` must be in `source_lab`. Parameters ---------- source_val : ndarray, shape = (n_val,) Source array of values. target_lab : can be a string (e.g., aparc_fsa5) or an ndarray, shape = (n_lab,) Target labels. mask : ndarray, shape = (n_lab,), optional If mask is not None, only consider target labels in mask. Default is None. fill : float, optional Value used to fill elements outside the mask. Default is 0. source_lab : ndarray, shape = (n_val,), optional Source labels for source values. If None, use unique labels in `target_lab` in ascending order. Default is None. Returns ------- target_val : ndarray, shape = (n_lab,) Target array with corresponding source values. """ if isinstance(target_lab, str): fname = target_lab + '.csv' parc_pth = os.path.dirname(os.path.dirname(__file__)) + '/datasets/parcellations/' + fname target_lab = np.loadtxt(parc_pth, dtype=np.int) if source_val.size == 68 and np.unique(target_lab).size == 71: a_idx = list(range(1, 4)) + list(range(5, 39)) + list(range(40, 71)) ddk = np.ones(71) * fill ddk[a_idx] = source_val source_val = ddk elif np.max(source_val.size) % 100 == 0 and np.unique(target_lab).size in np.arange(101, 1002, 100): source_val = np.append(0, source_val) elif np.max(source_val.size) % 10 == 0 and np.unique(target_lab).size == 361: source_val = np.append(0, source_val) if mask is not None: target_lab2 = target_lab[mask] labs2 = parcel_to_surface(source_val, target_lab2, source_lab=source_lab) return map_to_mask(labs2, mask, fill=fill) if source_lab is None: _, idx_tl = np.unique(target_lab, return_inverse=True) return source_val[idx_tl] if source_lab.size != source_val.size: raise ValueError('Source values and labels must have same size.') uq_sl, idx_sl = np.unique(source_lab, return_inverse=True) if source_lab.size != uq_sl.size: raise ValueError('Source labels must have distinct labels.') source_val = source_val[idx_sl] return source_val[target_lab]
def _get_redop(red_op, weights=None, axis=None): if red_op in ['mean', 'average']: if weights is None: def fred(x, w): return np.mean(x, axis=axis) else: def fred(x, w): return np.average(x, weights=w, axis=axis) elif red_op == 'median': def fred(x, w): return np.median(x, axis=axis) elif red_op == 'mode': if weights is None: def fred(x, w): return mode(x, axis=axis)[0].ravel() else: def fred(x, w): return weighted_mode(x, w, axis=axis) elif red_op == 'sum': def fred(x, w): return np.sum(x if w is None else w * x, axis=axis) elif red_op == 'max': def fred(x, w): return np.max(x, axis=axis) elif red_op == 'min': def fred(x, w): return np.min(x, axis=axis) else: raise ValueError("Unknown reduction operation '{0}'".format(red_op)) return fred
[docs]def surface_to_parcel(values, labels, weights=None, target_labels=None, red_op='mean', axis=0, dtype=np.float): """Summarize data in `values` according to `labels` (author: @OualidBenkarim) Parameters ---------- values : 1D or 2D ndarray Array of values. labels : name of parcellation or 1D ndarray, shape = (n_lab,) Labels used summarize values. weights : 1D ndarray, shape = (n_lab,), optional Weights associated with labels. Only used when `red_op` is 'average', 'mean', 'sum' or 'mode'. Weights are not normalized. Default is None. target_labels : 1D ndarray, optional Target labels. Arrange new array following the ordering of labels in the `target_labels`. When None, new array is arranged in ascending order of `labels`. Default is None. red_op : str or callable, optional How to summarize data. If str, options are: {'min', 'max', 'sum', 'mean', 'median', 'mode', 'average'}. If callable, it should receive a 1D array of values, array of weights (or None) and return a scalar value. Default is 'mean'. dtype : dtype, optional Data type of output array. Default is float. axis : {0, 1}, optional If ``axis == 0``, apply to each row (reduce number of columns per row). Otherwise, apply to each column (reduce number of rows per column). Default is 0. Returns ------- target_values : ndarray Summarized target values. """ if isinstance(labels, str): fname = labels + '.csv' parc_pth = os.path.dirname(os.path.dirname(__file__)) + '/datasets/parcellations/' + fname labels = np.loadtxt(parc_pth, dtype=np.int) if axis == 1 and values.ndim == 1: axis = 0 if target_labels is None: uq_tl = np.unique(labels) idx_back = None else: uq_tl, idx_back = np.unique(target_labels, return_inverse=True) if weights is not None: weights = np.atleast_2d(weights) v2d = np.atleast_2d(values) if axis == 1: v2d = v2d.T if isinstance(red_op, str): fred = _get_redop(red_op, weights=weights, axis=1) else: fred = red_op mapped = np.empty((v2d.shape[0], uq_tl.size), dtype=dtype) for ilab, lab in enumerate(uq_tl): mask = labels == lab wm = None if weights is None else weights[:, mask] if isinstance(red_op, str): mapped[:, ilab] = fred(v2d[:, mask], wm) else: for idx in range(v2d.shape[0]): mapped[idx, ilab] = fred(v2d[idx, mask], wm) if idx_back is not None: mapped = mapped[:, idx_back] if axis == 1: mapped = mapped.T if values.ndim == 1: return mapped[0] return mapped
[docs]def subcorticalvertices(subcortical_values=None): """ Map one value per subcortical area to surface vertices (author: @saratheriver) Parameters ---------- subcortical_values : 1D ndarray Shape = (16,), order of subcortical structure must be = L_accumbens, L_amygdala, L_caudate, L_hippocampus, L_pallidun, L_putamen, L_thalamus, L_ventricles, R_accumbens, R_amygdala, R_caudate, R_hippocampus, R_pallidun, R_putamen, R_thalamus, R_ventricles Returns ------- data : 1D ndarray Transformed data, shape = (51278,) """ numvertices = [867, 1419, 3012, 3784, 1446, 4003, 3726, 7653, 838, 1457, 3208, 3742, 1373, 3871, 3699, 7180] data = [] if isinstance(subcortical_values, np.ndarray): for ii in range(16): data.append(np.tile(subcortical_values[ii], (numvertices[ii], 1))) data = np.vstack(data).flatten() return data