#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Input/output functions - Gaussian Linear Hidden Markov Model
@author: Diego Vidaurre 2023
"""
import numpy as np
import scipy.special
import scipy.io
import pickle
import os
import warnings
import h5py
from zipfile import ZipFile
from tqdm import tqdm
from pathlib import Path
import requests
from . import glhmm
from . import auxiliary
[docs]
def load_files(files,I=None,do_only_indices=False):
"""
Loads data from files and returns the loaded data, indices, and individual indices for each file.
"""
X = []
Y = []
indices = []
indices_individual = []
sum_T = 0
if I is None:
I = np.arange(len(files))
elif type(I) is int:
I = np.array([I])
for ij in range(I.shape[0]):
j = I[ij]
# if type(files[j]) is tuple:
# if len(files[j][0]) > 0: # X
# if files[j][0][-4:] == '.npy':
# X.append(np.load(files[j][0]))
# elif files[j][0][-4:] == '.txt':
if files[j][-4:] == '.mat':
# depending on the matlab version used to create the data,
#scipy.io or h5py will be used to load them
try:
dat = scipy.io.loadmat(files[j])
except:
dat = h5py.File(files[j],'r')
elif files[j][-4:] == '.npz':
dat = np.load(files[j])
if not do_only_indices:
if ('X' in dat) and (not 'Y' in dat):
Y.append(np.array(dat["X"]))
else:
if 'X' in dat: X.append(np.array(dat["X"]))
Y.append(np.array(dat["Y"]))
if 'indices' in dat:
ind = np.array(dat['indices'])
elif 'T' in dat:
ind = auxiliary.make_indices_from_T(np.array(dat['T']))
else:
ind = np.zeros((1,2)).astype(int)
ind[0,0] = 0
ind[0,1] = Y[-1].shape[0]
if len(ind.shape) == 1: ind = np.expand_dims(ind,axis=0)
indices_individual.append(ind)
indices.append(ind + sum_T)
sum_T += dat["Y"].shape[0]
if not do_only_indices:
if len(X) > 0: X = np.concatenate(X)
Y = np.concatenate(Y)
indices = np.concatenate(indices)
if len(indices.shape) == 1: indices = np.expand_dims(indices,axis=0)
if len(X) == 0: X = None
return X,Y,indices,indices_individual
[docs]
def read_flattened_hmm_mat(file):
"""
Reads a MATLAB file containing hidden Markov model (HMM) parameters from the HMM-MAR toolbox,
and initializes a Gaussian linear hidden Markov model (GLHMM) using those parameters.
"""
hmm_mat = scipy.io.loadmat(file)
K = hmm_mat["K"][0][0]
covtype = hmm_mat["train"]["covtype"][0][0][0]
zeromean = hmm_mat["train"]["zeromean"][0][0][0][0]
if not zeromean: model_mean = 'state'
else: model_mean = 'no'
if "state_0_Mu_W" in hmm_mat:
if (model_mean == 'state') and (hmm_mat["state_0_Mu_W"].shape[0] == 1):
model_beta = 'no'
elif hmm_mat["state_0_Mu_W"].shape[0] == 0:
model_beta = 'no'
else:
model_beta = 'state'
else:
model_beta = 'no'
dirichlet_diag = hmm_mat["train"]["DirichletDiag"][0][0][0][0]
connectivity = hmm_mat["train"]["S"][0][0]
Pstructure = np.array(hmm_mat["train"]["Pstructure"][0][0], dtype=bool)
Pistructure = np.squeeze(np.array(hmm_mat["train"]["Pistructure"][0][0], dtype=bool))
shared_covmat = (covtype == 'shareddiag') or (covtype == 'sharedfull')
diagonal_covmat = (covtype == 'shareddiag') or (covtype == 'diag')
if "prior_Omega_Gam_rate" in hmm_mat:
prior_Omega_Gam_rate = hmm_mat["prior_Omega_Gam_rate"]
prior_Omega_Gam_shape = hmm_mat["prior_Omega_Gam_shape"][0][0]
else:
prior_Omega_Gam_rate = hmm_mat["state_0_prior_Omega_Gam_rate"]
prior_Omega_Gam_shape = hmm_mat["state_0_prior_Omega_Gam_shape"][0][0]
if diagonal_covmat: prior_Omega_Gam_rate = np.squeeze(prior_Omega_Gam_rate)
q = prior_Omega_Gam_rate.shape[0]
if "state_0_Mu_W" in hmm_mat:
p = hmm_mat["state_0_Mu_W"].shape[0]
if model_mean == 'state': p -= 1
else: p = 0
hmm = glhmm.glhmm(
K=K,
covtype=covtype,
model_mean=model_mean,
model_beta=model_beta,
dirichlet_diag=dirichlet_diag,
connectivity=connectivity,
Pstructure=Pstructure,
Pistructure=Pistructure
)
# mean
if model_mean == 'state':
hmm.mean = []
for k in range(K):
hmm.mean.append({})
Sigma_W = np.squeeze(hmm_mat["state_" + str(k) + "_S_W"])
Mu_W = np.squeeze(hmm_mat["state_" + str(k) + "_Mu_W"])
if model_beta == 'state':
if q==1: hmm.mean[k]["Mu"] = np.array(Mu_W[0])
else: hmm.mean[k]["Mu"] = Mu_W[0,:]
else:
if q==1: hmm.mean[k]["Mu"] = np.array(Mu_W)
else: hmm.mean[k]["Mu"] = Mu_W
if diagonal_covmat:
if model_beta == 'state':
if q==1: hmm.mean[k]["Sigma"] = np.array([[Sigma_W[0,0]]])
else: hmm.mean[k]["Sigma"] = np.diag(Sigma_W[:,0,0])
else:
if q==1: hmm.mean[k]["Sigma"] = np.array([[Sigma_W]])
hmm.mean[k]["Sigma"] = np.diag(Sigma_W)
else:
if q==1: np.array([[Sigma_W[0,0]]])
else: hmm.mean[k]["Sigma"] = Sigma_W[0:q,0:q]
# beta
if model_beta == 'state':
if model_mean == 'state': j0 = 1
else: j0 = 0
hmm.beta = []
for k in range(K):
hmm.beta.append({})
Sigma_W = hmm_mat["state_" + str(k) + "_S_W"]
Mu_W = hmm_mat["state_" + str(k) + "_Mu_W"]
hmm.beta[k]["Mu"] = np.zeros((p,q))
hmm.beta[k]["Mu"][:,:] = Mu_W[j0:,:]
if diagonal_covmat:
hmm.beta[k]["Sigma"] = np.zeros((p,p,q))
if q==1:
hmm.beta[k]["Sigma"][:,:,0] = Sigma_W[j0:,j0:]
else:
for j in range(q):
hmm.beta[k]["Sigma"][:,:,j] = Sigma_W[j,j0:,j0:]
else:
hmm.beta[k]["Sigma"] = Sigma_W[(j0*q):,(j0*q):]
hmm._glhmm__init_priors_sub(prior_Omega_Gam_rate,prior_Omega_Gam_shape,p,q)
hmm._glhmm__update_priors()
# covmatrix
hmm.Sigma = []
if diagonal_covmat and shared_covmat:
hmm.Sigma.append({})
hmm.Sigma[0]["rate"] = np.zeros(q)
hmm.Sigma[0]["rate"][:] = hmm_mat["Omega_Gam_rate"]
hmm.Sigma[0]["shape"] = hmm_mat["Omega_Gam_shape"][0][0]
elif diagonal_covmat and not shared_covmat:
for k in range(K):
hmm.Sigma.append({})
hmm.Sigma[k]["rate"] = np.zeros(q)
hmm.Sigma[k]["rate"][:] = hmm_mat["state_" + str(k) + "_Omega_Gam_rate"]
hmm.Sigma[k]["shape"] = hmm_mat["state_" + str(k) + "_Omega_Gam_shape"][0][0]
elif not diagonal_covmat and shared_covmat:
hmm.Sigma.append({})
hmm.Sigma[0]["rate"] = hmm_mat["Omega_Gam_rate"]
hmm.Sigma[0]["irate"] = hmm_mat["Omega_Gam_irate"]
hmm.Sigma[0]["shape"] = hmm_mat["Omega_Gam_shape"][0][0]
else: # not diagonal_covmat and not shared_covmat
for k in range(K):
hmm.Sigma.append({})
hmm.Sigma[k]["rate"] = hmm_mat["state_" + str(k) + "_Omega_Gam_rate"]
hmm.Sigma[k]["irate"] = hmm_mat["state_" + str(k) + "_Omega_Gam_irate"]
hmm.Sigma[k]["shape"] = hmm_mat["state_" + str(k) + "_Omega_Gam_shape"][0][0]
#hmm.init_dynamics()
hmm.P = hmm_mat["P"]
hmm.Pi = np.squeeze(hmm_mat["Pi"])
hmm.Dir2d_alpha = hmm_mat["Dir2d_alpha"]
hmm.Dir_alpha = np.squeeze(hmm_mat["Dir_alpha"])
hmm.trained = True
return hmm
[docs]
def save_hmm(hmm, filename, directory=None):
"""
Save a glhmm object in the specified directory with the given filename.
Parameters:
-----------
hmm (object)
The glhmm object to be saved.
filename (str)
The name of the file to which the object will be saved.
directory (str, optional), default=None:
The directory where the file will be saved. If None, saves in the current working directory.
"""
# Combine the directory path and filename
if directory:
# Ensure the directory exists, create it if not
if not os.path.exists(directory):
print(f"Created a folder here: {directory}")
os.makedirs(directory)
filepath = os.path.join(directory, filename)
else:
filepath = filename
# Save the glhmm object to the specified file
with open(filepath, 'wb') as outp: # Overwrites any existing file.
pickle.dump(hmm, outp, pickle.HIGHEST_PROTOCOL)
print(f"{filename} saved to: {filepath}") if directory else print(f"{filename} saved")
[docs]
def load_hmm(filename, directory=None):
"""
Load a glhmm object from the specified file.
Parameters:
-----------
filename (str):
Name of the file containing the glhmm object.
directory (str, optional), default=None:
Directory where the file is located. If None, searches in the current working directory.
Returns:
--------
glhmm : object
Loaded glhmm object.
"""
# Combine the directory path and filename
if directory:
filepath = os.path.join(directory, filename)
else:
filepath = filename
# Check if the directory exists
if directory and not os.path.exists(directory):
warnings.warn(f"The specified directory '{directory}' does not exist.")
# Load the glhmm object from the specified file
with open(filepath, 'rb') as inp:
hmm = pickle.load(inp)
return hmm
[docs]
def save_statistics(data_dict, filename='statistics', directory=None, format='npy'):
"""
Save statistics data to a file in the specified directory with optional format (npy or npz).
Parameters
----------
data_dict (dict):
Dictionary containing statistics data to be saved.
filename (str, optional), default='statistics':
Name of the file.
directory (str, optional), default=None:
Directory path where the file will be saved (default is the current working directory).
format (str, optional), default='npy':
Serialization format ('npy' or 'npz').
"""
# Construct the full file path
if directory:
# Ensure the directory exists, create it if not
if not os.path.exists(directory):
print(f"Created a folder here: {directory}")
os.makedirs(directory)
filepath = os.path.join(directory, f'{filename}.{format}')
else:
filepath = f'{filename}.{format}'
# Save the dictionary to the file using the specified format
if format == 'npy':
np.save(filepath, data_dict)
elif format == 'npz':
np.savez(filepath, **data_dict)
else:
raise ValueError("Invalid format. Use 'npy' or 'npz'.")
print(f"{filename}.{format} saved to: {filepath}") if directory else print(f"{filename}.{format} saved")
[docs]
def load_statistics(filename, directory=None):
"""
Load statistics data from a file.
Parameters
----------
filename : str
The name of the file containing the saved statistics data, with or without extension.
load_directory (str, optional), default=None:
The directory path where the file is located (default is the current working directory).
Returns
-------
data_dict : dict
The dictionary containing the loaded statistics data.
"""
# Set default directory to current working directory if not provided
directory = directory or os.getcwd()
# Construct the full file path
file_path = os.path.join(directory, filename)
if not os.path.exists(file_path):
# If the file with the given name does not exist, try adding '.npy' and '.npz' extensions
file_path_npy = file_path + '.npy'
file_path_npz = file_path + '.npz'
if not (os.path.exists(file_path_npy) or os.path.exists(file_path_npz)):
raise FileNotFoundError(f"File not found: {filename} or {filename}.npy or {filename}.npz")
try:
if os.path.exists(file_path):
# If the file exists with the given name, use it
# The .item() method extracts the single item from the loaded data.
data_dict = np.load(file_path, allow_pickle=True).item()
elif os.path.exists(file_path_npy):
data_dict = np.load(file_path_npy, allow_pickle=True).item()
elif os.path.exists(file_path_npz):
loaded_data = np.load(file_path_npz, allow_pickle=True)
data_dict = {key: loaded_data[key] for key in loaded_data.files}
except Exception as e:
raise ValueError(f"Error loading data from {filename}: {e}")
return data_dict
[docs]
def download_file_with_progress_bar(url: str, dest_path: Path):
"""
Download a file with a progress bar.
Parameters
----------
url : str
URL of the file to download.
dest_path : Path
Path to save the downloaded file.
"""
response = requests.get(url, stream=True)
response.raise_for_status() # Raise an error for bad responses
total_size = int(response.headers.get('content-length', 0)) # Get the file size from headers
block_size = 1024 # 1 Kilobyte per block
with open(dest_path, 'wb') as file, tqdm(
desc=f"Downloading {dest_path.name}",
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
) as progress:
for data in response.iter_content(block_size):
file.write(data)
progress.update(len(data))
[docs]
def prepare_data_directory(PATH_DATA: Path, zenodo_url: str = "https://zenodo.org/record/14756003/files/data.zip"):
"""
Ensure the data directory is prepared. If the specified directory does not exist,
download and extract the data from Zenodo.
Parameters
----------
data_dir : Path
Path to the specific data directory (e.g., 'data/Procedure_1').
zenodo_url : str, optional, default='https://zenodo.org/record/14756003/files/data.zip'
URL of the Zenodo zip file containing the data.
Returns
-------
Path
Path to the specific data directory where the data is stored.
"""
# Check if the specific data directory exists
if PATH_DATA.exists():
return PATH_DATA
# Define the parent directory for downloading and extracting the data
PATH_PARENT = PATH_DATA.parent.parent # that is the folder of the working directory
# Define the zip file name in the parent directory
zip_path = PATH_PARENT / "data.zip"
# Download the data zip file from Zenodo with a progress bar
print(f"Downloading data from {zenodo_url}...")
download_file_with_progress_bar(zenodo_url, zip_path)
# Extract the zip file
print(f"Extracting data to {PATH_PARENT}...")
with ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(PATH_PARENT)
print(f"Data extracted successfully.")
# Remove the zip file after extraction
zip_path.unlink()
print(f"Removed temporary zip file: {zip_path}.")
return PATH_DATA