Kernel Density¶
This document provides a detailed example on how to build a raster from point data using kernel density estimation. Though that is the ostensible point, it also provides a brief introduction to working with rasters, including how to tile a raster and how to use the result as the basis for a computation in Spark.
Kernel density is one way to convert a set of points (an instance of vector data) into a raster. In this process, at every point in the point set, the contents of what is effectively a small Tile (called a Kernel) containing a predefined pattern are added to the grid cells surrounding the point in question (i.e., the kernel is centered on the tile cell containing the point and then added to the Tile). This is an example of a local map algebra operation. Assuming that the points were sampled according to a probability density function, and if the kernel is derived from a Gaussian function, this can develop a smooth approximation to the density function that the points were sampled from. (Alternatively, each point can be given a weight, and the kernel values can be scaled by that weight before being applied to the tile, which we will do below.)
To begin, let’s generate a random collection of points with weights in
the range (0, 32). To capture the idea of “a point with weight”, we
create a PointFeature[Double]
(which is an alias of
Feature[Point, Double]
). Features, in general, combine a geometry
with an attribute value in a type-safe way.
import geotrellis.vector._
import scala.util._
val extent = Extent(-109, 37, -102, 41) // Extent of Colorado
def randomPointFeature(extent: Extent): PointFeature[Double] = {
def randInRange (low: Double, high: Double): Double = {
val x = Random.nextDouble
low * (1-x) + high * x
}
Feature(Point(randInRange(extent.xmin, extent.xmax), // the geometry
randInRange(extent.ymin, extent.ymax)),
Random.nextInt % 16 + 16) // the weight (attribute)
}
val pts = (for (i <- 1 to 1000) yield randomPointFeature(extent)).toList
The choice of extent is largely arbitrary in this example, but note that
the coordinates here are taken with respect to the standard (longitude,
latitude) that we normally consider. Other coordinate representations
are available, and it might be useful to investigate coordinate
reference systems (CRSs) if you want more details. Some operations in
GeoTrellis require that a CRS object be constructed to place your
rasters in the proper context. For (longitude, latitude) coordinates,
either geotrellis.proj4.CRS.fromName("EPSG:4326")
or
geotrellis.proj4.LatLng
will generate the desired CRS.
Next, we will create a tile containing the kernel density estimate:
import geotrellis.raster._
import geotrellis.raster.mapalgebra.focal.Kernel
val kernelWidth: Int = 9
/* Gaussian kernel with std. deviation 1.5, amplitude 25 */
val kern: Kernel = Kernel.gaussian(kernelWidth, 1.5, 25)
val kde: Tile = pts.kernelDensity(kern, RasterExtent(extent, 700, 400))
This populates a 700x400 tile with the desired kernel density estimate.
In order to view the resulting file, a simple method is to write the
tile out to PNG or TIFF. In the following snippet, a PNG is created in
the directory sbt
was launched in (the working directory), where the
values of the tile are mapped to colors that smoothly interpolate from
blue to yellow to red.
import geotrellis.raster.render._
val colorMap = ColorMap(
(0 to kde.findMinMax._2 by 4).toArray,
ColorRamps.HeatmapBlueToYellowToRedSpectrum
)
kde.renderPng(colorMap).write("test.png")
The advantage of using a TIFF output is that it will be tagged with the extent and CRS, and the resulting image file can be overlayed on a map in a viewing program such as QGIS. That output is generated by the following statements.
import geotrellis.raster.io.geotiff._
GeoTiff(kde, extent, LatLng).write("test.tif")
Subdividing Tiles¶
The above example focuses on a toy problem that creates a small raster object. However, the purpose of GeoTrellis is to enable the processing of large-scale datasets. In order to work with large rasters, it will be necessary to subdivide a region into a grid of tiles so that the processing of each piece may be handled by different processors which may, for example, reside on remote machines in a cluster. This section explains some of the concepts involved in subdividing a raster into a set of tiles.
We will still use an Extent
object to set the bounds of our raster
patch in space, but we must now specify how that extent is broken up
into a grid of Tile
s. This requires a statement of the form:
import geotrellis.spark.tiling._
val tl = TileLayout(7, 4, 100, 100)
Here, we have specified a 7x4 grid of Tiles, each of which has 100x100
cells. This will eventually be used to divide the earlier monolithic
700x400 Tile (kde
) into 28 uniformly-sized subtiles. The TileLayout
is then combined with the extent in a LayoutDefinition
object:
val ld = LayoutDefinition(extent, tl)
In preparation for reimplementing the previous kernel density estimation
with this structure, we note that each point in pts
lies at the
center of a kernel, which covers some non-zero area around the points.
We can think of each point/kernel pair as a small square extent centered
at the point in question with a side length of 9 pixels (the arbitrary
width we chose for our kernel earlier). Each pixel, however, covers some
non-zero area of the map, which we can also think of as an extent with
side lengths given in map coordinates. The dimensions of a pixel’s
extent are given by the cellwidth
and cellheight
members of a
LayoutDefinition object.
By incorporating all these ideas, we can create the following function to generate the extent of the kernel centered at a given point:
def pointFeatureToExtent[D](kwidth: Double, ld: LayoutDefinition, ptf: PointFeature[D]): Extent = {
val p = ptf.geom
Extent(p.x - kwidth * ld.cellwidth / 2,
p.y - kwidth * ld.cellheight / 2,
p.x + kwidth * ld.cellwidth / 2,
p.y + kwidth * ld.cellheight / 2)
}
def ptfToExtent[D](p: PointFeature[D]) = pointFeatureToExtent(9, ld, p)
When we consider the kernel extent of a point in the context of a
LayoutDefinition, it’s clear that a kernel’s extent may overlap more
than one tile in the layout. To discover the tiles that a given point’s
kernel extents overlap, LayoutDefinition provides a mapTransform
object. Among the methods of mapTransform is the ability to determine
the indices of the subtiles in the TileLayout that overlap a given
extent. Note that the tiles in a given layout are indexed by
SpatialKey
s, which are effectively (Int,Int)
pairs giving the
(column,row) of each tile as follows:
+-------+-------+-------+ +-------+
| (0,0) | (1,0) | (2,0) | . . . | (6,0) |
+-------+-------+-------+ +-------+
| (0,1) | (1,1) | (2,1) | . . . | (6,1) |
+-------+-------+-------+ +-------+
. . . . .
. . . . .
. . . . .
+-------+-------+-------+ +-------+
| (0,3) | (1,3) | (2,3) | . . . | (6,3) |
+-------+-------+-------+ +-------+
Specifically, in our running example,
ld.mapTransform(ptfToExtent(Feature(Point(-108, 38), 100.0)))
returns GridBounds(0, 2, 1, 3),
indicating that every cell with
columns in the range [0,1] and rows in the range [2,3] are intersected
by the kernel centered on the point (-108, 38)—that is, the 2x2 block
of tiles at the lower left of the layout.
In order to proceed with the kernel density estimation, it is necessary
to then convert the list of points into a collection of
(SpatialKey, List[PointFeature[Double]])
that gathers all the points
that have an effect on each subtile, as indexed by their SpatialKeys.
The following snippet accomplishes that.
import geotrellis.spark._
def ptfToSpatialKey[D](ptf: PointFeature[D]): Seq[(SpatialKey,PointFeature[D])] = {
val ptextent = ptfToExtent(ptf)
val gridBounds = ld.mapTransform(ptextent)
for {
(c, r) <- gridBounds.coords
if r < tl.totalRows
if c < tl.totalCols
} yield (SpatialKey(c,r), ptf)
}
val keyfeatures: Map[SpatialKey, List[PointFeature[Double]]] =
pts
.flatMap(ptfToSpatialKey)
.groupBy(_._1)
.map { case (sk, v) => (sk, v.unzip._2) }
Now, all the subtiles may be generated in the same fashion as the monolithic tile case above.
val keytiles = keyfeatures.map { case (sk, pfs) =>
(sk, pfs.kernelDensity(
kern,
RasterExtent(ld.mapTransform(sk), tl.tileDimensions._1, tl.tileDimensions._2)
))
}
Finally, it is necessary to combine the results. Note that, in order to produce a 700x400 tile that is identical to the simpler, non-tiled case, every SpatialKey must be represented in the map, or the result may not span the full range. This is only necessary if it is important to generate a tile that spans the full extent.
import geotrellis.spark.stitch.TileLayoutStitcher
val tileList =
for {
r <- 0 until ld.layoutRows
c <- 0 until ld.layoutCols
} yield {
val k = SpatialKey(c,r)
(k, keytiles.getOrElse(k, IntArrayTile.empty(tl.tileCols, tl.tileRows)))
}
val stitched = TileLayoutStitcher.stitch(tileList)._1
It is now the case that stitched
is identical to kde.
Distributing Computation via Apache Spark¶
As mentioned, the reason for introducing this more complicated version
of kernel density estimation was to enable distributed processing of the
kernel stamping process. Each subtile could potentially be handled by a
different node in a cluster, which would make sense if the dataset in
question were, say, the location of individual trees, requiring a huge
amount of working memory. By breaking the domain into smaller pieces, we
can exploit the distributed framework provided by Apache Spark to spread
the task to a number of machines. This will also provide fault tolerant,
rapid data processing for real-time and/or web-based applications. Spark
is much too big a topic to discuss in any detail here, so the curious
reader is advised to search the web for more information. Our concern
falls on how to write code to exploit the structures provided by Spark,
specifically Resilient Distributed Datasets, or RDDs. An RDD is a
distributed collection, with all of the usual features of a
collection—e.g., map, reduce—as well as distributed versions of certain
sequential operations—e.g., aggregate
is akin to foldLeft
for
standard collections. In order to create an RDD, one will call the
parallelize()
method on a SparkContext
object, which can be
generated by the following set of statements.
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setMaster("local").setAppName("Kernel Density")
val sc = new SparkContext(conf)
In our case, we have a collection of PointFeatures that we wish to parallelize, so we issue the command
import org.apache.spark.rdd.RDD
val pointRdd = sc.parallelize(pts, 10)
Here, the 10 indicates that we want to distribute the data, as 10 partitions, to the available workers. A partition is a subset of the data in an RDD that will be processed by one of the workers, enabling parallel, distributed computation, assuming the existence of a pool of workers on a set of machines. If we exclude this value, the default parallelism will be used, which is typically the number of processors, though in this local example, it defaults to one.
In order to perform the same task as in the previous section, but in
parallel, we will approach the problem in much the same way: points will
be assigned an extent corresponding to the extent of the associated
kernel, those points will be assigned SpatialKeys based on which
subtiles their kernels overlap, and each kernel will be applied to the
tile corresponding to its assigned SpatialKey. Earlier, this process was
effected by a flatMap followed by a groupBy and then a map. This very
same procedure could be used here, save for the fact that groupBy, when
applied to an RDD, triggers an expensive, slow, network-intensive
shuffling operation which collects items with the same key on a single
node in the cluster. Instead, a fold-like operation will be used:
aggregateByKey
, which has a signature of
RDD[(K, U)] => T => ((U, T) => T, (T, T) => T) => RDD[(K, T)].
That
is, we begin with an RDD of key/value pairs, provide a “zero value” of
type T
, the type of the final result, and two functions: (1) a
sequential operator, which uses a single value of type U
to update
an accumulated value of type T
, and (2) a combining operator,
which merges two accumulated states of type T
. In our case,
U = PointFeature[Double]
and T = Tile
; this implies that the
insertion function is a kernel stamper and the merging function is a
tile adder.
import geotrellis.raster.density.KernelStamper
def stampPointFeature(
tile: MutableArrayTile,
tup: (SpatialKey, PointFeature[Double])
): MutableArrayTile = {
val (spatialKey, pointFeature) = tup
val tileExtent = ld.mapTransform(spatialKey)
val re = RasterExtent(tileExtent, tile)
val result = tile.copy.asInstanceOf[MutableArrayTile]
KernelStamper(result, kern)
.stampKernelDouble(re.mapToGrid(pointFeature.geom), pointFeature.data)
result
}
import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp
object Adder extends LocalTileBinaryOp {
def combine(z1: Int, z2: Int) = {
if (isNoData(z1)) {
z2
} else if (isNoData(z2)) {
z1
} else {
z1 + z2
}
}
def combine(r1: Double, r2:Double) = {
if (isNoData(r1)) {
r2
} else if (isNoData(r2)) {
r1
} else {
r1 + r2
}
}
}
def sumTiles(t1: MutableArrayTile, t2: MutableArrayTile): MutableArrayTile = {
Adder(t1, t2).asInstanceOf[MutableArrayTile]
}
Note that we require a custom Adder implementation because the built-in
tile summation operator returns NODATA
if either of the cells being
added contain a NODATA
value themselves.
Creating the desired result is then a matter of the following series of
operations on pointRdd
:
val tileRdd: RDD[(SpatialKey, Tile)] =
pointRdd
.flatMap(ptfToSpatialKey)
.mapPartitions({ partition =>
partition.map { case (spatialKey, pointFeature) =>
(spatialKey, (spatialKey, pointFeature))
}
}, preservesPartitioning = true)
.aggregateByKey(ArrayTile.empty(DoubleCellType, ld.tileCols, ld.tileRows))
(stampPointFeature, sumTiles)
.mapValues{ tile: MutableArrayTile => tile.asInstanceOf[Tile] }
The mapPartitions
operation simply applies a transformation to an
RDD without triggering any kind of shuffling operation. Here, it is
necessary to make the SpatialKey available to stampPointFeature
so
that it can properly determine the pixel location in the corresponding
tile.
We would be finished here, except that RDDs inside GeoTrellis are required to carry along a Metadata object that describes the context of the RDD. This is created like so:
import geotrellis.proj4.LatLng
val metadata = TileLayerMetadata(DoubleCellType,
ld,
ld.extent,
LatLng,
KeyBounds(SpatialKey(0,0),
SpatialKey(ld.layoutCols-1,
ld.layoutRows-1)))
To combine the RDD and the metadata, we write
val resultRdd = ContextRDD(tileRdd, metadata)
.
This resulting RDD is essentially the object of interest, though it is
possible to write resultRDD.stitch
to produce a single merged tile.
In the general case, however, the RDD may cover an area so large and in
sufficient resolution that the result of stitching would be too large
for working memory. In these sorts of applications, the usual work flow
is to save the tiles off to one of the distributed back ends (Accumulo,
S3, HDFS, etc.). Tiles thus stored may then be used in further
processing steps or be served to applications (e.g., web mapping
applications). If it is absolutely necessary, the individual tiles may
be saved off as GeoTIFFs and stitched via an application like GDAL.
A Note on Running Example Code¶
To run the above test code, it is necessary to have a compatible
environment. Spark code may experience failures if run solely in the
Scala interpreter, as accessed through SBT’s console
command. One
way to ensure proper execution is to run in spark-shell
, a Scala
environment which provides a SparkContext made available through the
variable sc
. Another way is to compile the application into a JAR
file using sbt assembly
, and to use spark-submit
. This latter
option is the preferred method for Spark applications, in general, but
for the purposes of trying out the provided code samples,
spark-shell
is the more sensible choice. The use of spark-submit
is beyond the scope of this documentation, but many resources are
available on the internet for learning this tool.
In either event, it will be necessary to install Spark in your local
environment to run the code above. Once that is done, you will need to
clone the GeoTrellis repository from
Github. From the root
directory of that project, execute the provided sbt
script. Once SBT
is loaded, the following commands can be executed:
project spark-etl
assembly
This packages the required class files into a JAR file. Now, again from the GeoTrellis source tree root directory, issue the command
spark-shell --jars spark-etl/target/scala-2.11/geotrellis-spark-etl-assembly-[version].jar
From the resulting interpreter prompt, perform the following imports:
import geotrellis.raster._
import geotrellis.vector._
import geotrellis.proj4._
import geotrellis.spark._
import geotrellis.spark.util._
import geotrellis.spark.tiling._
It should then be possible to input the example code from above (excluding the creation of a SparkContext) and get the desired result.
A Note on Implementation¶
The procedures that we’ve been considering above have been implemented
in GeoTrellis and are located in
raster/src/main/scala/geotrellis/raster/density/
and
spark/src/main/scala/geotrellis/spark/density
. This final
implementation is more complete than the simple version presented here,
as it handles type conversion for different tile cell types and is
augmented with convenience functions that are provided through the use
of the MethodExtensions
facility. Briefly, method extensions allow
for implicit conversion between Traversable[PointFeature[Num]]
(where Num
is either Int
or Double
) and a wrapper class
which provides a method
kernelDensity: (Kernel, RasterExtent) => Tile
. Thus, any traversable
collection can be treated as if it possesses a kernelDensity
method.
This pattern appears all throughout GeoTrellis, and provides some
welcome syntactic sugar.
Furthermore, the final implementation is more flexible with regard to the type of data used. Both the PointFeature parameter and the Tile CellType may be of integral or floating-point type. See the code for details.