Loading...

Snakemake

Python-based workflow management system for creating reproducible and scalable data analyses

Research Infrastructure & Reproducibility Advanced Recommended Tool
Quick Info
  • Category: Research Infrastructure & Reproducibility
  • Level: Advanced
  • Type: Recommended Tool
  • Requires:
    • python

Why We Recommend Snakemake

Snakemake helps you build reproducible analysis pipelines that track dependencies automatically. It ensures your analyses run in the correct order, reruns only what's necessary when data changes, and scales from your laptop to compute clusters.

Common Use Cases

  • Building reproducible analysis pipelines
  • Managing complex multi-step workflows
  • Parallel processing of multiple files
  • Ensuring reproducibility in research

Getting Started

Snakemake is a workflow management system that helps you create reproducible and scalable data analyses. It uses a Python-based language to define rules that describe how to create output files from input files.

Why Snakemake?

  • Reproducibility: Tracks all steps and dependencies
  • Automatic Parallelization: Runs independent tasks simultaneously
  • Scalability: Works on laptops and compute clusters
  • Resumable: Continues from where it left off after failures
  • Readable: Python-based syntax
  • Dependency Tracking: Automatically determines what needs to run

Basic Concepts

Rules

Rules define how to create outputs from inputs:

# Snakefile
rule download_data:
    output:
        "data/raw/experiment.csv"
    shell:
        "wget https://example.com/data.csv -O {output}"

rule preprocess:
    input:
        "data/raw/experiment.csv"
    output:
        "data/processed/experiment_clean.csv"
    script:
        "scripts/preprocess.py"

rule analyze:
    input:
        "data/processed/experiment_clean.csv"
    output:
        "results/analysis.png"
    script:
        "scripts/analyze.py"

Running Workflows

# Run workflow (creates final outputs)
snakemake --cores 1

# Dry run (show what would be executed)
snakemake -n

# Run with multiple cores
snakemake --cores 4

# Create specific output
snakemake results/analysis.png

Wildcards for Multiple Files

Process Multiple Subjects

# Define subjects
SUBJECTS = ['S01', 'S02', 'S03']

rule all:
    input:
        expand("results/{subject}_analysis.png", subject=SUBJECTS)

rule preprocess_subject:
    input:
        "data/raw/{subject}_recording.tif"
    output:
        "data/processed/{subject}_traces.csv"
    shell:
        "python scripts/extract_traces.py {input} {output}"

rule analyze_subject:
    input:
        "data/processed/{subject}_traces.csv"
    output:
        "results/{subject}_analysis.png"
    script:
        "scripts/plot_traces.py"

Neuroscience Pipeline Example

Complete Calcium Imaging Workflow

# Snakefile
SUBJECTS = ['S01', 'S02', 'S03']
SESSIONS = ['session1', 'session2']

rule all:
    input:
        "results/summary_report.html"

rule motion_correction:
    input:
        "data/raw/{subject}/{session}/recording.tif"
    output:
        "data/processed/{subject}/{session}/registered.tif"
    threads: 4
    shell:
        """
        python scripts/motion_correct.py \
            --input {input} \
            --output {output} \
            --threads {threads}
        """

rule roi_detection:
    input:
        "data/processed/{subject}/{session}/registered.tif"
    output:
        rois="data/processed/{subject}/{session}/rois.npy",
        traces="data/processed/{subject}/{session}/traces.csv"
    script:
        "scripts/detect_rois.py"

rule spike_inference:
    input:
        "data/processed/{subject}/{session}/traces.csv"
    output:
        "data/processed/{subject}/{session}/spikes.csv"
    params:
        tau=1.0  # GCaMP time constant
    script:
        "scripts/infer_spikes.py"

rule aggregate_sessions:
    input:
        expand(
            "data/processed/{subject}/{session}/spikes.csv",
            subject=SUBJECTS,
            session=SESSIONS
        )
    output:
        "results/all_sessions.csv"
    script:
        "scripts/aggregate.py"

rule create_report:
    input:
        "results/all_sessions.csv"
    output:
        "results/summary_report.html"
    script:
        "scripts/generate_report.py"

Configuration Files

Using Config Files

# config.yaml
subjects:
  - S01
  - S02
  - S03

parameters:
  gaussian_sigma: 1.5
  threshold: 0.5
  min_roi_size: 20

# Snakefile
configfile: "config.yaml"

rule process:
    input:
        "data/{subject}.tif"
    output:
        "results/{subject}_processed.npy"
    params:
        sigma=config['parameters']['gaussian_sigma'],
        threshold=config['parameters']['threshold']
    script:
        "scripts/process.py"

Python Scripts Integration

Access Snakemake Variables

# scripts/process.py
import numpy as np
from skimage import io, filters

# Access snakemake object
input_file = snakemake.input[0]
output_file = snakemake.output[0]
sigma = snakemake.params.sigma

# Process
image = io.imread(input_file)
filtered = filters.gaussian(image, sigma=sigma)

# Save
np.save(output_file, filtered)

Parallel Processing

Process Files in Parallel

# Process 10 subjects simultaneously
snakemake --cores 10

# Use all available cores
snakemake --cores all

# Limit resource usage
rule heavy_computation:
    input:
        "data/{subject}.tif"
    output:
        "results/{subject}.npy"
    threads: 4
    resources:
        mem_mb=8000
    shell:
        "python scripts/analyze.py {input} {output} --threads {threads}"

Conda Integration

Environment Management

rule analyze:
    input:
        "data/processed.csv"
    output:
        "results/analysis.png"
    conda:
        "envs/analysis.yaml"
    script:
        "scripts/analyze.py"

# envs/analysis.yaml
channels:
  - conda-forge
dependencies:
  - python=3.11
  - numpy
  - pandas
  - matplotlib
  - scipy

Logging and Reporting

Track Execution

rule process:
    input:
        "data/{subject}.tif"
    output:
        "results/{subject}.npy"
    log:
        "logs/{subject}_processing.log"
    benchmark:
        "benchmarks/{subject}_processing.txt"
    shell:
        "python scripts/process.py {input} {output} 2> {log}"

Visualization

Create Workflow Diagram

# Generate DAG visualization
snakemake --dag | dot -Tpng > workflow.png

# Rule graph
snakemake --rulegraph | dot -Tpng > rules.png

Advanced Features

Checkpoints for Dynamic Workflows

checkpoint detect_outliers:
    input:
        "data/all_subjects.csv"
    output:
        "results/outliers.txt"
    script:
        "scripts/detect_outliers.py"

def get_good_subjects(wildcards):
    with open(checkpoints.detect_outliers.get().output[0]) as f:
        outliers = f.read().splitlines()
    return [s for s in SUBJECTS if s not in outliers]

rule analyze_good_subjects:
    input:
        lambda wildcards: expand(
            "data/{subject}.csv",
            subject=get_good_subjects(wildcards)
        )
    output:
        "results/final_analysis.png"
    script:
        "scripts/analyze.py"

Best Practices

Project Structure

project/
β”œβ”€β”€ Snakefile
β”œβ”€β”€ config.yaml
β”œβ”€β”€ data/
β”‚   β”œβ”€β”€ raw/
β”‚   └── processed/
β”œβ”€β”€ scripts/
β”‚   β”œβ”€β”€ preprocess.py
β”‚   β”œβ”€β”€ analyze.py
β”‚   └── plot.py
β”œβ”€β”€ results/
β”œβ”€β”€ logs/
└── envs/
    └── analysis.yaml

Rule Naming

# Good: descriptive names
rule motion_correct_calcium_imaging:
    ...

rule extract_roi_traces:
    ...

# Avoid: vague names
rule process:
    ...

rule step2:
    ...

Common Patterns

Batch Download

URLS = {
    'subject1': 'https://example.com/S01.zip',
    'subject2': 'https://example.com/S02.zip'
}

rule download:
    output:
        "data/raw/{subject}.zip"
    params:
        url=lambda wildcards: URLS[wildcards.subject]
    shell:
        "wget {params.url} -O {output}"

rule extract:
    input:
        "data/raw/{subject}.zip"
    output:
        directory("data/raw/{subject}/")
    shell:
        "unzip {input} -d {output}"

Quality Control

rule quality_check:
    input:
        "data/processed/{subject}.csv"
    output:
        qc_report="reports/{subject}_qc.html",
        passed="qc/{subject}_passed.txt"
    script:
        "scripts/quality_check.py"

# Only analyze files that passed QC
rule analyze:
    input:
        data="data/processed/{subject}.csv",
        qc="qc/{subject}_passed.txt"  # Ensures QC passed
    output:
        "results/{subject}_analysis.png"
    script:
        "scripts/analyze.py"

Debugging

Useful Commands

# Dry run with reasons
snakemake -n -r

# Print shell commands
snakemake -p

# Detailed debugging
snakemake --debug-dag

# Force rerun specific rule
snakemake --forcerun rulename

# Unlock directory after crash
snakemake --unlock

Cluster Execution

Run on HPC Cluster

# SLURM
snakemake --cluster "sbatch --cpus-per-task={threads}" --jobs 100

# With profile
snakemake --profile slurm/

Installation

# With Pixi
pixi add snakemake

# With pip
pip install snakemake

# With conda
conda install -c conda-forge snakemake

Resources

Summary

Snakemake is essential for:

  • Reproducible pipelines: Track every step
  • Scalable analyses: Laptop to cluster
  • Automatic parallelization: Speed up processing
  • Resume capability: Continue after failures

For any multi-step analysis, Snakemake ensures reproducibility and scalability.

Prerequisites

  • python
Top