One common problem that we heard about is the scalability of ML models. Even simple models like support vector machines, linear regression, or neural networks can be very slow when trained at large scale. Luckily for us, we can use sampling as a trick to increase speed without compromising accuracy. This is where the Qbeast Format can help, and we'll see how to take advantage of its sampling capability.
One of the uses of machine learning algorithms is detecting fraudulent transactions by feeding a model with historical transaction data.
In this case we used the Fraudulent Transactions Prediction dataset, which is provided in CSV format and can be loaded directly with Spark. But first, let's get a general idea of the data's structure and properties.
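For reference, loading the file with Spark boils down to a few lines; the path below is just a placeholder for wherever the downloaded CSV lives:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("fraud-detection").getOrCreate()

# Placeholder path: point it at the downloaded CSV
raw_df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/Fraud.csv"))
raw_df.printSchema()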
DATA PREPARATION
This can be easily done using pandas:
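A quick sketch of that first look, assuming the table fits comfortably in driver memory and reusing the raw_df loaded above; whole_dataset is the DataFrame name used in the snippets that follow:

import pandas

# Collect the Spark DataFrame into pandas for exploration
whole_dataset = raw_df.toPandas()

print(whole_dataset.head())
print(whole_dataset.dtypes)
print(whole_dataset.describe())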
The 'type' column is categorical and needs to be encoded. The 'nameOrig' and 'nameDest' columns are strings that in this case provide little information, so it's better to remove them. The 'amount' and balance columns have a clearly wider range than the rest, so they will need scaling.
It is clear that some preprocessing is needed:
from sklearn.preprocessing import LabelEncoder, StandardScaler

columns = list(whole_dataset.columns)

# Remove the string columns and the label from the list of features to scale
columns.remove("nameOrig")
columns.remove("nameDest")
columns.remove("isFraud")

# Remove the columns that are preprocessed separately
columns.remove("type")
columns.remove("step")

# Encode the categorical feature
le = LabelEncoder()
whole_dataset["type"] = le.fit_transform(whole_dataset["type"])

# Scale the numeric features
scaler = StandardScaler()
whole_dataset[columns] = scaler.fit_transform(whole_dataset[columns])

# Add the unscaled features back
columns.append("type")
columns.append("step")
To be able to test our algorithm properly, we’ll need a train set and a test set:
By indexing the table on those four columns, we ensure that our samples will be randomly distributed along each of those four dimensions, preserving the distribution of the whole dataset.
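A minimal sketch of this step against the Qbeast-Spark DataSource; the 80/20 split ratio and the four indexed column names are illustrative assumptions, not necessarily the exact ones used in the experiment:

from sklearn.model_selection import train_test_split

# Hold out a test set (the ratio here is an assumption for illustration)
train_data, test_data = train_test_split(whole_dataset, test_size=0.2, random_state=4)

# Write the training data as a Qbeast table indexed on four columns.
# The column choice is illustrative only.
table_path = "./qbeast_fraud_table"
(spark.createDataFrame(train_data)
    .write
    .format("qbeast")
    .option("columnsToIndex", "type,amount,oldbalanceOrg,newbalanceDest")
    .save(table_path))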
TRAINING
We tried to check how much the training process of a random forest classifier could be sped up by using the Qbeast sampling mechanism.
We performed the same experiment for different sampling percentages: we trained a random forest using only the data retrieved for a given sampling percentage, and repeated the experiment several times for each value to smooth out random fluctuations (resetting the random seed for each percentage).
We used BSC's dislib library to perform our experiments; if you are not familiar with it, the sklearn library should provide similar results.
import math
import random
import time

import numpy as np
import pandas
import dislib as ds
from dislib.classification import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder

read_time_dict, time_dict, prediction_dict, label_dict = {}, {}, {}, {}

for p in precisions:  # 'precisions' holds the sampling fractions to test (defined elsewhere)
    print("Computing with precision = " + str(p))

    # Preprocessing
    random.seed(4)
    np.random.seed(4)

    # read_sample_pandas reads a Qbeast sample of fraction p into pandas (helper defined elsewhere)
    rt0 = time.time()
    train = read_sample_pandas(table_path, p)
    rt1 = time.time()
    read_time_dict[p] = rt1 - rt0

    # One-hot encode 'type' consistently across the train and test sets
    train_objs_num = len(train)
    dataset = pandas.concat(objs=[train, test_data], axis=0)
    dataset_preprocessed = pandas.get_dummies(dataset, columns=["type"])
    train = dataset_preprocessed[:train_objs_num]
    test = dataset_preprocessed[train_objs_num:]

    columns = [col for col in test.columns]
    columns.remove("isFraud")
    columns.remove("step")

    # Use a dedicated encoder for the binary 'isFraud' label
    label_le = LabelEncoder().fit(dataset_preprocessed["isFraud"])
    Xtrain, Ytrain = (train[columns].to_numpy(),
                      label_le.transform(train["isFraud"].to_numpy()))
    Xtestwhole, Ytestwhole = (test[columns].to_numpy(),
                              label_le.transform(test["isFraud"].to_numpy()))

    # Random forest
    time_dict[p] = []
    prediction_dict[p] = []
    for i in range(10):
        start = time.time()
        rf = RandomForestClassifier()
        blk_size = (int(math.ceil(Xtrain.shape[0] / 10)), Xtrain.shape[1])
        Ytrain = Ytrain.reshape(len(Ytrain), 1)
        print("Fitting!")
        rf.fit(ds.array(Xtrain, block_size=blk_size),
               ds.array(Ytrain, block_size=(blk_size[0], 1)))
        print("Predicting!")
        pred = rf.predict(ds.array(Xtestwhole, block_size=blk_size))
        end = time.time()
        time_dict[p].append(end - start)
        label_dict[p] = Ytestwhole
        prediction_dict[p].append(pred.collect())
RESULTS
We also measured the reading and training times for a range of sampling percentages:
As expected, the execution time increases as the training is performed with more data.
Regarding accuracy, a simple random forest can achieve a very high score because the classes in this dataset are highly imbalanced: even classifying all samples as not fraudulent would give a high accuracy.
In this case the F1 score is more appropriate than accuracy, because it accounts not only for precision but also for recall, so the false negatives (which we want to avoid, so that no fraudulent transaction is accepted as legitimate) have more impact on the metric.
The results for the F1 score are the following ones:
CONCLUSION
The results are good even when retrieving a small percentage of the whole data: with 40% of the data the F1 score is almost identical to the 'real' one, while the time needed for reading and training is around 20% of the time required when reading the whole dataset. Even with 20% of the data we get good results while spending 10x less time on the training process. This confirms that the samples are statistically representative: the distribution of the data is preserved, so the training process remains effective.
Approximate Queries on Data Lakes with Qbeast-Spark
Jiawei Hu
Writing procedural programs to analyze data is often less convenient than a declarative approach, because one has to define the exact control flow of the program rather than simply stating the desired outcome. For example, it takes a lot more code to write a simple group-by in plain Python than in SQL.
Besides this apparent inconvenience, there is also a potential performance drop-off, since procedural code is treated as a black box when it comes to automatic code optimization. The structure of a SQL query, on the other hand, is transparent to the underlying query optimizer, and both rule-based and cost-based optimizations can be applied to improve performance.
This was one of the main issues Apache Spark SQL set out to tackle. A declarative DataFrame API was introduced to integrate relational processing with procedural Spark code, and an extensible query optimizer was implemented to enable query optimization.
In a nutshell, programs written in procedural languages can leverage the DataFrame API to write declarative code. Before execution, Catalyst takes the program, optimizes it, and converts it into the underlying procedural Spark RDD code.
This optimizer that sits in the middle does the job of parsing, optimizing, and code generation, so the query is executed with maximum efficiency.
Last time we discussed the bits and pieces of a scalable multi-column indexing technology, the OTree algorithm, and how its peculiar inner workings give us the ability to sample indexed tables with minimal data transfer. This article discusses how that Efficient Sampling is materialized in Qbeast-Spark by extending Catalyst, the Spark SQL query optimizer.
A given SQL query is first parsed during the optimization process to construct an Abstract Syntax Tree, a canonical form of representing queries. This structure is then analyzed from top to bottom, and optimization rules are applied to minimize the data transfer and compute resource consumption.
SELECT A.name FROM A JOIN B ON A.id = B.id WHERE CONDITION_ON_A AND CONDITION_ON_B
Taking the above query as an example, a possible syntax tree can be shown in the following image:
Two tables, A and B, are joined.
On the resulting table, we filter the rows by applying conditions cond_A and cond_B.
We retrieve the name column as output.
Optimization rules such as predicate pushdown, constant folding, and projection pruning are applied to improve performance while keeping the query result intact. If we were to apply some of the rules to our tree, a possible output could look like the following.
Cond_A and cond_B are pushed down to the data source to reduce I/O; with the help of metadata, data files with values outside the desired range are discarded before reading. This results in less I/O and fewer compute resources required for the subsequent JOIN operation.
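You can observe these rules yourself from PySpark; the snippet below is a made-up example (it assumes an active SparkSession named spark, and the table paths and conditions are placeholders):

from pyspark.sql import functions as F

A = spark.read.parquet("./tables/A").alias("a")   # placeholder paths
B = spark.read.parquet("./tables/B").alias("b")

query = (A.join(B, F.col("a.id") == F.col("b.id"))
          .where((F.col("a.category") == "books") & (F.col("b.amount") > 100))
          .select("a.name"))

# The physical plan shows the filters pushed into each table scan
# (look for PushedFilters in the FileScan nodes)
query.explain(True)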
Hands-on
To demonstrate how easy it is to extend Catalyst, we will implement a custom Spark session extension that adds a new optimization rule modifying the behavior of the Sample operator.
As shown in the code snippet below, we implement a custom optimization rule, MyOptimizationRule, that takes a Sample operator with an upperBound of 0.1 and changes it to 0.2.
Once done, we need to add this rule to the Logical Plan Optimization Pipeline by injecting the rule into the SparkSessionExtensions.
To see the behavior in action, open spark-shell with the proper configurations and check the Sample query plan by running df.sample(0.1).explain(true).
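The same check can be done from PySpark through the standard spark.sql.extensions configuration; the extension class name below is a placeholder for wherever MyOptimizationRule is registered:

from pyspark.sql import SparkSession

# Placeholder class name: the builder that injects MyOptimizationRule
spark = (SparkSession.builder
         .config("spark.sql.extensions", "com.example.MyOptimizationRuleExtension")
         .getOrCreate())

df = spark.range(1000)
df.sample(0.1).explain(True)   # compare the Parsed/Analyzed plans with the Optimized one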
As shown below, before the optimization the query plan had upperBound=0.1 (see the Parsed Logical Plan and Analyzed Logical Plan); this is converted to 0.2 by our MyOptimizationRule class, as shown in the Optimized Logical Plan and Physical Plan.
Qbeast Efficient Sampling
In the last article, we saw that the OTree WRITE protocol arranges data in a hierarchical structure in which, traversing the tree from top to bottom, one encounters cubes whose data belong to increasing fractions of the dataset.
Take the index tree from the above image as an example; a cube with maxWeight = 0.2 implies that all data contained are part of the first 20% of the dataset.
maxWeight increases monotonically from root to leaf in any branch. And since weights are generated randomly, the WRITE protocol ensures that the data contained in a given cube can be treated as a representation of its subtree.
With all this information in mind, it is possible to implement an efficient Sample operator that, by accessing targeted cubes according to the provided sample fraction, can generate statistically representative subsets with minimum I/O.
When reading a given sample fraction from the source, we traverse the tree from the root, following each branch in DFS fashion. Whenever a cube with maxWeight ≥ fraction is encountered, we read its content, stop going deeper, and jump to the next branch. In this way, we collect data from all the cubes whose maxWeight is below the fraction, plus the first cube that reaches it on each branch.
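A minimal sketch of this read protocol, assuming a toy in-memory cube structure (the real Qbeast-Spark works on files and metadata, not Python objects):

from dataclasses import dataclass, field
from typing import List

@dataclass
class Cube:
    max_weight: float                  # maxWeight of the cube (1.0+ if not yet full)
    rows: list                         # records stored in this cube
    children: List["Cube"] = field(default_factory=list)

def read_sample(cube: Cube, fraction: float) -> list:
    """Collect the rows needed to build a sample of the given fraction."""
    rows = list(cube.rows)
    # Stop descending this branch once the cube's maxWeight reaches the fraction:
    # deeper cubes only hold records with even larger weights.
    if cube.max_weight < fraction:
        for child in cube.children:
            rows.extend(read_sample(child, fraction))
    # Rows from the last cube read may exceed the fraction; a second pass filters
    # them by weight (omitted here), as Qbeast-Spark does with a Filter operator.
    return rows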
Qbeast-spark, in essence, does this behind the scenes by injecting optimization rules through session extensions. When we capture a Sample operator on top of a QbeastDataSource, our rule transforms it into a set of filters applied on the indexed columns and the maxWeight. Spark SQL later further optimizes these filters to select the target cubes, resulting in an optimized query plan.
The image above shows the query plan of a sample operation on a QbeastDataSource. We can see that what is initially a Sample operator is converted into a Filter during rule-based optimization; the sample fraction is used to define a cube maxWeight for filtering.
This filter is used during the FileScan to select the target cubes, and is later applied to individual rows to further reduce the data size.
The I/O saving can be observed in the Spark UI of the corresponding SQL query. The entire dataset is 254,975 rows; thanks to Qbeast Sampling, only 100,038 rows are retrieved from secondary storage. The filters are reused to reduce the row count so that it matches the sample fraction.
To our knowledge, the same operation on any other data format would trigger 100% of the I/O. Below is the query plan for sample(0.3) on the same dataset written in Apache Parquet format: it reads all the files of the dataset, invoking a complete I/O, as reflected in the size of the files read and the number of output rows from Scan parquet.
This ability to sample data efficiently paves the way for approximate queries, where the accuracy of the query outcome is secondary to the query execution speed. For example, the user can be flexible about the query output because the last few decimals of an aggregation aren't as important as having the results quickly.
It is in these scenarios where Efficient Data Sampling is crucial.
If this has sparked your interest in Qbeast-Spark, do not hesitate to take a look at the GitHub repo and try it out for yourself.
Happy Learning.
Indexing and Sampling on Data Lake(house)s with Qbeast-Spark
Creating and leveraging indexes is an essential feature in DBMS and data warehouses to improve query speed. When applying filters in a SQL query, the index is first consulted to locate the target data before any read. In this way, we avoid reading the entire table from the storage, reducing the data transfer involved.
For a query like the following, if sales_table is indexed on the year column, we can directly access the data from 2020 onwards, avoiding a sequential scan of the entire table. If the table is sorted by year, search algorithms such as binary search can locate the target rows with sub-linear time complexity.
SELECT * FROM sales_table WHERE year >= 2020
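As a toy illustration of why ordering enables sub-linear access, here is a binary search over a sorted year column in plain Python (not how a database implements it, just the idea):

import bisect

# A table's 'year' column, already sorted ascending
years = [2016, 2017, 2017, 2018, 2019, 2019, 2020, 2020, 2021, 2022]

# Index of the first row with year >= 2020, found in O(log n) comparisons
start = bisect.bisect_left(years, 2020)
rows_to_read = years[start:]   # only these rows need to be scanned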
Given this obvious benefit, one of the first things to do when creating a table in a DBMS is to define the indexes you want, using one or more columns.
Generally, the column defined as the primary key is used as a clustered index, the order of which dictates the arrangement of the table rows. One can also create non-clustered indexes, keeping in mind that each index is a separate data structure that requires additional storage space. Non-clustered indexes don’t affect the table rows’ ordering; they are used to locate data, and an index-table link is used for data access.
In our last article, we discussed the need for Data Lakehouses, how they could alleviate many of the burdens in big data, and the role of Qbeast-format. To summarize, Data Lakehouses add a metadata management layer on the Data Lake to achieve ACID properties and schema enforcement, among other benefits. Qbeast-format’s goal is to add multiple additional features to the Lakehouse, such as multi-dimensional indexing, efficient sampling, and table tolerance, to reduce data transfer for queries further, making Data Lakehouses even more powerful.
This time, we will explain the details behind the key features of Qbeast-format, emphasizing its benefits and inner workings by unfolding the OTree Algorithm.
With Qbeast-format, more specifically its implementation Qbeast-spark, one can create a clustered multi-dimensional index for their table stored in a Data Lake with a single line of code. With that, you can directly access your Data Lake(house) and start running queries with indexing support.
# Indexing a Spark SQL DataFrame on four columns with Qbeast-spark
(
  df.write
    .format('qbeast')
    .option('columnsToIndex', 'col_1,col_2,col_3,col_4')
    .save('./qbeast_table')
)
But just before we dive in, let’s have a quick rewind on how things have been up until now and where it can potentially go.
Content:
Single and multi-column indexes
Spatial index
Recursive Space Division Algorithm
OTree Algorithm
How to implement a multi-column index?
Implementing a single-column index is straightforward — take the indexed column, order the column records according to their values, and sort the table records according to the index if the index is clustered. When reading, use the index to find the target records. If the index is non-clustered, an index-record link is used for data access.
The construction of a multi-column index is more complex, for there is no natural way of sorting the records that respects the order of all the individual columns at once. For instance, arranging rows with columns (age, height, weight) can only be done by prioritizing one column over another.
One example could be to first order the records by age; for those with the same age, order them by height; and for those with the same height, order them by weight.
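In Python, this lexicographic ordering is simply a tuple sort key:

records = [
    {"age": 30, "height": 170, "weight": 65},
    {"age": 25, "height": 180, "weight": 80},
    {"age": 30, "height": 165, "weight": 70},
]

# Sort by age first, then height, then weight
ordered = sorted(records, key=lambda r: (r["age"], r["height"], r["weight"]))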
A direct consequence is that only the first column is globally ordered. If we were to filter the records by height, and height alone, this index is of no use; the same applies to weight. If we search by (age, height), the target ages would be found quickly, but a sequential scan would still be needed to locate the target heights.
This limitation is attributed to the interdependency between the columns when sorting, which gave birth to Spatial Indexes.
Spatial Index
The idea is to construct a vector space with the target columns and position the records inside the space according to their values. This way, all columns involved are mutually independent, each defining a dimension in the space. When filtering, one can use any combination of the indexed columns, and no restriction as before is implied.
Take the image below as an example: we index our dataset with columns X and Y and construct a two-dimensional space for the records. The position of each record depends only on its values for the indexed columns. When filtering the table using any combination of these columns, we search the vector space to locate the records.
Illustration of queries that target specific data. The dataset is indexed on columns x and y. The two blue rectangles illustrate the range constraints on the indexed columns; only the elements inside these regions are retrieved.
In the image above, limiting values for both X and Y results in retrieving elements from the small blue rectangle region on the left. When using only one column, such as the case of the long rectangle orthogonal to the X axis, we can simply limit the values for the X dimension to the defined range, regardless of the column Y.
Recursive Division
To improve filtering speed and reduce retrieval time, we split the high-density regions of the vector space into subspaces in a recursive fashion, hence the name Recursive Space Division (RSD).
Regions are divided until the number of elements in each subspace is no larger than a predefined capacity; at each division, all the elements of the space are distributed among the resulting subspaces. In the case of a QuadTree, each division halves the ranges in all dimensions, resulting in non-overlapping subspaces with well-defined boundaries that evenly split the region. If the number of dimensions is n, each split creates 2^n subspaces.
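A minimal sketch of this recursive division in two dimensions, as a toy example rather than the Qbeast implementation (half-open intervals keep the sketch simple):

CAPACITY = 2  # maximum number of points a leaf may hold

def split(points, x0, y0, x1, y1):
    """Recursively divide the region [x0,x1) x [y0,y1) until every leaf
    holds at most CAPACITY points. Returns a nested dict of subspaces."""
    if len(points) <= CAPACITY:
        return {"bounds": (x0, y0, x1, y1), "points": points, "children": []}
    xm, ym = (x0 + x1) / 2, (y0 + y1) / 2
    quadrants = [(x0, y0, xm, ym), (xm, y0, x1, ym),
                 (x0, ym, xm, y1), (xm, ym, x1, y1)]
    children = []
    for (qx0, qy0, qx1, qy1) in quadrants:
        inside = [(x, y) for (x, y) in points if qx0 <= x < qx1 and qy0 <= y < qy1]
        children.append(split(inside, qx0, qy0, qx1, qy1))
    # Inner nodes keep no points: everything is redistributed to the children
    return {"bounds": (x0, y0, x1, y1), "points": [], "children": children}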
The images below show the division process of two-dimensional space; notice the number of divisions and the density of each area.
Recursive Space Division for QuadTree, first iteration from left to right.
Recursive Space Division for a QuadTree. The space is divided in those regions where the number of elements contained is more than 2. The top-left square experienced more divisions because of its higher density of data points.
A tree-like representation of the example above is shown in the image below. An empty string "" is used to represent the root, and the digits 0, 1, 2, and 3 encode the subspaces, ordered clockwise. Each node in the tree is either a leaf node containing at most the predefined capacity of elements, or an internal node with four children and no records of its own.
Tree-like representation for QuadTree. Nodes split into child nodes when the capacity is exceeded. Nodes in red contain no pointers after splitting.
Data pointers are stored in the tree nodes to reference the table records. Each time a node is split (red nodes in the image above), the pointers it contains are classified and distributed among the child nodes, leaving the node itself empty.
If we were to query the elements that fall inside the boundaries of node 020, nodes 0200, 0201, 0203, 02020, 02021, 02022, and 02023 are read, with 020 and 0202 being left out.
Keeping the Index Updated
Writing new records to an indexed dataset requires finding their correct position. For a single-column index, as discussed at the beginning, the task is the same as inserting an element into a sorted array. For a spatial index like a QuadTree, the idea is to find the node whose boundaries accommodate the record, starting from the root and navigating down the tree.
As shown in the following image, the values of the new record determine the node (or subspace) it should go to. If the node is an inner node, we descend into the correct child node for allocation.
Element e should go to the top-left cube. 0 < a/3 < a/2, 0 < b/3 < b/2.
A space division can be triggered when a WRITE pushes the number of records in a node beyond its capacity. The contained records are then classified and redistributed among the subtree. This process can take a long time, affecting write speed, reducing concurrency, and often requiring locks to achieve isolation.
Space division from a QuadTree. The elements from a split node are classified and redistributed among the child nodes.
As shown in the above image, a write finds nodeX full, which triggers an optimization process: the space is split by creating child cubes (X1, X2, X3, and X4), all the records are read from nodeX, and each record is assigned and sent to the corresponding child cube according to its values.
A QuadTree index as described does not scale. For this reason, the OTree is introduced.
OTree Algorithm
An OTree index is designed to be eventually optimal, so concurrent writes don't need to acquire locks. It's optimized for immutable cloud object stores and is based on open formats such as JSON and Parquet. Features such as Multi-dimensional Indexing and Efficient Sampling are at the core of Qbeast-format, and both are empowered by the OTree algorithm.
In the OTree, the classification and redistribution of existing records when a node is full is removed for better concurrency. Nodes now have a soft limit as capacity: when encountering a full node, new records can either be squeezed into it or allocated in a newly created child node, always leaving the existing records intact. (More on this later!)
OTree algorithm. Writing an element (E) with a weight (w) to cube X. If w ≥ maxWeight we skip to its correct child cube until finding a cube such that w < maxWeight. The algorithm creates child cubes if cube X is a leaf.
The result is a scalable multi-column index that supports concurrent writes and organizes data in a hierarchical structure: traversing the index from top to bottom, one encounters cubes whose data belong to increasing fractions of the dataset. A simple read protocol can then retrieve data fractions with minimal data transfer.
Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. Same for the image on the right with f = 0.3.
Next, we will dive into the details of the OTree READ and WRITE protocols and explain how they enable the benefits mentioned above.
OTree Notions
Cubes are Nodes
When writing, each element is assigned a randomly generated number called weight. It’s an important parameter that decides whether a cube should admit an incoming record.
The storage of cubes comprises a payload and an offset, with the latter being temporary storage whose elements are to be eliminated during optimization.
The element that has the largest weight in a cube’s payload is the maxElement, and its weight is the cube’s maxWeight.
Illustration for cubes in an OTree.
A necessary condition for a cube to accept a new element e is that e.w < cube.maxWeight.
Cubes have a state that depends on their size and on the stage of the optimization process they are in. A cube's state affects how READs and WRITEs are conducted. Possible cube states are Flooded, Announced, and Replicated.
The OTree WRITE Protocol
To allocate an element e to a given cube, first, the cube boundaries have to contain the values of e, and second, the element's weight e.w has to be smaller than the cube's maxWeight.
Initially, the maxWeight of any cube is set to MaxValue, so any weight is acceptable. Once the payload is full, the maxWeight becomes the largest weight found in the payload.
If both conditions are met, and the payload is full, the maxElement is pushed to the offset to make room for e. The maxWeight is now what used to be the second-largest weight. The same applies to maxElement.
Elements in the offset are only there temporarily: when the optimization/replication process kicks off, they are copied down to the subtree and the offset itself is removed. A detailed description of the OTree optimization can be found in a separate article.
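A minimal sketch of this write path, using a toy in-memory cube (the actual implementation works over Parquet files and Delta metadata, and boundary checks on the element's values are omitted here):

import random
import sys

class OTreeCube:
    """Toy cube: a payload with a soft capacity, plus a temporary offset."""

    def __init__(self, capacity=3):
        self.capacity = capacity
        self.payload = []                        # (weight, element) pairs
        self.offset = []                         # temporary storage, dropped on optimization
        self.max_weight = sys.float_info.max     # "MaxValue" until the payload fills up
        self.children = []                       # child cubes (boundaries omitted)

    def accepts(self, weight):
        return weight < self.max_weight

    def insert(self, weight, element):
        if len(self.payload) < self.capacity:
            self.payload.append((weight, element))
        else:
            # Payload full: evict the current maxElement to the offset to make room
            self.payload.sort(key=lambda we: we[0])
            self.offset.append(self.payload.pop())
            self.payload.append((weight, element))
        if len(self.payload) == self.capacity:
            # maxWeight is the largest weight present in the payload
            self.max_weight = max(w for w, _ in self.payload)

def write(root, element):
    weight = random.random()        # every record gets a random weight on write
    cube = root
    # Skip down the tree until a cube accepts the weight (w < maxWeight)
    while not cube.accepts(weight):
        if not cube.children:
            cube.children.append(OTreeCube(cube.capacity))
        cube = cube.children[0]
    cube.insert(weight, element)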
Why Weight? The OTree READ Protocol
The simple answer is that it allows us to sample the data efficiently.
Aside from the non-empty inner cubes, there's not much difference between a QuadTree and an OTree, until one begins to examine the OTree's data distribution closely from top to bottom.
Since e.w < cube.maxWeight is required for writing, it's natural to deduce that maxWeight increases monotonically from the root to any leaf cube.
Since the weights are randomly generated for each record, and no record redistribution occurs when a node is full, the data contained in each inner cube can statistically represent the data from the cube’s subtree in a degree proportional to the cube size.
Assuming the weights are randomly generated in the range 0 to 1.0, if a cube has a maxWeight of, say, 0.5, then all the records in the cube belong to the first half of the dataset.
Considering the above three points, we can build a sampling operator that doesn't require the I/O of the whole dataset: to read a fraction f of the dataset, we traverse the tree from top to bottom and stop going further in any branch as soon as a cube with a maxWeight ≥ f is encountered.
Sampling scenarios reading different fractions of a dataset with an OTree index. On the left, sampling 20% of the entire dataset means reading the levels of the tree from top to bottom until a cube with a maxWeight ≥ 0.2 is encountered. Same for the image on the right with fraction = 0.3.
The cases above show that reading a fraction f from the dataset requires accessing only the corresponding cubes. A DFS tree traversal reads each branch and stops as soon as a cube with maxWeight ≥ f is encountered. The cubes we don't access incur no I/O at all.
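In practice this is transparent to the user: with Qbeast-Spark, sampling an indexed table looks like regular Spark code (the snippet assumes an active SparkSession named spark, and the path is a placeholder):

# Read a table previously written with the 'qbeast' format
df = spark.read.format("qbeast").load("./qbeast_table")

# Only the cubes needed for a 10% sample are fetched from storage
sample_df = df.sample(fraction=0.1)
sample_df.explain(True)   # the Sample operator becomes a Filter on the indexed columns
print(sample_df.count())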
To see the features discussed above in action, you can refer to Qbeast-Spark and try it out by yourself. It is our first open-source implementation of the OTree algorithm, and we want to invite you to contribute to this project and accelerate this new change.
Happy Learning.
Data Lakehouse and its enhanced features with Qbeast-Spark.
We all like seeing complex problems being solved by simple and elegant solutions, I can’t verbalize what it is, but the good feeling that comes with it is undeniable. At Qbeast, we want to tackle some of the difficulties in the field of Big Data, and we want to do it by introducing a powerful and open source data format that enables fast approximate and full precision analytics. Sounds good? Before going into the details, let’s take a look at the problem first.
Big Data is often associated with words like diversity and chaos. For diversity, there’s a rich set of things one can do, with a rich set of tools at our disposal. For chaos, one may refer to having to choose one tool or another depending on the characteristics of the task, and the burden of maintenance and coordination of resources to maximize efficiency and minimize cost. Those who work with ETL pipelines know what I’m talking about.
Many of the complexities in the field are attributed to the use of a two-tier system of Data Lake + Data Warehouse. Raw data is stored in the Lake; part of it is preprocessed and stored in a Warehouse, against which analytical queries are performed. ETLs are designed to process and transfer data from one place to another, gluing the system together.
Things don't have to remain this way; there has been an effort in the industry to improve the situation and combine the best of both worlds into one: the Data Lakehouse.
Qbeast format is the foundation for building blazing-fast Data Lakehouses: aside from bringing ACID properties to the Data Lake, it adds features such as multi-dimensional indexing, efficient sampling, and tolerance-bounded approximate queries to reduce data transfer further and accelerate approximate analytics.
If you are a victim of the chaos in the field, and are annoyed by building and maintaining ETL pipelines, then you might find this article helpful. Qbeast format, and its first implementation as a Spark DataSource, Qbeast-Spark, is capable of indexing your datasets using multiple columns and of sampling them without unnecessary data transfer.
You can take a look at our project in this repository.
Lake + Warehouse
When it comes to data storage in industrial settings, the two main approaches are Data Lakes and Data Warehouses.
By Data Lake we refer to storage into which we can dump any raw data and which, by itself, has no purpose other than storage. It's directly accessible (e.g., using Spark to access an S3 bucket) and has an object/file API.
Its primary advantage is its robustness and its capability of storing all kinds of bulky information. Data stored in a Data Lake can be
1. structured (tabular data like a CSV file),
2. semi-structured (e.g. JSON),
3. or unstructured (images, audio, video, etc.).
Among its disadvantages are the lack of a content catalog, the absence of schema enforcement for data consistency, and the lack of fine-grained data access control. These make Data Lakes unsuitable for OLAP systems, because neither consistent data ingestion nor SQL support is guaranteed.
To compensate for these shortcomings, we set up Data Warehouses to handle the specific needs. They are designed for clean and preprocessed data, extracted and transformed from a Data Lake. Analytical queries can be performed against the consistent snapshots of the warehouse.
The schemas of the data in a Warehouse are optimized according to the characteristics of the queries. ETL pipelines are deployed to transform the incoming data.
Unlike the Lake, a Warehouse has the craved ACID properties and implements features for data skipping such as indexing to make it suitable for many business analytics operations.
The data stored in a Data Lake is generally not directly consumed by end BI users but through a Data Warehouse. Image from: Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores.
Issues with this two-tier system
The ETL pipelines are in charge of the heavy lifting, gluing the two-tier system together. For different tasks, different extract, transform, and load processes are set up, as well as separate Warehouses.
The issue arises when the requirements for the results change: the corresponding ETL pipelines also have to change, leading to a significant maintenance burden.
In most cases, ETLs are not only required to move data from Lake to Warehouse, but also to move data from their sources to the Lake, to begin with, further complicating the pipeline.
Each ETL step is a potential point of failure to reduce data quality. With increasing data size, the latency of ETLing new data from Lake to Warehouse can lead to an issue known as Data Staleness.
The lack of support for advanced analytics is also an issue: feeding data to ML models for either training or inference often requires complex non-SQL code that is optimized on neither the Lake nor the Warehouse:
Inefficient access to Warehouses
Open-format lake data lacks ACID guarantees and indexing for access optimization
Lakehouse, a new architecture
A Lakehouse is designed to unify lake and warehouse, bypassing the need for ETLs while achieving the best of both worlds:
Low-cost, robust, and open cloud object storage from the Lake,
and efficient data management and optimization features from the Warehouse.
The main issue with the Data Lake, despite all its advantages, is that the lack of a built-in data management system leaves users to their own devices to ensure atomic transactions and to guarantee that no write conflicts occur when multiple users access the same piece of information.
A transactional metadata management layer is placed on top of Data Lakes to enable ACID properties. This management layer is similar to (but not exactly the same as) what a Write-Ahead Log (WAL) does for database systems:
A transaction log is consulted before reading any table content to determine what files are part of the table.
When writing, a new file is added for the written content. The system uses the log to resolve conflicting naming when multiple users are involved.
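To make the idea concrete, here is a minimal sketch of how such a log can be replayed to decide which files currently make up a table. It is only an illustration of the concept, not the actual Delta Lake implementation (checkpoints, protocol actions, and concurrency control are ignored):

import glob
import json
import os

def current_table_files(table_path):
    """Replay the JSON log entries in order and return the live data files."""
    files = set()
    for log_file in sorted(glob.glob(os.path.join(table_path, "_delta_log", "*.json"))):
        with open(log_file) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files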
A similar approach is found in Apache Hive, a warehousing solution built on top of MapReduce and HDFS to support SQL-like queries and reduce the complexity of using Hadoop. Its Metastore is a system catalog used to store metadata about the tables, and is used to manage data.
Unlike the Metastore, an application whose information is stored in a separate RDBMS, the log of a Lakehouse is stored in the same data lake. As shown in the image below, the Lakehouse implementation Delta Lake stores the actual content files in Parquet format (partitioned by date here), while a transaction log (/_delta_log/), containing JSON log files, sits in the same mytable directory.
The existence of a metadata layer also gives room for implementing schema enforcement: any record that violates the predefined schema can be rejected by the layer.
Any query engine capable of accessing the data lake, such as Spark, can be used to leverage the benefits of the Data Lakehouse directly; there is no need for a heavy system to mediate data access.
Qbeast format
Qbeast-spark is an implementation of Qbeast format in the form of DataSource in Spark.
With the intent of accelerating this new change, we are introducing Qbeast format, an extension of this metadata management layer that brings functionalities such as Multi-column indexing, efficient sampling, and table tolerance to your data.
The role of Qbeast format is to optimize Lakehouse performance by reducing data transfer and reading data more intelligently. Its implementation in Spark as a DataSource is available at Qbeast-spark.
Multi-dimensional index
Indexing a dataset enables data skipping when a query filters on the indexed columns. The arrangement of the indexed columns is known to the system, which helps locate the target data more quickly. In many cases, the dataset is physically arranged according to the ordering of its index for improved access speed.
Here's how it works: when the WHERE clause of a SQL query defines constraints, the query engine first consults the index to locate the data that satisfies them before proceeding with further computations. If the target data is only a third of the entire dataset, then only that portion of the data is accessed and retrieved from storage.
The usage of an index helps avoid reading the entire dataset, reducing the amount of data transfer involved and speeding up the query. Qbeast Format allows you to index your data on as many columns as you need.
Illustration of queries that target specific data. The dataset is indexed on columns x and y. The two blue rectangles illustrate the range constraints on the indexed columns; only the elements inside these regions are retrieved.
# Indexing your dataset with Qbeast-spark on columns x, y, and z
(
  df
    .write
    .format('qbeast')
    .option('columnsToIndex', 'x,y,z')
    .save('./qbeast_table')
)
The foundation of the Qbeast-spark multi-dimensional indexing is the Recursive Space Division algorithm, optimized to incorporate strategic Data Replication to improve performance for read-intensive applications.
Creating statistically representative subsets that are only a fraction of the size is desirable in many scenarios. Working with these subsets can increase query speed and reduce compute resource usage because the data is smaller, while the query accuracy stays within reasonable margins as long as the size reduction isn't too extreme and random instance selection is ensured.
Take data visualization as an example. Loading and computing results for large datasets in their entirety can sometimes be overkill, especially for graphs such as box plots, histograms, or scatter plots, where the purpose is to grasp the general trend of the data rather than exact numerical values.
Why bother spending resources when a sample can do the work with less?
Another scenario where sampling helps is approximate queries: the user is willing to speed up query execution by reading less data, accepting a small loss of accuracy because of the non-critical nature of the task. For instance, if we were to calculate the average age of the customers in an e-commerce dataset, is the second decimal that important if the goal is to see which age group they belong to?
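A hedged sketch of that idea in PySpark (the customers DataFrame, column name, and fraction are made up for illustration): computing the aggregation on a sample trades a little accuracy for a lot of speed.

from pyspark.sql import functions as F

# Full-precision aggregation: reads the whole table
exact_avg = customers.agg(F.avg("age")).first()[0]

# Approximate aggregation: a 10% sample is usually enough to pick the age group
approx_avg = customers.sample(fraction=0.1).agg(F.avg("age")).first()[0]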
In these scenarios, one can create and work on a subset instead of the entire dataset. Up until now, however, no efficient sampling implementation has been available: no data format provides it, and all one can do is load the entire dataset into memory and randomly select elements to form a subset of the desired size. Even reading a sample of, say, only 10% still invokes the I/O of the whole dataset, because all the data has to be in memory to perform the random selection.
The issue can be seen by executing df.sample(fraction=0.1) on any DataFrame in Spark and checking the details of the query execution in the Spark UI: all the data is loaded into memory, and a Sample operator is then applied to reduce the size.
Qbeast-Spark is a reference implementation of Qbeast format built on Apache Spark and Delta Lake. The Qbeast-spark datasource is capable of exploiting its index and finding the target fraction with minimal I/O. The resulting data is just as representative as that of the vanilla Spark.
Under the hood, the index of the dataset has a tree-like structure, with elements ‘randomly’ placed from top to bottom. Reads start from the root and stop as soon as the target data fraction is obtained.
Table Tolerance
Despite its clear upsides, there is a general resistance against the Sample operator. The main reason is the lack of a clear model for the trade-off between the sample size and the loss in query accuracy: for a specific query, how do I know whether 60% of the data leads to too much error, or whether it is enough to satisfy my 5% error tolerance?
Table Tolerance is meant to take this burden away from the user. It's a functionality empowered by the Qbeast format that exposes the tolerance, instead of the sample size, to the user, whose only job now is to define how much error is acceptable for their query. The algorithm computes the results with the minimum sample size that satisfies that constraint.
df.agg(avg('daily_sale')).tolerance(0.1)
In the example above we compute the average aggregation for the 'daily_sale' column with a 10% tolerance. Our algorithm does all the dirty work behind the scenes: it computes statistics on the data to find the correct sample size, executes the query, and finally returns the result.
For more details on each of the features and the project in general, refer to the Qbeast-Spark repository. We would like to invite you to contribute to this project and accelerate this new change.
Happy Learning.
About Qbeast
Qbeast is here to simplify the lives of the Data Engineers and make Data Scientists more agile with fast queries and interactive visualizations. For more information, visit qbeast.io