use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use etl::error::EtlResult;
use etl::state::table::TableReplicationPhase;
use etl::store::schema::SchemaStore;
use etl::store::state::StateStore;
use etl::store::cleanup::CleanupStore;
use etl::types::{TableId, TableSchema};
// Represents data stored in our in-memory cache for fast access
/// One table's data as held in the fast in-memory cache layer.
///
/// All fields are optional because schema, state, and mapping arrive through
/// independent store calls and may be populated at different times.
#[derive(Debug, Clone)]
struct CachedEntry {
schema: Option<Arc<TableSchema>>, // Table structure info; Arc-wrapped so readers share it cheaply
state: Option<TableReplicationPhase>, // Current replication progress
mapping: Option<String>, // Source -> destination table mapping
}
// Represents data as it would be stored persistently (e.g., in files/database)
/// One table's data as it would be stored durably (e.g., in files/database).
///
/// Mirrors `CachedEntry`, except the schema is stored by value rather than
/// behind an `Arc`, modeling a serialized persistent representation.
#[derive(Debug, Clone)]
struct PersistentEntry {
schema: Option<TableSchema>, // Not Arc-wrapped in "persistent" storage
state: Option<TableReplicationPhase>, // Replication progress as last persisted
mapping: Option<String>, // Source -> destination table mapping
}
/// Dual-layer store: reads are served from an in-memory cache, writes go to
/// the simulated persistent layer first and then to the cache.
///
/// Cloning the store is cheap — both layers sit behind `Arc`s, so clones
/// share the same underlying maps.
#[derive(Debug, Clone)]
pub struct CustomStore {
// Fast in-memory cache for all read operations - this is what ETL queries
cache: Arc<Mutex<HashMap<TableId, CachedEntry>>>,
// Simulated persistent storage - in real implementation this might be files/database
persistent: Arc<Mutex<HashMap<TableId, PersistentEntry>>>,
}
impl CustomStore {
    /// Creates an empty store with both layers (cache + persistent) initialized.
    pub fn new() -> Self {
        info!("Creating custom store with dual-layer architecture (cache + persistent)");
        Self {
            cache: Arc::new(Mutex::new(HashMap::new())),
            persistent: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Returns a mutable reference to the cache entry for `id`, inserting an
    /// empty entry if this table hasn't been seen before.
    // Lifetimes elided: with a single `&mut` input, elision ties the output
    // borrow to it automatically — no explicit `'a` needed.
    fn ensure_cache_slot(
        cache: &mut HashMap<TableId, CachedEntry>,
        id: TableId,
    ) -> &mut CachedEntry {
        cache.entry(id).or_insert_with(|| CachedEntry {
            schema: None,
            state: None,
            mapping: None,
        })
    }

    /// Returns a mutable reference to the persistent entry for `id`, inserting
    /// an empty entry if this table hasn't been seen before.
    fn ensure_persistent_slot(
        persistent: &mut HashMap<TableId, PersistentEntry>,
        id: TableId,
    ) -> &mut PersistentEntry {
        persistent.entry(id).or_insert_with(|| PersistentEntry {
            schema: None,
            state: None,
            mapping: None,
        })
    }
}

// Clippy `new_without_default`: a public zero-argument `new()` should be
// accompanied by `Default` so generic code can construct the store too.
impl Default for CustomStore {
    fn default() -> Self {
        Self::new()
    }
}
// Implementation of ETL's SchemaStore trait - handles table structure information
impl SchemaStore for CustomStore {
    /// Hot-path schema lookup, served entirely from the in-memory cache.
    async fn get_table_schema(&self, table_id: &TableId) -> EtlResult<Option<Arc<TableSchema>>> {
        let guard = self.cache.lock().await;
        let schema = guard.get(table_id).and_then(|entry| entry.schema.clone());
        info!("Schema cache read for table {}: found={}", table_id.0, schema.is_some());
        Ok(schema)
    }

    /// Returns every schema currently cached (cheap: clones `Arc`s, not schemas).
    async fn get_table_schemas(&self) -> EtlResult<Vec<Arc<TableSchema>>> {
        let guard = self.cache.lock().await;
        let mut schemas = Vec::new();
        for entry in guard.values() {
            if let Some(schema) = entry.schema.clone() {
                schemas.push(schema);
            }
        }
        info!("Retrieved {} schemas from cache", schemas.len());
        Ok(schemas)
    }

    /// Startup hydration: copies schemas from persistent storage into the cache.
    async fn load_table_schemas(&self) -> EtlResult<usize> {
        info!("Loading schemas from persistent storage into cache (startup phase)");
        // Lock order (persistent, then cache) is kept consistent across load_* methods.
        let persistent = self.persistent.lock().await;
        let mut cache = self.cache.lock().await;
        let mut count = 0usize;
        for (id, entry) in persistent.iter() {
            let Some(schema) = entry.schema.as_ref() else { continue };
            // Wrap in Arc so cache readers can share the schema without copying.
            Self::ensure_cache_slot(&mut cache, *id).schema = Some(Arc::new(schema.clone()));
            count += 1;
        }
        info!("Loaded {} schemas into cache from persistent storage", count);
        Ok(count)
    }

    /// Dual-write schema insert: durable layer first, then cache.
    async fn store_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> {
        let id = table_schema.id;
        info!("Storing schema for table {} using dual-write pattern", id.0);
        // Persist first so the durable copy is never behind the cache.
        {
            let mut persistent = self.persistent.lock().await;
            Self::ensure_persistent_slot(&mut persistent, id).schema = Some(table_schema.clone());
        }
        // Then make it immediately visible to readers.
        {
            let mut cache = self.cache.lock().await;
            Self::ensure_cache_slot(&mut cache, id).schema = Some(Arc::new(table_schema));
        }
        Ok(())
    }
}
// Implementation of ETL's StateStore trait - handles replication progress tracking
impl StateStore for CustomStore {
    /// Reads one table's replication phase from the cache only.
    async fn get_table_replication_state(
        &self,
        table_id: TableId,
    ) -> EtlResult<Option<TableReplicationPhase>> {
        let guard = self.cache.lock().await;
        let phase = guard.get(&table_id).and_then(|entry| entry.state.clone());
        info!("State cache read for table {}: {:?}", table_id.0, phase);
        Ok(phase)
    }

    /// Snapshots every known replication phase from the cache.
    async fn get_table_replication_states(
        &self,
    ) -> EtlResult<HashMap<TableId, TableReplicationPhase>> {
        let guard = self.cache.lock().await;
        let mut snapshot = HashMap::new();
        for (id, entry) in guard.iter() {
            if let Some(phase) = entry.state.clone() {
                snapshot.insert(*id, phase);
            }
        }
        info!("Retrieved {} table states from cache", snapshot.len());
        Ok(snapshot)
    }

    /// Startup hydration: copies replication phases from persistent into cache.
    async fn load_table_replication_states(&self) -> EtlResult<usize> {
        info!("Loading replication states from persistent storage into cache");
        // Lock order (persistent, then cache) is kept consistent across load_* methods.
        let persistent = self.persistent.lock().await;
        let mut cache = self.cache.lock().await;
        let mut count = 0usize;
        for (id, entry) in persistent.iter() {
            let Some(phase) = entry.state.clone() else { continue };
            Self::ensure_cache_slot(&mut cache, *id).state = Some(phase);
            count += 1;
        }
        info!("Loaded {} replication states into cache", count);
        Ok(count)
    }

    /// Dual-write state update: durable layer first for durability, cache
    /// second for immediate read visibility.
    async fn update_table_replication_state(
        &self,
        table_id: TableId,
        state: TableReplicationPhase,
    ) -> EtlResult<()> {
        info!("Updating replication state for table {} to {:?} (dual-write)", table_id.0, state);
        {
            let mut persistent = self.persistent.lock().await;
            Self::ensure_persistent_slot(&mut persistent, table_id).state = Some(state.clone());
        }
        {
            let mut cache = self.cache.lock().await;
            Self::ensure_cache_slot(&mut cache, table_id).state = Some(state);
        }
        Ok(())
    }

    /// Rollback is out of scope for this example; a real store would keep a
    /// history of phases per table and restore the previous one here.
    async fn rollback_table_replication_state(
        &self,
        _table_id: TableId,
    ) -> EtlResult<TableReplicationPhase> {
        todo!("Implement state history tracking for rollback")
    }

    /// Resolves the destination table name for a source table (cache-only).
    async fn get_table_mapping(&self, source_table_id: &TableId) -> EtlResult<Option<String>> {
        let guard = self.cache.lock().await;
        let mapping = guard.get(source_table_id).and_then(|entry| entry.mapping.clone());
        info!("Mapping lookup for table {}: {:?}", source_table_id.0, mapping);
        Ok(mapping)
    }

    /// Snapshots every known source -> destination mapping from the cache.
    async fn get_table_mappings(&self) -> EtlResult<HashMap<TableId, String>> {
        let guard = self.cache.lock().await;
        let mut snapshot = HashMap::new();
        for (id, entry) in guard.iter() {
            if let Some(name) = entry.mapping.clone() {
                snapshot.insert(*id, name);
            }
        }
        info!("Retrieved {} table mappings from cache", snapshot.len());
        Ok(snapshot)
    }

    /// Startup hydration: copies table mappings from persistent into cache.
    async fn load_table_mappings(&self) -> EtlResult<usize> {
        info!("Loading table mappings from persistent storage into cache");
        let persistent = self.persistent.lock().await;
        let mut cache = self.cache.lock().await;
        let mut count = 0usize;
        for (id, entry) in persistent.iter() {
            let Some(name) = entry.mapping.as_ref() else { continue };
            Self::ensure_cache_slot(&mut cache, *id).mapping = Some(name.clone());
            count += 1;
        }
        info!("Loaded {} table mappings into cache", count);
        Ok(count)
    }

    /// Dual-write mapping insert (source table -> destination table name):
    /// durable layer first, cache second.
    async fn store_table_mapping(
        &self,
        source_table_id: TableId,
        destination_table_id: String,
    ) -> EtlResult<()> {
        info!(
            "Storing table mapping: {} -> {} (dual-write)",
            source_table_id.0, destination_table_id
        );
        {
            let mut persistent = self.persistent.lock().await;
            Self::ensure_persistent_slot(&mut persistent, source_table_id).mapping =
                Some(destination_table_id.clone());
        }
        {
            let mut cache = self.cache.lock().await;
            Self::ensure_cache_slot(&mut cache, source_table_id).mapping =
                Some(destination_table_id);
        }
        Ok(())
    }
}
// Cleanup primitives spanning both schema and state storage
impl CleanupStore for CustomStore {
    /// Deletes everything tracked for `table_id` from both layers.
    ///
    /// `HashMap::remove` on a missing key is a no-op, so the operation is
    /// naturally idempotent.
    async fn cleanup_table_state(&self, table_id: TableId) -> EtlResult<()> {
        // Durable layer first, mirroring the dual-write order used elsewhere;
        // each guard is dropped at the end of its statement.
        self.persistent.lock().await.remove(&table_id);
        // Then drop the cached copy so reads cannot observe stale data.
        self.cache.lock().await.remove(&table_id);
        Ok(())
    }
}