Tag: Data Engineering

Approximate Queries on Data Lakes with Qbeast-Spark

Jiawei Hu

Writing procedural programs to analyze data is often less convenient than taking a declarative approach, because one has to define the exact control flow of the program rather than simply state the desired outcome. For example, a simple group-by takes considerably more code in plain Python than in SQL.
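
As a rough illustration, here is a group-by written procedurally in plain Python. The `sales` rows and the `total_by_city` function are hypothetical. The loop and the accumulator are exactly the control flow that the equivalent one-line SQL query leaves to the engine.

```python
# Declarative equivalent in SQL:
#   SELECT city, SUM(amount) FROM sales GROUP BY city
sales = [
    {"city": "Barcelona", "amount": 10},
    {"city": "Madrid", "amount": 5},
    {"city": "Barcelona", "amount": 7},
]

def total_by_city(rows):
    """Procedural group-by: we spell out how to iterate and how to accumulate."""
    totals = {}
    for row in rows:                                       # explicit control flow
        key = row["city"]
        totals[key] = totals.get(key, 0) + row["amount"]  # explicit accumulator
    return totals

print(total_by_city(sales))  # {'Barcelona': 17, 'Madrid': 5}
```

An optimizer cannot easily see inside a function like this one, whereas the SQL version exposes its structure.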

Beyond this inconvenience, there is also a potential performance penalty: procedural code is treated as a black box by automatic code optimizers. The structure of a SQL query, on the other hand, is transparent to the underlying query optimizer, so both rule-based and cost-based optimizations can be applied to improve performance.

This was one of the main issues Apache Spark SQL set out to tackle. A declarative DataFrame API was introduced to integrate relational processing with procedural Spark code, and an extensible query optimizer, Catalyst, was implemented to optimize the resulting queries.

In a nutshell, programs written in procedural languages can use the DataFrame API to express declarative logic. Before execution, Catalyst optimizes the program and translates it into procedural code that operates on Spark RDDs.

Sitting in the middle, this optimizer takes care of parsing, optimization, and code generation, so that the query runs as efficiently as possible.

Last time, we discussed the bits and pieces of a scalable multi-column indexing technology, the OTree algorithm, and how its peculiar inner workings let us sample indexed tables with minimal data transfer. This article discusses how the Efficient Sampling we saw is materialized in Qbeast-Spark by extending Catalyst, the Spark SQL query optimizer.

During optimization, a SQL query is first parsed into an Abstract Syntax Tree, a canonical representation of the query. This tree is then analyzed from top to bottom, and optimization rules are applied to minimize data transfer and compute resource consumption.

SELECT
A.name
FROM
A JOIN B ON A.id = B.id
WHERE
CONDITION_ON_A AND CONDITION_ON_B

Taking the above query as an example, a possible syntax tree is shown in the following image:

  1. Two tables, A and B, are joined.
  2. On the resulting table, we filter the rows by applying conditions cond_A and cond_B.
  3. We retrieve the name column as output.

Optimization rules such as predicate pushdown, constant folding, and projection pruning are applied to improve performance while keeping the query result intact. If we were to apply some of the rules to our tree, a possible output could look like the following.

cond_A and cond_B are pushed down to their respective data sources, A and B, to reduce I/O: with the help of metadata, data files whose values fall outside the desired range are discarded before reading. This means less I/O and lower compute requirements for the subsequent JOIN operation.
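
To make the rewrite concrete, here is a toy version of predicate pushdown in plain Python. It is not Catalyst's API. The `Scan`, `Join`, `Filter`, and `Project` classes and the `push_down` function are hypothetical stand-ins, and each predicate is tagged with the table it references. Only the tree rewrite is modeled; the metadata-based file skipping happens later, inside the data source.

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class Scan:
    table: str

@dataclass(frozen=True)
class Join:
    left: object
    right: object
    condition: str

@dataclass(frozen=True)
class Filter:
    predicates: Tuple[Tuple[str, str], ...]  # (table referenced, predicate text)
    child: object

@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: object

def push_down(plan):
    """Toy rule: move single-table predicates from above a Join onto that table's Scan."""
    if isinstance(plan, Project):
        return Project(plan.columns, push_down(plan.child))
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        scanned = {s.table for s in (join.left, join.right) if isinstance(s, Scan)}
        # Predicates spanning several tables must stay above the join.
        remaining = tuple(p for p in plan.predicates if p[0] not in scanned)

        def wrap(side):
            own = tuple(p for p in plan.predicates
                        if isinstance(side, Scan) and p[0] == side.table)
            return Filter(own, side) if own else side

        new_join = Join(wrap(join.left), wrap(join.right), join.condition)
        return Filter(remaining, new_join) if remaining else new_join
    return plan

query = Project(("A.name",),
                Filter((("A", "cond_A"), ("B", "cond_B")),
                       Join(Scan("A"), Scan("B"), "A.id = B.id")))
print(push_down(query))
```

After the rewrite, each condition sits directly on top of its table's scan, so it can be evaluated before the join.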

Hands-on

To demonstrate how easy it is to extend Catalyst, we will implement a custom Spark Session Extension that adds a new optimization rule modifying the behavior of the Sample operator.

As shown in the code snippet below, we implement a custom optimization rule, MyOptimizationRule, that takes a Sample operator with an upperBound of 0.1 and changes it to 0.2.

Once the rule is written, we add it to the logical plan optimization pipeline by injecting it through SparkSessionExtensions.

To see the behavior in action, open spark-shell with the extension registered (for example, through the spark.sql.extensions configuration) and check the query plan of a sample by running df.sample(0.1).explain(true).

As shown below, before optimization the query plan has upperBound=0.1 (see the Parsed Logical Plan and the Analyzed Logical Plan); our MyOptimizationRule class converts it to 0.2, as shown in the Optimized Logical Plan and the Physical Plan.
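
The original rule is written in Scala against Catalyst: a class extending `Rule[LogicalPlan]` that pattern-matches on the `Sample` node and is registered with `SparkSessionExtensions.injectOptimizerRule`. Because the snippet itself is not reproduced here, what follows is only a toy Python analog of the same idea, where a rule is a function that matches a node in the plan tree and returns a rewritten copy. `Scan`, `Sample`, `RULES`, and `optimize` are hypothetical stand-ins, not Spark classes.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Scan:
    path: str

@dataclass(frozen=True)
class Sample:
    lower_bound: float
    upper_bound: float
    child: object

def my_optimization_rule(plan):
    """Toy analog of MyOptimizationRule: Sample(upper_bound=0.1) becomes 0.2."""
    if isinstance(plan, Sample):
        child = my_optimization_rule(plan.child)
        if plan.upper_bound == 0.1:
            return replace(plan, upper_bound=0.2, child=child)
        return replace(plan, child=child)
    return plan  # any other node is left untouched

# Stand-in for "injecting" the rule: the optimizer applies registered rules in order.
RULES = [my_optimization_rule]

def optimize(plan):
    for rule in RULES:
        plan = rule(plan)
    return plan

analyzed = Sample(0.0, 0.1, Scan("/tmp/qbeast_table"))  # what df.sample(0.1) produces
print(optimize(analyzed))  # Sample(lower_bound=0.0, upper_bound=0.2, child=Scan(path='/tmp/qbeast_table'))
```

A Sample with any other upper bound passes through unchanged, just as a Catalyst rule leaves unmatched plans alone.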

Qbeast Efficient Sampling

In the last article, we saw that the OTree WRITE protocol arranges data in a hierarchical structure such that, when traversing the tree from top to bottom, one encounters cubes whose data belong to increasingly large fractions of the dataset.

Take the index tree from the above image as an example: a cube with maxWeight = 0.2 implies that all the data it contains belong to the first 20% of the dataset.

maxWeight increases monotonically from root to leaf along any branch. And since weights are generated randomly, the WRITE protocol ensures that the data in a given cube can be treated as a statistically representative sample of its subtree.

With all this information in mind, it is possible to implement an efficient Sample operator that, by accessing targeted cubes according to the provided sample fraction, can generate statistically representative subsets with minimum I/O.

When reading a sample fraction f from the source, we traverse the tree from the root, following every branch in DFS fashion. Whenever we encounter a cube with maxWeight ≥ f, we read its content, stop going further down that branch, and jump to the next one. In this way, we collect data from all cubes with maxWeight < f, plus the first cube with maxWeight ≥ f on each branch.
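
A minimal sketch of this traversal, assuming each cube exposes its maxWeight and its children. The `Cube` class, the `cubes_to_read` function, and the example tree are hypothetical, and the sketch only decides which cubes to read; filtering individual rows is left out.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Cube:
    cube_id: str
    max_weight: float
    children: List["Cube"] = field(default_factory=list)

def cubes_to_read(root: Cube, f: float) -> List[str]:
    """DFS over the index; stop descending once a cube has max_weight >= f."""
    selected, stack = [], [root]
    while stack:
        cube = stack.pop()
        selected.append(cube.cube_id)              # every visited cube is read
        if cube.max_weight < f:                     # holds less than f: go deeper
            stack.extend(reversed(cube.children))  # keep left-to-right DFS order
        # otherwise this cube already covers fraction f for its branch
    return selected

# Hypothetical index; maxWeight grows from the root to the leaves on every branch.
root = Cube("", 0.1, [
    Cube("0", 0.25, [Cube("00", 0.6), Cube("01", 0.7)]),
    Cube("1", 0.4, [Cube("10", 0.9)]),
    Cube("2", 0.15, [Cube("20", 0.5), Cube("21", 0.55)]),
])

print(cubes_to_read(root, 0.2))  # ['', '0', '1', '2', '20', '21']
```

With f = 0.2, cubes 00, 01, and 10 are never touched, because their parents already reach maxWeight ≥ 0.2.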

In essence, Qbeast-Spark does this behind the scenes by injecting optimization rules through session extensions. When our rule detects a Sample operator on top of a QbeastDataSource, it transforms the Sample into a set of filters on the indexed columns and the maxWeight. Spark SQL then optimizes these filters further to select the target cubes, resulting in an optimized query plan.

The image above shows the query plan of a sample operation on a QbeastDataSource. We can see that what was initially a Sample operator is converted into a Filter during rule-based optimization, with the sample fraction used to define the cube maxWeight for filtering.

This filter is used during the FileScan to select the target cubes, and it is applied again later on individual rows to further reduce the data size.
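
A simplified sketch of the Sample-to-Filter idea. It assumes every row carries an explicit weight and every cube file records a lower bound on the weights it stores. The text above indicates that Qbeast-Spark's filters involve the indexed columns rather than a stored weight column, so the explicit weight, the `min_weight` metadata, and `sample_as_filter` are illustrative simplifications. The same predicate, weight < f, first prunes whole files and then filters individual rows.

```python
def sample_as_filter(files, f):
    """Sample(f) rewritten as the predicate weight < f.

    files: list of (min_weight, rows) pairs, where rows is a list of (weight, record)
    and min_weight is a lower bound on the weights stored in that file.
    """
    # File-level pruning: a file can hold rows with weight < f only if min_weight < f.
    kept = [rows for min_weight, rows in files if min_weight < f]
    rows_scanned = sum(len(rows) for rows in kept)
    # Row-level filter: the same predicate, applied to each row that was read.
    sample = [record for rows in kept for weight, record in rows if weight < f]
    return sample, rows_scanned

files = [
    (0.00, [(0.05, "a"), (0.30, "b")]),
    (0.30, [(0.35, "c"), (0.90, "d")]),   # skipped entirely for f = 0.2
    (0.10, [(0.15, "e"), (0.25, "f")]),
]
print(sample_as_filter(files, 0.2))  # (['a', 'e'], 4)
```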

The I/O savings can be observed in the Spark UI for the corresponding SQL query. The entire dataset has 254,975 rows, and thanks to Qbeast Sampling, only 100,038 rows are retrieved from secondary storage. The filters are then reused to reduce the number of rows to match the sample fraction.
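
A quick back-of-the-envelope check of these numbers. The sample fraction is an assumption here: the text does not state it for this run, so we take the sample(0.3) used in the Parquet comparison that follows.

```python
total_rows = 254_975   # rows in the whole dataset
rows_read = 100_038    # rows retrieved from storage thanks to Qbeast Sampling
fraction = 0.3         # assumption: same sample(0.3) as the Parquet comparison

io_ratio = rows_read / total_rows        # share of the dataset actually read
expected_rows = fraction * total_rows    # rows expected after the row-level filter

print(f"read {io_ratio:.1%} of the rows; about {round(expected_rows)} expected in the sample")
```

Under that assumption, Qbeast reads roughly 39% of the rows to return a 30% sample, whereas a full scan reads 100%.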

To our knowledge, the same operation on any other data format would trigger 100% I/O. Below is the query plan for sample(0.3) on the same dataset written in Apache Parquet format. It reads every file in the dataset, i.e., a full I/O, as reflected in the size of files read and the number of output rows of Scan parquet.

The ability to sample data efficiently paves the way for approximate queries, where the accuracy of the query result is secondary to execution speed. For example, a user can accept some imprecision in the output because the last few decimals of an aggregation matter less than getting the result quickly.

It is in these scenarios that Efficient Data Sampling is crucial.
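
A minimal sketch of such an approximate query. It assumes each row lands in the sample independently with probability f, which is what happens when rows with weight < f are kept and weights are uniform in [0, 1). The data and `approx_stats` are hypothetical. Counts and sums are scaled up by 1/f, while the average needs no scaling.

```python
import random

def approx_stats(values, f, seed=42):
    """Estimate COUNT, SUM, and AVG from a Bernoulli sample of fraction f."""
    rng = random.Random(seed)
    sample = [v for v in values if rng.random() < f]  # keep each row with prob. f
    est_count = len(sample) / f            # scale up by 1/f
    est_sum = sum(sample) / f              # scale up by 1/f
    est_avg = sum(sample) / len(sample)    # ratio estimate: no scaling needed
    return est_count, est_sum, est_avg

gen = random.Random(7)
amounts = [gen.uniform(0, 100) for _ in range(100_000)]  # hypothetical order amounts

est_count, est_sum, est_avg = approx_stats(amounts, f=0.1)
exact_avg = sum(amounts) / len(amounts)
print(f"count ~ {est_count:,.0f}, avg ~ {est_avg:.2f} (exact avg {exact_avg:.2f})")
```

The smaller f is, the less data is read and the wider the error of these estimates becomes.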


If this has sparked your interest in Qbeast-Spark, do not hesitate to take a look at the GitHub repo and try it out yourself.

Happy Learning.

About Qbeast
Qbeast is here to simplify the lives of the Data Engineers and make Data Scientists more agile with fast queries and interactive visualizations. For more information, visit qbeast.io
© 2020 Qbeast. All rights reserved.

Indexing and Sampling on Data Lake(house)s with Qbeast-Spark

Creating and leveraging indexes is an essential feature of DBMSs and data warehouses for improving query speed. When applying filters in a SQL query, the index is first consulted to locate the target data before any read. In this way, we avoid reading the entire table from storage, reducing the data transfer involved.

For a query like the following, if sales_table is indexed on the year column, we can directly access the data from 2020 onwards and avoid a sequential scan of the entire table. If the table is sorted by year, a search algorithm such as binary search can locate that starting point in logarithmic (sub-linear) time.

SELECT
*
FROM
sales_table
WHERE
year >= 2020
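
A minimal sketch of that lookup. It assumes a hypothetical `sales_table` that is physically sorted by year (a clustered index on year) and uses Python's standard `bisect` module for the binary search.

```python
import bisect

# Hypothetical sales_table, clustered (physically sorted) by year.
sales_table = [
    (2016, "order-1"), (2017, "order-2"), (2019, "order-3"),
    (2020, "order-4"), (2021, "order-5"), (2022, "order-6"),
]
years = [year for year, _ in sales_table]  # the sorted index key

def year_at_least(min_year):
    """WHERE year >= min_year: O(log n) search for the start, then read the tail."""
    start = bisect.bisect_left(years, min_year)
    return sales_table[start:]

print(year_at_least(2020))  # [(2020, 'order-4'), (2021, 'order-5'), (2022, 'order-6')]
```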

Given this obvious benefit, one of the first things to do when creating a table in a DBMS is to define the indexes you want, on one or more columns (or any combination of them).

Generally, the column defined as the primary key is used as a clustered index, the order of which dictates the arrangement of the table rows. One can also create non-clustered indexes, keeping in mind that each index is a separate data structure that requires additional storage space. Non-clustered indexes don’t affect the table rows’ ordering; they are used to locate data, and an index-table link is used for data access.
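
For contrast, a non-clustered index can be sketched as a separate structure that maps key values to row positions while leaving the table order untouched. The table and `city_index` are hypothetical, and a plain dictionary stands in for what a DBMS would typically implement as a B-tree.

```python
# Hypothetical table: rows stay in insertion order (no clustering on "city").
table = [
    {"id": 3, "city": "Madrid"},
    {"id": 1, "city": "Barcelona"},
    {"id": 2, "city": "Valencia"},
    {"id": 4, "city": "Barcelona"},
]

# Non-clustered index on "city": a separate structure mapping key -> row positions.
# It costs extra storage but does not change the order of the table rows.
city_index = {}
for pos, row in enumerate(table):
    city_index.setdefault(row["city"], []).append(pos)

def lookup(city):
    # Consult the index first, then follow the index-table links to fetch rows.
    return [table[pos] for pos in city_index.get(city, [])]

print(lookup("Barcelona"))  # [{'id': 1, 'city': 'Barcelona'}, {'id': 4, 'city': 'Barcelona'}]
```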

In our last article, we discussed the need for Data Lakehouses, how they could alleviate many of the burdens in big data, and the role of Qbeast-format. To summarize, Data Lakehouses add a metadata management layer on top of the Data Lake to achieve ACID properties and schema enforcement, among other benefits. Qbeast-format’s goal is to add further features to the Lakehouse, such as multi-dimensional indexing, efficient sampling, and table tolerance, to further reduce the data transferred by queries, making Data Lakehouses even more powerful.

This time, we will explain the details behind the key features of Qbeast-format, emphasizing its benefits and inner workings by unfolding the OTree Algorithm.

With Qbeast-format, more specifically its implementation Qbeast-spark, one can create a clustered multi-dimensional index for their table stored in a Data Lake with a single line of code. With that, you can directly access your Data Lake(house) and start running queries with indexing support.

# Indexing a Spark SQL DataFrame on four columns with Qbeast-spark
(
df.write
.format('qbeast')
.option('columnsToIndex', 'col_1,col_2,col_3,col_4')
.save('./qbeast_table')
)

But before we dive in, let’s quickly rewind to how indexing has worked up until now and where it can potentially go.

Content:

  • Single and multi-column indexes
  • Spatial index
  • Recursive Space Division Algorithm
  • OTree Algorithm

How to implement a multi-column index?

Implementing a single-column index is straightforward — take the indexed column, order the column records according to their values, and sort the table records according to the index if the index is clustered. When reading, use the index to find the target records. If the index is non-clustered, an index-record link is used for data access.

Building a multi-column index is more complex, because there is no natural way to sort the records that respects the natural order of every individual column. For instance, rows with columns (age, height, weight) can only be arranged by prioritizing one column over another.

For example, we could first order the records by age, then order those with the same age by height, and finally order those with the same height by weight.

Students_table
(age, height, weight)

(15 , 160 , 65 )
(15 , 163 , 66 )
(15 , 170 , 70 )
(16 , 158 , 60 )
(18 , 162 , 75 )
(18 , 162 , 76 )
(18 , 180 , 70 )
(18 , 180 , 72 )
(23 , 175 , 78 )
(23 , 176 , 69 )

A direct consequence is that only the first column is globally ordered. If we filter the records by height alone, this index is of no use, and the same applies to weight. If we search by (age, height), the target ages are found quickly, but locating the target heights across those ages still requires a sequential scan.
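
Using the rows of Students_table above, the following sketch shows the asymmetry: binary search works on the leading column (age), while a filter on height alone has to inspect every row. The helper names are hypothetical.

```python
import bisect

# Students_table rows, sorted by (age, height, weight).
students = [
    (15, 160, 65), (15, 163, 66), (15, 170, 70), (16, 158, 60), (18, 162, 75),
    (18, 162, 76), (18, 180, 70), (18, 180, 72), (23, 175, 78), (23, 176, 69),
]
ages = [s[0] for s in students]
heights = [s[1] for s in students]
print(heights == sorted(heights))  # False: only age is globally ordered

def by_age(age):
    """Leading column: O(log n) binary search for the matching block of rows."""
    lo, hi = bisect.bisect_left(ages, age), bisect.bisect_right(ages, age)
    return students[lo:hi]

def by_height(height):
    """Non-leading column: the sort order does not help, so scan every row."""
    return [s for s in students if s[1] == height]

print(by_age(18))      # 4 rows, found by binary search
print(by_height(162))  # [(18, 162, 75), (18, 162, 76)], found by a full scan
```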

This limitation comes from the dependency between columns that sorting imposes, and it is what gave birth to Spatial Indexes.

Spatial Index

The idea is to construct a vector space from the target columns and position each record inside that space according to its values. This way, all the columns involved are mutually independent, each defining one dimension of the space. When filtering, one can use any combination of the indexed columns, without the ordering restriction described above.

Take the image below as an example: we index our dataset on columns X and Y and construct a two-dimensional space for the records. The position of each record depends only on its values in the indexed columns. When filtering the table on any combination of these columns, we search the vector space to locate the records.

Illustration of queries that target specific data. The dataset is indexed on columns x and y. The two blue rectangles illustrate range constraints on the indexed columns; only the elements inside these regions are retrieved.

In the image above, limiting values for both X and Y results in retrieving elements from the small blue rectangle region on the left. When using only one column, such as the case of the long rectangle orthogonal to the X axis, we can simply limit the values for the X dimension to the defined range, regardless of the column Y.

Recursive Division

To improve filtering speed and reduce retrieval time, we recursively split the high-density regions of the vector space into subspaces, hence the name Recursive Space Division (RSD).

Regions are divided until the number of elements in each subspace is no larger than a predefined capacity. At each division, all the elements of the region are distributed among its subspaces. In the case of a QuadTree, each division halves the ranges of all dimensions, producing non-overlapping subspaces with well-defined boundaries that evenly split the region. In general, with n dimensions, each split creates 2^n subspaces (four for the two-dimensional QuadTree).
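
A minimal sketch of RSD for a two-dimensional QuadTree. It assumes half-open boxes, image coordinates with y growing downward, and children numbered 0 to 3 clockwise from the top-left quadrant; the exact numbering used in the figures is an assumption. `Node`, `insert`, and `leaves` are hypothetical names. As described in this article, a split empties the node and redistributes its points among the children.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Point = Tuple[float, float]

@dataclass
class Node:
    node_id: str                       # "" for the root, then one digit per level
    x0: float
    y0: float
    x1: float
    y1: float                          # half-open box [x0, x1) x [y0, y1)
    points: List[Point] = field(default_factory=list)
    children: Optional[List["Node"]] = None

    def child_for(self, p: Point) -> "Node":
        mx, my = (self.x0 + self.x1) / 2, (self.y0 + self.y1) / 2
        right, bottom = p[0] >= mx, p[1] >= my
        # Clockwise from top-left: 0 top-left, 1 top-right, 2 bottom-right, 3 bottom-left.
        index = {(False, False): 0, (True, False): 1,
                 (True, True): 2, (False, True): 3}[(right, bottom)]
        return self.children[index]

    def split(self) -> None:
        mx, my = (self.x0 + self.x1) / 2, (self.y0 + self.y1) / 2
        boxes = [(self.x0, self.y0, mx, my), (mx, self.y0, self.x1, my),
                 (mx, my, self.x1, self.y1), (self.x0, my, mx, self.y1)]
        self.children = [Node(self.node_id + str(i), *box) for i, box in enumerate(boxes)]

def insert(node: Node, p: Point, capacity: int) -> None:
    """Navigate down to a leaf; split it and redistribute if it exceeds capacity.

    Assumes no more than `capacity` identical points (otherwise splitting never ends).
    """
    while node.children is not None:
        node = node.child_for(p)
    node.points.append(p)
    if len(node.points) > capacity:
        old, node.points = node.points, []   # the split node is left empty
        node.split()
        for q in old:
            insert(node, q, capacity)

def leaves(node: Node) -> Dict[str, List[Point]]:
    if node.children is None:
        return {node.node_id: node.points}
    out: Dict[str, List[Point]] = {}
    for child in node.children:
        out.update(leaves(child))
    return out

root = Node("", 0, 0, 8, 8)
for p in [(1, 1), (2, 2), (1, 3), (6, 1), (6, 6)]:
    insert(root, p, capacity=2)
print(leaves(root))
```

Internal nodes such as the root and node 0 end up empty, so a range query only needs to read the leaves that intersect it.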

The images below show the division process of two-dimensional space; notice the number of divisions and the density of each area.

Recursive Space Division for QuadTree, first iteration from left to right.

Recursive Space Division for QuadTree. The space is divided wherever a region contains more than 2 elements. The top-left square undergoes more divisions because of its higher density of data points.

A tree-like representation of the example above is shown in the image below. An empty string “” represents the root, and the numbers 0, 1, 2, and 3 encode the subspaces, ordered clockwise. Each node in the tree is either a leaf node containing no more elements than the predefined capacity, or an internal node with four children and no records.

Tree-like representation for QuadTree. Nodes split into child nodes when the capacity is exceeded. Nodes in red contain no pointers after splitting.

Data pointers are stored in the tree nodes to reference the table records. Each time a node is split (red nodes in the image above), the pointers it contains are classified and distributed among its child nodes, leaving the node itself empty.

If we were to query the elements that fall inside the boundaries of node 020, nodes 0200, 0201, 0203, 02020, 02021, 02022, and 02023 are read, while 020 and 0202 are left out because, having been split, they contain no pointers.

Keeping the Index Updated

Writing new records to an indexed dataset requires finding their correct position. For a single-column index, as discussed at the beginning, the task is the same as inserting an element into a sorted array. For a spatial index like a QuadTree, the idea is to find the node whose boundaries accommodate the record, starting from the root and navigating down the tree.

As shown in the following image, the values of the new record determine the node (or subspace) it should go to. If that node is an internal node, we choose the correct child node for allocation and repeat until we reach a leaf.

Element e should go to the top-left cube. 0 < a/3 < a/2, 0 < b/3 < b/2.

A space division is triggered when a WRITE pushes the number of records in a node beyond its capacity. The contained records are then classified and redistributed among the subtree. This process can take a long time, slowing down writes, reducing concurrency, and often requiring locks to achieve isolation.

Space division from a QuadTree. The elements from a split node are classified and redistributed among the child nodes.

As shown in the image above, a write finds nodeX full, which triggers an optimization process: the space is split by creating child cubes (X1, X2, X3, and X4), all the records are read from nodeX, and each record is sent to the child cube that matches its values.

A QuadTree index as described is therefore not scalable, since every split rewrites existing records and blocks concurrent writes. For this reason, OTree was introduced.

OTree Algorithm

An OTree Index is designed to be eventually optimal so concurrent writes don’t need to acquire locks. It’s optimized for immutable cloud object stores and is based on open formats such as JSON and Parquet. Features such as Multi-dimensional Indexing and Efficient Sampling are at the core of Qbeast-format, and they are both empowered by the OTree Algorithm.

To improve concurrency, OTree removes the classification and redistribution of existing records when a node is full. Instead, nodes have a soft limit as capacity: when a full node is encountered, new records can either be squeezed into it or allocated to a newly created child node, always leaving the existing records intact. (More on this later!)

OTree algorithm. Writing an element (E) with weight (w) to cube X. If w ≥ maxWeight, we skip to the correct child cube, until we find a cube such that w < maxWeight. The algorithm creates child cubes if cube X is a leaf.

The result is a scalable multi-column index that supports concurrent writes and organizes data in a hierarchical structure: traversing the index from top to bottom, one encounters cubes with data that belong to increasing fractions of the dataset. A simple read protocol can then retrieve data fractions with minimal data transfer.

Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. The same holds for the image on the right with f = 0.3.

Next, we will dive into the bits and pieces of the OTree READ and WRITE protocols and explain how they enable the benefits mentioned above.

OTree Notions

  • Cubes are Nodes
  • When writing, each element is assigned a randomly generated number called weight. It’s an important parameter that decides whether a cube should admit an incoming record.
  • The storage of cubes comprises a payload and an offset, with the latter being temporary storage whose elements are to be eliminated during optimization.
  • The element that has the largest weight in a cube’s payload is the maxElement, and its weight is the cube’s maxWeight.

Illustration for cubes in an OTree.

  • A necessary condition for a cube to accept a new element e is that e.w < cube.maxWeight.
  • Cubes have a state that depends on their size and on the stage of the optimization process they are in. A cube’s state affects how READs and WRITEs are conducted. Possible cube states are Flooded, Announced, and Replicated.

The OTree WRITE Protocol

To allocate an element e to a given cube, first, the cube boundaries have to contain the values of e, and second, e.w has to be smaller than the cube’s maxWeight.

Initially, the maxWeight of every cube is set to MaxValue, so any weight is acceptable. Once the payload is full, maxWeight is defined as the largest weight found in the payload.

If both conditions are met and the payload is full, the maxElement is pushed to the offset to make room for e. The new maxWeight is the largest weight left in the payload, that is, the larger of the previous second-largest weight and e.w, and maxElement is updated accordingly.
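
A minimal sketch of these cube-level rules, with cube boundaries left out. `Cube`, `try_write`, and the use of `math.inf` as a stand-in for MaxValue are illustrative assumptions. The last line shows that, after an eviction, the new maxWeight can be e.w itself.

```python
import math

class Cube:
    """One OTree cube: a bounded payload plus a temporary offset."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.payload = []   # (weight, element) pairs
        self.offset = []    # evicted pairs, to be copied down during optimization

    @property
    def max_weight(self):
        # MaxValue until the payload is full, then the largest weight in the payload.
        if len(self.payload) < self.capacity:
            return math.inf
        return max(w for w, _ in self.payload)

    def try_write(self, weight, element):
        """Admit the element if weight < maxWeight; otherwise it belongs to a child."""
        if weight >= self.max_weight:
            return False
        if len(self.payload) == self.capacity:
            max_element = max(self.payload, key=lambda pair: pair[0])
            self.payload.remove(max_element)   # push the maxElement to the offset
            self.offset.append(max_element)
        self.payload.append((weight, element))
        return True

cube = Cube(capacity=2)
cube.try_write(0.2, "a")
cube.try_write(0.8, "b")
print(cube.max_weight)           # 0.8: the payload is now full
print(cube.try_write(0.9, "c"))  # False: 0.9 >= maxWeight, go to a child cube
print(cube.try_write(0.6, "d"))  # True: "b" moves to the offset
print(cube.max_weight)           # 0.6: e.w, not the previous second-largest weight (0.2)
```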

Elements in the offset are only there temporarily. When the optimization (replication) process kicks off, they are copied down to the subtree and the offset itself is removed. A detailed description of OTree optimization can be found in a separate article.

Why Weight? The OTree READ Protocol

The simple answer is that it allows us to sample the data efficiently.

Apart from the non-empty inner cubes, there is not much difference between a QuadTree and an OTree, at least until one examines the OTree’s data distribution closely from top to bottom.

  • Since e.w < cube.maxWeight is required for writing, it follows that maxWeight increases monotonically from the root to any leaf cube.
  • Since the weights are randomly generated for each record, and no records are redistributed when a node is full, the data contained in each inner cube can statistically represent the data of the cube’s subtree, to a degree proportional to the cube size.
  • Assuming the weights are drawn uniformly at random from 0 to 1.0, if a cube has a maxWeight of, say, 0.5, then all the records in the cube belong to the first half of the dataset.

Considering the above three points, we can build a sampling operator that doesn’t require the I/O of the whole dataset: If we were to read a fraction f from the dataset, we traverse the tree from top to bottom and stop going further in any branch as soon as a cube with a maxWeight ≥ f is encountered.

Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. The same holds for the image on the right with fraction = 0.3.

The cases above show that reading a fraction f of the dataset requires accessing only the corresponding cubes. A DFS traversal reads the branches and stops as soon as it encounters a cube with maxWeight ≥ f. Every cube we don’t access translates into an equivalent I/O saving.
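
Putting the WRITE and READ protocols together, here is a small end-to-end simulation on a one-dimensional space, where each split halves the range (2^1 children). It is a sketch under simplifying assumptions, not Qbeast-Spark’s implementation: there is a single writer, the optimization step is modeled as a simple `optimize` pass that copies offsets down, and `Cube`, `write`, `optimize`, and `sample` are hypothetical names. The final check is that the DFS read returns exactly the rows with weight < f while scanning only part of the data.

```python
import math
import random

class Cube:
    """Hypothetical 1-D OTree cube over [lo, hi); a split creates 2 children."""

    def __init__(self, lo, hi, capacity):
        self.lo, self.hi, self.capacity = lo, hi, capacity
        self.payload, self.offset, self.children = [], [], None

    def max_weight(self):
        # MaxValue until the payload is full, then the largest payload weight.
        if len(self.payload) < self.capacity:
            return math.inf
        return max(w for w, _ in self.payload)

    def child_for(self, value):
        if self.children is None:                     # create child cubes on demand
            mid = (self.lo + self.hi) / 2
            self.children = [Cube(self.lo, mid, self.capacity),
                             Cube(mid, self.hi, self.capacity)]
        return self.children[0 if value < self.children[0].hi else 1]

def write(cube, value, weight):
    """WRITE: descend until weight < maxWeight; existing rows are never redistributed."""
    while weight >= cube.max_weight():
        cube = cube.child_for(value)
    if len(cube.payload) == cube.capacity:
        evicted = max(cube.payload, key=lambda pair: pair[0])  # the maxElement
        cube.payload.remove(evicted)
        cube.offset.append(evicted)
    cube.payload.append((weight, value))

def optimize(cube):
    """Replication step: copy offset elements down to the subtree, then drop the offset."""
    pending, cube.offset = cube.offset, []
    for weight, value in pending:
        write(cube.child_for(value), value, weight)
    for child in cube.children or []:
        optimize(child)

def sample(root, f):
    """READ: DFS that stops at cubes with maxWeight >= f; rows filtered by weight < f."""
    rows, scanned, stack = [], 0, [root]
    while stack:
        cube = stack.pop()
        scanned += len(cube.payload)
        rows.extend(v for w, v in cube.payload if w < f)
        if cube.max_weight() < f and cube.children:
            stack.extend(cube.children)
    return rows, scanned

random.seed(0)
data = [(random.random(), random.random()) for _ in range(5_000)]  # (value, weight)
root = Cube(0.0, 1.0, capacity=50)
for value, weight in data:
    write(root, value, weight)
optimize(root)

rows, scanned = sample(root, 0.1)
print(f"sampled {len(rows)} rows, scanned {scanned} of {len(data)}")
```

The rows that are scanned but discarded (weight ≥ f) can only come from the last cube read on each branch, since every cube visited before it has maxWeight < f.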

To see the features discussed above in action, take a look at Qbeast-Spark and try it out yourself. It is our first open-source implementation of the OTree algorithm, and we invite you to contribute to the project and help accelerate this change.

Happy Learning.

How Qbeast solves the pain chain of Big Data analytics

Are you ready to find out how speeding up data analysis by up to 100x solves data teams’ pain points?

Well, first let me give you some background information. According to a survey conducted by Ascend.io and published in July 2020, 97% of data teams are at or above work capacity.¹ Given that more and more data is generated and stored every day, this is not good news for data teams and organizations. Yet the ability to leverage data in business has never been more critical.

The pain chain

The survey states that the ability to meet data needs is significantly impacted by slow iteration cycles in data teams. This aligns with the feedback that we received from our customers’ data teams as well.

To explain why iteration cycles are slow, let’s use the concept of the pain chain. The pain chain was first introduced by Keith M. Eades as a map that describes a sequence of problems in an organization’s process.² The pain of one role in the company causes the pain of another function. In our case, the data pain chain starts with the Data Engineer, continues to the Data Scientist, and finally involves the decision-makers. As a reminder, the data engineer is the one who prepares the data, and the data scientist uses this data to create valuable, actionable insights. The decision-maker is, for example, a project manager who wants to get a data-driven project done.

The survey found that data scientists are the most impacted by the dependency on others, such as data engineers, to access the data and the systems (48%). On the other hand, data engineers spend most of their time maintaining existing and legacy systems (54%).

How does this impact the decision-maker? Well, it leads to a significant loss of value, because data products are either implemented late or never implemented at all.

How do we solve it

Qbeast’s solution tackles the pain chain on several fronts to eliminate it altogether.

Front 1: Data Engineering

There is nothing more time-consuming and nerve-racking than maintaining and building complex ETL pipelines.

Less complexity and more flexibility with an innovative storage architecture

Can’t we just work without ETL pipelines? You may say yes, we can use a data lake instead of a data warehouse. We can keep all the data in the data lake and query it directly from there. The downside? Querying is slow and processing all the data is expensive. But what if you could query all the data directly without sacrificing speed and cost?

With Qbeast, you can store all the data in your data lake. We organize the data so that you can find exactly what you are looking for. Even better, we can answer queries by reading only a small sample of the dataset. And you can use your favorite programming languages, be it Scala, Java, Python, or R.

How do we do this? With our storage technology, we combine multidimensional indexing and statistical sampling. Check out this scientific paper³ to find out more.

Our technology’s advantage is that we can offer faster queries than data warehouses while keeping the flexibility of data lakes. No ETL pipelines, yet fast and cost-effective. The best of both worlds, so to speak.

Front 2: Data Science

We know that if you are a data scientist, you do not care so much about pipelines. You want to get all the data you need to tune your model. And it is a pain to rely on a data engineer every time you need to query a large dataset. You are losing time, and you can’t focus on the things that matter. But what if you could decide the time required to run your query yourself?

Data Leverage

By analyzing the data with a tolerance limit, you can decide how long to wait for a query and adjust the precision to your use case. Yes, this means that you can run a query on whatever you want. Do you want to know the number of sales in the last months? Full precision! But do you really need to scan your whole data lake to see the percentage of male users? Probably not.

With Qbeast, you can get the results you need while accessing only a minimum amount of available data. We call this concept Data Leverage. With this option, you can speed up queries by up to 100x compared to running state-of-the-art query engines such as Apache Spark.

Conclusion

A storage system that unites multidimensional indexing techniques and statistical sampling solves the data analytics pain chain by speeding up queries, reducing complexity, and adding flexibility. This results in a significant speed-up of iteration cycles in data teams. Increased productivity and faster data analysis have a colossal impact on the ability to meet data needs and to create superior data products. And above all, alleviating the pain chain results in a happy data team, happy decision-makers, and happy customers.

But the pain chain doesn’t end here! Now it is time for application developers to pick up all the insights uncovered by the data scientists and use them to build amazing products! That’s a topic for another post, but as you may have guessed, we have a solution for that too.

References

1. Team Ascend. “New Research Reveals 97% of Data Teams Are at or Over Capacity.” Ascend.io, 23 July 2020. Accessed 28 December 2020.

2. Eades, Keith M., The New Solution Selling: The Revolutionary Sales Process That is Changing the Way People Sell, McGraw-Hill, 2004.

3. C. Cugnasco et al., “The OTree: Multidimensional Indexing with efficient data Sampling for HPC,” 2019 IEEE International Conference on Big Data (Big Data), Los Angeles, CA, USA, 2019, pp. 433–440, doi: 10.1109/BigData47090.2019.9006121.

Contact us info@qbeast.io

C/ Roc Boronat 117, 2a Planta, 08018 Barcelona
