Source code for glhmm.glhmm

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gaussian Linear Hidden Markov Model
@author: Diego Vidaurre 2023
"""

import numpy as np
import math
import scipy
import scipy.special
import scipy.spatial
import sys
import warnings
import copy
import time

### Import cupy for GPU acceleration if available, otherwise pass.
try:
    import cupy as cp
except ImportError:
    pass


from . import auxiliary
from . import io
from . import utils


class glhmm():
    """ Gaussian Linear Hidden Markov Model class to decode stimulus from data.

    Attributes:
    -----------
    K : int, default=10
        Number of states in the model.
    covtype : str, {'shareddiag', 'diag', 'sharedfull', 'full'}, default 'shareddiag'
        Type of covariance matrix. Choose 'shareddiag' to have one diagonal covariance matrix for all states,
        'diag' to have a diagonal covariance matrix for each state,
        'sharedfull' to have a shared full covariance matrix for all states,
        or 'full' to have a full covariance matrix for each state.
    model_mean : str, {'state', 'shared', 'no'}, default 'state'
        Model for the mean. If 'state', the mean is modelled separately for each state.
        If 'shared', the mean is modelled globally (shared between all states).
        If 'no', the mean of the timeseries is not used to drive the states.
    model_beta : str, {'state', 'shared', 'no'}, default 'state'
        Model for the beta. If 'state', the regression coefficients are modelled separately for each state.
        If 'shared', the regression coefficients are modelled globally (shared between all states).
        If 'no', the regression coefficients are not used to drive the states.
    dirichlet_diag : float, default=10
        The value of the diagonal of the Dirichlet distribution for the transition probabilities.
        The higher the value, the more persistent the states will be.
        Note that this value is relative; the prior competes with the data, so if the timeseries is very long,
        `dirichlet_diag` may have little effect unless it is set to a very large value.
    connectivity : array_like of shape (n_states, n_states), optional
        Matrix of binary values defining the connectivity of the states.
        This parameter can only be used with a diagonal covariance matrix (i.e., `covtype='diag'`).
    Pstructure : array_like, optional
        Binary matrix defining the allowed transitions between states.
        The default is a (n_states, n_states) matrix of all ones, allowing all possible transitions between states.
    Pistructure : array_like, optional
        Binary vector defining the allowed initial states.
        The default is a (n_states,) vector of all ones, allowing all states to be used as initial states.

    Notes:
    ------
    This class requires the following modules: numpy, math, scipy, sys, warnings, copy, and time.
""" ### Private methods def __init__(self, K=10, # model options covtype='shareddiag', model_mean='state', model_beta='state', dirichlet_diag=10, connectivity=None, Pstructure=None, Pistructure=None, ): if (connectivity is not None) and not ((covtype == 'shareddiag') or (covtype == 'diag')): warnings.warn('Parameter connectivity can only be used with a diagonal covariance matrix') connectivity = None self.hyperparameters = {} self.hyperparameters["K"] = K self.hyperparameters["covtype"] = covtype self.hyperparameters["model_mean"] = model_mean self.hyperparameters["model_beta"] = model_beta self.hyperparameters["dirichlet_diag"] = dirichlet_diag self.hyperparameters["connectivity"] = connectivity if Pstructure is None: self.hyperparameters["Pstructure"] = np.ones((K,K), dtype=bool) else: self.hyperparameters["Pstructure"] = Pstructure if Pistructure is None: self.hyperparameters["Pistructure"] = np.ones((K,), dtype=bool) else: self.hyperparameters["Pistructure"] = Pistructure self.beta = None self.mean = None self.alpha_beta = None self.alpha_mean = None self.Sigma = None self.P = None self.Pi = None self.active_states = np.ones(K,dtype=bool) self.trained = False ## Private methods def __forward_backward(self,L,indices,indices_individual,serial,gpu_acceleration): """ Calculate state time courses for a collection of segments """ ind = indices if len(ind.shape) == 1: ind = np.expand_dims(indices,axis=0) if serial: T,K = L.shape N = ind.shape[0] indices_Xi = auxiliary.Gamma_indices_to_Xi_indices(ind) Gamma = np.zeros((T,K)) Xi = np.zeros((T-N,K,K)) scale = np.zeros((T)) for j in range(N): tt = range(ind[j,0],ind[j,1]) tt_xi = range(indices_Xi[j,0],indices_Xi[j,1]) a,b,sc = auxiliary.compute_alpha_beta_serial(L[tt,:],self.Pi,self.P) scale[tt] = sc Gamma[tt,:] = b * a Xi[tt_xi,:,:] = np.matmul( np.expand_dims(a[0:-1,:],axis=2), \ np.expand_dims((b[1:,:] * L[tt[1:],:]),axis=1)) * self.P # repeat if a Nan is produced, scaling the loglikelood if np.any(np.isinf(Gamma)) or np.any(np.isinf(Xi)): LL = np.log(L[tt,:]) t = np.all(LL<0,axis=1) LL[t,:] = LL[t,:] - np.expand_dims(np.max(LL[t,:],axis=1), axis=1) a,b,_ = auxiliary.compute_alpha_beta_serial(np.exp(LL),self.Pi,self.P) Gamma[tt,:] = b * a Xi[tt_xi,:,:] = np.matmul( np.expand_dims(a[0:-1,:],axis=2), \ np.expand_dims((b[1:,:] * L[tt[1:],:]),axis=1)) * self.P Gamma[tt,:] = Gamma[tt,:] / np.expand_dims(np.sum(Gamma[tt,:],axis=1), axis=1) Xi[tt_xi,:,:] = Xi[tt_xi,:,:] / np.expand_dims(np.sum(Xi[tt_xi,:,:],axis=(1,2)),axis=(1,2)) else: T,N,K = L.shape indices_Xi = auxiliary.Gamma_indices_to_Xi_indices(ind) xp = cp if gpu_acceleration == 2 else np L = xp.asarray(L) P = xp.asarray(self.P) a,b,sc = auxiliary.compute_alpha_beta_parallel(L,self.Pi,P,indices_individual,gpu_acceleration) ## convert scale to a vector for output. 
scale = np.zeros(max(ind[:,1])) for j in range(N): tt = range(ind[j,0],ind[j,1]) tt_ind = range(indices_individual[j,0],indices_individual[j,1]) scale[tt] = sc[tt_ind,j] if gpu_acceleration == 2: Gamma_ = cp.asnumpy(b * a) Xi_ = cp.asnumpy(cp.einsum('ijk,ijl,ijl,kl->ijkl',a[0:-1,:,:],b[1:,:,:],L[1:,:,:],P)) else: Gamma_ = b * a Xi_ = np.einsum('ijk,ijl,ijl,kl->ijkl',a[0:-1,:,:],b[1:,:,:],L[1:,:,:],self.P) has_inf=[] del a, b for jj in range(N): ind_indiv = range(indices_individual[jj,0],indices_individual[jj,1]) Xi_ind_indiv = range(indices_individual[jj,0],indices_individual[jj,1]-1) if np.any(np.isinf(Gamma_[ind_indiv,jj,:])) or np.any(np.isinf(Xi_[Xi_ind_indiv,jj,:])): has_inf.append(jj) if len(has_inf)>0: Lsub = xp.asarray(L[:,has_inf,:]) del L LL = xp.log(Lsub) for jj in range(len(has_inf)): t = xp.all(LL[:,jj,:]<0,axis=1) LL[t,jj,:] = LL[t,jj,:] - xp.expand_dims(xp.max(LL[t,jj,:],axis=1), axis=1) a,b,_ = auxiliary.compute_alpha_beta_parallel(xp.exp(LL),self.Pi,P,indices_individual[has_inf,:],gpu_acceleration) del LL if gpu_acceleration == 2: Gamma_[:,has_inf,:] = cp.asnumpy(b * a) Xi_[:,has_inf,:,:] = cp.asnumpy(cp.einsum('ijk,ijl,ijl,kl->ijkl',a[0:-1,:,:],b[1:,:,:],Lsub[1:,:,:],P)) else: Gamma_[:,has_inf,:] = b * a Xi_[:,has_inf,:,:] = np.einsum('ijk,ijl,ijl,kl->ijkl',a[0:-1,:,:],b[1:,:,:],Lsub[1:,:,:],self.P) del Lsub, b, a else: del L del P ## Restructure data to n_samples * n_states Gamma = np.zeros((max(ind[:,1]),K)) Xi = np.zeros((max(ind[:,1])-N,K,K)) for jj in range(N): tt = range(ind[jj,0],ind[jj,1]) tt_ind = range(indices_individual[jj,0],indices_individual[jj,1]) tt_xi = range(indices_Xi[jj,0],indices_Xi[jj,1]) tt_xi_ind = range(indices_individual[jj,0],indices_individual[jj,1]-1) Gamma[tt,:] = Gamma_[tt_ind,jj,:] / np.expand_dims(np.sum(Gamma_[tt_ind,jj,:],axis=1), axis=1) Xi[tt_xi,:,:] = Xi_[tt_xi_ind,jj,:,:] / np.expand_dims(np.sum(Xi_[tt_xi_ind,jj,:,:],axis=(1,2)),axis=(1,2)) del Gamma_, Xi_ return Gamma,Xi,scale def __forward_backward_vp(self,L,indices,indices_individual,serial=False): """ Calculate viterbi path for a collection of segments """ ind = indices if len(ind.shape) == 1: ind = np.expand_dims(indices,axis=0) if serial: T,K = L.shape N = ind.shape[0] vpath = np.zeros((T,K)) for j in range(N): tt = range(ind[j,0],ind[j,1]) qstar = auxiliary.compute_qstar_serial(L[tt,:],self.Pi,self.P) vpath[tt,:] = qstar else: T,N,K = L.shape vpath = np.zeros((max(ind[:,1]),K)) qstar = auxiliary.compute_qstar_parallel(L,self.Pi,self.P,indices_individual) for j in range(N): tt = range(ind[j,0],ind[j,1]) tt_ind = range(indices_individual[j,0],indices_individual[j,1]) vpath[tt,:] = qstar[tt_ind,j,:] return vpath def __loglikelihood_k(self,X,Y,L,k,cache): T,q = Y.shape if self.hyperparameters["model_beta"] != 'no': p = X.shape[1] else: p = 0 shared_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'sharedfull') diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') k_mean,k_beta = k,k if self.hyperparameters["model_mean"] == 'shared': k_mean = 0 if self.hyperparameters["model_beta"] == 'shared': k_beta = 0 constant = - q / 2 * math.log(2*math.pi) #+ q / 2 if (k==0) and shared_covmat: if diagonal_covmat: PsiWish_alphasum = 0.5 * q * scipy.special.psi(self.Sigma[0]['shape']) ldetWishB = 0 for j in range(q): ldetWishB += np.log(self.Sigma[0]['rate'][j]) ldetWishB = - 0.5 * ldetWishB C = self.Sigma[0]['shape'] / self.Sigma[0]['rate'] else: PsiWish_alphasum = 0 for j in 
range(1,q+1): PsiWish_alphasum += scipy.special.psi(0.5 * (self.Sigma[0]['shape'] + 1 - j)) PsiWish_alphasum = 0.5 * PsiWish_alphasum (s, logdet) = np.linalg.slogdet(self.Sigma[0]['rate']) ldetWishB = - 0.5 * s * logdet C = self.Sigma[0]['shape'] * self.Sigma[0]['irate'] cache["PsiWish_alphasum"] = PsiWish_alphasum cache["ldetWishB"] = ldetWishB cache["C"] = C elif shared_covmat: PsiWish_alphasum = cache["PsiWish_alphasum"] ldetWishB = cache["ldetWishB"] C = cache["C"] elif diagonal_covmat: # not shared_covmat PsiWish_alphasum = 0.5 * q * scipy.special.psi(self.Sigma[k]['shape']) ldetWishB = 0 for j in range(q): ldetWishB += np.log(self.Sigma[k]['rate'][j]) ldetWishB = - 0.5 * ldetWishB C = self.Sigma[k]['shape'] / self.Sigma[k]['rate'] diagonal_covmat = True else: # not shared_covmat, full matrix PsiWish_alphasum = 0 for j in range(1,q+1): PsiWish_alphasum += scipy.special.psi(0.5 * (self.Sigma[k]['shape'] + 1 - j)) PsiWish_alphasum = 0.5 * PsiWish_alphasum (s, logdet) = np.linalg.slogdet(self.Sigma[k]['rate']) ldetWishB = - 0.5 * s * logdet C = self.Sigma[k]['shape'] * self.Sigma[k]['irate'] # distance dist = np.zeros((T,)) d = np.copy(Y) if self.mean is not None: d -= np.expand_dims(self.mean[k_mean]['Mu'],axis=0) if self.beta is not None: d -= (X @ self.beta[k_beta]['Mu']) if diagonal_covmat: Cd = d * C else: Cd = d @ C for j in range(q): dist -= 0.5 * d[:,j] * Cd[:,j] # cov trace for beta norm_wish_trace_W = np.zeros((T,)) if self.beta is not None: if diagonal_covmat: jj = np.arange(p) for j in range(q): if self.hyperparameters["connectivity"] is not None: jj = np.where(self.hyperparameters["connectivity"][:,j]==1)[0] Cb = self.beta[k_beta]['Sigma'][jj,jj[:,np.newaxis],j] norm_wish_trace_W -= 0.5 * C[j] * np.sum(((X[:,jj] @ Cb)) * X[:,jj], axis=1) else: ind = np.arange(p) * q for j1 in range(q): ind1 = ind + j1 tmp = X @ self.beta[k_beta]['Sigma'][ind1,:] for j2 in range(q): ind2 = ind + j2 norm_wish_trace_W -= 0.5 * C[j1,j2] * np.sum(tmp[:,ind2] * X, axis=1) # cov trace for mean norm_wish_trace_mean = np.zeros(T) if self.mean is not None: if diagonal_covmat: for j in range(q): norm_wish_trace_mean -= 0.5 * C[j] * self.mean[k_mean]['Sigma'][j] else: norm_wish_trace_mean = - 0.5 * np.trace(self.mean[k_mean]['Sigma'] @ C) L[:,k] = constant + dist + norm_wish_trace_W + norm_wish_trace_mean + ldetWishB + PsiWish_alphasum @staticmethod def __check_options(options): if options is None: options = {} if not "cyc" in options: options["cyc"] = 100 if not "cyc_to_go_under_th" in options: options["cyc_to_go_under_th"] = 10 if not "initcyc" in options: options["initcyc"] = 10 if not "initrep" in options: options["initrep"] = 5 if not "tol" in options: options["tol"] = 1e-4 if not "threshold_active" in options: options["threshold_active"] = 20 if not "deactivate_states" in options: options["deactivate_states"] = True if not "stochastic" in options: options["stochastic"] = False if not "updateGamma" in options: options["updateGamma"] = True if not "updateDyn" in options: options["updateDyn"] = True if not "updateObs" in options: options["updateObs"] = True if not "verbose" in options: options["verbose"] = True if not "serial" in options: options["serial"] = False if not "gpu_acceleration" in options: options["gpu_acceleration"] = 0 if not "gpuChunks" in options: options["gpuChunks"] = 1 ### Check gpuChunks validity. if options["serial"] and options["gpuChunks"] > 1: print("WARNING: GPU chunking setting is selected for serial computing. This will have no effect on Memory use. 
If serial computation exceeds Memory limits, use stochastic training. Disabling Memory Saver.") options["gpuChunks"] = 1 ### Check GPU acceleration validity. if options["gpu_acceleration"] > 0 and options["serial"]: print("WARNING: GPU acceleration for serial processing is nonsensical. Disabling GPU acceleration.") options["gpu_acceleration"] = 0 try: cp except NameError: if options["gpu_acceleration"] > 0: ### Throw an error if GPU acceleration is selected but cupy could not be imported. raise ImportError("GPU acceleration selected but cupy not available. Install RAPIDS or disable GPU acceleration.") else: if options["verbose"]: if options["gpu_acceleration"] > 0: print("GPU acceleration enabled.") if options["gpuChunks"] > 1: print("GPU Chunking selected. Running GPU computations in ", options["gpuChunks"], "chunks.") elif options["gpu_acceleration"] == 0 and (not options["serial"]): ### Notify users of GPU acceleration if it is readily available. print("GPU acceleration not selected, but cupy detected. Consider enabling GPU acceleration by setting the \"gpu_acceleration\" option to >=1.") if options["gpuChunks"] > 1: print("Warning: Chunked computations are selected without GPU computing. This likely increases peak memory use and run time with no benefit. To reduce run-time and peak memory use, run stochastic training. Training will now run with chunked computations.") return options @staticmethod def __check_options_stochastic(options,files): if options is None: options = {} if not "Nbatch" in options: options["Nbatch"] = int(min(len(files)/2,10)) if not "initNbatch" in options: options["initNbatch"] = options["Nbatch"] if not "cyc" in options: options["cyc"] = 100 if not "initcyc" in options: options["initcyc"] = 25 if not "forget_rate" in options: options["forget_rate"] = 0.75 if not "base_weights" in options: options["base_weights"] = 0.25 if not "min_cyc" in options: options["min_cyc"] = 10 if ("updateGamma" in options) and (not options["updateGamma"]): options["updateGamma"] = True warnings.warn('updateGamma has to be True for stochastic learning') if ("updateDyn" in options) and (not options["updateDyn"]): options["updateDyn"] = True warnings.warn('updateDyn has to be True for stochastic learning') options = glhmm.__check_options(options) return options @staticmethod def __check_Gamma(Gamma): K = Gamma.shape[1] if np.any(np.isnan(Gamma)): raise Exception("NaNs were generated in the state time courses, probably due to artifacts") status = np.all(np.std(Gamma,axis=0)<0.001) #status = (np.max(Gamma)<0.6) and (np.min(Gamma)>(1/K/2)) return status def __init_Gamma(self,X,Y,indices,options): verbose = options["verbose"] options["verbose"] = False if options["initrep"] == 0: self.__init_prior_P_Pi() # init P,Pi priors self.__update_dynamics() # make P,Pi based on priors Gamma = self.sample_Gamma(indices) options["verbose"] = verbose return Gamma fe = np.zeros(options["initrep"]) for r in range(options["initrep"]): hmm_r = copy.deepcopy(self) options_r = copy.deepcopy(options) options_r["cyc"] = options_r["initcyc"] options_r["stochastic"] = False options_r["initrep"] = 0 Gamma_r,_,fe_r = hmm_r.train(X,Y,indices,options=options_r) fe[r] = fe_r[-1] if (r == 0) or (fe[r] < np.min(fe[0:r])): Gamma = np.copy(Gamma_r) best = r if verbose: print("Init repetition " + str(r+1) + " free energy = " + str(fe[r])) if verbose: print("Best repetition: " + str(best+1)) options["verbose"] = verbose return Gamma def __update_Pi(self): K = self.hyperparameters["K"] self.Pi = np.zeros((K,)) PsiSum0 =
scipy.special.psi(sum(self.Dir_alpha)) for k in range(K): if self.Dir_alpha[k] == 0: continue self.Pi[k] = math.exp(scipy.special.psi(self.Dir_alpha[k])-PsiSum0) self.Pi = self.Pi / np.sum(self.Pi) def __update_P(self): K = self.hyperparameters["K"] self.P = np.zeros((K,K)) for j in range(K): PsiSum = scipy.special.psi(sum(self.Dir2d_alpha[j,:])) for k in range(K): if self.Dir2d_alpha[j,k] == 0: continue self.P[j,k] = math.exp(scipy.special.psi(self.Dir2d_alpha[j,k])-PsiSum) self.P[j,:] = self.P[j,:] / np.sum(self.P[j,:]) def __Gamma_loglikelihood(self,Gamma,Xi,indices): K = self.hyperparameters["K"] minreal = sys.float_info.min Gamma_0 = Gamma[indices[:,0]] Gamma_0[Gamma_0 < minreal] = minreal PsiDir_alphasum = scipy.special.psi(sum(self.Dir_alpha)) L = 0 for k in range(K): L += np.sum(Gamma_0[:,k]) * (scipy.special.psi(self.Dir_alpha[k]) - PsiDir_alphasum) PsiDir2d_alphasum = np.zeros(K) for l in range(K): PsiDir2d_alphasum[l] = scipy.special.psi(sum(self.Dir2d_alpha[l,:])) for k in range(K): for l in range(K): L += np.sum(Xi[:,l,k]) * (scipy.special.psi(self.Dir2d_alpha[l,k]) - PsiDir2d_alphasum[l]) return L def __update_priors(self): K = self.hyperparameters["K"] diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') shared_beta = self.hyperparameters["model_beta"] == 'shared' shared_mean = self.hyperparameters["model_mean"] == 'shared' K_mean,K_beta = K,K if shared_mean: K_mean = 1 if shared_beta: K_beta = 1 if self.hyperparameters["model_mean"] != 'no': for k in range(K_mean): if diagonal_covmat: self.alpha_mean[k]["rate"] = self.priors["alpha_mean"]["rate"] \ + 0.5 * self.mean[k]["Sigma"] + self.mean[k]["Mu"] ** 2 else: self.alpha_mean[k]["rate"] = self.priors["alpha_mean"]["rate"] \ + 0.5 * np.diag(self.mean[k]["Sigma"]) + self.mean[k]["Mu"] ** 2 self.alpha_mean[k]["shape"] = self.priors["alpha_mean"]["shape"] + 0.5 if self.hyperparameters["model_beta"] != 'no': p,q = self.beta[0]["Mu"].shape jj = np.arange(p) for k in range(K_beta): self.alpha_beta[k]["rate"] = self.priors["alpha_beta"]["rate"] + 0.5 * self.beta[k]["Mu"] ** 2 if diagonal_covmat: for j in range(q): if self.hyperparameters["connectivity"] is not None: jj = np.where(self.hyperparameters["connectivity"][:,j]==1)[0] sjj = self.beta[k]["Sigma"][jj,jj[:,np.newaxis],j] if (np.squeeze(sjj).shape != ()): sjj = np.squeeze(sjj) self.alpha_beta[k]["rate"][jj,j] += 0.5 * np.diag(sjj) else: self.alpha_beta[k]["rate"] += 0.5 * np.reshape(np.diag(self.beta[k]["Sigma"]),(p,q)) self.alpha_beta[k]["shape"] = self.priors["alpha_beta"]["shape"] + 0.5 def __init_priors(self,X=None,Y=None,files=None): if Y is None: X,Y,_,_ = io.load_files(files,0) p = X.shape[1] if X is not None else None q = Y.shape[1] if files is None: prior_shape,prior_rate = self.__compute_prior_covmat(X,Y) else: prior_shape,prior_rate = self.__compute_prior_covmat(files=files) self.__init_priors_sub(prior_rate,prior_shape,p,q) def __init_priors_sub(self,prior_rate,prior_shape,p,q): K = self.hyperparameters["K"] shared_beta = self.hyperparameters["model_beta"] == 'shared' shared_mean = self.hyperparameters["model_mean"] == 'shared' diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') K_mean,K_beta = K,K if shared_mean: K_mean = 1 if shared_beta: K_beta = 1 # priors for dynamics self.__init_prior_P_Pi() # Covariance matrix, use the range of the global error to set the prior self.priors["Sigma"] = {} self.priors["Sigma"]["shape"] = prior_shape 
self.priors["Sigma"]["rate"] = prior_rate if diagonal_covmat: self.priors["Sigma"]["irate"] = 1 / self.priors["Sigma"]["rate"] else: self.priors["Sigma"]["irate"] = np.linalg.inv(self.priors["Sigma"]["rate"]) # alpha (state betas and mean priors) if self.hyperparameters["model_mean"] != 'no': self.alpha_mean = [] for k in range(K_mean): self.alpha_mean.append({}) self.alpha_mean[k] = {} if self.hyperparameters["model_beta"] != 'no': self.alpha_beta = [] for k in range(K_beta): self.alpha_beta.append({}) self.alpha_beta[k] = {} if self.hyperparameters["model_mean"] != 'no': self.priors["alpha_mean"] = {} self.priors["alpha_mean"]["rate"] = 0.1 * np.ones(q) self.priors["alpha_mean"]["shape"] = 0.1 if self.hyperparameters["model_beta"] != 'no': self.priors["alpha_beta"] = {} self.priors["alpha_beta"]["rate"] = 0.1 * np.ones((p,q)) self.priors["alpha_beta"]["shape"] = 0.1 def __init_prior_P_Pi(self): K = self.hyperparameters["K"] # priors for dynamics self.priors = {} self.priors["Dir_alpha"] = np.ones(K) #self.priors["Dir_alpha"][self.Pistructure] = 1 self.priors["Dir2d_alpha"] = np.ones((K,K)) for k in range(K): #self.priors["Dir2d_alpha"][self.Pstructure[k,],:] = 1 self.priors["Dir2d_alpha"][k,k] = self.hyperparameters["dirichlet_diag"] def __compute_prior_covmat(self,X=None,Y=None,files=None): diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') if not files is None: # iterative calculation N = len(files) if self.hyperparameters["model_mean"] != 'no': for j in range(N): _,Yj,_,_ = io.load_files(files,j) if j == 0: m = np.sum(Yj,axis=0) nt = Yj.shape[0] else: m += np.sum(Yj,axis=0) nt += Yj.shape[0] m /= nt if self.hyperparameters["model_beta"] != 'no': for j in range(N): Xj,Yj,_,_ = io.load_files(files,j) if j == 0: XX = Xj.T @ Xj XY = Xj.T @ Yj else: XX += Xj.T @ Xj XY += Xj.T @ Yj beta = np.linalg.inv(XX + 0.1 * Xj.shape[1]) @ XY for j in range(N): Xj,Yj,_,_ = io.load_files(files,j) if j == 0: q = Yj.shape[1] if self.hyperparameters["model_mean"] != 'no': Yj -= np.expand_dims(m,axis=0) if self.hyperparameters["model_beta"] != 'no': Yj -= Xj @ beta rj = np.max(Yj,axis=0) - np.min(Yj,axis=0) if j == 0: r = np.copy(rj) else: r = np.maximum(r,rj) else: T,q = Y.shape if self.hyperparameters["model_mean"] != 'no': Yr = Y - np.expand_dims(np.mean(Y,axis=0),axis=0) else: Yr = np.copy(Y) if self.hyperparameters["model_beta"] != 'no': p = X.shape[1] beta = np.linalg.inv(X.T @ X + 0.1 * np.eye(p)) @ (X.T @ Yr) Yr -= X @ beta r = np.max(Yr,axis=0) - np.min(Yr,axis=0) if diagonal_covmat: shape = 0.5 * (q+0.1-1) #shape = 0.5 * T rate = 0.5 * r else: shape = (q+0.1-1) #shape = T rate = np.diag(r) return shape,rate def __update_dynamics(self,Gamma=None,Xi=None,indices=None, Dir_alpha=None,Dir2d_alpha=None,rho=1,init=False): """ Update transition prob matrix and initial probabilities """ Pistructure = self.hyperparameters["Pistructure"] Pstructure = self.hyperparameters["Pstructure"] # Transition probability matrix if (Xi is None) and (Gamma is None) and (Dir2d_alpha is None): self.Dir2d_alpha = self.priors["Dir2d_alpha"] else: if Dir2d_alpha is None: if Xi is None: Xi = auxiliary.approximate_Xi(Gamma,indices) Dir2d_alpha = np.sum(Xi,axis=0) if init: self.Dir2d_alpha = Dir2d_alpha + self.priors["Dir2d_alpha"] else: self.Dir2d_alpha = rho * (Dir2d_alpha + self.priors["Dir2d_alpha"]) \ + (1-rho) * np.copy(self.Dir2d_alpha) self.Dir2d_alpha[~Pstructure] = 0 self.__update_P() # Initial probabilities if (Gamma is None) and (Dir_alpha is None): 
self.Dir_alpha = self.priors["Dir_alpha"] else: if Dir_alpha is None: Dir_alpha = np.sum(Gamma[indices[:,0]],axis=0) if init: self.Dir_alpha = Dir_alpha + self.priors["Dir_alpha"] else: self.Dir_alpha = rho * (Dir_alpha + self.priors["Dir_alpha"]) \ + (1-rho) * np.copy(self.Dir_alpha) self.Dir_alpha[~Pistructure] = 0 self.__update_Pi() def __init_dynamics(self,Gamma=None,indices=None): """ Initialise transition prob matrix and initial probabilities """ self.__update_dynamics(Gamma,None,indices,init=True) def __update_obsdist(self,X,Y,Gamma,Nfactor=1,rho=1): """ Update state distributions """ K = self.hyperparameters["K"] T,q = Y.shape if self.hyperparameters["model_beta"] != 'no': p = X.shape[1] shared_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'sharedfull') diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') shared_beta = self.hyperparameters["model_beta"] == 'shared' shared_mean = self.hyperparameters["model_mean"] == 'shared' K_mean,K_beta = K,K if shared_mean: K_mean = 1 if shared_beta: K_beta = 1 if self.hyperparameters["model_beta"] != 'no': XGX = np.zeros((p,p,K)) for k in range(K): XGX[:,:,k] = (X * np.expand_dims(Gamma[:,k],axis=1)).T @ X XGXb = np.expand_dims(np.sum(XGX,axis=2),axis=2) if shared_beta else XGX Gb = np.ones((T,1)) if shared_beta else Gamma Gm = np.ones((T,1)) if shared_mean else Gamma # Mean if self.hyperparameters["model_mean"] != 'no': if self.hyperparameters["model_beta"] != 'no': Yr = np.copy(Y) for k in range(K_beta): Yr -= (X @ self.beta[k]["Mu"]) * np.expand_dims(Gb[:,k], axis=1) else: Yr = Y for k in range(K_mean): if (not shared_mean) and (not self.active_states[k]): continue k_sigma = 0 if shared_covmat else k GY = np.expand_dims(Gm[:,k],axis=1).T @ Yr Nk = np.sum(Gm[:,k]) if diagonal_covmat: alpha = self.alpha_mean[k]["shape"] / self.alpha_mean[k]["rate"] isigma = self.Sigma[k_sigma]["shape"] / self.Sigma[k_sigma]["rate"] iS = Nfactor * isigma * Nk + alpha S = 1 / iS mu = np.squeeze(Nfactor * isigma * S * GY) self.mean[k]["Sigma"] = rho * S + (1-rho) * np.copy(self.mean[k]["Sigma"]) self.mean[k]["Mu"] = rho * mu + (1-rho) * np.copy(self.mean[k]["Mu"]) else: alpha = np.diag(self.alpha_mean[k]["shape"] / self.alpha_mean[k]["rate"]) isigma = (self.Sigma[k_sigma]["shape"] * self.Sigma[k_sigma]["irate"]) gram = isigma * Nk maxlik_mean = (GY / Nk).T iS = Nfactor * gram + alpha iS = (iS + iS.T) / 2 S = np.linalg.inv(iS) mu = np.squeeze(Nfactor * S @ gram @ maxlik_mean) self.mean[k]["Sigma"] = rho * S + (1-rho) * np.copy(self.mean[k]["Sigma"]) self.mean[k]["Mu"] = rho * mu + (1-rho) * np.copy(self.mean[k]["Mu"]) # betas if self.hyperparameters["model_beta"] != 'no': if self.hyperparameters["model_mean"] != 'no': Yr = np.copy(Y) for k in range(K_mean): Yr -= np.expand_dims(self.mean[k]["Mu"], axis=0) * np.expand_dims(Gm[:,k], axis=1) else: Yr = Y for k in range(K_beta): if (not shared_beta) and (not self.active_states[k]): continue k_sigma = 0 if shared_covmat else k XGY = (X * np.expand_dims(Gb[:,k],axis=1)).T @ Yr if diagonal_covmat: jj = np.arange(p) for j in range(q): if self.hyperparameters["connectivity"] is not None: jj = np.where(self.hyperparameters["connectivity"][:,j]==1)[0] alpha = np.diag(self.alpha_beta[k]["shape"] / self.alpha_beta[k]["rate"][jj,j]) isigma = self.Sigma[k_sigma]["shape"] / self.Sigma[k_sigma]["rate"][j] iS = Nfactor * isigma * XGXb[jj,jj[:,np.newaxis],k] + alpha iS = (iS + iS.T) / 2 S = np.linalg.inv(iS) mu = 
np.squeeze(S @ np.expand_dims(Nfactor * isigma * XGY[jj,j],axis=1)) S_old = np.copy(self.beta[k]["Sigma"][jj,jj[:,np.newaxis],j]) mu_old = np.copy(self.beta[k]["Mu"][jj,j]) self.beta[k]["Sigma"][jj,jj[:,np.newaxis],j] = rho * S + (1-rho) * S_old self.beta[k]["Mu"][jj,j] = rho * mu + (1-rho) * mu_old else: alpha = np.diag(self.alpha_beta[k]["shape"] \ / np.reshape(self.alpha_beta[k]["rate"],p*q)) isigma = self.Sigma[k_sigma]["shape"] * self.Sigma[k_sigma]["irate"] gram = np.kron(XGXb[:,:,k],isigma) maxlik_beta = np.reshape(np.linalg.lstsq(XGXb[:,:,k],XGY,rcond=None)[0],(p*q,1)) iS = Nfactor * gram + alpha iS = (iS + iS.T) / 2 S = np.linalg.inv(iS) mu = Nfactor * S @ gram @ maxlik_beta mu = np.reshape(mu,(p,q)) self.beta[k]["Sigma"] = rho * S + (1-rho) * np.copy(self.beta[k]["Sigma"]) self.beta[k]["Mu"] = rho * mu + (1-rho) * np.copy(self.beta[k]["Mu"]) # Sigma if shared_covmat: if diagonal_covmat: rate = np.copy(self.priors["Sigma"]["rate"]) shape = self.priors["Sigma"]["shape"] + 0.5 * Nfactor * T else: rate = np.copy(self.priors["Sigma"]["rate"]) shape = self.priors["Sigma"]["shape"] + Nfactor * T for k in range(K): d = np.copy(Y) sm = np.zeros(q) if diagonal_covmat else np.zeros((q,q)) if self.hyperparameters["model_mean"] != 'no': kk = 0 if shared_mean else k d -= np.expand_dims(self.mean[kk]["Mu"], axis=0) sm = self.mean[kk]["Sigma"] * np.sum(Gamma[:,k]) sb = np.zeros(q) if diagonal_covmat else np.zeros((q,q)) if self.hyperparameters["model_beta"] != 'no': kk = 0 if shared_beta else k d -= (X @ self.beta[kk]["Mu"]) if diagonal_covmat: sb = np.zeros((T,q)) jj = np.arange(p) for j in range(q): if self.hyperparameters["connectivity"] is not None: jj = np.where(self.hyperparameters["connectivity"][:,j]==1)[0] sb[:,j] += np.sum((X[:,jj] @ \ self.beta[kk]["Sigma"][jj,jj[:,np.newaxis],j]) * X[:,jj],axis=1) sb = np.sum(sb * np.expand_dims(Gamma[:,k], axis=1), axis=0) else: for j1 in range(q): ind1 = np.arange(p) * q + j1 for j2 in range(j1,q): ind2 = np.arange(p) * q + j2 sb[j1,j2] = np.sum(self.beta[kk]["Sigma"][ind1,ind2[:,np.newaxis]] * XGX[:,:,k]) sb[j2,j1] = sb[j1,j2] if shared_covmat: # self.Sigma[0]["rate"] if diagonal_covmat: rate += 0.5 * Nfactor * \ ( (np.sum((d ** 2) * np.expand_dims(Gamma[:,k], axis=1),axis=0)) \ + sm + sb ) else: rate += Nfactor * \ ( ((d * np.expand_dims(Gamma[:,k], axis=1)).T @ d) + sm + sb) else: if diagonal_covmat: rate = self.priors["Sigma"]["rate"] \ + 0.5 * Nfactor * \ ( np.sum((d ** 2) * np.expand_dims(Gamma[:,k], axis=1),axis=0) \ + sm + sb ) shape = self.priors["Sigma"]["shape"] + \ 0.5 * Nfactor * np.sum(Gamma[:,k]) self.Sigma[k]["rate"] = rho * rate + (1-rho) * np.copy(self.Sigma[k]["rate"]) self.Sigma[k]["shape"] = rho * shape + (1-rho) * np.copy(self.Sigma[k]["shape"]) self.Sigma[k]["irate"] = 1 / self.Sigma[k]["rate"] else: rate = self.priors["Sigma"]["rate"] + Nfactor * \ ( ((d * np.expand_dims(Gamma[:,k], axis=1)).T @ d) \ + sm + sb ) shape = self.priors["Sigma"]["shape"] + \ Nfactor * np.sum(Gamma[:,k]) self.Sigma[k]["rate"] = rho * rate + (1-rho) * np.copy(self.Sigma[k]["rate"]) self.Sigma[k]["shape"] = rho * shape + (1-rho) * np.copy(self.Sigma[k]["shape"]) self.Sigma[k]["irate"] = np.linalg.inv(self.Sigma[k]["rate"]) if shared_covmat: self.Sigma[0]["rate"] = rho * rate + (1-rho) * np.copy(self.Sigma[0]["rate"]) self.Sigma[0]["shape"] = rho * shape + (1-rho) * np.copy(self.Sigma[0]["shape"]) if diagonal_covmat: self.Sigma[0]["irate"] = 1 / self.Sigma[0]["rate"] else: self.Sigma[0]["irate"] = np.linalg.inv(self.Sigma[0]["rate"])
self.__update_priors() def __update_obsdist_stochastic(self,files,I,Gamma,rho): Nfactor = len(files) / len(I) X,Y,_,_ = io.load_files(files,I) self.__update_obsdist(X,Y,Gamma,Nfactor,rho) def __init_obsdist(self,X,Y,Gamma): K = self.hyperparameters["K"] q = Y.shape[1] if self.hyperparameters["model_beta"] != 'no': p = X.shape[1] shared_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'sharedfull') diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') shared_beta = self.hyperparameters["model_beta"] == 'shared' shared_mean = self.hyperparameters["model_mean"] == 'shared' K_mean,K_beta = K,K if shared_mean: K_mean = 1 if shared_beta: K_beta = 1 # alpha (keep it unregularised) if self.hyperparameters["model_mean"] != 'no': self.alpha_mean = [] for k in range(K_mean): self.alpha_mean.append({}) self.alpha_mean[k]["rate"] = 0.1 * np.ones(q) self.alpha_mean[k]["shape"] = 0.0001 # mild-regularised start if self.hyperparameters["model_beta"] != 'no': self.alpha_beta = [] for k in range(K_beta): self.alpha_beta.append({}) self.alpha_beta[k]["rate"] = 0.1 * np.ones((p,q)) self.alpha_beta[k]["shape"] = 0.0001 # mild-regularised start # Sigma (set to priors) self.Sigma = [] if diagonal_covmat and shared_covmat: self.Sigma.append({}) self.Sigma[0]["rate"] = np.copy(self.priors["Sigma"]["rate"]) self.Sigma[0]["irate"] = 1 / self.Sigma[0]["rate"] self.Sigma[0]["shape"] = self.priors["Sigma"]["shape"] elif diagonal_covmat and not shared_covmat: for k in range(K): self.Sigma.append({}) self.Sigma[k]["rate"] = np.copy(self.priors["Sigma"]["rate"]) self.Sigma[k]["irate"] = 1 / self.Sigma[k]["rate"] self.Sigma[k]["shape"] = self.priors["Sigma"]["shape"] elif not diagonal_covmat and shared_covmat: self.Sigma.append({}) self.Sigma[0]["rate"] = np.copy(self.priors["Sigma"]["rate"]) self.Sigma[0]["irate"] = np.linalg.inv(self.Sigma[0]["rate"]) self.Sigma[0]["shape"] = self.priors["Sigma"]["shape"] else: # not diagonal_covmat and not shared_covmat for k in range(K): self.Sigma.append({}) self.Sigma[k]["rate"] = np.copy(self.priors["Sigma"]["rate"]) self.Sigma[k]["irate"] = np.linalg.inv(self.Sigma[k]["rate"]) self.Sigma[k]["shape"] = self.priors["Sigma"]["shape"] # create initial values for mean and beta if self.hyperparameters["model_beta"] != 'no': self.beta = [] for k in range(K_beta): self.beta.append({}) self.beta[k]["Mu"] = np.zeros((p,q)) if diagonal_covmat: self.beta[k]["Sigma"] = np.zeros((p,p,q)) for j in range(q): self.beta[k]["Sigma"][:,:,j] = 0.01 * np.eye(p) else: self.beta[k]["Sigma"] = 0.01 * np.eye(p*q) if self.hyperparameters["model_mean"] != 'no': self.mean = [] for k in range(K_mean): self.mean.append({}) self.mean[k]["Mu"] = np.zeros(q) if diagonal_covmat: self.mean[k]["Sigma"] = 0.01 * np.ones(q) else: self.mean[k]["Sigma"] = 0.01 * np.eye(q) # do beta and mean conventionally, and redo alpha and Sigma self.__update_obsdist(X,Y,Gamma) def __init_obsdist_stochastic(self,files,I,Gamma): X,Y,_,_ = io.load_files(files,I) self.__init_obsdist(X,Y,Gamma) def __init_stochastic(self,files,options): N = len(files) I = np.random.choice(np.arange(N), size=options["initNbatch"], replace=False) X,Y,indices,_ = io.load_files(files,I) Gamma = self.__init_Gamma(X,Y,indices,options) self.__init_priors(files=files) self.__init_dynamics(Gamma,indices=indices) self.__init_obsdist_stochastic(files,I,Gamma) self.__update_priors() def __train_stochastic(self,files,Gamma,options): options = 
self.__check_options_stochastic(options,files) N = len(files) K = self.hyperparameters["K"] if options["verbose"]: start = time.time() # init model with a subset of subjects if not self.trained: if Gamma is None: self.__init_stochastic(files,options) else: I = np.random.choice(np.arange(N), size=options["initNbatch"], replace=False) X,Y,indices,_ = io.load_files(files,I) _,_,indices_all,_ = io.load_files(files,do_only_indices=True) Gamma_subset = auxiliary.slice_matrix(Gamma,indices) self.__init_priors(files=files) self.__init_dynamics(Gamma,indices=indices_all) self.__update_obsdist_stochastic(files,I,Gamma_subset,1) self.__update_priors() self.trained = True fe = np.empty(0) loglik_entropy = np.zeros((N,3)) # data likelihood and Gamma likelihood & entropy n_used = np.zeros(N) sampling_prob = np.ones(N) / N ever_used = np.zeros(N).astype(bool) sum_Gamma = np.zeros((K,N)) Dir_alpha_each = np.zeros((K,N)) Dir2d_alpha_each = np.zeros((K,K,N)) cyc_to_go = options["cyc_to_go_under_th"] # collect subject specific free energy terms for j in range(N): X,Y,indices,indices_individual = io.load_files(files,j) Gamma,Xi,_ = self.decode(X,Y,indices,serial=True,gpu_acceleration=0,gpuChunks=1) # data likelihood todo = (False,True,False,False,False) if X is None: loglik_entropy[j,0] = np.sum(self.get_fe(None,Y,Gamma,Xi,None,indices_individual[0],todo)) else: loglik_entropy[j,0] = np.sum(self.get_fe(X,Y,Gamma,Xi,None,indices_individual[0],todo)) # Gamma likelihood and entropy todo = (True,False,False,False,False) loglik_entropy[j,1] = np.sum(self.get_fe(None,Y,Gamma,Xi,None,indices_individual[0],todo)) todo = (False,False,True,False,False) loglik_entropy[j,2] = np.sum(self.get_fe(None,Y,Gamma,Xi,None,indices_individual[0],todo)) # do the actual training for it in range(options["cyc"]): rho = (it + 2)**(-options["forget_rate"]) I = np.random.choice(np.arange(N), size=options["Nbatch"], replace=False, p=sampling_prob) n_used[I] += 1 n_used = n_used - np.min(n_used) + 1 ever_used[I] = True sampling_prob = options["base_weights"] ** n_used sampling_prob = sampling_prob / np.sum(sampling_prob) Nfactor = N / np.sum(ever_used) X,Y,indices,indices_individual = io.load_files(files,I) indices_Xi = auxiliary.Gamma_indices_to_Xi_indices(indices) # E-step Gamma,Xi,_ = self.decode(X,Y,indices,serial=options["serial"],gpu_acceleration=options["gpu_acceleration"],gpuChunks=options["gpuChunks"]) sum_Gamma[:,I] = utils.get_FO(Gamma,indices,True).T # which states are active? 
if options["deactivate_states"]: for k in range(K): FO = np.sum(sum_Gamma[k,:]) active_state = self.active_states[k] self.active_states[k] = FO > options["threshold_active"] if options["verbose"]: if (not active_state) and self.active_states[k]: print("State " + str(k) + " is reactivated") if active_state and (not self.active_states[k]): print("State " + str(k) + " is deactivated") # M-step if options["updateDyn"]: for j in range(options["Nbatch"]): Dir_alpha_each[:,I[j]] = Gamma[indices[j,0]] tt_j = range(indices_Xi[j,0],indices_Xi[j,1]) Dir2d_alpha_each[:,:,I[j]] = np.sum(Xi[tt_j,:,:],axis=0) Dir_alpha = Nfactor * np.sum(Dir_alpha_each[:,ever_used],axis=1) Dir2d_alpha = Nfactor * np.sum(Dir2d_alpha_each[:,:,ever_used],axis=2) self.__update_dynamics(Dir_alpha=Dir_alpha,Dir2d_alpha=Dir2d_alpha,rho=rho) if options["updateObs"]: self.__update_obsdist(X,Y,Gamma,Nfactor,rho) # collect subject specific free energy terms for j in range(options["Nbatch"]): tt_j = range(indices[j,0],indices[j,1]) tt_j_xi = range(indices_Xi[j,0],indices_Xi[j,1]) # data likelihood todo = (False,True,False,False,False) if X is None: loglik_entropy[I[j],0] = np.sum(self.get_fe(None,Y[tt_j,:], \ Gamma[tt_j,:],Xi[tt_j_xi,:,:],None,indices_individual[j],todo)) else: loglik_entropy[I[j],0] = np.sum(self.get_fe(X[tt_j,:],Y[tt_j,:], \ Gamma[tt_j,:],Xi[tt_j_xi,:,:],None,indices_individual[j],todo)) # Gamma likelihood and entropy todo = (True,False,False,False,False) loglik_entropy[I[j],1] = np.sum(self.get_fe(None,Y[tt_j,:], \ Gamma[tt_j,:],Xi[tt_j_xi,:,:],None,indices_individual[j],todo)) todo = (False,False,True,False,False) loglik_entropy[I[j],2] = np.sum(self.get_fe(None,Y[tt_j,:], \ Gamma[tt_j,:],Xi[tt_j_xi,:,:],None,indices_individual[j],todo)) # KL divergences todo = (False,False,False,True,True) kl = np.sum(self.get_fe(None,None,None,None,None,None,todo)) fe_it = np.sum(kl) + np.sum(loglik_entropy) fe = np.append(fe, fe_it) if len(fe) > 1: chgFrEn = abs((fe[-1]-fe[-2]) / (fe[-1]-fe[0])) if it > 10: if np.abs(chgFrEn) < options["tol"]: cyc_to_go -= 1 else: cyc_to_go = options["cyc_to_go_under_th"] if options["verbose"]: print("Cycle " + str(it+1) + ", free energy = " + str(fe_it) + \ ", relative change = " + str(chgFrEn) + ", rho = " + str(rho)) if cyc_to_go == 0: if options["verbose"]: print("Reached early convergence") break else: if options["verbose"]: print("Cycle " + str(it+1) + " free energy = " + str(fe_it)) K_active = np.sum(self.active_states) if options["verbose"]: end = time.time() elapsed = end - start print("Finished stochastic training in " + str(round(elapsed,2)) + \ "s : active states = " + str(K_active)) return fe ### Public methods
    def loglikelihood(self,X,Y):
        """Computes the log-likelihood of the model per state and time point given the data X and Y.

        Parameters:
        -----------
        X : array-like of shape (n_samples, n_parcels)
            The timeseries of set of variables 1.
        Y : array-like of shape (n_samples, n_parcels)
            The timeseries of set of variables 2.

        Returns:
        --------
        L : array of shape (n_samples, n_states)
            The log-likelihood of the model per state and time point given the data X and Y.

        Raises:
        -------
        Exception
            If the model has not been trained.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        K = self.hyperparameters["K"]
        T = Y.shape[0]
        L = np.zeros((T,K))
        cache = {}
        for k in range(K):
            self.__loglikelihood_k(X,Y,L,k,cache)
        return L
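    # Illustrative usage sketch (not part of the original module), assuming a
    # trained model `hmm` and numpy arrays X, Y of matching length:
    #
    #     L = hmm.loglikelihood(X, Y)   # (n_samples, n_states) log-likelihoods
    #     lik = np.exp(L)               # per-state likelihoods, as decode() computes internally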
    def decode(self,X,Y,indices=None,files=None,viterbi=False,set=None,serial=False,gpu_acceleration=0,gpuChunks=1):
        """Calculates state time courses for all the data using either parallel or sequential processing.

        Parameters:
        -----------
        X : array-like of shape (n_samples, n_parcels)
            The timeseries of set of variables 1.
        Y : array-like of shape (n_samples, n_parcels)
            The timeseries of set of variables 2.
        indices : array-like of shape (n_sessions, 2), optional, default=None
            The start and end indices of each trial/session in the input data.
        files : list of str, optional, default=None
            List of filenames corresponding to the indices.
        viterbi : bool, optional, default=False
            Whether or not the Viterbi algorithm should be used.
        set : int, optional, default=None
            Index of the sessions set to decode.
        serial : bool, optional, default=False
            Whether to process the sessions sequentially rather than in parallel.
        gpu_acceleration : int, optional, default=0
            GPU acceleration level; values greater than 0 require cupy.
        gpuChunks : int, optional, default=1
            Number of chunks in which to split the computations.

        Returns:
        --------
        If viterbi=True:
            vpath : array of shape (n_samples,)
                The most likely state sequence.
        If viterbi=False:
            Gamma : array of shape (n_samples, n_states)
                The state probability timeseries.
            Xi : array of shape (n_samples - n_sessions, n_states, n_states)
                The joint probabilities of past and future states conditioned on data.
            scale : array-like of shape (n_samples,)
                The scaling factors from the inference, used to compute the free energy.
                In normal use, we would do Gamma,Xi,_ = hmm.decode(X,Y,indices)

        Raises:
        -------
        Exception
            If the model has not been trained.
            If both 'files' and 'Y' arguments are provided.
        """
        if (files is not None) and (Y is not None):
            raise Exception("Argument 'files' cannot be used if the data (Y) is also provided")
        if not self.trained:
            raise Exception("The model has not yet been trained")
        if files is not None:
            X,Y,indices,_ = io.load_files(files)
        if indices is None:
            indices = np.zeros((1,2)).astype(int)
            indices[0,0] = 0
            indices[0,1] = Y.shape[0]
        # else: #indices_individual = indices - np.expand_dims(indices[:,0],axis=1)
        if len(indices.shape) == 1:
            indices = np.expand_dims(indices,axis=0)
        X_sliced = X
        Y_sliced = Y
        if set is not None:
            if self.hyperparameters["model_beta"] != 'no':
                X_sliced = auxiliary.slice_matrix(X,indices[set,:])
            Y_sliced = auxiliary.slice_matrix(Y,indices[set,:])
            indices_sliced = indices[set,:]
        else:
            indices_sliced = indices
        indices_individual = indices_sliced - np.expand_dims(indices_sliced[:,0],axis=1)
        L = np.exp(self.loglikelihood(X_sliced,Y_sliced))
        minreal = sys.float_info.min
        maxreal = sys.float_info.max
        L[L < minreal] = minreal
        L[L > maxreal] = maxreal
        N = indices.shape[0]
        maxt = np.max(indices_individual[:,1])
        if serial:
            L_ = L
        else:
            ## Convert L into a 3D matrix (time points * samples * states)
            L_ = np.zeros((maxt, N, L.shape[1]))
            for j in range(N):
                tt = range(indices_sliced[j,0],indices_sliced[j,1])
                tti = range(indices_individual[j,0],indices_individual[j,1])
                L_[tti,j,:] = L[tt,:]
        if viterbi:
            vpath = self.__forward_backward_vp(L_,indices_sliced,indices_individual)
            return vpath
        else:
            if gpuChunks > 1:
                n_chunks = gpuChunks
                chunk_size = int(np.ceil(N/n_chunks))
                if N < (chunk_size*n_chunks):
                    n_chunks -= 1
                    chunk_rem = N-(chunk_size * n_chunks)
                else:
                    chunk_rem = 0
                T,K = L.shape
                Gamma = np.zeros((T,K))
                Xi = np.zeros((T-N,K,K))
                scale = np.zeros((T))
                for nn in range(n_chunks):
                    sub_n = range(nn*chunk_size,(nn+1)*chunk_size)
                    ind_chunk = indices_sliced[sub_n,:]-indices_sliced[sub_n[0],0]
                    Gamma_,Xi_,scale_ = self.__forward_backward(L_[:,sub_n,:],ind_chunk,indices_individual[sub_n,:],serial,gpu_acceleration)
                    tt = range(indices_sliced[nn*chunk_size,0],indices_sliced[(nn+1)*chunk_size-1,1])
                    tt_xi = range(indices_sliced[nn*chunk_size,0]-nn*chunk_size,indices_sliced[(nn+1)*chunk_size-1,1]-(nn+1)*chunk_size)
                    Gamma[tt,:] = Gamma_
                    Xi[tt_xi,:,:] = Xi_
                    scale[tt] = scale_
                if chunk_rem > 0:
                    sub_n = range((n_chunks * chunk_size), ((n_chunks * chunk_size) + chunk_rem))
                    ind_chunk = indices_sliced[sub_n,:]-indices_sliced[sub_n[0],0]
                    Gamma_,Xi_,scale_ = self.__forward_backward(L_[:,sub_n,:],ind_chunk,indices_individual[sub_n,:],serial,gpu_acceleration)
                    tt = range(indices_sliced[(n_chunks * chunk_size),0],indices_sliced[(n_chunks * chunk_size) + chunk_rem-1,1])
                    tt_xi = range(indices_sliced[(n_chunks * chunk_size),0]-n_chunks*chunk_size,indices_sliced[(n_chunks * chunk_size) + chunk_rem-1,1]-(n_chunks * chunk_size + chunk_rem))
                    Gamma[tt,:] = Gamma_
                    Xi[tt_xi,:,:] = Xi_
                    scale[tt] = scale_
                return Gamma,Xi,scale
            else:
                Gamma,Xi,scale = self.__forward_backward(L_,indices_sliced,indices_individual,serial,gpu_acceleration)
                return Gamma,Xi,scale
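    # Illustrative usage sketch (not part of the original module); `hmm`, X, Y and
    # `indices` are assumed:
    #
    #     Gamma, Xi, scale = hmm.decode(X, Y, indices)      # state probabilities
    #     vpath = hmm.decode(X, Y, indices, viterbi=True)   # most likely state sequence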
    def sample_Gamma(self,size):
        """Generates Gamma, for timeseries of lengths specified in variable size.

        Parameters:
        -----------
        size : array
            Array of shape (n_sessions,) or (n_sessions, 2). If `size` is 1-dimensional,
            each element represents the length of a session. If `size` is 2-dimensional,
            each row of `size` represents the start and end indices of a session in a timeseries.

        Returns:
        --------
        Gamma : array of shape (n_samples, n_states)
            The state probability timeseries.
        """
        #if not self.trained:
        #    raise Exception("The model has not yet been trained")
        K = self.hyperparameters["K"]
        if len(size.shape) == 1: # T
            T = size
            indices = auxiliary.make_indices_from_T(T)
        else: # indices
            indices = size
            if len(indices.shape) == 1:
                indices = np.expand_dims(indices,axis=0)
            T = size[:,1] - size[:,0]
        Gamma = np.zeros((np.sum(T),K))
        N = indices.shape[0]
        rng = np.random.default_rng()
        for j in range(N):
            tt = np.arange(indices[j,0],indices[j,1])
            gamma = np.zeros((T[j],K))
            gamma[0,:] = rng.multinomial(1,self.Pi)
            for t in range(1,T[j]):
                k = np.where(gamma[t-1,:])[0][0]
                gamma[t,:] = rng.multinomial(1,self.P[k,:])
            Gamma[tt,:] = gamma
        return Gamma
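    # Illustrative sketch (not part of the original module): sampling state time
    # courses for three sessions of 100 time points each; `hmm` must have P and Pi set:
    #
    #     Gamma = hmm.sample_Gamma(np.array([100, 100, 100]))   # shape (300, K)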
    def sample(self,size,X=None,Gamma=None):
        """Generates Gamma and Y for timeseries of lengths specified in variable size.

        Parameters:
        -----------
        size : array of shape (n_sessions,) or (n_sessions, 2)
            If `size` is 1-dimensional, each element represents the length of a session.
            If `size` is 2-dimensional, each row of `size` represents the start and end indices
            of a session in a timeseries.
        X : array of shape (n_samples, n_parcels), default=None
            The timeseries of set of variables 1.
        Gamma : array of shape (n_samples, n_states), default=None
            The state probability timeseries.

        Returns:
        --------
        If X is not None:
            X : array of shape (n_samples, n_parcels)
                The timeseries of set of variables 1.
        Y : array of shape (n_samples, n_parcels)
            The timeseries of set of variables 2.
        Gamma : array of shape (n_samples, n_states)
            The state probability timeseries.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        K = self.hyperparameters["K"]
        shared_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \
            (self.hyperparameters["covtype"] == 'sharedfull')
        diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \
            (self.hyperparameters["covtype"] == 'diag')
        if len(size.shape) == 1: # T
            T = size
            indices = auxiliary.make_indices_from_T(T)
        else: # indices
            indices = size
            if len(indices.shape) == 1:
                indices = np.expand_dims(indices,axis=0)
            T = size[:,1] - size[:,0]
        N = indices.shape[0]
        q = self.Sigma[0]["rate"].shape[0]
        if Gamma is None:
            Gamma = self.sample_Gamma(size)
        rng = np.random.default_rng()
        if (self.hyperparameters["model_beta"] != 'no') and (X is None):
            p = self.beta[0]["Mu"].shape[0]
            X = np.random.normal(size=(np.sum(T),p))
        # Y, mean
        Y = np.zeros((np.sum(T),q))
        if self.hyperparameters["model_mean"] == 'shared':
            Y += np.expand_dims(self.mean[0]['Mu'],axis=0)
        if self.hyperparameters["model_beta"] == 'shared':
            Y += X @ self.beta[0]["Mu"]
        for k in range(K):
            if self.hyperparameters["model_mean"] == 'state':
                Y += np.expand_dims(self.mean[k]["Mu"],axis=0) * np.expand_dims(Gamma[:,k],axis=1)
            if self.hyperparameters["model_beta"] == 'state':
                Y += (X @ self.beta[k]["Mu"]) * np.expand_dims(Gamma[:,k],axis=1)
        # Y, covariance
        if shared_covmat:
            C = self.get_covariance_matrix()
            if diagonal_covmat:
                Y += rng.normal(loc=np.zeros(q),scale=C,size=Y.shape)
            else:
                Y += rng.multivariate_normal(mean=np.zeros(q),cov=C,size=Y.shape[0])
        else:
            for k in range(K):
                C = self.get_covariance_matrix(k)
                if diagonal_covmat:
                    Y += rng.normal(loc=np.zeros(q),scale=C,size=Y.shape) \
                        * np.expand_dims(Gamma[:,k],axis=1)
                else:
                    Y += rng.multivariate_normal(mean=np.zeros(q),cov=C,size=Y.shape[0]) \
                        * np.expand_dims(Gamma[:,k],axis=1)
        if X is None:
            return Y,Gamma
        else:
            return X,Y,Gamma
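    # Illustrative sketch (not part of the original module), assuming a trained
    # (or manually parameterised) model:
    #
    #     X, Y, Gamma = hmm.sample(np.array([200, 200]))   # two sessions of 200 samples
    #     # if the model has no beta (model_beta='no'), X is not generated or returned:
    #     Y, Gamma = hmm.sample(np.array([200]))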
    def get_active_K(self):
        """Returns the number of active states.

        Returns:
        --------
        K_active : int
            Number of active states.
        """
        K_active = np.sum(self.active_states)
        return K_active
    def get_r2(self,X,Y,Gamma,indices=None):
        """Computes the explained variance per session/trial and per column of Y.

        Parameters:
        -----------
        X : array of shape (n_samples, n_variables_1)
            The timeseries of set of variables 1.
        Y : array of shape (n_samples, n_variables_2)
            The timeseries of set of variables 2.
        Gamma : array of shape (n_samples, n_states), default=None
            The state timeseries probabilities.
        indices : array-like of shape (n_sessions, 2), optional, default=None
            The start and end indices of each trial/session in the input data.

        Returns:
        --------
        r2 : array of shape (n_sessions, n_variables_2)
            The R-squared (proportion of the variance explained) for each session and each variable in Y.

        Raises:
        -------
        Exception
            If the model has not been trained, or if it has neither mean nor beta.

        Notes:
        ------
        This function does not take the covariance matrix into account.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        K = self.hyperparameters["K"]
        q = Y.shape[1]
        N = indices.shape[0]
        r2 = np.zeros((N,q))
        m = np.mean(Y,axis=0)
        for j in range(N):
            tt_j = range(indices[j,0],indices[j,1])
            if X is not None:
                Xj = np.copy(X[tt_j,:])
            d = np.copy(Y[tt_j,:])
            if self.hyperparameters["model_mean"] == 'shared':
                d -= np.expand_dims(self.mean[0]['Mu'],axis=0)
            if self.hyperparameters["model_beta"] == 'shared':
                d -= (Xj @ self.beta[0]['Mu'])
            for k in range(K):
                if self.hyperparameters["model_mean"] == 'state':
                    d -= np.expand_dims(self.mean[k]['Mu'],axis=0) * np.expand_dims(Gamma[tt_j,k],axis=1)
                if self.hyperparameters["model_beta"] == 'state':
                    d -= (Xj @ self.beta[k]['Mu']) * np.expand_dims(Gamma[tt_j,k],axis=1)
            d = np.sum(d**2,axis=0)
            d0 = np.copy(Y[tt_j,:])
            if self.hyperparameters["model_mean"] != 'no':
                d0 -= np.expand_dims(m,axis=0)
            d0 = np.sum(d0**2,axis=0)
            r2[j,:] = 1 - (d / d0)
        return r2
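    # Illustrative sketch (not part of the original module): explained variance per
    # session after decoding:
    #
    #     Gamma, _, _ = hmm.decode(X, Y, indices)
    #     r2 = hmm.get_r2(X, Y, Gamma, indices)   # shape (n_sessions, n_variables_2)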
    def get_fe(self,X,Y,Gamma,Xi,scale=None,indices=None,todo=None,non_informative_prior_P=False):
        """Computes the Free Energy of an HMM depending on observation model.

        Parameters:
        -----------
        X : array of shape (n_samples, n_parcels)
            The timeseries of set of variables 1.
        Y : array of shape (n_samples, n_parcels)
            The timeseries of set of variables 2.
        Gamma : array of shape (n_samples, n_states), default=None
            The state timeseries probabilities.
        Xi : array-like of shape (n_samples - n_sessions, n_states, n_states)
            The joint probabilities of past and future states conditioned on data.
        scale : array-like of shape (n_samples,), default=None
            The scaling factors used to compute the free energy of the dataset.
            If None, scaling is automatically computed.
        indices : array-like of shape (n_sessions, 2), optional, default=None
            The start and end indices of each trial/session in the input data.
        todo : bool of shape (n_terms,) or None, default=None
            Whether or not each of the 5 elements (see `fe_terms`) should be computed.
            Only for internal use.
        non_informative_prior_P : array-like of shape (n_states, n_states), optional, default=False
            Prior of transition probability matrix.
            Only for internal use.

        Returns:
        --------
        fe_terms : array of shape (n_terms,)
            The variational free energy, separated into different terms:
            - element 1: Gamma Entropy
            - element 2: Data negative log-likelihood
            - element 3: Gamma negative log-likelihood
            - element 4: KL divergence for initial and transition probabilities
            - element 5: KL divergence for the state parameters

        Raises:
        -------
        Exception
            If the model has not been trained.

        Notes:
        ------
        This function computes the variational free energy using a specific algorithm.
        For more information on the algorithm, see [^1].

        References:
        -----------
        [^1] Smith, J. et al. "A variational approach to Bayesian learning of switching
        dynamics in dynamical systems." Journal of Machine Learning Research, vol. 18, no. 4, 2017.
""" if not self.trained: raise Exception("The model has not yet been trained") if todo is None: # Gamma_entropy, data loglik, Gamma loglik, P/Pi KL, state KL todo = (True,True,True,True,True) K = self.hyperparameters["K"] shared_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'sharedfull') diagonal_covmat = (self.hyperparameters["covtype"] == 'shareddiag') or \ (self.hyperparameters["covtype"] == 'diag') shared_beta = self.hyperparameters["model_beta"] == 'shared' shared_mean = self.hyperparameters["model_mean"] == 'shared' K_mean,K_beta = K,K if shared_mean: K_mean = 1 if shared_beta: K_beta = 1 if todo[0] or todo[2]: if indices is None: indices = np.zeros((1,2)).astype(int) indices[0,0] = 0 indices[0,1] = Y.shape[0] elif len(indices.shape) == 1: indices = np.expand_dims(np.copy(indices),axis=0) if (scale is None) or (sum(todo)<5): # standard way use_scale = False fe_some_terms = np.zeros(3) if todo[0]: fe_some_terms[0] = -auxiliary.Gamma_entropy(Gamma,Xi,indices) if todo[1]: fe_some_terms[1] = -np.sum(self.loglikelihood(X,Y) * Gamma) if todo[2]: fe_some_terms[2] = -self.__Gamma_loglikelihood(Gamma,Xi,indices) else: # short way if we have the scale variables from the forward-backward algorithm use_scale = True fe_some_terms = -np.log(scale) # (only valid just after) kldyn = [] if todo[3]: if non_informative_prior_P: P_prior = np.ones((K,K)) else: P_prior = self.priors["Dir2d_alpha"] kldyn.append(auxiliary.dirichlet_kl(self.Dir_alpha,self.priors["Dir_alpha"])) for k in range(K): kldyn.append(auxiliary.dirichlet_kl(self.Dir2d_alpha[k,:],P_prior[k,:])) klobs = [] if todo[4]: q = self.Sigma[0]["rate"].shape[0] if self.hyperparameters["model_mean"] != 'no': for k in range(K_mean): if diagonal_covmat: for j in range(q): klobs.append(auxiliary.gauss1d_kl( \ self.mean[k]["Mu"][j], self.mean[k]["Sigma"][j], \ 0,self.alpha_mean[k]["rate"][j] / self.alpha_mean[k]["shape"] \ )) klobs.append(auxiliary.gamma_kl( self.alpha_mean[k]["shape"],self.alpha_mean[k]["rate"][j], \ self.priors["alpha_mean"]["shape"],self.priors["alpha_mean"]["rate"][j] \ )) else: klobs.append(auxiliary.gauss_kl( \ self.mean[k]["Mu"],self.mean[k]["Sigma"], \ np.zeros(q),np.diag(self.alpha_mean[k]["rate"] / self.alpha_mean[k]["shape"]) \ )) klobs.append(np.sum(auxiliary.gamma_kl( \ self.alpha_mean[k]["shape"],self.alpha_mean[k]["rate"],\ self.priors["alpha_mean"]["shape"],self.priors["alpha_mean"]["rate"] \ ))) if self.hyperparameters["model_beta"] != 'no': p = self.beta[0]["Mu"].shape[0] jj = np.arange(p) for k in range(K_beta): if diagonal_covmat: for j in range(q): if self.hyperparameters["connectivity"] is not None: jj = np.where(self.hyperparameters["connectivity"][:,j]==1)[0] pj = len(jj) klobs.append(auxiliary.gauss_kl( \ self.beta[k]["Mu"][jj,j], self.beta[k]["Sigma"][jj,jj[:,np.newaxis],j], \ np.zeros((pj,)), np.diag(self.alpha_beta[k]["rate"][jj,j] / self.alpha_beta[k]["shape"]) \ )) klobs.append(np.sum(auxiliary.gamma_kl( \ self.alpha_beta[k]["shape"],self.alpha_beta[k]["rate"][jj,j], \ self.priors["alpha_beta"]["shape"],self.priors["alpha_beta"]["rate"][jj,j] \ ))) else: klobs.append(auxiliary.gauss_kl( np.reshape(self.beta[k]["Mu"],(p*q,)),\ self.beta[k]["Sigma"],\ np.zeros(p*q), np.diag(np.reshape(self.alpha_beta[k]["rate"],(p*q,)) / self.alpha_beta[k]["shape"]) \ )) klobs.append(np.sum(auxiliary.gamma_kl( \ self.alpha_beta[k]["shape"],\ np.reshape(self.alpha_beta[k]["rate"],(p*q,)),\ self.priors["alpha_beta"]["shape"],\ 
np.reshape(self.priors["alpha_beta"]["rate"],(p*q,)) \ ))) if shared_covmat and (not diagonal_covmat): klobs.append(auxiliary.wishart_kl(self.Sigma[0]["shape"],self.Sigma[0]["rate"],\ self.priors["Sigma"]["shape"],self.priors["Sigma"]["rate"])) elif (not shared_covmat) and (not diagonal_covmat): for k in range(K): klobs.append(auxiliary.wishart_kl(self.Sigma[k]["shape"],self.Sigma[k]["rate"],\ self.priors["Sigma"]["shape"],self.priors["Sigma"]["rate"])) elif shared_covmat and diagonal_covmat: klobs.append(np.sum(auxiliary.gamma_kl(self.Sigma[0]["shape"],self.Sigma[0]["rate"],\ self.priors["Sigma"]["shape"],self.priors["Sigma"]["rate"]))) elif (not shared_covmat) and diagonal_covmat: for k in range(K): klobs.append(np.sum(auxiliary.gamma_kl(self.Sigma[k]["shape"],self.Sigma[k]["rate"],\ self.priors["Sigma"]["shape"],self.priors["Sigma"]["rate"]))) if use_scale: fe_terms = np.zeros(3) fe_terms[0] = np.sum(fe_some_terms) fe_terms[1] = sum(kldyn) fe_terms[2] = sum(klobs) else: fe_terms = np.zeros(5) for j in range(3): fe_terms[j] = fe_some_terms[j] fe_terms[3] = sum(kldyn) fe_terms[4] = sum(klobs) return fe_terms
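    # Illustrative sketch (not part of the original module): the total free energy
    # is the sum of the returned terms (entropy, log-likelihoods and KL divergences):
    #
    #     Gamma, Xi, _ = hmm.decode(X, Y, indices)
    #     fe = np.sum(hmm.get_fe(X, Y, Gamma, Xi, indices=indices))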
    def get_covariance_matrix(self,k=0):
        """Returns the covariance matrix for the specified state.

        Parameters:
        -----------
        k : int, optional
            The index of the state. Default=0.

        Returns:
        --------
        array of shape (n_parcels, n_parcels)
            The covariance matrix for the specified state.

        Raises:
        -------
        Exception
            If the model has not been trained.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        return self.Sigma[k]["rate"] / self.Sigma[k]["shape"]
[docs] def get_inverse_covariance_matrix(self,k=0):
        """Returns the inverse covariance matrix for the specified state.

        Parameters:
        -----------
        k : int, optional
            The index of the state. Default=0.

        Returns:
        --------
        array of shape (n_parcels, n_parcels)
            The inverse covariance matrix for the specified state.

        Raises:
        -------
        Exception
            If the model has not been trained.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        return self.Sigma[k]["irate"] * self.Sigma[k]["shape"]
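For example, the state covariances can be collected into a single array (a sketch only; assumes `hmm` was trained with covtype='full', so each state has its own full covariance matrix and every state index is valid):

K = hmm.hyperparameters["K"]
covs = np.stack([hmm.get_covariance_matrix(k=k) for k in range(K)], axis=2)            # (q, q, K)
precisions = np.stack([hmm.get_inverse_covariance_matrix(k=k) for k in range(K)], axis=2)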
[docs] def set_covariance_matrix(self,rate,shape,k=0):
        """Sets the covariance matrix to specific values. Useful to create synthetic data for simulations.

        Parameters:
        -----------
        rate : ndarray of shape (n_variables_2 x n_variables_2)
            The rate parameter of the covariance.
        shape : int
            The shape parameter of the covariance.
        k : int, optional
            The index of the state. Default=0.
        """
        self.Sigma[k]["rate"] = rate
        self.Sigma[k]["shape"] = shape
[docs] def get_beta(self,k=0):
        """Returns the regression coefficients (beta) for the specified state.

        Parameters:
        -----------
        k : int, optional, default=0
            The index of the state for which to retrieve the beta value.

        Returns:
        --------
        beta : ndarray of shape (n_variables_1 x n_variables_2)
            The regression coefficients of each variable in X on each variable in Y for the specified state.

        Raises:
        -------
        Exception
            If the model has not yet been trained.
            If the model has no beta.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        if self.hyperparameters["model_beta"] == 'no':
            raise Exception("The model has no beta")
        return self.beta[k]["Mu"]
[docs] def get_betas(self):
        """Returns the regression coefficients (beta) for all states.

        Returns:
        --------
        betas : ndarray of shape (n_variables_1 x n_variables_2 x n_states)
            The regression coefficients of each variable in X on each variable in Y for all states.

        Raises:
        -------
        Exception
            If the model has not yet been trained.
            If the model has no beta.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        if self.hyperparameters["model_beta"] == 'no':
            raise Exception("The model has no beta")
        (p,q) = self.beta[0]["Mu"].shape
        K = self.hyperparameters["K"]
        betas = np.zeros((p,q,K))
        for k in range(K):
            betas[:,:,k] = self.beta[k]["Mu"]
        return betas
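A quick sketch of how the per-state coefficients might be compared (illustrative only; assumes a trained model with model_beta different from 'no'):

betas = hmm.get_betas()                          # (n_variables_1, n_variables_2, K)
spread = betas.max(axis=2) - betas.min(axis=2)   # how much each coefficient varies across states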
[docs] def set_beta(self,beta,k=0):
        """Sets the regression coefficients (beta) to specific values. Useful to create synthetic data for simulations.

        Parameters:
        -----------
        beta : ndarray of shape (n_variables_1 x n_variables_2)
            The regression coefficients of each variable in X on each variable in Y for the specified state k.
        k : int, optional, default=0
            The index of the state for which to set the beta value.
        """
        self.beta[k]["Mu"] = beta
[docs] def get_mean(self,k=0):
        """Returns the mean for the specified state.

        Parameters:
        -----------
        k : int, optional, default=0
            The index of the state for which to retrieve the mean.

        Returns:
        --------
        mean : ndarray of shape (n_variables_2,)
            The mean value of each variable in Y for the specified state.

        Raises:
        -------
        Exception
            If the model has not yet been trained.
            If the model has no mean.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        if self.hyperparameters["model_mean"] == 'no':
            raise Exception("The model has no mean")
        return self.mean[k]["Mu"]
[docs] def get_means(self):
        """Returns the means for all states.

        Returns:
        --------
        means : ndarray of shape (n_variables_2, n_states)
            The mean value of each variable in Y for all states.

        Raises:
        -------
        Exception
            If the model has not yet been trained.
            If the model has no mean.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        if self.hyperparameters["model_mean"] == 'no':
            raise Exception("The model has no mean")
        if self.hyperparameters["model_beta"] != 'no':
            q = self.beta[0]["Mu"].shape[1]
        else:
            q = self.Sigma[0]["rate"].shape[0]
        K = self.hyperparameters["K"]
        means = np.zeros((q,K))
        for k in range(K):
            means[:,k] = self.mean[k]["Mu"]
        return means
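Similarly for the means (a sketch, assuming a trained model with model_mean different from 'no'):

means = hmm.get_means()                             # (n_variables_2, K)
dominant_state = np.argmax(np.abs(means), axis=1)   # state with the largest-magnitude mean, per variable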
[docs] def set_mean(self,mean,k=0):
        """Sets the mean to specific values. Useful to create synthetic data for simulations.

        Parameters:
        -----------
        mean : ndarray of shape (n_variables_2,)
            The mean value of each variable in Y for the specified state.
        k : int, optional, default=0
            The index of the state for which to set the mean.
        """
        self.mean[k]["Mu"] = mean
[docs] def get_P(self):
        """Returns transition probability matrix

        Returns:
        --------
        P : ndarray of shape (K,K), where K is the number of states

        Raises:
        -------
        Exception
            If the model has not yet been trained.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        return self.P
[docs] def get_Pi(self):
        """Returns initial probabilities

        Returns:
        --------
        Pi : ndarray of shape (K,), where K is the number of states

        Raises:
        -------
        Exception
            If the model has not yet been trained.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")
        return self.Pi
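Since P is row-stochastic, its diagonal holds the self-transition probabilities, and for a discrete-time Markov chain the expected dwell time of state k is 1/(1 - P_kk) time points. A sketch, assuming `hmm` is a trained instance:

P = hmm.get_P()
Pi = hmm.get_Pi()
expected_dwell = 1.0 / (1.0 - np.diag(P))   # expected state lifetimes, in time points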
[docs] def set_P(self,P):
        """Sets the transition probability matrix. Useful to create synthetic data for simulations.

        Parameters:
        -----------
        P : ndarray of shape (K,K), where K is the number of states
        """
        self.P = P
[docs] def set_Pi(self,Pi):
        """Sets the initial probabilities. Useful to create synthetic data for simulations.

        Parameters:
        -----------
        Pi : ndarray of shape (K,), where K is the number of states
        """
        self.Pi = Pi
[docs] def dual_estimate(self,X,Y,indices=None,Gamma=None,Xi=None,for_kernel=False):
        """Dual estimation of HMM parameters.

        Parameters:
        -----------
        X : array-like of shape (n_samples, n_variables_1)
            The timeseries of set of variables 1.
        Y : array-like of shape (n_samples, n_variables_2)
            The timeseries of set of variables 2.
        indices : array-like of shape (n_sessions, 2), optional
            The start and end indices of each trial/session in the input data.
            If None, a single segment spanning the entire sequence is used.
        Gamma : array-like of shape (n_samples, n_states), optional
            The state probabilities. If None, it is computed from the input observations.
        Xi : array-like of shape (n_samples - n_sessions, n_states, n_states), optional
            The joint probabilities of past and future states conditioned on data.
            If None, it is computed from the input observations.
        for_kernel : bool, optional
            Whether the purpose of the dual estimation is kernel (gradient) computation.
            If True, the function also returns Gamma and Xi (default False).

        Returns:
        --------
        hmm_dual : object
            A copy of the HMM object with updated dynamics and observation distributions.
        """
        if not self.trained:
            raise Exception("The model has not yet been trained")

        if indices is None: # one big chunk with no cuts
            indices = np.zeros((1,2)).astype(int)
            indices[0,0] = 0
            indices[0,1] = Y.shape[0]
        if len(indices.shape) == 1:
            indices = np.expand_dims(indices,axis=0)
        N = indices.shape[0]

        hmm_dual = []
        if Gamma is None:
            Gamma,Xi,_ = self.decode(X,Y,indices)

        hmm_dual = copy.deepcopy(self)
        hmm_dual.__update_dynamics(Gamma,Xi,indices)
        hmm_dual.__update_obsdist(X,Y,Gamma)

        # for j in range(N):
        #     tt = np.arange(indices[j,0],indices[j,1])
        #     tt_xi = tt[0:-1] - j
        #     indices_j = np.zeros((1,2)).astype(int)
        #     indices_j[0,1] = indices[j,1] - indices[j,0]
        #     hmm_dual.append(copy.deepcopy(self))
        #     hmm_dual[j].update_dynamics(Gamma[tt,:],Xi[tt_xi,:,:],indices_j)
        #     hmm_dual[j].update_obsdist(X[tt,:],Y[tt,:],Gamma[tt,:])

        if for_kernel:
            return hmm_dual,Gamma,Xi
        else:
            return hmm_dual
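Usage sketch (illustrative): dual estimation is typically used to obtain a subject- or session-specific version of a group-level model. Here `hmm` is assumed to be a trained group model and X_subj, Y_subj, indices_subj (hypothetical names) hold one subject's data:

hmm_subj = hmm.dual_estimate(X_subj, Y_subj, indices=indices_subj)
# hmm_subj is a deep copy of hmm whose transition dynamics and observation
# distributions have been re-estimated on this subject's data alone.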
[docs] def initialize(self,p,q):
        """Initialize the parameters of the HMM with random values.
        IMPORTANT: This is not meant to be run before training. It is only useful for
        sampling synthetic data, and should be combined with subsequent calls to
        set_beta, set_mean, set_covariance_matrix, set_P and set_Pi.

        Parameters:
        -----------
        p : int
            Number of channels in X (not used if only Y is modelled).
        q : int
            Number of channels in Y.
        """
        T = 1000
        if self.hyperparameters["model_beta"] != 'no':
            X = np.random.random((T,p))
        else:
            X = None
        Y = np.random.random((T,q))
        options = {}
        options["cyc"] = 1
        options["initrep"] = 0
        self.train(X=X,Y=Y,options=options)
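Putting initialize together with the setters above, a synthetic model could be configured as follows. This is a sketch only: the parameter values are arbitrary, and passing a per-channel rate vector to set_covariance_matrix assumes the 'shareddiag' parameterization (covariance = rate / shape, per channel).

K, q = 3, 5
hmm_syn = glhmm(K=K, model_beta='no', covtype='shareddiag')
hmm_syn.initialize(p=0, q=q)                 # random starting point (p is unused when model_beta='no')
for k in range(K):
    hmm_syn.set_mean(np.random.randn(q), k=k)
hmm_syn.set_covariance_matrix(np.ones(q), 100, k=0)   # shared diagonal covariance
P = np.full((K,K), 0.05); np.fill_diagonal(P, 0.9)    # rows sum to 1
hmm_syn.set_P(P)
hmm_syn.set_Pi(np.ones(K)/K)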
[docs] def train(self,X=None,Y=None,indices=None,files=None,Gamma=None,Xi=None,scale=None,options=None):
        """Train the GLHMM on input data X and Y, whose most general formulation is
        Y = mu_k + X beta_k + noise,
        where the noise is Gaussian with mean zero and covariance Sigma_k.
        It supports both standard and stochastic variational learning;
        for the latter, the data must be supplied in files format.

        Parameters:
        -----------
        X : array-like of shape (n_samples, n_variables_1)
            The timeseries of set of variables 1.
        Y : array-like of shape (n_samples, n_variables_2)
            The timeseries of set of variables 2.
        indices : array-like of shape (n_sessions, 2), optional
            The start and end indices of each trial/session in the input data.
            If None, one big segment with no cuts is assumed.
        files : str or list of str, optional
            The filename(s) containing the data to load. If not None, X, Y, and indices are ignored.
        Gamma : array-like of shape (n_samples, n_states), optional
            The initial values of the state probabilities.
        Xi : array-like of shape (n_samples - n_sessions, n_states, n_states), optional
            The joint probabilities of past and future states conditioned on data.
        scale : array-like of shape (n_samples,), optional
            The scaling factors used to compute the free energy of the dataset.
            If None, scaling is automatically computed.
        options : dict, optional
            A dictionary with options to control the training process.

        Returns:
        --------
        Gamma : array-like of shape (n_samples, n_states)
            The state probabilities.
            To avoid unnecessary use of memory, Gamma is only returned if learning is non-stochastic;
            otherwise it is returned as an empty numpy array.
            To get Gamma after stochastic learning, use the decode method.
        Xi : array-like of shape (n_samples - n_sessions, n_states, n_states)
            The joint probabilities of past and future states conditioned on data.
            To avoid unnecessary use of memory, Xi is only returned if learning is non-stochastic;
            otherwise it is returned as an empty numpy array.
            To get Xi after stochastic learning, use the decode method.
        fe : array-like
            The free energy computed at each iteration of the training process.

        Raises:
        -------
        Exception
            If `files` and `Y` are both provided, or if neither is provided.
            If `X` is not provided and the hyperparameter 'model_beta' is not 'no'.
            If 'files' is not provided and stochastic learning is called upon.
        """

        stochastic = (options is not None) and ("stochastic" in options) and (options["stochastic"])

        if (files is not None) and (Y is not None) and (not stochastic):
            warnings.warn("Argument 'files' cannot be used if the data (Y) is also provided")
        if (files is None) and (Y is None):
            raise Exception("Training needs data")
        if (X is None) and (self.hyperparameters["model_beta"] != 'no'):
            raise Exception("If you want to model beta, X is needed as an argument")

        if stochastic:
            if files is None:
                raise Exception("For stochastic learning, argument 'files' must be provided")
            if (X is not None) or (Y is not None):
                warnings.warn("X and Y are not used in stochastic learning")
            fe = self.__train_stochastic(files,Gamma,options)
            return np.empty(0),np.empty(0),fe

        options = self.__check_options(options)
        K = self.hyperparameters["K"]

        if files is not None:
            X,Y,indices,_ = io.load_files(files)

        if indices is None: # one big chunk with no cuts
            indices = np.zeros((1,2)).astype(int)
            indices[0,1] = Y.shape[0]
        if len(indices.shape) == 1:
            indices = np.expand_dims(indices,axis=0)

        if options["verbose"]: start = time.time()

        if not self.trained:
            if Gamma is None:
                Gamma = self.__init_Gamma(X,Y,indices,options)
            elif Gamma.shape != (Y.shape[0],K):
                raise Exception('Supplied initial Gamma does not have the correct dimensions')
            self.__init_priors(X,Y)
            self.__init_dynamics(Gamma,indices=indices)
            self.__init_obsdist(X,Y,Gamma)
            self.__update_priors()
            self.trained = True

        fe = np.empty(0)
        cyc_to_go = options["cyc_to_go_under_th"]

        for it in range(options["cyc"]):

            if options["updateGamma"]:

                # E-step
                Gamma,Xi,scale = self.decode(X,Y,indices,serial=options["serial"],
                    gpu_acceleration=options["gpu_acceleration"],gpuChunks=options["gpuChunks"])
                status = self.__check_Gamma(Gamma)
                if status:
                    warnings.warn('Gamma has almost zero variance: stuck in a weird solution')

                # which states are active?
                if options["deactivate_states"]:
                    FO = np.sum(Gamma,axis=0)
                    for k in range(K):
                        active_state = self.active_states[k]
                        self.active_states[k] = FO[k] > options["threshold_active"]
                        if options["verbose"]:
                            if (not active_state) and self.active_states[k]:
                                print("State " + str(k) + " is reactivated")
                            if active_state and (not self.active_states[k]):
                                print("State " + str(k) + " is deactivated")

                # epsilon = 1
                # while status:
                #     self.perturb(epsilon)
                #     Gamma,Xi,scale = self.decode(X,Y,indices)
                #     status = self.__check_Gamma(Gamma)
                #     epsilon *= 2

            # if we use the scale to compute the FE, it's only valid after the E-step
            fe_it = np.sum(self.get_fe(X,Y,Gamma,Xi,scale,indices))
            fe = np.append(fe, fe_it)

            if it > 1:
                chgFrEn = abs((fe[-1]-fe[-2]) / (fe[-1]-fe[0]))
                if np.abs(chgFrEn) < options["tol"]: cyc_to_go -= 1
                else: cyc_to_go = options["cyc_to_go_under_th"]
                if options["verbose"]:
                    print("Cycle " + str(it+1) + ", free energy = " + str(fe_it) + \
                        ", relative change = " + str(chgFrEn))
                if cyc_to_go == 0:
                    if options["verbose"]: print("Reached early convergence")
                    break
            else:
                if options["verbose"]:
                    print("Cycle " + str(it+1) + " free energy = " + str(fe_it))

            # M-step
            if options["updateDyn"]: self.__update_dynamics(Gamma,Xi,indices)
            if options["updateObs"]: self.__update_obsdist(X,Y,Gamma)

        K_active = np.sum(self.active_states)
        if options["verbose"]:
            end = time.time()
            elapsed = end - start
            print("Finished training in " + str(round(elapsed,2)) + \
                "s : active states = " + str(K_active))

        return Gamma,Xi,fe
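A typical non-stochastic training call might look like the following sketch. The option names used ('cyc', 'tol', 'verbose') are those read in the loop above; X, Y, indices are assumed to be preloaded in the formats the docstring describes, and the import assumes the usual package layout:

import numpy as np
from glhmm import glhmm as glhmm_module

hmm = glhmm_module.glhmm(K=6, covtype='full', model_beta='no')
options = {"cyc": 100, "tol": 1e-5, "verbose": True}
Gamma, Xi, fe = hmm.train(X=None, Y=Y, indices=indices, options=options)
# fe holds the free energy per iteration and should (mostly) decrease.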