{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Writing to Parquet" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As discussed in the [RasterFrames documentation](https://rasterframes.io/raster-write.html#parquet), you can write a Spark DataFrame to the Apache Parquet format. There is support for writing and reading tiles. This may be useful to checkpoint your analysis.\n", "\n", "\n", "More information can also be found at the [pyspark site](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter).\n", "\n", "## Practical Considerations\n", "\n", "Parquet reading and writing in Spark is pretty robust and there are lots of options. Refer to the above documentation for more information.\n", "\n", "It is worth considering the available storage space before writing a large DataFrame to a local directory.\n", "\n", "Parquet supports distributed reading from and writing to S3. However, writing directly to S3 is not recommended. Instead, write to either the local file system or HDFS (if available) then copy / `distcp` the data to S3.\n", "\n", "## Demonstration\n", "\n", "We will read a small amount of data, write it to Parquet, and then read a second copy of it from the Parquet.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": "init", "option_string": "name = \"init\"", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "from earthai.init import *\n", "import pyspark.sql.functions as F\n", "\n", "catalog = earth_ondemand.read_catalog('POINT(20 40)',\n", " start_datetime='2018-07-01',\n", " end_datetime='2018-07-03',\n", " collections='mcd43a4'\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df = spark.read.raster(catalog, ['B01', 'B02', 'B03'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": false, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "echo=False", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "import shutil\n", "import os\n", "\n", "if os.path.exists('/tmp/export.pq'):\n", " shutil.rmtree('/tmp/export.pq')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Write the entire DataFrame with the standard DataFrameWriter in pyspark.\n", "\n" ] }, { "cell_type": 
"code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df.write.parquet('/tmp/export.pq')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Reading this data is through using the spark [default `parquet` format](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader).\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df2 = spark.read.parquet('/tmp/export.pq')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df2.columns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "In keeping with Spark's lazy compute model, so far there is just a representation of the data location and its schema.\n", "\n", "We can run a few queries to demonstrate the content of the original and round-tripped DataFrames are the same.\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df2.groupBy('id', 'datetime').agg(F.count(rf_extent('B01')))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df.groupBy('id', 'datetime').agg(F.count(rf_extent('B01')))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Another look at the 
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Here is another look at the equivalence of the data: ordering both DataFrames by the same tile statistic should return the same rows.\n", "\n", "\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df.select('id', 'B01', 'B02').orderBy(rf_tile_sum('B01'), ascending=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "autoscroll": "auto", "collapsed": false, "jupyter": { "outputs_hidden": false }, "options": { "caption": false, "complete": true, "display_data": true, "display_stream": true, "dpi": 200, "echo": true, "evaluate": true, "f_env": null, "f_pos": "htpb", "f_size": [ 6, 4 ], "f_spines": true, "fig": true, "include": true, "name": null, "option_string": "", "results": "verbatim", "term": false, "wrap": "output" } }, "outputs": [], "source": [ "df2.select('id', 'B01', 'B02').orderBy(rf_tile_sum('B01'), ascending=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [] }
], "metadata": { "kernel_info": { "name": "echo" }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" }, "zendesk": { "id": 360043896451 } }, "nbformat": 4, "nbformat_minor": 4 }