The ETL Tool

When working with GeoTrellis, often the first task is to load a set of rasters and perform reprojection, mosaicing and pyramiding before saving them as a GeoTrellis layer. It is possible, and not too difficult, to use core GeoTrellis features to write a program to accomplish this task. However, after writing a number of such programs we noticed three patterns emerge:

  • Often an individual ETL process will require some modification that is orthogonal to the core ETL logic
  • When designing an ETL process it is useful to first run it on a smaller dataset, perhaps locally, as a verification
  • Once written it would be useful to re-run the same ETL process with different input and output storage media

To support these patterns, the spark-etl project implements a plugin architecture for tile input sources and output sinks, which allows you to write a compact ETL program without having to specify the type and the configuration of the input and output at compile time. The ETL process is broken into three stages: load, tile, and save. This affords an opportunity to modify the dataset using any of the GeoTrellis operations in between the stages.

Sample ETL Application

import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.etl.Etl
import geotrellis.spark.etl.config.EtlConf
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkConf

object GeoTrellisETL {
  type I = ProjectedExtent // or TemporalProjectedExtent for temporal ingest
  type K = SpatialKey // or SpaceTimeKey for temporal ingest
  type V = Tile // or MultibandTile to ingest multiband tile
  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("GeoTrellis ETL", new SparkConf(true))
    try {
      /* parse command line arguments; each resulting config is processed in turn */
      EtlConf(args) foreach { conf =>
        /* create an Etl instance for this config */
        val etl = Etl(conf)
        /* load source tiles using input module specified */
        val sourceTiles = etl.load[I, V]
        /* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
        val (zoom, tiled) = etl.tile[I, V, K](sourceTiles)
        /* save and optionally pyramid the mosaiced layer */
        etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
      }
    } finally {
      sc.stop()
    }
  }
}

The above is simply the implementation of the Etl.ingest function, so the same functionality can be written more compactly:

import geotrellis.spark._
import geotrellis.raster.Tile
import geotrellis.spark.etl.Etl
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkConf

object SinglebandIngest {
  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("GeoTrellis ETL SinglebandIngest", new SparkConf(true))
    try {
      Etl.ingest[ProjectedExtent, SpatialKey, Tile](args)
    } finally {
      sc.stop()
    }
  }
}

The Etl.ingest function can be used with the following type variations:

  • Etl.ingest[ProjectedExtent, SpatialKey, Tile]
  • Etl.ingest[ProjectedExtent, SpatialKey, MultibandTile]
  • Etl.ingest[TemporalProjectedExtent, SpaceTimeKey, Tile]
  • Etl.ingest[TemporalProjectedExtent, SpaceTimeKey, MultibandTile]

For temporal ingest, use TemporalProjectedExtent and SpaceTimeKey; for spatial ingest, use ProjectedExtent and SpatialKey.

User-defined ETL Configs

The above sample application can be placed in a new SBT project that has a dependency on "org.locationtech.geotrellis" %% "geotrellis-spark-etl" % s"$VERSION" in addition to a dependency on spark-core, and built into an assembly with the sbt-assembly plugin. Be careful to include an assemblyMergeStrategy for the sbt-assembly plugin, like the one provided in the spark-etl build file.

At this point you would create a separate App object for each one of your ETL configs.
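
As a rough illustration of the project setup described above, a build.sbt might look like the following sketch. The project name and every version number are placeholder assumptions, not values taken from the spark-etl build, and the merge strategy is a simplified stand-in: the one in the spark-etl build file remains the reference. The `in assembly` scoping assumes an sbt 0.13-era setup with the sbt-assembly plugin already added in project/plugins.sbt.

```scala
// build.sbt (sketch; names and versions are assumptions to adjust)
name := "my-geotrellis-etl"   // hypothetical project name
scalaVersion := "2.11.8"      // assumed; consistent with the scala-2.11 output directory

// Placeholder: set this to the GeoTrellis version you actually build against.
val geotrellisVersion = "1.0.0"

libraryDependencies ++= Seq(
  "org.locationtech.geotrellis" %% "geotrellis-spark-etl" % geotrellisVersion,
  // "provided" keeps Spark out of the fat jar; spark-submit supplies it at runtime.
  // The Spark version here is an assumption.
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided"
)

// Resolve duplicate files that appear when dependencies are merged into one jar.
assemblyMergeStrategy in assembly := {
  case "reference.conf" | "application.conf"            => MergeStrategy.concat
  case "META-INF/MANIFEST.MF"                           => MergeStrategy.discard
  case "META-INF/ECLIPSEF.RSA" | "META-INF/ECLIPSEF.SF" => MergeStrategy.discard
  case _                                                => MergeStrategy.first
}
```

Marking spark-core as provided is what lets the same jar be submitted to different clusters without bundling a conflicting Spark.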

Built-in ETL Configs

For convenience and as an example, the spark-etl project provides two App objects that perform vanilla ETL:

  • geotrellis.spark.etl.SinglebandIngest
  • geotrellis.spark.etl.MultibandIngest

You may use them by building an assembly jar of the spark-etl project as follows:

cd geotrellis
./sbt
sbt> project spark-etl
sbt> assembly

The assembly jar will be placed in the geotrellis/spark-etl/target/scala-2.11 directory.

Running the Spark Job

For maximum flexibility it is desirable to run Spark jobs with spark-submit. To achieve this, the spark-core dependency must be listed as provided and the sbt-assembly plugin used to create the fat jar as described above. Once the assembly jar is ready, outputs and inputs can be set up through command line arguments like so:

#!/bin/sh
export JAR="geotrellis-etl-assembly-1.0.0-SNAPSHOT.jar"

spark-submit \
--class geotrellis.spark.etl.SinglebandIngest \
--master local[*] \
--driver-memory 2G \
$JAR \
--backend-profiles "file://backend-profiles.json" \
--input "file://input.json" \
--output "file://output.json"

Note that the arguments before $JAR configure the SparkContext, and the arguments after it configure the GeoTrellis ETL inputs and outputs.

Command Line Arguments

Option            Description
backend-profiles  Path to a json file (local fs / hdfs) with credentials for ingest datasets (required field)
input             Path to a json file (local fs / hdfs) with datasets to ingest, with optional credentials
output            Path to a json file (local fs / hdfs) with output backend params to ingest, with optional credentials

Backend Profiles JSON

{
  "backend-profiles": [{
    "name": "accumulo-name",
    "type": "accumulo",
    "zookeepers": "zookeepers",
    "instance": "instance",
    "user": "user",
    "password": "password"
  },
  {
    "name": "cassandra-name",
    "type": "cassandra",
    "allowRemoteDCsForLocalConsistencyLevel": false,
    "localDc": "datacenter1",
    "usedHostsPerRemoteDc": 0,
    "hosts": "hosts",
    "replicationStrategy": "SimpleStrategy",
    "replicationFactor": 1,
    "user": "user",
    "password": "password"
  }]
}

Each entry defines a named profile for a backend; input and output configurations refer to a profile by its name.

Output JSON

{
   "backend":{
      "type":"accumulo",
      "path":"output",
      "profile":"accumulo-name"
   },
   "breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
   "reprojectMethod":"buffered",
   "cellSize":{
      "width":256.0,
      "height":256.0
   },
   "encoding":"geotiff",
   "tileSize":256,
   "layoutExtent":{
      "xmin":1.0,
      "ymin":2.0,
      "xmax":3.0,
      "ymax":4.0
   },
   "tileLayout":{
      "layoutCols":360,
      "layoutRows":180,
      "tileCols":240,
      "tileRows":240
   },
   "resolutionThreshold":0.1,
   "pyramid":true,
   "resampleMethod":"nearest-neighbor",
   "keyIndexMethod":{
      "type":"zorder"
   },
   "layoutScheme":"zoomed",
   "cellType":"int8",
   "crs":"EPSG:3857"
}

Key                  Value
backend              Backend description is presented below
breaks               Breaks string for render output (optional field)
partitions           Number of partitions to use during the pyramid build
reprojectMethod      buffered, per-tile
cellSize             Cell size
encoding             png, geotiff for render output
tileSize             Tile size (optional field). If not set, the default size of output tiles is 256x256
layoutExtent         Layout extent (optional field)
tileLayout           Tile layout to specify layout grid (optional field)
resolutionThreshold  Resolution for user defined Layout Scheme (optional field)
pyramid              true, false - ingest with or without building a pyramid
resampleMethod       nearest-neighbor, bilinear, cubic-convolution, cubic-spline, lanczos
keyIndexMethod       zorder, row-major, hilbert
layoutScheme         zoomed, floating (optional field)
cellType             int8, int16, etc. (optional field)
crs                  Destination crs name (example: EPSG:3857) (optional field)
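
The breaks string in the sample output configuration packs several value-to-colour pairs into one line. The sketch below shows one way to read it, assuming the format is `value:hexcolour` entries separated by semicolons, with each colour given as eight hex digits (taken here to be RGBA). `BreaksSketch` and `parse` are hypothetical names used only for illustration; this is not the parser GeoTrellis itself uses.

```scala
object BreaksSketch {
  /** Parses e.g. "0:ffffe5ff;0.1:f7fcb9ff" into (break value, packed colour) pairs. */
  def parse(breaks: String): Vector[(Double, Int)] =
    breaks.split(';').toVector.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(':') match {
        case Array(value, hex) =>
          // parseUnsignedInt accepts 8-digit hex values above Int.MaxValue, such as "ffffe5ff"
          (value.trim.toDouble, Integer.parseUnsignedInt(hex.trim, 16))
        case _ =>
          throw new IllegalArgumentException(s"Malformed break entry: $entry")
      }
    }
}
```

Each pair can then be read as "cells up to this value are drawn in this colour", which is how the string drives the render output.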

Backend Keyword

Key      Value
type     Input backend type (file / hadoop / s3 / accumulo / cassandra)
path     Input path (local path / hdfs), or s3:// url
profile  Profile name to use for input

Supported Layout Schemes

Layout Scheme  Options
zoomed         Zoomed layout scheme
floating       Floating layout scheme in a native projection

KeyIndex Methods

Key                 Options
type                zorder, row-major, hilbert
temporalResolution  Temporal resolution for temporal indexing (optional field)
timeTag             Time tag name for input geotiff tiles (optional field)
timeFormat          Time format used to parse the time stored in the geotiff time tag (optional field)
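
To give a feel for what the zorder option in the table above refers to, the following sketch computes a Z-order (Morton) index by interleaving the bits of a tile's column and row, so that tiles close together on the grid tend to get nearby index values. `ZOrderSketch` is a hypothetical name, the bit ordering chosen here is an assumption, and the index GeoTrellis actually produces may differ in detail.

```scala
object ZOrderSketch {
  /** Interleaves the bits of non-negative col and row: col fills even bits, row fills odd bits. */
  def index(col: Int, row: Int): Long = {
    require(col >= 0 && row >= 0, "col and row must be non-negative")
    var z = 0L
    for (bit <- 0 until 31) {
      z |= ((col >> bit) & 1L) << (2 * bit)       // bit of col goes to position 2*bit
      z |= ((row >> bit) & 1L) << (2 * bit + 1)   // bit of row goes to position 2*bit + 1
    }
    z
  }
}
```

With this ordering the four tiles (0,0), (1,0), (0,1) and (1,1) map to 0, 1, 2 and 3, tracing the "Z" shape that gives the curve its name.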

Input JSON

[{
  "format": "geotiff",
  "name": "test",
  "cache": "NONE",
  "noData": 0.0,
  "clip": {
    "xmin":1.0,
    "ymin":2.0,
    "xmax":3.0,
    "ymax":4.0
  },
  "backend": {
    "type": "hadoop",
    "path": "input"
  }
}]

Key            Value
format         Format of the tile files to be read (ex: geotiff)
name           Input dataset name
cache          Spark RDD cache strategy
noData         NoData value
clip           Extent in target CRS to clip the input source
crs            Destination crs name (example: EPSG:3857) (optional field)
maxTileSize    Inputs will be broken up into smaller tiles of the given size (optional field) (example: 256 returns 256x256 tiles)
numPartitions  How many partitions Spark should make when repartitioning (optional field)

Supported Formats

Format            Options
geotiff           Spatial ingest
temporal-geotiff  Temporal ingest

Supported Inputs

Input   Options
hadoop  path (local path / hdfs)
s3      s3:// url

Supported Outputs

Output     Options
hadoop     Path
accumulo   Table name
cassandra  Table name with keyspace (keyspace.tablename)
s3         s3:// url
render     Path

Accumulo Output

The Accumulo output module has two write strategies:

  • hdfs strategy uses Accumulo bulk import
  • socket strategy uses Accumulo BatchWriter

When using the hdfs strategy, the ingestPath argument will be used as the temporary directory where records will be written for use by the Accumulo bulk import. This directory should ideally be an HDFS path.

Layout Scheme

GeoTrellis is able to tile layers in either ZoomedLayoutScheme, matching the TMS pyramid, or FloatingLayoutScheme, matching the native resolution of the input raster. These alternatives may be selected using the layoutScheme option.

Note that ZoomedLayoutScheme needs to know the world extent, which it gets from the CRS, in order to build the TMS pyramid layout. This will likely cause resampling of input rasters to match the resolution of the TMS levels.

On the other hand, FloatingLayoutScheme will discover the native resolution and extent and partition it by the given tile size without resampling.
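
The reason a zoomed layout usually implies resampling can be seen with a little arithmetic. The sketch below assumes a Web Mercator (EPSG:3857) world extent and square tiles, and picks the first zoom level whose cells are at least as fine as the source cells. `ZoomSketch`, `resolution` and `zoomFor` are hypothetical names, and the selection rule is a simplifying assumption; the rule GeoTrellis applies, including the role of resolutionThreshold, may differ.

```scala
object ZoomSketch {
  // Width of the Web Mercator (EPSG:3857) world extent in metres.
  val WorldWidth: Double = 2 * 20037508.342789244

  /** Cell size (map units per pixel) at a zoom level: the world is 2^zoom tiles wide. */
  def resolution(zoom: Int, tileSize: Int = 256): Double =
    WorldWidth / (tileSize.toLong << zoom)

  /** First zoom level whose cells are no larger than the source cell size (assumed rule). */
  def zoomFor(cellSize: Double, tileSize: Int = 256, maxZoom: Int = 30): Int =
    (0 to maxZoom).find(z => resolution(z, tileSize) <= cellSize).getOrElse(maxZoom)
}
```

Under these assumptions a source raster with 30 m cells lands on zoom 13, whose cells are roughly 19.1 m, so the input has to be resampled to fit the pyramid level.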

User-Defined Layout

You may bypass the layout scheme logic by providing layoutExtent and either a tileLayout or a cellSize and tileSize to fully define the layout and start the tiling process. You may optionally specify an output cellType as well (by default the input cellType is used).
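
To see why a cellSize and tileSize together with layoutExtent are enough to define a layout, consider the following sketch, which derives the tile grid from them. The case classes are simplified stand-ins defined here for illustration, not the GeoTrellis types of the same names, and rounding the grid up to whole tiles is an assumption.

```scala
object LayoutSketch {
  // Simplified stand-ins for illustration only.
  final case class Extent(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
    def width: Double = xmax - xmin
    def height: Double = ymax - ymin
  }
  final case class CellSize(width: Double, height: Double)
  final case class TileLayout(layoutCols: Int, layoutRows: Int, tileCols: Int, tileRows: Int)

  /** Derives the tile grid needed to cover the extent with square tiles of tileSize pixels. */
  def tileLayout(extent: Extent, cellSize: CellSize, tileSize: Int): TileLayout = {
    val tileWidth = cellSize.width * tileSize    // map units covered by one tile horizontally
    val tileHeight = cellSize.height * tileSize  // map units covered by one tile vertically
    TileLayout(
      layoutCols = math.ceil(extent.width / tileWidth).toInt,
      layoutRows = math.ceil(extent.height / tileHeight).toInt,
      tileCols = tileSize,
      tileRows = tileSize
    )
  }
}
```

For example, an extent 5120 units wide and 2560 units high with 10-unit cells and 256-pixel tiles yields a grid of 2 columns by 1 row.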

Reprojection

The spark-etl project supports two methods of reprojection: buffered and per-tile. They provide a trade-off between accuracy and flexibility.

The buffered reprojection method is able to sample pixels past the tile boundaries by performing a neighborhood join. This method is the default and produces the best results. However, it requires that all of the source tiles share the same CRS.

The per-tile reprojection method cannot consider pixels past the individual tile boundaries, even if they exist elsewhere in the dataset. Any pixels past the tile boundaries will be treated as NODATA when interpolating. This restriction allows source tiles to have a different projection per tile. This is an effective way to unify projections, for instance when reprojecting from multiple UTM projections to WebMercator.

Rendering a Layer

The render output module is different from other modules in that it does not save a GeoTrellis layer but rather provides a way to render a layer, after tiling and projection, to a set of images. This is useful to either verify the ETL process or render a TMS pyramid.

The path module argument is actually a path template that allows the following substitutions:

  • {x} tile x coordinate
  • {y} tile y coordinate
  • {z} layer zoom level
  • {name} layer name

A sample render output configuration template could be:

{
  "path": "s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png",
  "ingestType": {
    "format":"geotiff",
    "output":"render"
  }
}
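
The substitution of {name}, {z}, {x} and {y} described above can be pictured with a small sketch. `RenderPathSketch` and `expand` are hypothetical names; this only illustrates plain string replacement and is not the render module's own code.

```scala
object RenderPathSketch {
  /** Fills the {name}, {z}, {x} and {y} placeholders of a render path template. */
  def expand(template: String, name: String, z: Int, x: Int, y: Int): String =
    template
      .replace("{name}", name)
      .replace("{z}", z.toString)
      .replace("{x}", x.toString)
      .replace("{y}", y.toString)
}
```

For a layer called nlcd, the tile at zoom 12, x 1205, y 1539 would be written to s3://tms-bucket/layers/nlcd/12-1205-1539.png under the template shown above.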

Extension

In order to provide your own input or output modules you must extend InputPlugin and OutputPlugin and register them in the Etl constructor via a TypedModule.

Examples

The standard ETL assembly provides two ingest classes: geotrellis.spark.etl.SinglebandIngest for singleband tiles and geotrellis.spark.etl.MultibandIngest for multiband tiles.

Every example can be launched using:

#!/bin/sh
export JAR="geotrellis-etl-assembly-0.10-SNAPSHOT.jar"

spark-submit \
--class geotrellis.spark.etl.{SinglebandIngest | MultibandIngest} \
--master local[*] \
--driver-memory 2G \
$JAR \
--input "file://input.json" \
--output "file://output.json" \
--backend-profiles "file://backend-profiles.json"

Example Backend Profile

backend-profiles.json:

{
   "backend-profiles":[
      {
         "name":"accumulo-name",
         "type":"accumulo",
         "zookeepers":"zookeepers",
         "instance":"instance",
         "user":"user",
         "password":"password"
      },
      {
         "name":"cassandra-name",
         "type":"cassandra",
         "allowRemoteDCsForLocalConsistencyLevel":false,
         "localDc":"datacenter1",
         "usedHostsPerRemoteDc":0,
         "hosts":"hosts",
         "replicationStrategy":"SimpleStrategy",
         "replicationFactor":1,
         "user":"user",
         "password":"password"
      }
   ]
}

Example Output JSON

output.json:

{
   "backend":{
      "type":"accumulo",
      "path":"output",
      "profile":"accumulo-name"
   },
   "breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
   "reprojectMethod":"buffered",
   "cellSize":{
      "width":256.0,
      "height":256.0
   },
   "encoding":"geotiff",
   "tileSize":256,
   "layoutExtent":{
      "xmin":1.0,
      "ymin":2.0,
      "xmax":3.0,
      "ymax":4.0
   },
   "resolutionThreshold":0.1,
   "pyramid":true,
   "resampleMethod":"nearest-neighbor",
   "keyIndexMethod":{
      "type":"zorder"
   },
   "layoutScheme":"zoomed",
   "cellType":"int8",
   "crs":"EPSG:3857"
}

Example Input JSON

input.json:

[{
  "format": "geotiff",
  "name": "test",
  "cache": "NONE",
  "noData": 0.0,
  "backend": {
    "type": "hadoop",
    "path": "input"
  }
}]

Backend JSON example (local fs)

"backend": {
  "type": "hadoop",
  "path": "file:///Data/nlcd/tiles"
}

Backend JSON example (hdfs)

"backend": {
  "type": "hadoop",
  "path": "hdfs://nlcd/tiles"
}

Backend JSON example (s3)

"backend": {
  "type": "s3",
  "path": "s3://com.azavea.datahub/catalog"
}

Backend JSON example (accumulo)

"backend": {
  "type": "accumulo",
  "profile": "accumulo-gis",
  "path": "nlcdtable"
}

Backend JSON example (set of PNGs into S3)

"backend": {
  "type": "render",
  "path": "s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png"
}

Backend JSON example (set of PNGs into hdfs or local fs)

"backend": {
  "type": "render",
  "path": "hdfs://path/layers/{name}/{z}-{x}-{y}.png"
}