I originally posted this on Starburst's blog, as part of a series I've been publishing on Iceberg.
One key feature of the Apache Iceberg connector is Trino’s ability to modify data that resides on object storage. Objects in storage like Amazon S3 are immutable, meaning they cannot be modified in place. This was a challenge in the Hadoop era whenever data needed to be modified or removed at the individual row level. With the Iceberg connector, Trino supports full DML (data manipulation language): insert, update, delete, and merge.
Since Iceberg is a table format, each DML command creates new metadata and snapshot files in the same storage, so clients reading the same table see these changes in subsequent queries.
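If you want to see these snapshots accumulate, the Trino Iceberg connector exposes metadata tables such as "$snapshots"; here is a quick way to list them for the customer_iceberg table used in the examples below:
select snapshot_id, committed_at, operation
from "customer_iceberg$snapshots"
order by committed_at;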
Insert
Inserts are one of the most frequently used commands in a modern data lake. Data is constantly being added and, as you would expect, the Iceberg connector supports a standard insert statement:
insert into customer_iceberg values
(90000,'Testing','33 Main',3,'303-867-5309',323,'MACHINERY','Testing Iceberg');
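For context, here is a minimal sketch of what the customer_iceberg table behind the insert above could look like; the column names and types follow the TPC-H customer schema and are assumptions, so adjust them to your own table:
create table customer_iceberg (
    custkey bigint,
    name varchar,
    address varchar,
    nationkey integer,
    phone varchar,
    acctbal double,
    mktsegment varchar,
    comment varchar
)
with (format = 'PARQUET');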
Note that updates, deletes, and merges should be run serially, or in batches, against a single table to ensure there are no write conflicts.
Update
Updates in Trino with the Iceberg connector act just like ordinary updates. If there are select statements currently executing against the table, they will continue to see the data from the previous snapshot, ensuring read consistency.
update customer_iceberg set name = 'Tim Rogers' where custkey = 2732;
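Because every change produces a new snapshot, you can also read the table as it looked before the update using Trino’s time travel syntax; the snapshot ID below is a placeholder you would pull from the "$snapshots" metadata table:
select name
from customer_iceberg for version as of 8954597067493422955
where custkey = 2732;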
Delete
Delete statements are rarely used in a modern data lake; instead, “soft” deletes are most common, where a row is updated to flag it as deleted. There is usually a status column, or something similar, that select queries use to filter out these rows. In the event a row or set of rows does need to be removed, this can be done with a typical delete statement:
delete from customer_iceberg where custkey = 2732;
Note: A delete statement doesn’t physically remove the data from storage. To ensure the data files are actually removed, the expire_snapshots procedure needs to be executed with a retention threshold short enough that the snapshots created before the delete statement was run are expired.
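In Trino, expire_snapshots runs through alter table execute; the seven-day retention threshold below is just an example value to adjust for your own retention requirements:
alter table customer_iceberg execute expire_snapshots(retention_threshold => '7d');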
Merge
The merge statement is a very handy tool for adding logic-based operations to a SQL statement. Merge is often used when new or modified data is staged in a table first. A good example is customer data being pulled from an operational system: CDC (change data capture) data is extracted from a CRM system into a staging table on S3. Or, with Trino, a merge can read directly from an existing table in the source system.
To use merge, you can either stage data that needs to be inserted or updated into your target table or you can use data directly from the source table(s).
Examples:
Example 1: If there are rows that don’t exist in the target table, insert them. This is a very basic merge statement. The customer_land table below could be a staged table in object storage like S3, or it could be from a source system such as MySQL or SQL Server:
MERGE INTO s3lakehouse.blog.customer_base AS b
USING (select * from s3lakehouse.blog.customer_land) AS l
ON (b.custkey = l.custkey)
WHEN NOT MATCHED
THEN INSERT (custkey, name, state, zip, cust_since,last_update_dt)
VALUES(l.custkey, l.name, l.state, l.zip, l.cust_since,l.last_update_dt);
Example 2: With merge, we can issue a single statement to insert new rows and update existing ones:
MERGE INTO s3lakehouse.blog.customer_base AS b
USING s3lakehouse.blog.customer_land AS l
ON (b.custkey = l.custkey)
WHEN MATCHED and b.name != l.name
THEN UPDATE
SET name = l.name ,
state = l.state,
zip = l.zip,
cust_since = l.cust_since
WHEN NOT MATCHED
THEN INSERT (custkey, name, state, zip, cust_since,last_update_dt)
VALUES(l.custkey, l.name, l.state, l.zip, l.cust_since,l.last_update_dt);
This statement inserts new rows where the custkey doesn’t exist in the target table, and it updates rows in the target table where the custkey matches and the name has changed. Of course, in real-world situations there will be numerous columns to check for changes before issuing an update, as sketched below. I chose name for this simple example, but you can see the power of merge and why it’s a game changer for a modern data lake.
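One way to compare several columns at once, treating NULLs as changes, is the standard IS DISTINCT FROM predicate; this sketch reuses the same tables and only extends the matched condition:
MERGE INTO s3lakehouse.blog.customer_base AS b
USING s3lakehouse.blog.customer_land AS l
ON (b.custkey = l.custkey)
WHEN MATCHED AND (b.name IS DISTINCT FROM l.name
               OR b.state IS DISTINCT FROM l.state
               OR b.zip IS DISTINCT FROM l.zip)
THEN UPDATE
SET name = l.name,
    state = l.state,
    zip = l.zip,
    cust_since = l.cust_since
WHEN NOT MATCHED
THEN INSERT (custkey, name, state, zip, cust_since, last_update_dt)
VALUES(l.custkey, l.name, l.state, l.zip, l.cust_since, l.last_update_dt);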
Example 3: Slowly Changing Dimension (SCD Type 2)
MERGE INTO s3lakehouse.blog.customer_base as b
USING
( SELECT null as custkey_match, custkey, name, state, zip, cust_since, last_update_dt,'Y' as active_ind,current_timestamp as end_dt
FROM s3lakehouse.blog.customer_land
UNION ALL
SELECT
custkey as custkey_match,custkey, name, state, zip, cust_since, last_update_dt,active_ind,end_dt
FROM s3lakehouse.blog.customer_base
WHERE custkey IN
(SELECT custkey FROM s3lakehouse.blog.customer_land where active_ind = 'Y')
) as scdChangeRows
ON (b.custkey = scdChangeRows.custkey and b.custkey = scdChangeRows.custkey_match)
WHEN MATCHED and b.active_ind = 'Y' THEN
UPDATE SET end_dt = current_timestamp,active_ind = 'N'
WHEN NOT MATCHED THEN
INSERT (custkey, name, state, zip, cust_since,last_update_dt,active_ind,end_dt)
VALUES(scdChangeRows.custkey, scdChangeRows.name, scdChangeRows.state, scdChangeRows.zip,
scdChangeRows.cust_since,scdChangeRows.last_update_dt,'Y',null);
An SCD Type 2 simply means that instead of updating an existing row in place, we “end date” the current row and insert a new one, which allows history to be maintained in a single table. This is a data warehousing technique that has been around for a long time. The ability to do this in a data lake is new, though, and it opens the door to providing data warehousing features directly on top of cloud storage.
There is a lot going on in this merge, so we’ll cover a few points. We first select data from the landing table and union it with the matching rows in our base table, pulling only the active ones. From there, we insert any new rows as well as new versions of modified rows. Lastly, we “end date” the old rows by setting active_ind to ‘N’ and populating the end_dt column.
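A quick way to see the effect afterwards is to look at the current and historical versions of a single customer (custkey 2732 is just the example key used earlier):
select custkey, name, active_ind, end_dt
from s3lakehouse.blog.customer_base
where custkey = 2732
order by end_dt nulls first;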
Optimize
As your Iceberg tables grow and have many operations performed against them, it’s a good idea to optimize them from time to time. The optimize command not only compacts small files into larger ones for better performance, it also consolidates the metadata, which speeds up queries because less metadata needs to be read.
To scan the table for small files and make them larger, you simply issue the following command:
alter table <table> execute optimize;
This will look for any files under the default threshold of 100MB and combine them into larger ones. You can also choose a threshold other than 100MB:
ALTER TABLE <table> EXECUTE optimize(file_size_threshold => '10MB')
If your Iceberg table becomes very large and the optimize command above is taking too long to run, you can just optimize the files that have arrived recently:
alter table <table> execute optimize where "$file_modified_time" > <yesterday>
This will look for files that have arrived since yesterday and optimize them. On a very active table where lots of changes are taking place, this will greatly reduce the amount of time the optimize command takes.
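As one concrete way to fill in the placeholders, assuming your Trino version supports filtering optimize on the hidden $file_modified_time column, the cutoff can be computed relative to the current timestamp:
alter table customer_iceberg execute optimize
where "$file_modified_time" > current_timestamp - interval '1' day;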
For tables that are being modified constantly, it’s a good idea to optimize at regular intervals.
We’ve covered one of the most powerful features of Iceberg with Trino: the ability to perform database-style updates, deletes, and merges on your modern data lake. This opens the door to more use cases and greater reliability for the data lake across a variety of clouds and platforms.