ChunTing Wu

Apache Paimon Playground ft. Flink and Trino

Apache Paimon is a new data lakehouse format focused on solving the challenges of streaming scenarios, while also supporting batch processing. Overall, Paimon has the potential to replace Iceberg as the new standard for data lakehouses.

Why Iceberg and not the other two (Hudi and Delta Lake)?

Iceberg is the most widely supported format among open-source engines: pure query engines (e.g., Trino), NewSQL databases (e.g., StarRocks, Doris), and streaming frameworks (e.g., Flink, Spark) all support it.

However, Iceberg faces several problems in streaming scenarios, the most serious being the proliferation of small files. Queries in a data lakehouse rely heavily on file reads, and a query that has to scan many small files at once will naturally perform poorly.

To avoid this problem, we need an external orchestrator to merge files on a regular basis. Paimon, on the other hand, is designed with a built-in compaction mechanism and many other optimizations for high-volume writes, making it better suited to streaming scenarios.
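For illustration, Paimon exposes compaction tuning as ordinary table options, so no external orchestrator is needed. A minimal Flink SQL sketch, assuming the option names from the Paimon documentation:

-- compaction is configured per table; Paimon merges small files itself
CREATE TABLE tuned_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    payload STRING
) WITH (
    -- start compacting once this many sorted runs have accumulated
    'num-sorted-run.compaction-trigger' = '5',
    -- force a full compaction every 10 commits
    'full-compaction.delta-commits' = '10'
);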

Experiment environment

To learn more about Iceberg, I previously set up two experimental environments.

This time I built a playground for Paimon as well, which also includes Trino and Flink.

https://github.com/wirelessr/paimon-trino-flink-playground

In addition, StarRocks is included as a representative of NewSQL.


Because neither Trino nor StarRocks supports streaming writes at this stage, Paimon's writes come from Flink.

How to use

NOTE: Since some of the official Paimon download links are not working, I've put the files into this repo. However, some of the files are huge, so they are stored via LFS; be sure to install git-lfs.
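For example, a typical checkout looks like this (the brew line assumes macOS with Homebrew; use your distribution's package manager otherwise):

brew install git-lfs    # or apt-get install git-lfs
git lfs install         # enable the LFS hooks
git clone https://github.com/wirelessr/paimon-trino-flink-playground.git
cd paimon-trino-flink-playground
git lfs pull            # ensure the large archives are actually downloaded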

The Trino plugin is packaged in paimon-trino-427-0.8-20241112.000605-197-plugin.tar.gz; unpack it first.

tar -zxvf paimon-trino-427-0.8-20241112.000605-197-plugin.tar.gz

Then the whole playground will run normally with:

docker compose up -d
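Once it's up, a quick sanity check that all the services are running:

docker compose ps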

Flink

Let's start by connecting to Flink SQL.

docker compose exec flink-jobmanager ./bin/sql-client.sh

To write data with Flink, we first need to create the correct catalog.

CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 's3://warehouse/flink',
    's3.endpoint' = 'http://storage:9000',
    's3.access-key' = 'admin',
    's3.secret-key' = 'password',
    's3.path.style.access' = 'true'
);
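As a quick sanity check that the catalog was registered (SHOW CATALOGS is standard Flink SQL):

SHOW CATALOGS;
-- default_catalog and my_catalog should both be listed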

As shown in the commands above, we're using MinIO as an S3-compatible store for the Paimon warehouse.

The next step, creating the table and writing the data, is quite simple; just run the commands from the official documentation.

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);

-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
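Note that this INSERT submits a continuous streaming job that keeps running in the background. On recent Flink versions (1.17+) the SQL client can list and stop jobs directly; a sketch, with the job id taken from the SHOW JOBS output:

SHOW JOBS;
-- STOP JOB '<job id from the output above>';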

Then let's actually read the table to see the result of what we've written.

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

Trino

Let's open the Trino CLI first.

docker compose exec trino trino

Trino's paimon catalog is already set up; I didn't add a new schema, just used the default one.

So we can query the Flink write result directly.

SELECT * FROM paimon.default.word_count;
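You can also browse the metadata from Trino with the usual statements (standard Trino SQL, nothing Paimon-specific):

SHOW TABLES FROM paimon.default;
DESCRIBE paimon.default.word_count;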

We should see something similar to the Flink query.

StarRocks

This part is a bonus, just to show how much attention Paimon is getting now that many NewSQL databases are starting to support it.

Prepare a MySQL client locally to connect to StarRocks.

mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

We still need to create a catalog.

CREATE EXTERNAL CATALOG paimon_catalog_flink
PROPERTIES
(
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "s3://warehouse/flink",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "http://storage:9000",
    "aws.s3.access_key" = "admin",
    "aws.s3.secret_key" = "password"
);
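Again, a quick check that the external catalog was created (SHOW CATALOGS is supported by StarRocks):

SHOW CATALOGS;
-- paimon_catalog_flink should appear alongside default_catalog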

The MySQL client apparently doesn't support Trino's three-part table locator format (<catalog>.<schema>.<table>), so we have to switch to the database before we can query.

USE paimon_catalog_flink.default;
SELECT * FROM word_count;

The results here will be similar to the above.

Conclusion

Paimon supports several kinds of metastore:

  • filesystem
  • hive metastore
  • jdbc

For the sake of simplicity, I didn't use any extra components, so I only used S3, i.e., the filesystem metastore. Functionally this is fine, but according to the official documentation, using S3 as the warehouse needs to be paired with a Hive or JDBC metastore to ensure consistency:

But for object storage such as OSS and S3, their 'RENAME' does not have atomic semantic. We need to configure Hive or jdbc metastore and enable 'lock.enabled' option for the catalog. Otherwise, there may be a chance of losing the snapshot.
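For reference, here is what a lock-enabled catalog definition would look like in Flink SQL. This is only a sketch: the thrift://hive-metastore:9083 address is a hypothetical Hive Metastore endpoint, while the option names ('metastore', 'uri', 'lock.enabled') come from the Paimon documentation:

CREATE CATALOG my_hive_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://hive-metastore:9083',  -- hypothetical metastore address
    'warehouse' = 's3://warehouse/flink',
    'lock.enabled' = 'true'                  -- guard snapshot commits on S3
);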

Understanding which scenarios require this kind of consistency will be the goal of my future experiments.
