Build Your First ETL Pipeline
15 minutes: Learn the fundamentals by building a working pipeline.
By the end of this tutorial, you'll have a complete ETL pipeline that streams Postgres row changes to a tiny custom destination in real time.
What You'll Build
A real-time data pipeline that:
- Monitors a Postgres table for changes
- Streams `INSERT`, `UPDATE`, and `DELETE` operations
- Reports copied rows and streaming events through your own `Destination` implementation
Prerequisites
- Rust toolchain 1.93.1, matching ETL's `rust-toolchain.toml`
- Postgres 14+ with logical replication enabled (`wal_level = logical` in `postgresql.conf`; changing `wal_level` requires a server restart)
- Basic familiarity with Rust and SQL
New to Postgres logical replication? Read Postgres Replication Concepts first.
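You can confirm both server prerequisites from `psql` with `SHOW wal_level;` and `SHOW server_version_num;`. The sketch below shows the checks those two values need to pass. `check_prerequisites` is a hypothetical helper, not part of ETL, and it assumes you pass in the raw strings the two `SHOW` commands return.

```rust
/// Hypothetical preflight helper (not part of ETL): validates the raw output of
/// `SHOW wal_level;` and `SHOW server_version_num;` against this tutorial's prerequisites.
fn check_prerequisites(wal_level: &str, server_version_num: &str) -> Result<(), String> {
    // Logical decoding requires wal_level = logical; "replica" or "minimal" is not enough.
    if wal_level.trim() != "logical" {
        return Err(format!(
            "wal_level is '{}'; set wal_level = logical and restart Postgres",
            wal_level.trim()
        ));
    }
    // For Postgres 10 and later, server_version_num is major * 10000 + minor,
    // so 14.0 is 140000.
    let version: u32 = server_version_num
        .trim()
        .parse()
        .map_err(|e| format!("unparseable server_version_num '{}': {e}", server_version_num.trim()))?;
    if version < 140000 {
        return Err(format!("server_version_num {version} is below 140000 (Postgres 14)"));
    }
    Ok(())
}

fn main() {
    // Example values: Postgres 16.2 with logical replication enabled.
    match check_prerequisites("logical", "160002") {
        Ok(()) => println!("prerequisites met"),
        Err(msg) => println!("not ready: {msg}"),
    }
}
```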
Step 1: Create the Project
```bash
cargo new etl-tutorial
cd etl-tutorial
```

Add dependencies to `Cargo.toml`:

```toml
[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
```

Verify: Run `cargo check` and confirm it compiles without errors.
Step 2: Set Up Postgres
Connect to Postgres with `psql` and create a test database, table, seed rows, and publication:

```sql
CREATE DATABASE etl_tutorial;
\c etl_tutorial

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (name, email) VALUES
    ('Alice Johnson', 'alice@example.com'),
    ('Bob Smith', 'bob@example.com');

CREATE PUBLICATION my_publication FOR TABLE users;
```

Verify: `SELECT * FROM pg_publication WHERE pubname = 'my_publication';` returns one row.
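A publication acts as a filter on the database's change stream: only changes to its member tables reach the pipeline. The standalone model below illustrates that idea with hypothetical `Publication` and `Change` types. It is not how Postgres or ETL represent publications internally. One related Postgres rule is worth knowing: a published table needs a replica identity to replicate `UPDATE` and `DELETE`, and `users` gets one by default from its primary key.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a change captured from the WAL.
#[derive(Debug, Clone, PartialEq)]
struct Change {
    table: String,
    operation: &'static str, // "INSERT", "UPDATE", or "DELETE"
}

/// Hypothetical model of a publication: a named set of member tables.
struct Publication {
    name: String,
    tables: HashSet<String>,
}

impl Publication {
    fn for_tables(name: &str, tables: &[&str]) -> Self {
        Publication {
            name: name.to_string(),
            tables: tables.iter().map(|t| t.to_string()).collect(),
        }
    }

    /// Keeps only the changes whose table belongs to this publication.
    fn filter(&self, changes: Vec<Change>) -> Vec<Change> {
        changes
            .into_iter()
            .filter(|c| self.tables.contains(&c.table))
            .collect()
    }
}

fn main() {
    // Mirrors `CREATE PUBLICATION my_publication FOR TABLE users;`
    let publication = Publication::for_tables("my_publication", &["users"]);
    let changes = vec![
        Change { table: "users".into(), operation: "INSERT" },
        // Hypothetical table that is not in the publication.
        Change { table: "audit_log".into(), operation: "INSERT" },
        Change { table: "users".into(), operation: "DELETE" },
    ];
    for change in publication.filter(changes) {
        println!("{} forwards {} on {}", publication.name, change.operation, change.table);
    }
}
```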
Step 3: Write the Pipeline
Replace `src/main.rs` with:

```rust
use etl::{
    config::{
        BatchConfig, InvalidatedSlotBehavior, MemoryBackpressureConfig, PgConnectionConfig,
        PipelineConfig, TableSyncCopyConfig, TcpKeepaliveConfig, TlsConfig,
    },
    destination::{Destination, DropTableForCopyResult, WriteEventsResult, WriteTableRowsResult},
    error::EtlResult,
    pipeline::Pipeline,
    store::MemoryStore,
    types::{Event, ReplicatedTableSchema, TableRow},
};
use std::error::Error;

/// A destination that only logs what it receives.
#[derive(Clone)]
struct LoggingDestination;

impl Destination for LoggingDestination {
    fn name() -> &'static str {
        "logging"
    }

    // Hook for discarding destination data before a fresh table copy; nothing to drop here.
    async fn drop_table_for_copy(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        async_result: DropTableForCopyResult<()>,
    ) -> EtlResult<()> {
        println!("preparing fresh table copy");
        async_result.send(Ok(()));
        Ok(())
    }

    // Receives rows from the initial table copy.
    async fn write_table_rows(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        rows: Vec<TableRow>,
        async_result: WriteTableRowsResult<()>,
    ) -> EtlResult<()> {
        println!("copied {} rows", rows.len());
        async_result.send(Ok(()));
        Ok(())
    }

    // Receives change events streamed after the copy.
    async fn write_events(
        &self,
        events: Vec<Event>,
        async_result: WriteEventsResult<()>,
    ) -> EtlResult<()> {
        println!("received {} streaming events", events.len());
        async_result.send(Ok(()));
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // With the `env-filter` feature, this reads the log level from RUST_LOG.
    tracing_subscriber::fmt::init();

    // Connection to the source database created in Step 2.
    let pg_config = PgConnectionConfig {
        host: "localhost".to_string(),
        hostaddr: None,
        port: 5432,
        name: "etl_tutorial".to_string(),
        username: "postgres".to_string(),
        password: Some("your_password".to_string().into()),
        tls: TlsConfig {
            enabled: false,
            trusted_root_certs: String::new(),
        },
        keepalive: TcpKeepaliveConfig::default(),
    };

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_config,
        store_pg_connection: None,
        batch: BatchConfig {
            max_fill_ms: 5000,
            memory_budget_ratio: 0.2,
            max_bytes: 8 * 1024 * 1024,
        },
        table_error_retry_delay_ms: 10000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
        max_copy_connections_per_table: PipelineConfig::DEFAULT_MAX_COPY_CONNECTIONS_PER_TABLE,
        memory_refresh_interval_ms: 100,
        memory_backpressure: Some(MemoryBackpressureConfig::default()),
        table_sync_copy: TableSyncCopyConfig::default(),
        invalidated_slot_behavior: InvalidatedSlotBehavior::default(),
    };

    // Pipeline state lives only in memory for this tutorial.
    let store = MemoryStore::new();
    let destination = LoggingDestination;

    println!("Starting pipeline...");
    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;
    // Keeps running until the pipeline stops; press Ctrl+C to exit.
    pipeline.wait().await?;

    Ok(())
}
```

Note: Update the `password` field (and any other connection settings) to match your Postgres credentials.
`Pipeline::start()` installs ETL's source-side schema helpers before replication begins, even though this tutorial keeps runtime state in `MemoryStore`.
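Each `LoggingDestination` method reports its outcome twice: it sends `Ok(())` through `async_result` and also returns `Ok(())`. One plausible reading of those signatures, which is an assumption and not something this tutorial states, is that the result handle lets a destination acknowledge a write after the method itself returns, for example once a background flush finishes. The std-only sketch below models that pattern with a channel. `AckSender` and `write_rows` are hypothetical names, not ETL types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a result handle: a one-shot acknowledgment channel.
type AckSender = Sender<Result<usize, String>>;

/// Hypothetical destination method: it accepts the rows, hands the slow work to a
/// background thread, and returns right away. The caller learns the final
/// outcome only when the acknowledgment arrives.
fn write_rows(rows: Vec<String>, ack: AckSender) -> Result<(), String> {
    if rows.is_empty() {
        // Immediate rejection is reported through the return value.
        return Err("empty batch".to_string());
    }
    thread::spawn(move || {
        // Pretend to persist the rows, then acknowledge how many were written.
        let written = rows.len();
        // The receiver may already be gone; ignore the send error in that case.
        let _ = ack.send(Ok(written));
    });
    Ok(())
}

fn main() {
    let (ack, result): (AckSender, Receiver<Result<usize, String>>) = channel();
    write_rows(vec!["Alice Johnson".into(), "Bob Smith".into()], ack)
        .expect("batch accepted");
    // The caller blocks until the destination acknowledges the batch.
    match result.recv() {
        Ok(Ok(n)) => println!("destination confirmed {n} rows"),
        Ok(Err(e)) => println!("destination failed: {e}"),
        Err(_) => println!("destination dropped the acknowledgment"),
    }
}
```

In the tutorial the work is trivial, so `LoggingDestination` acknowledges before returning.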
Step 4: Run the Pipeline
```bash
RUST_LOG=info cargo run
```

You should see ETL startup logs plus messages from `LoggingDestination` as the initial rows are copied. After that, the pipeline keeps running and prints the size of each streaming event batch.
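The destination sees events one batch at a time. Judging by the field names in `BatchConfig` (an assumption; consult the ETL docs for the exact semantics), `max_bytes` caps a batch's size and `max_fill_ms` caps how long the pipeline waits to fill one, so a batch is flushed at whichever limit is reached first. The hypothetical `Batcher` below sketches that rule with the tutorial's values. It takes the elapsed time as a parameter to stay deterministic, and it ignores `memory_budget_ratio`, whose behavior this tutorial does not describe.

```rust
use std::time::Duration;

/// Hypothetical size-or-time batching rule, parameterized like the tutorial's BatchConfig.
struct Batcher {
    max_bytes: usize,
    max_fill: Duration,
    pending: Vec<Vec<u8>>,
    pending_bytes: usize,
}

impl Batcher {
    fn new(max_bytes: usize, max_fill: Duration) -> Self {
        Batcher { max_bytes, max_fill, pending: Vec::new(), pending_bytes: 0 }
    }

    /// Buffers one encoded event.
    fn push(&mut self, event: Vec<u8>) {
        self.pending_bytes += event.len();
        self.pending.push(event);
    }

    /// A non-empty batch is due once it reaches the size cap or has been
    /// filling for at least `max_fill`.
    fn should_flush(&self, filling_for: Duration) -> bool {
        !self.pending.is_empty()
            && (self.pending_bytes >= self.max_bytes || filling_for >= self.max_fill)
    }

    /// Hands the buffered events over and resets the buffer.
    fn flush(&mut self) -> Vec<Vec<u8>> {
        self.pending_bytes = 0;
        std::mem::take(&mut self.pending)
    }
}

fn main() {
    // Same limits as the tutorial: 8 MiB or 5000 ms.
    let mut batcher = Batcher::new(8 * 1024 * 1024, Duration::from_millis(5000));
    batcher.push(b"INSERT users".to_vec());
    // On a quiet table, one small event never reaches 8 MiB, so the time limit decides.
    for elapsed_ms in [1000, 5000] {
        if batcher.should_flush(Duration::from_millis(elapsed_ms)) {
            println!("after {elapsed_ms} ms: flushing {} events", batcher.flush().len());
        } else {
            println!("after {elapsed_ms} ms: still filling");
        }
    }
}
```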
Step 5: Test Real-Time Replication
In another terminal, connect with `psql` and make changes to the database:

```sql
\c etl_tutorial

INSERT INTO users (name, email) VALUES ('Charlie Brown', 'charlie@example.com');
UPDATE users SET name = 'Alice Cooper' WHERE email = 'alice@example.com';
DELETE FROM users WHERE email = 'bob@example.com';
```

Your pipeline terminal should print a new `received N streaming events` line for each batch as the changes arrive in real time.
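`LoggingDestination` prints only the size of each batch. To see what the three statements produced, a destination could tally events by kind instead. The sketch below uses a hypothetical local `ChangeEvent` enum rather than `etl::types::Event`; check that type's actual variants in the ETL API docs before matching on it. Each statement above runs in its own autocommitted transaction, and Postgres logical replication frames every transaction with begin and commit messages, so if those markers are passed through as events, a batch holds more events than changed rows.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for streaming events; NOT `etl::types::Event`.
#[derive(Debug)]
enum ChangeEvent {
    Begin,
    Insert,
    Update,
    Delete,
    Commit,
}

/// Counts events per kind so a destination can log more than the batch size.
fn tally(events: &[ChangeEvent]) -> BTreeMap<&'static str, usize> {
    let mut counts = BTreeMap::new();
    for event in events {
        let kind = match event {
            ChangeEvent::Begin => "begin",
            ChangeEvent::Insert => "insert",
            ChangeEvent::Update => "update",
            ChangeEvent::Delete => "delete",
            ChangeEvent::Commit => "commit",
        };
        *counts.entry(kind).or_insert(0) += 1;
    }
    counts
}

fn main() {
    use ChangeEvent::*;
    // One transaction per Step 5 statement, assuming begin/commit markers are delivered.
    let batch = vec![
        Begin, Insert, Commit,
        Begin, Update, Commit,
        Begin, Delete, Commit,
    ];
    // prints {"begin": 3, "commit": 3, "delete": 1, "insert": 1, "update": 1}
    println!("{:?}", tally(&batch));
}
```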
Cleanup
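Stop the pipeline with Ctrl+C, then clean up the database:

```sql
-- Connect to a different database first (e.g., postgres)
\c postgres
DROP DATABASE etl_tutorial;
```

If `DROP DATABASE` reports that the database is used by a logical replication slot, the pipeline's slot still exists. Drop the slots that belong to `etl_tutorial` with `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE database = 'etl_tutorial';`, then retry.

What You Learned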
- Publications define which tables to replicate via Postgres logical replication
- Pipeline configuration controls batching behavior and error retry policies
- Custom destinations are normal Rust types that implement the `Destination` trait
- The pipeline performs an initial table copy, then streams changes in real time
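The retry fields from Step 3 suggest a simple policy: when a table hits an error, wait `table_error_retry_delay_ms` and try again, up to `table_error_retry_max_attempts` times. That reading, including the fixed rather than exponential delay and counting each attempt as one retry, is an assumption based on the field names; check the ETL configuration docs for the exact behavior. The hypothetical `RetryPolicy` below computes the schedule a failing table would follow under that assumption.

```rust
use std::time::Duration;

/// Hypothetical fixed-delay retry policy modeled on the tutorial's retry fields.
struct RetryPolicy {
    delay: Duration,
    max_attempts: u32,
}

impl RetryPolicy {
    /// Returns the wait before retry number `attempt` (1-based),
    /// or None once the attempt budget is exhausted.
    fn delay_before(&self, attempt: u32) -> Option<Duration> {
        if (1..=self.max_attempts).contains(&attempt) {
            Some(self.delay)
        } else {
            None
        }
    }

    /// Total time spent waiting if every attempt fails.
    fn worst_case_wait(&self) -> Duration {
        self.delay * self.max_attempts
    }
}

fn main() {
    // Tutorial values: table_error_retry_delay_ms = 10000, table_error_retry_max_attempts = 5.
    let policy = RetryPolicy { delay: Duration::from_millis(10_000), max_attempts: 5 };
    let mut attempt = 1;
    while let Some(wait) = policy.delay_before(attempt) {
        println!("retry {attempt} after {wait:?}");
        attempt += 1;
    }
    println!("gave up after {:?} of waiting", policy.worst_case_wait());
}
```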
Next Steps
- Custom Stores and Destinations: Build your own components
- Configure Postgres: Production Postgres setup
- Architecture: How ETL works internally