📍 J on the Beach, Málaga 2024 – 📹 Video recording of the presentation
This post is about a presentation at J on the Beach, a conference on multiple topics including data engineering and distributed systems, in May 2024. It is essentially the presentation notes converted into a post. In addition to expanding on the notes and turning them into a coherent text, we have added a short TL;DR summary for convenience.
The data lakehouse architecture is gaining traction as it simplifies data management by serving as a unified repository for various data workloads. A Dremio report highlights that 95% of respondents plan to use a lakehouse for at least 25% of their workloads in the future. The lakehouse extends traditional data lakes by reducing the need for data movement and ETL jobs, leveraging open-table formats like Apache Iceberg and Delta Lake for better performance and workload support.
Key to this efficiency is organizing data through multi-dimensional indexing, as exemplified by the Qbeast-Spark extension for Delta Lake. This approach optimizes data layout, enhancing query performance and reducing processing time and data volume read. Experiments with the NYC Yellow Taxi dataset demonstrated significant performance improvements, emphasizing the importance of regular data layout maintenance in data lakehouses.
The data lakehouse architecture is becoming increasingly popular in the industry as an approach to overcome the complexity of the data landscape in organizations of all sizes. The Dremio State of the Lakehouse report finds that 95% of respondents expect to run at least 25% of their workloads on a lakehouse in the upcoming years. Regardless of the precise figure, it matches the expectation of increasing interest across the industry. Of course, prevalence and reasons for adoption may vary by market segment, but the architectural principle of a single repository for a variety of data workloads is widely applicable.
The data lakehouse architecture is the next step in the evolution of the data lake vision of having one repository for different types of data, supporting a diverse set of native formats. Although the vision does not prescribe the technology used to implement such data lakes, traditionally they have been implemented with distributed file systems (e.g., HDFS) and object stores (e.g., AWS S3). File and table formats, like ORC, Parquet, and Hive, have been created to impose structure on data stored as files and objects.
The data lakehouse extends the data lake to accommodate more of an organization's data workloads, including transformations against raw data in the lakehouse and derived data sets. Its promise is to reduce, or even eliminate, the need to move data or create copies via a web of ETL jobs that add complexity to the infrastructure. In practical terms, a new generation of open-table formats provides multiple improvements over the Hive format and enables more workloads to run directly on the data lake. The name lakehouse stems from the ability to support workloads on the data lake that have traditionally been associated with EDWs (Enterprise Data Warehouses).
Open-table formats are a critical piece, an enabler of data lakehouses. A simple way to think about these formats is that they provide a layer of metadata on top of open-file formats to enable a table structure. Different formats rely on different metadata; here I'm showing two popular examples, Apache Iceberg and Delta Lake.
When processing a query against a table, the metadata provides information such as the files to process and statistics about them. Stats like min-max ranges per column enable file skipping, which reduces the volume of data to fetch from storage and process. A min-max range for a table column of a given file consists of the minimum and maximum of the column's values in that file. With such values, a query engine determines whether to skip a file, or parts of it, based on conditions like those in the WHERE clause of a query. The use of min-max ranges to skip data is not a new concept; there are code references in the Parquet project from as early as 2014 and references in academic work such as the “Small Materialized Aggregates” paper from VLDB 1998.
Min-max filtering impacts the performance of query processing. Let me walk through a simple example to illustrate, using a table with two numeric columns, A and B.
Initially, we have a table with 8 records split across 4 files. This is the current state of the table, with files created in the order in which the data was generated and ingested. Each file has its associated min-max ranges.
If we filter the table using the predicate in the slide (A > 10 AND B > 0.0), then we need to read all the files; we are not able to rule any of them out based on the min-max ranges alone.
Now say that we have the data organized differently. We take the two columns and use them as dimensions of a two-dimensional space, split it four ways, and map each sub-space to a file, as the arrows indicate. After reorganizing, we only need to read one file, as the others are ruled out based on the min-max ranges.
Tables are often not static, though; new data is appended regularly. When that happens, the data layout changes, affecting min-max filtering once more. In the example, the same filter requires reading two files instead of one after the new data arrives. Consequently, reorganizing the data layout regularly keeps min-max filtering effective.
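To make the mechanism concrete, here is a minimal sketch of min-max file skipping for the predicate above (A > 10 AND B > 0.0). The per-file ranges below are hypothetical stand-ins for the values shown on the slides; a real engine reads them from the table metadata rather than hard-coding them.

```scala
// Minimal sketch of min-max file skipping for the predicate A > 10 AND B > 0.0.
// Per-file ranges are hypothetical stand-ins for the values shown on the slides.
case class MinMax(min: Double, max: Double)
case class FileStats(path: String, a: MinMax, b: MinMax)

// A file can be skipped unless both column ranges can satisfy the predicate.
def mustRead(f: FileStats): Boolean = f.a.max > 10.0 && f.b.max > 0.0

// Layout following ingestion order: wide, overlapping ranges in every file.
val ingestionOrder = Seq(
  FileStats("file-1", MinMax(1, 90), MinMax(-2.0, 3.0)),
  FileStats("file-2", MinMax(3, 85), MinMax(-1.5, 2.5)),
  FileStats("file-3", MinMax(2, 95), MinMax(-3.0, 4.0)),
  FileStats("file-4", MinMax(5, 80), MinMax(-2.5, 3.5)))

// Layout after organizing on (A, B): each file covers a narrow sub-space.
val reorganized = Seq(
  FileStats("file-1", MinMax(1, 10),  MinMax(-3.0, 0.0)),
  FileStats("file-2", MinMax(1, 10),  MinMax(0.1, 4.0)),
  FileStats("file-3", MinMax(11, 95), MinMax(-3.0, 0.0)),
  FileStats("file-4", MinMax(11, 95), MinMax(0.1, 4.0)))

println(ingestionOrder.count(mustRead))  // 4 -> nothing can be skipped
println(reorganized.count(mustRead))     // 1 -> only one file must be read
```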
The impact of the data layout on min-max filtering can be significant, as has been shown in both academic work and practical use cases. You may be wondering about horizontal partitioning of the data, Hive-style. It is definitely effective, but it induces data fragmentation for high-cardinality columns, which are common in practical use cases. In some cases, it might even require manipulating the data and creating new columns just to make the partitioning work, which is not really desirable.
Here, I’m using the term lakehousekeeping to denote the process of organizing data in the lakehouse, and in particular its data layout, for efficiency. This organization broadly occurs at ingestion time and regularly for data at rest to ensure an efficient layout. To decide on the layout, we focus for the remainder of this presentation on a multi-dimensional indexing approach. It represents data as a space of selected dimensions and partitions that space to form cubes, mapping cubes to files. Space partitions are organized in a tree data structure called OTree.
We have implemented it as an extension of Spark, enabling this indexing for Delta Lake tables. We name the software extension qbeast-spark. qbeast-spark is open-source and available on GitHub, primarily developed by Qbeast, which is an early-stage startup company.
Let’s briefly cover multi-dimensional indexing with Qbeast-Spark. Using a designated subset of columns, which can be set manually or chosen automatically, we organize the data in a multi-dimensional space where the dimensions are defined by the designated columns. The space is split according to a designated capacity: if a sub-space holds more elements than the capacity, we split it further.
We use the term cube to denote such a subspace. A cube contains elements of the subspace and maps to a node in the index tree. Leaf cubes are not split further, while parent cubes are. Because parent cubes contain a fraction of the data in their subspace, we can efficiently process approximate queries using samples.
We store cube data in files. A cube is split into blocks of data that may be spread across multiple files. Although I’m simplifying the illustration by mapping one cube to one file, in practice we can have data from multiple cubes in the same file.
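As a rough illustration of the splitting idea, here is a conceptual sketch (not the actual qbeast-spark OTree implementation) that recursively splits a two-dimensional space into cubes whenever a cube exceeds a designated capacity, keeping a fraction of the data in each parent cube:

```scala
// Conceptual sketch of capacity-based space splitting; it does not mirror the
// actual qbeast-spark OTree internals.
case class Point(a: Double, b: Double)
case class Cube(loA: Double, hiA: Double, loB: Double, hiB: Double,
                points: Seq[Point], children: Seq[Cube]) {
  def isLeaf: Boolean = children.isEmpty
}

def build(points: Seq[Point],
          loA: Double, hiA: Double, loB: Double, hiB: Double,
          capacity: Int): Cube =
  if (points.length <= capacity)
    // Leaf cube: holds all its points and is not split further.
    Cube(loA, hiA, loB, hiB, points, Seq.empty)
  else {
    val (kept, pushedDown) = points.splitAt(capacity)
    val (midA, midB) = ((loA + hiA) / 2, (loB + hiB) / 2)
    // Group the remaining points into the four child sub-spaces (quadrants).
    val quadrants = pushedDown.groupBy(p => (p.a >= midA, p.b >= midB))
    val children = Seq(
      ((false, false), (loA, midA, loB, midB)),
      ((false, true),  (loA, midA, midB, hiB)),
      ((true, false),  (midA, hiA, loB, midB)),
      ((true, true),   (midA, hiA, midB, hiB))
    ).map { case (key, (a0, a1, b0, b1)) =>
      build(quadrants.getOrElse(key, Seq.empty), a0, a1, b0, b1, capacity)
    }
    // Parent cube: keeps a fraction of the data (handy for sampling) and splits the rest.
    Cube(loA, hiA, loB, hiB, kept, children)
  }
```

Because parents retain a slice of their subspace’s data in this scheme, reading only the top of the tree yields an approximate view of the table, which is what makes sample-based queries cheap.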
We mentioned previously that we build on Spark. To create a table indexed with Qbeast, we load the data into a Spark DataFrame and write it out specifying the Qbeast format. In the code snippet, we manually specify the columns to index, although there is also an auto-indexing feature. For a table already indexed with Qbeast, we have the option of optimizing it, which performs maintenance operations over the table, like compaction.
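A sketch of what such a snippet can look like with the Spark DataFrame API: the "qbeast" format name and the columnsToIndex option follow the qbeast-spark documentation, while the paths and column names are placeholders, and the commented maintenance call is an assumption based on the project README.

```scala
// Sketch of creating a Qbeast-indexed table (format and option names per the
// qbeast-spark docs; paths and column names are placeholders).
val df = spark.read.parquet("/data/source/")        // data we want to index

df.write
  .format("qbeast")
  .option("columnsToIndex", "col_a,col_b")          // manually chosen indexing columns
  .save("/data/qbeast-table/")

// For an already indexed table, a maintenance pass can be triggered; the helper
// call below is an assumption based on the project README and may vary by version:
// import io.qbeast.spark.QbeastTable
// QbeastTable.forPath(spark, "/data/qbeast-table/").optimize()
```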
Delta persists table transaction metadata in the Delta Log, which is a folder under the overarching table folder. The Delta Log comprises JSON files, and here we show an entry extracted from such a JSON file that adds a data file to the table. In this example, we see that the add-file entry provides information like the file path and stats on the columns. For Qbeast indexing, we add metadata as tags, as in the second snippet of the slide. We see in the snippet that the file contains one cube.
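Because the Delta Log is just a folder of JSON files, these add entries and their Qbeast tags can be inspected directly with Spark; a small sketch, with the table path as a placeholder:

```scala
// Inspect the Delta Log of a Qbeast-indexed Delta table: each "add" action records
// the data file path, per-column stats, and, for Qbeast, extra tags about its cubes.
val log = spark.read.json("/data/qbeast-table/_delta_log/*.json")

log.filter("add is not null")
   .select("add.path", "add.stats", "add.tags")
   .show(truncate = false)
```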
Let’s see it in action with a public data set. We used the NYC Yellow Taxi data from 2022 and 2023: 24 Parquet files with about 77 million records in total. It is not a large data set, but it is public and makes reproducibility possible. The schema is the one on the right-hand side of the slide; it has 19 columns, all numeric. We ran all experiments on my laptop, so they are easily reproducible, although results may of course differ depending on hardware configuration.
We ingested the data as a Qbeast table, and the slide shows the metrics extracted with Qbeast-Spark. I indexed using four columns and initially chose a cube size of 50,000 records.
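For reference, a sketch of how such an ingestion could look. The cubeSize option is documented in qbeast-spark; the four index columns below are illustrative guesses at NYC taxi columns, not necessarily the ones used in the presentation, and the metrics helper in the comment is an assumption that may vary across versions.

```scala
// Sketch of the ingestion used in the experiment: 50,000-record cube size and four
// index columns (the columns below are illustrative guesses, not the presentation's).
val taxi = spark.read.parquet("/data/nyc-yellow-taxi/*.parquet")

taxi.write
  .format("qbeast")
  .option("columnsToIndex",
          "tpep_pickup_datetime,PULocationID,DOLocationID,trip_distance")
  .option("cubeSize", "50000")
  .save("/data/nyc-taxi-qbeast/")

// Index metrics (tree depth, cube counts, ...) can then be inspected; the helper
// below follows the qbeast-spark README and may differ between versions:
// import io.qbeast.spark.QbeastTable
// QbeastTable.forPath(spark, "/data/nyc-taxi-qbeast/").getIndexMetrics()
```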
Let’s look at some plots. This first plot shows, for each value in the data set range (x-axis), the normalized number of files (y-axis) containing the value. The range in this case corresponds to the pickup location dimension. The count is normalized because the number of files differs between the source data and the table indexed with Qbeast. We show data points for both the source data and the data indexed with Qbeast. We see that with Qbeast, values in the range appear in relatively fewer files, indicating narrower per-file min-max ranges for this column. Values within the range are present in less than 20% of the files of the Qbeast-indexed data set, while every value is present in all files of the original data.
We next show data points for the pickup timestamp, which looks very different because the data is heavily skewed; not surprisingly, it is concentrated in the natural range of the data set, 2022-2023. However, there are many outliers in the raw data, which we kept for analysis purposes, even though it might be desirable to remove them in a practical use case.
We see that these outlier values are present in relatively few files. With the Qbeast-indexed data set, values are present in less than 60% of the files.
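The data behind plots like these can be derived from the per-file column statistics recorded in the Delta Log. Here is a rough sketch of the idea, not necessarily how the plots in the slides were produced; the PULocationID column name and the table path are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// For each candidate value, compute the fraction of data files whose [min, max]
// range for a column covers that value; narrower per-file ranges yield smaller fractions.
// Note: this simple sketch reads all "add" entries and ignores removed files.
val column = "PULocationID"  // assumed pickup-location column
val ranges = spark.read.json("/data/nyc-taxi-qbeast/_delta_log/*.json")
  .filter("add is not null")
  .select(
    get_json_object($"add.stats", s"$$.minValues.$column").cast("double").as("min"),
    get_json_object($"add.stats", s"$$.maxValues.$column").cast("double").as("max"))

val totalFiles = ranges.count()
val values = spark.range(1, 266).withColumnRenamed("id", "value")  // location IDs 1..265

values.join(ranges, $"value" >= $"min" && $"value" <= $"max")
  .groupBy("value")
  .agg((count("*") / totalFiles).as("fraction_of_files"))
  .orderBy("value")
  .show()
```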
Let’s see how it affects simple filtering of the data. After a warmup run, we executed three runs of a simple filtering query. We see from the figures that, for the filtering predicate we chose, the response time is reduced by 16-33%. We also observed a 35% reduction in the amount of data that Spark reads for processing.
Of course, making the processing slightly more complex amplifies the impact on performance, because we have less data to process. We see that the improvements are more significant for both response time (about 50%) and the amount of data to process (about 45%) when we add sorting and retrieve the top 20 records.
Fetching the top 20 records forces the engine to retrieve all the columns of those records, unlike the filtering query alone, which only requires fetching the filtered columns, since we are using a columnar format underneath, Parquet.
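The exact predicate is on the slide; for illustration, the two queries have roughly the following shape (column names and thresholds are hypothetical):

```scala
import spark.implicits._

// Shape of the two benchmark queries (predicate columns and thresholds are hypothetical).
val taxi = spark.read.format("qbeast").load("/data/nyc-taxi-qbeast/")

// 1) Simple filtering query.
val filtered = taxi.filter(
  $"PULocationID".between(100, 150) &&
  $"tpep_pickup_datetime" >= "2022-06-01")
filtered.count()

// 2) Filter + sort + top 20: forces the engine to materialize whole records.
filtered.orderBy($"trip_distance".desc).limit(20).collect()
```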
As I make the filtering ranges wider, we require data from more files, and having many more files to read from ends up hurting performance.
Let’s now rewrite our data with larger files, configured with a larger target cube size. We took the average file size from the source data and wrote a Qbeast table using that value as the target cube size.
With this new data set, we conducted the same analysis for the same two dimensions. For pickup location, we see that the files are larger and there are fewer of them, yet values are present in less than 60% of the files, indicating narrower min-max ranges.
For the pickup timestamp, the skewness of this dimension forces values in the upper side of the range into all files. Multiple aspects of the implementation cause this, such as rollups that consolidate files.
We can see that we still get a shorter response time and less data to process when performing that filtering over a wider range.
We argued in this presentation that it is important for performance and resource utilization to pay close attention to the data layout in a data lakehouse. We discussed an advanced technique based on tree indexing that enables efficient query processing by narrowing per-file min-max ranges. With a small public data set, we were able to reduce query response time by 50% and the amount of data read for processing by a similar amount. Such optimization is appropriate at distinct points of the data lifecycle: when ingesting data into a table, and regularly thereafter to ensure that the table keeps a good overall layout. We associated this overall activity with the housekeeping of the data lakehouse, as an analogy to how we take time to keep our homes or even hotel rooms organized so that we are more efficient in our daily tasks.
qbeast-spark repo: https://github.com/Qbeast-io/qbeast-spark