
ETL Architecture

How ETL replicates data from Postgres to your destinations

ETL uses Postgres logical replication to stream database changes in real time.

```mermaid
flowchart LR
    subgraph Postgres
        WAL["WAL + Publication"]
    end
    subgraph Pipeline
        Apply["Apply Worker"]
        Sync["Table Sync Workers"]
    end
    subgraph Storage
        Store["Store<br>(State + Schema)"]
    end
    subgraph Target
        Dest["Destination<br>(Built-ins + Custom)"]
    end
    WAL --> Apply
    Apply --> Sync
    Apply --> Dest
    Sync --> Dest
    Apply --> Store
    Sync --> Store
```

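ETL operates in two phases: an initial copy of existing data, followed by continuous replication of ongoing changes.

Phase 1: Initial copy

When a pipeline starts, it copies all existing data from each table in the publication. Multiple Table Sync Workers run in parallel, so several tables are copied concurrently. Each worker: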

  1. Creates a replication slot to capture a consistent snapshot
  2. Copies all rows using Postgres COPY
  3. Sends rows to the destination via write_table_rows()
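The per-table copy can be pictured with the following minimal sketch. It assumes one scoped thread per table and an in-memory destination; `copy_tables`, `InMemoryDestination`, and the `TableRow` alias are hypothetical stand-ins, and the sketch skips the replication slot and the COPY protocol entirely.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

/// Hypothetical row type: column values rendered as strings.
pub type TableRow = Vec<String>;

/// Hypothetical destination that collects copied rows per table.
#[derive(Default)]
pub struct InMemoryDestination {
    tables: Mutex<HashMap<String, Vec<TableRow>>>,
}

impl InMemoryDestination {
    /// Stand-in for write_table_rows(): appends one chunk of copied rows.
    pub fn write_table_rows(&self, table: &str, rows: Vec<TableRow>) {
        let mut tables = self.tables.lock().unwrap();
        tables.entry(table.to_string()).or_default().extend(rows);
    }

    /// Number of rows received so far for `table`.
    pub fn row_count(&self, table: &str) -> usize {
        self.tables.lock().unwrap().get(table).map_or(0, Vec::len)
    }
}

/// Copies every table concurrently, one scoped thread ("table sync worker") per
/// table, sending rows to the destination in chunks of `chunk_size`.
pub fn copy_tables(
    source: &HashMap<String, Vec<TableRow>>,
    dest: &InMemoryDestination,
    chunk_size: usize,
) {
    assert!(chunk_size > 0, "chunk_size must be positive");
    thread::scope(|scope| {
        for (table, rows) in source {
            scope.spawn(move || {
                // A real worker would first create a replication slot to pin a
                // consistent snapshot and then read the rows with COPY.
                for chunk in rows.chunks(chunk_size) {
                    dest.write_table_rows(table, chunk.to_vec());
                }
            });
        }
    });
}
```

Scoped threads are joined automatically when `thread::scope` returns, so once `copy_tables` finishes, every table has been fully delivered.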

Note: During this phase, Begin and Commit events may be delivered more than once, because each worker consumes its own replication slot in parallel. This is expected. Destinations should rely on per-table event ordering and idempotent writes, and should not treat transaction markers as complete, global transaction boundaries.

Phase 2: Continuous replication

Once the tables are copied, the Apply Worker streams ongoing changes from the Postgres WAL. It:

  1. Receives change events (inserts, updates, deletes)
  2. Batches events for efficiency
  3. Sends batches to the destination via write_events()
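A minimal sketch of the batching step, assuming a batch is released when it reaches a maximum size or when its oldest event has waited a maximum fill time. `EventBatcher`, `max_size`, and `max_fill` are hypothetical names, not ETL's configuration.

```rust
use std::mem;
use std::time::{Duration, Instant};

/// Hypothetical batcher: buffers events and releases them as one batch when
/// `max_size` events are buffered or the oldest one has waited `max_fill`.
pub struct EventBatcher<E> {
    max_size: usize,
    max_fill: Duration,
    buffer: Vec<E>,
    first_at: Option<Instant>,
}

impl<E> EventBatcher<E> {
    pub fn new(max_size: usize, max_fill: Duration) -> Self {
        assert!(max_size > 0, "max_size must be positive");
        Self { max_size, max_fill, buffer: Vec::with_capacity(max_size), first_at: None }
    }

    /// Buffers one event; returns a full batch once the size limit is reached.
    pub fn push(&mut self, event: E) -> Option<Vec<E>> {
        if self.buffer.is_empty() {
            self.first_at = Some(Instant::now());
        }
        self.buffer.push(event);
        if self.buffer.len() >= self.max_size { self.flush() } else { None }
    }

    /// Call periodically; returns the buffered batch once it is old enough.
    pub fn poll_timeout(&mut self) -> Option<Vec<E>> {
        let expired = self.first_at.is_some_and(|started| started.elapsed() >= self.max_fill);
        if expired { self.flush() } else { None }
    }

    /// Releases whatever is buffered (for example on shutdown).
    pub fn flush(&mut self) -> Option<Vec<E>> {
        self.first_at = None;
        if self.buffer.is_empty() {
            None
        } else {
            Some(mem::replace(&mut self.buffer, Vec::with_capacity(self.max_size)))
        }
    }
}
```

The time limit bounds latency on quiet tables, while the size limit bounds memory and per-call overhead on busy ones.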

ETL supports schema changes for simple ALTER TABLE column evolution. When a published permanent table is altered, a source-side event trigger emits an internal DDL message, ETL stores a new schema snapshot, and the destination receives a fresh Relation event before the row events that follow the change. ETL currently models only adding, dropping, and renaming columns; support for broader DDL is still being improved. See Schema Changes for exact semantics and limitations.
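Because the Relation event arrives before the rows that depend on it, a destination can keep the latest schema per table and interpret each row against it. A minimal sketch; the `Event` enum is a simplified, hypothetical stand-in for ETL's event types, with table names in place of table IDs and only inserts modeled.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical events: a Relation carries the current column list.
pub enum Event {
    Relation { table: String, columns: Vec<String> },
    Insert { table: String, values: Vec<String> },
}

/// Tracks the most recent schema per table and renders inserts against it.
#[derive(Default)]
pub struct SchemaAwareSink {
    schemas: HashMap<String, Vec<String>>,
    pub applied: Vec<String>,
}

impl SchemaAwareSink {
    pub fn handle(&mut self, event: Event) -> Result<(), String> {
        match event {
            // A fresh Relation event replaces the previous schema for the table.
            Event::Relation { table, columns } => {
                self.schemas.insert(table, columns);
                Ok(())
            }
            // Rows are interpreted with whatever schema was seen last.
            Event::Insert { table, values } => {
                let columns = self
                    .schemas
                    .get(&table)
                    .ok_or_else(|| format!("no schema seen yet for {table}"))?;
                if columns.len() != values.len() {
                    return Err(format!("{table}: {} columns but {} values", columns.len(), values.len()));
                }
                let pairs: Vec<String> = columns
                    .iter()
                    .zip(&values)
                    .map(|(c, v)| format!("{c}={v}"))
                    .collect();
                self.applied.push(format!("{table}({})", pairs.join(", ")));
                Ok(())
            }
        }
    }
}
```

After an added column, the next row simply carries one more value, and the sink picks up the new column name from the preceding Relation event.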

The Pipeline is the central orchestrator of the replication process. It spawns workers, coordinates state transitions, and handles shutdown.

A Destination is where replicated data goes. Implement the Destination trait to send data anywhere:

```rust
use std::future::Future;

// EtlResult, ReplicatedTableSchema, TableRow, Event, and the async result handle
// types (DropTableForCopyResult, WriteTableRowsResult, WriteEventsResult) come from ETL.
pub trait Destination {
    fn name() -> &'static str;
    // Default implementation: nothing to clean up, so it resolves to Ok(()).
    fn shutdown(&self) -> impl Future<Output = EtlResult<()>> + Send { async { Ok(()) } }
    fn drop_table_for_copy(&self, replicated_table_schema: &ReplicatedTableSchema, async_result: DropTableForCopyResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
    fn write_table_rows(&self, replicated_table_schema: &ReplicatedTableSchema, rows: Vec<TableRow>, async_result: WriteTableRowsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
    fn write_events(&self, events: Vec<Event>, async_result: WriteEventsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
}
```
| Method | When called | Purpose |
|---|---|---|
| name() | On initialization | Identify the destination |
| drop_table_for_copy() | Before restarting a table copy when previous destination state exists | Drop the existing destination object and destination-private replay state, using the previously stored replicated schema |
| write_table_rows() | During initial copy | Receive bulk rows for the current replicated schema |
| write_events() | During catch-up and continuous replication | Receive streaming changes |
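To make the call sequence in the table concrete, here is a simplified, synchronous stand-in for the trait. It drops the async result handles and uses hypothetical `Row` and `Change` types, so it illustrates each method's responsibility rather than ETL's actual signatures.

```rust
use std::collections::HashMap;

/// Hypothetical simplified row and change types.
pub type Row = Vec<String>;
pub enum Change {
    Insert { table: String, row: Row },
    Delete { table: String, row: Row },
}

/// Synchronous, hypothetical stand-in for the Destination trait.
pub trait SimpleDestination {
    fn name() -> &'static str;
    fn drop_table_for_copy(&mut self, table: &str) -> Result<(), String>;
    fn write_table_rows(&mut self, table: &str, rows: Vec<Row>) -> Result<(), String>;
    fn write_events(&mut self, events: Vec<Change>) -> Result<(), String>;
}

#[derive(Default)]
pub struct MemoryDestination {
    pub tables: HashMap<String, Vec<Row>>,
}

impl SimpleDestination for MemoryDestination {
    fn name() -> &'static str {
        "memory"
    }

    // Remove state left over from an earlier copy attempt before recopying.
    fn drop_table_for_copy(&mut self, table: &str) -> Result<(), String> {
        self.tables.remove(table);
        Ok(())
    }

    // Bulk rows from the initial copy.
    fn write_table_rows(&mut self, table: &str, rows: Vec<Row>) -> Result<(), String> {
        self.tables.entry(table.to_string()).or_default().extend(rows);
        Ok(())
    }

    // Streaming changes from catch-up and continuous replication.
    fn write_events(&mut self, events: Vec<Change>) -> Result<(), String> {
        for event in events {
            match event {
                Change::Insert { table, row } => self.tables.entry(table).or_default().push(row),
                Change::Delete { table, row } => {
                    if let Some(rows) = self.tables.get_mut(&table) {
                        rows.retain(|r| r != &row);
                    }
                }
            }
        }
        Ok(())
    }
}
```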

Each write-like method receives an async result handle through which the destination reports the outcome of the call. How ETL waits on that handle differs per method:

  • write_events(): after dispatch succeeds, ETL may keep processing other work while the destination finishes the batch. ETL still waits for that batch's async result before handing the destination the next streaming batch.
  • drop_table_for_copy() and write_table_rows(): ETL waits for the result immediately. After a successful drop, ETL clears its own copy-scoped schema, destination metadata, and table-sync progress before storing the fresh 0/0 copy schema.
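The write_events() contract can be modeled with a channel standing in for the async result handle: a batch is dispatched, other work may overlap with it, and the previous batch's result is awaited only right before the next batch is handed over. This sketch uses `std::sync::mpsc` and a background thread; `dispatch_batch` and `send_batches` are hypothetical and do not reflect ETL's internal types.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type BatchResult = Result<(), String>;

/// Hypothetical destination: accepts a batch, returns immediately, and reports
/// the outcome later through `result` (the stand-in for an async result handle).
fn dispatch_batch(log: Arc<Mutex<Vec<u32>>>, batch: Vec<u32>, result: Sender<BatchResult>) {
    thread::spawn(move || {
        log.lock().unwrap().extend(batch);
        // The receiver may already be gone if the caller gave up; ignore that.
        let _ = result.send(Ok(()));
    });
}

/// Sends batches in order, never handing the destination a new batch until the
/// previous batch's result has arrived.
fn send_batches(log: Arc<Mutex<Vec<u32>>>, batches: Vec<Vec<u32>>) -> BatchResult {
    let mut pending: Option<Receiver<BatchResult>> = None;
    for batch in batches {
        // Building this batch overlapped with the previous write; wait only now.
        if let Some(rx) = pending.take() {
            rx.recv().map_err(|e| e.to_string())??;
        }
        let (tx, rx) = mpsc::channel();
        dispatch_batch(Arc::clone(&log), batch, tx);
        pending = Some(rx);
    }
    // The last batch must also complete before success is reported.
    match pending {
        Some(rx) => rx.recv().map_err(|e| e.to_string())?,
        None => Ok(()),
    }
}
```

Waiting on the previous result before each dispatch keeps at most one streaming batch in flight, which preserves batch order at the destination.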

The Store persists pipeline state so that replication can resume after restarts. Three traits work together:

  • StateStore: Tracks table state, durable replication progress, and destination table metadata
  • SchemaStore: Stores versioned table schema information (columns, types, primary keys, snapshot IDs). After progress is acknowledged, it prunes obsolete schema versions, keeping the retained boundary schema and any newer versions
  • TableStateLifecycleStore: Prepares table-copy state, resets table states for resync, and deletes all ETL-owned state when a table leaves the publication

StateStore and SchemaStore use a cache-first pattern: normal reads hit an in-memory cache, startup loaders hydrate that cache from persistent storage, and writes go to both the cache and persistent storage. get_table_schema() may load a missing version from persistent storage, while get_table_schemas() returns cached schemas only. Schema pruning follows the same rule for implementations with durable storage: obsolete versions are removed from both the cache and the persistent store.
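A minimal sketch of the cache-first pattern for schemas, assuming a hypothetical `Backend` trait for persistent storage and a `u64` snapshot ID per version. It follows the rules above: writes go to both layers, `get_table_schema()` falls back to storage on a cache miss, `get_table_schemas()` reads the cache only, and pruning removes obsolete versions from both layers. The method names echo the text, but the signatures are illustrative, and treating the boundary as the newest version at or below the acknowledged position is an assumption.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical persistent layer (a Postgres table, a file, ...).
pub trait Backend {
    fn save(&mut self, table: u32, snapshot: u64, schema: &str);
    fn load(&self, table: u32, snapshot: u64) -> Option<String>;
    fn delete(&mut self, table: u32, snapshot: u64);
}

/// In-memory backend for demonstration.
#[derive(Default)]
pub struct MemoryBackend {
    pub rows: HashMap<(u32, u64), String>,
}

impl Backend for MemoryBackend {
    fn save(&mut self, table: u32, snapshot: u64, schema: &str) {
        self.rows.insert((table, snapshot), schema.to_string());
    }
    fn load(&self, table: u32, snapshot: u64) -> Option<String> {
        self.rows.get(&(table, snapshot)).cloned()
    }
    fn delete(&mut self, table: u32, snapshot: u64) {
        self.rows.remove(&(table, snapshot));
    }
}

/// Cache-first schema store: versions are ordered by snapshot ID per table.
pub struct SchemaCache<B: Backend> {
    backend: B,
    cache: HashMap<u32, BTreeMap<u64, String>>,
}

impl<B: Backend> SchemaCache<B> {
    pub fn new(backend: B) -> Self {
        Self { backend, cache: HashMap::new() }
    }

    /// Writes go to persistent storage and to the cache.
    pub fn store_table_schema(&mut self, table: u32, snapshot: u64, schema: &str) {
        self.backend.save(table, snapshot, schema);
        self.cache.entry(table).or_default().insert(snapshot, schema.to_string());
    }

    /// Cache first; on a miss, load the version from storage and cache it.
    pub fn get_table_schema(&mut self, table: u32, snapshot: u64) -> Option<String> {
        if let Some(s) = self.cache.get(&table).and_then(|v| v.get(&snapshot)) {
            return Some(s.clone());
        }
        let loaded = self.backend.load(table, snapshot)?;
        self.cache.entry(table).or_default().insert(snapshot, loaded.clone());
        Some(loaded)
    }

    /// Cached schemas only; never touches storage.
    pub fn get_table_schemas(&self, table: u32) -> Vec<String> {
        self.cache.get(&table).map(|v| v.values().cloned().collect()).unwrap_or_default()
    }

    /// Drops versions older than the boundary (newest version <= `acked`) from
    /// both layers. Assumes startup loading has hydrated the cache.
    pub fn prune(&mut self, table: u32, acked: u64) {
        let Some(versions) = self.cache.get_mut(&table) else { return };
        let Some(boundary) = versions.range(..=acked).next_back().map(|(k, _)| *k) else { return };
        let obsolete: Vec<u64> = versions.range(..boundary).map(|(k, _)| *k).collect();
        for snapshot in obsolete {
            versions.remove(&snapshot);
            self.backend.delete(table, snapshot);
        }
    }
}
```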

ETL provides at-least-once delivery. If restarts occur, some events may be delivered more than once. This is a deliberate design choice.

Exactly-once delivery requires distributed transactions between Postgres and the destination, adding complexity and latency. Instead, ETL optimizes for throughput and simplicity while minimizing duplicates through:

  • Controlled shutdown: The pipeline gracefully drains in-flight events before stopping
  • Frequent status updates: Progress is reported to Postgres regularly, reducing the replay window after restarts
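Both techniques can be illustrated with a channel-fed loop. Dropping the sending side acts as the shutdown signal, yet the loop still processes every event already queued, and progress is reported every few events plus once more at the end. A minimal sketch; `run_apply_loop`, the `u64` positions, and the `reported` vector (standing in for status updates to Postgres) are hypothetical.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical apply loop. `events` carries LSN-like positions; every
/// `report_every` events the current position is "reported" by pushing it
/// onto `reported`, which stands in for a status update to Postgres.
pub fn run_apply_loop(events: Receiver<u64>, report_every: usize, reported: &mut Vec<u64>) {
    assert!(report_every > 0, "report_every must be positive");
    let mut last_reported = None;
    let mut last_seen = None;
    // Iterating a Receiver keeps yielding already-queued events after all
    // senders are dropped and ends only when the queue is empty, so a shutdown
    // signalled by dropping the sender still drains in-flight events.
    for (i, position) in events.into_iter().enumerate() {
        // ... apply the event to the destination here ...
        last_seen = Some(position);
        if (i + 1) % report_every == 0 {
            reported.push(position);
            last_reported = Some(position);
        }
    }
    // Final status update on shutdown shrinks the replay window for the next start.
    if let Some(pos) = last_seen {
        if last_reported != Some(pos) {
            reported.push(pos);
        }
    }
}
```

A smaller `report_every` shortens the window of events that could be replayed after a crash, at the cost of more status traffic.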

Destinations should make writes idempotent using the source table's replica identity or primary key plus ETL's event ordering metadata. For append-style CDC tables, persist a sequence key derived from commit_lsn and tx_ordinal. For current-state tables, upsert by the destination's chosen row key so replayed events converge to the same state.
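Both strategies fit in a few lines. The sketch assumes each event exposes its `commit_lsn` as a `u64`, a `tx_ordinal` giving its position within the transaction, and a row key; the `ChangeEvent` struct and both sink types are hypothetical. The current-state sink also assumes replayed events arrive in their original order.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical flattened change event.
#[derive(Clone, Debug)]
pub struct ChangeEvent {
    pub commit_lsn: u64,
    pub tx_ordinal: u64,
    pub key: String,
    pub value: Option<String>, // None means the row was deleted
}

/// Append-style CDC table: (commit_lsn, tx_ordinal) is the sequence key, so a
/// replayed event overwrites its own earlier copy instead of duplicating it.
#[derive(Default)]
pub struct ChangeLog {
    pub entries: BTreeMap<(u64, u64), ChangeEvent>,
}

impl ChangeLog {
    pub fn append(&mut self, e: ChangeEvent) {
        self.entries.insert((e.commit_lsn, e.tx_ordinal), e);
    }
}

/// Current-state table: upsert or delete by row key, so replaying the same
/// ordered events converges to the same state.
#[derive(Default)]
pub struct CurrentState {
    pub rows: HashMap<String, String>,
}

impl CurrentState {
    pub fn apply(&mut self, e: &ChangeEvent) {
        match &e.value {
            Some(v) => {
                self.rows.insert(e.key.clone(), v.clone());
            }
            None => {
                self.rows.remove(&e.key);
            }
        }
    }
}
```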

The start_lsn and commit_lsn fields on events are useful for ordering and checkpointing. For example, BigQuery destinations use these to maintain correct event order in destination tables. See Event Types for details on LSN semantics.
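Postgres displays an LSN as two hexadecimal numbers separated by a slash (for example 16/B374D848): the upper and lower 32 bits of a 64-bit WAL position. Converting to `u64` makes LSNs directly comparable, which is what ordering and checkpointing depend on. A minimal sketch; `parse_lsn` and `format_lsn` are hypothetical helpers, not part of ETL's API.

```rust
/// Parses a textual Postgres LSN such as "16/B374D848" into a 64-bit position.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Formats a 64-bit position back into Postgres's "X/Y" notation.
pub fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```

Comparing the strings directly would be wrong ("0/F" sorts after "0/10" lexically), which is why the numeric form matters.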

Each table progresses through these states:

| State | Set by | Description |
|---|---|---|
| Init | Pipeline | Table discovered, ready for initial copy |
| DataSync | Table Sync Worker | Initial table copy in progress |
| FinishedCopy | Table Sync Worker | Initial copy complete |
| SyncWait | Table Sync Worker | Waiting for the Apply Worker to pause (in-memory only) |
| Catchup | Apply Worker | Apply Worker paused; Table Sync Worker catching up to its LSN (in-memory only) |
| SyncDone | Table Sync Worker | Catch-up complete, ready for handoff |
| Ready | Apply Worker | Apply Worker now handles this table exclusively |
| Errored | Either worker | Error occurred; contains reason, solution hint, and retry policy |
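The happy path through these states can be encoded as a small state machine. This is a simplified sketch: it assumes states advance strictly in the order listed and that any non-errored state may move to Errored; the real transition rules, including retries out of Errored, may be richer.

```rust
/// Table replication states, in the order listed above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TableState {
    Init,
    DataSync,
    FinishedCopy,
    SyncWait,
    Catchup,
    SyncDone,
    Ready,
    Errored,
}

impl TableState {
    /// The next state on the happy path, or None once Ready or Errored.
    pub fn next(self) -> Option<TableState> {
        use TableState::*;
        match self {
            Init => Some(DataSync),
            DataSync => Some(FinishedCopy),
            FinishedCopy => Some(SyncWait),
            SyncWait => Some(Catchup),
            Catchup => Some(SyncDone),
            SyncDone => Some(Ready),
            Ready | Errored => None,
        }
    }

    /// SyncWait and Catchup are coordination states kept in memory only.
    pub fn is_persisted(self) -> bool {
        !matches!(self, TableState::SyncWait | TableState::Catchup)
    }

    /// Whether `self -> to` is allowed in this simplified model.
    pub fn can_transition_to(self, to: TableState) -> bool {
        (to == TableState::Errored && self != TableState::Errored) || self.next() == Some(to)
    }
}
```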