Build Your First ETL Pipeline
Learn the fundamentals by building a working pipeline in 15 minutes
By the end of this tutorial, you'll have a complete ETL pipeline that streams data changes from Postgres to a memory destination in real time. You'll see how to set up publications, configure pipelines, and handle live data replication.
What You'll Build
A real-time data pipeline that:
- Monitors a Postgres table for changes
- Streams INSERT, UPDATE, and DELETE operations
- Stores replicated data in memory for immediate access
Who This Tutorial Is For
- Rust developers new to ETL
- Anyone interested in Postgres logical replication
- Developers building data synchronization tools
Time required: 15 minutes
Difficulty: Beginner
Safety Note
This tutorial uses an isolated test database. To clean up, simply drop the test database when finished. No production data is affected.
Step 1: Set Up Your Environment
Create a new Rust project for this tutorial:
```sh
cargo new etl-tutorial
cd etl-tutorial
```
Add ETL to your dependencies in Cargo.toml:
```toml
[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1.0", features = ["full"] }
```
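The git dependency above tracks the repository's default branch. If you want the tutorial build to stay reproducible, you can pin it to a specific revision; the commit SHA below is a placeholder, not a real revision:

```toml
# Optional: pin the git dependency to a specific commit (or use `tag = "..."`).
etl = { git = "https://github.com/supabase/etl", rev = "<commit-sha>" }
```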
Checkpoint: Run cargo check - it should compile successfully.
Step 2: Prepare Postgres
Connect to your Postgres server and create a test database:
```sql
CREATE DATABASE etl_tutorial;
\c etl_tutorial

-- Create a sample table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Insert sample data
INSERT INTO users (name, email) VALUES
    ('Alice Johnson', 'alice@example.com'),
    ('Bob Smith', 'bob@example.com');
```
Create a publication for replication:
```sql
CREATE PUBLICATION my_publication FOR TABLE users;
```
Checkpoint: Verify the publication exists:
```sql
SELECT * FROM pg_publication WHERE pubname = 'my_publication';
```
You should see one row returned.
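Optionally, also confirm that the users table is part of the publication and that the server has logical replication enabled (wal_level must be logical for the pipeline to stream changes; changing it requires a server restart):

```sql
-- The users table should be listed for my_publication
SELECT * FROM pg_publication_tables WHERE pubname = 'my_publication';

-- Must return 'logical'
SHOW wal_level;
```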
Step 3: Write the Pipeline Code
Replace the contents of src/main.rs:
```rust
use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
use etl::destination::memory::MemoryDestination;
use etl::pipeline::Pipeline;
use etl::store::both::memory::MemoryStore;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Configure Postgres connection.
    let pg_connection_config = PgConnectionConfig {
        host: "localhost".to_string(),
        port: 5432,
        // Connect to the database where the users table and publication were created.
        name: "etl_tutorial".to_string(),
        username: "postgres".to_string(),
        password: Some("your_password".to_string().into()),
        tls: TlsConfig {
            trusted_root_certs: String::new(),
            enabled: false,
        },
    };

    // Configure pipeline behavior.
    let pipeline_config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_connection_config,
        batch: BatchConfig {
            max_size: 1000,
            max_fill_ms: 5000,
        },
        table_error_retry_delay_ms: 10000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    // Create stores and destination.
    let store = MemoryStore::new();
    let destination = MemoryDestination::new();

    // Spawn a task to periodically print the contents of the destination.
    let destination_clone = destination.clone();
    tokio::spawn(async move {
        loop {
            println!("Destination Contents At This Time\n");

            // Table rows are the initial rows copied from each table.
            for (table_id, table_rows) in destination_clone.table_rows().await {
                println!("Table ({:?}): {:?}", table_id, table_rows);
            }

            // Events are the real-time changes streamed by Postgres after the table has been copied.
            for event in destination_clone.events().await {
                println!("Event: {:?}", event);
            }

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            print!("\n\n");
        }
    });

    println!("Starting ETL pipeline...");

    // Create and start the pipeline.
    let mut pipeline = Pipeline::new(pipeline_config, store, destination);
    pipeline.start().await?;

    println!("Waiting for pipeline to finish...");

    // Wait for the pipeline to finish; without a shutdown signal it keeps running until the
    // connection is closed.
    pipeline.wait().await?;

    Ok(())
}
```
Important: Replace "your_password" with your Postgres password, and adjust the host, port, and username if your server uses different values.
Step 4: Start Your Pipeline
Run your pipeline:
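From the etl-tutorial project directory:

```sh
cargo run
```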
You should see output like:
```
Starting ETL pipeline...
Waiting for pipeline to finish...

Destination Contents At This Time


Destination Contents At This Time

Table (TableId(32341)): [TableRow { values: [I32(1), String("Alice"), String("alice@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }, TableRow { values: [I32(2), String("Bob"), String("bob@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }, TableRow { values: [I32(3), String("Charlie"), String("charlie@example.com"), TimeStampTz(2025-08-05T11:14:54.400235Z)] }]
```
Checkpoint: Your pipeline is now running and has completed initial synchronization.
Step 5: Test Real-Time Replication
With your pipeline running, open a new terminal and connect to Postgres:
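For example, assuming a local server and the default postgres superuser (adjust the flags to match your setup):

```sh
psql -h localhost -U postgres -d etl_tutorial
```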
Make some changes to test replication:
```sql
-- Insert a new user
INSERT INTO users (name, email) VALUES ('Charlie Brown', 'charlie@example.com');

-- Update an existing user
UPDATE users SET name = 'Alice Cooper' WHERE email = 'alice@example.com';

-- Delete a user
DELETE FROM users WHERE email = 'bob@example.com';
```
Checkpoint: In your pipeline terminal, the periodic printout should now include Event entries showing that the insert, update, and delete were captured and processed.
Step 6: Verify Data Replication
The data is now replicated in your memory destination. While this tutorial uses memory (perfect for testing), the same pattern works with any destination.
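If you'd rather check programmatically than read the periodic printout, you can reuse the same accessors the print task calls. Below is a minimal sketch; the summarize helper is illustrative (not part of the library) and relies only on the table_rows() and events() methods already used above.

```rust
use etl::destination::memory::MemoryDestination;

// Illustrative helper: summarize what has reached the memory destination so far.
async fn summarize(destination: &MemoryDestination) {
    // Count the tables whose initial rows were copied.
    let mut tables = 0;
    for (_table_id, _rows) in destination.table_rows().await {
        tables += 1;
    }

    // Count the change events streamed after the initial copy.
    let mut events = 0;
    for _event in destination.events().await {
        events += 1;
    }

    println!("{tables} table(s) copied, {events} change event(s) captured");
}
```

Call it with another clone of the destination handle, the same way the print task keeps its own clone.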
Checkpoint: You've successfully built and tested a complete ETL pipeline!
What You've Learned
You've covered the core ETL concepts:
- Publications define which tables to replicate
- Pipeline configuration controls behavior and performance
- Memory destinations provide fast, local testing
- Real-time replication captures all data changes automatically
Cleanup
Remove the test database:
```sql
DROP DATABASE etl_tutorial;
```
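Stop the pipeline (Ctrl+C) before dropping the database. If Postgres rejects the DROP because a logical replication slot created for the pipeline still exists, list and drop the slot first; the slot name below is a placeholder for whatever the query returns:

```sql
-- While still connected to etl_tutorial, list any remaining replication slots
SELECT slot_name FROM pg_replication_slots;

-- Drop the slot reported above, then reconnect to another database and drop etl_tutorial
SELECT pg_drop_replication_slot('<slot_name_from_previous_query>');
```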
Next Steps
Now that you understand the basics, try swapping the memory destination for a real one and tuning the batch and retry settings in PipelineConfig for your workload.