This is my final post about dlt (data load tool) for now. I've been writing about a workshop given by Adrian Brudaru. The workshop was recorded on YouTube, and the full documentation is HERE.
Incremental loading
Incremental loading loads only new data, which makes our pipelines run faster and cheaper. But we may still have to update preexisting data. State keeps track of what has already been loaded, and incremental extraction uses that state to request only the data from a particular date and time onward.
You don't want to scan your entire table when loading data incrementally. That can be very costly.
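As a rough sketch of what incremental extraction looks like in dlt (this isn't from the workshop; the resource name, field names, and sample row below are made up), you declare an incremental cursor on a resource, and dlt stores the cursor's last value in the pipeline state between runs so only newer records need to be requested:
import dlt

@dlt.resource(table_name="rides")
def rides(cursor=dlt.sources.incremental("pickup_time", initial_value="2009-06-01 00:00:00")):
    # In a real source you would pass cursor.last_value to the API or SQL query
    # so that only records newer than the last successful run are fetched.
    yield {"record_hash": "abc123", "pickup_time": "2009-06-15 10:00:00", "fare": 12.5}

pipeline = dlt.pipeline(destination="duckdb", dataset_name="taxi_rides")
info = pipeline.run(rides())
print(info)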
There are two ways of loading data incrementally: append and merge. If the data is stateless, you just append it; the previous data doesn't change. Event data, like the NY taxi data we're using, is stateless. Once an event happens, nothing about it changes.
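For stateless data like this, the run itself is just an append. A minimal sketch (the table name and row here are hypothetical, not the workshop's):
import dlt

# Each run adds the new rows; rows loaded earlier are never touched.
pipeline = dlt.pipeline(destination="duckdb", dataset_name="taxi_rides")
info = pipeline.run(
    [{"record_hash": "abc123", "Trip_Distance": 2.1}],
    table_name="rides",
    write_disposition="append",
)
print(info)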
We use "merge" to update data that changes. The instructor created an example where the customer has a rating that can change. For example, they could have booked a ride, and then later changed the ride status to "paid" or "cancelled". If cancelled, the rating might drop. So you need a way to change preexisting data, and that's with "merge".
In this example, the status of the ride has changed from "booked" to "cancelled", John's rating has dropped from 4.9 to 4.4, and Jack's rating has dropped from 3.9 to 3.6.
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "cancelled"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.4},
            {"name": "Jack", "rating": 3.6}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]
The dlt commands we use to update the data are:
import dlt

# define the connection to load to
pipeline = dlt.pipeline(destination='duckdb', dataset_name='taxi_rides')

# run the pipeline with default settings, and capture the outcome
info = pipeline.run(data,
                    table_name="users",
                    write_disposition="merge",
                    merge_key="record_hash")
Notice that we're using record_hash as the merge key. If I look at the data now, I can see that the status is "cancelled" and the ratings have changed. Also, the information wasn't actually updated in place; the old rows were replaced. Updating rows in place would be time-consuming, because our data is stored in columnar format.
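If you want to verify this yourself, one way (not shown in the code above) is to query the DuckDB file that dlt created. The table and column names below assume dlt's default flattening, where nested fields become double-underscore columns (Payment.status becomes payment__status) and nested lists like passengers become child tables (users__passengers):
import duckdb

# Connect to the DuckDB file dlt created for this pipeline and scope
# queries to the pipeline's dataset (schema).
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

print(conn.sql("SELECT payment__status FROM users"))
print(conn.sql("SELECT name, rating FROM users__passengers"))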
That's all about dlt for now. There's more on dltHub. In particular, we need to be able to read data incrementally if we're going to write it incrementally, and incremental extraction wasn't covered in this workshop. Use the links at the end of the workshop documentation folder on DataTalksClub's data engineering zoomcamp site to learn more about dlt.