Qbeast format — enhanced Data Lakehouse

Pushing Data Lakehouse to a new height.

Jiawei Hu
Data Lakehouse and its enhanced features with Qbeast-Spark.

We all like seeing complex problems solved by simple and elegant solutions; it is hard to put into words, but the good feeling that comes with it is undeniable. At Qbeast, we want to tackle some of the difficulties in the field of Big Data, and we want to do it by introducing a powerful, open-source data format that enables fast approximate and full-precision analytics. Sounds good? Before going into the details, let’s take a look at the problem first.

Big Data is often associated with words like diversity and chaos. Diversity, because there is a rich set of things one can do, with a rich set of tools at our disposal. Chaos, because one has to choose one tool or another depending on the characteristics of the task, and carry the burden of maintaining and coordinating resources to maximize efficiency and minimize cost. Those who work with ETL pipelines know what I’m talking about.

Many of the complexities in the field are attributed to the use of a two-tier system of Data Lake + Data Warehouse. Raw data is stored in the Lake, and some of it is preprocessed and loaded into a Warehouse, against which analytical queries are performed. ETLs are designed to process and transfer data from one place to another, gluing the system together.

Things don’t have to remain this way; there has been a push in the industry to improve the situation and combine the best of both worlds into one: the Data Lakehouse.

The Qbeast format is the foundation for building blazing-fast Data Lakehouses: aside from bringing ACID properties to the Data Lake, it adds features such as multi-dimensional indexing, efficient sampling, and tolerance-bounded approximate queries to further reduce data transfer and accelerate approximate analytics.

If you are a victim of the chaos of the field and are annoyed by building and maintaining ETL pipelines, then you might find this article helpful. The Qbeast format, and its first implementation as a Spark DataSource, Qbeast-spark, can index your datasets on multiple columns and sample them without unnecessary data transfer.

You can take a look at our project in this repository.

Lake + Warehouse

When it comes to data storage in industrial settings, the two main approaches are Data Lakes and Data Warehouses.

By Data Lake, we refer to storage into which we can dump any raw data and which, by itself, serves no purpose other than storage. It is directly accessible (e.g., using Spark to access an S3 bucket) and exposes an object/file API.

Its primary advantages are its robustness and its ability to store all kinds of bulky information. Data stored in a Data Lake can be

1. structured (tabular data like a CSV file),
2. semi-structured (e.g. JSON),
3. or unstructured (images, audio, video, etc.).

Among its disadvantages are the lack of oversight of content (a catalog), no schema enforcement for data consistency, and no fine-grained data access control. These shortcomings make Data Lakes unsuitable for OLAP systems, since neither consistent data ingestion nor SQL support is guaranteed.

To compensate for these shortcomings, we set up Data Warehouses to handle these specific needs. They are designed for clean, preprocessed data, extracted and transformed from a Data Lake, and analytical queries are performed against the consistent snapshots of the Warehouse.

The schemas of the data in a Warehouse are optimized according to the characteristics of the queries. ETL pipelines are deployed to transform the incoming data.

Unlike the Lake, a Warehouse offers the coveted ACID properties and implements data-skipping features such as indexing, making it suitable for many business analytics operations.

The data stored in a Data Lake is generally not directly consumed by end BI users but through a Data Warehouse. Image from: Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores.

Issues with this two-tier system

The ETL pipelines are in charge of the heavy lifting, gluing the two-tier system together. For different tasks, different extract, transform, and load processes are set up, along with separate Warehouses.

The issue arises when the requirements for the results change: the corresponding ETL pipelines also have to change, leading to a significant maintenance burden.

In most cases, ETLs are required not only to move data from the Lake to the Warehouse, but also to move data from its sources into the Lake in the first place, further complicating the pipeline.

Each ETL step is a potential point of failure that can degrade data quality. As data sizes grow, the latency of ETLing new data from the Lake to the Warehouse leads to an issue known as data staleness.

The lack of support for advanced analytics is also an issue: feeding data to ML models for training or inference often requires complex non-SQL code that is optimized for neither the Lake nor the Warehouse:

  • Access to Warehouses is inefficient for these workloads,
  • and open-format Lake data lacks ACID guarantees and indexing for access optimization.

Lakehouse, a new architecture

A Lakehouse is designed to unify lake and warehouse, bypassing the need for ETLs while achieving the best of both worlds:

  • Low-cost, robust, and open cloud object storage from the Lake,
  • and efficient data management and optimization features from the Warehouse.

A metadata management layer is added to the Lake to achieve ACID and other desired properties. Image from: Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics.

The main issue with the Data Lake, despite all its advantages, is that it fails to provide a built-in data management system: users are left to their own devices to ensure that transactions are atomic and that no write conflicts occur when multiple users access the same piece of information.

A transactional metadata management layer is placed on top of the Data Lake to enable ACID properties. This management layer is similar to (but not exactly the same as) what a Write-Ahead Log (WAL) does for database systems:

  • A transaction log is consulted before reading any table content to determine what files are part of the table.
  • When writing, a new file is added for the written content, and the system uses the log to resolve naming conflicts when multiple writers are involved (a minimal sketch of this pattern follows below).
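To make the log-before-data idea concrete, here is a minimal Python sketch of how a reader could replay such a log to decide which files belong to the table. It is purely illustrative: real implementations such as Delta Lake also handle checkpoints, protocol versions, and concurrent-writer conflict resolution, and the table layout in the comment is hypothetical.

# A table directory might look like this (hypothetical names):
#
#   mytable/
#     date=2020-01-01/part-0000.parquet
#     date=2020-01-02/part-0001.parquet
#     _delta_log/00000000000000000000.json
#     _delta_log/00000000000000000001.json
#
# Each log file holds one JSON action per line: files added to or
# removed from the table.
import json
from pathlib import Path

def active_files(table_path):
    """Replay the transaction log to find the files that form the table."""
    files = set()
    log_dir = Path(table_path) / '_delta_log'
    for log_file in sorted(log_dir.glob('*.json')):
        for line in log_file.read_text().splitlines():
            action = json.loads(line)
            if 'add' in action:
                files.add(action['add']['path'])
            elif 'remove' in action:
                files.discard(action['remove']['path'])
    return files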

A similar approach is found in Apache Hive, a warehousing solution built on top of MapReduce and HDFS to support SQL-like queries and reduce the complexity of using Hadoop. Its Metastore is a system catalog that stores metadata about the tables and is used to manage the data.

Unlike the Metastore, which is a separate application with its information stored in an RDBMS, the log of a Lakehouse is stored in the same data lake. As shown in the image below, the Lakehouse implementation Delta Lake stores the actual content files in Parquet format (partitioned by date here). A transaction log (/_delta_log/), containing JSON log files, is found in the same mytable directory.

Layout of a sample Delta Lake table. Image from Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores.

The existence of a metadata layer also gives room for schema enforcement: any record that violates the predefined schema can be rejected by the layer.

Any query engine capable of accessing the data lake, such as Spark, can leverage the benefits of the Data Lakehouse directly, with no need for a heavy system to mediate data access.

Qbeast format

Qbeast-spark is an implementation of the Qbeast format in the form of a Spark DataSource.

With the intent of accelerating this change, we are introducing the Qbeast format, an extension of this metadata management layer that brings functionalities such as multi-column indexing, efficient sampling, and table tolerance to your data.

The role of Qbeast format is to optimize Lakehouse performance by reducing data transfer and reading data more intelligently. Its implementation in Spark as a DataSource is available at Qbeast-spark.

Multi-dimensional index

Indexing a dataset enables data skipping when filtering on the indexed columns in a query. The arrangement of the indexed columns is known to the system, which helps locate the target data more quickly. In many cases, the dataset is physically arranged according to the ordering of its index for improved access speed.

Here’s how it works: when a WHERE clause in a SQL query defines constraints, the query engine first consults the index to locate the data that satisfies them before proceeding with further computations. If the target data is only a third of the entire dataset, then only that portion is accessed and retrieved from storage.

The usage of an index helps avoid reading the entire dataset, reducing the amount of data transfer involved and speeding up the query. Qbeast Format allows you to index your data on as many columns as you need.

Illustration of queries that target specific data. The dataset is indexed on columns x and y. The two blue rectangles illustrate the range constraints on the indexed columns; only the elements inside these regions are read.

# Indexing your dataset with Qbeast-spark on columns x, y, and z.
(
df
.write
.format('qbeast')
.option('columnsToIndex', 'x,y,z')
.save('./qbeast_table')
)
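Reading the table back is just as simple: filtering on the indexed columns lets the datasource skip files whose index regions cannot contain matching records. The snippet below reuses the hypothetical table path and columns from the example above and assumes an active SparkSession named spark.

# Read the indexed table and filter on the indexed columns.
# The Qbeast datasource consults the index to skip non-matching files.
qbeast_df = (
spark
.read
.format('qbeast')
.load('./qbeast_table')
)

result = qbeast_df.where('x > 0 AND y < 10')
result.show()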

The foundation of the Qbeast-spark multi-dimensional indexing is the Recursive Space Division algorithm, optimized to incorporate strategic Data Replication to improve performance for read-intensive applications.

See the documentation for more information.

Data Sampling

Creating statistically representative subsets that are only a fraction of the size is desirable in many scenarios. Working with these subsets increases query speed and reduces compute resource usage because less data is processed, while query accuracy stays within reasonable margins as long as the size reduction isn’t too extreme and the instances are selected at random.

Take data visualization as an example. Loading and computing results for large datasets in their entirety can sometimes be overkill, especially for graphs such as box plots, histograms, or scatter plots, where the purpose is to get a sense of the general trend of the data rather than the exact numerical values.

Why bother spending resources when a sample can do the work with less?

Another scenario where sampling is helpful is approximate queries. The user is willing to speed up query execution by reading less data, accepting a reduced degree of accuracy because of the non-crucial nature of the task. For instance, if we were to calculate the average age of the customers in an e-commerce dataset, is the second decimal that important if the goal is to see which age group they belong to?

In these scenarios, one can create and work on a subset instead of using the entire dataset. Up until now, however, there has been no efficient sampling implementation: no data format provides it, and all one can do is load the entire dataset into memory and randomly select elements to form a subset of the desired size. Even if we only wanted a 10% sample, we would still incur the I/O of the whole dataset, because all the data has to be in memory to perform the random selection.

You can see the issue by executing df.sample(fraction=0.1) on any DataFrame in Spark and checking the details of the query execution in the Spark UI: all the data is loaded into memory, and a Sample operator is then applied to reduce the size.

Qbeast-Spark is a reference implementation of Qbeast format built on Apache Spark and Delta Lake. The Qbeast-spark datasource is capable of exploiting its index and finding the target fraction with minimal I/O. The resulting data is just as representative as that of the vanilla Spark.

Under the hood, the index of the dataset has a tree-like structure, with elements ‘randomly’ placed from top to bottom. Reads start from the root and stop as soon as the target data fraction is obtained.
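In practice, this means the standard DataFrame sampling API can be used on a Qbeast table, and the fraction is resolved against the index so that only part of the files is read. The paths below reuse the earlier hypothetical example; the plain Parquet path is there purely for contrast.

# Vanilla Spark: all files are read, then ~90% of the rows are discarded.
plain_sample = spark.read.parquet('./parquet_table').sample(fraction=0.1)

# Qbeast-spark: the fraction is pushed down to the datasource, which reads
# only the part of the index tree needed to cover a 10% sample.
qbeast_sample = (
spark
.read
.format('qbeast')
.load('./qbeast_table')
.sample(fraction=0.1)
)
qbeast_sample.count()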

Table Tolerance

Despite its clear upsides, there is general resistance to the Sample operator. The main reason behind it is the lack of a clear model for the trade-off between sample size and the loss in query accuracy. For a specific query, how do I know whether a 60% sample leads to too much error, or whether it is enough to satisfy my 5% error tolerance?

Table Tolerance is meant to take this burden away from the users. It’s a functionality powered by the Qbeast format that exposes the tolerance, rather than the sample size, to the user, whose job now is only to define how much error is acceptable for their query. The algorithm then computes the result with the minimum sample size that satisfies the constraint.

df.agg(avg('daily_sale')).tolerance(0.1)

In this example we compute the average of the ‘daily_sale’ column with a 10% tolerance. Our algorithm does all the dirty work behind the scenes: it computes statistics on the data to find the correct sample size, executes the query, and finally returns the result.
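The exact procedure is internal to the Qbeast format, but the kind of statistics involved can be illustrated with a standard confidence-interval argument: the relative error of a sampled average shrinks with the square root of the sample size, so a rough required size can be estimated from a small pilot sample. The sketch below only illustrates that idea; it is not the Qbeast implementation, and the pilot numbers are made up.

import math

def required_sample_size(pilot_mean, pilot_std, tolerance, z=1.96):
    """Rows needed so the sampled mean stays within `tolerance` (relative
    error) of the true mean with ~95% confidence, using the normal
    approximation: z * std / (sqrt(n) * mean) <= tolerance."""
    n = (z * pilot_std / (tolerance * pilot_mean)) ** 2
    return math.ceil(n)

# Hypothetical pilot statistics for 'daily_sale': mean 120, std 80,
# with a 10% relative-error tolerance.
print(required_sample_size(120.0, 80.0, 0.1))  # -> 171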

For more details on each of the features and the project in general, refer to the Qbeast-Spark repository. We would like to invite you to contribute to this project and accelerate this new change.

Happy Learning.

About Qbeast
Qbeast is here to simplify the lives of the Data Engineers and make Data Scientists more agile with fast queries and interactive visualizations. For more information, visit qbeast.io