Kernel Density

This document provides a detailed example on how to build a raster from point data using kernel density estimation. Though that is the ostensible point, it also provides a brief introduction to working with rasters, including how to tile a raster and how to use the result as the basis for a computation in Spark.

Kernel density is one way to convert a set of points (an instance of vector data) into a raster. For each point in the input set, the contents of what is effectively a small Tile (called a Kernel) containing a predefined pattern are added to the grid cells surrounding the point in question; that is, the kernel is centered on the tile cell containing the point and then added to the Tile. This is an example of a local map algebra operation. If the points were sampled according to a probability density function, and if the kernel is derived from a Gaussian function, this process yields a smooth approximation of the density function that the points were sampled from. (Alternatively, each point can be given a weight, and the kernel values can be scaled by that weight before being applied to the tile, which is what we will do below.)

To begin, let’s generate a random collection of points with weights in the range (0, 32). To capture the idea of “a point with weight”, we create a PointFeature[Double] (which is an alias of Feature[Point, Double]). Features, in general, combine a geometry with an attribute value in a type-safe way.

import geotrellis.vector._
import scala.util._

val extent = Extent(-109, 37, -102, 41) // Extent of Colorado

def randomPointFeature(extent: Extent): PointFeature[Double] = {
  def randInRange (low: Double, high: Double): Double = {
    val x = Random.nextDouble
    low * (1-x) + high * x
  }
  Feature(Point(randInRange(extent.xmin, extent.xmax),      // the geometry
                randInRange(extent.ymin, extent.ymax)),
          Random.nextInt % 16 + 16)                         // the weight (attribute)
}

val pts = (for (i <- 1 to 1000) yield randomPointFeature(extent)).toList

The choice of extent is largely arbitrary in this example, but note that the coordinates are given as the familiar (longitude, latitude) pairs. Other coordinate representations are available, and it may be worth investigating coordinate reference systems (CRSs) if you want more details. Some operations in GeoTrellis require a CRS object to place your rasters in the proper context. For (longitude, latitude) coordinates, either geotrellis.proj4.CRS.fromName("EPSG:4326") or geotrellis.proj4.LatLng will produce the desired CRS.
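
For instance, the following two values denote the same CRS:

import geotrellis.proj4.{CRS, LatLng}

val wgs84: CRS = CRS.fromName("EPSG:4326")   // look up by EPSG code
val alsoWgs84: CRS = LatLng                  // predefined object for the same CRS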

Next, we will create a tile containing the kernel density estimate:

import geotrellis.raster._
import geotrellis.raster.mapalgebra.focal.Kernel

val kernelWidth: Int = 9

/* Gaussian kernel with std. deviation 1.5, amplitude 25 */
val kern: Kernel = Kernel.gaussian(kernelWidth, 1.5, 25)

val kde: Tile = pts.kernelDensity(kern, RasterExtent(extent, 700, 400))

This populates a 700x400 tile with the desired kernel density estimate. A simple way to view the result is to write the tile out as a PNG or TIFF. The following snippet creates a PNG in the directory sbt was launched from (the working directory), mapping the values of the tile to colors that smoothly interpolate from blue through yellow to red.

import geotrellis.raster.render._

val colorMap = ColorMap(
  (0 to kde.findMinMax._2 by 4).toArray,
  ColorRamps.HeatmapBlueToYellowToRedSpectrum
)

kde.renderPng(colorMap).write("test.png")

The advantage of TIFF output is that the file is tagged with the extent and CRS, so the resulting image can be overlaid on a map in a viewing program such as QGIS. That output is generated by the following statements.

import geotrellis.raster.io.geotiff._
import geotrellis.proj4.LatLng

GeoTiff(kde, extent, LatLng).write("test.tif")

Subdividing Tiles

The above example focuses on a toy problem that creates a small raster object. However, the purpose of GeoTrellis is to enable the processing of large-scale datasets. In order to work with large rasters, it will be necessary to subdivide a region into a grid of tiles so that the processing of each piece may be handled by different processors which may, for example, reside on remote machines in a cluster. This section explains some of the concepts involved in subdividing a raster into a set of tiles.

We will still use an Extent object to set the bounds of our raster patch in space, but we must now specify how that extent is broken up into a grid of Tiles. This requires a statement of the form:

import geotrellis.spark.tiling._

val tl = TileLayout(7, 4, 100, 100)

Here, we have specified a 7x4 grid of Tiles, each of which has 100x100 cells. This will eventually be used to divide the earlier monolithic 700x400 Tile (kde) into 28 uniformly-sized subtiles. The TileLayout is then combined with the extent in a LayoutDefinition object:

val ld = LayoutDefinition(extent, tl)

In preparation for reimplementing the previous kernel density estimation with this structure, we note that each point in pts lies at the center of a kernel, which covers some non-zero area around that point. We can think of each point/kernel pair as a small square extent centered at the point in question, with a side length of 9 pixels (the arbitrary width we chose for our kernel earlier). Each pixel, in turn, covers some non-zero area of the map, which we can also think of as an extent with side lengths given in map coordinates. The dimensions of a pixel's extent are given by the cellwidth and cellheight members of a LayoutDefinition object.

By incorporating all these ideas, we can create the following function to generate the extent of the kernel centered at a given point:

def pointFeatureToExtent[D](kwidth: Double, ld: LayoutDefinition, ptf: PointFeature[D]): Extent = {
  val p = ptf.geom

  Extent(p.x - kwidth * ld.cellwidth / 2,
         p.y - kwidth * ld.cellheight / 2,
         p.x + kwidth * ld.cellwidth / 2,
         p.y + kwidth * ld.cellheight / 2)
}

def ptfToExtent[D](p: PointFeature[D]) = pointFeatureToExtent(9, ld, p)

When we consider the kernel extent of a point in the context of a LayoutDefinition, it is clear that a kernel's extent may overlap more than one tile in the layout. To discover which tiles a given point's kernel extent overlaps, LayoutDefinition provides a mapTransform object, which can, among other things, determine the indices of the subtiles in the TileLayout that overlap a given extent. Note that the tiles in a given layout are indexed by SpatialKeys, which are effectively (Int, Int) pairs giving the (column, row) of each tile as follows:

+-------+-------+-------+       +-------+
| (0,0) | (1,0) | (2,0) | . . . | (6,0) |
+-------+-------+-------+       +-------+
| (0,1) | (1,1) | (2,1) | . . . | (6,1) |
+-------+-------+-------+       +-------+
    .       .       .     .         .
    .       .       .       .       .
    .       .       .         .     .
+-------+-------+-------+       +-------+
| (0,3) | (1,3) | (2,3) | . . . | (6,3) |
+-------+-------+-------+       +-------+

Specifically, in our running example, ld.mapTransform(ptfToExtent(Feature(Point(-108, 38), 100.0))) returns GridBounds(0, 2, 1, 3), indicating that every tile with a column index in the range [0, 1] and a row index in the range [2, 3] is intersected by the kernel centered on the point (-108, 38); that is, the 2x2 block of tiles at the lower left of the layout.

In order to proceed with the kernel density estimation, it is necessary to convert the list of points into a collection of (SpatialKey, List[PointFeature[Double]]) pairs that gathers, for each subtile (as indexed by its SpatialKey), all the points that affect it. The following snippet accomplishes that.

import geotrellis.spark._

def ptfToSpatialKey[D](ptf: PointFeature[D]): Seq[(SpatialKey,PointFeature[D])] = {
  val ptextent = ptfToExtent(ptf)
  val gridBounds = ld.mapTransform(ptextent)

  for {
    (c, r) <- gridBounds.coords
    if r < tl.totalRows
    if c < tl.totalCols
  } yield (SpatialKey(c,r), ptf)
}

val keyfeatures: Map[SpatialKey, List[PointFeature[Double]]] =
  pts
    .flatMap(ptfToSpatialKey)
    .groupBy(_._1)
    .map { case (sk, v) => (sk, v.unzip._2) }

Now, all the subtiles may be generated in the same fashion as the monolithic tile case above.

val keytiles = keyfeatures.map { case (sk, pfs) =>
  (sk, pfs.kernelDensity(
    kern,
    RasterExtent(ld.mapTransform(sk), tl.tileDimensions._1, tl.tileDimensions._2)
  ))
}

Finally, it is necessary to combine the results. Note that, in order to produce a 700x400 tile identical to the one from the simpler, non-tiled case, every SpatialKey must be represented in the map; otherwise the stitched result may not span the full extent. (This padding is only necessary if a tile covering the full extent is required.)

import geotrellis.spark.stitch.TileLayoutStitcher

val tileList =
  for {
    r <- 0 until ld.layoutRows
    c <- 0 until ld.layoutCols
  } yield {
    val k = SpatialKey(c,r)
    (k, keytiles.getOrElse(k, IntArrayTile.empty(tl.tileCols, tl.tileRows)))
  }

val stitched = TileLayoutStitcher.stitch(tileList)._1

It is now the case that stitched is identical to kde.
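
If you want to check that claim directly, a rough, cell-by-cell comparison can be done as follows. This is only a sketch: NODATA cells in floating-point tiles are stored as NaN, which never compares equal to itself, so a pair of NaNs is treated as a match.

// Compare dimensions and cell values of the stitched tile against the original.
val (a, b) = (kde.toArrayDouble, stitched.toArrayDouble)
val identical =
  kde.dimensions == stitched.dimensions &&
  a.zip(b).forall { case (x, y) => (x.isNaN && y.isNaN) || x == y }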

Distributing Computation via Apache Spark

As mentioned, the reason for introducing this more complicated version of kernel density estimation was to enable distributed processing of the kernel stamping process. Each subtile could potentially be handled by a different node in a cluster, which would make sense if the dataset in question were, say, the locations of individual trees, requiring a huge amount of working memory. By breaking the domain into smaller pieces, we can exploit the distributed framework provided by Apache Spark to spread the task across a number of machines; this also provides fault-tolerant, rapid data processing for real-time and/or web-based applications. Spark is much too big a topic to discuss in any detail here, so the curious reader is advised to search the web for more information.

Our concern is how to write code that exploits the structures provided by Spark, specifically Resilient Distributed Datasets, or RDDs. An RDD is a distributed collection with all of the usual features of a collection, such as map and reduce, as well as distributed versions of certain sequential operations (for example, aggregate is akin to foldLeft for standard collections). To create an RDD, one calls the parallelize() method on a SparkContext object, which can be generated by the following statements.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("Kernel Density")
val sc = new SparkContext(conf)

In our case, we have a collection of PointFeatures that we wish to parallelize, so we issue the command

import org.apache.spark.rdd.RDD

val pointRdd = sc.parallelize(pts, 10)

Here, the 10 indicates that we want the data distributed across 10 partitions to the available workers. A partition is a subset of the data in an RDD that will be processed by one of the workers, enabling parallel, distributed computation, assuming a pool of workers on a set of machines. If we omit this value, the default parallelism is used, which is typically the number of available processor cores, though in this local example it defaults to one.
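
A quick way to confirm the partitioning is to use the standard RDD accessors; the exact value of the default depends on the master setting.

pointRdd.getNumPartitions   // 10, the value requested above
sc.defaultParallelism       // what parallelize would have used had we omitted the 10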

In order to perform the same task as in the previous section, but in parallel, we will approach the problem in much the same way: points will be assigned an extent corresponding to the extent of the associated kernel, those points will be assigned SpatialKeys based on which subtiles their kernels overlap, and each kernel will be applied to the tile corresponding to its assigned SpatialKey. Earlier, this process was effected by a flatMap followed by a groupBy and then a map. The very same procedure could be used here, save for the fact that groupBy, when applied to an RDD, triggers an expensive, slow, network-intensive shuffle that collects items with the same key on a single node in the cluster. Instead, a fold-like operation will be used: aggregateByKey, which has a signature of roughly RDD[(K, U)] => T => ((T, U) => T, (T, T) => T) => RDD[(K, T)]. That is, beginning with an RDD of key/value pairs, we provide a "zero value" of type T (the type of the final result) and two functions: (1) a sequential operator, which uses a single value of type U to update an accumulated value of type T, and (2) a combining operator, which merges two accumulated states of type T. In our case, U = PointFeature[Double] and T = Tile; this implies that the sequential operator is a kernel stamper and the combining operator is a tile adder.

import geotrellis.raster.density.KernelStamper

def stampPointFeature(
  tile: MutableArrayTile,
  tup: (SpatialKey, PointFeature[Double])
 ): MutableArrayTile = {
  val (spatialKey, pointFeature) = tup
  val tileExtent = ld.mapTransform(spatialKey)
  val re = RasterExtent(tileExtent, tile)
  val result = tile.copy.asInstanceOf[MutableArrayTile]

  KernelStamper(result, kern)
    .stampKernelDouble(re.mapToGrid(pointFeature.geom), pointFeature.data)

  result
}

import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp

object Adder extends LocalTileBinaryOp {
  def combine(z1: Int, z2: Int) = {
    if (isNoData(z1)) {
      z2
    } else if (isNoData(z2)) {
      z1
    } else {
      z1 + z2
    }
  }

  def combine(r1: Double, r2:Double) = {
    if (isNoData(r1)) {
      r2
    } else if (isNoData(r2)) {
      r1
    } else {
      r1 + r2
    }
  }
}

def sumTiles(t1: MutableArrayTile, t2: MutableArrayTile): MutableArrayTile = {
  Adder(t1, t2).asInstanceOf[MutableArrayTile]
}

Note that we require a custom Adder implementation because the built-in tile summation operator returns NODATA if either of the cells being added contains NODATA itself.

Creating the desired result is then a matter of the following series of operations on pointRdd:

val tileRdd: RDD[(SpatialKey, Tile)] =
  pointRdd
    .flatMap(ptfToSpatialKey)
    .mapPartitions({ partition =>
      partition.map { case (spatialKey, pointFeature) =>
        (spatialKey, (spatialKey, pointFeature))
      }
    }, preservesPartitioning = true)
    .aggregateByKey(ArrayTile.empty(DoubleCellType, ld.tileCols, ld.tileRows))
                   (stampPointFeature, sumTiles)
    .mapValues{ tile: MutableArrayTile => tile.asInstanceOf[Tile] }

The mapPartitions operation applies a transformation to each partition of the RDD without triggering a shuffle. Here, it is used to tuck a copy of the SpatialKey into the value so that stampPointFeature can determine the pixel location within the corresponding tile.

We would be finished here, except that RDDs inside GeoTrellis are required to carry along a Metadata object that describes the context of the RDD. This is created like so:

import geotrellis.proj4.LatLng

val metadata = TileLayerMetadata(DoubleCellType,
                                 ld,
                                 ld.extent,
                                 LatLng,
                                 KeyBounds(SpatialKey(0,0),
                                           SpatialKey(ld.layoutCols-1,
                                                      ld.layoutRows-1)))

To combine the RDD and the metadata, we write val resultRdd = ContextRDD(tileRdd, metadata).

This resulting RDD is essentially the object of interest, though it is possible to write resultRdd.stitch to produce a single merged tile. In the general case, however, the RDD may cover an area so large, and at such high resolution, that the stitched result would not fit in working memory. In those applications, the usual workflow is to save the tiles to one of the distributed back ends (Accumulo, S3, HDFS, etc.). Tiles stored this way may then be used in further processing steps or served to applications (e.g., web mapping applications). If it is absolutely necessary, the individual tiles may be saved as GeoTIFFs and stitched with a tool such as GDAL.
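
For completeness, here is a sketch of what that stitching step might look like when the merged result does fit in memory. It assumes that the stitch method extension on the layer returns a Raster (a tile paired with its extent), as in recent GeoTrellis versions; the output file name is arbitrary.

import geotrellis.spark.stitch._     // stitch method extension, if not already in scope via geotrellis.spark._
import geotrellis.raster.io.geotiff._

val raster = resultRdd.stitch        // the merged tile plus its extent
GeoTiff(raster.tile, raster.extent, LatLng).write("kernel-density.tif")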

A Note on Running Example Code

To run the above test code, it is necessary to have a compatible environment. Spark code may experience failures if run solely in the Scala interpreter, as accessed through SBT’s console command. One way to ensure proper execution is to run in spark-shell, a Scala environment which provides a SparkContext made available through the variable sc. Another way is to compile the application into a JAR file using sbt assembly, and to use spark-submit. This latter option is the preferred method for Spark applications, in general, but for the purposes of trying out the provided code samples, spark-shell is the more sensible choice. The use of spark-submit is beyond the scope of this documentation, but many resources are available on the internet for learning this tool.

In either event, it will be necessary to install Spark in your local environment in order to run the code above. Once that is done, clone the GeoTrellis repository from GitHub. From the root directory of that project, execute the provided sbt script. Once SBT is loaded, the following commands can be executed:

project spark-etl
assembly

This packages the required class files into a JAR file. Now, again from the GeoTrellis source tree root directory, issue the command

spark-shell --jars spark-etl/target/scala-2.11/geotrellis-spark-etl-assembly-[version].jar

From the resulting interpreter prompt, perform the following imports:

import geotrellis.raster._
import geotrellis.vector._
import geotrellis.proj4._
import geotrellis.spark._
import geotrellis.spark.util._
import geotrellis.spark.tiling._

It should then be possible to input the example code from above (excluding the creation of a SparkContext) and get the desired result.

A Note on Implementation

The procedures that we have been considering above have been implemented in GeoTrellis and are located in raster/src/main/scala/geotrellis/raster/density/ and spark/src/main/scala/geotrellis/spark/density/. The final implementation is more complete than the simple version presented here, as it handles type conversion for different tile cell types and is augmented with convenience functions provided through the MethodExtensions facility. Briefly, method extensions allow for implicit conversion between Traversable[PointFeature[Num]] (where Num is either Int or Double) and a wrapper class that provides a method kernelDensity: (Kernel, RasterExtent) => Tile. Thus, any traversable collection can be treated as if it possesses a kernelDensity method. This pattern appears throughout GeoTrellis and provides some welcome syntactic sugar.
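
As a rough illustration of that pattern (not the actual GeoTrellis source; the wrapper class name here is invented and its body is elided), an implicit wrapper like the following is enough to make such a method appear on any traversable collection of weighted points when evaluated in the REPL:

import geotrellis.raster.{RasterExtent, Tile}
import geotrellis.raster.mapalgebra.focal.Kernel
import geotrellis.vector.PointFeature

// Hypothetical wrapper class; the real implementation lives in the packages
// named above and delegates to the kernel-stamping machinery.
implicit class KernelDensitySketchMethods(val self: Traversable[PointFeature[Double]]) {
  def kernelDensity(kern: Kernel, re: RasterExtent): Tile =
    ??? // kernel stamping, as sketched earlier in this document
}

// With the implicit in scope, a collection of point features appears to carry
// the method directly:
//   pts.kernelDensity(kern, RasterExtent(extent, 700, 400))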

Furthermore, the final implementation is more flexible with regard to the type of data used. Both the PointFeature parameter and the Tile CellType may be of integral or floating-point type. See the code for details.