Build Custom Stores and Destinations
30 minutes: Implement your own stores and destinations.
Prerequisites: completion of Your First Pipeline, or familiarity with ETL basics.
Understanding the Destination Trait
ETL delivers data to destinations in two phases:
| Phase | Method | When | Data Type |
|---|---|---|---|
| Initial Copy | `write_table_rows()` | Startup | `Vec<TableRow>` |
| Streaming | `write_events()` | After copy | `Vec<Event>` |
Note: During initial copy, parallel table sync workers each process their own replication slot, so Begin and Commit transaction markers may appear multiple times. This does not duplicate actual row data.
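As a rough, self-contained sketch of this two-phase contract (the crate's real `Destination` trait is async and uses its own `TableRow` and `Event` types; everything below is an illustrative stand-in, not the crate's API):

```rust
// Illustrative only: stand-ins for the crate's TableRow and Event types.
struct TableRow(Vec<String>);

enum Event {
    Begin,
    Insert(TableRow),
    Commit,
}

// Simplified mirror of the two delivery phases in the table above.
trait Destination {
    /// Phase 1: bulk rows delivered during the initial table copy.
    fn write_table_rows(&mut self, rows: Vec<TableRow>);
    /// Phase 2: streamed change events delivered after the copy.
    fn write_events(&mut self, events: Vec<Event>);
}

// A destination that just counts what it receives, making the phases visible.
#[derive(Default)]
struct CountingDestination {
    copied_rows: usize,
    streamed_events: usize,
}

impl Destination for CountingDestination {
    fn write_table_rows(&mut self, rows: Vec<TableRow>) {
        self.copied_rows += rows.len();
    }
    fn write_events(&mut self, events: Vec<Event>) {
        self.streamed_events += events.len();
    }
}
```

Note how `write_events()` batches include the `Begin`/`Commit` markers mentioned above, while `write_table_rows()` carries only rows.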
Step 1: Create the Project
Create a new binary crate with `cargo new` and `cd` into it (any project name works).
Update Cargo.toml:
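The original manifest is not reproduced here; as a hedged sketch, its shape is a binary crate depending on the ETL crate plus an async runtime and an HTTP client. The crate names and versions below are assumptions — use the exact dependency lines the ETL project documents:

```toml
[package]
name = "custom-etl"          # hypothetical project name
version = "0.1.0"
edition = "2021"

[dependencies]
# Assumed dependencies -- replace with the lines from the ETL project's docs.
# etl = { ... }                                       # the ETL crate itself
tokio = { version = "1", features = ["full"] }        # async runtime
reqwest = { version = "0.12", features = ["json"] }   # HTTP client for the destination
```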
Verify: cargo check succeeds.
Step 2: Implement a Custom Store
Create src/custom_store.rs. A store must implement three traits (see Extension Points for full details):
- `SchemaStore` - Table schema storage and retrieval
- `StateStore` - Replication progress and table mapping tracking
- `CleanupStore` - Metadata cleanup when tables leave the publication
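The full implementation is not reproduced here, but the core in-memory pattern — shared maps behind a lock — can be sketched in a self-contained way. The method names below are simplified stand-ins, not the actual async trait methods; see Extension Points for the real signatures:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type TableId = u32;

// Stand-in for the crate's schema type.
#[derive(Clone, Debug)]
struct TableSchema {
    name: String,
    columns: Vec<String>,
}

// In-memory store: each concern is a HashMap behind an Arc<Mutex<..>> so
// clones of the store share the same underlying state across workers.
#[derive(Clone, Default)]
struct MemoryStore {
    schemas: Arc<Mutex<HashMap<TableId, TableSchema>>>,
    // Replication progress per table (e.g. "init", "copying", "ready").
    states: Arc<Mutex<HashMap<TableId, String>>>,
}

impl MemoryStore {
    // SchemaStore-like surface: store and fetch table schemas.
    fn store_schema(&self, id: TableId, schema: TableSchema) {
        self.schemas.lock().unwrap().insert(id, schema);
    }
    fn get_schema(&self, id: TableId) -> Option<TableSchema> {
        self.schemas.lock().unwrap().get(&id).cloned()
    }

    // StateStore-like surface: track replication progress per table.
    fn set_state(&self, id: TableId, state: &str) {
        self.states.lock().unwrap().insert(id, state.to_string());
    }
    fn get_state(&self, id: TableId) -> Option<String> {
        self.states.lock().unwrap().get(&id).cloned()
    }

    // CleanupStore-like surface: drop all metadata for a table that has
    // left the publication.
    fn cleanup(&self, id: TableId) {
        self.schemas.lock().unwrap().remove(&id);
        self.states.lock().unwrap().remove(&id);
    }
}
```

The `Clone + Arc<Mutex<..>>` shape matters because the pipeline hands the store to multiple workers; cloning the maps themselves would silo each worker's view of the state.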
Verify: cargo check succeeds.
Step 3: Implement a Custom Destination
Create src/http_destination.rs. A destination implements the Destination trait with four required methods:
- `name()` - Return an identifier for logging
- `truncate_table()` - Clear the table before bulk load (called even if the table does not exist)
- `write_table_rows()` - Receive rows during initial copy (may receive an empty vec for table creation)
- `write_events()` - Receive streaming changes (batches may span multiple tables)
There's also an optional shutdown() method with a default no-op implementation. Override it if your destination needs cleanup when the pipeline shuts down.
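As a self-contained sketch of the retry logic such a destination needs, with the transport abstracted behind a closure so nothing below performs real HTTP (the actual trait methods are async and would use an HTTP client here):

```rust
// Sketch of an HTTP-style destination with simple retry. The transport is
// pluggable so the retry behavior is testable without a network.
struct HttpDestination<F: FnMut(&str) -> Result<(), String>> {
    send: F,          // in real code: an async HTTP POST to the endpoint
    max_retries: u32, // retries after the first attempt
}

impl<F: FnMut(&str) -> Result<(), String>> HttpDestination<F> {
    fn post_with_retry(&mut self, payload: &str) -> Result<(), String> {
        let mut last_err = String::new();
        // One initial attempt plus up to max_retries retries.
        for _attempt in 0..=self.max_retries {
            match (self.send)(payload) {
                Ok(()) => return Ok(()),
                Err(e) => last_err = e, // real code would back off here
            }
        }
        Err(last_err)
    }
}
```

In the real destination, `write_table_rows()` and `write_events()` would serialize their batches and funnel them through this kind of retrying sender; a failure returned to the pipeline lets it handle the error rather than silently dropping data.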
Verify: cargo check succeeds.
Step 4: Wire It Together
Create src/main.rs:
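The real main.rs constructs the crate's pipeline from connection config plus your store and destination, which is not reproduced here. Purely as a toy illustration of the hand-off it sets up (all types below are local stand-ins, not the crate's API):

```rust
use std::collections::VecDeque;

// Stand-in event type, as in the earlier sketches.
enum Event { Begin, Insert(&'static str), Commit }

trait Destination {
    fn write_events(&mut self, events: Vec<Event>);
}

// A destination that logs and counts batches it receives.
struct LogDestination { received: usize }

impl Destination for LogDestination {
    fn write_events(&mut self, events: Vec<Event>) {
        self.received += events.len();
        println!("received batch of {}", events.len());
    }
}

// The real pipeline reads batches from the replication stream; here we
// just drain a queue to show the hand-off into write_events().
fn run_pipeline(dest: &mut impl Destination, mut queue: VecDeque<Vec<Event>>) {
    while let Some(batch) = queue.pop_front() {
        dest.write_events(batch);
    }
}
```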
Note: Update the database name, password, and HTTP endpoint to match your setup.
Step 5: Test
Run the pipeline with `cargo run`.
The pipeline will connect to Postgres and start replicating. You'll see your custom store logging state transitions and your destination receiving HTTP calls.
What You Built
- Custom Store - In-memory implementation of `SchemaStore`, `StateStore`, and `CleanupStore`
- HTTP Destination - Forwards replicated data via HTTP POST with retry logic
- Working Pipeline - Connects your custom components to the ETL core
Next Steps
- Extension Points - Full trait API documentation
- Event Types - Details on all events your destination receives
- Configure Postgres - Production database setup
- Architecture - How ETL works internally