Source code for tfrStats.tfr_spw_stats_minmax

import numpy as np
from tqdm.auto import tqdm
import scipy.io as sio
from numpy import inf


[docs]def tfr_spw_stats_minmax(paths, cond, svar, fband, n_perm):
    """
    Permutation-based TFR statistical assessment based on min-max.

    Function to compute the truncated min-max distribution, keeping the
    permutations for each condition and recording site. It captures the
    variations at the extremes of the null distribution. In the min-max
    approach, the minimum and maximum values at each permutation are used.

    .. todo::

        * Implement onset shifting to account for the whole trial (in the
          current example we pool values from the 400-1000 ms time window).
        * Implement compatibility with Syncopy (for now it relies on
          ftPool_... .mat files containing the TFRs computed in FieldTrip).

    :param list paths: input and output paths (paths[0] for the .mat files,
        paths[1] for the resulting .npz file).
    :param int cond: condition index (i.e. 0, 1, 2, 3).
    :param int svar: spectral variable, 0 for spectral power (GPR not
        implemented here).
    :param int fband: frequency band index (i.e. low, high, higher).
    :param int n_perm: number of permutations.
    :return: empirical time frequency representation,
        n_conds x n_sites x n_freqs x n_time (i.e. 30, 12, 16, 113).
    :return: null time frequency representation,
        n_perm x n_conds x n_sites x n_freqs x 2 (i.e. 1000, 30, 12, 16, 2).
    :rtype: float

    @author: Nicolas Gravel, 19.09.2023
    """
    tps = [57, 113, 141, 140]
    fps = [19, 16, 11, 1]
    fbands = ['low', 'high', 'higher']
    blocks = ['grat', 'nat']
    svars = ['spw', 'gpr']
    methods = ['hanning', 'wavelet', 'wavelet']
    svar = 0  # only spectral power is implemented here

    # Conditions
    if cond == 0:
        block = 0
        n_sess = 10
    else:
        block = 1
        n_sess = 11

    # =============================================================================
    # How the indices are organized within the dataset
    # =============================================================================
    channels = [i for i in range(12 * n_sess)]  # total channels
    site_idx = np.zeros((12, n_sess)).astype(np.uint)  # index to sites
    for n in range(12):  # for each cortical depth
        site = [x for x in channels if x % 12 == n]
        site_idx[n, :] = site
    print('site indices :')
    print(site_idx)

    # Baseline window (ms); the same window is used for all frequency bands
    bs_t0 = -700
    bs_t1 = -100

    # =============================================================================
    # Empirical TFR
    # =============================================================================
    fname = str(paths[0] + 'ftPool_' + blocks[block] + '_' + fbands[fband]
                + '_' + methods[fband] + '.mat')
    print(fname)
    mat = sio.loadmat(fname)
    dataPool = mat.get(str('dataLump_' + svars[svar]))
    print(dataPool.shape)
    time = np.linspace(start=-800, stop=2000, num=tps[fband])
    b0 = np.searchsorted(time, bs_t0, side='left', sorter=None)
    bf = np.searchsorted(time, bs_t1, side='left', sorter=None)
    tfr_ = np.zeros((dataPool.shape[0], dataPool.shape[1], 12,
                     dataPool.shape[3], dataPool.shape[4]))
    for i_cond in range(dataPool.shape[0]):
        for i_rep in range(dataPool.shape[1]):
            for i_depth in range(12):
                for i_freq in range(dataPool.shape[3]):
                    X = dataPool[i_cond, i_rep, site_idx[i_depth, :], i_freq, :]
                    X = np.nanmean(X, axis=0)  # average sessions
                    baseline = dataPool[:, :, site_idx[i_depth, :], i_freq, b0:bf]
                    baseline = np.nanmean(baseline, axis=2)  # average sessions
                    X_bs = np.nanmean(baseline.flatten())
                    # percent signal change with respect to the pooled baseline
                    tfr_[i_cond, i_rep, i_depth, i_freq, :] = ((X - X_bs) / X_bs) * 100
    tfr_[tfr_ == -inf] = np.nan
    tfr_[tfr_ == inf] = np.nan
    tfr_emp = np.nanmean(tfr_, axis=1)  # repetition average
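    # The null distribution is built by randomly exchanging the roles of the
    # baseline and the response window: on each permutation a single coin flip
    # (choices[i_perm]) decides whether the baseline is expressed relative to
    # the 400-1000 ms response window (labels swapped) or the response relative
    # to the baseline (labels intact). Only the minimum and maximum of each
    # permutation are kept, yielding the truncated min-max null distribution.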
    # =============================================================================
    # Null TFR
    # =============================================================================
    time = np.linspace(start=-800, stop=2000, num=tps[fband])
    b0 = np.searchsorted(time, bs_t0, side='right', sorter=None)
    bf = np.searchsorted(time, bs_t1, side='right', sorter=None)
    t0 = np.searchsorted(time, 400, side='left', sorter=None)
    tf = np.searchsorted(time, 1000, side='left', sorter=None)
    win = time[t0:tf]
    X_h0 = np.zeros((dataPool.shape[1], dataPool.shape[3], dataPool.shape[4]))
    tfr_null = np.zeros((n_perm, dataPool.shape[0], 12, dataPool.shape[3], 2))
    msg = (str(cond) + ' - ' + str(blocks[block]) + ' - ' + str(fbands[fband]))
    choices = np.random.random(n_perm) > 0.5  # one coin flip per permutation
    for i_perm in tqdm(range(n_perm), desc=msg, position=0):
        for i_cond in range(dataPool.shape[0]):
            for i_depth in range(12):
                for i_freq in range(dataPool.shape[3]):
                    for i_rep in range(dataPool.shape[1]):
                        if choices[i_perm]:
                            # swapped: baseline relative to the response window
                            X = dataPool[:, :, site_idx[i_depth, :], i_freq, t0:tf]
                            X = np.nanmean(X, axis=3)  # average time
                            X = np.nanmean(X.flatten())
                            XX = np.tile(X, [1, win.shape[0]])
                            X_bs = dataPool[i_cond, i_rep, site_idx[i_depth, :], i_freq, b0:bf]
                            XX_bs = np.nanmean(X_bs, axis=0)  # average sessions
                            X_h0[i_rep, i_freq, t0:tf] = ((XX_bs - XX) / XX) * 100
                        else:
                            # intact: response relative to the baseline
                            X = dataPool[i_cond, i_rep, site_idx[i_depth, :], i_freq, :]
                            X = np.nanmean(X, axis=0)  # average sessions
                            baseline = dataPool[:, :, site_idx[i_depth, :], i_freq, b0:bf]
                            baseline = np.nanmean(baseline, axis=3)  # average time
                            X_bs = np.nanmean(baseline.flatten())
                            XX_bs = np.tile(X_bs, [1, dataPool.shape[4]])
                            X_h0[i_rep, i_freq, :] = ((X - XX_bs) / XX_bs) * 100
                    X_h0[X_h0 == -inf] = np.nan
                    X_h0[X_h0 == inf] = np.nan
                    # pool repetitions and time bins (400-1000 ms)
                    X = X_h0[:, i_freq, t0:tf]
                    # save the permutation's min-max for each condition, depth and frequency
                    tfr_null[i_perm, i_cond, i_depth, i_freq, 0] = np.nanmin(X.flatten())
                    tfr_null[i_perm, i_cond, i_depth, i_freq, 1] = np.nanmax(X.flatten())
    print(tfr_emp.shape)
    print(tfr_null.shape)
    fname = str(paths[1] + 'uvtfr_stats_' + fbands[fband] + '_' + blocks[block]
                + '_' + svars[svar] + '_' + str(n_perm) + '_minmax.npz')
    print(fname)
    np.savez(fname, tfr_emp, tfr_null)
    return tfr_emp, tfr_null
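# Example usage (a minimal sketch; the folder paths below are hypothetical
# placeholders, not values shipped with the package):
#
#     paths = ['/data/tfr_pool/', '/data/tfr_stats/']
#     tfr_emp, tfr_null = tfr_spw_stats_minmax(paths, cond=0, svar=0, fband=1,
#                                              n_perm=1000)
#
# One common way to use a min-max null (an assumption about downstream use,
# not something this function does itself) is to read two-sided thresholds
# from the stored extrema, e.g. for condition 0, depth 0, frequency bin 0:
#
#     lo = np.percentile(tfr_null[:, 0, 0, 0, 0], 2.5)   # lower bound
#     hi = np.percentile(tfr_null[:, 0, 0, 0, 1], 97.5)  # upper bound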