Source code for glhmm.prediction

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prediction from Gaussian Linear Hidden Markov Model
@author: Christine Ahrends 2023
"""

import numpy as np
import sys
from sklearn import preprocessing, model_selection, kernel_ridge, linear_model, svm
from sklearn import metrics as ms
import igraph as ig
import warnings
from . import glhmm, utils


def compute_gradient(hmm, Y, incl_Pi=True, incl_P=True, incl_Mu=False, incl_Sigma=True):
    """Computes the gradient of the log-likelihood for timeseries Y
    with respect to specified HMM parameters

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (subject- or session-level) timeseries data
    incl_Pi : bool, default=True
        whether to compute the gradient w.r.t. the initial state probabilities
    incl_P : bool, default=True
        whether to compute the gradient w.r.t. the transition probabilities
    incl_Mu : bool, default=False
        whether to compute the gradient w.r.t. the state means
        (only possible if state means were estimated during training)
    incl_Sigma : bool, default=True
        whether to compute the gradient w.r.t. the state covariances
        (for now only for the full covariance matrix)

    Returns:
    --------
    hmmgrad : array of shape (sum(len(requested_parameters)))

    Raises:
    -------
    Exception
        If the model has not been trained or if requested parameters do not
        exist (e.g. if Mu is requested but state means were not estimated)

    Notes:
    ------
    Does not include gradient computation for X and beta
    """

    if not hmm.trained:
        raise Exception("The model has not yet been trained")

    Pi = hmm.Pi
    P = hmm.P
    K = hmm.hyperparameters["K"]  # number of states
    q = Y.shape[1]  # number of variables (regions/parcels)

    subHMM, subGamma, subXi_tmp = hmm.dual_estimate(X=None, Y=Y, for_kernel=True)
    GammaT = subGamma.transpose()
    subXi = np.mean(subXi_tmp, axis=0)
    Xisum = np.sum(subXi, axis=1)
    Xisumreal = Xisum
    # make sure that values do not get too close to 0
    for i in range(K):
        Xisumreal[i] = max(Xisum[i], sys.float_info.min)
    XiT = subXi.transpose()
    Xi_tmp = XiT / Xisumreal
    Xi = Xi_tmp.transpose()

    # gradient w.r.t. state prior:
    if incl_Pi:
        Pireal = Pi
        # make sure that values do not get too close to 0
        for i in range(K):
            Pireal[i] = max(Pi[i], sys.float_info.min)
        dPi = GammaT[:, 0] / Pireal

    # gradient w.r.t. transition probabilities:
    if incl_P:
        Preal = P
        # make sure that values do not get too close to 0
        for i in range(K):
            for j in range(K):
                Preal[i, j] = max(P[i, j], sys.float_info.min)
        dP = Xi / Preal

    if incl_Mu or incl_Sigma:
        Sigma = np.zeros(shape=(q, q, K))
        invSigma = np.zeros(shape=(q, q, K))
        for k in range(K):
            Sigma[:, :, k] = hmm.get_covariance_matrix(k=k)
            invSigma[:, :, k] = hmm.get_inverse_covariance_matrix(k=k)

    # gradient w.r.t. state means:
    if incl_Mu:
        if hmm.hyperparameters["model_mean"] == 'no':
            raise Exception("Cannot compute gradient w.r.t. state means because state means were not modelled")
        Mu = hmm.get_means()
        dMu = np.zeros(shape=(q, K))
        for k in range(K):
            Xi_V = Y - Mu[:, k]
            Xi_VT = Xi_V.transpose()
            iSigmacond = np.matmul(invSigma[:, :, k], Xi_VT)
            GamiSigmacond = GammaT[k, :] * iSigmacond
            dMu[:, k] = np.sum(GamiSigmacond, axis=1)

    # gradient w.r.t. state covariances:
    if incl_Sigma:
        dSigma = np.zeros(shape=(q, q, K))
        YT = Y.transpose()
        for k in range(K):
            if not hmm.hyperparameters["model_mean"] == 'no':
                Mu = hmm.get_means()
                Xi_V = Y - Mu[:, k]
            else:
                Xi_V = Y
            Xi_VT = Xi_V.transpose()
            Gamma_tmp = -sum(GammaT[k, :] / 2)
            GamiSigma = Gamma_tmp * invSigma[:, :, k]
            GamXi = GammaT[k, :] * Xi_VT
            GamXi2 = np.matmul(GamXi, Xi_V)
            iSigmaGamXi = 0.5 * np.matmul(invSigma[:, :, k], GamXi2)
            iSigmaGamXi2 = np.matmul(iSigmaGamXi, invSigma[:, :, k])
            dSigma[:, :, k] = GamiSigma + iSigmaGamXi2

    hmmgrad = np.empty(shape=0)
    if incl_Pi:
        hmmgrad = -dPi
    if incl_P:
        dP_flat = np.ndarray.flatten(dP)
        hmmgrad = np.concatenate((hmmgrad, -dP_flat))
    if incl_Mu:
        dMu_flat = np.ndarray.flatten(dMu)
        hmmgrad = np.concatenate((hmmgrad, -dMu_flat))
    if incl_Sigma:
        dSigma_flat = np.ndarray.flatten(dSigma)
        hmmgrad = np.concatenate((hmmgrad, -dSigma_flat))

    return hmmgrad
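

# Editor's usage sketch (illustrative, not part of the original module): a
# typical call computes one gradient vector per session and stacks them into a
# feature matrix. `hmm` is assumed to be a trained glhmm instance, and `Y` and
# `indices` are the concatenated timeseries and per-session index pairs as used
# throughout this module.
def _example_compute_gradient(hmm, Y, indices):
    S = indices.shape[0]
    grads = []
    for s in range(S):
        # slice out one session's timeseries
        Y_s = Y[indices[s, 0]:indices[s, 1], :]
        # gradients w.r.t. initial state probs, transition probs, and covariances
        grads.append(compute_gradient(hmm, Y_s, incl_Pi=True, incl_P=True,
                                      incl_Mu=False, incl_Sigma=True))
    return np.stack(grads)  # shape (n_sessions, feature_dim)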
def hmm_kernel(hmm, Y, indices, type='Fisher', shape='linear', incl_Pi=True, incl_P=True,
               incl_Mu=False, incl_Sigma=True, tau=None, return_feat=False, return_dist=False):
    """Constructs a kernel from an HMM, as well as the respective feature matrix
    and/or distance matrix

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data
    indices : array-like of shape (n_sessions, 2)
        The start and end indices of each trial/session in the input data.
        Note that the kernel cannot be computed if indices=None
    type : str, optional
        The type of kernel to be constructed (default: 'Fisher')
    shape : str, optional
        The shape of kernel to be constructed, either 'linear' or 'Gaussian'
        (default: 'linear')
    incl_Pi : bool, default=True
        whether to include the initial state probabilities in kernel construction
    incl_P : bool, default=True
        whether to include the transition probabilities in kernel construction
    incl_Mu : bool, default=False
        whether to include the state means in kernel construction
        (only possible if state means were estimated during training)
    incl_Sigma : bool, default=True
        whether to include the state covariances in kernel construction
        (for now only for the full covariance matrix)
    tau : int, optional
        width of the Gaussian kernel; only used if shape='Gaussian' (default: 1)
    return_feat : bool, default=False
        whether to also return the feature matrix
    return_dist : bool, default=False
        whether to also return the distance matrix

    Returns:
    --------
    kernel : array of shape (n_sessions, n_sessions)
        HMM kernel for the subjects/sessions contained in Y
    feat : array of shape (n_sessions, sum(len(requested_parameters)))
        Feature matrix for the subjects/sessions contained in Y for the
        requested parameters
    dist : array of shape (n_sessions, n_sessions)
        Distance matrix for the subjects/sessions contained in Y

    Raises:
    -------
    Exception
        If the hmm has not been trained or if requested parameters do not
        exist (e.g. if Mu is requested but state means were not estimated)
        If a kernel other than the Fisher kernel is requested

    Notes:
    ------
    Does not include X and beta in kernel construction
    Only the Fisher kernel is implemented at this point
    """

    if not hmm.trained:
        raise Exception("The model has not yet been trained")

    S = indices.shape[0]
    K = hmm.hyperparameters["K"]
    q = Y.shape[1]
    feat_dim = incl_Pi * K + incl_P * K * K + incl_Mu * K * q + incl_Sigma * K * q * q
    feat = np.zeros(shape=(S, feat_dim))
    for s in range(S):
        Y_s = Y[indices[s, 0]:indices[s, 1], :]
        if type == 'Fisher':
            feat[s, :] = compute_gradient(hmm=hmm, Y=Y_s, incl_Pi=incl_Pi, incl_P=incl_P,
                                          incl_Mu=incl_Mu, incl_Sigma=incl_Sigma)
        else:
            raise Exception("This kernel is not yet implemented. Use type='Fisher' instead")

    if shape == 'linear':
        featT = feat.transpose()
        kernel = feat @ featT  # inner product
    if shape == 'Gaussian':
        dist = np.zeros(shape=(S, S))
        for i in range(S):
            for j in range(S):
                dist[i, j] = np.sqrt(sum(abs(feat[i, :] - feat[j, :]) ** 2)) ** 2
        if not tau:
            tau = 1
        kernel = np.exp(-dist / (2 * tau ** 2))

    if return_feat and not return_dist:
        return kernel, feat
    elif return_dist and not return_feat:
        if shape == 'linear':
            raise Exception("Distance matrix is not defined for linear kernel")
        else:
            return kernel, dist
    elif return_feat and return_dist:
        if shape == 'linear':
            raise Exception("Distance matrix is not defined for linear kernel")
        else:
            return kernel, feat, dist
    else:
        return kernel
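

# Editor's usage sketch (illustrative, not part of the original module):
# constructing a linear Fisher kernel together with its feature matrix, and a
# Gaussian variant with width tau. `hmm`, `Y`, and `indices` are assumed to
# come from a trained group-level model.
def _example_hmm_kernel(hmm, Y, indices):
    # linear kernel plus the underlying gradient features
    K_lin, feat = hmm_kernel(hmm, Y, indices, type='Fisher', shape='linear',
                             return_feat=True)
    # Gaussian kernel plus the pairwise distance matrix it is built from
    K_gauss, dist = hmm_kernel(hmm, Y, indices, type='Fisher', shape='Gaussian',
                               tau=1.0, return_dist=True)
    return K_lin, feat, K_gauss, dist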
def get_summ_features(hmm, Y, indices, metrics):
    """Util function to get summary features from an HMM.
    The output can be used as input features for ML.

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data
    indices : array-like of shape (n_sessions, 2)
        The start and end indices of each trial/session in the input data.
        Note that the features cannot be computed if indices=None
    metrics : list
        names of metrics to be extracted. For now, this should be one or more
        of 'FO', 'switching_rate', 'lifetimes'

    Returns:
    --------
    features : array-like of shape (n_sessions, n_features)
        The HMM summary metrics collected into a feature matrix
    """

    # Note: lifetimes uses the mean lifetimes (change code if you want to use
    # median or max lifetimes instead)
    if not set(metrics).issubset(['FO', 'switching_rate', 'lifetimes']):
        raise Exception('Requested summary metrics are not recognised. Use one or more of FO, switching_rate, or lifetimes (for now)')

    features_tmp = np.zeros(shape=(indices.shape[0], 1))
    # get Gamma or vpath from hmm:
    if 'FO' in metrics or 'switching_rate' in metrics:
        Gamma, _, _ = hmm.decode(X=None, Y=Y, indices=indices, viterbi=False)
    if 'lifetimes' in metrics:
        vpath = hmm.decode(X=None, Y=Y, indices=indices, viterbi=True)

    if 'FO' in metrics:
        FO = utils.get_FO(Gamma, indices)
        features_tmp = np.append(features_tmp, FO, axis=1)
    if 'switching_rate' in metrics:
        SR = utils.get_switching_rate(Gamma, indices)
        features_tmp = np.append(features_tmp, SR, axis=1)
    if 'lifetimes' in metrics:
        LT, _, _ = utils.get_life_times(vpath, indices)
        features_tmp = np.append(features_tmp, LT, axis=1)

    features = features_tmp[:, 1:]
    return features
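

# Editor's usage sketch (illustrative, not part of the original module): HMM
# summary metrics as a plain feature matrix for any downstream sklearn
# estimator; `hmm`, `Y`, and `indices` are assumed to come from a trained
# group-level model.
def _example_get_summ_features(hmm, Y, indices):
    features = get_summ_features(hmm, Y, indices,
                                 metrics=['FO', 'switching_rate', 'lifetimes'])
    return features  # shape (n_sessions, n_features)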
def get_groups(group_structure):
    """Util function to get groups from a group structure matrix, such as a
    family structure. The output can be used to make sure groups/families are
    not split across folds during cross-validation, e.g. using sklearn's
    GroupKFold. Groups are defined as connected components of the adjacency
    matrix.

    Parameters:
    -----------
    group_structure : array-like of shape (n_sessions, n_sessions)
        a matrix specifying the structure of the dataset, with positive values
        indicating relations between sessions(/subjects) and zeros indicating
        no relations.
        Note: The diagonal will be set to 1

    Returns:
    --------
    cs : array-like of shape (n_sessions,)
        1D array containing the group each session belongs to
    """

    # make sure the diagonal of the family matrix is 1:
    cs2 = group_structure
    np.fill_diagonal(cs2, 1)
    # create adjacency graph
    csG = ig.Graph.Adjacency(cs2)
    # get groups (connected components of the adjacency graph)
    groups = csG.connected_components()
    cs_tmp = groups.membership
    cs = np.asarray(cs_tmp)
    return cs
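

# Editor's usage sketch (illustrative, not part of the original module):
# deriving CV groups from a hypothetical 4-session family matrix in which
# sessions 0 and 1 are related, then handing the labels to sklearn's
# GroupKFold so that families are never split across folds.
def _example_get_groups():
    family = np.array([[1, 1, 0, 0],
                       [1, 1, 0, 0],
                       [0, 0, 1, 0],
                       [0, 0, 0, 1]])
    cs = get_groups(family)  # e.g. array([0, 0, 1, 2])
    cv = model_selection.GroupKFold(n_splits=2)
    folds = list(cv.split(np.zeros((4, 1)), np.zeros(4), groups=cs))
    return cs, folds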
def deconfound(Y, confX, betaY=None, my=None):
    """Deconfound

    Regresses the confounds confX out of Y. If betaY and my are not provided,
    they are estimated from the data (training mode); otherwise the given
    weights and intercept are applied (test mode).
    """
    if betaY is None:
        betaY = np.zeros(shape=Y.shape)
        my = np.mean(Y)
        Y = Y - my
        if confX.ndim == 1:
            confX = np.reshape(confX, (-1, 1))
        confXT = confX.T
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            betaY_tmp = np.linalg.lstsq(
                np.matmul(confXT, confX) + 0.00001 * np.identity(confX.shape[1]),
                np.matmul(confXT, Y))
            betaY = betaY_tmp[0]
    res = Y - np.matmul(confX, betaY)
    Y = res
    return betaY, my, Y
def reconfound(Y, conf, betaY, my):
    """Reconfound

    Adds the confound effects (conf @ betaY) and the intercept my back onto Y,
    mapping deconfounded values back into the original space.
    """
    if conf.ndim == 1:
        conf = np.reshape(conf, (-1, 1))
    Y = Y + np.matmul(conf, betaY) + my
    return Y
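

# Editor's usage sketch (illustrative, with synthetic data): the intended
# pattern is to estimate deconfounding weights on training data only, re-apply
# the fitted weights to held-out data, and map predictions back into the
# original space via reconfound.
def _example_deconfound_roundtrip():
    rng = np.random.default_rng(0)
    conf = rng.standard_normal((20, 2))               # hypothetical confounds
    y = conf @ np.array([0.5, -0.3]) + rng.standard_normal(20)
    betaY, my, yD = deconfound(y, conf)               # estimate on "training" data
    _, _, yD2 = deconfound(y, conf, betaY, my)        # re-apply fitted weights
    y_back = reconfound(yD, conf, betaY, my)          # back to original space
    return yD, yD2, y_back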
def predict_phenotype(hmm, Y, behav, indices, predictor='Fisherkernel', estimator='KernelRidge', options=None):
    """Predict phenotype from HMM
    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to predict a phenotype, in a nested cross-validated way.
    By default, X and Y are standardised/centered unless deconfounding is used.
    Estimators so far include: Kernel Ridge Regression and Ridge Regression
    Cross-validation strategies so far include: KFold and GroupKFold
    Hyperparameter optimization strategies so far include: only grid search

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data
    behav : array-like of shape (n_sessions,)
        phenotype, behaviour, or other external variable to be predicted
    indices : array-like of shape (n_sessions, 2)
        The start and end indices of each trial/session in the input data.
        Note that this function does not work if indices=None
    predictor : char (optional, default 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default 'KernelRidge')
        Model to be used for prediction.
        This should be the name of a sklearn base estimator
        (for now either 'KernelRidge' or 'Ridge')
    options : dict (optional, default None)
        general relevant options are:
        'CVscheme': char, which CV scheme to use (default: 'GroupKFold' if a
            group structure is specified, otherwise 'KFold')
        'nfolds': int, number of folds k for the (outer and inner) k-fold CV loops
        'group_structure': ndarray of shape (n_sessions, n_sessions), matrix
            specifying the group structure: positive values if sessions(/subjects)
            are related, zeros otherwise
        'confounds': array-like of shape (n_sessions,) or
            (n_sessions, n_confounds) containing confounding variables
        'return_scores': bool, whether to also return the model scores of each fold
        'return_models': bool, whether to also return the trained models of each fold
        'return_hyperparams': bool, whether to also return the optimised
            hyperparameters of each fold
        possible hyperparameters for the model, e.g. 'alpha' for (kernel) ridge
            regression
        for the Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'tau': the width of the Gaussian kernel (only used if shape='Gaussian')
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition
            probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training
            the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state
            covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    results : dict
        containing
        'behav_pred': predicted phenotype on test sets
        'corr': correlation coefficient between predicted and actual values
        (if requested):
        'scores': the model scores of each fold
        'models': the trained models from each fold
        'hyperparams': the optimised hyperparameters of each fold

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing

    Notes:
    ------
    If behav contains NaNs, these subjects/sessions will be removed in Y and
    confounds
    """

    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if behav is None:
        raise Exception("Phenotype to be predicted needs to be provided")
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor == 'Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'KernelRidge'
    # necessary options for summary metrics:
    if predictor == 'summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'Ridge'
    # other necessary options:
    if not 'CVscheme' in options:
        CVscheme = 'KFold'
    else:
        CVscheme = options['CVscheme']
    if not 'nfolds' in options:
        nfolds = 5
    else:
        nfolds = options['nfolds']
    if not 'group_structure' in options:
        do_groupKFold = False
        allcs = None
    else:
        do_groupKFold = True
        allcs = options['group_structure']
        CVscheme = 'GroupKFold'
    if not 'confounds' in options:
        confounds = None
        deconfounding = False
    else:
        confounds = options['confounds']
        if confounds.ndim == 1:
            confounds = confounds.reshape((-1, 1))
        deconfounding = True

    # find and remove missing values
    ind = ~np.isnan(behav)
    behav = behav[ind]
    indices = indices[ind, :]  # remove sessions whose behavioural variable is missing
    if deconfounding:
        confounds = confounds[ind, :]
    # N = indices.shape[0]  # number of samples
    N = sum(ind == True)

    # get features/kernel from HMM to predict from (default: Fisher kernel):
    if predictor == 'Fisherkernel':
        if shape == 'linear':
            tau = None
        elif shape == 'Gaussian':
            tau = options['tau']
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: predict from HMM summary metrics
    elif predictor == 'summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    # create CV folds
    if do_groupKFold:  # when using family/group structure - use GroupKFold
        cs = get_groups(allcs)
        cs = cs[ind]  # remove missing values
        cvfolds = model_selection.GroupKFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav, cs)
    elif CVscheme == 'KFold':  # when not using family/group structure
        cvfolds = model_selection.KFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav)

    # create empty return structures
    behav_pred = np.zeros(shape=N)
    behav_pred_scaled = np.zeros(shape=N)
    behav_mean = np.zeros(shape=N)
    if deconfounding:
        behavD = np.zeros(shape=N)
        behav_predD = np.zeros(shape=N)
        behav_meanD = np.zeros(shape=N)
    # optional returns:
    if 'return_scores' in options and options['return_scores'] == True:
        scores = list()
        return_scores = True
        if deconfounding:
            scores_deconf = list()
    else:
        return_scores = False
    if 'return_models' in options and options['return_models'] == True:
        models = list()
        return_models = True
    else:
        return_models = False
    if 'return_hyperparams' in options and options['return_hyperparams'] == True:
        hyperparams = list()
        return_hyperparams = True
    else:
        return_hyperparams = False

    # main prediction:
    # KRR (default for Fisher kernel):
    if estimator == 'KernelRidge':
        if not 'alpha' in options:
            alphas = np.logspace(-4, -1, 6)
        else:
            alphas = options['alpha']
        model = kernel_ridge.KernelRidge(kernel="precomputed")
        if do_groupKFold:
            for train, test in cvfolds.split(Xin, behav, groups=cs):
                behav_train = behav[train]
                behav_mean[test] = np.mean(behav_train)
                behav_train_scaled = np.zeros(shape=behav_train.shape)
                if not deconfounding:
                    scaler_y = preprocessing.StandardScaler().fit(behav_train.reshape(-1, 1))
                    behav_train_scaled = scaler_y.transform(behav_train.reshape(-1, 1))
                    behav_train_scaled = behav_train_scaled.reshape(-1, 1)
                # deconfounding:
                elif deconfounding:
                    confounds_train = confounds[train, :]
                    CbetaY, CinterceptY, behav_train_scaled = deconfound(behav_train, confounds_train)
                # train model and make predictions:
                Xin_train = Xin[train, train.reshape(-1, 1)]
                center_K = preprocessing.KernelCenterer().fit(Xin_train)
                Xin_train_scaled = center_K.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(alpha=alphas), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train_scaled, groups=cs[train])
                Xin_test = Xin[train, test.reshape(-1, 1)]
                Xin_test_scaled = center_K.transform(Xin_test)
                behav_pred_scaled_tmp = model_tuned.predict(Xin_test_scaled)
                behav_pred_scaled[test] = behav_pred_scaled_tmp.flatten()
                if not deconfounding:
                    behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled_tmp)
                    behav_pred[test] = behav_pred_tmp.flatten()
                elif deconfounding:  # in deconfounded space
                    behav_pred[test] = behav_pred_scaled[test]
                    behav_predD[test] = behav_pred[test]
                    behavD[test] = behav[test]
                    behav_meanD[test] = behav_mean[test]
                    _, _, behavD[test] = deconfound(behavD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_predD[test] = reconfound(behav_predD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_meanD[test] = reconfound(behav_meanD[test], confounds[test, :], CbetaY, CinterceptY)
                # get additional output
                if return_scores:
                    behav_test = behav[test]
                    if not deconfounding:
                        behav_test_scaled = scaler_y.transform(behav_test.reshape(-1, 1))
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test_scaled))
                    elif deconfounding:
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test))
                        scores_deconf.append(model_tuned.score(Xin_test_scaled, behavD[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.alpha)
        else:  # KFold CV not accounting for family structure
            for train, test in cvfolds.split(Xin, behav):
                behav_train = behav[train]
                behav_mean[test] = np.mean(behav_train)
                behav_train_scaled = np.zeros(shape=behav_train.shape)
                if not deconfounding:
                    scaler_y = preprocessing.StandardScaler().fit(behav_train.reshape(-1, 1))
                    behav_train_scaled = scaler_y.transform(behav_train.reshape(-1, 1))
                    behav_train_scaled = behav_train_scaled.reshape(-1, 1)
                # deconfounding:
                elif deconfounding:
                    confounds_train = confounds[train, :]
                    CbetaY, CinterceptY, behav_train_scaled = deconfound(behav_train, confounds_train)
                # train model and make predictions:
                Xin_train = Xin[train, train.reshape(-1, 1)]
                center_K = preprocessing.KernelCenterer().fit(Xin_train)
                Xin_train_scaled = center_K.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(alpha=alphas), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train_scaled)
                Xin_test = Xin[train, test.reshape(-1, 1)]
                Xin_test_scaled = center_K.transform(Xin_test)
                behav_pred_scaled_tmp = model_tuned.predict(Xin_test_scaled)
                behav_pred_scaled[test] = behav_pred_scaled_tmp.flatten()
                if not deconfounding:
                    behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled_tmp)
                    behav_pred[test] = behav_pred_tmp.flatten()
                elif deconfounding:  # in deconfounded space
                    behav_pred[test] = behav_pred_scaled[test]
                    behav_predD[test] = behav_pred[test]
                    behavD[test] = behav[test]
                    behav_meanD[test] = behav_mean[test]
                    _, _, behavD[test] = deconfound(behavD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_predD[test] = reconfound(behav_predD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_meanD[test] = reconfound(behav_meanD[test], confounds[test, :], CbetaY, CinterceptY)
                # get additional output
                if return_scores:
                    behav_test = behav[test]
                    if not deconfounding:
                        behav_test_scaled = scaler_y.transform(behav_test.reshape(-1, 1))
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test_scaled))
                    elif deconfounding:
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test))
                        scores_deconf.append(model_tuned.score(Xin_test_scaled, behavD[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.alpha)
    # RR (default for summary metrics):
    elif estimator == 'Ridge':
        if not 'alpha' in options:
            alphas = np.logspace(-4, -1, 6)
        else:
            alphas = options['alpha']
        model = linear_model.Ridge()
        if do_groupKFold:
            for train, test in cvfolds.split(Xin, behav, groups=cs):
                behav_train = behav[train]
                behav_mean[test] = np.mean(behav_train)
                behav_train_scaled = np.zeros(shape=behav_train.shape)
                if not deconfounding:
                    scaler_y = preprocessing.StandardScaler().fit(behav_train.reshape(-1, 1))
                    behav_train_scaled = scaler_y.transform(behav_train.reshape(-1, 1))
                    behav_train_scaled = behav_train_scaled.reshape(-1, 1)
                # deconfounding:
                elif deconfounding:
                    confounds_train = confounds[train, :]
                    CbetaY, CinterceptY, behav_train_scaled = deconfound(behav_train, confounds_train)
                # train model and make predictions:
                Xin_train = Xin[train, :]
                scaler_x = preprocessing.StandardScaler().fit(Xin_train)
                Xin_train_scaled = scaler_x.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(alpha=alphas), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train_scaled, groups=cs[train])
                Xin_test = Xin[test, :]
                Xin_test_scaled = scaler_x.transform(Xin_test)
                behav_pred_scaled_tmp = model_tuned.predict(Xin_test_scaled)
                behav_pred_scaled[test] = behav_pred_scaled_tmp.flatten()
                if not deconfounding:
                    behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled_tmp)
                    behav_pred[test] = behav_pred_tmp.flatten()
                elif deconfounding:  # in deconfounded space
                    behav_pred[test] = behav_pred_scaled[test]
                    behav_predD[test] = behav_pred[test]
                    behavD[test] = behav[test]
                    behav_meanD[test] = behav_mean[test]
                    _, _, behavD[test] = deconfound(behavD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_predD[test] = reconfound(behav_predD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_meanD[test] = reconfound(behav_meanD[test], confounds[test, :], CbetaY, CinterceptY)
                # get additional output
                if return_scores:
                    behav_test = behav[test]
                    if not deconfounding:
                        behav_test_scaled = scaler_y.transform(behav_test.reshape(-1, 1))
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test_scaled))
                    elif deconfounding:
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test))
                        scores_deconf.append(model_tuned.score(Xin_test_scaled, behavD[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.alpha)
        else:  # KFold CV not using family structure
            for train, test in cvfolds.split(Xin, behav):
                behav_train = behav[train]
                behav_mean[test] = np.mean(behav_train)
                behav_train_scaled = np.zeros(shape=behav_train.shape)
                if not deconfounding:
                    scaler_y = preprocessing.StandardScaler().fit(behav_train.reshape(-1, 1))
                    behav_train_scaled = scaler_y.transform(behav_train.reshape(-1, 1))
                    behav_train_scaled = behav_train_scaled.reshape(-1, 1)
                # deconfounding:
                elif deconfounding:
                    confounds_train = confounds[train, :]
                    CbetaY, CinterceptY, behav_train_scaled = deconfound(behav_train, confounds_train)
                # train model and make predictions:
                Xin_train = Xin[train, :]
                scaler_x = preprocessing.StandardScaler().fit(Xin_train)
                Xin_train_scaled = scaler_x.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(alpha=alphas), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train_scaled)
                Xin_test = Xin[test, :]
                Xin_test_scaled = scaler_x.transform(Xin_test)
                behav_pred_scaled_tmp = model_tuned.predict(Xin_test_scaled)
                behav_pred_scaled[test] = behav_pred_scaled_tmp.flatten()
                if not deconfounding:
                    behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled_tmp)
                    behav_pred[test] = behav_pred_tmp.flatten()
                elif deconfounding:  # in deconfounded space
                    behav_pred[test] = behav_pred_scaled[test]
                    behav_predD[test] = behav_pred[test]
                    behavD[test] = behav[test]
                    behav_meanD[test] = behav_mean[test]
                    _, _, behavD[test] = deconfound(behavD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_predD[test] = reconfound(behav_predD[test], confounds[test, :], CbetaY, CinterceptY)
                    behav_meanD[test] = reconfound(behav_meanD[test], confounds[test, :], CbetaY, CinterceptY)
                # get additional output
                if return_scores:
                    behav_test = behav[test]
                    if not deconfounding:
                        behav_test_scaled = scaler_y.transform(behav_test.reshape(-1, 1))
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test_scaled))
                    elif deconfounding:
                        scores.append(model_tuned.score(Xin_test_scaled, behav_test))
                        scores_deconf.append(model_tuned.score(Xin_test_scaled, behavD[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.alpha)

    # get correlation coefficient between model-predicted and actual values
    corr = np.corrcoef(behav_pred, behav)[0, 1]
    if deconfounding:
        corr_deconf = np.corrcoef(behav_predD, behavD)[0, 1]

    # aggregate results and optional returns
    results = {}
    results['behav_pred'] = behav_pred
    results['corr'] = corr
    if deconfounding:
        results['behav_predD'] = behav_predD
        results['corr_deconf'] = corr_deconf
    if return_scores:
        results['scores'] = scores
        if deconfounding:
            results['scores_deconf'] = scores_deconf
    if return_models:
        results['models'] = models
    if return_hyperparams:
        results['hyperparams'] = hyperparams

    return results
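

# Editor's usage sketch (illustrative, not part of the original module):
# predicting a phenotype such as age with the Fisher kernel and kernel ridge
# regression, respecting a family structure matrix. `hmm`, `Y`, `indices`,
# `age`, and `family` are assumed inputs.
def _example_predict_phenotype(hmm, Y, indices, age, family):
    options = {
        'group_structure': family,   # implies GroupKFold CV
        'nfolds': 5,
        'return_scores': True,
        'return_hyperparams': True,
    }
    results = predict_phenotype(hmm, Y, age, indices,
                                predictor='Fisherkernel',
                                estimator='KernelRidge',
                                options=options)
    return results['behav_pred'], results['corr']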
def classify_phenotype(hmm, Y, behav, indices, predictor='Fisherkernel', estimator='SVM', options=None):
    """Classify phenotype from HMM
    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to make a classification, in a nested cross-validated way.
    By default, X is standardised/centered.
    Estimators so far include: SVM and Logistic Regression
    Cross-validation strategies so far include: KFold and GroupKFold
    Hyperparameter optimization strategies so far include: only grid search

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data
    behav : array-like of shape (n_sessions,)
        phenotype, behaviour, or other external labels to be predicted
    indices : array-like of shape (n_sessions, 2)
        The start and end indices of each trial/session in the input data.
        Note that this function does not work if indices=None
    predictor : char (optional, default 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default 'SVM')
        Model to be used for classification.
        This should be the name of a sklearn base estimator
        (for now either 'SVM' or 'LogisticRegression')
    options : dict (optional, default None)
        general relevant options are:
        'CVscheme': char, which CV scheme to use (default: 'GroupKFold' if a
            group structure is specified, otherwise 'KFold')
        'nfolds': int, number of folds k for the (outer and inner) k-fold CV loops
        'group_structure': ndarray of shape (n_sessions, n_sessions), matrix
            specifying the group structure: positive values if sessions(/subjects)
            are related, zeros otherwise
        'return_scores': bool, whether to also return the model scores of each fold
        'return_models': bool, whether to also return the trained models of each fold
        'return_hyperparams': bool, whether to also return the optimised
            hyperparameters of each fold
        'return_prob': bool, whether to also return the estimated probabilities
        possible hyperparameters for the model, e.g. 'C' for SVM
        for the Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'tau': the width of the Gaussian kernel (only used if shape='Gaussian')
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition
            probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training
            the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state
            covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    results : dict
        containing
        'behav_pred': predicted labels on test sets
        'acc': overall accuracy
        (if requested):
        'behav_prob': predicted probabilities of each class on test sets
        'scores': the model scores of each fold
        'models': the trained models from each fold
        'hyperparams': the optimised hyperparameters of each fold

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing

    Notes:
    ------
    If behav contains NaNs, these subjects/sessions will be removed in Y
    """

    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if behav is None:
        raise Exception("Phenotype to be predicted needs to be provided")
    elif behav.ndim > 1 or np.unique(behav).shape[0] > 2:
        behav = preprocessing.LabelBinarizer().fit_transform(behav)
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor == 'Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'SVM'
    # necessary options for summary metrics:
    if predictor == 'summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'LogisticRegression'
    # other necessary options:
    if not 'CVscheme' in options:
        CVscheme = 'KFold'
    else:
        CVscheme = options['CVscheme']
    if not 'nfolds' in options:
        nfolds = 5
    else:
        nfolds = options['nfolds']
    if not 'group_structure' in options:
        do_groupKFold = False
        allcs = None
    else:
        do_groupKFold = True
        allcs = options['group_structure']
        CVscheme = 'GroupKFold'
    if 'confounds' in options:
        raise Exception("Deconfounding is not implemented for classification, use prediction instead or remove confounds")

    # find and remove missing values
    ind = ~np.isnan(behav)
    behav = behav[ind]
    indices = indices[ind, :]  # remove sessions whose behavioural variable is missing
    # N = indices.shape[0]  # number of samples
    N = sum(ind == True)

    # get features/kernel from HMM to classify from (default: Fisher kernel):
    if predictor == 'Fisherkernel':
        if shape == 'linear':
            tau = None
        elif shape == 'Gaussian':
            tau = options['tau']
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: classify from HMM summary metrics
    elif predictor == 'summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    # create CV folds
    if do_groupKFold:  # when using family/group structure - use GroupKFold
        cs = get_groups(allcs)
        cs = cs[ind]  # remove missing values
        cvfolds = model_selection.GroupKFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav, cs)
    elif CVscheme == 'KFold':  # when not using family/group structure
        cvfolds = model_selection.KFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav)

    # create empty return structures
    behav_pred = np.zeros(shape=N)
    # optional returns:
    if 'return_scores' in options and options['return_scores'] == True:
        scores = list()
        return_scores = True
    else:
        return_scores = False
    if 'return_models' in options and options['return_models'] == True:
        models = list()
        return_models = True
    else:
        return_models = False
    if 'return_hyperparams' in options and options['return_hyperparams'] == True:
        hyperparams = list()
        return_hyperparams = True
    else:
        return_hyperparams = False
    if 'return_prob' in options and options['return_prob'] == True:
        return_prob = True
        behav_prob = np.zeros(shape=(N, 2))
    else:
        return_prob = False

    # main classification:
    # SVM (default for Fisher kernel):
    if estimator == 'SVM':
        if not 'C' in options:
            Cs = np.logspace(-10, 0, 10)
        else:
            Cs = options['C']
        model = svm.SVC(kernel="precomputed")
        if return_prob:
            model = svm.SVC(kernel="precomputed", probability=True)
        if do_groupKFold:
            for train, test in cvfolds.split(Xin, behav, groups=cs):
                behav_train = behav[train]
                # train model and make predictions:
                Xin_train = Xin[train, train.reshape(-1, 1)]
                center_K = preprocessing.KernelCenterer().fit(Xin_train)
                Xin_train_scaled = center_K.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(C=Cs), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train, groups=cs[train])
                Xin_test = Xin[train, test.reshape(-1, 1)]
                Xin_test_scaled = center_K.transform(Xin_test)
                behav_pred[test] = model_tuned.predict(Xin_test_scaled)
                if return_prob:
                    behav_prob[test, :] = model_tuned.predict_proba(Xin_test_scaled)
                # get additional output
                if return_scores:
                    scores.append(model_tuned.score(Xin_test_scaled, behav[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.C)
        else:  # KFold CV not accounting for family structure
            for train, test in cvfolds.split(Xin, behav):
                behav_train = behav[train]
                # train model and make predictions:
                Xin_train = Xin[train, train.reshape(-1, 1)]
                center_K = preprocessing.KernelCenterer().fit(Xin_train)
                Xin_train_scaled = center_K.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(C=Cs), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train)
                Xin_test = Xin[train, test.reshape(-1, 1)]
                Xin_test_scaled = center_K.transform(Xin_test)
                behav_pred[test] = model_tuned.predict(Xin_test_scaled)
                if return_prob:
                    behav_prob[test, :] = model_tuned.predict_proba(Xin_test_scaled)
                # get additional output
                if return_scores:
                    scores.append(model_tuned.score(Xin_test_scaled, behav[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.C)
    # Logistic Regression (default for summary metrics):
    elif estimator == 'LogisticRegression':
        if not 'C' in options:
            Cs = np.logspace(-10, 0, 10)
        else:
            Cs = options['C']
        model = linear_model.LogisticRegression()
        if do_groupKFold:
            for train, test in cvfolds.split(Xin, behav, groups=cs):
                behav_train = behav[train]
                # train model and make predictions:
                Xin_train = Xin[train, :]
                scaler_x = preprocessing.StandardScaler().fit(Xin_train)
                Xin_train_scaled = scaler_x.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(C=Cs), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train, groups=cs[train])
                Xin_test = Xin[test, :]
                Xin_test_scaled = scaler_x.transform(Xin_test)
                behav_pred[test] = model_tuned.predict(Xin_test_scaled)
                if return_prob:
                    behav_prob[test, :] = model_tuned.predict_proba(Xin_test_scaled)
                # get additional output
                if return_scores:
                    scores.append(model_tuned.score(Xin_test_scaled, behav[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.C)
        else:  # KFold CV not using family structure
            for train, test in cvfolds.split(Xin, behav):
                behav_train = behav[train]
                # train model and make predictions:
                Xin_train = Xin[train, :]
                scaler_x = preprocessing.StandardScaler().fit(Xin_train)
                Xin_train_scaled = scaler_x.transform(Xin_train)
                model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(C=Cs), cv=cvfolds)
                model_tuned.fit(Xin_train_scaled, behav_train)
                Xin_test = Xin[test, :]
                Xin_test_scaled = scaler_x.transform(Xin_test)
                behav_pred[test] = model_tuned.predict(Xin_test_scaled)
                if return_prob:
                    behav_prob[test, :] = model_tuned.predict_proba(Xin_test_scaled)
                # get additional output
                if return_scores:
                    scores.append(model_tuned.score(Xin_test_scaled, behav[test]))
                if return_models:
                    models.append(model_tuned)
                if return_hyperparams:
                    hyperparams.append(model_tuned.best_estimator_.C)

    # get overall accuracy of model-predicted classes
    acc = ms.accuracy_score(behav_pred, behav)

    # aggregate results and optional returns
    results = {}
    results['behav_pred'] = behav_pred
    if return_prob:
        results['behav_prob'] = behav_prob
    results['acc'] = acc
    if return_scores:
        results['scores'] = scores
    if return_models:
        results['models'] = models
    if return_hyperparams:
        results['hyperparams'] = hyperparams

    return results
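

# Editor's usage sketch (illustrative, not part of the original module):
# classifying a binary label with an SVM on the Fisher kernel, also requesting
# the per-class probabilities. `hmm`, `Y`, `indices`, and `labels` are assumed
# inputs.
def _example_classify_phenotype(hmm, Y, indices, labels):
    options = {'return_prob': True, 'nfolds': 5}
    results = classify_phenotype(hmm, Y, labels, indices,
                                 predictor='Fisherkernel',
                                 estimator='SVM',
                                 options=options)
    return results['acc'], results['behav_prob']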
def train_pred(hmm, Y, behav, indices, predictor='Fisherkernel', estimator='KernelRidge', options=None):
    """Train prediction model from HMM
    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to predict a phenotype, in a nested cross-validated way.
    By default, X and Y are standardised/centered unless deconfounding is used.
    Note that all outputs except behavD, i.e. model and scalers, need to be
    passed on to test_pred to ensure that training and test variables are
    preprocessed in the same way, while avoiding leakage between training and
    test set.
    Estimators so far include: Kernel Ridge Regression and Ridge Regression
    Cross-validation strategies so far include: KFold and GroupKFold
    Hyperparameter optimization strategies so far include: grid search,
    no hyperparameter optimisation

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data of the training set
    behav : array-like of shape (n_train_sessions,)
        phenotype, behaviour, or other external variable of the training set
    indices : array-like of shape (n_train_sessions, 2)
        The start and end indices of each trial/session in the training data.
        Note that this function does not work if indices=None
    predictor : char (optional, default 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default 'KernelRidge')
        Model to be used for prediction.
        This should be the name of a sklearn base estimator
        (for now either 'KernelRidge' or 'Ridge')
    options : dict (optional, default None)
        general relevant options are:
        'optim_hyperparam': char, which hyperparameter optimisation strategy to
            use (default: 'GridSearchCV'). If you don't want to use
            hyperparameter optimisation, set this to None and specify the
            hyperparameter (alpha) as an option
        when using hyperparameter optimisation, additional relevant options are:
        'CVscheme': char, which CV scheme to use (default: 'GroupKFold' if a
            group structure is specified, otherwise 'KFold')
        'nfolds': int, number of folds k for the (outer and inner) k-fold CV loops
        'group_structure': ndarray of shape (n_train_sessions, n_train_sessions),
            matrix specifying the group structure: positive values if samples are
            related, zeros otherwise
        'confounds': array-like of shape (n_train_sessions,) or
            (n_train_sessions, n_confounds) containing confounding variables
        possible hyperparameters for the model, e.g. 'alpha' for (kernel) ridge
            regression
        for the Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'tau': the width of the Gaussian kernel (only used if shape='Gaussian')
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition
            probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training
            the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state
            covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    model_tuned : estimator
        the trained and (if applicable) hyperparameter-optimised scikit-learn
        estimator
    scaler_x : estimator
        the trained standard scaler/kernel centerer of the features/kernel x
    (if not using deconfounding):
    scaler_y : estimator
        the trained standard scaler of the variable to be predicted y
    (if using deconfounding):
    CinterceptY : float
        the estimated intercept for deconfounding
    CbetaY : array-like of shape (n_confounds)
        the estimated beta weights for deconfounding of each confound
    behavD : array-like of shape (n_train_sessions)
        the phenotype/behaviour in deconfounded space

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing

    Notes:
    ------
    If behav contains NaNs, these subjects/sessions will be removed in Y and
    confounds
    """

    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if behav is None:
        raise Exception("Phenotype to be predicted needs to be provided")
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor == 'Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'KernelRidge'
    # necessary options for summary metrics:
    if predictor == 'summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'Ridge'
    # other necessary options:
    if not 'optim_hyperparam' in options:
        optim_hyperparam = 'GridSearchCV'
    else:
        optim_hyperparam = options['optim_hyperparam']
    if optim_hyperparam == 'GridSearchCV':
        if not 'CVscheme' in options:
            CVscheme = 'KFold'
        else:
            CVscheme = options['CVscheme']
        if not 'nfolds' in options:
            nfolds = 5
        else:
            nfolds = options['nfolds']
        if not 'group_structure' in options:
            do_groupKFold = False
            allcs = None
        else:
            do_groupKFold = True
            allcs = options['group_structure']
            CVscheme = 'GroupKFold'
    elif optim_hyperparam is None:
        do_groupKFold = False
        CVscheme = None
    if not 'confounds' in options:
        confounds = None
        deconfounding = False
    else:
        confounds = options['confounds']
        if confounds.ndim == 1:
            confounds = confounds.reshape((-1, 1))
        deconfounding = True

    # find and remove missing values
    ind = ~np.isnan(behav)
    behav = behav[ind]
    indices = indices[ind, :]  # remove sessions whose behavioural variable is missing
    if deconfounding:
        confounds = confounds[ind, :]
    # N = indices.shape[0]  # number of samples
    N = sum(ind == True)

    # get features/kernel from HMM to predict from (default: Fisher kernel):
    if predictor == 'Fisherkernel':
        if shape == 'linear':
            tau = None
        elif shape == 'Gaussian':
            tau = options['tau']
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: predict from HMM summary metrics
    elif predictor == 'summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    behav_mean = np.zeros(shape=N)
    if deconfounding:
        behavD = np.zeros(shape=N)
        behav_meanD = np.zeros(shape=N)

    if do_groupKFold:  # when using family/group structure - use GroupKFold
        cs = get_groups(allcs)
        cs = cs[ind]  # remove missing values
        cvfolds = model_selection.GroupKFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav, cs)
    elif CVscheme == 'KFold':  # when not using family/group structure
        cvfolds = model_selection.KFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav)

    # train estimator:
    if not 'alpha' in options:
        alphas = np.logspace(-4, -1, 6)
    else:
        alphas = options['alpha']
    # KRR (default for Fisher kernel):
    if estimator == 'KernelRidge':
        model = kernel_ridge.KernelRidge(kernel="precomputed")
        center_K = preprocessing.KernelCenterer().fit(Xin)
        Xin_scaled = center_K.transform(Xin)
        scaler_x = center_K
    # RR (default for summary metrics):
    elif estimator == 'Ridge':
        model = linear_model.Ridge()
        scaler_x = preprocessing.StandardScaler().fit(Xin)
        Xin_scaled = scaler_x.transform(Xin)

    behav_train = behav
    behav_train_scaled = np.zeros(shape=behav_train.shape)
    if not deconfounding:
        scaler_y = preprocessing.StandardScaler().fit(behav_train.reshape(-1, 1))
        behav_train_scaled = scaler_y.transform(behav_train.reshape(-1, 1))
        behav_train_scaled = behav_train_scaled.reshape(-1, 1)
        # behav_mean = np.mean(behav_train)
    # deconfounding:
    elif deconfounding:
        confounds_train = confounds
        CbetaY, CinterceptY, behavD = deconfound(behav_train, confounds_train)
        behav_train_scaled = behavD

    # train model:
    if optim_hyperparam == 'GridSearchCV':
        model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(alpha=alphas), cv=cvfolds)
    else:
        model.set_params(alpha=alphas)
        model_tuned = model
    if do_groupKFold:
        model_tuned.fit(Xin_scaled, behav_train_scaled, groups=cs)
    else:
        model_tuned.fit(Xin_scaled, behav_train_scaled)

    if deconfounding:
        return model_tuned, scaler_x, CinterceptY, CbetaY, behavD
    else:
        return model_tuned, scaler_x, scaler_y
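

# Editor's usage sketch (illustrative, not part of the original module):
# training a Fisher-kernel ridge model on a training subset; the returned
# scalers must be carried over to test_pred so that training and test data are
# preprocessed identically. `hmm`, `Y_train`, `behav_train`, and
# `indices_train` are assumed inputs.
def _example_train_pred(hmm, Y_train, behav_train, indices_train):
    model, scaler_x, scaler_y = train_pred(hmm, Y_train, behav_train,
                                           indices_train,
                                           predictor='Fisherkernel',
                                           estimator='KernelRidge')
    return model, scaler_x, scaler_y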
def test_pred(hmm, Y, indices, model_tuned, scaler_x, scaler_y=None, behav=None,
              train_indices=None, CinterceptY=None, CbetaY=None,
              predictor='Fisherkernel', estimator='KernelRidge', options=None):
    """Test prediction model from HMM
    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to predict a phenotype, in a nested cross-validated way.
    The specified predictor and estimator must be the same as the ones used to
    train the model.
    By default, X and Y are standardised/centered unless deconfounding is used.
    Note: When using a kernel method (e.g. Fisher kernel), Y must be the
    timeseries of both training and test set to construct the correct kernel,
    and the indices of the training sessions (train_indices) must be provided.
    When using summary metrics, Y must be the timeseries of only the test set,
    and train_indices should be None.
    When using deconfounding, CinterceptY and CbetaY need to be specified.

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data of the test set
    indices : array-like of shape (n_test_sessions, 2) or (n_sessions, 2)
        The start and end indices of each trial/session in the test data (when
        using features) or in the train and test data (when using a kernel).
        Note that this function does not work if indices=None
    model_tuned : estimator
        the trained and (if applicable) hyperparameter-optimised scikit-learn
        estimator
    scaler_x : estimator
        the trained standard scaler/kernel centerer of the features/kernel x
    scaler_y : estimator (optional, only specify when not using deconfounding)
        the trained standard scaler of the variable to be predicted y
    behav : array-like of shape (n_test_sessions,) (optional)
        phenotype, behaviour, or other external variable of the test set, to be
        compared with the predicted values
    train_indices : array-like of shape (n_train_sessions,) (optional, only use
        when using a kernel)
        the indices of the sessions/subjects used for training. The function
        assumes that the test indices are all other sessions.
    CinterceptY : float (optional, only specify when using deconfounding)
        the estimated intercept for deconfounding
    CbetaY : array-like of shape (n_confounds) (optional, only specify when
        using deconfounding)
        the estimated beta weights for deconfounding of each confound
    predictor : char (optional, default 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default 'KernelRidge')
        Model to be used for prediction.
        This should be the name of a sklearn base estimator
        (for now either 'KernelRidge' or 'Ridge')
    options : dict (optional, default None)
        general relevant options are:
        'confounds': array-like of shape (n_test_sessions,) or
            (n_test_sessions, n_confounds) containing confounding variables
        'return_models': bool, whether to also return the model
        for the Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'tau': the width of the Gaussian kernel (only used if shape='Gaussian')
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition
            probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training
            the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state
            covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    results : dict
        containing
        'behav_pred': predicted phenotype on test sets
        (if behav was specified):
        'corr': correlation coefficient between predicted and actual values
        'scores': the model scores of each fold
        (if requested):
        'model': the trained model

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing
    """

    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")
    if behav is not None:
        return_corr = True
        return_scores = True
    else:
        return_corr = False
        return_scores = False

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor == 'Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'KernelRidge'
    # necessary options for summary metrics:
    if predictor == 'summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'Ridge'
    if not 'confounds' in options:
        confounds = None
        deconfounding = False
    else:
        confounds = options['confounds']
        if confounds.ndim == 1:
            confounds = confounds.reshape((-1, 1))
        deconfounding = True

    N = indices.shape[0]  # number of samples
    ses_vec = np.arange(N)
    if train_indices is not None:
        test_indices = np.delete(ses_vec, train_indices)

    # get features/kernel from HMM to predict from (default: Fisher kernel):
    if predictor == 'Fisherkernel':
        if shape == 'linear':
            tau = None
        elif shape == 'Gaussian':
            tau = options['tau']
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: predict from HMM summary metrics
    elif predictor == 'summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    if train_indices is not None:
        behav_pred = np.zeros(shape=test_indices.shape)
        behav_pred_scaled = np.zeros(shape=test_indices.shape)
        if deconfounding:
            if behav is not None:
                behavD = np.zeros(shape=test_indices.shape)
                behav_mean = np.mean(behav)
            behav_predD = np.zeros(shape=test_indices.shape)
            behav_meanD = np.zeros(shape=test_indices.shape)
    else:
        behav_pred = np.zeros(shape=N)
        behav_pred_scaled = np.zeros(shape=N)
        if deconfounding:
            if behav is not None:
                behavD = np.zeros(shape=N)
                behav_mean = np.mean(behav)
            behav_predD = np.zeros(shape=N)
            behav_meanD = np.zeros(shape=N)

    # optional return:
    if 'return_models' in options and options['return_models'] == True:
        return_models = True
    else:
        return_models = False

    if estimator == "KernelRidge":
        Xin_test = Xin[train_indices, test_indices.reshape(-1, 1)]
        Xin_test_scaled = scaler_x.transform(Xin_test)
        behav_pred_scaled = model_tuned.predict(Xin_test_scaled)
        if not deconfounding:
            behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled)
            behav_pred = behav_pred_tmp.flatten()
        elif deconfounding:  # in deconfounded space
            behav_pred = behav_pred_scaled
            behav_predD = behav_pred_scaled
            behav_predD = reconfound(behav_predD, confounds[test_indices, :], CbetaY, CinterceptY)
            if behav is not None:
                behavD = behav[test_indices]
                # behav_meanD = behav_mean[test_indices]
                _, _, behavD = deconfound(behavD, confounds[test_indices, :], CbetaY, CinterceptY)
                # behav_meanD = reconfound(behav_meanD, confounds[test_indices, :], CbetaY, CinterceptY)
        # get additional output
        if return_scores:
            behav_test = behav[test_indices]
            if not deconfounding:
                behav_test_scaled_tmp = scaler_y.transform(behav_test.reshape(-1, 1))
                behav_test_scaled = behav_test_scaled_tmp.flatten()
                score = model_tuned.score(Xin_test_scaled, behav_test_scaled)
            if deconfounding:
                score = model_tuned.score(Xin_test_scaled, behav_test)
                score_deconf = model_tuned.score(Xin_test_scaled, behavD)
    elif estimator == "Ridge":
        Xin_scaled = scaler_x.transform(Xin)
        behav_pred_scaled = model_tuned.predict(Xin_scaled)
        if not deconfounding:
            behav_pred_tmp = scaler_y.inverse_transform(behav_pred_scaled)
            behav_pred = behav_pred_tmp.flatten()
        elif deconfounding:  # in deconfounded space
            behav_pred = behav_pred_scaled
            behav_predD = behav_pred_scaled
            behav_predD = reconfound(behav_predD, confounds, CbetaY, CinterceptY)
            if behav is not None:
                behavD = behav
                # behav_meanD = behav_mean
                _, _, behavD = deconfound(behavD, confounds, CbetaY, CinterceptY)
                # behav_meanD = reconfound(behav_meanD, confounds, CbetaY, CinterceptY)
        if return_scores:
            if not deconfounding:
                behav_scaled_tmp = scaler_y.transform(behav.reshape(-1, 1))
                behav_scaled = behav_scaled_tmp.flatten()
                score = model_tuned.score(Xin_scaled, behav_scaled)
            if deconfounding:
                score = model_tuned.score(Xin_scaled, behav)
                score_deconf = model_tuned.score(Xin_scaled, behavD)

    if return_corr:
        if train_indices is not None:
            corr = np.corrcoef(behav_pred, behav[test_indices])[0, 1]
        else:
            corr = np.corrcoef(behav_pred, behav)[0, 1]
        if deconfounding:
            corr_deconf = np.corrcoef(behav_predD, behavD)[0, 1]

    # aggregate results and optional returns
    results = {}
    results['behav_pred'] = behav_pred
    if return_corr:
        results['corr'] = corr
    if deconfounding:
        results['behav_predD'] = behav_predD
        if return_corr:
            results['corr_deconf'] = corr_deconf
    if return_scores:
        results['scores'] = score
        if deconfounding:
            results['scores_deconf'] = score_deconf
    if return_models:
        results['models'] = model_tuned
    return results
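

# Editor's usage sketch (illustrative, not part of the original module): the
# kernel variant of test_pred needs the timeseries of *all* sessions plus the
# training indices, so the train/test kernel block can be built. `Y_all`,
# `indices_all`, `behav_all`, and `train_sessions` are assumed inputs; the
# function assumes all other sessions are the test set.
def _example_train_test_pred(hmm, Y_all, indices_all, behav_all, train_sessions):
    # fit on the training sessions only
    model, scaler_x, scaler_y = train_pred(hmm, Y_all,
                                           behav_all[train_sessions],
                                           indices_all[train_sessions, :])
    # evaluate on the held-out sessions; Y and indices cover all sessions here
    results = test_pred(hmm, Y_all, indices_all, model, scaler_x,
                        scaler_y=scaler_y, behav=behav_all,
                        train_indices=train_sessions)
    return results['behav_pred'], results['corr']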
[docs] def train_classif(hmm, Y, behav, indices, predictor='Fisherkernel', estimator='SVM', options=None):
    """Train classification model from HMM

    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to train a classification model, optimising hyperparameters in a
    cross-validated way. By default, X is standardised/centered.
    Note that all outputs need to be passed on to test_classif to ensure that
    training and test variables are preprocessed in the same way, while avoiding
    leakage between training and test set.
    Estimators so far include: SVM and Logistic Regression
    Cross-validation strategies so far include: KFold and GroupKFold
    Hyperparameter optimisation strategies so far include: grid search, or no
    hyperparameter optimisation

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group-level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data of training set
    behav : array-like of shape (n_train_sessions,)
        phenotype, behaviour, or other external labels of training set to be predicted
    indices : array-like of shape (n_train_sessions, 2)
        The start and end indices of each trial/session in the training set.
        Note that this function does not work if indices=None
    predictor : char (optional, default to 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default to 'SVM')
        Model to be used for classification. This should be the name of a sklearn
        base estimator (for now either 'SVM' or 'LogisticRegression')
    options : dict (optional, default to None)
        general relevant options are:
        'optim_hyperparam' : char, which hyperparameter optimisation strategy to
            use (default: 'GridSearchCV'). If you don't want to use hyperparameter
            optimisation, set this to None and specify the hyperparameter (C) as an option
        When using hyperparameter optimisation, additional relevant options are:
        'CVscheme': char, which CVscheme to use (default: 'GroupKFold' if group
            structure is specified, otherwise: 'KFold')
        'nfolds': int, number of folds k for (outer and inner) k-fold CV loops
        'group_structure': ndarray of shape (n_train_sessions, n_train_sessions),
            matrix specifying group structure: positive values if samples are
            related, zeros otherwise
        possible hyperparameters for the model, e.g. 'C' for SVM
        'return_prob': bool, whether to also estimate the class probabilities
        for Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    model_tuned : estimator
        the trained and (if applicable) hyperparameter-optimised scikit-learn estimator
    scaler_x : estimator
        the trained standard scaler/kernel centerer of the features/kernel x

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing

    Notes:
    ------
    If behav contains NaNs, these subjects/sessions will be removed
    """
    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if behav is None:
        raise Exception("Phenotype to be predicted needs to be provided")
    elif behav.ndim>1 or np.unique(behav).shape[0]>2:
        behav = preprocessing.LabelBinarizer().fit_transform(behav)
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor=='Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'SVM' # the Fisher kernel is used with a precomputed-kernel SVM
    # necessary options for summary metrics
    if predictor=='summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'LogisticRegression' # summary metrics are used with logistic regression
    # other necessary options
    if not 'optim_hyperparam' in options:
        optim_hyperparam = 'GridSearchCV'
    else:
        optim_hyperparam = options['optim_hyperparam']
    if optim_hyperparam == 'GridSearchCV':
        if not 'CVscheme' in options:
            CVscheme = 'KFold'
        else:
            CVscheme = options['CVscheme']
        if not 'nfolds' in options:
            nfolds = 5
        else:
            nfolds = options['nfolds']
        if not 'group_structure' in options:
            do_groupKFold = False
            allcs = None
        else:
            do_groupKFold = True
            allcs = options['group_structure']
            CVscheme = 'GroupKFold'
    elif optim_hyperparam is None:
        do_groupKFold = False
        CVscheme = None

    if 'confounds' in options:
        raise Exception("Deconfounding is not implemented for classification, use prediction instead or remove confounds")

    # find and remove missing values
    ind = ~np.isnan(behav)
    behav = behav[ind]
    indices = indices[ind,:] # to remove subjects' timeseries whose behavioural variable is missing
    # N = indices.shape[0] # number of samples
    N = sum(ind == True) # number of (non-missing) samples

    # get features/kernel from HMM to be predicted from (default: Fisher kernel):
    if predictor=='Fisherkernel':
        if shape=='linear':
            tau = None
        elif shape=='Gaussian':
            tau = options['tau'] # required when shape=='Gaussian'
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: predict from HMM summary metrics
    elif predictor=='summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    if do_groupKFold: # when using family/group structure - use GroupKFold
        cs = get_groups(allcs)
        cs = cs[ind] # remove missing values
        cvfolds = model_selection.GroupKFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav, cs)
    elif CVscheme=='KFold': # when not using family/group structure
        cvfolds = model_selection.KFold(n_splits=nfolds)
        cvfolds.get_n_splits(Y, behav)

    # train estimator:
    if 'return_prob' in options and options['return_prob']==True:
        return_prob = True
    else:
        return_prob = False

    # main classification:
    # SVM (default for Fisher kernel):
    if not 'C' in options:
        Cs = np.logspace(-10, 0, 10)
    else:
        Cs = options['C']
    if estimator=='SVM':
        model = svm.SVC(kernel="precomputed")
        if return_prob:
            model = svm.SVC(kernel="precomputed", probability=True)
        center_K = preprocessing.KernelCenterer().fit(Xin)
        Xin_scaled = center_K.transform(Xin)
        scaler_x = center_K
    elif estimator=='LogisticRegression':
        model = linear_model.LogisticRegression()
        scaler_x = preprocessing.StandardScaler().fit(Xin)
        Xin_scaled = scaler_x.transform(Xin)

    behav_train = behav
    # behav_mean = np.mean(behav_train)

    # train model:
    if optim_hyperparam == 'GridSearchCV':
        model_tuned = model_selection.GridSearchCV(estimator=model, param_grid=dict(C=Cs), cv=cvfolds)
    else:
        model.set_params(C=Cs)
        model_tuned = model

    if do_groupKFold:
        model_tuned.fit(Xin_scaled, behav_train, groups=cs)
    else:
        model_tuned.fit(Xin_scaled, behav_train)

    return model_tuned, scaler_x
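# --- Usage example (illustrative sketch): training a Fisher-kernel SVM
# classifier with the function above. Y_train, indices_train and labels are
# hypothetical inputs: the training timeseries, their session indices, and a
# binary label per training session; hmm is a trained group-level model.
def _train_classif_example(hmm, Y_train, labels, indices_train):
    options = {'CVscheme': 'KFold', 'nfolds': 5} # 5-fold grid search over C
    model_tuned, scaler_x = train_classif(hmm, Y_train, labels, indices_train,
                                          predictor='Fisherkernel',
                                          estimator='SVM', options=options)
    # both outputs must be passed on to test_classif
    return model_tuned, scaler_x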
[docs] def test_classif(hmm, Y, indices, model_tuned, scaler_x, behav=None, train_indices=None, predictor='Fisherkernel', estimator='SVM', options=None):
    """Test classification model from HMM

    This uses either the Fisher kernel (default) or a set of HMM summary metrics
    to test a trained classification model on held-out sessions. The specified
    predictor and estimator must be the same as the ones used to train the
    classifier. By default, X is standardised/centered.
    Note: When using a kernel method (e.g. Fisher kernel), Y must be the timeseries
    of both training and test set to construct the correct kernel, and indices of
    the training sessions (train_indices) must be provided. When using summary
    metrics, Y must be the timeseries of only the test set, and train_indices
    should be None.

    Parameters:
    -----------
    hmm : HMM object
        An instance of the HMM class, estimated on the group-level
    Y : array-like of shape (n_samples, n_variables_2)
        (group-level) timeseries data of test set
    indices : array-like of shape (n_test_sessions, 2) or (n_sessions, 2)
        The start and end indices of each trial/session in the test data (when
        using features) or in the train and test data (when using kernel).
        Note that this function does not work if indices=None
    model_tuned : estimator
        the trained and (if applicable) hyperparameter-optimised scikit-learn estimator
    scaler_x : estimator
        the trained standard scaler/kernel centerer of the features/kernel x
    behav : array-like of shape (n_test_sessions,) (optional)
        phenotype, behaviour, or other external label of test set, to be compared
        with the predicted labels
    train_indices : array-like of shape (n_train_sessions,) (optional, only use when using kernel)
        the indices of the sessions/subjects used for training. The function
        assumes that test indices are all other sessions.
    predictor : char (optional, default to 'Fisherkernel')
        What to predict from, either 'Fisherkernel' or 'summary_metrics'
    estimator : char (optional, default to 'SVM')
        Model to be used for classification. This should be the name of a sklearn
        base estimator (for now either 'SVM' or 'LogisticRegression')
    options : dict (optional, default to None)
        general relevant options are:
        'return_prob': bool, whether to also return the estimated class probabilities
        'return_models': bool, whether to also return the model
        for Fisher kernel, relevant options are:
        'shape': char, either 'linear' or 'Gaussian' (TO DO)
        'incl_Pi': bool, whether to include the gradient w.r.t. the initial state
            probabilities when computing the Fisher kernel
        'incl_P': bool, whether to include the gradient w.r.t. the transition probabilities
        'incl_Mu': bool, whether to include the gradient w.r.t. the state means
            (note that this only works if means were not set to 0 when training the HMM)
        'incl_Sigma': bool, whether to include the gradient w.r.t. the state covariances
        for summary metrics, relevant options are:
        'metrics': list of char, containing metrics to be included as features

    Returns:
    --------
    results : dict
        containing
        'behav_pred': predicted labels on test set
        'acc': overall accuracy
        (if requested):
        'behav_prob': predicted probabilities of each class on test set
        'scores': the model score on the test set
        'models': the trained model

    Raises:
    -------
    Exception
        If the hmm has not been trained or if necessary input is missing
    """
    # check conditions
    if not hmm.trained:
        raise Exception("The model has not yet been trained")
    if indices is None:
        raise Exception("To predict phenotype from HMM, indices need to be provided")
    if behav is not None:
        return_acc = True
        return_scores = True
        if behav.ndim>1 or np.unique(behav).shape[0]>2:
            behav = preprocessing.LabelBinarizer().fit_transform(behav)
    else:
        return_acc = False
        return_scores = False

    # check options or set defaults:
    if options is None:
        options = {}
    # necessary options for Fisher kernel:
    if predictor=='Fisherkernel':
        if not 'shape' in options:
            shape = 'linear'
        else:
            shape = options['shape']
        if 'incl_Pi' in options:
            incl_Pi = options['incl_Pi']
        else:
            incl_Pi = True
        if 'incl_P' in options:
            incl_P = options['incl_P']
        else:
            incl_P = True
        if 'incl_Mu' in options:
            incl_Mu = options['incl_Mu']
        else:
            incl_Mu = False
        if 'incl_Sigma' in options:
            incl_Sigma = options['incl_Sigma']
        else:
            incl_Sigma = True
        estimator = 'SVM' # the Fisher kernel is used with a precomputed-kernel SVM
    # necessary options for summary metrics
    if predictor=='summary_metrics':
        if not 'metrics' in options:
            metrics = ['FO', 'switching_rate', 'lifetimes']
        else:
            metrics = options['metrics']
        estimator = 'LogisticRegression' # summary metrics are used with logistic regression

    if 'confounds' in options:
        raise Exception("Deconfounding is not implemented for classification, use prediction instead or remove confounds")

    N = indices.shape[0] # number of samples
    ses_vec = np.arange(N)
    if train_indices is not None:
        test_indices = np.delete(ses_vec, train_indices)

    if 'return_prob' in options and options['return_prob']==True:
        return_prob = True
    else:
        return_prob = False

    # get features/kernel from HMM to be predicted from (default: Fisher kernel):
    if predictor=='Fisherkernel':
        if shape=='linear':
            tau = None
        elif shape=='Gaussian':
            tau = options['tau']
        Xin = hmm_kernel(hmm, Y, indices, type='Fisher', shape=shape, incl_Pi=incl_Pi,
                         incl_P=incl_P, incl_Mu=incl_Mu, incl_Sigma=incl_Sigma, tau=tau,
                         return_feat=False, return_dist=False)
    # alternative: predict from HMM summary metrics
    elif predictor=='summary_metrics':
        Xin = get_summ_features(hmm, Y, indices, metrics)

    if train_indices is not None:
        behav_pred = np.zeros(shape=test_indices.shape)
        if return_prob:
            behav_prob = np.zeros(shape=(test_indices.shape[0],2))
    else:
        behav_pred = np.zeros(shape=N)
        if return_prob:
            behav_prob = np.zeros(shape=(N,2))

    # optional return:
    if 'return_models' in options and options['return_models']==True:
        return_models = True
    else:
        return_models = False

    if estimator=="SVM":
        # test-by-train block of the kernel: rows are test sessions, columns are training sessions
        Xin_test = Xin[train_indices, test_indices.reshape(-1,1)]
        Xin_test_scaled = scaler_x.transform(Xin_test)
        behav_pred = model_tuned.predict(Xin_test_scaled)
        if return_prob:
            behav_prob = model_tuned.predict_proba(Xin_test_scaled)
        # get additional output
        if return_scores:
            score = model_tuned.score(Xin_test_scaled, behav[test_indices])
    elif estimator=="LogisticRegression":
        Xin_scaled = scaler_x.transform(Xin)
        behav_pred = model_tuned.predict(Xin_scaled)
        if return_prob:
            behav_prob = model_tuned.predict_proba(Xin_scaled)
        if return_scores:
            score = model_tuned.score(Xin_scaled, behav)

    if return_acc:
        if train_indices is not None:
            acc = ms.accuracy_score(behav_pred, behav[test_indices])
        else:
            acc = ms.accuracy_score(behav_pred, behav)

    # aggregate results and optional returns
    results = {}
    results['behav_pred'] = behav_pred
    if return_acc:
        results['acc'] = acc
    if return_scores:
        results['scores'] = score
    if return_prob:
        results['behav_prob'] = behav_prob
    if return_models:
        results['models'] = model_tuned

    return results
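# --- Usage example (illustrative sketch): testing the classifier trained by
# _train_classif_example above. For kernel predictors, Y_all/indices_all must
# cover both training and test sessions so that the test-by-train kernel block
# can be built; test sessions are inferred as all sessions not in train_idx.
def _test_classif_example(hmm, Y_all, indices_all, model_tuned, scaler_x,
                          labels_all, train_idx):
    results = test_classif(hmm, Y_all, indices_all, model_tuned, scaler_x,
                           behav=labels_all, train_indices=train_idx,
                           predictor='Fisherkernel',
                           options={'return_prob': True})
    # labels were provided, so accuracy is returned alongside the predictions
    return results['behav_pred'], results['acc'], results['behav_prob']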
# TO DO:
# add option to provide features/kernel so it does not have to be recomputed when predicting several traits
# add/test multiclass classifier
# add betas (gradient, prediction)
# option for deconfounding X
# add options for different hyperparameter optimisation
# fix Gaussian Fisher kernel to do proper optimisation for tau
# deconfounding for classifier
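# --- Sketch for the tau TO DO above (an assumption-labelled idea, not the
# implemented behaviour): the Gaussian Fisher kernel width tau could be chosen
# by grid search over cross-validated SVM accuracy, reusing hmm_kernel with the
# same arguments as train_classif. _select_tau_sketch is hypothetical.
def _select_tau_sketch(hmm, Y, indices, behav, taus=np.logspace(-3, 3, 7), nfolds=5):
    best_tau, best_score = None, -np.inf
    for tau in taus:
        # Gaussian Fisher kernel for this candidate width (default gradient terms)
        K = hmm_kernel(hmm, Y, indices, type='Fisher', shape='Gaussian',
                       incl_Pi=True, incl_P=True, incl_Mu=False, incl_Sigma=True,
                       tau=tau, return_feat=False, return_dist=False)
        model = svm.SVC(kernel="precomputed")
        # sklearn slices both rows and columns of a precomputed kernel in CV
        scores = model_selection.cross_val_score(model, K, behav, cv=nfolds)
        if scores.mean() > best_score:
            best_tau, best_score = tau, scores.mean()
    return best_tau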