We will use multispectral Landsat 8 imagery acquired during the growing season to classify the type of crop growing in known crop fields.
The main components of this job are very similar to those in the supervised learning example in the RasterFrames documentation, with a few key differences: the target data is already in raster form, and that raster is in a different CRS than the imagery. To handle this, we will perform a raster join between the two DataFrames.
Create a SparkSession with options to improve the way tasks are spread across workers.
from earthai.all import *
import pyspark.sql.functions as F

parallelism = 1000

spark = create_earthai_spark_session(**{
    'spark.default.parallelism': parallelism,
    'spark.sql.shuffle.partitions': parallelism,
    'spark.driver.memory': '3G'
})
Pull Landsat 8 from Earth OnDemand
We can use the earth_ondemand.collections and earth_ondemand.bands functions to learn more about the data. Here we will query Landsat 8 L1TP data using all of the Operational Land Imager (OLI) multispectral bands except ultra-blue. We also pull the BQA band in order to mask out cloudy pixels.
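For example, you can browse the available collections and band definitions before building a query. The snippet below is a minimal sketch; passing the collection id to earth_ondemand.bands is an assumption based on how the collection is queried later in this article.

# Browse the imagery collections available through Earth OnDemand
earth_ondemand.collections()

# Browse band metadata for the Landsat 8 collection used below (collection id assumed)
earth_ondemand.bands('landsat8_l1tp')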
We use the earth_ondemand.grid package to specify a desired Landsat product grid in place of a spatial query.
from earthai.earth_ondemand.grid import LandsatGrid

catalog = earth_ondemand.read_catalog(
    max_cloud_cover=10,
    collections='landsat8_l1tp',
    start_datetime='2018-07-01T00:00:00',
    end_datetime='2018-08-31T23:59:59',
    grid_ids=LandsatGrid(30, 27)
)

df1 = spark.read.raster(
    catalog,
    catalog_col_names=['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'BQA'],
    lazy_tiles=False
)
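Before moving on, it can help to take a quick look at what came back: the catalog of matching scenes and the schema of the raster DataFrame. This is a small sketch; it assumes the catalog behaves like a pandas DataFrame, which is how read_catalog results are typically used.

# Preview the scenes that matched the query
catalog.head()

# Inspect the tile columns created by the raster reader
df1.printSchema()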
Cloud Masking
As discussed in detail here, we will mask out high-probability clouds, high-probability cloud shadows, and fill areas.
In this job, it is sufficient to mask a single band; we will see why later when we apply the TileVectorizer.
After masking, we drop the BQA column because we no longer need it and do not want it included in our feature set.
df2 = df1.select(
    rf_interpret_cell_type_as('B2', 'uint16').alias('blue'),
    rf_interpret_cell_type_as('B3', 'uint16').alias('green'),
    rf_interpret_cell_type_as('B4', 'uint16').alias('red'),
    rf_interpret_cell_type_as('B5', 'uint16').alias('nir'),
    rf_interpret_cell_type_as('B6', 'uint16').alias('swir1'),
    rf_interpret_cell_type_as('B7', 'uint16').alias('swir2'),
    'BQA'
)

df_masked = df2.withColumn('blue_masked',  # high-probability cloud (bits 5-6 == 3)
                           rf_mask_by_bits('blue', 'BQA', 5, 2, [3])) \
    .withColumn('blue_masked',  # high-probability cloud shadow (bits 7-8 == 3)
                rf_mask_by_bits('blue_masked', 'BQA', 7, 2, [3])) \
    .withColumn('blue_masked',  # designated fill (bit 0 == 1)
                rf_mask_by_bit('blue_masked', 'BQA', 0, 1)) \
    .filter(rf_data_cells('blue_masked') > 0) \
    .drop('blue', 'BQA') \
    .cache()
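If you want to gauge how much of the imagery the mask removed, one option is to compare data and NoData cell counts. This is a quick sketch using standard RasterFrames functions, not part of the original workflow:

# Fraction of cells in the masked band that are now NoData
df_masked.select(
    (F.sum(rf_no_data_cells('blue_masked')) /
     (F.sum(rf_data_cells('blue_masked')) + F.sum(rf_no_data_cells('blue_masked')))
    ).alias('masked_fraction')
).show()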
Read Crop Target
Let's inspect the metadata for our crop target. The data come from the USDA Cropland Data Layer.
In this raster, the value 100 indicates areas that are not crop fields. We will exclude them from our analysis by setting them to NoData. This also means that our ML model will not be trained on a background class.
target_url = 'https://s22s-test-geotiffs.s3.amazonaws.com/crop_class/scene_30_27_target.tif'
! gdalinfo /vsicurl/{target_url} 2> /dev/null
df3 = spark.read.raster(target_url)
target_df = df3.select(rf_mask_by_value('proj_raster', 'proj_raster', 100).alias('target')).cache()
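To sanity-check the masking, you can look at aggregate cell statistics over the target tiles; the maximum value should no longer be 100. This is a sketch that assumes rf_agg_stats returns a struct with min, max, and cell-count fields, as described in the RasterFrames reference.

# Aggregate statistics across all target tiles
target_df.select(rf_agg_stats('target').alias('stats')) \
    .select('stats.data_cells', 'stats.no_data_cells', 'stats.min', 'stats.max') \
    .show()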
Join Features and Target
The raster join operation uses the spatial information embedded by the raster reader to bring rows from the two tables together. Each right-hand-side record that spatially intersects the left-hand side is then reprojected (warped) to align with the grid of the left-hand side.
The raster join is a left outer join, so we will filter away any null rows.
joined = df_masked.raster_join(target_df) \
    .filter(F.col('target').isNotNull())  # drop rows with no intersecting target
Feature Creation
We will compute some commonly used derived features for this kind of task and put the DataFrame into its final form for training.
analytics_base_table = joined.select(
    'crs',
    'extent',
    rf_tile('green').alias('green'),
    rf_tile('red').alias('red'),
    rf_tile('nir').alias('nir'),
    rf_tile('blue_masked').alias('blue_masked'),
    rf_tile(rf_normalized_difference('nir', 'red')).alias('ndvi'),
    rf_tile(rf_local_divide(rf_local_add('swir1', 'swir2'), F.lit(2.0))).alias('mean_swir'),
    rf_tile(rf_normalized_difference('nir', 'swir1')).alias('ndwi'),
    rf_tile('target').alias('target')
)

display(analytics_base_table)
Train - Test Split
We will divide the rows of the resulting DataFrame into an approximately 70/30 train/test split.
train_df, test_df = analytics_base_table.randomSplit([.7, .3], 2345)
Create ML Pipeline
The key components of the ML pipeline are:
- TileVectorizer - packs Tile cells into ML Vectors
- StringIndexer - handles categorical labels
- DecisionTreeClassifier - the classification model
from earthai.transformers import TileVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

exploder = TileVectorizer().setFilterNA(True).setTargetLabelCol('target')

labelIndexer = StringIndexer() \
    .setInputCol('target') \
    .setOutputCol('indexedTarget')

classifier = DecisionTreeClassifier(maxDepth=5) \
    .setLabelCol('indexedTarget') \
    .setFeaturesCol('features')

pipeline = Pipeline() \
    .setStages([exploder, labelIndexer, classifier])
Train the Model
This may take two or three minutes to evaluate on the Extra Large launch option.
model = pipeline.fit(train_df)
Optionally, you can save the model to read in later to make predictions.
model.write().overwrite().save('crop_model/decision_tree')
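To load the saved model in a later session, use the standard PySpark PipelineModel loader. A brief sketch, assuming the path from the save call above:

from pyspark.ml import PipelineModel

# Reload the fitted pipeline and apply it to data prepared the same way as test_df
model = PipelineModel.load('crop_model/decision_tree')
predictions = model.transform(test_df)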
Evaluate the Model
Key:
- 0 = other
- 1 = corn
- 2 = soybeans
- 3 = alfalfa
- 4 = durum/spring wheat
- 5 = sugar beets
- 6 = dry beans
Note that it may take about one minute to evaluate the toPandas statement.
prediction_df = model.transform(test_df)

vals = prediction_df.select(
    classifier.getPredictionCol(),
    classifier.getLabelCol()
).toPandas()
from earthai.ml import *

showConfusionMatrix(vals.prediction.values, vals.indexedTarget.values)
printOverallStats(vals.prediction.values, vals.indexedTarget.values)
printClassStats(vals.prediction.values, vals.indexedTarget.values)
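Keep in mind that the indexedTarget values are assigned by the StringIndexer in order of label frequency, so they do not necessarily line up with the raster values in the key above. To recover the mapping, you can inspect the fitted StringIndexer stage; this is a sketch that assumes it is the second stage of the fitted pipeline defined above.

# labels[i] is the original target value assigned to indexed label i
index_to_value = model.stages[1].labels
print(index_to_value)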