ETL Documentation

Build real-time Postgres replication applications in Rust

ETL is a Rust framework by Supabase that enables you to build high-performance, real-time data replication applications for Postgres. Whether you're creating ETL pipelines, implementing CDC (Change Data Capture), or building custom data synchronization solutions, ETL provides the building blocks you need.

Getting Started

Choose your path based on your needs:

New to ETL?

Start with our Tutorials to learn ETL through hands-on examples.

Ready to solve specific problems?

Jump to our How-To Guides for practical solutions.

Want to understand the bigger picture?

Read our Explanations for deeper insights.

Core Concepts

Postgres Logical Replication streams data changes from Postgres databases in real time using the Write-Ahead Log (WAL). ETL builds on this foundation to provide:

  • 🚀 Real-time replication - Stream changes as they happen
  • 🔄 Multiple destinations - BigQuery and more coming soon
  • 🛡️ Fault tolerance - Built-in error handling and recovery
  • ⚡ High performance - Efficient batching and parallel processing
  • 🔧 Extensible - Plugin architecture for custom destinations
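
ETL consumes changes through a Postgres publication, which must exist on the source database before a pipeline starts. The sketch below shows one possible way to create it from Rust; it assumes the tokio-postgres client crate, the connection details used in the Quick Example below, and a placeholder table named my_table - any SQL client works equally well.

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Connect to the source database (wal_level must be set to `logical`).
    let (client, connection) = tokio_postgres::connect(
        "host=localhost user=postgres password=password dbname=mydb",
        NoTls,
    )
    .await?;

    // The connection object drives the socket; run it in the background.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // A publication defines which tables the pipeline will replicate.
    client
        .batch_execute("CREATE PUBLICATION my_publication FOR TABLE my_table;")
        .await?;

    Ok(())
}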

Quick Example

use etl::{
    config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
    destination::memory::MemoryDestination,
    pipeline::Pipeline,
    store::both::memory::MemoryStore,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Postgres connection
    let pg_config = PgConnectionConfig {
        host: "localhost".to_string(),
        port: 5432,
        name: "mydb".to_string(),
        username: "postgres".to_string(),
        password: Some("password".to_string().into()),
        tls: TlsConfig { enabled: false, trusted_root_certs: String::new() },
    };

    // Create memory-based store and destination for testing
    let store = MemoryStore::new();
    let destination = MemoryDestination::new();

    // Configure the pipeline
    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_config,
        batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
        table_error_retry_delay_ms: 10000,
        max_table_sync_workers: 4,
    };

    // Create and start the pipeline
    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;

    // Pipeline will run until stopped
    pipeline.wait().await?;

    Ok(())
}
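
The memory-backed store and destination in this example keep all state in process, which is convenient for local testing. For a real deployment you would swap them for a persistent store and a destination such as BigQuery, while the rest of the pipeline wiring stays essentially the same.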

Next Steps