So by now, you are probably aware that InfluxData has been busy building the next generation of the InfluxDB storage engine. If you dig a little deeper, you will start to uncover some concepts that might be foreign to you:
- Apache Parquet
- Apache Arrow
- Arrow Flight
These open-source projects are some of the core building blocks that make up the new storage engine. For the most part, you won’t need to worry about what’s under the hood. Though if you are like me and want a more practical understanding of what some of the projects are, then join me on my journey of discovery.
The first component we are going to dig into is Apache Arrow. My colleague Charles gave a great high-level overview, which you can find here.
In short:
“Arrow manages data in arrays, which can be grouped in tables to represent columns of data in tabular data. Arrow also provides support for various formats to get those tabular data in and out of disk and networks. The most commonly used formats are Parquet (You will be exposed to this concept quite a bit).”
For performance reasons, our developers used Rust to code InfluxDB’s new storage engine. I personally like to learn new coding concepts in Python, so we will be making use of the pyarrow client library.
The basics
In Apache Arrow, you have two primary data containers/classes: Arrays and Tables. We will dig more into what these are later, but let’s first write a quick snippet of code for creating each:
import pyarrow as pa
# Create an array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)
So in this example, you can see we constructed 3 arrays of values: animal, count, and year. We can combine these arrays to form the columns of a table. The results of running this code look like so:
animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]
So now that we have a table to work with, let’s see what we can do with it. The first primary feature of Arrow is to provide facilities for saving and restoring your tabular data (most commonly into the Parquet format, which will feature heavily in future blogs).
Let’s save and load our newly created table:
import pyarrow as pa
import pyarrow.parquet as pq
# Create an array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')
# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)
Lastly, to finish the basics, let’s try out a compute function (value_counts). We can apply compute functions to arrays and tables, which then allows us to apply transformations to a dataset. We will cover these in greater detail in the next section but let’s start with a simple example:
import pyarrow as pa
import pyarrow.compute as pc
# Create an array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
count_y = pc.value_counts(table['animal'])
print(count_y)
As you can see, we import the pyarrow.compute library as pc and use the built-in value_counts function. This counts how many times each distinct value occurs within a given array. We chose to count the animals, which produces the following output:
-- child 0 type: string
[
"sheep",
"cows",
"horses",
"foxes"
]
-- child 1 type: int64
[
2,
1,
1,
1
]
A practical example
Rather than list every datatype and compute function, I thought I would show you a more realistic example of using Apache Arrow with InfluxDB 3.0.
So the plan:
- Query InfluxDB 3.0 using the new Python client library.
- Use a new function to save the resulting table to disk as a series of partitioned Parquet files.
- Lastly, a second script reloads the partitions and performs a series of basic aggregations on our Arrow Table structure.
Let’s take a look at the code:
create_parquet.py
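# Imports assumed from the names used below (influxdb3-python and pyarrow)
import influxdb_client_3 as InfluxDBClient3
import pyarrow as pa
import pyarrow.dataset as ds
# Fill in your own token and connection details
token = ''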
host = 'eu-central-1-1.aws.cloud2.influxdata.com'
org = 'Jay-IOx'
db = 'factory'
client = InfluxDBClient3.InfluxDBClient3(token=token,
host=host,
org=org,
database=db)
query = "SELECT vibration FROM machine_data WHERE time >= now() - 1h GROUP BY machineID"
table = client.query(query=query, language="influxql")
print(table)
print("Saving to parquet files...")
# Partition the data into smaller chunks
ds.write_dataset(table, "machine_data", format="parquet",
partitioning=ds.partitioning(
pa.schema([table.schema.field("iox::measurement")])
))
There is one function here that may be unfamiliar:
- write_dataset(… partitioning=ds.partitioning(…)): With the partitioning argument, this method splits our table into Parquet files based on the values within our ‘iox::measurement’ column. On disk, this looks like a tree of directories. Partitioning helps to separate large datasets into more manageable assets.
Let’s now take a look at the second script, which works with our saved Parquet files:
import pyarrow.dataset as ds
# Loading back the partitioned dataset will detect the chunks
machine_data = ds.dataset("machine_data", format="parquet", partitioning=["iox::measurement"])
print(machine_data.files)
# Convert to a table
machine_data = machine_data.to_table()
print(machine_data)
# Grouped Aggregation example
aggregation = machine_data.group_by("machineID").aggregate([("vibration", "mean"),("vibration", "max"),("vibration", "min") ]).to_pandas()
print(aggregation)
In this script we use two new functions you might be familiar with if you work with Pandas or other query engines: group_by and aggregate. We use these functions to group our data points by machineID and compute a mathematical aggregate for each group (mean, max, min). This generates a new Arrow table based on the aggregations. We then convert that table to a Pandas DataFrame for readability.
Conclusion
I hope this blog empowers you to start digging deeper into Apache Arrow and helps you to understand why we decided to invest in the future of Apache Arrow and its child products. I also hope it gives you the foundations to start exploring how you can build your own analytics applications from this framework. InfluxDB’s new storage engine emphasizes its commitment to the greater ecosystem. For instance, being able to export Parquet files gives us the opportunity to analyze our data in RapidMiner and other analytical platforms.
My call to action for you is to check out the code here and discover some of the other compute functionality Apache Arrow offers. A lot of the content coming up will be around Apache Parquet, so if there are any products or platforms that use Parquet that you would like us to talk about, let us know. Come join us on Slack and the forums. Share your thoughts. I look forward to seeing you there!