pwwang/pipen
[pipen logo]

A pipeline framework for Python



Documentation | ChangeLog | Examples | API

Why pipen?

pipen is designed for data scientists, bioinformaticians, and researchers who need to create reproducible, scalable computational pipelines without the complexity of traditional workflow systems.

Target Audience

  • Data Scientists: Process large datasets with automatic parallelization and caching
  • Bioinformaticians: Build reproducible analysis pipelines for genomics data
  • Researchers: Create transparent, reproducible workflows for computational research
  • DevOps Engineers: Orchestrate batch jobs across different schedulers (SLURM, SGE, Google Cloud)

Key Benefits

1. Zero Configuration

  • Get started immediately with sensible defaults
  • Configure only what you need, when you need it
  • Profile-based configuration for different environments

2. Reproducibility Built-In

  • Automatic job caching based on input/output signatures
  • Full audit trail of pipeline runs and parameters
  • Dependency tracking ensures processes run in correct order
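
The signature-based caching described above can be sketched in a few lines of plain Python. This is a conceptual illustration only, not pipen's actual implementation: a job's signature hashes everything that determines its result, and a job whose signature matches the one recorded for a previous run is skipped.

```python
import hashlib
import json


def job_signature(input_data: dict, output_spec: dict, script: str) -> str:
    """Conceptual stand-in for pipen's input/output signature check:
    if nothing that feeds the job has changed, the cached result is reused.
    """
    payload = json.dumps(
        {"input": input_data, "output": output_spec, "script": script},
        sort_keys=True,  # make the hash independent of dict ordering
    )
    return hashlib.sha256(payload.encode()).hexdigest()


# Same inputs and script -> same signature -> cache hit, job is skipped
sig1 = job_signature({"infile": "/tmp/data.txt"}, {"outfile": "sorted.txt"}, "sort")
sig2 = job_signature({"infile": "/tmp/data.txt"}, {"outfile": "sorted.txt"}, "sort")
assert sig1 == sig2

# Changing the script invalidates the cache
sig3 = job_signature({"infile": "/tmp/data.txt"}, {"outfile": "sorted.txt"}, "sort -r")
assert sig3 != sig1
```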

3. Flexible Scheduling

  • Run locally for development
  • Scale to HPC clusters (SLURM, SGE)
  • Deploy to cloud (Google Cloud Batch, SSH)
  • Run in containers for reproducibility
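
Scheduler selection is configuration, not code. As a hypothetical sketch of profile-based configuration (the keys shown — `scheduler`, `forks` — mirror the settings printed in the run log below; consult the pipen docs for the exact file locations and semantics):

```toml
# Hypothetical profile file; key names follow pipen's logged settings.
[default]
scheduler = "local"
forks = 1

[cluster]
scheduler = "slurm"
forks = 100

[cloud]
scheduler = "gbatch"
```

The same pipeline definition can then be pointed at a different scheduler by selecting a profile at run time, with no changes to the pipeline code.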

4. Developer-Friendly

  • Define pipelines as Python classes
  • Use familiar Python syntax and tools
  • Extensible plugin system for custom functionality
  • Rich, informative logging and progress tracking
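
The comparison table below describes pipen's plugin system as hook-based. A generic sketch of that pattern (pipen's actual plugin API differs; see its documentation): plugins register callbacks for named hooks, and the core fires them at well-defined points.

```python
class HookRegistry:
    """Minimal hook-based extensibility sketch (not pipen's real API)."""

    def __init__(self):
        self.hooks = {}

    def register(self, name, fn):
        """A plugin attaches a callback to a named hook."""
        self.hooks.setdefault(name, []).append(fn)

    def fire(self, name, *args):
        """The core fires all callbacks registered for a hook."""
        return [fn(*args) for fn in self.hooks.get(name, [])]


registry = HookRegistry()
# "on_job_done" is an illustrative hook name, not pipen's
registry.register("on_job_done", lambda job: f"notify: {job} finished")
results = registry.fire("on_job_done", "P1/0")
assert results == ["notify: P1/0 finished"]
```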

5. Data Flow Management

  • Automatic data passing between pipeline stages
  • Support for files, directories, and in-memory data
  • Built-in operations for transforming and aggregating data
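
The comparison table below mentions channel operations such as `expand_dir`. Conceptually, a channel is a table of job inputs, and `expand_dir` turns one row holding a directory into one row per file inside it. A rough stand-alone sketch of that idea (not pipen's API — its real Channel is pandas-based):

```python
import tempfile
from pathlib import Path


def expand_dir(rows: list, col: int = 0) -> list:
    """Replace each row whose `col` entry is a directory with one row
    per file in that directory; other rows pass through unchanged.
    """
    out = []
    for row in rows:
        path = Path(row[col])
        if path.is_dir():
            for f in sorted(path.iterdir()):
                out.append(row[:col] + (str(f),) + row[col + 1:])
        else:
            out.append(row)
    return out


# Demo on a throwaway directory containing two files
with tempfile.TemporaryDirectory() as d:
    for name in ("a.txt", "b.txt"):
        (Path(d) / name).write_text(name)
    channel = expand_dir([(d, "sample1")])
    names = [Path(p).name for p, _ in channel]

assert names == ["a.txt", "b.txt"]
```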

Comparison with Alternatives

| Feature | pipen | Snakemake | Nextflow | Airflow |
|---|---|---|---|---|
| Target Audience | Data Scientists, Bioinformaticians, Researchers, DevOps | Bioinformaticians | Bioinformaticians | Data Engineers |
| Learning Curve | Low | Medium | High | High |
| Python Integration | Native | Limited | Limited | Native |
| Scheduler Support | 6+ (Local, SGE, SLURM, SSH, Container, Gbatch) | Limited | Limited | Plugin-based |
| Caching | Built-in, automatic | Manual | Manual | Plugin-based |
| Cloud Native Support | Yes (Google Cloud Batch) | Partial | Yes | Yes |
| Interactive Debugging | Yes | Limited | No | No |
| Easy to Use | Define pipelines as Python classes, familiar syntax | Workflow DSL, separate config files | Groovy-based DSL | DAG definition in Python, complex UI |
| Zero Configuration | Sensible defaults, configure only what's needed | Many configuration options | Heavy configuration required | Complex setup |
| Nice Logging | Rich, informative, color-coded, progress bars | Text-based | Text-based | Basic logging |
| Highly Extensible | Simple plugin system, hook-based | Custom rules/scripts | Custom operators | Custom operators/providers |
| Data Flow Management | Built-in channel operations (expand_dir, collapse_files) | Manual handling | Channel system | XCom system |
| Reproducibility | Built-in caching, full audit trail | Manual | Versioned containers | DAG versioning |
| Flexible Scheduling | Switch schedulers without code changes | Config-based | Config-based | Config-based |

Installation

pip install -U pipen

Quickstart

example.py

```python
from pipen import Proc, Pipen, run


class P1(Proc):
    """Sort input file"""
    input = "infile"
    input_data = ["/tmp/data.txt"]
    output = "outfile:file:intermediate.txt"
    script = "cat {{in.infile}} | sort > {{out.outfile}}"


class P2(Proc):
    """Paste line number"""
    requires = P1
    input = "infile:file"
    output = "outfile:file:result.txt"
    script = "paste <(seq 1 3) {{in.infile}} > {{out.outfile}}"


# class MyPipeline(Pipen):
#     starts = P1


if __name__ == "__main__":
    # MyPipeline().run()
    run("MyPipeline", starts=P1)
```
```console
> echo -e "3\n2\n1" > /tmp/data.txt
> python example.py
04-17 16:19:35 I core                   _____________________________________   __
04-17 16:19:35 I core                   ___  __ \___  _/__  __ \__  ____/__  | / /
04-17 16:19:35 I core                   __  /_/ /__  / __  /_/ /_  __/  __   |/ /
04-17 16:19:35 I core                   _  ____/__/ /  _  ____/_  /___  _  /|  /
04-17 16:19:35 I core                   /_/     /___/  /_/     /_____/  /_/ |_/
04-17 16:19:35 I core
04-17 16:19:35 I core                               version: 1.1.8
04-17 16:19:35 I core
04-17 16:19:35 I core    ╔═══════════════════════════ MYPIPELINE ════════════════════════════╗
04-17 16:19:35 I core    ║ My pipeline                                                       ║
04-17 16:19:35 I core    ╚═══════════════════════════════════════════════════════════════════╝
04-17 16:19:35 I core    plugins         : verbose v1.1.1
04-17 16:19:35 I core    # procs         : 2
04-17 16:19:35 I core    profile         : default
04-17 16:19:35 I core    outdir          : /path/to/cwd/MyPipeline-output
04-17 16:19:35 I core    cache           : True
04-17 16:19:35 I core    dirsig          : 1
04-17 16:19:35 I core    error_strategy  : ignore
04-17 16:19:35 I core    forks           : 1
04-17 16:19:35 I core    lang            : bash
04-17 16:19:35 I core    loglevel        : info
04-17 16:19:35 I core    num_retries     : 3
04-17 16:19:35 I core    scheduler       : local
04-17 16:19:35 I core    submission_batch: 8
04-17 16:19:35 I core    template        : liquid
04-17 16:19:35 I core    workdir         : /path/to/cwd/.pipen/MyPipeline
04-17 16:19:35 I core    plugin_opts     :
04-17 16:19:35 I core    template_opts   : filters={'realpath': <function realpath at 0x7fc3eba12...
04-17 16:19:35 I core                    : globals={'realpath': <function realpath at 0x7fc3eba12...
04-17 16:19:35 I core    Initializing plugins ...
04-17 16:19:36 I core
04-17 16:19:36 I core    ╭─────────────────────────────── P1 ────────────────────────────────╮
04-17 16:19:36 I core    │ Sort input file                                                   │
04-17 16:19:36 I core    ╰───────────────────────────────────────────────────────────────────╯
04-17 16:19:36 I core    P1: Workdir: '/path/to/cwd/.pipen/MyPipeline/P1'
04-17 16:19:36 I core    P1: <<< [START]
04-17 16:19:36 I core    P1: >>> ['P2']
04-17 16:19:36 I verbose P1: in.infile: /tmp/data.txt
04-17 16:19:36 I verbose P1: out.outfile: /path/to/cwd/.pipen/MyPipeline/P1/0/output/intermediate.txt
04-17 16:19:38 I verbose P1: Time elapsed: 00:00:02.051s
04-17 16:19:38 I core
04-17 16:19:38 I core    ╔═══════════════════════════════ P2 ════════════════════════════════╗
04-17 16:19:38 I core    ║ Paste line number                                                 ║
04-17 16:19:38 I core    ╚═══════════════════════════════════════════════════════════════════╝
04-17 16:19:38 I core    P2: Workdir: '/path/to/cwd/.pipen/MyPipeline/P2'
04-17 16:19:38 I core    P2: <<< ['P1']
04-17 16:19:38 I core    P2: >>> [END]
04-17 16:19:38 I verbose P2: in.infile: /path/to/cwd/.pipen/MyPipeline/P1/0/output/intermediate.txt
04-17 16:19:38 I verbose P2: out.outfile: /path/to/cwd/MyPipeline-output/P2/result.txt
04-17 16:19:41 I verbose P2: Time elapsed: 00:00:02.051s
04-17 16:19:41 I core

             MYPIPELINE: 100%|██████████████████████████████| 2/2 [00:06<00:00, 0.35 procs/s]
> cat ./MyPipeline-output/P2/result.txt
1       1
2       2
3       3
```

Examples

See more examples at examples/ and a more realistic example at:

https://github.com/pwwang/pipen-report/tree/master/example

Plugin gallery

Plugins make pipen even better.