Apache Paimon is a newer data lakehouse format that focuses on solving the challenges of streaming scenarios while also supporting batch processing. Overall, Paimon has the potential to replace Iceberg as the new standard for data lakehousing.
Why Iceberg and not the other two (Hudi and Delta Lake)?
Iceberg is the most widely supported format among open-source engines: pure query engines (e.g., Trino), New SQL databases (e.g., StarRocks, Doris), and streaming and batch frameworks (e.g., Flink, Spark) all support it.
However, Iceberg faces several problems in streaming scenarios, the most serious being the proliferation of small files. Queries in data lakehouses rely heavily on file reads, and if a query has to scan many small files at once, it will of course perform poorly.
To avoid this problem, we have to rely on an external orchestrator to compact files on a regular basis. Paimon, by contrast, is designed with a built-in compaction mechanism and many other optimizations for heavy writes, making it better suited to streaming scenarios.
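To give a concrete idea (a minimal Flink SQL sketch of my own, not from the Paimon quick start), compaction behaviour can be tuned per table through options; the option names below are taken from the Paimon documentation as I remember it, so verify them against the version you run.
-- hypothetical table showing compaction-related options
CREATE TABLE word_count_compacted (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    -- force a full compaction every 5 commits (i.e. every 5 checkpoints in streaming mode)
    'full-compaction.delta-commits' = '5',
    -- start compacting once this many sorted runs have piled up
    'num-sorted-run.compaction-trigger' = '5'
);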
Experiment environment
To learn more about Iceberg, I previously set up two experimental environments.
This time I've also built a playground for Paimon, which includes Trino and Flink.
https://github.com/wirelessr/paimon-trino-flink-playground
In addition, StarRocks is included as a representative of the New SQL databases.
Because neither Trino nor StarRocks supports streaming writes at this stage, Paimon's writes come from Flink.
How to use
NOTE: Since some of the links to the official Paimon files are not working, I've put the files into this repo. However, some of the files are huge, so they are tracked with LFS; be sure to install git-lfs.
The Trino driver is in paimon-trino-427-0.8-20241112.000605-197-plugin.tar.gz, so extract it first:
tar -zxvf paimon-trino-427-0.8-20241112.000605-197-plugin.tar.gz
Then everything will run normally with:
docker compose up -d
Flink
Let's start by connecting to Flink SQL.
docker compose exec flink-jobmanager ./bin/sql-client.sh
To write data using Flink, we first need to create the correct catalog.
CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 's3://warehouse/flink',
's3.endpoint' = 'http://storage:9000',
's3.access-key' = 'admin',
's3.secret-key' = 'password',
's3.path.style.access' = 'true'
);
As shown in the commands above, we're using MinIO as S3-compatible storage to hold the Paimon warehouse.
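To confirm the catalog was registered, you can list the catalogs from the SQL client (a small check I added; this is standard Flink SQL):
-- my_catalog should appear next to the built-in default_catalog
SHOW CATALOGS;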
The next steps of creating the table and writing the data are quite simple; just run the commands from the official documentation.
USE CATALOG my_catalog;
-- create a word count table
CREATE TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);
-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';
-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
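Note that this INSERT launches a continuous streaming job rather than finishing immediately; since word is the primary key, Paimon keeps upserting the latest count per word. If the playground runs Flink 1.17 or newer (an assumption on my part), you can check the job directly from the SQL client:
-- list jobs on the cluster (available in the SQL client since Flink 1.17)
SHOW JOBS;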
Then let's actually read the table and see the result of what we've written.
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- olap query the table
SELECT * FROM word_count;
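As an extra sanity check of my own (not part of the official quick start), you can aggregate the table in the same batch session; because datagen emits single-character words, the number of distinct rows stays small while the total count keeps growing as the streaming job runs:
-- optional: distinct words vs. total rows generated so far
SELECT COUNT(*) AS distinct_words, SUM(cnt) AS total_rows FROM word_count;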
Trino
Let's open the Trino CLI first.
docker compose exec trino trino
Trino's paimon catalog is already set up; I didn't add a new schema and just used the default one, so we can query the result written by Flink directly.
SELECT * FROM paimon.default.word_count;
We should see something similar to the result of the Flink query.
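If the table doesn't show up, it may help to first confirm that Trino sees the schema and table (standard Trino SQL, my own addition):
SHOW SCHEMAS FROM paimon;
SHOW TABLES FROM paimon.default;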
StarRocks
This is a bonus section, just to show how much attention Paimon is getting: many New SQL databases are starting to support it.
Prepare a mysql client locally to connect to StarRocks.
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
We still need to create a catalog.
CREATE EXTERNAL CATALOG paimon_catalog_flink
PROPERTIES
(
"type" = "paimon",
"paimon.catalog.type" = "filesystem",
"paimon.catalog.warehouse" = "s3://warehouse/flink",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.endpoint" = "http://storage:9000",
"aws.s3.access_key" = "admin",
"aws.s3.secret_key" = "password"
);
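To verify the external catalog was created, StarRocks can list catalogs and the databases inside them (a check I added; these statements are standard StarRocks SQL as far as I know):
SHOW CATALOGS;
SHOW DATABASES FROM paimon_catalog_flink;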
The mysql client does not support Trino's table locator format (<catalog>.<schema>.<table>), so we have to switch to the database before we can query.
USE paimon_catalog_flink.default;
SELECT * FROM word_count;
The results here will be similar to the above.
Conclusion
Paimon supports several kinds of metastore:
- filesystem
- hive metastore
- jdbc
However, for the sake of simplicity, I didn't add any extra components and only used S3, i.e. the filesystem metastore. It works fine, but according to the official documentation, using S3 as the warehouse should be paired with a Hive or JDBC metastore to ensure consistency:
But for object storage such as OSS and S3, their 'RENAME' does not have atomic semantic. We need to configure Hive or jdbc metastore and enable 'lock.enabled' option for the catalog. Otherwise, there may be a chance of losing the snapshot.
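For reference, a Flink catalog definition backed by Paimon's JDBC metastore would look roughly like the sketch below. The option names ('metastore', 'uri', 'jdbc.user', 'jdbc.password', 'lock.enabled') are taken from the Paimon documentation as I recall it, and the MySQL endpoint is purely hypothetical, so double-check against the docs before relying on it:
CREATE CATALOG my_jdbc_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    -- hypothetical MySQL instance holding the catalog metadata
    'uri' = 'jdbc:mysql://mysql-host:3306/paimon_catalog',
    'jdbc.user' = 'paimon',
    'jdbc.password' = 'paimon',
    'warehouse' = 's3://warehouse/flink',
    -- needed on object storage where RENAME is not atomic
    'lock.enabled' = 'true'
);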
Understanding which scenarios require this kind of consistency will be the goal of my future experiments.