Extension Points
Traits you implement to customize ETL behavior
ETL provides four traits for customization. Implement these to control where data goes and how state is stored.
Destination
Receives replicated data. This is the primary extension point for sending data to custom systems.
Methods
| Method | Purpose |
|---|---|
| `name()` | Returns an identifier used for logging and diagnostics |
| `shutdown()` | Called when the pipeline shuts down. Default is a no-op; override for cleanup or bookkeeping |
| `truncate_table()` | Clears table data before the initial sync. Receives the current replicated schema for the table |
| `write_table_rows()` | Writes rows during the initial table copy. Receives the current replicated schema and may get an empty vector for tables with no data |
| `write_events()` | Processes streaming replication events (inserts, updates, deletes). A single batch may span multiple tables |
Implementation Notes
- Operations should be idempotent when possible (ETL may retry on failure)
- Handle concurrent calls safely (parallel table sync workers)
- Process events in order to maintain data consistency
- All three write-like methods return async results, but ETL waits on them differently:
  - `truncate_table()` is awaited immediately.
  - `write_table_rows()` is also awaited immediately; the next batch is requested only after the current one finishes for that copy partition.
  - `write_events()` is the one method where ETL can keep processing while the destination finishes the current batch.
See Event Types for details on the events received by `write_events()`.
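As a sketch of the shape an implementation takes, here is a simplified, synchronous stand-in for the trait. The real `Destination` trait is async and uses ETL's own schema, row, and event types; `TableId` and `Row` below are illustrative placeholders, not ETL's actual types.

```rust
use std::collections::HashMap;

type TableId = u32;
type Row = Vec<String>;

// Hypothetical, simplified mirror of the Destination trait's method set.
trait Destination {
    fn name(&self) -> &'static str;
    fn truncate_table(&mut self, table: TableId);
    fn write_table_rows(&mut self, table: TableId, rows: Vec<Row>);
    fn write_events(&mut self, events: Vec<(TableId, Row)>);
}

/// Collects rows in memory, keyed by table id.
struct MemoryDestination {
    tables: HashMap<TableId, Vec<Row>>,
}

impl Destination for MemoryDestination {
    fn name(&self) -> &'static str {
        "memory"
    }

    fn truncate_table(&mut self, table: TableId) {
        // Clear existing rows before the initial copy; safe to repeat.
        self.tables.insert(table, Vec::new());
    }

    fn write_table_rows(&mut self, table: TableId, rows: Vec<Row>) {
        // May be called with an empty vector for tables with no data.
        self.tables.entry(table).or_default().extend(rows);
    }

    fn write_events(&mut self, events: Vec<(TableId, Row)>) {
        // A single batch may span multiple tables; apply in order.
        for (table, row) in events {
            self.tables.entry(table).or_default().push(row);
        }
    }
}
```

A real implementation would replace the in-memory map with writes to the target system and keep the idempotency and ordering notes above in mind.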
SchemaStore
Stores versioned table schema information (column names, types, primary keys, and snapshot IDs).
Methods
| Method | Purpose |
|---|---|
| `get_table_schema()` | Returns the schema version with the largest `snapshot_id` less than or equal to the requested snapshot id |
| `get_table_schemas()` | Returns all cached schemas |
| `load_table_schemas()` | Loads schemas from persistent storage into the cache. Call once at startup. Returns the number of schemas loaded |
| `store_table_schema()` | Saves a schema version to both the cache and persistent storage, and returns the cached `Arc` |
| `prune_table_schemas()` | For the supplied per-table retention boundaries, preserves the newest schema version at or before each retention LSN and removes older versions. Implementations with both a cache and persistent storage must prune both |
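The lookup and pruning rules can be sketched for a single table with a `BTreeMap` keyed by snapshot id. This is a hypothetical, simplified model (the real store is async, versioned per table, and backed by persistent storage); the schema payload is reduced to a string for illustration.

```rust
use std::collections::BTreeMap;

/// Schema versions for one table, keyed by snapshot id.
struct SchemaVersions {
    versions: BTreeMap<u64, String>,
}

impl SchemaVersions {
    // get_table_schema() rule: the version with the largest snapshot_id
    // that does not exceed the requested snapshot id.
    fn get(&self, requested_snapshot_id: u64) -> Option<&String> {
        self.versions
            .range(..=requested_snapshot_id)
            .next_back()
            .map(|(_, schema)| schema)
    }

    // prune_table_schemas() rule: keep the newest version at or before the
    // retention boundary and remove everything older than it.
    fn prune(&mut self, retention: u64) {
        if let Some((&keep, _)) = self.versions.range(..=retention).next_back() {
            self.versions.retain(|&id, _| id >= keep);
        }
    }
}
```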
StateStore
Tracks replication progress and destination table metadata.
Replication State Methods
| Method | Purpose |
|---|---|
| `get_table_replication_state()` | Returns the current phase for a table from the cache |
| `get_table_replication_states()` | Returns the phases for all tables from the cache as `TableReplicationStates` |
| `load_table_replication_states()` | Loads phases from persistent storage into the cache. Call once at startup. Returns the number of states loaded |
| `update_table_replication_states()` | Atomically updates multiple table phases in both the cache and persistent storage |
| `update_table_replication_state()` | Updates a single table's phase in both the cache and persistent storage |
| `rollback_table_replication_state()` | Reverts a table to its previous phase. Returns the phase after rollback |
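The update/rollback contract can be sketched with a hypothetical in-memory store that remembers one previous phase per table. The real `StateStore` is async, writes through to persistent storage, and uses ETL's full phase type; the `Phase` enum here is a cut-down stand-in.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum Phase {
    Init,
    DataSync,
    Ready,
}

/// Current phase plus the previous one, so a single rollback is possible.
struct PhaseStore {
    phases: HashMap<u32, (Phase, Option<Phase>)>,
}

impl PhaseStore {
    // update_table_replication_state(): record the new phase, keeping the
    // old one around as the rollback target.
    fn update(&mut self, table: u32, next: Phase) {
        let prev = self.phases.get(&table).map(|(cur, _)| cur.clone());
        self.phases.insert(table, (next, prev));
    }

    // rollback_table_replication_state(): revert to the previous phase and
    // return the phase after rollback, if there was one to revert to.
    fn rollback(&mut self, table: u32) -> Option<Phase> {
        let (_, prev) = self.phases.get(&table).cloned()?;
        let prev = prev?;
        self.phases.insert(table, (prev.clone(), None));
        Some(prev)
    }
}
```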
Destination Metadata Methods
Destination table metadata connects source table IDs to destination state, including the destination table identifier, the currently applied schema snapshot, and the replication mask.
| Method | Purpose |
|---|---|
| `get_destination_table_metadata()` | Returns destination table metadata for a source table from the cache |
| `get_applied_destination_table_metadata()` | Returns destination table metadata only when the destination schema is fully applied |
| `load_destination_tables_metadata()` | Loads destination table metadata from persistent storage into the cache |
| `store_destination_table_metadata()` | Saves destination table metadata to both the cache and persistent storage |
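The distinction between the two getters can be sketched as a filter on whether a schema snapshot has been applied. The struct and field names below are illustrative assumptions, not ETL's actual metadata type.

```rust
// Hypothetical metadata record connecting a source table to its
// destination state.
#[derive(Clone, Debug, PartialEq)]
struct DestinationTableMetadata {
    destination_table: String,
    applied_schema_snapshot: Option<u64>, // None until the schema is applied
}

// Sketch of get_applied_destination_table_metadata(): metadata is returned
// only when the destination schema has been fully applied.
fn get_applied(
    meta: Option<&DestinationTableMetadata>,
) -> Option<&DestinationTableMetadata> {
    meta.filter(|m| m.applied_schema_snapshot.is_some())
}
```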
Table Replication Phases
Tables progress through these phases:
| Phase | Persisted | Description |
|---|---|---|
| `Init` | Yes | Table discovered, ready to start |
| `DataSync` | Yes | Initial data being copied |
| `FinishedCopy` | Yes | Copy complete, waiting for coordination |
| `SyncWait` | No | Table sync worker signaling the apply worker to pause |
| `Catchup { lsn }` | No | Apply worker paused; table sync worker catching up to the LSN |
| `SyncDone { lsn }` | Yes | Caught up to the LSN, ready for handoff |
| `Ready` | Yes | Streaming changes via the apply worker |
| `Errored { reason, solution, retry_policy }` | Yes | An error occurred; the table is excluded until rollback |
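The phases map naturally onto a Rust enum. This is a hypothetical, simplified mirror of the table (the real `Errored` variant also carries `solution` and `retry_policy` fields); the `persisted()` method encodes the "Persisted" column, since `SyncWait` and `Catchup` are in-memory coordination signals only.

```rust
#[derive(Debug)]
enum TableReplicationPhase {
    Init,
    DataSync,
    FinishedCopy,
    SyncWait,
    Catchup { lsn: u64 },
    SyncDone { lsn: u64 },
    Ready,
    Errored { reason: String },
}

impl TableReplicationPhase {
    // Only SyncWait and Catchup are never written to persistent storage.
    fn persisted(&self) -> bool {
        !matches!(self, Self::SyncWait | Self::Catchup { .. })
    }
}
```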
CleanupStore
Removes ETL metadata when tables are removed from the publication.
| Method | Purpose |
|---|---|
| `cleanup_table_state()` | Deletes all stored state for a table: replication state, schema versions, and destination table metadata. Does not modify destination tables |
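A minimal sketch of that contract, using hypothetical in-memory maps for the three kinds of state; the point is that all three are removed together while the destination itself is untouched.

```rust
use std::collections::HashMap;

struct EtlState {
    replication_states: HashMap<u32, String>,
    schema_versions: HashMap<u32, Vec<String>>,
    destination_metadata: HashMap<u32, String>,
}

impl EtlState {
    // Drop every piece of ETL-side state for a table; the destination
    // table's data is deliberately left alone.
    fn cleanup_table_state(&mut self, table: u32) {
        self.replication_states.remove(&table);
        self.schema_versions.remove(&table);
        self.destination_metadata.remove(&table);
    }
}
```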
Combining Traits
A single type typically implements all of the store traits.
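As a sketch, the pattern looks like one storage type with an `impl` block per trait. The traits below are simplified stand-ins with invented method names (the real method sets are documented in the sections above); only the shape is the point.

```rust
use std::collections::HashMap;

// Simplified stand-ins for SchemaStore, StateStore, and CleanupStore.
trait SchemaStore {
    fn schema_count(&self) -> usize;
}
trait StateStore {
    fn state_count(&self) -> usize;
}
trait CleanupStore {
    fn cleanup_table_state(&mut self, table: u32);
}

/// One storage type backing all three traits.
#[derive(Default)]
struct MyStore {
    schemas: HashMap<u32, String>,
    states: HashMap<u32, String>,
}

impl SchemaStore for MyStore {
    fn schema_count(&self) -> usize {
        self.schemas.len()
    }
}

impl StateStore for MyStore {
    fn state_count(&self) -> usize {
        self.states.len()
    }
}

impl CleanupStore for MyStore {
    fn cleanup_table_state(&mut self, table: u32) {
        self.schemas.remove(&table);
        self.states.remove(&table);
    }
}
```

Implementing all of the traits on one type lets the pipeline be handed a single store value.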
ETL provides two built-in implementations:
- `MemoryStore`: In-memory storage, not persistent across restarts
- `PostgresStore`: Persistent storage backed by PostgreSQL
Thread Safety
All trait implementations must be thread-safe. ETL calls these methods concurrently from:
- Multiple table sync workers (parallel initial copy)
- Apply worker (streaming changes)
- Pipeline coordination
Use `Arc<Mutex<_>>`, `RwLock`, or similar synchronization primitives for shared state.
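For example, a shared phase map behind `Arc<RwLock<..>>` lets parallel workers update state concurrently (a common pattern for in-memory stores; the worker bodies here are placeholders):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Spawn one thread per table, each recording its phase in shared state.
fn run_workers() -> usize {
    let states: Arc<RwLock<HashMap<u32, &'static str>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let handles: Vec<_> = (0..4)
        .map(|table_id| {
            let states = Arc::clone(&states);
            thread::spawn(move || {
                // Hold the write lock only long enough for this update.
                states.write().unwrap().insert(table_id, "DataSync");
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let count = states.read().unwrap().len();
    count
}
```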
Next Steps
- Custom Stores and Destinations: how to implement these traits
- Event Types: the events received by `write_events()`
- Architecture: how these components fit together