So hello everyone!
Here I am, taking you in this journey of my attempt on creating a Flink BigQuery Source Connector.
For the sake of it, I will skip a lot of "What's Flink", "BigQuery: An introduction" etc.. etc.. and I will try to jump directly to the action.
FOR THE SAKE OF IT: this is a tentative to make things work. They work, badly, inneficiently, and in a very ugly way, but they work.
But let's begin at least with one important image taken from Flink Official Documentation.
From what we can understand, writing a Source Connector needs at least to implementing the DataSource structure
Let's start looking at my project tree, just to have an overview
lib/src
├── main
│ ├── java
│ │ └── com
│ │ └── antoniocali
│ │ ├── Library.java
│ │ ├── bq
│ │ │ ├── BigQueryClient.java
│ │ │ ├── BigQueryReadOptions.java
│ │ │ ├── BigQueryUtils.java
│ │ │ └── converters
│ │ │ ├── AvroToRowDataConverters.java
│ │ │ └── FieldValueListToRowDataConverters.java
│ │ └── sources
│ │ ├── BigQueryDataStreamSource.java
│ │ ├── emitter
│ │ │ └── BigQueryRecordEmitter.java
│ │ ├── reader
│ │ │ ├── BigQuerySourceReader.java
│ │ │ ├── BigQuerySplitEnumerator.java
│ │ │ └── BigQuerySplitReader.java
│ │ └── split
│ │ ├── BigQuerySourceSplit.java
│ │ ├── BigQuerySourceSplitSerializer.java
│ │ ├── BigQuerySourceSplitState.java
│ │ ├── BigQuerySplitEnumeratorState.java
│ │ └── BigQuerySplitEnumeratorStateSerializer.java
Wow a lot of files. Yup.
I will problably go in each one LOL SO BE READY!.
But let's make a step back and describe the image above and each of those components.
Split
So what's a Split? (the little yellow box in the graph above)
A split is a base unit of "data" operation in Flink. I would describe mostly like a metadata of information more than data itself, at least in my case, because it contains the information needed to retrieve the actual data from BigQuery.
Let's take a look at the code
// antoniocali/sources/split/BigQuerySourceSplit.java
import org.apache.flink.api.connector.source.SourceSplit;
import java.io.Serializable;
public class BigQuerySourceSplit implements SourceSplit, Serializable {
private final String splitName;
private final Long minTimestamp;
private final Long maxTimestamp;
public BigQuerySourceSplit(String splitName, Long minTimestamp, Long maxTimestamp) {
this.splitName = splitName;
this.maxTimestamp = maxTimestamp;
this.minTimestamp = minTimestamp;
}
@Override
public String splitId() {
return splitName;
}
public String getSplitName() {
return splitName;
}
public Long getMinTimestamp() {
return minTimestamp;
}
public Long getMaxTimestamp() {
return maxTimestamp;
}
@Override
public String toString() {
return "BigQuerySourceSplit{" + "splitName='" + splitName + '\'' + ", minTimestamp=" + minTimestamp + ", maxTimestamp=" + maxTimestamp + '}';
}
}
In this case a Split (that I called BigQuerySourceSplit) contains the information needed for then retrieve the data from BigQuery.
I think it's worth mentioning that in this case a BigQuery Flink Source is designed to read a single BigQuery Table, and that's why the only informations needed, in terms of useful metadata, are minTimestamp
and maxTimestamp
.
I do not need to have the information of the project.dataset.table
of BigQuery because they are "fix" metadata of the Source.
To clarify, when we will cover later how to collect data from BigQuery using a Split, we could expect a query like
SELECT *
FROM project.dataset.table
WHERE ??? >= minTimestamp AND ??? < maxTimestamp
The ???
will fulfilled at runtime, based on a Configuration option of my Source Connector, that contains a Timestamp column of the table that can be used for "chunking" the table. (e.g. created_at
).
Since we are talking about Split, let's also see all the class around it.
A very first simple one is the SerDe class for the Split. Since most of information describes run across the network, we need a simple SerDe for the Split.
// antoniocali/sources/split/BigQuerySourceSplitSerializer.java
public class BigQuerySourceSplitSerializer implements SimpleVersionedSerializer<BigQuerySourceSplit> {
public static final BigQuerySourceSplitSerializer INSTANCE = new BigQuerySourceSplitSerializer();
final int VERSION = 0;
@Override
public int getVersion() {
return VERSION;
}
@Override
public byte[] serialize(BigQuerySourceSplit bigQuerySourceSplit) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(
baos)) {
out.writeUTF(bigQuerySourceSplit.getSplitName());
out.writeLong(bigQuerySourceSplit.getMinTimestamp());
out.writeLong(bigQuerySourceSplit.getMaxTimestamp());
out.flush();
return baos.toByteArray();
}
}
@Override
public BigQuerySourceSplit deserialize(int version, byte[] serialized) throws IOException {
if (getVersion() != version) {
throw new IllegalArgumentException(
String.format("The provided serializer version (%d) is not expected (expected : %s).", version,
VERSION));
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(
bais)) {
switch (version) {
case VERSION:
String splitName = in.readUTF();
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
return new BigQuerySourceSplit(splitName, minTimestamp, maxTimestamp);
default:
throw new IOException("Unknown version: " + version);
}
}
}
}
A more interesting class is the State of a Split.
A State contains the "latest" snapshot of a Split.
In my scenario the State contains the latest timestamp "processed".
A State is important for when creating a Checkpoint
, needed for restoring a Split in case of errors.
// antoniocali/sources/split/BigQuerySourceSplitState.java
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.io.Serializable;
@Getter
@Setter
public class BigQuerySourceSplitState implements Serializable {
private String splitId;
private Long minCurrentTimestamp;
private Long maxCurrentTimestamp;
public BigQuerySourceSplitState(@NonNull String splitId, @NonNull Long minCurrentTimestamp,
@NonNull Long currentTimestamp) {
this.splitId = splitId;
this.minCurrentTimestamp = minCurrentTimestamp;
this.maxCurrentTimestamp = currentTimestamp;
}
public BigQuerySourceSplit toBigQuerySourceSplit() {
return new BigQuerySourceSplit(splitId, minCurrentTimestamp, maxCurrentTimestamp);
}
}
Good, now we have our split and we understand how it works. What's next?
Back to the graph!
SplitEnumerator
Ok, we have a split, but now? We need a way to create Splits, and the SplitEnumerator
class comes in our help.
⚠️ Pay also attention on where these classes live: the Master/Job Manager in Flink has the job to create Splits and assign them to each Task process.
This compared to the actual process a reading Split, that it's a Task Manager job.
SplitEnumerator
is one of the most important class to understand, so be ready.
public class BigQuerySplitEnumerator implements SplitEnumerator<BigQuerySourceSplit, BigQuerySplitEnumeratorState> {
private final long DISCOVER_INTERVAL = 60_000L;
private final long INITIAL_DELAY = 0L;
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySplitEnumerator.class);
protected Long maxCurrentTimetstamp;
private final SplitEnumeratorContext<BigQuerySourceSplit> enumContext;
private BigQuerySourceSplit currentSplit;
private boolean isAssigned;
private final BigQueryReadOptions readOptions;
private final TreeSet<Integer> readersAwaitingSplit;
public BigQuerySplitEnumerator(SplitEnumeratorContext<BigQuerySourceSplit> enumContext,
BigQueryReadOptions readOptions, BigQuerySplitEnumeratorState enumeratorState) {
this.enumContext = enumContext;
this.maxCurrentTimetstamp = enumeratorState == null ? 0L : enumeratorState.getCurrentMaxTimestamp();
this.readOptions = readOptions;
this.currentSplit = enumeratorState == null ? new BigQuerySourceSplit("0L", 0L,
Long.MAX_VALUE) : enumeratorState.getCurrentSplit();
this.readersAwaitingSplit = new TreeSet<>();
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
SplitEnumerator.super.notifyCheckpointAborted(checkpointId);
}
@Override
public void start() {
LOG.info("Starting BigQuery split enumerator");
this.scheduleNextSplit();
}
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!enumContext.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
readersAwaitingSplit.add(subtaskId);
}
@Override
public void addSplitsBack(List<BigQuerySourceSplit> list, int subTaskId) {
if (!list.isEmpty()) {
this.currentSplit = list.remove(0);
}
this.isAssigned = false;
}
@Override
public void addReader(int i) {
}
@Override
public BigQuerySplitEnumeratorState snapshotState(long checkpointId) throws Exception {
return new BigQuerySplitEnumeratorState(this.currentSplit, this.isAssigned, this.maxCurrentTimetstamp);
}
@Override
public void close() throws IOException {
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
SplitEnumerator.super.notifyCheckpointComplete(checkpointId);
}
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
SplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
}
Optional<BigQuerySourceSplit> discoverNewSplit() {
try {
BigQuery bigquery = BigQueryClient.builder().setReadOptions(this.readOptions).build().getBigQuery();
var tableName = readOptions.getFullTableName(true);
var query = "SELECT MAX(" + readOptions.getColumnFetcher() + ") as max_timestamp FROM `" + tableName + "`";
LOG.info("Discovering new split - Query: {}", query);
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).setUseLegacySql(false).build();
TableResult result = bigquery.query(queryConfig);
var maxColumnFetcher = result.iterateAll().iterator().next().get("max_timestamp");
var maxTimestamp = convertFieldValueToEpochTime(maxColumnFetcher);
if (this.maxCurrentTimetstamp == 0L) {
LOG.info("Initial Load");
return Optional.of(new BigQuerySourceSplit("InitialLoad", 0L, maxTimestamp));
}
if (maxTimestamp > maxCurrentTimetstamp) {
LOG.info("Found a new split with new timestamp: {}", maxTimestamp);
return Optional.of(
new BigQuerySourceSplit(Long.toString(maxTimestamp), maxCurrentTimetstamp, maxTimestamp));
} else {
LOG.info("No new split found");
return Optional.empty();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
void handleDiscoverNewSplitResult(Optional<BigQuerySourceSplit> newSplit, Throwable t) {
if (t != null) {
throw new RuntimeException(t);
}
if (newSplit.isEmpty()) {
// No new split
return;
}
this.currentSplit = newSplit.get();
this.maxCurrentTimetstamp = this.currentSplit.getMaxTimestamp();
this.assignSplit();
}
void assignSplit() {
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
LOG.info("Assigning split to readers");
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!enumContext.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}
Optional<BigQuerySourceSplit> split = Optional.of(currentSplit);
final BigQuerySourceSplit bqSplit = split.get();
enumContext.assignSplit(bqSplit, nextAwaiting);
this.isAssigned = true;
awaitingReader.remove();
}
}
private void scheduleNextSplit() {
LOG.info("Scheduling Discovery next split");
this.enumContext.callAsync(this::discoverNewSplit, this::handleDiscoverNewSplitResult, INITIAL_DELAY,
DISCOVER_INTERVAL);
}
public long convertFieldValueToEpochTime(FieldValue fieldValue) {
if (fieldValue.isNull()) {
throw new IllegalArgumentException("FieldValue is null");
}
try {
return fieldValue.getTimestampValue();
} catch (NumberFormatException e) {
LocalDate date = LocalDate.parse(fieldValue.getStringValue());
return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
}
}
}
My BigQuerySplitEnumerator
implements the SplitEnumerator
interface, which needs two generics <SplitT extends SourceSplit, CheckpointT>
.
In our case SplitT
is the Split we created before - BigQuerySourceSplit
- and CheckpointT
- in my case BigQuerySplitEnumeratorState
- is the State
of the SplitEnumerator.
On Constructor side I've decided to pass few things:
- A
SplitEnumeratorContext
that:- Host information necessary for the SplitEnumerator to make split assignment decisions.
- Accept and track the split assignment from the enumerator.
- Provide a managed threading model so the split enumerators do not need to create their own internal threads.
- A
BigQueryOptions
class that contains information to connect to BigQuery - A
BigQuerySplitEnumeratorState
to recover from a Checkpoint the State.
Worth to mention the TreeSet<Integer> readersAwaitingSplit
, contains Readers (i.e. Task Processors) that have request a Split and are waiting for that to be assigned.
BIG BREAK
At this point I need to stop and mention how bad my implementation is.
For how I've coded the project there is only ONE Split available and alive at time.
This whole project was a way for me to understand the behind-the-scene of Flink.
The result of this decision is that SplitEnumerator
will just generate one split at time and that's why, for example, I have a boolean variable named isAssigned
that tracks this only Split.
Let's move to the methods that are not self-explanatory.
A SplitEnumerator
needs was a way to discover new splits, in other words, to create them.
To do so I've used the following three methods - called by scheduleNextSplit
method:
discoverNewSplit
handleDiscoverNewSplitResult
assignSplit
The overridden start
method calls scheduleNextSplit
that schedule discoverNewSplit
every minute.
The return value of it will be passed to handleDiscoverNewSplitResult
that, in case of a new Split, will assign it via assignSplit
method..
I know it's confusing, but the idea is to have a scheduled method that discover and assign Splits.
The discovery phase is based on a simple query
SELECT MAX(" + readOptions.getColumnFetcher() + ") as max_timestamp FROM `" + tableName + "`
Every minute, it retrieves the latest timestamp available in the table. If this timestamp is greater than the most recent one stored internally, a new split is created.
In case a new Split is created, the SplitEnumerator
would check if there are any Readers waiting for a Split, assign the split to the first one, and update the internal State.
Let's take a break
Ok, let's stop a seconds and try to recap what we have understood so far.
SplitEnumerator
creates splits and assign them to Readers that are waiting in idle.
A Split contains important metadata needed to retrieve the actual data.
A State class, in general, is used for creating a checkpoint.
A SerDe class is needed for all those components that needs to be serialized and deserialized.
Now that we know how a Split is created, we need to understand how to retrieve data based on the information stored on the Split.
SplitReader
Here we are.
We have an understanding of what a Split is.
We understood how discover and create and assign them.
Now we need a way to actually collect and read the data.
The SplitReader
class is what we need.
Important A single SplitReader
class could read from a multiple splits, but since we have only one Split available at time, the implementation of it would be easier.
// antoniocali/sources/reader/BigQuerySplitReader.java
public class BigQuerySplitReader implements SplitReader<RowData, BigQuerySourceSplit> {
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySplitReader.class);
private final Queue<BigQuerySourceSplit> assignedSplits = new ArrayDeque<>();
private final BigQueryReadOptions readOptions;
private Long currentTimestamp = 0L;
private Boolean closed = false;
public BigQuerySplitReader(BigQueryReadOptions readOptions) {
this.readOptions = readOptions;
}
// To check where to use
Long currentTimestampToFetch(BigQuerySourceSplit split) {
if (split.getMaxTimestamp() > 0) {
currentTimestamp = split.getMaxTimestamp();
}
return currentTimestamp;
}
private Iterator<FieldValueList> retrieveSplitData(BigQuerySourceSplit bqSplit) throws IOException, InterruptedException {
BigQueryClient bigQueryClient = BigQueryClient.builder().setReadOptions(readOptions).build();
String sqlQuery = BigQueryUtils.getSqlFromSplit(bqSplit, readOptions);
LOG.info("Retrieving Data for - {}", bqSplit);
LOG.info("SQL query: {}", sqlQuery);
BigQuery bigQuery = bigQueryClient.getBigQuery();
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
TableResult tableResult = bigQuery.query(queryConfig);
return tableResult.iterateAll().iterator();
}
@Override
public RecordsWithSplitIds<RowData> fetch() throws IOException {
if (closed) {
throw new IllegalStateException("Can't fetch records from a closed split reader.");
}
RecordsBySplits.Builder<RowData> respBuilder = new RecordsBySplits.Builder<>();
var currentSplit = Optional.ofNullable(assignedSplits.poll());
if (currentSplit.isEmpty()) {
LOG.info("current split is empty");
return respBuilder.build();
}
BigQueryClient bigQueryClient = BigQueryClient.builder().setReadOptions(readOptions).build();
TableSchema tableSchema = bigQueryClient.getTableSchema();
Schema avroSchema = SchemaTransform.toGenericAvroSchema(readOptions.getFullTableName(true),
tableSchema.getFields());
RowType rowType = (RowType) FlinkAvroUtils.AvroSchemaToRowType(avroSchema.toString()).getTypeAt(0);
FieldValueListToRowDataConverters.FieldValueListToRowDataConverter fieldValueListToRowDataConverter = FieldValueListToRowDataConverters.createRowConverter(
rowType);
var actualSplit = currentSplit.get();
LOG.info("actual split is {}", actualSplit);
var read = 0L;
try {
var records = retrieveSplitData(actualSplit);
while (records.hasNext()) {
var record = records.next();
read++;
var converted = fieldValueListToRowDataConverter.convert(record);
respBuilder.add(actualSplit.splitId(), converted);
}
respBuilder.addFinishedSplit(actualSplit.splitId());
currentTimestamp = actualSplit.getMaxTimestamp();
LOG.info("Finish Reading {} - Total Records {}", actualSplit, read);
return respBuilder.build();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void handleSplitsChanges(SplitsChange<BigQuerySourceSplit> splitsChanges) {
LOG.debug("Handle split changes {}.", splitsChanges);
assignedSplits.addAll(splitsChanges.splits());
}
@Override
public void wakeUp() {
}
@Override
public void pauseOrResumeSplits(Collection<BigQuerySourceSplit> splitsToPause,
Collection<BigQuerySourceSplit> splitsToResume) {
SplitReader.super.pauseOrResumeSplits(splitsToPause, splitsToResume);
}
@Override
public void close() throws Exception {
if (!closed) {
closed = true;
currentTimestamp = 0L;
}
}
}
Let's focus on what's important - the fetch
method.
I've used an internal class called RecordsBySplits
, that provides us a easy way to push data to a Split Blocking Queue.
We finally see the query against BigQuery used to retrieve data.
It's perfomed by the following call var records = retrieveSplitData(actualSplit);
and it looks like this
"SELECT * " +
" FROM " + tableName +
" WHERE " + columnNameConverted +
" BETWEEN " + bigQuerySourceSplit.getMinTimestamp() +
" AND " + bigQuerySourceSplit.getMaxTimestamp();
The fetch
method does also few extra steps.
My idea was to be also TableApi
compatible.
So I've created a Converter from FieldValueList to RowData - the abstract class used by TableApi.
New Break
We are starting to connects all the dots.
It's important to mention that the main job of a SplitReader is just to push Records in the Reader BlockingQueue, so Flink can pass it to the downstream.
If you're familiar with Flink, you know that usually a pipeline starts as env.fromSource(???).<Operators>.to()
So far we checked single components, but we are missing what is needed to fulfill those ??? - a Source
. And that's is our next step, where all the links are going to be connected.
Let's take a look of what we need for creating a Source in Flink.
We need to create two classes:
- a
SourceReader
class: more likelySourceReaderBase
, an easier provided implementation of it, that provides some synchronization between the mail box main thread and the SourceReader internal threads. - a
Source
class: which acts like a factory class that connects in one spot the SplitEnumerator and SourceReader.
Let's take a look at the SourceReaderBase
first, since the Source
is the final glue to everything we built so far.
If we take a look at our very first image on the article we can understand what a SourceReaderBase
actually does.
We need a mechanism for the Reader to requests Split to the Job processor and process it.
In my follow example I use a provided implementation of the SourceReaderBase
called SingleThreadMultiplexSourceReaderBase
that use a SingleThread to perform the following steps:
- Request a Split
- Read the data of the Split using a
SplitReader
public class BigQuerySourceReader
extends SingleThreadMultiplexSourceReaderBase<RowData, RowData, BigQuerySourceSplit, BigQuerySourceSplitState> {
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceReader.class);
public BigQuerySourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<RowData>> elementsQueue,
Supplier<SplitReader<RowData, BigQuerySourceSplit>> splitFetcherManager,
RecordEmitter<RowData, RowData, BigQuerySourceSplitState> recordEmitter, Configuration config,
SourceReaderContext context) {
super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}
@Override
public void start() {
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}
}
@Override
protected void onSplitFinished(Map<String, BigQuerySourceSplitState> finishedSplitIds) {
for (BigQuerySourceSplitState splitState : finishedSplitIds.values()) {
BigQuerySourceSplit sourceSplit = splitState.toBigQuerySourceSplit();
LOG.info("Read for split {} is completed.", sourceSplit.splitId());
}
context.sendSplitRequest();
}
@Override
protected BigQuerySourceSplitState initializedState(BigQuerySourceSplit split) {
return new BigQuerySourceSplitState(split.splitId(), split.getMinTimestamp(), split.getMaxTimestamp());
}
@Override
protected BigQuerySourceSplit toSplitType(String splitId, BigQuerySourceSplitState sst) {
return new BigQuerySourceSplit(splitId, sst.getMinCurrentTimestamp(), sst.getMaxCurrentTimestamp());
}
}
When the Reader starts, it first checks whether it has an assigned split. If not, it sends a request to obtain one.
It's important to mention that SingleThreadMultiplexSourceReaderBase
requires some extra configuration - look at our constructor.
⚠️ It requires a RecordEmitter
: a class that takes a record from the SplitReader, updates the state of the Split and finally emits the records to the downstream. You can see an implementation below
public class BigQueryRecordEmitter implements RecordEmitter<RowData, RowData, BigQuerySourceSplitState>, Serializable {
@Override
public void emitRecord(RowData rowData, SourceOutput<RowData> sourceOutput,
BigQuerySourceSplitState bigQuerySourceSplitState) throws Exception {
sourceOutput.collect(rowData);
}
}
ALERT: I've never implemented a way to update the State of the Split!
⚠️ It requires a FutureCompletingBlockingQueue
: a custom implementation of blocking queue that is used in the hand-over of data from a producing thread to a consuming thread (the same Blocking Queue I've mentioned when I was talking about a SplitReader
)
RECAP before the grand finale.
We saw few things so far:
- What a Split is (Metadata information)
- a
SplitEnumerator
that discover Splits and assigned to Readers (on Job/Master Manager) - a
SplitReader
that collects the data based on Split's metadata information (on Task Manager) - a
SourceReaderBase
that requests for a Split on a Reader when it doesn't have any assigned to (on Task Manager)
And finally we can step to the final class - the Source
.
As mentioned above, it is the glue to all our Lego bricks we have built so far.
Let's take a look.
public class BigQueryDataStreamSource implements Source<RowData, BigQuerySourceSplit, BigQuerySplitEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryDataStreamSource.class);
final BigQueryReadOptions readOptions;
public BigQueryDataStreamSource(BigQueryReadOptions readOptions) {
this.readOptions = readOptions;
}
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}
@Override
public SplitEnumerator<BigQuerySourceSplit, BigQuerySplitEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<BigQuerySourceSplit> splitEnumeratorContext,
BigQuerySplitEnumeratorState bigQuerySplitEnumeratorState) throws Exception {
LOG.info("Restoring Enumerator with following State: {}", bigQuerySplitEnumeratorState);
return new BigQuerySplitEnumerator(splitEnumeratorContext, readOptions, bigQuerySplitEnumeratorState);
}
@Override
public SplitEnumerator<BigQuerySourceSplit, BigQuerySplitEnumeratorState> createEnumerator(
SplitEnumeratorContext<BigQuerySourceSplit> splitEnumeratorContext) throws Exception {
LOG.info("Creating new Enumerator");
return new BigQuerySplitEnumerator(splitEnumeratorContext, readOptions, null);
}
@Override
public SimpleVersionedSerializer<BigQuerySourceSplit> getSplitSerializer() {
return BigQuerySourceSplitSerializer.INSTANCE;
}
@Override
public SimpleVersionedSerializer<BigQuerySplitEnumeratorState> getEnumeratorCheckpointSerializer() {
return BigQuerySplitEnumeratorStateSerializer.INSTANCE;
}
@Override
public SourceReader<RowData, BigQuerySourceSplit> createReader(SourceReaderContext sourceReaderContext) throws
Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<RowData>> elementsQueue = new FutureCompletingBlockingQueue<>();
BigQueryRecordEmitter recordEmitter = new BigQueryRecordEmitter();
Supplier<SplitReader<RowData, BigQuerySourceSplit>> splitReaderSupplier = () -> new BigQuerySplitReader(
readOptions);
return new BigQuerySourceReader(elementsQueue, splitReaderSupplier, recordEmitter, new Configuration(),
sourceReaderContext);
}
}
Let's deep dive in:
-
getBoundiness
provides a description of the Source: if it is Unbounded (STREAMING) or Bounded (Batched) -
createEnumerator
glues a Source to aSplitEnumerator
-
createReader
glues a Source to aSourceReader
Here we are, at the end of our journey.
I know I've done already few recaps but a very last one is needed, since I've also forgot to mention few things.
A Split serves as the fundamental unit of data processing in Flink. It primarily provides the information to retrieve the required data. Each Split is always associated with a SerDe and a State — used for checkpointing.
A SplitEnumerator is responsible for discovering and creating new Splits, as well as assigning them to Readers.
It is also associated with a State, as checkpointing is crucial to track which Splits have been created and read, as well as whether any Splits are currently assigned to Readers and to which ones.
Given that the State of a SplitEnumerator is often complex, a SerDe class is required to handle its serialization and deserialization.
SplitEnumerator lives on Job/Master Process.A SplitReader, responsible for reading the data from a Split and pushing it into the Blocking Queue of Records.
It operates within the Job/Master Process.A SourceReader, responsible for requesting a new Splits.
[Optional] A RecordEmitter, a straightforward implementation for retrieving data from a SplitReader, updating the Split's State, and emitting records downstream.
A Source, the glue connecting all components. It brings together a
SplitEnumerator<Split>
and aSplitReader<Split>
into a unified interface.
Top comments (0)