diff --git a/nitools/cifti.py b/nitools/cifti.py
index 7d82c86..aefcb18 100644
--- a/nitools/cifti.py
+++ b/nitools/cifti.py
@@ -26,7 +26,7 @@ def make_label_cifti(data, bm_axis,
             label_RGBA (list):
                 List of rgba vectors for labels
     Returns:
-        cifti (GiftiImage): Label gifti image
+        cifti (GiftiImage): Label gifti image
     """
     if data.ndim == 1:
         # reshape to (1, num_vertices)
@@ -213,7 +213,7 @@ def split_cifti_to_giftis(cifti_img, type = "label", column_names = None):
             ))
     return gii

-def volume_from_cifti(cifti, struct_names=[]):
+def volume_from_cifti(cifti, struct_names=None):
    """ Gets the 4D nifti object containing the data
    for all subcortical (volume-based) structures
@@ -222,7 +222,7 @@
            cifti object containing the data
        struct_names (list or None):
            List of structure names that are included
-           defaults to None
+           defaults to None (all)
    Returns:
        nii_vol(niftiImage): nifti object containing the data
@@ -232,20 +232,22 @@
    # get the data array with all the time points, all the structures
    d_array = cifti.get_fdata(dtype=np.float32)

-    struct_names = [nb.cifti2.BrainModelAxis.to_cifti_brain_structure_name(n) for n in struct_names]
+    if struct_names is None:
+        struct_names = [a for a,_,_ in bmf.iter_structures()]
+    else:
+        struct_names = [nb.cifti2.BrainModelAxis.to_cifti_brain_structure_name(n) for n in struct_names]

    # initialize a matrix representing 4D data (x, y, z, time_point)
    vol = np.zeros(bmf.volume_shape + (d_array.shape[0],))
    for idx, (nam,slc,bm) in enumerate(bmf.iter_structures()):
-        if (any(s in nam for s in struct_names)):
+        if (nam in struct_names):
            ijk = bm.voxel
            bm_data = d_array[:, slc]
            i = (ijk[:,0] > -1)
            # fill in data
            vol[ijk[i, 0], ijk[i, 1], ijk[i, 2], :]=bm_data[:,i].T
-
    # save as nii
    nii_vol_4d = nb.Nifti1Image(vol,bmf.affine)
    return nii_vol_4d
diff --git a/nitools/gifti.py b/nitools/gifti.py
index 165ec6e..b377751 100644
--- a/nitools/gifti.py
+++ b/nitools/gifti.py
@@ -296,8 +296,9 @@ def get_gifti_colortable(gifti,ignore_zero=False):
        labels = labels[1:]

    cmap = mpl.colors.LinearSegmentedColormap.from_list('mylist', rgba, N=len(rgba))
-    mpl.cm.unregister_cmap("mycolormap")
-    mpl.cm.register_cmap("mycolormap", cmap)
+    # Removed - incompatible with newer versions of matplotlib
+    # mpl.cm.unregister_cmap("mycolormap")
+    # mpl.cm.register_cmap("mycolormap", cmap)

    return rgba, cmap
diff --git a/nitools/spm.py b/nitools/spm.py
index 549a7a7..63f0715 100644
--- a/nitools/spm.py
+++ b/nitools/spm.py
@@ -12,8 +12,12 @@ from __future__ import annotations
 from os.path import normpath, dirname
 import numpy as np
+import nibabel as nb
 import nitools as nt
 from scipy.io import loadmat
+import pandas as pd


 class SpmGlm:
@@ -35,7 +39,18 @@ def get_info_from_spm_mat(self):
        Args:
            spm_mat_path (str): _description_
        """
-        SPM = loadmat(f"{self.path}/SPM.mat", simplify_cells=True)['SPM']
+        try:
+            SPM = loadmat(f"{self.path}/SPM.mat", simplify_cells=True)['SPM']
+        except Exception as e:
+            print(e)
+            print("Error loading SPM.mat file. The file was probably saved as mat-file version 7.3 "
+                  "(see https://www.mathworks.com/help/matlab/import_export/mat-file-versions.html). "
+                  "Try loading the mat-file with Matlab and saving it as mat-file version 7.0 instead. "
+                  "Use these commands:")
+            print(f"cp {self.path}/SPM.mat {self.path}/SPM.mat.backup")
+            print(f"matlab -nodesktop -nosplash -r \"load('{self.path}/SPM.mat'); save('{self.path}/SPM.mat', 'SPM', '-v7'); exit\"")
+            # Re-raise: without SPM, the rest of this function cannot run
+            raise
+
        # Get basic information from SPM.mat
        self.nscans = SPM['nscan']
        self.nruns = len(self.nscans)
@@ -45,7 +60,7 @@ def get_info_from_spm_mat(self):
        self.run_number = []
        # Extract run number and condition name from SPM names
        for reg_name in SPM['xX']['name']:
-            s=reg_name.split(' ')
+            s = reg_name.split(' ')
            self.run_number.append(int(s[0][3:-1]))
            self.beta_names.append(s[1])
        self.run_number = np.array(self.run_number)
@@ -55,10 +70,17 @@
        # Get the necesssary matrices to reestimate the GLM for getting the residuals
        self.filter_matrices = [k['X0'] for k in SPM['xX']['K']]
        self.reg_of_interest = SPM['xX']['iC']
-        self.design_matrix = SPM['xX']['xKXs']['X'] # Filtered and whitened design matrix
-        self.eff_df = SPM['xX']['erdf'] # Effective degrees of freedom
-        self.weight = SPM['xX']['W'] # Weight matrix for whitening
-        self.pinvX = SPM['xX']['pKX'] # Pseudo-inverse of (filtered and weighted) design matrix
+        self.design_matrix = SPM['xX']['xKXs']['X']  # Filtered and whitened design matrix
+        self.eff_df = SPM['xX']['erdf']  # Effective degrees of freedom
+        self.weight = SPM['xX']['W']  # Weight matrix for whitening
+        self.pinvX = SPM['xX']['pKX']  # Pseudo-inverse of (filtered and weighted) design matrix
+        self.X = SPM["xX"]["X"]
+        self.bf = SPM['xBF']['bf']
+        self.Volterra = SPM['xBF']['Volterra']
+        self.Sess = SPM['Sess']
+        self.T = SPM["xBF"]["T"]
+        self.T0 = SPM["xBF"]["T0"]
+
    def relocate_file(self, fpath: str) -> str:
        """SPM file entries to current project directory and OS.
@@ -76,10 +98,12 @@ def relocate_file(self, fpath: str) -> str:
        """
        norm_fpath = fpath.replace('\\', '/')
        base_path = dirname(self.path).replace('\\', '/')
-        c = norm_fpath.find('func')
-        return base_path + '/' + norm_fpath[c:]
+        c = norm_fpath.find('imaging_data')
+        g = base_path.find('glm')
+        return base_path[:g] + norm_fpath[c:]

-    def get_betas(self,mask):
+    def get_betas(self, mask):
        """ Samples the beta images of an estimated SPM GLM at the mask locations
        also returns the ResMS values, and the obseration descriptors (run and condition) name
@@ -95,19 +119,26 @@
            obs_descriptors (dict): with lists reg_name and run_number (N long)
        """
-        coords = nt.get_mask_coords(mask)
+        if isinstance(mask, str):
+            mask = nb.load(mask)
+        if isinstance(mask, nb.Nifti1Image):
+            coords = nt.get_mask_coords(mask)
+        elif isinstance(mask, np.ndarray) and (mask.shape[0] == 3):
+            coords = mask
+        else:
+            raise ValueError('Mask should be a 3xP array of coordinates, a Nifti1Image, or a nifti file name')

        # Generate the list of relevant beta images:
-        indx = self.reg_of_interest-1
+        indx = self.reg_of_interest - 1
        beta_files = [f'{self.path}/{self.beta_files[i]}' for i in indx]

        # Get the data from beta and ResMS files
        rms_file = [f'{self.path}/ResMS.nii']
-        data = nt.sample_images(beta_files + rms_file,coords,use_dataobj=False)
+        data = nt.sample_images(beta_files + rms_file, coords, use_dataobj=False)

        # Return the data and the observation descriptors
        info = {'reg_name': self.beta_names[indx],
                'run_number': self.run_number[indx]}
-        return data[:-1,:], data[-1,:], info
+        return data[:-1, :], data[-1, :], info
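+
+    # Example (hedged sketch; paths are hypothetical, and we assume the
+    # constructor takes the GLM directory and that get_info_from_spm_mat()
+    # has been run so beta_files, reg_of_interest, etc. are populated):
+    #
+    #   glm = SpmGlm('/data/sub-01/glm')
+    #   glm.get_info_from_spm_mat()
+    #   betas, resms, info = glm.get_betas('/data/sub-01/roi_mask.nii')
+    #   # betas: (n_regressors, P voxels); resms: (P,) residual mean squares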

-    def get_residuals(self,mask):
+    def get_residuals(self, mask):
        """ Collects 3d images of a range of GLM residuals
        (typical SPM GLM results) and corresponding metadata
@@ -117,22 +148,30 @@
            res_range (range): range of to be saved residual images per run
        """
        # Sample the relevant time series data
-        coords = nt.get_mask_coords(mask)
-        data = nt.sample_images(self.rawdata_files,coords,use_dataobj=True)
+        if isinstance(mask, str):
+            mask = nb.load(mask)
+        if isinstance(mask, nb.Nifti1Image):
+            coords = nt.get_mask_coords(mask)
+        elif isinstance(mask, np.ndarray) and (mask.shape[0] == 3):
+            coords = mask
+        else:
+            raise ValueError('Mask should be a 3xP array of coordinates, a Nifti1Image, or a nifti file name')
+
+        data = nt.sample_images(self.rawdata_files, coords, use_dataobj=True)

        # Filter and temporal pre-whiten the data
-        fdata= self.spm_filter(self.weight @ data) # spm_filter
+        fdata = self.spm_filter(self.weight @ data)  # spm_filter

-        # Estimate the beta coefficients abd residuals
+        # Estimate the beta coefficients and residuals
        beta = self.pinvX @ fdata
        residuals = fdata - self.design_matrix @ beta

        # Return the regressors of interest
-        indx = self.reg_of_interest-1
+        indx = self.reg_of_interest - 1
        info = {'reg_name': self.beta_names[indx],
                'run_number': self.run_number[indx]}
-        return residuals, beta[indx,:], info
+        return residuals, beta[indx, :], info

-    def spm_filter(self,data):
+    def spm_filter(self, data):
        """ Does high pass-filtering and temporal weighting
        of the data (indentical to spm_filter)
@@ -142,10 +181,499 @@
            data (ndarray): 2d array of time series data (TxP)
        """
        scan_bounds = self.nscans.cumsum()
-        scan_bounds = np.insert(scan_bounds,0,0)
+        scan_bounds = np.insert(scan_bounds, 0, 0)
        fdata = data.copy()
        for i in range(self.nruns):
-            Y = fdata[scan_bounds[i]:scan_bounds[i+1],:];
-            Y = Y - self.filter_matrices[i] @ (self.filter_matrices[i].T @ Y)
+            Y = fdata[scan_bounds[i]:scan_bounds[i + 1], :]
+            if self.filter_matrices[i].size > 0:
+                # Only apply when the filter matrix is not empty,
+                # and write the filtered block back into fdata
+                fdata[scan_bounds[i]:scan_bounds[i + 1], :] = \
+                    Y - self.filter_matrices[i] @ (self.filter_matrices[i].T @ Y)
        return fdata
+
+    def rerun_glm(self, data):
+        """
+        Re-estimate the GLM (without hyperparameter estimation) on new data
+
+        Args:
+            data (ndarray): 2d array of time series data (TxP)
+        Returns:
+            beta (ndarray): 2d array of beta coefficients (QxP)
+            info (DataFrame): with columns reg_name and run_number (Q long)
+            data_filt (ndarray): 2d array of filtered time series data (TxP)
+            data_hat (ndarray): 2d array of predicted time series data (TxP) -
+                predicted using only the regressors of interest (without the
+                constant or other nuisance regressors)
+            data_adj (ndarray): 2d array of adjusted time series data (TxP) -
+                the filtered time series with constants and other nuisance
+                regressors subtracted out
+            residuals (ndarray): 2d array of residuals (TxP)
+        """
+        # Filter and temporal pre-whiten the data
+        data_filt = self.spm_filter(self.weight @ data)  # spm_filter
+
+        # Estimate the beta coefficients and residuals
+        beta = self.pinvX @ data_filt
+        residuals = data_filt - self.design_matrix @ beta
+
+        # Get estimated (predicted) timeseries (regressors of interest without
+        # the constant). reg_of_interest is 1-based (SPM convention), so
+        # subtract 1 before indexing.
+        indx_hat = self.reg_of_interest[:-1] - 1
+        data_hat = self.design_matrix[:, indx_hat] @ beta[indx_hat, :]
+
+        # Get adjusted timeseries
+        data_adj = data_hat + residuals
+
+        # Return the regressors of interest
+        indx = self.reg_of_interest - 1
+        info = pd.DataFrame({'reg_name': self.beta_names[indx],
+                             'run_number': self.run_number[indx]})
+
+        return beta[indx, :], info, data_filt, data_hat, data_adj, residuals
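+
+    # Example (hedged sketch; the path is hypothetical and `coords` is a 3xP
+    # array of voxel coordinates, e.g. from nt.get_mask_coords):
+    #
+    #   glm = SpmGlm('/data/sub-01/glm')
+    #   glm.get_info_from_spm_mat()
+    #   raw = nt.sample_images(glm.rawdata_files, coords, use_dataobj=True)
+    #   beta, info, y_filt, y_hat, y_adj, res = glm.rerun_glm(raw)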
+
+    def convolve_glm(self, bf):
+        """
+        Re-convolves the SPM structure with a new basis function.
+
+        Args:
+            bf (ndarray): new basis function (output of spm_hrf)
+        """
+        self.bf = bf
+        Xx = np.array([])
+        Xb = np.array([])
+        num_scan = self.nscans
+
+        for s in range(len(self.Sess)):
+            # Number of scans for this session
+            k = num_scan[s]
+
+            # Get stimulus functions U
+            U = self.Sess[s]["U"]
+
+            # Convolve stimulus functions with basis functions
+            X, Xn, Fc = _spm_Volterra(U, self.bf, self.Volterra)
+
+            # Resample regressors at acquisition times (32 bin offset)
+            X = X[np.arange(k) * self.T + self.T0 + 32, :]
+
+            # Orthogonalize within trial type
+            # (Fc[i]["i"] holds 1-based column indices for this trial type)
+            for i in range(len(Fc)):
+                cols = np.array(Fc[i]["i"]) - 1
+                X[:, cols] = _spm_orth(X[:, cols]).reshape(X.shape[0], len(cols))
+
+            # Get user-specified regressors
+            C = self.Sess[s]["C"]["C"]
+            if C.size > 0:
+                X = np.hstack([X, _spm_detrend(C)])
+
+            # Store session info
+            if Xx.size == 0:
+                Xx = X
+                Xb = np.ones((k, 1))
+            else:
+                Xx = _blkdiag([Xx, X])
+                Xb = _blkdiag([Xb, np.ones((k, 1))])
+
+        # Finalize design matrix
+        self.X = np.hstack([Xx, Xb])
+
+        # Compute weighted and filtered design matrix
+        if hasattr(self, "weight"):
+            self.design_matrix = self.spm_filter(self.weight @ self.X)
+        else:
+            self.design_matrix = self.spm_filter(self.X)
+
+        self.design_matrix = self.design_matrix.astype(float)
+
+        # Compute pseudoinverse of weighted and filtered design matrix
+        self.pinvX = np.linalg.pinv(self.design_matrix)
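+
+    # Example (hedged sketch; `glm` is an SpmGlm instance on which
+    # get_info_from_spm_mat() has been run, so Sess, T, T0 and Volterra exist):
+    #
+    #   hrf, p = spm_hrf(1, [5, 16, 1, 1, 6, 0, 32])  # illustrative parameters
+    #   glm.convolve_glm(hrf)  # rebuilds X, design_matrix and pinvX in place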
+
+    def update_hrf_params(self, P, mask_img):
+        """
+        Updates the HRF parameters of the SPM structure and returns the
+        time series calculated with the new HRF parameters.
+
+        Args:
+            P (ndarray): HRF parameters (SPM12 default: [6, 16, 1, 1, 6, 0, 32])
+            mask_img (str or Nifti1Image): mask image (e.g., ROI mask)
+        Returns:
+            data_filt (ndarray): 2d array of filtered time series data (TxP)
+            data_hat (ndarray): 2d array of predicted time series data (TxP) -
+                predicted using only the regressors of interest (without the
+                constant or other nuisance regressors)
+            data_adj (ndarray): 2d array of adjusted time series data (TxP) -
+                the filtered time series with constants and other nuisance
+                regressors subtracted out
+            residuals (ndarray): 2d array of residuals (TxP)
+        """
+        if isinstance(mask_img, str):
+            mask_img = nb.load(mask_img)
+        if isinstance(mask_img, nb.Nifti1Image):
+            coords = nt.get_mask_coords(mask_img)
+        else:
+            raise TypeError("mask_img must be a Nifti1Image or a file name")
+
+        hrf, p = spm_hrf(1, P)
+        self.convolve_glm(hrf)
+
+        # get raw time series in roi
+        data = nt.sample_images(self.rawdata_files, coords)
+
+        # rerun glm
+        _, info, data_filt, data_hat, data_adj, residuals = self.rerun_glm(data)
+
+        return data_filt, data_hat, data_adj, residuals
+
+
+def _cut(X, pre, at, post, padding='last'):
+    """
+    Cut a segment from signal X.
+
+    Parameters:
+        X : np.ndarray
+            Input data (rows: time samples, cols: channels/voxels)
+        pre : int
+            N samples before `at`
+        at : int or None
+            Sample index at which to cut. If None or NaN, returns NaNs.
+        post : int
+            N samples after `at`
+        padding : str, optional
+            Padding strategy when time is not available ('nan', 'zero', 'last').
+            Default is 'last'.
+
+    Returns:
+        np.ndarray
+            The extracted segment of the signal.
+    """
+    if at is None or np.isnan(at):
+        return np.full((pre + post + 1, X.shape[1]), np.nan)
+
+    rows, cols = X.shape
+    start, end = max(0, at - pre), min(at + post + 1, rows)
+    y0 = X[start:end, :]
+
+    pad_before, pad_after = max(0, pre - (at - start)), max(0, post - (rows - at - 1))
+
+    if padding == 'nan':
+        y = np.pad(y0, ((pad_before, pad_after), (0, 0)), mode='constant', constant_values=np.nan)
+    elif padding == 'zero':
+        y = np.pad(y0, ((pad_before, pad_after), (0, 0)), mode='constant', constant_values=0)
+    elif padding == 'last':
+        y = np.pad(y0, ((pad_before, pad_after), (0, 0)), mode='edge')
+    else:
+        raise ValueError("Unknown padding option. Use: 'nan', 'last', 'zero'")
+
+    return y
+
+
+def cut(X, pre, at, post, padding='last'):
+    """
+    Takes a vector of sample locations (at) and returns the signal (X)
+    cut into segments aligned around those locations.
+
+    Args:
+        X (np.array):
+            Signal to cut (rows: time samples).
+        pre (int):
+            N samples before each cut
+        at (np.array or list):
+            Cut locations in samples
+        post (int):
+            N samples after each cut
+        padding (str, optional):
+            Padding strategy when time is not available ('nan', 'zero', 'last').
+            Default is 'last'.
+
+    Returns:
+        y (np.array):
+            Segments of X around the locations in at,
+            shape (len(at), pre + post + 1, n_channels).
+    """
+    y_tmp = []
+    for a in at:
+        y_tmp.append(_cut(X, pre, a, post, padding))
+
+    return np.array(y_tmp)
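+
+
+# Example (hedged sketch; the signal and event samples are made up):
+#
+#   X = np.random.randn(200, 10)    # 200 time points, 10 voxels
+#   onsets = [20, 60, 100, 140]     # event onsets in samples
+#   segs = cut(X, pre=5, at=onsets, post=15, padding='nan')
+#   segs.shape                      # (4, 21, 10); average with np.nanmean(segs, axis=0)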
+
+
+def _blkdiag(matrices):
+    """
+    Constructs a block diagonal matrix from multiple input matrices.
+
+    Parameters:
+        matrices : list of ndarray
+            Matrices to be placed on the block diagonal.
+
+    Returns:
+        y : ndarray
+            Block diagonal concatenated matrix.
+    """
+    if len(matrices) == 0:
+        return np.array([])
+
+    # Determine final shape
+    rows = np.array([m.shape[0] for m in matrices]).sum()
+    cols = np.array([m.shape[1] for m in matrices]).sum()
+
+    # Preallocate result matrix with zeros
+    y = np.zeros((rows, cols), dtype=matrices[0].dtype)
+
+    # Fill the block diagonal
+    r_offset, c_offset = 0, 0
+    for m in matrices:
+        r, c = m.shape
+        y[r_offset:r_offset + r, c_offset:c_offset + c] = m
+        r_offset += r
+        c_offset += c
+
+    return y
+
+
+def _spm_detrend(x, p=0):
+    """
+    Polynomial detrending over columns.
+
+    Parameters:
+        x : ndarray or list of ndarrays
+            Data matrix (or list of matrices).
+        p : int, optional
+            Order of polynomial (default: 0, i.e., mean subtraction).
+
+    Returns:
+        y : ndarray or list of ndarrays
+            Detrended data matrix.
+    """
+    # Handle case where x is a list (equivalent to cell arrays in MATLAB)
+    if isinstance(x, list):
+        return [_spm_detrend(xi, p) for xi in x]
+
+    # Check dimensions
+    m, n = x.shape
+    if m == 0 or n == 0:
+        return np.array([])
+
+    # Mean subtraction (order 0)
+    if p == 0:
+        return x - np.mean(x, axis=0, keepdims=True)
+
+    # Polynomial adjustment
+    G = np.zeros((m, p + 1))
+    for i in range(p + 1):
+        G[:, i] = (np.arange(1, m + 1) ** i)
+
+    y = x - G @ np.linalg.pinv(G) @ x
+    return y
+
+
+def _spm_orth(X, OPT='pad'):
+    """
+    Recursive Gram-Schmidt orthogonalisation of basis functions
+
+    Parameters:
+        X : ndarray
+            Input matrix
+        OPT : str, optional
+            'norm' for Euclidean normalization
+            'pad' for zero padding of null space (default)
+
+    Returns:
+        X : ndarray
+            Orthogonalized matrix
+    """
+    # Ignore floating point warnings during the recursion
+    # (equivalent to MATLAB warning('off','all'))
+    old_err = np.seterr(all='ignore')
+
+    if X.ndim == 1:
+        X = X[:, np.newaxis]
+    n, m = X.shape
+    X = X[:, np.any(X, axis=0)]  # Remove zero columns
+    rankX = np.linalg.matrix_rank(X)
+
+    try:
+        x = X[:, [0]]
+        j = [0]
+
+        for i in range(1, X.shape[1]):
+            D = X[:, [i]]
+            D = D - x @ np.linalg.pinv(x) @ D
+
+            if np.linalg.norm(D, 1) > np.exp(-32):
+                x = np.hstack((x, D))
+                j.append(i)
+
+            if len(j) == rankX:
+                break
+    except Exception:
+        x = np.zeros((n, 0))
+        j = []
+
+    # Restore previous floating point error settings
+    np.seterr(**old_err)
+
+    # Pad or normalize as requested
+    if OPT == 'pad':
+        X_out = np.zeros((n, m))
+        X_out[:, j] = x
+    elif OPT == 'norm':
+        X_out = x / np.linalg.norm(x, axis=0, keepdims=True)
+    else:
+        X_out = x
+
+    # drop extra dimensions if input was a vector
+    X_out = np.squeeze(X_out)
+
+    return X_out
+
+
+def _spm_Volterra(U, bf, V=1):
+    """
+    Generalized convolution of inputs (U) with basis set (bf)
+
+    Parameters:
+        U : list of dict
+            Input structure array containing 'u' (time series) and 'name' (labels)
+        bf : ndarray
+            Basis functions
+        V : int, optional
+            Order of Volterra expansion (default: 1)
+
+    Returns:
+        X : ndarray
+            Design Matrix
+        Xname : list
+            Names of regressors (columns) in X
+        Fc : list of dict
+            Contains indices and names for each input
+    """
+    X = []
+    Xname = []
+    Fc = []
+
+    if bf.ndim == 2:
+        num_basis = bf.shape[1]
+    else:
+        num_basis = 1
+
+    # First-order terms
+    for i, u_dict in enumerate(U):
+        ind = []
+        ip = []
+        for k in range(u_dict['u'].shape[1]):
+            for p in range(num_basis):
+                # Convert the (sparse) stimulus function to a flat dense vector
+                x = np.asarray(u_dict['u'].todense()[:, k]).flatten()
+                d = np.arange(x.shape[0])
+                if num_basis > 1:
+                    x = np.convolve(x, bf[:, p], mode='full')[:len(d)]
+                else:
+                    x = np.convolve(x, bf, mode='full')[:len(d)]
+                x = x[:, None]
+                X.append(x)
+
+                Xname.append(f"{u_dict['name'][k]}*bf({p + 1})")
+                ind.append(len(X))
+                ip.append(k + 1)
+
+        Fc.append({'i': ind, 'name': u_dict['name'][0], 'p': ip})
+
+    X = np.column_stack(X) if X else np.empty((U[0]['u'].shape[0], 0))
+
+    # Return if first order
+    if V == 1:
+        return X, Xname, Fc
+
+    # Second-order terms
+    for i in range(len(U)):
+        for j in range(i, len(U)):
+            ind = []
+            ip = []
+            for p in range(bf.shape[1]):
+                for q in range(bf.shape[1]):
+                    x = np.asarray(U[i]['u'].todense()[:, 0]).flatten()
+                    y = np.asarray(U[j]['u'].todense()[:, 0]).flatten()
+                    d = np.arange(x.shape[0])
+                    x = np.convolve(x, bf[:, p], mode='full')[:len(d)]
+                    y = np.convolve(y, bf[:, q], mode='full')[:len(d)]
+                    X = np.column_stack((X, x * y))
+
+                    Xname.append(f"{U[i]['name'][0]}*bf({p + 1})x{U[j]['name'][0]}*bf({q + 1})")
+                    ind.append(X.shape[1])
+                    ip.append(1)
+
+            Fc.append({'i': ind, 'name': f"{U[i]['name'][0]}x{U[j]['name'][0]}", 'p': ip})
+
+    return X, Xname, Fc
+
+
+def spm_hrf(RT, P=None, T=16):
+    """
+    Haemodynamic response function (HRF)
+
+    Parameters:
+        RT : float
+            Scan repeat time in seconds (TR)
+        P : list or ndarray, optional
+            Parameters of the response function (two Gamma functions),
+            defaults to [6, 16, 1, 1, 6, 0, 32]
+        T : int, optional
+            Microtime resolution (default: 16)
+
+    Returns:
+        hrf : ndarray
+            Hemodynamic response function
+        p : ndarray
+            Parameters of the response function
+    """
+    # Default parameters if not provided
+    if P is None:
+        P = [6, 16, 1, 1, 6, 0, 32]
+
+    # Ensure p has the correct length (float dtype, so fractional parameters survive)
+    p = np.array([6, 16, 1, 1, 6, 0, 32], dtype=float)
+    p[:len(P)] = P
+
+    # Microtime resolution
+    dt = RT / T
+    t = np.linspace(0, p[6], np.ceil(1 + p[6] / dt).astype(int)) - p[5]
+    t = np.maximum(t, 0)  # the response is zero before onset
+
+    peak = (t ** p[0]) * np.exp(-t / p[2])
+    peak /= np.max(peak)  # Normalize
+
+    undershoot = (t ** p[1]) * np.exp(-t / p[3])
+    undershoot /= np.max(undershoot)  # Normalize
+
+    hrf = peak - (undershoot / p[4])
+
+    # Downsample from microtime to TR resolution
+    n_samples = int(p[6] / RT) + 1
+    indices = np.round(np.linspace(0, T * n_samples, n_samples, endpoint=False)).astype(int)
+    hrf = hrf[indices]
+
+    # Normalize HRF
+    hrf = hrf / np.sum(hrf)
+
+    return hrf, p
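+
+
+# Example (hedged sketch): the canonical HRF at TR = 2 s, and a variant with an
+# earlier peak (parameter values are illustrative):
+#
+#   hrf, p = spm_hrf(2)                            # 17 samples covering 0-32 s
+#   hrf_early, _ = spm_hrf(2, [5, 16, 1, 1, 6, 0, 32])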
diff --git a/nitools/volume.py b/nitools/volume.py
index 12a25d0..1d02faa 100644
--- a/nitools/volume.py
+++ b/nitools/volume.py
@@ -164,6 +164,7 @@ def euclidean_dist_sq(coordA,coordB):
 def sample_image(img,xm,ym,zm,interpolation):
    """ Return values after resample image
+    ignores NaN values

    Args:
        img (Nifti image):
@@ -218,15 +219,13 @@ def sample_image(img,xm,ym,zm,interpolation):
        c011 = D[ir, jr+1, kr+1]
        c001 = D[ir, jr, kr+1]

-        c00 = c000*(1-id)+c100*id
-        c01 = c001*(1-id)+c101*id
-        c10 = c010*(1-id)+c110*id
-        c11 = c011*(1-id)+c111*id
-
-        c0 = c00*(1-jd)+c10*jd
-        c1 = c01*(1-jd)+c11*jd
-
-        value = c0*(1-kd)+c1*kd
+        c00 = nan_weighted_mean(c100,c000,id)
+        c01 = nan_weighted_mean(c101,c001,id)
+        c10 = nan_weighted_mean(c110,c010,id)
+        c11 = nan_weighted_mean(c111,c011,id)
+        c0 = nan_weighted_mean(c10,c00,jd)
+        c1 = nan_weighted_mean(c11,c01,jd)
+        value = nan_weighted_mean(c1,c0,kd)
    elif interpolation == 0:
        ir = np.rint(im).astype('int')
        jr = np.rint(jm).astype('int')
        kr = np.rint(km).astype('int')
@@ -245,6 +244,18 @@ def sample_image(img,xm,ym,zm,interpolation):
        value[invalid]=0
    return value

+def nan_weighted_mean(d1,d2,weight):
+    """ Weighted mean of two arrays (d1*weight + d2*(1-weight)),
+    where NaNs in one input are replaced by the values of the other """
+    j = np.isnan(d1)
+    k = np.isnan(d2)
+    if j.sum() == 0 and k.sum() == 0:
+        return d1*weight + d2*(1-weight)
+    else:
+        dd1 = d1.copy()
+        dd2 = d2.copy()
+        dd1[j] = dd2[j]
+        dd2[k] = dd1[k]
+        return (dd1*weight + dd2*(1-weight))
+
 def check_voxel_range(img,i,j,k):
    """ Checks if i,j,k voxels coordinates are within an image

@@ -294,17 +305,17 @@ def change_nifti_numformat(infile,
    """Changes the number format of a nifti file and saves it.

    Args:
-        infile (str): file name of input file
-        outfile (str): file name output file
+        infile (str): file name of input file
+        outfile (str): file name of output file
        new_numformat (str): New number format. Defaults to 'uint16'.
-        type_cast_data (bool): If true, typecasts the data as well.
+        typecast_data (bool): If true, typecasts the data as well.
    Note:
-        typecast_data changes the data format and can lead to and wrap-around of values, as the data is typecasted as well.
-        Do only when correcting a previous mistake in processing.
+        typecast_data changes the stored data values and can lead to a wrap-around of values.
+        Use only when correcting a previous mistake in processing.
    """
    A = nb.load(infile)
    X = A.get_fdata()
-    if typecast_data:
+    if typecast_data:
        X = X.astype(new_numformat)
    head = A.header.copy()
    head.set_data_dtype(new_numformat)
@@ -314,20 +325,20 @@ def change_nifti_numformat(infile,

 def sample_images(imgs,coords,use_dataobj=True):
    """ Samples a list of images at a set of voxels (coordinates nearest neighbor)
-    3d images are being returned as a single value per voxel. 4d images are being returned as a vector per voxel.
-    The function respects ths SPM-convention of specifying time slices 'img.nii,1'
-    Note that either all or none of the image names need to follow the 'img.nii,1' convention.
+    3d images are returned as a single value per voxel. 4d images are returned as a vector per voxel.
+    The function respects the SPM-convention of specifying time slices 'img.nii,1'
+    Note that either all or none of the image names need to follow the 'img.nii,1' convention.

    Args:
        imgs (list):
-            List of Nifti file names (SPM convention for time slices is respected)
+            List of Nifti file names (SPM convention for time slices is respected)
        coords (np-array):
-            3xN matrix of coordinates of to be sampled voxels
+            3xN matrix of coordinates of the voxels to be sampled
+        use_dataobj (bool):
+            If True, reads the data through the dataobj block interface
    Returns:
        values (np-array):
            Array contains all values in the image
    """
    n = len(imgs)
-    if ',' in imgs[0]: # SPM-convention of indicating time slice with ',x' notation
+    if ',' in imgs[0]:  # SPM-convention of indicating time slice with ',x' notation
        fnames=[f.split(',')[0] for f in imgs]
        slice=np.array([f.split(',')[1] for f in imgs],dtype=int)-1
        unique_fnames,img_indx = np.unique(fnames,return_inverse=True)
@@ -344,7 +355,7 @@ def sample_images(imgs,coords,use_dataobj=True):
        img_indx.append(np.array([i],dtype=int))
        slice.append(np.array([0],dtype=int))
    slice = np.concatenate(slice)
-    img_indx = np.concatenate(img_indx)
+    img_indx = np.concatenate(img_indx)
    N = len(slice)
    # Load the header from all the input files
    voxels = coords_to_voxelidxs(coords,input_vols[0])
@@ -355,7 +366,7 @@ def sample_images(imgs,coords,use_dataobj=True):
            data_block = v.dataobj[min(voxels[0]):max(voxels[0])+1,min(voxels[1]):max(voxels[1])+1,min(voxels[2]):max(voxels[2])+1]
            D = data_block[voxels[0]-min(voxels[0]),voxels[1]-min(voxels[1]),voxels[2]-min(voxels[2])]
            if v.ndim==3:
-                data[dindx,:]=D
+                data[dindx,:]=D
            else:
                data[dindx,:]=D[:,slice[dindx]].T
        else: