Batch Processing#
Process multiple datasets efficiently using parallel processing.
Basic Usage#
from meeg_utils.preprocessing import BatchPreprocessingPipeline
from mne_bids import BIDSPath
# Define multiple inputs
bids_paths = [
BIDSPath(
subject=f"{i:02d}",
session="01",
task="rest",
datatype="eeg",
root="/path/to/bids/dataset"
)
for i in range(1, 21) # Process 20 subjects
]
# Create batch pipeline
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=4 # Use 4 parallel workers
)
# Run batch processing
batch.run()
Parallel Processing#
Control the number of parallel jobs:
# Sequential processing (n_jobs=1)
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=1
)
# Parallel processing
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=4 # 4 parallel workers
)
# Use all available CPUs
import multiprocessing
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=multiprocessing.cpu_count()
)
Note: Each worker uses 1 CPU core. Don’t set n_jobs higher than available cores.
Custom Parameters#
Apply the same preprocessing parameters to every file in the batch:
batch.run(
filter_params={
"highpass": 0.1,
"lowpass": 100.0,
"sfreq": 250.0
},
detect_bad_channels=True,
remove_line_noise=True,
apply_ica=True,
ica_params={"n_components": 20}
)
Skip Existing Files#
Resume interrupted batch processing:
batch.run(skip_existing=True)
This skips files that have already been processed, allowing you to:
Resume after interruption
Add new files to existing batch
Re-process only failed files
Logging#
Enable logging to file:
batch.run(
save_logs=True,
logging_level="INFO" # or "DEBUG", "WARNING", "ERROR"
)
This creates a log file: output_dir/batch_preprocessing.log
Control logging verbosity:
"DEBUG": Detailed information
"INFO": General progress (default)
"WARNING": Only warnings and errors
"ERROR": Only errors
Input Path Types#
Multiple path types are supported:
BIDS Paths#
from mne_bids import BIDSPath
bids_paths = [
BIDSPath(subject="01", session="01", task="rest",
datatype="eeg", root="/data/bids"),
BIDSPath(subject="02", session="01", task="rest",
datatype="eeg", root="/data/bids"),
]
String Paths#
string_paths = [
"/data/subject01_eeg.fif",
"/data/subject02_eeg.fif",
]
Path Objects#
from pathlib import Path
path_objects = [
Path("/data/subject01_eeg.fif"),
Path("/data/subject02_eeg.fif"),
]
Mixed Types#
mixed_paths = [
bids_paths[0], # BIDSPath
"/data/subject02.fif", # string
Path("/data/subject03.fif"), # Path
]
batch = BatchPreprocessingPipeline(input_paths=mixed_paths)
Error Handling#
Batch processing continues even if individual files fail:
# Include an invalid path
paths_with_invalid = bids_paths + [
BIDSPath(subject="99", session="99", task="nonexistent",
datatype="eeg", root="/nonexistent")
]
batch = BatchPreprocessingPipeline(
input_paths=paths_with_invalid,
output_dir="/path/to/output"
)
# Processing continues for valid files
batch.run()
Check the log file (output_dir/batch_preprocessing.log) to see which files failed and why.
Output Organization#
BIDS Structure#
Output preserves BIDS directory structure:
output_dir/
├── sub-01/
│ └── ses-01/
│ └── eeg/
│ ├── sub-01_ses-01_task-rest_preproc_eeg.fif
│ └── sub-01_ses-01_task-rest_bad_channels.tsv
├── sub-02/
│ └── ses-01/
│ └── eeg/
│ ├── sub-02_ses-01_task-rest_preproc_eeg.fif
│ └── sub-02_ses-01_task-rest_bad_channels.tsv
└── batch_preprocessing.log
Custom Output#
For non-BIDS paths:
output_dir/
├── subject01_preproc_eeg.fif
├── subject02_preproc_eeg.fif
└── batch_preprocessing.log
Performance Tips#
Optimal n_jobs#
import multiprocessing
# Leave 1 core free for system
n_jobs = max(1, multiprocessing.cpu_count() - 1)
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=n_jobs
)
Memory Considerations#
Each worker loads one dataset into memory, so peak memory use grows with n_jobs. If datasets are large, use fewer workers:
# Reduce n_jobs to avoid memory issues
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir="/path/to/output",
n_jobs=2 # Use fewer workers for large datasets
)
Progress Monitoring#
Monitor progress in real time by enabling file logging:
batch.run(
save_logs=True,
logging_level="INFO"
)
# Watch log file in another terminal
# tail -f output_dir/batch_preprocessing.log
Example: Large Study#
Complete example for processing a large study:
from meeg_utils.preprocessing import BatchPreprocessingPipeline
from mne_bids import BIDSPath
import multiprocessing
# Define study parameters
bids_root = "/data/my_study/bids"
output_dir = "/data/my_study/derivatives/meeg-utils"
subjects = [f"{i:02d}" for i in range(1, 101)] # 100 subjects
# Create BIDSPaths for all subjects
bids_paths = [
BIDSPath(
subject=sub,
session="01",
task="task",
datatype="eeg",
root=bids_root
)
for sub in subjects
]
# Configure batch processing
n_jobs = max(1, multiprocessing.cpu_count() - 1)
batch = BatchPreprocessingPipeline(
input_paths=bids_paths,
output_dir=output_dir,
n_jobs=n_jobs,
random_state=42 # For reproducibility
)
# Run with standard parameters
batch.run(
filter_params={
"highpass": 0.5,
"lowpass": 50.0,
"sfreq": 250.0
},
detect_bad_channels=True,
remove_line_noise=True,
apply_ica=True,
ica_params={"n_components": 20},
save_intermediate=True,
skip_existing=True, # Resume if interrupted
save_logs=True,
logging_level="INFO"
)
print("Batch processing complete!")
print(f"Results saved to: {output_dir}")
print(f"Log file: {output_dir}/batch_preprocessing.log")