Author: Paola Pardo

How to configure your Spark application in an Amazon EMR Notebook

In the last pill, we learned how to set up Spark and Jupyter Notebook on macOS. Now it's time to level up and configure your Spark application on Amazon EMR.

Apache Spark

Apache Spark is a distributed computing framework for executing data workloads across multiple nodes in a cluster. It was first released in 2014, and the latest stable version at the time of writing is 3.3.1.

Spark exposes APIs in multiple languages such as Scala, Java, Python and SQL. It is mainly used for ETL (Extract, Transform and Load) pipelines in which large amounts of data need to be processed at a time; however, it also covers streaming applications with the Spark Structured Streaming module.

The main difference with traditional MapReduce is that Spark evaluates lazily: no transformation (map) is computed until an action (reduce) is called. Its architectural foundation is the RDD, or Resilient Distributed Dataset: a read-only multiset of data items distributed over a cluster of machines and maintained in a fault-tolerant way.
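
To make the lazy-evaluation point concrete, here is a minimal PySpark sketch (the app name and values are illustrative, run against a local session): the map transformation only records the lineage, and nothing executes until the collect action is called.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-eval-demo").getOrCreate()

rdd = spark.sparkContext.parallelize(range(10))
squares = rdd.map(lambda x: x * x)  # transformation: only recorded, not executed
print(squares.collect())            # action: triggers the distributed computation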

The DataFrame API was introduced later as an abstraction on top of the RDD, along with a new library that makes it possible to query data using SQL.

Spark Session

The SparkSession is the main entry point to the Spark environment. As the name suggests, it is a representation of the active session on the cluster.

To create a basic SparkSession, just use SparkSession.builder():

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Once you have a SparkSession instance, you can start reading datasets and transforming them into visualizations or Pandas DataFrames.

df = spark.read.json("examples/src/main/resources/people.json")
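
As a small illustrative sketch (people.json ships with the Spark distribution; converting to Pandas assumes pandas is installed on the driver), you can register the DataFrame as a temporary view, query it with SQL, and bring the result back as a Pandas DataFrame:

df.createOrReplaceTempView("people")
adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")

pandas_df = adults.toPandas()  # collects the result to the driver as a Pandas DataFrame
print(pandas_df.head())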

Spark Configuration

The Spark API lets you configure the environment for each unique session. How many resources do you want for the application (memory, CPUs…)? Do you need to load extra packages from the Maven repository? What are the authentication keys to access the S3 bucket?

A subset of basic Spark configurations is listed below:

Parameter      | Description                                           | Value
jars           | Jars to be used in the session                        | list of string
pyFiles        | Python files to be used in the session                | list of string
files          | Files to be used in the session                       | list of string
driverMemory   | Amount of memory to be used for the driver process    | string
driverCores    | Number of cores to be used for the driver process     | int
executorMemory | Amount of memory to be used for the executor process  | string
executorCores  | Number of cores to be used for the executor process   | int
numExecutors   | Number of executors to be launched for the session    | int
archives       | Archives to be used in the session                    | list of string
queue          | Name of the YARN queue                                | string
name           | Name of the session (name must be in lower case)      | string

These configurations are added as key-value pairs to the SparkConf class. For the map to be parsed correctly, all Spark configuration keys must start with the spark. prefix.
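
As an illustration only (the memory and core values below are arbitrary examples, not recommendations), such key-value pairs can be set on a SparkConf object and passed to the session builder:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2")

spark = SparkSession.builder \
    .appName("configured-app") \
    .config(conf=conf) \
    .getOrCreate()

# Verify that the setting reached the session
print(spark.sparkContext.getConf().get("spark.executor.memory"))  # 4g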

For a more detailed list, please check the Spark Documentation.

Amazon EMR

Amazon EMR is a cloud solution for big-data processing, interactive analytics and machine learning. It provides the infrastructure and the management to host an Apache Spark cluster with Apache Hive and Presto.

Its elasticity enables you to quickly deploy and provision the capacity of your cluster, and it's designed to reduce the cost of processing large amounts of data with the Amazon EC2 Spot integration.

Jupyter Notebook with Spark settings on EMR

An Amazon EMR notebook is a serverless Jupyter notebook. It uses the Sparkmagic kernel as a client to execute the code through an Apache Livy server.

Sparkmagic

The Sparkmagic project includes a set of commands for interactively running Spark code in multiple languages, as well as some kernels that you can use to turn Jupyter into an integrated Spark environment.

Use %%configure to add the required configuration before you run your first Spark-bound code cell; this avoids trouble with the cluster-wide Spark configuration:

%%configure -f
{"executorMemory":"4G"}

If you want to add more specific configurations, the ones that normally go with the --conf option, use a nested JSON object:

%%configure -f
{ "conf":  { "spark.dynamicAllocation.enabled":"false",
					 "spark.jars.packages": "io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0",
           "spark.sql.extensions": "io.qbeast.spark.internal.QbeastSparkSessionExtension"} }
Check if the configuration is correct by executing:
%%info

On the server side, check the /var/log/livy/livy-livy-server.out log on the EMR cluster.

20/06/24 10:11:22 INFO InteractiveSession$: Creating Interactive session 2: [owner: null, request: [kind: pyspark, proxyUser: None, executorMemory: 4G, conf: spark.dynamicAllocation.enabled -> false, spark.jars.packages -> io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0, spark.sql.extensions -> io.qbeast.spark.internal.QbeastSparkSessionExtension, heartbeatTimeoutInSecond: 0]]

In this article, we’ve seen the main session components of Apache Spark and how to configure Jupyter-Spark Applications to run in an EMR cluster. In the next chapters, we will get more hands-on and try some Spark Examples.

If you find this post helpful, don't hesitate to share it and tag us on any social media!

Have a great day 🙂


Set up Jupyter + Spark on macOS

Migrating from the Linux operating system to a curated macOS can be tricky, especially if you are a developer. In this post, we will address how to set up your computer to use Spark and Jupyter Notebook with the M1 chip.

1. Install Homebrew

Homebrew is "The Missing Package Manager for macOS". It installs the stuff you need that Apple (or your Linux system) didn't, placing each package in its own directory and symlinking its files into /usr/local (on Intel Macs) or /opt/homebrew (on Apple Silicon).

To install it, just download it through curl and run the installation script:

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

Once the process is done, make sure the brew command is added to your PATH by executing these two lines:

# This adds the brew command to the PATH every time you open a shell
echo 'eval "$(/opt/homebrew/bin/brew shellenv)"' >> ~/.zprofile
eval "$(/opt/homebrew/bin/brew shellenv)"

2. Install JAVA

Apache Spark uses the JVM to execute its tasks, so we need a compatible Java version to run the notebooks with a distributed engine.

You can install Java through brew:

brew install openjdk@8

Getting the right version of Java for macOS with an M1 chip

If the brew installation does not work on your Mac, we recommend using Azul's Zulu OpenJDK v1.8.

You can find it on the downloads page, by scrolling down to the bottom of the website. Notice the filters that we applied in the link: Java 8, macOS, ARM 64-bit, JDK.

  • Download the JDK. You can retrieve either the .dmg package, the .zip, or the .tar.gz archive:

wget "https://cdn.azul.com/zulu/bin/zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64.tar.gz"
tar -xvf zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64.tar.gz
  • Define the JAVA_HOME environment variable in .zprofile as the path to the extracted JDK folder (not the .tar.gz file):
echo "export JAVA_HOME=$PWD/zulu8.64.0.19-ca-jdk8.0.345-macosx_aarch64" >> ~/.zprofile
  • Add the JDK binaries to the PATH:
export PATH=$PATH:$JAVA_HOME/bin

3. Install Python and Scala

Jupyter Notebook can be used with two different languages: Python and Scala. Although the Python kernel is more widely used, Apache Spark also has a Scala API that covers all of the use cases.

To install Scala and Python interpreters, run the following commands with brew:

brew install python && brew install scala

4. Install Pyspark

There are multiple ways of installing and using Spark on a Mac.

  • Install with pip
pip install pyspark

# Or a specific version
pip install pyspark==3.1.2

To choose the bundled Hadoop version:

# Available versions are 2.7 and 3.2
PYSPARK_HADOOP_VERSION=2.7 pip install pyspark
  • Install with Conda. Conda is an open-source package management and environment management system that is part of the Anaconda distribution. It is language agnostic and can replace both pip and virtualenv.
conda install pyspark

Note that PySpark on Conda is not necessarily in sync with the PySpark release cycle, because it is maintained separately by the community.

To download a specific version (that is available on the Anaconda repositories):

conda install pyspark=3.1.2 -y
  • Download the binaries and set up environment variables.

    If none of the alternatives above is suitable for your distribution, you can always check the Apache Spark download pages, decompress the files, and set up environment variables to run pyspark from the terminal.

# Download version 3.1.2
wget https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar -xvf spark-3.1.2-bin-hadoop3.2.tgz

Make sure the environment variables are correctly set up.

cd spark-3.1.2-bin-hadoop3.2
# Configure SPARK_HOME to run shell, SQL, submit with $SPARK_HOME/bin/<command>
export SPARK_HOME=`pwd`
# Configure PYTHONPATH to find the PySpark and Py4J packages under SPARK_HOME/python/lib
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
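
As a quick sanity check (assuming the SPARK_HOME and PYTHONPATH variables above are exported in your current shell), you can start a local session from Python and print the version:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("installation-check") \
    .getOrCreate()

print(spark.version)  # should print 3.1.2 for this download
spark.stop()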

5. Install JupyterLab

  • Install with pip
pip install jupyterlab
  • With conda
conda install jupyterlab
  • With brew
brew install jupyter

6. Start Jupyter Notebook

Once the installation is complete, you can open a notebook on localhost by executing:

jupyter-notebook

The output will look like this:

(base) osopardo1> ~ % jupyter-notebook
[I 2022-10-18 14:21:25.819 LabApp] JupyterLab extension loaded from /Users/trabajo/opt/anaconda3/lib/python3.9/site-packages/jupyterlab
[I 2022-10-18 14:21:25.819 LabApp] JupyterLab application directory is /Users/trabajo/opt/anaconda3/share/jupyter/lab
[I 14:21:25.825 NotebookApp] Serving notebooks from local directory: /Users/trabajo
[I 14:21:25.825 NotebookApp] Jupyter Notebook 6.4.12 is running at:
[I 14:21:25.825 NotebookApp] http://localhost:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
[I 14:21:25.825 NotebookApp]  or http://127.0.0.1:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
[I 14:21:25.825 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 14:21:25.832 NotebookApp] 
    
    To access the notebook, open this file in a browser:
        file:///Users/trabajo/Library/Jupyter/runtime/nbserver-23854-open.html
    Or copy and paste one of these URLs:
        http://localhost:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1
     or http://127.0.0.1:8888/?token=7d02412ff87bd430c7404114f97cfde63b06e1e0c1a2b2e1

You can prevent the browser from opening automatically with the --no-browser option. Copy and paste one of the links that appear in the shell to access the Jupyter Notebook environment.

Note that all the logs will be printed in the terminal unless you put the process in the background.

7. Play with Pyspark

Create a Notebook and start using Spark in your Data Science Projects!

Jupyter Notebook with Pyspark
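
For instance, a first notebook cell could contain a minimal sketch like this one (a local session plus a tiny in-memory DataFrame; names and values are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("first-notebook") \
    .getOrCreate()

df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])
df.show()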


Scala Test Dive-in: Public, Private and Protected methods

We all know that testing code can be done in different ways. This pill does not try to explain the best way to check whether your Scala project is working as it should, but it does provide some tips and tricks for testing public, private, and protected methods.

Public Methods

Public methods are the functions inside a class that can be called from outside through an instantiated object. Testing them is no rocket science. In Scala, Matchers and Clues are used to understand what went wrong when an assertion fails.

Imagine we want to test a MathUtils class that has simple methods min and max:

class MathUtils {
  def min(x: Int, y: Int): Int = if (x <= y) x else y

  def max(x: Int, y: Int): Int = if (x >= y) x else y

}

This is what your test should look like:

import org.scalatest.AppendedClues.convertToClueful
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec


class MathUtilsTest extends AnyFlatSpec with Matchers {

  "MathUtils" should "compute min correctly" in {
    val min = 10
    val max = 20
    val mathUtils = new MathUtils()
    mathUtils.min(min, max) shouldBe min withClue s"Min is not $min"
  }

  it should "compute max correctly" in {
    val min = 10
    val max = 20
    val mathUtils = new MathUtils()
    mathUtils.max(min, max) shouldBe max withClue s"Max is not $max"
  }
}

Private Methods

Private methods are the methods that cannot be accessed in any other class than the one in which they are declared.

Testing these functions is trickier. You have different ways of proceeding: copy and paste the implementation into a test class (which is off the table), use Mockito, or try PrivateMethodTester.

Let’s write a private method on the class MathUtils:

class MathUtils {

  def min(x: Int, y: Int): Int = if (x <= y) x else y

  def max(x: Int, y: Int): Int = if (x >= y) x else y

  private def sum(x: Int, y: Int): Int = {
    x + y
  }

  def sum(x: Int, y: Int, z: Int): Int = {
    val aux = sum(x, y)
    sum(aux, z)
  }

}

PrivateMethodTester is a trait that facilitates the testing of private methods. You have to mix it into your test class in order to take advantage of it.


import org.scalatest.AppendedClues.convertToClueful
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.PrivateMethodTester

class MathUtilsPrivateTest extends AnyFlatSpec with Matchers with PrivateMethodTester {

  "MathUtils" should "compute sum correctly" in {
  
    val x = 1
    val y = 2

    val mathUtils = new MathUtils()
    val sumPrivateMethod = PrivateMethod[Int]('sum)
    val privateSum = mathUtils invokePrivate sumPrivateMethod(x, y)
    privateSum shouldBe (x + y) withClue s"Sum is not ${x + y}"
  }
}

In val sumPrivateMethod = PrivateMethod[Int]('sum) we have different parts:

  • [Int] is the return type of the method
  • ('sum) is the symbol with the name of the method to call

With mathUtils invokePrivate sumPrivateMethod(x, y) you can collect the result in a val to compare it and check that the method works properly. You need to use an instance of the class or object to invoke the method; otherwise, it will not be found.

Protected Methods

A protected method is like a private method in that it can only be invoked from within the implementation of a class or its subclasses.

For example, suppose we decide to make the sum method protected instead of private. The MathUtils class would look like this:

class MathUtils {
  def min(x: Int, y: Int): Int = if (x <= y) x else y

  def max(x: Int, y: Int): Int = if (x >= y) x else y

  protected def sum(x: Int, y: Int): Int = x + y

}

If we create a new object from MathUtils and try to call the sum method, the compiler will complain that 'sum is not accessible from this place'.

But don’t worry, we have a solution for that as well.

We can write a subclass specifically for this test and override the method, since a protected member can be invoked through the implementation of its subclasses.


import org.scalatest.AppendedClues.convertToClueful
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec

class MathUtilsTestClass extends MathUtils {
  // Overriding without an access modifier widens visibility to public,
  // so the test can call sum directly on the subclass
  override def sum(x: Int, y: Int): Int = super.sum(x, y)
}

class MathUtilsProtectedTest extends AnyFlatSpec with Matchers {
  "MathUtils" should "compute sum correctly" in {
    val x = 1
    val y = 2
    val mathUtilsProtected = new MathUtilsTestClass()
    mathUtilsProtected.sum(x, y) shouldBe (x + y) withClue s"Sum is not ${x + y}"
  }

}

Summary

Now you can test the different types of methods in your Scala project: public, private, and protected. For more information about Scala, functional programming, and style, feel free to ask us or check out our other pills!


Read from public S3 bucket with Spark

S3 Hadoop Compatibility

Trying to read from public Amazon S3 object storage with Spark can cause many errors related to Hadoop versions.

Here are some tips to configure your Spark application.

Spark Configuration

To read from the public S3 bucket, you need to start a spark-shell with version 3.1.1 or later and Hadoop 3.2 dependencies.

If you have to update the binaries to a compatible version to use this feature, follow these steps:

  • Download spark tar from the repository
$ > wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
  • Decompress the files
$ > tar xzvf spark-3.1.1-bin-hadoop3.2.tgz
  • Update the SPARK_HOME environment variable
$ > export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2

Once you have Spark ready to execute, the following configuration must be used:

$ > $SPARK_HOME/bin/spark-shell \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
  --packages com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0

The org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider provides anonymous credentials in order to access the public S3 bucket.

And to read the file:

val df = spark
  .read
  .format("parquet")
  .load("s3a://qbeast-public-datasets/store_sales")

Summary

There's no known working version of Hadoop 2.7 for reading from AWS S3 this way. However, you can try to use it; if you do so, remember to include the following option:

--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

Code Formatting with Scalafmt

Whether you are starting a Scala project or collaborating on one, here is a guide to the most widely used frameworks for improving code style.

Scalastyle and Scalafmt

Scalastyle is a handy tool for checking coding style in Scala, similar to what Checkstyle does for Java. Scalafmt formats code so that it looks consistent across your team, and it integrates nicely into your toolchain.

Installation

For the installation, you need to add the following to the plugins.sbt file under the project folder.

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") 
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

This will create a Scalastyle configuration under scalastyle_config.xml, and a .scalafmt.conf file where you can write rules to maintain consistency across the project.

For example:

# This style is copied from
# https://github.com/apache/spark/blob/master/dev/.scalafmt.conf
version = "2.7.5"
align = none
align.openParenDefnSite = false
align.openParenCallSite = false
align.tokens = []
optIn = {
  configStyleArguments = false
}
danglingParentheses = false
docstrings = JavaDoc
maxColumn = 98
newlines.topLevelStatements = [before,after]

Quickstart

When opening a project that contains a .scalafmt.conf file, you will be prompted to use it:

Choose the scalafmt formatter, and it will be used at compile-time for formatting files.

However, you can check it manually with:

sbt scalastyle

Another exciting feature is that you can configure your IDE to reformat on save:

Alternatively, force code formatting:

sbt scalafmt # Format main sources 

sbt test:scalafmt # Format test sources 

sbt scalafmtCheck # Check if the scala sources under the project have been formatted 

sbt scalafmtSbt # Format *.sbt and project/*.scala files

sbt scalafmtSbtCheck # Check if the files have been formatted by scalafmtSbt

More tricks

Scaladocs

Sbt also checks the format of the Scala docs when publishing the artifacts. The following command will check and generate the Scaladocs:

sbt doc

Header Creation

Sometimes a header must be present in all files. You can add it by using this plugin: https://github.com/sbt/sbt-header

First, add it in the plugins.sbt:

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")

Include the header you want to show in your build.sbt:

headerLicense := Some(HeaderLicense.Custom("Copyright 2021 Qbeast Pills"))

And enforce it at compile time with:

Compile / compile := (Compile / compile).dependsOn(Compile / headerCheck).value

To automate the creation of headers in all files, execute:

sbt headerCreate

Using println

Scalastyle has strong policies on printing information, and we all debug like this now and then.

The quick solution is to wrap your code:

// scalastyle:off println
<your beautiful piece of code>
// scalastyle:on println

But make sure you delete these comments before pushing any commits 😉
