Indexing and Sampling on Data Lake(house)s with Qbeast-Spark
Creating and leveraging indexes is an essential feature in DBMS and data warehouses to improve query speed. When applying filters in a SQL query, the index is first consulted to locate the target data before any read. In this way, we avoid reading the entire table from the storage, reducing the data transfer involved.
For a query like the following, if the sales_table is indexed on the year column, we can directly access the data from 2020 onwards, avoiding a sequential scan of the entire table. Search algorithms such as Binary Search can be leveraged to achieve sub-linear time complexity by sorting the table by year.
SELECT * FROM sales_table WHERE year >= 2020
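As a rough illustration of why a sorted index helps with the query above, here is a minimal Python sketch. The in-memory `sales_table`, the `year_index` list, and the `rows_from_year` helper are hypothetical stand-ins, not part of any DBMS API.

```python
from bisect import bisect_left

# Hypothetical in-memory stand-in for sales_table, kept sorted by year.
sales_table = [
    (2017, "order-a"),
    (2018, "order-b"),
    (2019, "order-c"),
    (2020, "order-d"),
    (2021, "order-e"),
    (2022, "order-f"),
]

# The "index": the sorted key column, built once and reused by every query.
year_index = [row[0] for row in sales_table]


def rows_from_year(min_year):
    """Return the rows with year >= min_year without scanning earlier rows."""
    start = bisect_left(year_index, min_year)  # binary search, O(log n)
    return sales_table[start:]


recent = rows_from_year(2020)
```

Only the position of the first matching row is searched for; everything before it is never touched.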
Given this obvious benefit, one of the first things to do when creating a table in a DBMS is to define the indexes you want, using one or more columns (or any combination of them).
Generally, the column defined as the primary key is used as a clustered index, the order of which dictates the arrangement of the table rows. One can also create non-clustered indexes, keeping in mind that each index is a separate data structure that requires additional storage space. Non-clustered indexes don’t affect the table rows’ ordering; they are used to locate data, and an index-table link is used for data access.
In our last article, we discussed the need for Data Lakehouses, how they could alleviate many of the burdens in big data, and the role of Qbeast-format. To summarize, Data Lakehouses add a metadata management layer on the Data Lake to achieve ACID properties and schema enforcement, among other benefits. Qbeast-format’s goal is to add multiple additional features to the Lakehouse, such as multi-dimensional indexing, efficient sampling, and table tolerance, to reduce data transfer for queries further, making Data Lakehouses even more powerful.
This time, we will explain the details behind the key features of Qbeast-format, emphasizing its benefits and inner workings by unfolding the OTree Algorithm.
With Qbeast-format, more specifically its implementation Qbeast-spark, one can create a clustered multi-dimensional index for their table stored in a Data Lake with a single line of code. With that, you can directly access your Data Lake(house) and start running queries with indexing support.
# Indexing a Spark SQL DataFrame on four columns with Qbeast-spark
(
    df.write
    .format('qbeast')
    .option('columnsToIndex', 'col_1,col_2,col_3,col_4')
    .save('./qbeast_table')
)
But just before we dive in, let’s have a quick rewind on how things have been up until now and where it can potentially go.
- Single and multi-column indexes
- Spatial index
- Recursive Space Division Algorithm
- OTree Algorithm
How to Implement A Multi-column Index?
Implementing a single-column index is straightforward — take the indexed column, order the column records according to their values, and sort the table records according to the index if the index is clustered. When reading, use the index to find the target records. If the index is non-clustered, an index-record link is used for data access.
The construction of multi-column indexing is a bit more complex, for there’s no natural way of sorting the records that respects the natural order of all individual columns. For instance, arranging rows with columns (age, height, weight) can only be done by prioritizing one column over another.
One example is to first order the records by age. Those that have the same age are ordered by height, and those that have the same height are ordered by weight.
Students_table
(age, height, weight)
(15 , 160 , 65 )
(15 , 163 , 66 )
(15 , 170 , 70 )
(16 , 158 , 60 )
(18 , 162 , 75 )
(18 , 162 , 76 )
(18 , 180 , 70 )
(18 , 180 , 72 )
(23 , 175 , 78 )
(23 , 176 , 69 )
A direct consequence of this is that only the first column is ordered. If we were to filter the records by height and height alone, this index is of no use. The same applies to weight. If we search by (age, height), the target ages would be found quickly, but a sequential scan would be needed to locate the target heights.
This limitation is attributed to the interdependency between the columns when sorting, which gave birth to Spatial Indexes.
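The limitation can be reproduced in a few lines of Python. This is only a sketch using the student rows listed above; the helper names `by_age` and `by_height` are made up for illustration.

```python
from bisect import bisect_left, bisect_right

# (age, height, weight) rows, sorted by age, then height, then weight.
students = sorted([
    (15, 160, 65), (15, 163, 66), (15, 170, 70), (16, 158, 60), (18, 162, 75),
    (18, 162, 76), (18, 180, 70), (18, 180, 72), (23, 175, 78), (23, 176, 69),
])

ages = [row[0] for row in students]      # globally ordered
heights = [row[1] for row in students]   # ordered only within each age group


def by_age(age):
    """Binary search works because the first column is fully sorted."""
    return students[bisect_left(ages, age):bisect_right(ages, age)]


def by_height(height):
    """Heights are not globally ordered, so every row has to be inspected."""
    return [row for row in students if row[1] == height]


heights_are_sorted = heights == sorted(heights)  # False for this table
```

Filtering by age touches a contiguous slice of the table, while filtering by height degenerates into a sequential scan.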
Spatial Index for Enhanced Data Querying
The idea is to construct a vector space with the target columns and position the records inside the space according to their values. This way, all columns involved are mutually independent, each defining a dimension in the space. When filtering, one can use any combination of the indexed columns, and the column-priority restriction described earlier no longer applies.
Take the image below as an example: we index our dataset on columns X and Y and construct a two-dimensional space for the records. The position of each record depends only on its values for the indexed columns. When filtering the table using any combination of these columns, we search the vector space to locate the records.
Illustration of queries that target specific data. The dataset is indexed on columns x and y. The two blue rectangles illustrate the range constraints on the indexed columns; only the elements inside these regions are retrieved.
In the image above, limiting values for both X and Y results in retrieving elements from the small blue rectangle region on the left. When using only one column, such as the case of the long rectangle orthogonal to the X axis, we can simply limit the values for the X dimension to the defined range, regardless of the column Y.
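The two kinds of range constraints can be expressed with a single predicate. The sketch below only shows the filter semantics (it still scans every point, which a real spatial index would avoid); the `points` data and the `in_box` helper are hypothetical.

```python
def in_box(point, ranges):
    """Check a point against per-dimension (low, high) ranges.

    Dimensions missing from `ranges` are left unconstrained.
    """
    return all(low <= point[dim] <= high for dim, (low, high) in ranges.items())


# Hypothetical records positioned by their x and y values.
points = [
    {"x": 1, "y": 1},
    {"x": 2, "y": 8},
    {"x": 3, "y": 4},
    {"x": 7, "y": 2},
    {"x": 8, "y": 9},
]

# Constrain both columns: the small rectangle.
both = [p for p in points if in_box(p, {"x": (0, 4), "y": (0, 5)})]

# Constrain x only: the tall rectangle spanning the whole y axis.
only_x = [p for p in points if in_box(p, {"x": (6, 9)})]
```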
Recursive Division for Optimizing Data Access
To improve filtering speed and reduce retrieval time, we split the high-density regions of the vector space into subspaces in a recursive fashion, hence the name Recursive Space Division (RSD).
Regions are divided until the number of elements in each subspace is no larger than a predefined capacity. All the elements in space are distributed among the subspaces for each division. In the case of a QuadTree, each division halves the ranges in all dimensions, resulting in non-overlapping subspaces with well-defined boundaries that evenly split the region. If the number of dimensions is n, it creates 2^n subspaces in each split.
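The division rule can be sketched as a short recursive function. This is an illustrative sketch, not the Qbeast implementation: the `build` function, the half-open `[low, high)` boundaries, the `max_depth` guard, and the child numbering (which does not follow the clockwise order used in the figures) are all assumptions.

```python
from itertools import product


def build(points, lows, highs, capacity=2, node_id="", max_depth=8):
    """Split a region recursively until each leaf holds at most `capacity` points.

    Returns a dict mapping node id -> points stored there; split nodes map to [].
    Regions are half-open: a point belongs to [low, high) in every dimension.
    """
    if len(points) <= capacity or len(node_id) >= max_depth:
        return {node_id: list(points)}
    tree = {node_id: []}  # a split node hands all of its points to its children
    mids = [(lo + hi) / 2 for lo, hi in zip(lows, highs)]
    # Each of the 2^n children takes the lower (0) or upper (1) half per dimension.
    for child_no, halves in enumerate(product((0, 1), repeat=len(lows))):
        c_lows = [mids[d] if h else lows[d] for d, h in enumerate(halves)]
        c_highs = [highs[d] if h else mids[d] for d, h in enumerate(halves)]
        inside = [
            p for p in points
            if all(c_lows[d] <= p[d] < c_highs[d] for d in range(len(lows)))
        ]
        tree.update(
            build(inside, c_lows, c_highs, capacity, node_id + str(child_no), max_depth)
        )
    return tree


# Five hypothetical points in the space [0, 8) x [0, 8); three crowd one corner.
tree = build([(1, 1), (1, 2), (2, 1), (6, 6), (5, 2)], [0, 0], [8, 8])
```

With a capacity of 2, the root splits once, and the crowded lower corner splits a second time, while the sparse regions stay as single leaves.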
The images below show the division process of two-dimensional space; notice the number of divisions and the density of each area.
Recursive Space Division for QuadTree, first iteration from left to right.
Recursive Space Division for QuadTree. The space is divided in those regions where the number of contained elements is more than 2. The top-left square experienced more divisions because of its higher density of data points.
A tree-like representation of the example above is shown in the image below. An empty string “ ” is used to represent the root, and the numbers 0, 1, 2, and 3 encode the subspaces in clockwise order. Each node in the tree is either a leaf node holding no more elements than the predefined capacity, or an internal node with four children and no records.
Tree-like representation for QuadTree. Nodes split into child nodes when the capacity is exceeded. Nodes in red contain no pointers after splitting.
Data pointers are stored in the tree nodes to reference the table records. Each time a node is split (red nodes in the image above), the pointers it contains are classified and distributed among the child nodes, leaving the node itself empty.
If we were to query the elements that fall inside the boundaries of node 020, nodes 0200, 0201, 0203, 02020, 02021, 02022, and 02023 are read, while 020 and 0202 are left out because they were split and hold no pointers.
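Because each node id extends the id of its parent, the lookup for node 020 amounts to a prefix match. The sketch below assumes a tree in which only the 020 subtree is taken from the text; the remaining node ids are made up to complete a valid QuadTree.

```python
# "" is the root; each extra digit selects one of the four children.
# Split (inner) nodes hold no pointers after splitting.
inner = {"", "0", "02", "020", "0202"}

# Leaves under 020 come from the text; the other leaf ids are assumed.
leaves = {
    "1", "2", "3", "00", "01", "03", "021", "022", "023",
    "0200", "0201", "0203", "02020", "02021", "02022", "02023",
}


def nodes_to_read(prefix):
    """Leaves whose region lies inside the node identified by `prefix`."""
    return sorted(node for node in leaves if node.startswith(prefix))


def nodes_skipped(prefix):
    """Split nodes on the way down: they are empty, so nothing is read from them."""
    return sorted(node for node in inner if node.startswith(prefix))


read = nodes_to_read("020")
skipped = nodes_skipped("020")
```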
Keeping the Index Updated
Writing new records to an indexed dataset requires finding their correct positioning. For a single-column index, as discussed at the beginning, the task is the same as inserting an element into a sorted array. For a spatial index like QuadTree, the idea would be to find the node with the right boundaries to accommodate the record. It starts from the root and navigates down the tree.
As shown in the following image, the values of the new record determine the node (or subspace) it should go to. If the node is an inner node, we then choose the correct child node for allocation.
Element e should go to the top-left cube. 0 < a/3 < a/2, 0 < b/3 < b/2.
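The descent can be sketched as repeated comparisons against the midpoint of the current region. This is a two-dimensional sketch under assumptions: the `child_halves` and `find_leaf` helpers are hypothetical, children are numbered by reading the per-dimension halves as a binary number (not the clockwise order of the figures), and the lower half on both axes is taken to be what the figure draws as the top-left cube.

```python
def child_halves(point, lows, highs):
    """For each dimension, 0 if the value is in the lower half, 1 otherwise."""
    return tuple(
        0 if value < (lo + hi) / 2 else 1
        for value, lo, hi in zip(point, lows, highs)
    )


def find_leaf(point, lows, highs, inner_nodes, node_id=""):
    """Walk down from the root until reaching a node that has not been split."""
    lows, highs = list(lows), list(highs)
    while node_id in inner_nodes:
        halves = child_halves(point, lows, highs)
        # Shrink the region to the chosen half in every dimension.
        for d, h in enumerate(halves):
            mid = (lows[d] + highs[d]) / 2
            if h:
                lows[d] = mid
            else:
                highs[d] = mid
        # Encode the child as one digit, e.g. (1, 0) -> "2".
        node_id += str(int("".join(map(str, halves)), 2))
    return node_id


a, b = 12.0, 9.0
e = (a / 3, b / 3)  # 0 < a/3 < a/2 and 0 < b/3 < b/2, as in the caption
first_step = child_halves(e, (0, 0), (a, b))
```

With only the root split, `find_leaf(e, (0, 0), (a, b), {""})` ends in child "0", the lower half on both axes.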
A space division can be triggered by any WRITE that pushes the number of records in a node beyond its capacity. The contained records are then classified and redistributed among the subtree. This process can take a long time, affecting the write speed, reducing concurrency, and often requiring locks to achieve isolation.
Space division from a QuadTree. The elements from a split node are classified and redistributed among the child nodes.
As shown in the above image, a write finds nodeX to be full, which triggers an optimization process: the space is split by creating child cubes (X1, X2, X3, and X4), all the records are read from nodeX, and each record is assigned and sent to the corresponding child cube according to its values.
A QuadTree index as described does not scale well: every split has to read, classify, and rewrite the records of the full node, often under a lock. For this reason, the OTree is introduced.
OTree Algorithm: A Solution for Scalable Data Indexing
An OTree Index is designed to be eventually optimal so concurrent writes don’t need to acquire locks. It’s optimized for immutable cloud object stores and is based on open formats such as JSON and Parquet. Features such as Multi-dimensional Indexing and Efficient Sampling are at the core of Qbeast-format, and they are both empowered by the OTree Algorithm.
When a node is full, the classification and redistribution process of existing records is removed for better concurrency. The nodes now have a soft limit as capacity. When encountering a full node, new records can be either squeezed into it or allocated in a newly created child node, always leaving the existing records intact. (More on this later!)
OTree algorithm. Writing an element(E) with a weight(w) to cube X. If w ≥ maxWeight we skip to its correct child cube until finding a cube such that w < maxWeight. The algorithm creates child cubes if cube X is a leaf.
The result is a scalable multi-column index that supports concurrent writes and organizes data in a hierarchical structure: traversing the index from top to bottom, one encounters cubes whose data belongs to increasingly large fractions of the dataset. A simple read protocol can be implemented to retrieve data fractions with minimum data transfer.
Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. The same applies to the image on the right with f = 0.3.
Next, we will dive into the bits and pieces of the READ and WRITE protocols from OTree and explain how they enable the benefits mentioned above.
- Cubes are Nodes
- When writing, each element is assigned a randomly generated number called weight. It’s an important parameter that decides whether a cube should admit an incoming record.
- The storage of cubes comprises a payload and an offset, with the latter being temporary storage whose elements are to be eliminated during optimization.
- The element that has the largest weight in a cube’s payload is the maxElement, and its weight is the cube’s maxWeight.
Illustration for cubes in an OTree.
- A necessary condition for a cube to accept a new element e is that e.w < cube.maxWeight.
- Cubes have a state that depends on their size and on the stage of the optimization process they are in. A cube’s state affects how READs and WRITEs are conducted. Possible cube states are Flooded, Announced, and Replicated.
The OTree WRITE Protocol
To allocate an element e to a given cube, first, the cube boundaries have to contain the values of e, and second, e.w has to be smaller than the cube’s maxWeight.
Initially, the maxWeight of any cube is set to MaxValue by default, so every weight is acceptable. Once the payload is full, maxWeight is defined as the largest weight found in the payload.
If both conditions are met, and the payload is full, the maxElement is pushed to the offset to make room for e. The maxWeight is now what used to be the second-largest weight. The same applies to maxElement.
Elements in the offset are there temporarily. When the optimization/replication process kicks off, they are copied down to the subtree and the offset itself is removed. A detailed description of the OTree optimization is found in a separate article.
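The write rules above can be condensed into a small Python sketch. It is a simplification under stated assumptions, not the Qbeast-spark implementation: the space is one-dimensional over [0, 1) so each cube has two children, cube states and the optimization process are ignored, and the `Cube`, `child_key`, `write`, and `write_record` names are hypothetical.

```python
import random
from dataclasses import dataclass, field

MAX_VALUE = float("inf")  # stand-in for the text's MaxValue


@dataclass
class Cube:
    capacity: int                                   # soft limit on the payload size
    payload: list = field(default_factory=list)     # (weight, value) pairs
    offset: list = field(default_factory=list)      # entries evicted from the payload
    children: dict = field(default_factory=dict)    # child key -> Cube

    @property
    def max_weight(self):
        # Any weight is acceptable until the payload fills up.
        if len(self.payload) < self.capacity:
            return MAX_VALUE
        return max(weight for weight, _ in self.payload)


def child_key(value, depth):
    """Assumed 1-D split of [0, 1): lower (0) or upper (1) half at this depth."""
    return int(value * 2 ** (depth + 1)) % 2


def write(cube, value, weight, depth=0):
    """Place (weight, value) in the first cube on its path with weight < maxWeight."""
    if weight < cube.max_weight:
        if len(cube.payload) >= cube.capacity:
            # Full payload: move the maxElement to the offset to make room.
            max_entry = max(cube.payload, key=lambda entry: entry[0])
            cube.payload.remove(max_entry)
            cube.offset.append(max_entry)
        cube.payload.append((weight, value))
        return cube
    # Rejected here: continue in the child covering the value, creating it if needed.
    key = child_key(value, depth)
    if key not in cube.children:
        cube.children[key] = Cube(cube.capacity)
    return write(cube.children[key], value, weight, depth + 1)


def write_record(root, value):
    """Assign a random weight in [0, 1) to the record, then run the write."""
    return write(root, value, random.random())
```

Note that existing records are never reclassified: a full cube either evicts its maxElement to the offset or passes the newcomer down, so a write only ever touches the cubes along one root-to-leaf path.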
Why Weight? The OTree READ Protocol for Efficient Data Sampling
The simple answer is that it allows us to sample the data efficiently.
Aside from the non-empty inner cubes, there’s not that much difference between a QuadTree and an OTree, until one begins to examine the OTree’s data distribution closely from top to bottom.
- Since e.w < cube.maxWeight is required for writing, it’s natural to deduce that maxWeight increases monotonically from the root to any leaf cube.
- Since the weights are randomly generated for each record, and no record redistribution occurs when a node is full, the data contained in each inner cube can statistically represent the data from the cube’s subtree in a degree proportional to the cube size.
- Assuming the weights are randomly generated in the range from 0 to 1.0, if a cube has a maxWeight of, say, 0.5, then all the records in that cube should be part of the first half of the dataset.
Considering the above three points, we can build a sampling operator that doesn’t require the I/O of the whole dataset: If we were to read a fraction f from the dataset, we traverse the tree from top to bottom and stop going further in any branch as soon as a cube with a maxWeight ≥ f is encountered.
Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. The same applies to the image on the right with fraction = 0.3.
The cases above show that reading a fraction f from the dataset requires accessing only the corresponding cubes. A DFS tree traversal is implemented to read the branches and stop as soon as a cube with maxWeight ≥ f is encountered. Every cube we don’t access is I/O saved.
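The traversal described above fits in a few lines. The following is a sketch only: the `cubes_to_read` function, the dictionary layout, and the maxWeight values of the example tree are hypothetical, chosen so that maxWeight increases from the root towards the leaves.

```python
def cubes_to_read(tree, fraction, cube_id=""):
    """DFS that stops descending a branch once a cube's maxWeight reaches the fraction."""
    cube = tree[cube_id]
    visited = [cube_id]
    if cube["max_weight"] >= fraction:
        return visited  # cubes further down only hold larger weights
    for child_id in cube["children"]:
        visited += cubes_to_read(tree, fraction, child_id)
    return visited


# Hypothetical OTree: "" is the root, each extra digit selects a child cube.
tree = {
    "": {"max_weight": 0.10, "children": ["0", "1"]},
    "0": {"max_weight": 0.25, "children": ["00", "01"]},
    "1": {"max_weight": 0.15, "children": ["10"]},
    "00": {"max_weight": 0.50, "children": []},
    "01": {"max_weight": 0.40, "children": []},
    "10": {"max_weight": 0.35, "children": []},
}

sample_20 = cubes_to_read(tree, 0.2)
sample_30 = cubes_to_read(tree, 0.3)
```

In this example the 20% sample never opens cubes "00" and "01", while the 30% sample has to go one level deeper on that branch.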
To see the features discussed above in action, you can refer to Qbeast-Spark for advanced SQL query performance and try it out by yourself. It is our first open-source implementation of the OTree algorithm, and we want to invite you to contribute to this project and accelerate this new change.