Author | Li Zongwen
Background of SeaTunnel CDC support
Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real-time to a downstream process or system.
CDC implementations fall into two main categories: query-based and binlog-based.
MySQL records users' changes to the database in its binlog (binary log), so one of the simplest and most efficient ways to implement CDC is to read the binlog. There are already many open-source MySQL CDC implementations that work out of the box. Reading the binlog is not the only way to implement CDC (at least for MySQL): database triggers can do a similar job, but they fall short in efficiency and in their impact on the database.
Typically, after a CDC tool captures changes to a database, it publishes the change events to a message queue for consumers. Debezium, for example, persists MySQL changes (it also supports PostgreSQL, MongoDB, etc.) to Kafka; by subscribing to the events in Kafka, we can get the content of the changes and implement the functionality we need.
I think CDC is crucial to data synchronization, and SeaTunnel needs to support it as a feature. Below are the reasons why SeaTunnel needs this feature and my overall design for it. I hope to hear from you all about how best to implement it in SeaTunnel.
Motivation
- Support parallel reading of historical data (fast synchronization of large tables with billions of rows)
- Support reading incremental data (CDC)
- Support heartbeat detection (metrics, low-traffic tables)
- Support dynamically adding new tables (easier to operate and maintain)
This design does not support:
- Multi-table and sharding (Easy Configuration)
- Schema evolution (DDL)
Overall Design
Basic flow
The basic CDC process contains two phases:
- Snapshot phase: read the historical data of the table
  - Minimum split granularity: a primary-key range of a table
- Incremental phase: read the incremental change log of the table
  - Minimum split granularity: a table
Snapshot phase
The enumerator generates multiple SnapshotSplits for a table and assigns them to the readers.
// pseudo-code.
public class SnapshotSplit implements SourceSplit {
    private final String splitId;
    private final TableId tableId;
    private final SeaTunnelRowType splitKeyType;
    private final Object splitStart;
    private final Object splitEnd;
}
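As an illustration of how a table could be cut into such splits, here is a minimal sketch that assumes a non-negative numeric primary key whose minimum and maximum are already known. `KeyRangeSplit`, `SnapshotSplitPlanner`, the `[splitStart, splitEnd)` convention, and the use of `null` for an open bound are hypothetical simplifications for this example, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SnapshotSplit: numeric keys only,
// each split covers [splitStart, splitEnd), and null means "unbounded".
final class KeyRangeSplit {
    final String splitId;
    final String tableId;
    final Long splitStart;
    final Long splitEnd;

    KeyRangeSplit(String splitId, String tableId, Long splitStart, Long splitEnd) {
        this.splitId = splitId;
        this.tableId = tableId;
        this.splitStart = splitStart;
        this.splitEnd = splitEnd;
    }
}

final class SnapshotSplitPlanner {
    /**
     * Cuts the key space into chunks of chunkSize keys. The first and last
     * splits are open-ended so that rows inserted outside [minKey, maxKey]
     * after planning still belong to exactly one split.
     */
    static List<KeyRangeSplit> plan(String tableId, long minKey, long maxKey, long chunkSize) {
        if (chunkSize <= 0 || minKey < 0 || maxKey < minKey) {
            throw new IllegalArgumentException("expected 0 <= minKey <= maxKey and chunkSize > 0");
        }
        List<KeyRangeSplit> splits = new ArrayList<>();
        Long start = null;
        long cursor = minKey;
        // maxKey - cursor cannot overflow because both values are non-negative.
        while (maxKey - cursor >= chunkSize) {
            long end = cursor + chunkSize;
            splits.add(new KeyRangeSplit(tableId + ":" + splits.size(), tableId, start, end));
            start = end;
            cursor = end;
        }
        splits.add(new KeyRangeSplit(tableId + ":" + splits.size(), tableId, start, null));
        return splits;
    }
}
```

For keys 1 to 10 with a chunk size of 4, this yields three splits: keys below 5, keys in [5, 9), and keys from 9 upward. Non-numeric or composite keys would need a different boundary strategy, which is why the pseudo-code above carries `splitKeyType`.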
When a reader finishes reading a SnapshotSplit, it reports the high watermark of the split to the enumerator.
When all SnapshotSplits have reported their high watermarks, the enumerator starts the incremental phase.
// pseudo-code.
public class CompletedSnapshotSplitReportEvent implements SourceEvent {
    private final String splitId;
    private final Offset highWatermark;
}
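The bookkeeping on the enumerator side can be sketched roughly as follows. `SnapshotProgressTracker` is a hypothetical helper, and offsets are modeled as plain `long` values purely for illustration; the real `Offset` type is connector-specific.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical enumerator-side helper: remembers which splits are still being
// read and the high watermark that each finished split reported.
final class SnapshotProgressTracker {
    private final Set<String> pendingSplitIds;
    private final Map<String, Long> highWatermarks = new HashMap<>();

    SnapshotProgressTracker(Set<String> assignedSplitIds) {
        this.pendingSplitIds = new HashSet<>(assignedSplitIds);
    }

    /**
     * Handles one completion report. Returns true once every assigned split
     * has reported, i.e. when the incremental phase may start.
     */
    boolean onSplitCompleted(String splitId, long highWatermark) {
        boolean wasPending = pendingSplitIds.remove(splitId);
        if (!wasPending && !highWatermarks.containsKey(splitId)) {
            throw new IllegalArgumentException("Unknown split: " + splitId);
        }
        // A report that is delivered twice keeps the first watermark.
        highWatermarks.putIfAbsent(splitId, highWatermark);
        return pendingSplitIds.isEmpty();
    }

    Long highWatermarkOf(String splitId) {
        return highWatermarks.get(splitId);
    }
}
```

Tolerating duplicate reports is a design choice of this sketch; it keeps the tracker safe if a reader resends its event after a failover.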
Snapshot phase - SnapshotSplit read flow
There are 4 steps:
- Log low watermark: get the current log offset before reading the snapshot data.
- Read SnapshotSplit data: read the range of data belonging to the split.
  - Case 1: steps 1 & 2 cannot be made atomic (MySQL). Because we can neither lock the table nor take range locks based on the low watermark, steps 1 & 2 are not atomic.
    - Exactly-once: hold the historical data in an in-memory table and merge in the log data from the low to the high watermark
    - At-least-once: output the data directly and use the low watermark in place of the high watermark
  - Case 2: steps 1 & 2 can be made atomic (Oracle). A flashback query (`AS OF SCN`) can ensure the atomicity of the two steps.
    - Exactly-once: output the data directly and use the low watermark in place of the high watermark
- Log high watermark:
  - Step 2, case 1 with exactly-once: get the current log offset after reading the snapshot data.
  - Otherwise: use the low watermark in place of the high watermark.
- If the high watermark is greater than the low watermark, read the log data in that range.
Snapshot phase - MySQL Snapshot Read & Exactly-once
Because we cannot determine at which point between the low and high watermarks the query statement was executed, we need an in-memory table to hold the data temporarily in order to guarantee exactly-once semantics.
- Log low watermark: get the current log offset before reading the snapshot data.
- Read SnapshotSplit data: read the range of data belonging to the split and write it to the in-memory table.
- Log high watermark: get the current log offset after reading the snapshot data.
- Read range log data: read the log data between the two watermarks and write it to the in-memory table.
- Output the data of the in-memory table and release the memory.
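The steps above can be sketched as a small merge routine. Everything here is a simplification for illustration: `LogEvent` and `SnapshotSplitMerger` are hypothetical names, keys and offsets are plain `long` values, rows are strings, and the sketch assumes that a watermark is the position of the next event to be written, so the events to replay are those with `low <= offset < high`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical change event: an upsert (insert or update) or a delete of one key.
final class LogEvent {
    final long offset;
    final boolean delete;
    final long key;
    final String row; // ignored for deletes

    LogEvent(long offset, boolean delete, long key, String row) {
        this.offset = offset;
        this.delete = delete;
        this.key = key;
        this.row = row;
    }
}

final class SnapshotSplitMerger {
    /**
     * Loads the snapshot rows of one split into an in-memory table, replays
     * the log events in [lowWatermark, highWatermark) that touch keys in
     * [splitStart, splitEnd), and returns the rows as of the high watermark.
     * The log is assumed to be ordered by offset.
     */
    static List<String> merge(Map<Long, String> snapshotRows, List<LogEvent> log,
                              long lowWatermark, long highWatermark,
                              long splitStart, long splitEnd) {
        Map<Long, String> memoryTable = new TreeMap<>(snapshotRows);
        for (LogEvent event : log) {
            boolean inWindow = event.offset >= lowWatermark && event.offset < highWatermark;
            boolean inSplit = event.key >= splitStart && event.key < splitEnd;
            if (!inWindow || !inSplit) {
                continue;
            }
            if (event.delete) {
                memoryTable.remove(event.key);
            } else {
                memoryTable.put(event.key, event.row);
            }
        }
        // The caller emits these rows and then drops the in-memory table.
        return new ArrayList<>(memoryTable.values());
    }
}
```

Replaying an event that the snapshot query had already seen is harmless here because upserts and deletes by key are idempotent, which is what makes the unknown execution point of the query tolerable.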
Incremental phase
When all snapshot splits have reported their watermarks, the incremental phase starts.
The snapshot splits and their watermark information are combined to produce LogSplits.
We want to minimize the number of log connections:
- In the incremental phase, only one reader works by default; the user can configure the number of working readers (it cannot exceed the total number of readers)
- A reader gets at most one connection
// pseudo-code.
public class LogSplit implements SourceSplit {
    private final String splitId;
    /**
     * All the tables that this log split needs to capture.
     */
    private final List<TableId> tableIds;
    /**
     * Minimum watermark of the SnapshotSplits of all tables in this LogSplit.
     */
    private final Offset startingOffset;
    /**
     * Obtained from configuration; may be unbounded.
     */
    private final Offset endingOffset;
    /**
     * SnapshotSplit information for all tables in this LogSplit.
     * <br> Used to support Exactly-Once.
     */
    private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos;
    /**
     * Maximum watermark of the SnapshotSplits of each table.
     * <br> Used to delete information in completedSnapshotSplitInfos, reducing state size.
     * <br> Used to support Exactly-Once.
     */
    private final Map<TableId, Offset> tableWatermarks;
}
// pseudo-code.
public class CompletedSnapshotSplitInfo implements Serializable {
    private final String splitId;
    private final TableId tableId;
    private final SeaTunnelRowType splitKeyType;
    private final Object splitStart;
    private final Object splitEnd;
    private final Offset watermark;
}
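To make the relationship between the completed splits and the two watermark fields of `LogSplit` concrete, here is a minimal sketch. `SplitWatermark` and `LogSplitOffsets` are hypothetical names, and table IDs and offsets are reduced to `String` and `long` for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduced view of CompletedSnapshotSplitInfo: table plus watermark.
final class SplitWatermark {
    final String tableId;
    final long watermark;

    SplitWatermark(String tableId, long watermark) {
        this.tableId = tableId;
        this.watermark = watermark;
    }
}

final class LogSplitOffsets {
    /** startingOffset: the smallest watermark over all completed splits. */
    static long startingOffset(List<SplitWatermark> completed) {
        if (completed.isEmpty()) {
            throw new IllegalArgumentException("no completed snapshot splits");
        }
        long min = Long.MAX_VALUE;
        for (SplitWatermark split : completed) {
            min = Math.min(min, split.watermark);
        }
        return min;
    }

    /** tableWatermarks: the largest watermark among the splits of each table. */
    static Map<String, Long> tableWatermarks(List<SplitWatermark> completed) {
        Map<String, Long> max = new HashMap<>();
        for (SplitWatermark split : completed) {
            max.merge(split.tableId, split.watermark, Long::max);
        }
        return max;
    }
}
```

Starting from the minimum guarantees that no change is missed for the split that finished earliest, while the per-table maximum marks the point after which the table's split information is no longer needed.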
Exactly-Once:
- Phase 1: Use completedSnapshotSplitInfos to filter out log data that precedes the watermark of the corresponding split.
- Phase 2: Once a table no longer needs to be filtered, delete its entries from completedSnapshotSplitInfos, because all subsequent data for that table must be processed.
At-Least-Once: No data needs to be filtered, so completedSnapshotSplitInfos can stay empty.
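The two exactly-once phases can be sketched as a filter applied to every log event. `CompletedSplit` and `ExactlyOnceLogFilter` are hypothetical names; the sketch assumes numeric keys, `long` offsets, splits covering `[start, end)`, a log read in offset order, and that an event is already contained in a split's snapshot output exactly when its offset is below that split's watermark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduced view of CompletedSnapshotSplitInfo.
final class CompletedSplit {
    final String tableId;
    final long start;
    final long end;
    final long watermark;

    CompletedSplit(String tableId, long start, long end, long watermark) {
        this.tableId = tableId;
        this.start = start;
        this.end = end;
        this.watermark = watermark;
    }
}

final class ExactlyOnceLogFilter {
    private final List<CompletedSplit> completedSplits;
    private final Map<String, Long> tableWatermarks;

    ExactlyOnceLogFilter(List<CompletedSplit> completedSplits, Map<String, Long> tableWatermarks) {
        this.completedSplits = new ArrayList<>(completedSplits);
        this.tableWatermarks = new HashMap<>(tableWatermarks);
    }

    /** Returns true if the event was not already covered by the snapshot output. */
    boolean shouldEmit(String tableId, long key, long offset) {
        Long maxWatermark = tableWatermarks.get(tableId);
        if (maxWatermark == null) {
            return true; // Phase 2: this table no longer needs filtering.
        }
        if (offset >= maxWatermark) {
            // The log has passed every split of this table: drop its state.
            tableWatermarks.remove(tableId);
            completedSplits.removeIf(split -> split.tableId.equals(tableId));
            return true;
        }
        // Phase 1: compare against the watermark of the split that owns the key.
        for (CompletedSplit split : completedSplits) {
            if (split.tableId.equals(tableId) && key >= split.start && key < split.end) {
                return offset >= split.watermark;
            }
        }
        return true; // Assumption of this sketch: uncovered keys were never snapshotted.
    }

    int retainedSplitCount() {
        return completedSplits.size();
    }
}
```

Dropping a table's entries as soon as the log passes its maximum watermark is what keeps the checkpointed state small for tables with many snapshot splits.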
Dynamic discovery of new tables
Case 1: A new table is discovered while the enumerator is in the snapshot phase. The enumerator directly assigns new splits.
Case 2: A new table is discovered while the enumerator is in the incremental phase.
Dynamic discovery of new tables in the incremental phase proceeds as follows:
- The enumerator suspends the LogSplit reader. (If there is an idle reader, do we still need to suspend this reader?)
- The reader performs the suspend operation.
- The reader reports its current log offset. (If it is not reported, the reader needs to support merging LogSplits.)
- The enumerator assigns SnapshotSplits to the reader.
- The reader executes the snapshot phase read.
- The reader reports the watermarks of all SnapshotSplits.
- The enumerator assigns a new LogSplit to the reader.
- The reader starts incremental reading again and sends an ACK to the enumerator.
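One way to picture this sequence is as a small state machine on the enumerator side. The sketch below is only an illustration: `NewTableCoordinator`, its state names, and its callbacks are hypothetical, offsets are plain `long` values, and the open questions in the list above (idle readers, merging LogSplits) are not addressed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical coordinator for adding a table during the incremental phase.
final class NewTableCoordinator {
    enum State { INCREMENTAL, SUSPENDING, SNAPSHOT, RESUMING }

    private State state = State.INCREMENTAL;
    private long suspendedOffset = -1L;
    private final Set<String> pendingSplits = new HashSet<>();

    State state() { return state; }

    long suspendedOffset() { return suspendedOffset; }

    /** A new table was discovered: ask the LogSplit reader to suspend. */
    void onTableDiscovered(Set<String> snapshotSplitIds) {
        require(State.INCREMENTAL);
        pendingSplits.addAll(snapshotSplitIds);
        state = State.SUSPENDING;
    }

    /** The reader suspended and reported its offset: SnapshotSplits can be assigned. */
    void onReaderSuspended(long currentLogOffset) {
        require(State.SUSPENDING);
        suspendedOffset = currentLogOffset;
        state = State.SNAPSHOT;
    }

    /** Returns true once all SnapshotSplits reported, i.e. a new LogSplit can be assigned. */
    boolean onSnapshotSplitCompleted(String splitId) {
        require(State.SNAPSHOT);
        pendingSplits.remove(splitId);
        if (pendingSplits.isEmpty()) {
            state = State.RESUMING;
            return true;
        }
        return false;
    }

    /** The reader acknowledged that incremental reading has restarted. */
    void onReaderAck() {
        require(State.RESUMING);
        state = State.INCREMENTAL;
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```

Rejecting out-of-order callbacks makes protocol violations, such as a watermark report arriving before the reader has suspended, fail fast instead of silently corrupting the split assignment.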
Multiple structured tables
- Advantages: fewer database connections are used, which reduces the pressure on the database
- Disadvantage: In the SeaTunnel engine, multiple tables will be in one pipeline, so the granularity of fault tolerance becomes coarser.
This feature expects the source to support reading tables with different structures, and then to use side-stream output so that each table's stream is consistent with a single-table stream.
Since this will involve changes to the DAG and translation modules, I also expect support for defining partitioners (hash and forward).
Some features have already been implemented; see #2490.
What's your opinion on this design? If you're interested in building it with our team, you're welcome to comment under the issue and join us: https://github.com/apache/incubator-seatunnel/issues/2394