Approximate Queries on Data Lakes with Qbeast-Spark

Jiawei Hu

Writing procedural programs to analyze data is often less convenient than a declarative approach, since one has to define the exact control flow of the program rather than simply state the desired outcome. For example, a simple groupby takes a lot more code in Python than in SQL.

Other than this apparent inconvenience, there’s also a potential performance drop-off, given that procedural code is treated as a black box when it comes to automatic code optimization. The structure of a SQL query, on the other hand, is transparent to the underlying query optimizer, so both rule-based and cost-based optimizations can be applied to improve performance.

This was one of the main issues Apache Spark SQL set out to tackle. A declarative DataFrame API was introduced to integrate relational processing with procedural Spark code, and an extensible query optimizer, Catalyst, was implemented to enable query optimization.

In a nutshell, programs written in procedural languages can leverage the DataFrame API to write declarative code. Before execution, Catalyst optimizes the program and converts it into the underlying procedural Spark RDD code.

Sitting in the middle, the optimizer takes care of parsing, optimization, and code generation, so that the query is executed as efficiently as possible.
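As a rough illustration, the sketch below expresses the same aggregation once through the DataFrame API and once through SQL. The toy sales data, the column names, and the object name are made up for this example, and it assumes Spark 3.x on the classpath with a local master.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.avg

object DeclarativeGroupBy {
  // Hypothetical toy data; any DataFrame with these two columns would do.
  def sales(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("books", 12.0), ("books", 8.0), ("games", 30.0)).toDF("category", "price")
  }

  // Declarative DataFrame API: we state the result, not the control flow.
  def avgByApi(df: DataFrame): DataFrame =
    df.groupBy("category").agg(avg("price").as("avg_price"))

  // The same query in SQL, run against a temporary view.
  def avgBySql(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("sales")
    spark.sql("SELECT category, avg(price) AS avg_price FROM sales GROUP BY category")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("declarative-groupby")
      .getOrCreate()
    val df = sales(spark)
    // explain(true) prints the parsed, analyzed, optimized, and physical plans.
    avgByApi(df).explain(true)
    avgBySql(spark, df).explain(true)
    spark.stop()
  }
}
```

Both variants go through the same optimizer, so comparing the two explain outputs is a quick way to see what Catalyst does with each.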

Last time we discussed the bits and pieces of the OTree algorithm, a scalable multi-column indexing technology, and how its peculiar inner workings let us sample indexed tables with minimum data transfer. This article discusses how the Efficient Sampling we saw is materialized in Qbeast-Spark by extending Catalyst, the Spark SQL query optimizer.

A given SQL query is first parsed during the optimization process to construct an Abstract Syntax Tree, a canonical form of representing queries. This structure is then analyzed from top to bottom, and optimization rules are applied to minimize the data transfer and compute resource consumption.

SELECT
  A.name
FROM
  A JOIN B ON A.id = B.id
WHERE
  CONDITION_ON_A AND CONDITION_ON_B

Taking the above query as an example, a possible syntax tree reads as follows:

  1. Two tables, A and B, are joined.
  2. On the resulting table, we filter the rows by applying conditions cond_A and cond_B.
  3. We retrieve the name column as output.

Optimization rules such as predicate pushdown, constant folding, and projection pruning are applied to improve performance while keeping the query result intact. If we were to apply some of the rules to our tree, a possible output could look like the following.

cond_A and cond_B are pushed down to the data source to reduce I/O; with the help of metadata, data files whose values fall outside the desired range are discarded before reading. This results in less I/O and lower compute requirements for the subsequent JOIN operation.
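To make the pushdown concrete, here is a toy model of the rewrite in plain Scala. It is not Catalyst code: the Plan node types and the pushDown function are invented for this illustration, and each filter simply carries the name of the table it refers to.

```scala
object PushdownSketch {
  // A deliberately tiny logical plan: just enough nodes for the example query.
  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Filter(condition: String, onTable: String, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, on: String) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Does this subtree read the given table?
  def reads(plan: Plan, table: String): Boolean = plan match {
    case Scan(t)         => t == table
    case Filter(_, _, c) => reads(c, table)
    case Join(l, r, _)   => reads(l, table) || reads(r, table)
    case Project(_, c)   => reads(c, table)
  }

  // One rewrite rule: a Filter sitting on top of a Join moves to the side of
  // the join that reads the table the filter refers to.
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(cond, table, child) =>
      pushDown(child) match {
        case Join(l, r, on) if reads(l, table) =>
          Join(pushDown(Filter(cond, table, l)), r, on)
        case Join(l, r, on) if reads(r, table) =>
          Join(l, pushDown(Filter(cond, table, r)), on)
        case other => Filter(cond, table, other)
      }
    case Join(l, r, on)       => Join(pushDown(l), pushDown(r), on)
    case Project(cols, child) => Project(cols, pushDown(child))
    case scan: Scan           => scan
  }

  // The example query before optimization: join, then filter, then project.
  val original: Plan =
    Project(Seq("A.name"),
      Filter("cond_A", "A",
        Filter("cond_B", "B",
          Join(Scan("A"), Scan("B"), "A.id = B.id"))))

  // After the rewrite, each filter sits directly on top of its own scan.
  val optimized: Plan = pushDown(original)
}
```

A real optimizer inspects the attributes a predicate references instead of relying on a table label, but the shape of the transformation is the same.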

Hands-on

To demonstrate how easy it is to extend Catalyst, we will implement a custom Spark Session Extension by adding a new optimization rule that modifies the behavior of the Sample operator.

We implement a custom optimization rule, MyOptimizationRule, that takes a Sample operator with an upperBound of 0.1 and changes it to 0.2.

Once done, we need to add this rule to the Logical Plan Optimization Pipeline by injecting the rule into the SparkSessionExtensions.
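A minimal sketch of what such a rule and its injection could look like is given here. It assumes Spark 3.x, where Sample is a case class with the fields lowerBound, upperBound, withReplacement, seed, and child; the extension class name MyExtension is a placeholder of our own.

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sample}
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrites any Sample whose upperBound is 0.1 into one with upperBound 0.2.
// All other operators are left untouched.
class MyOptimizationRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case s: Sample if s.upperBound == 0.1 => s.copy(upperBound = 0.2)
  }
}

// Entry point that Spark instantiates when it is listed in spark.sql.extensions.
// The name MyExtension is an assumption made for this sketch.
class MyExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule(_ => new MyOptimizationRule)
}
```

Assuming both classes are packaged in a jar that is on the spark-shell classpath, the extension would be enabled by setting spark.sql.extensions to the fully qualified name of MyExtension.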

To see the behavior in action, open spark-shell with the proper configurations and check the Sample query plan by running df.sample(0.1).explain(true).

Before the optimization, the query plan has upperBound=0.1 (see Parsed Logical Plan and Analyzed Logical Plan); our MyOptimizationRule class converts it to 0.2, as reflected in the Optimized Logical Plan and Physical Plan.

Qbeast Efficient Sampling

In the last article, we saw that the OTree WRITE protocol arranges data in a hierarchical structure such that, when traversing the tree from top to bottom, one encounters cubes whose data belong to an increasing fraction of the dataset.

Take the index tree from the above image as an example; a cube with maxWeight = 0.2 implies that all the data it contains are part of the first 20% of the dataset.

maxWeight increases monotonically from root to leaf in any branch. And since weights are generated randomly, the WRITE protocol ensures that the data contained in a given cube can be treated as a representation of its subtree.

With all this information in mind, it is possible to implement an efficient Sample operator that, by accessing targeted cubes according to the provided sample fraction, can generate statistically representative subsets with minimum I/O.

When reading a given sample fraction from the source, we traverse the tree from the root in depth-first (DFS) order, branch by branch. Whenever a cube with maxWeight ≥ fraction is encountered, we read its content, stop descending, and jump to the next branch. In this way, we collect data from every cube whose maxWeight is below the fraction, plus the first cube on each branch that reaches it.
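The traversal can be sketched in a few lines of plain Scala. This is only a toy model: the Cube class, the example tree, and its weights are invented for illustration and do not reflect how Qbeast-Spark stores its index.

```scala
object SamplingTraversalSketch {
  // Hypothetical in-memory cube: an id, its maxWeight, and its child cubes.
  case class Cube(id: String, maxWeight: Double, children: List[Cube] = Nil)

  // Depth-first walk: every visited cube is read, and a branch is abandoned
  // as soon as a cube's maxWeight reaches the requested fraction.
  def cubesToRead(cube: Cube, fraction: Double): List[String] =
    if (cube.maxWeight >= fraction) List(cube.id)
    else cube.id :: cube.children.flatMap(cubesToRead(_, fraction))

  // maxWeight grows from root to leaf on every branch, as the index guarantees.
  val tree: Cube =
    Cube("root", 0.1, List(
      Cube("A", 0.2, List(Cube("A1", 0.5), Cube("A2", 0.8))),
      Cube("B", 0.4, List(Cube("B1", 0.9)))
    ))

  def main(args: Array[String]): Unit = {
    // For a 30% sample, B1 is never touched because its parent B already
    // has maxWeight 0.4 >= 0.3.
    println(cubesToRead(tree, 0.3)) // List(root, A, A1, A2, B)
  }
}
```

The cubes at which the walk stops may still hold rows whose weight exceeds the fraction, which is why a row-level filter is needed after the files are read.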

Qbeast-Spark, in essence, does this behind the scenes by injecting optimization rules through session extensions. When we capture a Sample operator on top of a QbeastDataSource, our rule transforms it into a set of filters applied on the indexed columns and the maxWeight. Spark SQL then optimizes these filters further to select the target cubes, resulting in an optimized query plan.

The image above shows the query plan of a sample operation on a QbeastDataSource. We can see that what is initially a Sample operator is converted into a Filter during rule-based optimization. The sample fraction is used to define a cube maxWeight for filtering.

This filter is used during FileScan to select the target cubes and is applied later on individual rows to further reduce the data size.

The I/O saving can be observed in the Spark UI of the corresponding SQL query. The entire dataset is 254,975 rows, and thanks to Qbeast Sampling, only 100,038 rows are retrieved from secondary storage. The filters are then reused to reduce the number of rows to match the sample fraction.

To our knowledge, the same operation done in any other data format would trigger 100% I/O. Below is the query plan for sample(0.3) on the same dataset written in Apache Parquet format. It reads all the files from the dataset, invoking a complete I/O, as reflected in the size of files read and the number of output rows from Scan parquet.

This ability to sample data efficiently paves the way for approximate queries, where the accuracy of the query outcome is secondary to the query execution speed. For example, the user can be flexible on query output because the last few decimals of an aggregation aren’t as important as having the results quickly.

It is in these scenarios that Efficient Data Sampling is crucial.
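To give a feel for the trade-off, the following plain Scala sketch estimates an average from a 10% sample of synthetic data. The data, the seed, and all names are made up, and the sampling happens in memory, so it only illustrates the accuracy side of the argument, not the I/O saving.

```scala
import scala.util.Random

object ApproximateAverageSketch {
  // Bernoulli sampling: keep each value independently with probability `fraction`.
  def sample(values: Seq[Double], fraction: Double, seed: Long): Seq[Double] = {
    val rng = new Random(seed)
    values.filter(_ => rng.nextDouble() < fraction)
  }

  def mean(values: Seq[Double]): Double = values.sum / values.size

  def relativeError(exact: Double, estimate: Double): Double =
    math.abs(estimate - exact) / math.abs(exact)

  def main(args: Array[String]): Unit = {
    val prices = (1 to 100000).map(_.toDouble) // synthetic data
    val exact = mean(prices)
    val estimate = mean(sample(prices, 0.1, seed = 42L))
    println(f"exact=$exact%.2f estimate=$estimate%.2f")
    println(f"relative error=${relativeError(exact, estimate) * 100}%.3f%%")
  }
}
```

With an index that supports Efficient Sampling, an estimate of this kind can be computed while reading only a fraction of the files.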


If this has sparked your interest in Qbeast-Spark, do not hesitate to take a look at the GitHub repo and try it out yourself.

Happy Learning.

About Qbeast
Qbeast is here to simplify the lives of the Data Engineers and make Data Scientists more agile with fast queries and interactive visualizations. For more information, visit qbeast.io
© 2020 Qbeast. All rights reserved.

Read from public S3 bucket with Spark

S3 Hadoop Compatibility

Trying to read from public Amazon S3 object storage with Spark can cause many errors related to Hadoop versions.

Here are some tips to configure your Spark application.

Spark Configuration

To read the public S3 bucket, you need to start a spark-shell with Spark 3.1.1 or later and Hadoop 3.2 dependencies.

If you have to update the binaries to a compatible version to use this feature, follow these steps:

  • Download the Spark tarball from the Apache archive
$ > wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
  • Decompress the files
$ > tar xzvf spark-3.1.1-bin-hadoop3.2.tgz
  • Update the SPARK_HOME environment variable
$ > export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2

Once Spark is ready to run, use the following configuration:

$ > $SPARK_HOME/bin/spark-shell \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
  --packages com.amazonaws:aws-java-sdk:1.12.20,org.apache.hadoop:hadoop-common:3.2.0,org.apache.hadoop:hadoop-client:3.2.0,org.apache.hadoop:hadoop-aws:3.2.0

The org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider provides anonymous credentials to access the public S3 bucket.

And to read the file:

val df = spark
  .read
  .format("parquet")
  .load("s3a://qbeast-public-datasets/store_sales")
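If you would rather set this up from application code than from spark-shell flags, a sketch along the following lines may help. The object and application names are placeholders, the master is expected to come from spark-submit, and the Hadoop and AWS packages listed above are assumed to be on the classpath already.

```scala
import org.apache.spark.sql.SparkSession

object PublicS3Read {
  // Same setting as the --conf flag above, expressed as a key/value pair.
  val s3aAnonymousConf: Map[String, String] = Map(
    "spark.hadoop.fs.s3a.aws.credentials.provider" ->
      "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
  )

  // Applies every entry of the map to the session builder.
  def buildSession(): SparkSession = {
    val builder = SparkSession.builder().appName("public-s3-read")
    s3aAnonymousConf
      .foldLeft(builder) { case (b, (key, value)) => b.config(key, value) }
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    val df = spark.read
      .format("parquet")
      .load("s3a://qbeast-public-datasets/store_sales")
    df.printSchema()
    spark.stop()
  }
}
```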

Summary

There’s no known working version of Hadoop 2.7 for AWS S3. However, you can try to use it. If you do so, remember to include the following option:

--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

Contact us info@qbeast.io

C/ Roc Boronat 117, 2a Planta, 08018 Barcelona