We will use multispectral Landsat 8 imagery acquired during the spring season to create a machine learning model that classifies each pixel as either a water body or not.
The main components of this job are very similar to those in the supervised learning example in the RasterFrames documentation, with a few key differences: the target data is already in the form of a raster, and that raster is in a different CRS than the imagery. To handle this, we will perform a raster join between the DataFrame containing our imagery features and our raster target.
from earthai.all import *
from earthai.transformers import TileVectorizer
from earthai.ml import *
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from shapely.geometry import box

import matplotlib.pyplot as plt
import pandas as pd
import time
partitions = 200
spark = create_earthai_spark_session(**{
    'spark.default.parallelism': partitions,
    'spark.sql.shuffle.partitions': partitions,
})
Read in Water Target
The target data is a raster of water versus land near Mackinaw City, Michigan, USA, in the Great Lakes region. The pixel values indicate how many months of the year water is present in that location. For this analysis, we will create a binary label indicating whether the value is non-zero.
target_url = 'https://s22s-test-geotiffs.s3.amazonaws.com/water_class/seasonality_mackinaw.tif'
tile_size = 128

target_rf = spark.read.raster(target_url,
                              tile_dimensions=(tile_size, tile_size),
                              spatial_index_partitions=partitions)

target_rf = target_rf.select(
    rf_crs("proj_raster").alias("crs"),
    rf_extent("proj_raster").alias("extent"),
    rf_tile(rf_local_greater("proj_raster", 0)).alias("target")
)
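As a quick sanity check (not part of the original walkthrough), we can summarize the binary target with RasterFrames aggregate statistics. This sketch assumes rf_agg_stats exposes data_cells and mean fields, as in recent RasterFrames releases; the mean of a 0/1 tile column approximates the fraction of water pixels.

# Summarize the binary target; mean ~= fraction of water pixels.
target_rf.select(rf_agg_stats('target').alias('stats')) \
    .select('stats.data_cells', 'stats.mean') \
    .show()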
Read in Earth-Observation Data
We define a geometry object called aoi that represents an area around Mackinaw City, Michigan, USA. Note that we use a very low cloud-cover filter so the query returns only nearly cloud-free scenes.
aoi = box(-85.0, 45.7, -84.6, 46.0)

catalog = earth_ondemand.read_catalog(
    geo=aoi,
    max_cloud_cover=1.0,
    collections='landsat8_l1tp',
    start_datetime='2018-02-01T00:00:00',
    end_datetime='2018-05-31T23:59:59'
)
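Before reading the imagery, it can help to confirm what the query returned. A minimal sketch, assuming catalog is a pandas-style DataFrame with the id and datetime columns used below:

# Quick look at the scenes returned by the catalog query.
print(f'{len(catalog)} scenes returned')
print(catalog[['id', 'datetime']].head())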
Feature Engineering
- Read EO data from the catalog DataFrame
- Derive indices
- Filter to tiles intersecting aoi
- Mask out missing data from features
We will filter our feature data to the aoi area. We express this filter as a function so we can apply it to the target data as well.
def aoi_col_filter(tile_col_name):
    """
    Define a column to filter by our area of interest (AOI).
    We take a column name argument so the filter aligns to the
    given tile column's spatial information.
    """
    return st_intersects(
        rf_geometry(tile_col_name),
        st_reproject(
            st_geomFromWKT(lit(aoi.wkt)),
            rf_mk_crs('epsg:4326'),
            rf_crs(tile_col_name)
        )
    )

bands = ["B2", "B3", "B4", "B5", "B6", "B7", "BQA"]

features_rf = spark.read.raster(
    catalog[['id', 'datetime'] + bands],
    catalog_col_names=bands,
    tile_dimensions=(tile_size, tile_size),
    spatial_index_partitions=partitions
)

features_rf = features_rf \
    .withColumnRenamed("B2", "blue") \
    .withColumnRenamed("B3", "green") \
    .withColumnRenamed("B4", "red") \
    .withColumnRenamed("B5", "nir") \
    .withColumn("mean_swir", rf_local_divide(rf_local_add("B6", "B7"), lit(2.0))) \
    .withColumn("ndvi", rf_normalized_difference("nir", "red")) \
    .withColumn("ndwi", rf_normalized_difference("green", "nir"))

features_rf = features_rf.filter(aoi_col_filter('blue'))

# Set 0 as the NoData value so masked cells are excluded downstream
blue_ct = features_rf.select(rf_cell_type('blue')).distinct().first()[0][0]
masked_blue_ct = CellType(blue_ct).with_no_data_value(0)

features_rf = features_rf.select(
    rf_crs('blue').alias('crs'),
    rf_extent('blue').alias('extent'),
    rf_tile(rf_mask_by_bit(rf_convert_cell_type('blue', masked_blue_ct), 'BQA', 0, 1)).alias('blue'),
    rf_tile("green"),
    rf_tile("red"),
    rf_tile("nir"),
    rf_tile("mean_swir"),
    rf_tile("ndvi"),
    rf_tile("ndwi")
)
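To verify that the BQA-based cloud mask and NoData conversion took effect, we can compare data and NoData cell counts on the masked band. A hedged sketch, again assuming rf_agg_stats exposes data_cells and no_data_cells fields:

# A non-zero no_data_cells count indicates masked cells in 'blue'.
features_rf.select(rf_agg_stats('blue').alias('s')) \
    .select('s.data_cells', 's.no_data_cells') \
    .show()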
Raster Join of Target and Features
The raster join operation uses the spatial information embedded during the raster read to bring together rows from the two DataFrames. Each right-hand-side record that spatially intersects the left-hand side is then reprojected (warped) to align with the grid of the left-hand side.
We then split the joined data into training and test sets by random assignment, holding out roughly 30% of rows for testing.
rf = features_rf.raster_join(target_rf) \
    .withColumn('train_set', when(rand(seed=1234) > 0.3, 1).otherwise(0))

train_df = rf.filter(rf.train_set == 1) \
    .cache()
test_df = rf.filter(rf.train_set == 0)
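As a quick check (not part of the original walkthrough), we can confirm the random assignment came out near the intended 70/30 proportions:

# Count rows in each side of the split; expect roughly 70% train (1), 30% test (0).
rf.groupBy('train_set').count().show()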
ML Pipeline
The key components of the ML pipeline are:

- TileVectorizer - packs Tile cells into ML Vectors
- RandomForestClassifier - fits the random forest classification model on those vectors
vectorizer = TileVectorizer() \
    .setTargetLabelCol('target') \
    .setFilterNA(True)

classifier = RandomForestClassifier(numTrees=20) \
    .setLabelCol('target') \
    .setFeaturesCol('features')

pipeline = Pipeline().setStages([vectorizer, classifier])
Fit the pipeline.
model = pipeline.fit(train_df)
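Once fitted, the trained forest can be inspected. A minimal sketch, assuming the classifier is the last stage of the fitted PipelineModel:

# The trained RandomForestClassificationModel is the pipeline's last stage.
rf_model = model.stages[-1]
print(len(rf_model.trees))          # number of trees in the trained forest
print(rf_model.featureImportances)  # per-feature importance vector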
Optionally write the model to disk. This is useful if you plan to deploy the model.
model.save(f'model/water_random_forest_{int(time.time() * 1000)}')
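To restore a saved model later, for example in a separate scoring job, use PipelineModel.load. A minimal sketch; the path below is illustrative and should be the timestamped path produced by model.save above:

from pyspark.ml import PipelineModel

# Illustrative path; substitute the actual timestamped directory written above.
loaded_model = PipelineModel.load('model/water_random_forest_<timestamp>')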
Evaluate Model Performance
Use the fitted model to make predictions on the test set and visualize the predictions.
prediction_df = model.transform(test_df)
vals = prediction_df.select('prediction', 'target').toPandas()
showConfusionMatrix(vals.prediction.values, vals.target.values)
printOverallStats(vals.prediction.values, vals.target.values)
printClassStats(vals.prediction.values, vals.target.values)
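The helper functions above come from earthai.ml. As a hedged alternative using standard Spark ML, assuming the prediction and target columns are numeric after vectorization:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Overall accuracy on the test predictions; metricName also accepts
# 'f1', 'weightedPrecision', and 'weightedRecall'.
evaluator = MulticlassClassificationEvaluator(
    labelCol='target', predictionCol='prediction', metricName='accuracy')
print('Accuracy:', evaluator.evaluate(prediction_df))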