
Set up Jupyter + Spark on macOS

Migrating from Linux to macOS can be tricky, especially if you are a developer. In this post, we will show how to set up a Mac with an M1 chip to run Spark and Jupyter Notebook.

1. Install Homebrew

Homebrew is "The Missing Package Manager" for macOS. It installs the tools that Apple (or your Linux system) didn't, placing each package in its own directory and symlinking its files into /usr/local (on Intel Macs) or /opt/homebrew (on Apple Silicon).

To install it, just download it through curl and run the installation script:

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

Once the process is done, make sure the brew command is added to your PATH by executing these two lines:

# This adds the brew command to the PATH every time you open a shell
echo 'eval "$(/opt/homebrew/bin/brew shellenv)"' >> ~/.zprofile
eval "$(/opt/homebrew/bin/brew shellenv)"

2. Install Java

Apache Spark runs its tasks on the JVM, so we need a compatible Java version to run the notebooks with a distributed engine.

You can install Java through brew:

brew install openjdk@8
Getting the right version of Java for macOS with an M1 chip

If the brew installation does not work on your Mac, we recommend using Azul's Zulu OpenJDK 8.

You can find it on the downloads page by scrolling to the bottom of the website. Notice the filters applied in the link: Java 8, macOS, ARM 64-bit, JDK.

  • Download the JDK. You can retrieve the .dmg package, the .zip, or the .tar.gz:

wget "https://cdn.azul.com/zulu/bin/zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64.tar.gz"
tar -xvf zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64.tar.gz
  • Define the JAVA_HOME environment variable in .zprofile as the path to the extracted JDK folder
echo "export JAVA_HOME=$PWD/zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64" >> ~/.zprofile
  • Add the JDK binaries to PATH
export PATH=$PATH:$JAVA_HOME/bin

3. Install Python and Scala

Jupyter Notebook can be used with two different languages: Python and Scala. Though Python is more widely used, Apache Spark also has a Scala API that covers all of the use cases.

To install Scala and Python interpreters, run the following commands with brew:

brew install python && brew install scala

4. Install PySpark

There are multiple ways of installing and using Spark on a Mac.

  • Install with pip
pip install pyspark

# Or a specific version
pip install pyspark==3.1.2

Using a specific Hadoop version:

# Available versions are 2.7 and 3.2
PYSPARK_HADOOP_VERSION=2.7 pip install pyspark
  • Install with Conda. Conda is an open-source package and environment management system that is part of the Anaconda distribution. It is language-agnostic and can replace both pip and virtualenv.
conda install pyspark

Note that PySpark on Conda is not necessarily synced with the PySpark release cycle because it is maintained separately by the community.

To download a specific version (that is available on the Anaconda repositories):

conda install pyspark=3.1.2 -y
  • Download the binaries and set up environment variables.

    If none of the alternatives above is suitable for your distribution, you can always check the Apache Spark download pages, decompress the files, and set up environment variables to run pyspark from the terminal.

# Download version 3.1.2
wget https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar -xvf spark-3.1.2-bin-hadoop3.2.tgz

Make sure the environment variables are correctly set up.

cd spark-3.1.2-bin-hadoop3.2
# Configure SPARK_HOME to run shell, SQL, submit with $SPARK_HOME/bin/<command>
export SPARK_HOME=`pwd`
# Configure PYTHONPATH to find the PySpark and Py4J packages under SPARK_HOME/python/lib
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
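
If everything is in place, you can do a quick smoke test by starting the Scala shell with $SPARK_HOME/bin/spark-shell and running a trivial job; a minimal check could look like this:

// Run inside spark-shell, where `spark` is the SparkSession the shell provides
val df = spark.range(0, 1000).toDF("id")
// A simple action confirms that the JVM and the Spark binaries are wired up correctly
println(df.count())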

5. Install JupyterLab

  • Install with pip
pip install jupyterlab
  • With conda
conda install jupyterlab
  • With brew
brew install jupyter

6. Start Jupyter Notebook

Once the installation is complete, you can open a notebook in localhost by executing:

jupyter-notebook

The output will look like this:

(base) osopardo1> ~ % jupyter-notebook
[I 2022-10-18 14:21:25.819 LabApp] JupyterLab extension loaded from /Users/trabajo/opt/anaconda3/lib/python3.9/site-packages/jupyterlab
[I 2022-10-18 14:21:25.819 LabApp] JupyterLab application directory is /Users/trabajo/opt/anaconda3/share/jupyter/lab
[I 14:21:25.825 NotebookApp] Serving notebooks from local directory: /Users/trabajo
[I 14:21:25.825 NotebookApp] Jupyter Notebook 6.4.12 is running at:
[I 14:21:25.825 NotebookApp] http://localhost:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
[I 14:21:25.825 NotebookApp]  or http://127.0.0.1:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
[I 14:21:25.825 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 14:21:25.832 NotebookApp] 
    
    To access the notebook, open this file in a browser:
        file:///Users/trabajo/Library/Jupyter/runtime/nbserver-23854-open.html
    Or copy and paste one of these URLs:
        http://localhost:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
     or http://127.0.0.1:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1

You can prevent the browser from opening automatically with the --no-browser option. Copy and paste one of the links printed in the shell to access the Jupyter Notebook environment.

Note that all the logs will be printed in the terminal unless you put the process in the background.

7. Play with PySpark

Create a Notebook and start using Spark in your Data Science Projects!

Jupyter Notebook with PySpark


Approximate Queries on Data Lakes with Qbeast-Spark

Jiawei Hu

Writing procedural programs to analyze data is often less convenient than a declarative approach, because one has to define the exact control flow of the program rather than simply state the desired outcome. For example, it takes a lot more code to write a simple group-by in plain Python than in SQL.

Besides this inconvenience, there is also a potential performance drop-off, given that procedural code is treated as a black box when it comes to automatic optimization. The structure of SQL queries, on the other hand, is transparent to the underlying query optimizer, and both rule-based and cost-based optimizations can be applied to improve performance.

This was one of the main issues Apache Spark SQL set out to tackle. A declarative DataFrame API was introduced to integrate relational processing with procedural Spark code, and an extensible query optimizer was implemented to optimize the resulting plans.

In a nutshell, programs written in procedural languages can leverage the DataFrame API to write declarative code. Before execution, Catalyst takes the program, optimizes it, and converts it into the underlying procedural Spark RDD code.

This optimizer sits in the middle and takes care of parsing, optimization, and code generation, so the query is executed with maximum efficiency.
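
To make this concrete, here is a small illustrative example (the dataset and column names are made up) showing that both the declarative DataFrame API and plain SQL go through Catalyst and end up with essentially the same optimized plan, which can be inspected with explain:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalyst-demo").getOrCreate()
import spark.implicits._

// A tiny illustrative dataset registered as a temporary view
val sales = Seq(("ES", 10), ("ES", 20), ("FR", 5)).toDF("country", "amount")
sales.createOrReplaceTempView("sales")

// Both versions are optimized by Catalyst, so explain(true) shows comparable plans
sales.groupBy("country").sum("amount").explain(true)
spark.sql("SELECT country, SUM(amount) FROM sales GROUP BY country").explain(true)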

Last time we discussed the bits and pieces of a scalable multi-column indexing technology, the OTree algorithm, and how its inner workings give us the ability to sample indexed tables with minimum data transfer. This article discusses how that Efficient Sampling is materialized in Qbeast-Spark by extending the Spark SQL query optimizer, Catalyst.

A given SQL query is first parsed during the optimization process to construct an Abstract Syntax Tree, a canonical form of representing queries. This structure is then analyzed from top to bottom, and optimization rules are applied to minimize the data transfer and compute resource consumption.

SELECT A.name
FROM A JOIN B ON A.id = B.id
WHERE cond_A AND cond_B

Taking the above query as an example, a possible syntax tree can be shown in the following image:

  1. Two tables, A and B, are joined.
  2. On the resulting table, we filter the rows by applying conditions cond_A and cond_B.
  3. We retrieve the name column as output.

Optimization rules such as predicate pushdown, constant folding, and projection pruning are applied to improve performance while keeping the query result intact. If we were to apply some of the rules to our tree, a possible output could look like the following.

cond_A and cond_B are pushed down to the data source to reduce I/O; with the help of metadata, data files with values outside the desired range are discarded before reading. This results in less I/O and lower compute requirements for the subsequent JOIN operation.
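
A quick way to see predicate pushdown in practice, assuming a running spark-shell session (the path below is just an example), is to filter a Parquet-backed DataFrame and look for PushedFilters in the physical plan:

import org.apache.spark.sql.functions.col

// Write a small Parquet table and read it back
spark.range(0, 1000).toDF("id").write.mode("overwrite").parquet("/tmp/pushdown-demo")
val table = spark.read.parquet("/tmp/pushdown-demo")

// The condition shows up under PushedFilters in the Parquet scan,
// so files and row groups outside the range can be skipped before reading
table.filter(col("id") > 900).explain(true)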

Hands-on

To demonstrate how easy it is to extend Catalyst, we will implement a custom Spark session extension by adding a new optimization rule that modifies the behavior of the Sample operator.

As shown in the code snippet below, we implement a custom optimization rule, MyOptimizationRule, that takes a Sample operator with an upperBound of 0.1 and changes it to 0.2.
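
A minimal sketch of such a rule, assuming Spark 3.x APIs, could look like the following:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sample}
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrites any Sample operator whose upperBound is 0.1 to use 0.2 instead
case class MyOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case s @ Sample(_, 0.1, _, _, _) => s.copy(upperBound = 0.2)
  }
}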

Once done, we need to add this rule to the logical plan optimization pipeline by injecting it into the SparkSessionExtensions.
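
Again as a sketch, the rule can be registered through a class that receives the SparkSessionExtensions:

import org.apache.spark.sql.SparkSessionExtensions

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Adds MyOptimizationRule to the logical plan optimization pipeline
    extensions.injectOptimizerRule(session => MyOptimizationRule(session))
  }
}

With these classes packaged in a jar, spark-shell can load them through --jars together with --conf spark.sql.extensions=MyExtensions (use the fully qualified class name if the class lives in a package).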

To see the behavior in action, open spark-shell with the proper configurations and check the Sample query plan by running df.sample(0.1).explain(true).

As shown below, before the optimization the query plan has upperBound=0.1 (see the Parsed Logical Plan and the Analyzed Logical Plan); this is converted to 0.2 by our MyOptimizationRule class, as shown in the Optimized Logical Plan and the Physical Plan.

Qbeast Efficient Sampling

In the last article, we saw that the OTree WRITE protocol can arrange data in a hierarchical structure such that, when traversing the tree from top to bottom, one encounters cubes whose data belong to an increasing fraction of the dataset.

Take the index tree from the image above as an example; a cube with maxWeight = 0.2 implies that all the data it contains belong to the first 20% of the dataset.

maxWeight increases monotonically from root to leaf in any branch. And since weights are generated randomly, the WRITE protocol ensures that the data contained in a given cube can be treated as a representation of its subtree.

With all this information in mind, it is possible to implement an efficient Sample operator that, by accessing targeted cubes according to the provided sample fraction, can generate statistically representative subsets with minimum I/O.

When reading a given sample fraction from the source, we traverse the tree from the root, following every branch in a DFS fashion. Whenever a cube with maxWeight ≥ fraction is encountered, we read its content, stop going deeper, and jump to the next branch. In this way, we collect data from all those cubes with maxWeight ≤ fraction.
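
As a rough sketch of this traversal (the Cube type and its fields are illustrative, not the actual Qbeast-Spark classes):

// Illustrative structure only; not the real Qbeast-Spark types
case class Cube(maxWeight: Double, files: Seq[String], children: Seq[Cube])

def filesForSample(cube: Cube, fraction: Double): Seq[String] =
  if (cube.maxWeight >= fraction) {
    // This cube already covers the requested fraction: read it and stop descending
    cube.files
  } else {
    // Otherwise read it and keep traversing every child branch
    cube.files ++ cube.children.flatMap(child => filesForSample(child, fraction))
  }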

Qbeast-Spark, in essence, does this behind the scenes by injecting optimization rules through session extensions. When a Sample operator is captured on top of a QbeastDataSource, our rule transforms it into a set of filters on the indexed columns and the maxWeight. Spark SQL later optimizes these filters further to select the target cubes, resulting in an optimized query plan.

The image above shows the query plan of a sample operation on a QbeastDataSource. What is initially a Sample operator is converted into a Filter during rule-based optimization, and the sample fraction is used to define a cube maxWeight for filtering.

This filter is used during the FileScan to select the target cubes and is later applied to individual rows to further reduce the data size.
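
You can inspect such a plan yourself; as a sketch, assuming the qbeast-spark extensions are enabled and a table has already been written with the qbeast format (the path is just an example):

// Read a table previously written with the qbeast data source
val qbeastDf = spark.read.format("qbeast").load("/tmp/qbeast-table")

// The Sample operator below is rewritten into filters on the indexed columns and the cube maxWeight
qbeastDf.sample(0.1).explain(true)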

The I/O savings can be observed in the Spark UI for the corresponding SQL query. The entire dataset has 254,975 rows, and thanks to Qbeast Sampling, only 100,038 rows are retrieved from secondary storage. The filters are then reused to reduce the row count to match the sample fraction.

To our knowledge, the same operation on any other data format would trigger 100% I/O. Below is the query plan for sample(0.3) on the same dataset written in the Apache Parquet format: it reads all the files from the dataset, invoking complete I/O, which is reflected in the size of the files read and the number of output rows from the Parquet scan.

This ability to sample data efficiently paves the way for approximate queries, where the accuracy of the query outcome is secondary to execution speed. For example, the user can be flexible about the output because the last few decimals of an aggregation are not as important as getting the results quickly.

It is in these scenarios where Efficient Data Sampling is crucial.


If this has sparked your interest in Qbeast-Spark, do not hesitate to take a look at the GitHub repo and try it out for yourself.

Happy Learning.


Read from public S3 bucket with Spark

S3 Hadoop Compatibility

Trying to read from public Amazon S3 object storage with Spark can cause many errors related to Hadoop versions.

Here are some tips to configure your Spark application.

Spark Configuration

To read a public S3 bucket, you need to start a spark-shell with Spark version 3.1.1 or higher and Hadoop 3.2 dependencies.

If you have to update the binaries to a compatible version to use this feature, follow these steps:

  • Download spark tar from the repository
$ > wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
  • Decompress the files
$ > tar xzvf spark-3.1.1-bin-hadoop3.2.tgz
  • Update the SPARK_HOME environment variable
$ > export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2

Once you have Spark ready to run, use the following configuration:

$ > $SPARK_HOME/bin/spark-shell \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
--packages com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0

The org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider provides anonymous credentials in order to access the public S3 bucket.

And to read the file:

val df = spark
  .read
  .format("parquet")
  .load("s3a://qbeast-public-datasets/store_sales")
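
If you prefer to set the credentials provider in code rather than on the command line, a sketch using the SparkSession builder (the Hadoop and AWS packages still need to be on the classpath) would be:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("read-public-s3")
  // Anonymous credentials for the public bucket
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
  .getOrCreate()

val df = spark.read.format("parquet").load("s3a://qbeast-public-datasets/store_sales")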

Summary

There is no known working combination of Hadoop 2.7 with AWS S3 for this setup. However, you can try it; if you do so, remember to include the following option:

--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
