188 changes: 186 additions & 2 deletions LFPAnalysis/statistics_utils.py
@@ -6,7 +6,7 @@
import statsmodels.api as sm
import statsmodels.formula.api as smf
from tqdm import tqdm
# from joblib import Parallel, delayed
from joblib import Parallel, delayed
from multiprocessing import Pool

import patsy
@@ -15,6 +15,8 @@
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import ttest_1samp
from scipy.ndimage import label

import warnings
warnings.filterwarnings('ignore')
@@ -113,7 +115,7 @@ def permutation_regression_zscore(data, formula, n_permutations=1000, plot_res=F
return results

def time_resolved_regression_single_channel(timeseries=None, regressors=None,
win_len=100, slide_len=25,
win_len=100, slide_len=25, ts_index=None,
standardize=True, smooth=False, permute=False, sr=500):
"""
In this function, if you provide a 2D array of z-scored time-varying neural data and a set of regressors,
@@ -189,8 +191,190 @@ def time_resolved_regression_single_channel(timeseries=None, regressors=None,
else:
all_res['ts'] = all_res['ts'] * (1000/sr)

if ts_index is not None:
# Convert each window time (ms) back to a sample index to look up its timestamp
all_res['time'] = [ts_index[int(t*(sr/1000))] for t in all_res['ts'].values]

return all_res

def find_clusters(roi_df, t_col='ts', cluster_p_thr=0.05, min_cluster_size=3):
"""
Computes the one-sample t-test for each unique ts in the ROI DataFrame,
identifies clusters of significant ts values, and sums the t-statistics
within each cluster.

Args:
roi_df (pd.DataFrame): DataFrame with 'Original_Estimate' and 'ts' columns.
t_col (str): Name of the column containing the time samples (default: 'ts')
cluster_p_thr (float): Threshold for statistical significance (default: 0.05)
min_cluster_size (int): Minimum number of consecutive significant ts values (default: 3)

Returns:
tuple: (roi_ttest, cluster_tstats), where roi_ttest holds the per-timepoint
t-statistics, p-values, and cluster labels, and cluster_tstats holds the
summed t-statistics and extent of each surviving cluster.
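
Example (a minimal sketch on synthetic data; column names follow the defaults above):

>>> import numpy as np, pandas as pd
>>> rng = np.random.default_rng(0)
>>> demo = pd.DataFrame({
...     'ts': np.tile(np.arange(10), 20),
...     'Original_Estimate': rng.normal(1.0, 1.0, 200),
... })
>>> per_ts, clusters = find_clusters(demo, t_col='ts')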
"""
# sort df by t_col
roi_df = roi_df.sort_values(t_col)

# One-sample t-test at each ts
roi_ttest = roi_df.groupby(t_col).apply(
lambda x: pd.Series(
{
'tstat': ttest_1samp(x['Original_Estimate'], 0)[0],
'pval': ttest_1samp(x['Original_Estimate'], 0)[1]
}
)
).reset_index()

# Mark timepoints that pass the cluster-forming threshold (p < cluster_p_thr)
roi_ttest['mask'] = roi_ttest['pval'] < cluster_p_thr

# Identify and label non-overlapping clusters of 1s
clusters, num_clusters = label(roi_ttest['mask'])

# Extract cluster indices and labels
cluster_info = [(i+1, np.where(clusters == i+1)[0].tolist()) for i in range(num_clusters)]
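# (np.where returns positional indices; these match the .loc labels used below
# because roi_ttest was just rebuilt with reset_index())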

# Initialize the 'cluster' column with NaN values
roi_ttest['cluster'] = np.nan

# Assign cluster labels to the corresponding indices
for cluster_id, indices in cluster_info:
roi_ttest.loc[indices, 'cluster'] = cluster_id

# Summarize each cluster: summed t-statistics, time extent, and size
cluster_tstats = roi_ttest.groupby('cluster').apply(
lambda x: pd.Series(
{
'sum_tstat': x['tstat'].sum(),
'abs_sum_tstat': np.abs(x['tstat']).sum(),
'start_ts': x[t_col].min(),
'end_ts': x[t_col].max(),
'cluster_size': len(x)
}
)
)

# Filter clusters by minimum size
if not cluster_tstats.empty:
if min_cluster_size:
cluster_tstats = cluster_tstats[cluster_tstats.cluster_size >= min_cluster_size]

return roi_ttest, cluster_tstats

def ts_permutation_test(roi_df, n_permutations=1000, t_col='ts', cluster_p_thr=.05, min_cluster_size=3, n_jobs=-1, verbose=False):
"""
Performs permutation testing by shuffling the 'ts' variable and generating a null
distribution of the summed t-statistics of the largest cluster using parallel processing.

Args:
roi_df (pd.DataFrame): DataFrame with 'Original_Estimate' and 'ts' columns.
n_permutations (int): Number of permutations to perform.
t_col (str): Name of the column containing the time samples (default: 'ts').
cluster_p_thr (float): Threshold for statistical significance (default: 0.05).
min_cluster_size (int): Minimum number of consecutive significant ts values (e.g., `3`).
n_jobs (int): Number of parallel jobs (cores) to use. Default is -1 (use all available cores).
verbose (bool): If True, display a tqdm progress bar over permutations (default: False).

Returns:
list: Null distribution of the summed t-statistics of the largest cluster.
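
Example (a sketch; `roi_df` shaped as for `find_clusters` above):

>>> null = ts_permutation_test(roi_df, n_permutations=200, n_jobs=2)
>>> len(null)
200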
"""
def get_largest_cluster_stat(roi_df, t_col='ts', cluster_p_thr=0.05, min_cluster_size=3, random_state=None):
"""
Performs a single permutation test by shuffling the 'ts' values and computing the largest summed t-statistic.

Args:
roi_df (pd.DataFrame): DataFrame with 'Original_Estimate' and 'ts' columns.
t_col (str): Name of the column containing the time samples (default: 'ts').
cluster_p_thr (float): Threshold for statistical significance (default: 0.05).
min_cluster_size (int): Minimum number of consecutive significant ts values (e.g., `3`).
random_state (int): Random seed for reproducibility.

Returns:
float: The summed t-statistic of the largest cluster from the permuted data.
"""
if random_state is not None:
np.random.seed(random_state)

shuffled_df = roi_df.copy()
shuffled_df[t_col] = np.random.permutation(shuffled_df[t_col])

# Compute cluster stats for the shuffled data
_, permuted_clusters = find_clusters(shuffled_df, t_col=t_col, cluster_p_thr=cluster_p_thr, min_cluster_size=min_cluster_size)

# Find the largest summed t-stat cluster in permuted data
if not permuted_clusters.empty:
largest_cluster_sum_tstat = permuted_clusters['abs_sum_tstat'].max()
else:
largest_cluster_sum_tstat = 0 # Handle case with no clusters

return largest_cluster_sum_tstat

# Parallel computation of permutation tests (progress bar if verbose)
perm_range = tqdm(range(n_permutations)) if verbose else range(n_permutations)
null_distribution = Parallel(n_jobs=n_jobs)(
delayed(get_largest_cluster_stat)(roi_df, t_col=t_col, cluster_p_thr=cluster_p_thr, min_cluster_size=min_cluster_size, random_state=seed) for seed in perm_range
)

return null_distribution

def cluster_based_permutation(roi_df, n_permutations=1000, t_col='ts', cluster_p_thr=.05, fwe_thr=.05, min_cluster_size=3, output_null=False, verbose=False):
"""
Perform cluster-based permutation test across all electrodes at each timepoint and compute FWE p-values.

Parameters:
- roi_df: DataFrame containing the region of interest (ROI) data.
- n_permutations: Number of permutations for the null distribution (default: 1000).
- t_col: Name of the column containing time data (default: 'ts').
- cluster_p_thr: cluster forming threshold.
- fwe_thr: Family-wise error rate threshold for significance.
- min_cluster_size: Minimum number of consecutive significant ts values (default: 3).
- output_null: Whether to also return the null distribution (default: False).
- verbose: Whether to display a progress bar during permutation testing (default: False).

Returns:
- roi_ttest: DataFrame with t-statistics and p-values for each timepoint.
- cluster_tstats: DataFrame with summed t-statistics for each cluster identified.
- null_distribution: List of the largest summed t-statistic from each permutation (only returned when output_null=True).

Example Usage:
# can use outputs from `statistics_utils.time_resolved_regression_single_channel(timeseries,regressors,standardize=True,smooth=False)`
```
roi_df = all_channels_df[all_channels_df.region == 'AMY'][['Original_Estimate', 'ts']]
roi_ttest, cluster_tstats = cluster_based_permutation(roi_df)
```
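
To pull out just the FWE-significant windows afterwards (a sketch, assuming at least one cluster survived so the `fwe_mask` column exists):
```
sig_windows = cluster_tstats.loc[cluster_tstats['fwe_mask'], ['start_ts', 'end_ts', 'fwe_pval']]
```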
"""

# Compute actual statistics
roi_ttest, cluster_tstats = find_clusters(roi_df, t_col=t_col, cluster_p_thr=cluster_p_thr, min_cluster_size=min_cluster_size)

# Check whether any clusters were identified; if not, permutation testing is unnecessary
if not cluster_tstats.empty:
# Generate permutation-based null distribution
null_distribution = ts_permutation_test(roi_df, n_permutations=n_permutations, t_col=t_col, cluster_p_thr=cluster_p_thr, min_cluster_size=min_cluster_size, verbose=verbose)

# Add min and max values of the null distribution to cluster statistics
cluster_tstats['null_min'] = np.min(null_distribution)
cluster_tstats['null_max'] = np.max(null_distribution)

# Compute FWE-corrected p-values and mask based on significance threshold
cluster_tstats['fwe_pval'] = cluster_tstats['abs_sum_tstat'].apply(
lambda x: np.mean(np.abs(null_distribution) >= np.abs(x))
)
cluster_tstats['fwe_mask'] = cluster_tstats['fwe_pval'] < fwe_thr

# Flag timepoints in roi_ttest that fall within FWE-significant clusters
significant_clusters = cluster_tstats.index[cluster_tstats['fwe_mask']].values
roi_ttest['fwe_mask'] = roi_ttest['cluster'].isin(significant_clusters)

if output_null:
return roi_ttest, cluster_tstats, null_distribution
else:
return roi_ttest, cluster_tstats
else:
# No clusters survived thresholding: return results without permutation testing
if output_null:
return roi_ttest, cluster_tstats, []
return roi_ttest, cluster_tstats

# def time_resolved_regression_perm(timeseries=None, regressors=None, win_len=100, slide_len=25, standardize=True, sr=None, nsurr=500):

# """