RasterFrames is built on Apache Spark, which is an open source, distributed computing engine for processing large amounts of data. A RasterFrame is a Spark DataFrame that contains imagery.
In this article, we discuss how to tune and configure Spark for your analysis. We give a brief overview of how Spark works, show how to interpret the Spark UI to monitor your job, talk about specific functions that will help you tune your Spark jobs, and show how to set parameters to configure your Spark instance.
How Spark Works
Apache Spark uses a driver/worker architecture. The driver is the central coordinator that communicates with a potentially large number of distributed workers, called executors, through the cluster manager. The cluster manager is responsible for allocating resources across executors. Executors are responsible for executing the tasks sent to them by the driver.
Source: https://runawayhorse001.github.io/LearningApacheSpark/introduction.html
Image Source: https://spark.apache.org/docs/0.9.1/cluster-overview.html
Action Versus Transformation
Spark functions are divided into 2 categories: actions and transformations. Transformations are functions that transform your data in some way. They are lazy in nature, which means they don't execute until an action is called. Some transformation functions include:
- map()
- filter()
- join()
- groupBy()
- repartition()
- coalesce()
- union()
- distinct()
Actions trigger the execution of a series of transformations on the actual data. They allow you to view the results of your transformations. Some action functions include:
- count()
- collect()
- take()
Source: https://data-flair.training/blogs/spark-rdd-operations-transformations-actions/
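To see the difference in practice, here is a minimal sketch. It assumes a SparkSession named spark is available (one is created later in this article with create_earthai_spark_session()), and the DataFrame contents are made up for the example. The transformations only build up a plan; nothing runs until count() is called.

# a toy DataFrame, made up purely for this example
df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'a'), (4, 'c')], ['id', 'label'])

# transformations: these only describe the computation; no data is processed yet
filtered = df.filter(df.id > 1)
deduped = filtered.distinct()

# action: triggers execution of the whole chain above and returns a result
deduped.count()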
Spark Jargon
- Job: A piece of code which reads some input, performs some computation on the data, and writes some output data.
- Stage: Jobs are divided into stages. A stage is a physical unit of execution; a new stage begins wherever a shuffle is required.
- Task: Each stage has some tasks. A task is a unit of work that is sent to an executor and run over a partition of data. A short example of how jobs, stages, and tasks relate follows below.
Source: https://data-flair.training/blogs/how-apache-spark-works/
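As a rough illustration of how these pieces relate, the hedged sketch below runs a single action whose plan contains a shuffle (from groupBy()). In the Spark UI this typically shows up as one job split into two stages, with one task per partition in each stage. The DataFrame is made up for the example and assumes the same spark session as above.

# a toy DataFrame, made up purely for this example
df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'a'), (4, 'c')], ['id', 'label'])

# one action -> one job; the shuffle introduced by groupBy() splits it into two stages
df.groupBy('label').count().collect()

# each stage runs one task per partition it processes
df.rdd.getNumPartitions()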
Monitoring Your Spark Job
To access the Spark UI in EarthAI Notebooks, click Spark in the top menu and select Spark UI. This will open up a new window in your main work area within EarthAI Notebooks.
The Spark UI can be used to monitor the progress of your Spark job and to view the resource consumption of your cluster. You can also check your Spark configurations, such as how many executors your cluster has and how many cores are allocated to each executor.
As an example, if you notice your code is taking a while to run, you can open up the Spark UI to see what job and stage the code is on. Then, you can view the executors page to see if your executors are overwhelmed. If your executors are overwhelmed, you can try:
- refactoring your code to make it more efficient;
- reconfiguring your executors to give them more memory (see the sketch after this list); or
- launching a larger cluster.
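The second option, giving executors more memory, is done by passing Spark properties when the session is created, in the same way the shuffle partition property is set later in this article. The property names below are standard Spark settings, but the values are purely illustrative; depending on how your cluster is launched, some executor settings may only take effect when the cluster starts.

# illustrative only: request more memory per executor when creating the session
spark = create_earthai_spark_session(**{
    'spark.executor.memory': '8g',          # heap memory per executor (example value)
    'spark.executor.memoryOverhead': '2g'   # off-heap overhead per executor (example value)
})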
Interpreting the Spark UI
Let's walk through a code example that creates a Spark job we can view in the Spark UI. The code below performs the following steps:
- importing the EarthAI library;
- instantiating a Spark instance;
- querying Earth OnDemand for MODIS imagery;
- reading the imagery into a RasterFrame; and
- counting how many rows are in the RasterFrame.
spark.read.raster is a Spark transformation function and count() is a Spark action function. The action will kick off a Spark job that we can monitor in the Spark UI. Run the code below and then open up the Spark UI.
# import EarthAI library
from earthai.all import *

# create a Spark instance with default configurations
spark = create_earthai_spark_session()

# query Earth OnDemand for MODIS imagery
cat = earth_ondemand.read_catalog(
    geo='POINT(-110.0 44.5)',
    start_datetime='2018-08-01',
    end_datetime='2018-08-31',
    max_cloud_cover=10,
    collections='mcd43a4',
)

# read imagery into a RasterFrame
rf = spark.read.raster(cat, catalog_col_names=['B01', 'B04', 'B03'])

# count how many rows are in the RasterFrame
rf.count()
Jobs
When you open the Spark UI, you will see the "Jobs" tab. This page lists the jobs that have run on your cluster. Each job has a description (the action that triggered it), a status, and a count of completed tasks. Clicking on a description in the list of jobs will show the current state of all stages for that particular job.
The number of jobs will be equal to the number of actions in your code. We've only run one action so far, so you should see a single row in your jobs table.
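If you run a second action on the same RasterFrame, another row appears in the Jobs table. A minimal example, reusing the rf defined above (the column choice is arbitrary):

# a second action on the same RasterFrame produces a second job in the Jobs tab
rf.select('B01').count()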
Stages
Clicking on the "Stages" tab shows the current state of all stages of Spark job(s) running on your cluster. Clicking on a description in the list of stages will show details for each stage.
Environment
The "Environment" tab shows your Spark environment details and a long list of Spark properties. If you reconfigure any parameters in your Spark instance, you can check if the properties have been set correctly on this page. In a later section, we will show how to reconfigure Spark properties.
Executors
By clicking the "Executors" tab you can view the resource and performance details of your executors, including memory and disk usage as well as task and shuffle information.
For more information on the Spark UI, see the guide linked below.
Source: https://sparkbyexamples.com/spark/spark-web-ui-understanding/
Spark Tuning
Shuffling
Shuffling is a mechanism for moving data across different executors or even across machines. It's an expensive operation because it requires disk and network I/O. Shuffling is often a cause of performance issues, so when you notice your code lagging, look for transformations that involve shuffling.
Shuffling is triggered by certain transformation operations, including:
- groupByKey()
- reduceByKey()
- join()
- repartition()
- coalesce()
The partitionBy() function is useful for making shuffling operations more efficient. It lets you specify a column whose values are used to redistribute the data across partitions, and it is especially useful when writing out data.
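As a sketch of the write case: assuming a DataFrame df that has a column you want to group output files by (the column name and output path below are hypothetical placeholders), partitionBy() writes one directory of output files per distinct value of that column.

# illustrative only: 'region' and the output path are hypothetical placeholders
df.write \
    .partitionBy('region') \
    .parquet('s3://my-bucket/output/')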
repartition() and coalesce() are two shuffle-related functions that can be used to make subsequent shuffling operations in your code more efficient. We talk about repartitioning more in the next section.
Source: https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby/42028881#42028881
Repartitioning
Repartitioning is the process of increasing or decreasing the number of partitions that your data is distributed across. It's important to repartition datasets after filtering down a large data set, so you don't end up with a single executor doing all the work.
The repartition() function allows you to specify the number of partitions that you want to distribute your data across. The number of partitions can be increased or decreased. This operation does a full shuffle and distributes data evenly across partitions.
The coalesce() function allows you to reduce the number of partitions that your data is spread across. It avoids a full data shuffle by using existing partitions, which minimizes the amount of data that's being shuffled. It results in partitions with different amounts of data.
coalesce() runs faster than repartition(), but unequal-sized partitions can be slower to work with than equal-sized partitions. You should consider the number of cores and the amount of data you have when using these functions, so you don't specify too few or too many partitions.
Source: https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce
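A minimal sketch of both functions, reusing the RasterFrame rf from earlier in the article; the partition counts are example values only.

# check how many partitions the data currently occupies
rf.rdd.getNumPartitions()

# repartition(): full shuffle, data spread evenly across 200 partitions (example value)
rf_even = rf.repartition(200)

# coalesce(): merge existing partitions down to 20 without a full shuffle (example value)
rf_fewer = rf.coalesce(20)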
Caching
Caching is the process of storing your data to speed up any future processing of the data. A DataFrame that is not cached is re-evaluated again each time an action is invoked on it.
Spark has 2 functions for caching a DataFrame: cache() and persist(level: StorageLevel). cache() will store a DataFrame in memory, whereas persist() can store it in memory, on disk, or in off-heap memory, depending on the storage level. The default storage level for persist() is MEMORY_AND_DISK, which means that the data will be stored in memory if there is space for it; otherwise, it will be stored on disk.
If you know your data is too large to fit in memory, you should use persist() rather than cache() to avoid out-of-memory errors. Unfortunately, reading from disk is much slower than reading from memory, so try to cache only the columns and rows needed in your analysis to reduce the size of the dataset and remove the need for disk storage.
You should cache a DataFrame if you are reusing it in an iterative loop or if you are reusing it multiple times through your code. Caching can also help in reducing the cost of recovery if an executor fails, so it's a good idea to cache when DataFrame computation is expensive. You can check the "Storage" tab in the Spark UI to see how much space your cached datasets take up.
Source: https://unraveldata.com/to-cache-or-not-to-cache/
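A minimal sketch of both approaches, reusing rf from earlier; trimming to the columns you actually need (as suggested above) keeps the cached data small. The column selection here is illustrative.

from pyspark import StorageLevel

# keep only the columns needed downstream so the cached data stays small
rf_small = rf.select('B01', 'B04', 'B03')

# option 1: cache(); an action is needed to actually materialize the cache
rf_small.cache()
rf_small.count()

# option 2: persist() with a storage level that can spill to disk
# (unpersist first, since a DataFrame's storage level can only be set once)
rf_small.unpersist()
rf_small.persist(StorageLevel.MEMORY_AND_DISK)
rf_small.count()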
Broadcasting
Broadcasting is a mechanism for putting a read-only copy of a variable on each executor. This can speed up data processing since the Spark job doesn't have to ship a copy of the variable with each task. The broadcast() function can be used, for example, to give every executor a copy of an input dataset or lookup dictionary in an efficient manner.
Source: https://spark.apache.org/docs/latest/api/java/org/apache/spark/broadcast/Broadcast.html
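A minimal sketch of a broadcast variable, assuming you have a small lookup dictionary that every executor should hold a local copy of; the dictionary, labels, and UDF here are made up for the example.

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# a small lookup table, made up for this example
band_labels = {'B01': 'red', 'B03': 'blue', 'B04': 'green'}

# ship one copy of the dictionary to each executor instead of one copy per task
band_labels_bc = spark.sparkContext.broadcast(band_labels)

# use the broadcast value inside a UDF
@F.udf(returnType=StringType())
def label_for(band_name):
    return band_labels_bc.value.get(band_name, 'unknown')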
Setting Spark Properties
As mentioned earlier, you can check the "Environment" tab of the Spark UI to view the properties of your Spark instance. In the code below, we show how to change one of these properties. We set the number of shuffle partitions, which is useful for optimizing the performance of your code.
When dealing with a small amount of data, you should reduce the number of shuffle partitions. Otherwise, you could end up with a bunch of partitions with a small amount of data on each, which could result in too many tasks and slow down performance.
When dealing with a large amount of data, you should increase the number of shuffle partitions. Otherwise, you could end up with some really long running tasks that could result in an out of memory error.
We've already imported the EarthAI library and queried Earth OnDemand for MODIS imagery earlier in the article, so we can skip straight to reinstantiating a Spark instance. We set the number of shuffle partitions to 150 in a dictionary and pass this to create_earthai_spark_session().
# create a Spark session and specify the number of shuffle partitions
spark = create_earthai_spark_session(**{
    'spark.sql.shuffle.partitions': 150
})
We rerun the function to read MODIS imagery into a RasterFrame using the Earth OnDemand catalog created earlier in the article.
# read imagery into a RasterFrame
rf = spark.read.raster(cat, catalog_col_names=['B01', 'B04', 'B03'])
Then, we print the number of partitions to make sure the parameter was set correctly.
# get the number of partitions
rf.rdd.getNumPartitions()