
Build Your First ETL Pipeline

Learn the fundamentals by building a working pipeline in 15 minutes

By the end of this tutorial, you'll have a complete ETL pipeline that streams data changes from Postgres to a memory destination in real time. You'll see how to set up publications, configure pipelines, and handle live data replication.

What You'll Build

A real-time data pipeline that:

  • Monitors a Postgres table for changes
  • Streams INSERT, UPDATE, and DELETE operations
  • Stores replicated data in memory for immediate access

Who This Tutorial Is For

  • Rust developers new to ETL
  • Anyone interested in Postgres logical replication
  • Developers building data synchronization tools

Time required: 15 minutes
Difficulty: Beginner

Safety Note

This tutorial uses an isolated test database. To clean up, simply drop the test database when finished. No production data is affected.

Step 1: Set Up Your Environment

Create a new Rust project for this tutorial:

cargo new etl-tutorial
cd etl-tutorial

Add ETL to your dependencies in Cargo.toml:

[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1.0", features = ["full"] }

Checkpoint: Run cargo check; the project should compile successfully.
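
Tip: the git dependency above tracks the repository's default branch, so builds can change over time. For reproducible builds, Cargo lets you pin the dependency to a specific commit; the rev value below is a placeholder, not a real commit:

[dependencies]
etl = { git = "https://github.com/supabase/etl", rev = "<commit-sha>" }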

Step 2: Prepare Postgres

Connect to your Postgres server and create a test database:

CREATE DATABASE etl_tutorial;
\c etl_tutorial

-- Create a sample table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Insert sample data
INSERT INTO users (name, email) VALUES
    ('Alice Johnson', 'alice@example.com'),
    ('Bob Smith', 'bob@example.com');

Create a publication for replication:

CREATE PUBLICATION my_publication FOR TABLE users;

Checkpoint: Verify the publication exists:

SELECT * FROM pg_publication WHERE pubname = 'my_publication';

You should see one row returned.
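
You can also confirm exactly which tables the publication covers. Postgres exposes this in the pg_publication_tables system view:

SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = 'my_publication';

The users table should appear in the result.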

Step 3: Configure Your Pipeline

Replace the contents of src/main.rs:

use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
use etl::pipeline::Pipeline;
use etl::destination::memory::MemoryDestination;
use etl::store::both::memory::MemoryStore;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Configure Postgres connection.
    let pg_connection_config = PgConnectionConfig {
        host: "localhost".to_string(),
        port: 5432,
        name: "postgres".to_string(),
        username: "postgres".to_string(),
        password: Some("your_password".to_string().into()),
        tls: TlsConfig {
            trusted_root_certs: String::new(),
            enabled: false,
        },
    };

    // Configure pipeline behavior.
    let pipeline_config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_connection_config,
        batch: BatchConfig {
            max_size: 1000,
            max_fill_ms: 5000,
        },
        table_error_retry_delay_ms: 10000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    // Create stores and destination.
    let store = MemoryStore::new();
    let destination = MemoryDestination::new();

    // Spawn a task that periodically prints the contents of the destination.
    let destination_clone = destination.clone();
    tokio::spawn(async move {
        loop {
            println!("Destination Contents At This Time\n");

            // Table rows are the rows copied from the table during the initial sync.
            for (table_id, table_rows) in destination_clone.table_rows().await {
                println!("Table ({:?}): {:?}", table_id, table_rows);
            }

            // Events are the real-time changes Postgres streams after the initial copy.
            for event in destination_clone.events().await {
                println!("Event: {:?}", event);
            }

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;

            print!("\n\n");
        }
    });

    println!("Starting ETL pipeline...");

    // Create and start the pipeline.
    let mut pipeline = Pipeline::new(pipeline_config, store, destination);
    pipeline.start().await?;

    println!("Waiting for pipeline to finish...");

    // Wait for the pipeline to finish. Without a shutdown signal, it keeps running
    // until the connection is closed.
    pipeline.wait().await?;

    Ok(())
}

Important: Replace "your_password" with your Postgres password.
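
If you prefer not to hardcode credentials, you can read the password from the environment instead. A minimal sketch, assuming you export a variable named PG_PASSWORD before running (the variable name is just an example):

use std::env;

// Read the password from the environment rather than hardcoding it.
// PG_PASSWORD is an arbitrary name chosen for this tutorial.
let password = env::var("PG_PASSWORD")
    .expect("set PG_PASSWORD to your Postgres password");

You would then build the config with password: Some(password.into()) instead of the hardcoded string.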

Step 4: Start Your Pipeline

Run your pipeline:

cargo run

You should see output like:

Starting ETL pipeline...
Waiting for pipeline to finish...

Destination Contents At This Time

Destination Contents At This Time

Table (TableId(32341)): [TableRow { values: [I32(1), String("Alice Johnson"), String("alice@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }, TableRow { values: [I32(2), String("Bob Smith"), String("bob@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }]

Checkpoint: Your pipeline is now running and has completed the initial synchronization. The exact TableId and timestamps will differ on your machine.

Step 5: Test Real-Time Replication

With your pipeline running, open a new terminal and connect to Postgres:

psql -d etl_tutorial

Make some changes to test replication:

-- Insert a new user
INSERT INTO users (name, email) VALUES ('Charlie Brown', 'charlie@example.com');

-- Update an existing user
UPDATE users SET name = 'Alice Cooper' WHERE email = 'alice@example.com';

-- Delete a user
DELETE FROM users WHERE email = 'bob@example.com';

Checkpoint: In your pipeline terminal, the periodic printout should now include Event entries for the insert, update, and delete.

Step 6: Verify Data Replication

The data is now replicated in your memory destination. While this tutorial uses memory (perfect for testing), the same pattern works with any destination.
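
If you'd rather inspect the replicated data programmatically than read the periodic printout, you can query the memory destination through the same accessors the print task uses (table_rows() and events()). A minimal sketch, assuming you keep an extra clone of the destination from before the pipeline starts:

// Clone the destination before moving it into the pipeline.
let inspector = destination.clone();

// ... later, after some changes have replicated ...
let tables = inspector.table_rows().await;
let events = inspector.events().await;
println!("{} table(s) copied, {} event(s) captured", tables.len(), events.len());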

Checkpoint: You've successfully built and tested a complete ETL pipeline!

What You've Learned

You've covered the core ETL concepts:

  • Publications define which tables to replicate
  • Pipeline configuration controls behavior and performance
  • Memory destinations provide fast, local testing
  • Real-time replication captures all data changes automatically

Cleanup

Stop the pipeline (Ctrl+C), connect to a different database (for example, \c postgres), and remove the test database:

DROP DATABASE etl_tutorial;

Next Steps

Now that you understand the basics, you're ready to explore pipeline configuration in more depth and connect your pipeline to a real destination.
