This document provides a detailed example on how to build a raster from point data using kernel density estimation. Though that is the ostensible point, it also provides a brief introduction to working with rasters, including how to tile a raster and how to use the result as the basis for a computation in Spark.
Kernel density is one way to convert a set of points (an instance of vector data) into a raster. In this process, at every point in the point set, the contents of what is effectively a small Tile (called a Kernel) containing a predefined pattern are added to the grid cells surrounding the point in question (i.e., the kernel is centered on the tile cell containing the point and then added to the Tile). This is an example of a local map algebra operation. Assuming that the points were sampled according to a probability density function, and that the kernel is derived from a Gaussian function, this process yields a smooth approximation of the density function from which the points were sampled. (Alternatively, each point can be given a weight, and the kernel values can be scaled by that weight before being applied to the tile, which is what we will do below.)
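To make the stamping step concrete, here is a minimal sketch in plain Scala, using a bare 2D array as a stand-in for a Tile; the names and the array representation are illustrative only, not GeoTrellis API:

```scala
// Minimal sketch of kernel stamping on a bare 2D array (a stand-in for a
// GeoTrellis Tile); illustrative only, not GeoTrellis code.
def stamp(grid: Array[Array[Double]], kernel: Array[Array[Double]],
          row: Int, col: Int, weight: Double): Unit = {
  val kh = kernel.length / 2
  val kw = kernel(0).length / 2
  for {
    kr <- kernel.indices
    kc <- kernel(0).indices
    r = row + kr - kh // center the kernel on (row, col)
    c = col + kc - kw
    if r >= 0 && r < grid.length && c >= 0 && c < grid(0).length
  } grid(r)(c) += weight * kernel(kr)(kc) // scale by the point's weight
}
```

Stamping a 3x3 kernel of ones at the center of a 3x3 grid with weight 2.0, for instance, adds 2.0 to every cell; cells falling outside the grid are simply skipped.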
To begin, let's generate a random collection of points with weights in the range (0, 32). To capture the idea of "a point with weight", we create a PointFeature[Double] (which is an alias of Feature[Point, Double]). Features, in general, combine a geometry with an attribute value in a type-safe way.
import geotrellis.vector._
import scala.util._

val extent = Extent(-109, 37, -102, 41) // Extent of Colorado

def randomPointFeature(extent: Extent): PointFeature[Double] = {
  def randInRange(low: Double, high: Double): Double = {
    val x = Random.nextDouble
    low * (1 - x) + high * x
  }
  Feature(Point(randInRange(extent.xmin, extent.xmax), // the geometry
                randInRange(extent.ymin, extent.ymax)),
          Random.nextInt % 16 + 16)                    // the weight (attribute)
}

val pts = (for (i <- 1 to 1000) yield randomPointFeature(extent)).toList
The choice of extent is largely arbitrary in this example, but note that the coordinates here are taken with respect to the standard (longitude, latitude) representation that we normally use. Other coordinate representations are available, and it may be useful to investigate coordinate reference systems (CRSs) if you want more details. Some operations in GeoTrellis require that a CRS object be constructed to place your rasters in the proper context. For (longitude, latitude) coordinates, either geotrellis.proj4.CRS.fromName("EPSG:4326") or geotrellis.proj4.LatLng will generate the desired CRS.
Next, we will create a tile containing the kernel density estimate:
import geotrellis.raster._
import geotrellis.raster.mapalgebra.focal.Kernel
val kernelWidth: Int = 9
/* Gaussian kernel with std. deviation 1.5, amplitude 25 */
val kern: Kernel = Kernel.gaussian(kernelWidth, 1.5, 25)
val kde: Tile = pts.kernelDensity(kern, RasterExtent(extent, 700, 400))
This populates a 700x400 tile with the desired kernel density estimate. A simple way to view the result is to write the tile out to PNG or TIFF. In the following snippet, a PNG is created in the directory sbt was launched in (the working directory), with the values of the tile mapped to colors that smoothly interpolate from blue to yellow to red.
import geotrellis.raster.render._

val colorMap = ColorMap(
  (0 to kde.findMinMax._2 by 4).toArray,
  ColorRamps.HeatmapBlueToYellowToRedSpectrum
)

kde.renderPng(colorMap).write("test.png")
The advantage of a TIFF output is that it will be tagged with the extent and CRS, and the resulting image file can be overlaid on a map in a viewing program such as QGIS. That output is generated by the following statements.
import geotrellis.raster.io.geotiff._
GeoTiff(kde, extent, LatLng).write("test.tif")
Subdividing Tiles
The above example focuses on a toy problem that creates a small raster object. However, the purpose of GeoTrellis is to enable the processing of large-scale datasets. In order to work with large rasters, it will be necessary to subdivide a region into a grid of tiles so that the processing of each piece may be handled by different processors which may, for example, reside on remote machines in a cluster. This section explains some of the concepts involved in subdividing a raster into a set of tiles.
We will still use an Extent object to set the bounds of our raster patch in space, but we must now specify how that extent is broken up into a grid of Tiles. This requires a statement of the form:
import geotrellis.spark.tiling._
val tl = TileLayout(7, 4, 100, 100)
Here, we have specified a 7x4 grid of Tiles, each of which has 100x100 cells. This will eventually be used to divide the earlier monolithic 700x400 Tile (kde) into 28 uniformly-sized subtiles. The TileLayout is then combined with the extent in a LayoutDefinition object:
val ld = LayoutDefinition(extent, tl)
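As a quick sanity check on this layout's geometry (plain Scala arithmetic, not a GeoTrellis call): the cell dimensions implied by a LayoutDefinition are simply the extent's width and height divided by the total number of cells in each direction.

```scala
// Cell dimensions implied by the layout above: a 7x4 grid of 100x100-cell
// tiles spanning Extent(-109, 37, -102, 41). Plain arithmetic, not the
// GeoTrellis implementation.
val (xmin, ymin, xmax, ymax) = (-109.0, 37.0, -102.0, 41.0)
val (layoutCols, layoutRows) = (7, 4)
val (tileCols, tileRows) = (100, 100)

val cellwidth  = (xmax - xmin) / (layoutCols * tileCols) // 7 degrees over 700 cells
val cellheight = (ymax - ymin) / (layoutRows * tileRows) // 4 degrees over 400 cells
// Both work out to 0.01 map units (degrees) per cell.
```

These are the values that ld.cellwidth and ld.cellheight will report for this layout.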
In preparation for reimplementing the previous kernel density estimation with this structure, we note that each point in pts lies at the center of a kernel which covers some non-zero area around that point. We can think of each point/kernel pair as a small square extent centered at the point in question with a side length of 9 pixels (the width we chose for our kernel earlier). Each pixel, in turn, covers some non-zero area of the map, which we can also think of as an extent with side lengths given in map coordinates. The dimensions of a pixel's extent are given by the cellwidth and cellheight members of a LayoutDefinition object.
By incorporating all these ideas, we can create the following function to generate the extent of the kernel centered at a given point:
def pointFeatureToExtent[D](kwidth: Double, ld: LayoutDefinition, ptf: PointFeature[D]): Extent = {
  val p = ptf.geom
  Extent(p.x - kwidth * ld.cellwidth / 2,
         p.y - kwidth * ld.cellheight / 2,
         p.x + kwidth * ld.cellwidth / 2,
         p.y + kwidth * ld.cellheight / 2)
}

def ptfToExtent[D](p: PointFeature[D]) = pointFeatureToExtent(9, ld, p)
When we consider the kernel extent of a point in the context of a LayoutDefinition, it's clear that a kernel's extent may overlap more than one tile in the layout. To discover the tiles that a given point's kernel extent overlaps, LayoutDefinition provides a mapTransform object. Among the capabilities of mapTransform is the ability to determine the indices of the subtiles in the TileLayout that overlap a given extent. Note that the tiles in a given layout are indexed by SpatialKeys, which are effectively (Int, Int) pairs giving the (column, row) of each tile as follows:
+-------+-------+-------+ +-------+
| (0,0) | (1,0) | (2,0) | . . . | (6,0) |
+-------+-------+-------+ +-------+
| (0,1) | (1,1) | (2,1) | . . . | (6,1) |
+-------+-------+-------+ +-------+
. . . . .
. . . . .
. . . . .
+-------+-------+-------+ +-------+
| (0,3) | (1,3) | (2,3) | . . . | (6,3) |
+-------+-------+-------+ +-------+
Specifically, in our running example, ld.mapTransform(ptfToExtent(Feature(Point(-108, 38), 100.0))) returns GridBounds(0, 2, 1, 3), indicating that every tile with column in the range [0, 1] and row in the range [2, 3] is intersected by the kernel centered on the point (-108, 38), that is, the 2x2 block of tiles at the lower left of the layout.
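The arithmetic behind that result can be sketched in plain Scala (a hypothetical helper, not the GeoTrellis implementation): tile columns count up from the west edge of the layout, tile rows count down from the north edge, and each tile in our layout spans 1.0 map units (100 cells at 0.01 units each).

```scala
// Sketch of the extent-to-tile-index arithmetic behind mapTransform
// (illustrative only, not GeoTrellis code). Columns increase eastward from
// xmin; rows increase southward from ymax. Returns (colMin, rowMin, colMax, rowMax).
def extentToTileBounds(qXmin: Double, qYmin: Double,
                       qXmax: Double, qYmax: Double): (Int, Int, Int, Int) = {
  val (xmin, ymax) = (-109.0, 41.0) // northwest corner of the layout
  val tileWidth  = 1.0              // 100 cells * 0.01 map units
  val tileHeight = 1.0
  val colMin = math.floor((qXmin - xmin) / tileWidth).toInt
  val colMax = math.floor((qXmax - xmin) / tileWidth).toInt
  val rowMin = math.floor((ymax - qYmax) / tileHeight).toInt
  val rowMax = math.floor((ymax - qYnin_placeholder) / tileHeight).toInt
  (colMin, rowMin, colMax, rowMax)
}
```

Applied to the kernel extent of the point (-108, 38), which spans x in [-108.045, -107.955] and y in [37.955, 38.045], this yields (0, 2, 1, 3), in agreement with the GridBounds above.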
In order to proceed with the kernel density estimation, it is necessary to convert the list of points into a collection of (SpatialKey, List[PointFeature[Double]]) pairs that gathers, for each subtile, all the points that have an effect on it, indexed by SpatialKey. The following snippet accomplishes that.
import geotrellis.spark._

def ptfToSpatialKey[D](ptf: PointFeature[D]): Seq[(SpatialKey, PointFeature[D])] = {
  val ptextent = ptfToExtent(ptf)
  val gridBounds = ld.mapTransform(ptextent)
  for {
    (c, r) <- gridBounds.coords
    if r < tl.totalRows
    if c < tl.totalCols
  } yield (SpatialKey(c, r), ptf)
}

val keyfeatures: Map[SpatialKey, List[PointFeature[Double]]] =
  pts
    .flatMap(ptfToSpatialKey)
    .groupBy(_._1)
    .map { case (sk, v) => (sk, v.unzip._2) }
Now, all the subtiles may be generated in the same fashion as the monolithic tile case above.
val keytiles = keyfeatures.map { case (sk, pfs) =>
  (sk, pfs.kernelDensity(
    kern,
    RasterExtent(ld.mapTransform(sk), tl.tileDimensions._1, tl.tileDimensions._2)
  ))
}
Finally, it is necessary to combine the results. Note that, in order to produce a 700x400 tile identical to the non-tiled case above, every SpatialKey must be represented in the map; otherwise the stitched result may not span the full extent. This padding is only necessary if a tile spanning the full extent is required.
import geotrellis.spark.stitch.TileLayoutStitcher

val tileList =
  for {
    r <- 0 until ld.layoutRows
    c <- 0 until ld.layoutCols
  } yield {
    val k = SpatialKey(c, r)
    (k, keytiles.getOrElse(k, IntArrayTile.empty(tl.tileCols, tl.tileRows)))
  }

val stitched = TileLayoutStitcher.stitch(tileList)._1
The resulting stitched tile is identical to kde.
Distributing Computation via Apache Spark
As mentioned, the reason for introducing this more complicated version of kernel density estimation was to enable distributed processing of the kernel stamping process. Each subtile could potentially be handled by a different node in a cluster, which would make sense if the dataset in question were, say, the location of individual trees, requiring a huge amount of working memory. By breaking the domain into smaller pieces, we can exploit the distributed framework provided by Apache Spark to spread the task to a number of machines. This also provides fault-tolerant, rapid data processing for real-time and/or web-based applications. Spark is much too big a topic to discuss in any detail here, so the curious reader is advised to search the web for more information. Our concern is how to write code that exploits the structures provided by Spark, specifically Resilient Distributed Datasets, or RDDs. An RDD is a distributed collection, with all of the usual features of a collection (e.g., map, reduce) as well as distributed versions of certain sequential operations (e.g., aggregate is akin to foldLeft for standard collections). To create an RDD, one calls the parallelize() method on a SparkContext object, which can be generated by the following statements.
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setMaster("local").setAppName("Kernel Density")
val sc = new SparkContext(conf)
In our case, we have a collection of PointFeatures that we wish to parallelize, so we issue the command
import org.apache.spark.rdd.RDD
val pointRdd = sc.parallelize(pts, 10)
Here, the 10 indicates that we want to distribute the data, as 10 partitions, to the available workers. A partition is a subset of the data in an RDD that will be processed by one of the workers, enabling parallel, distributed computation, assuming the existence of a pool of workers on a set of machines. If we exclude this value, the default parallelism will be used, which is typically the number of processors, though in this local example, it defaults to one.
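What a "partition" means can be sketched without Spark at all: the data is cut into n slices, and each slice is handed to one task. The round-robin assignment below is for illustration only (Spark's parallelize actually slices the sequence into contiguous ranges):

```scala
// Plain-Scala sketch of splitting data into n partitions. Round-robin
// assignment for simplicity; Spark's parallelize uses contiguous slices.
def partition[A](xs: List[A], n: Int): Vector[List[A]] =
  xs.zipWithIndex
    .groupBy { case (_, i) => i % n } // assign element i to slice i mod n
    .toVector
    .sortBy(_._1)                     // order the slices by index
    .map { case (_, kvs) => kvs.map(_._1) }
```

For example, partition(List(1, 2, 3, 4, 5), 2) produces Vector(List(1, 3, 5), List(2, 4)): two slices that can be processed independently.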
In order to perform the same task as in the previous section, but in parallel, we will approach the problem in much the same way: points will be assigned an extent corresponding to the extent of the associated kernel, those points will be assigned SpatialKeys based on which subtiles their kernels overlap, and each kernel will be applied to the tile corresponding to its assigned SpatialKey. Earlier, this process was effected by a flatMap followed by a groupBy and then a map. The very same procedure could be used here, save for the fact that groupBy, when applied to an RDD, triggers an expensive, slow, network-intensive shuffling operation which collects items with the same key on a single node in the cluster. Instead, a fold-like operation will be used: aggregateByKey, which has a signature of the form RDD[(K, U)] => T => ((T, U) => T, (T, T) => T) => RDD[(K, T)]. That is, we begin with an RDD of key/value pairs, provide a "zero value" of type T (the type of the final result), and supply two functions: (1) a sequential operator, which folds a single value of type U into an accumulated value of type T, and (2) a combining operator, which merges two accumulated states of type T. In our case, U = PointFeature[Double] and T = Tile; this implies that the insertion function is a kernel stamper and the merging function is a tile adder.
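The shape of aggregateByKey can be mimicked on ordinary Scala collections (a sketch of the semantics, not Spark code): each simulated partition folds its values into an accumulator with the sequential operator, and the per-partition accumulators are merged with the combining operator. Here running Int sums stand in for the Tile accumulators.

```scala
// Sketch of aggregateByKey semantics on plain collections (not Spark code).
// seqOp folds one value into an accumulator; combOp merges two accumulators.
def aggregateByKeySketch[K, U, T](partitions: Seq[Seq[(K, U)]], zero: T)
                                 (seqOp: (T, U) => T, combOp: (T, T) => T): Map[K, T] =
  partitions
    .map(_.groupBy(_._1).map { case (k, kvs) =>
      k -> kvs.map(_._2).foldLeft(zero)(seqOp) // sequential step, per partition
    })
    .reduce { (m1, m2) =>                      // combining step, across partitions
      (m1.keySet ++ m2.keySet).map { k =>
        k -> (m1.get(k).toList ++ m2.get(k).toList).reduce(combOp)
      }.toMap
    }
```

With two partitions Seq(Seq(("a", 1), ("b", 2)), Seq(("a", 3))), zero 0, and addition for both operators, this yields Map("a" -> 4, "b" -> 2). No shuffle is needed until the final, already-reduced accumulators are merged, which is exactly why aggregateByKey is cheaper than groupBy on an RDD.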
import geotrellis.raster.density.KernelStamper

def stampPointFeature(
  tile: MutableArrayTile,
  tup: (SpatialKey, PointFeature[Double])
): MutableArrayTile = {
  val (spatialKey, pointFeature) = tup
  val tileExtent = ld.mapTransform(spatialKey)
  val re = RasterExtent(tileExtent, tile)
  val result = tile.copy.asInstanceOf[MutableArrayTile]

  KernelStamper(result, kern)
    .stampKernelDouble(re.mapToGrid(pointFeature.geom), pointFeature.data)

  result
}
import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp

object Adder extends LocalTileBinaryOp {
  def combine(z1: Int, z2: Int) = {
    if (isNoData(z1)) { z2 }
    else if (isNoData(z2)) { z1 }
    else { z1 + z2 }
  }

  def combine(r1: Double, r2: Double) = {
    if (isNoData(r1)) { r2 }
    else if (isNoData(r2)) { r1 }
    else { r1 + r2 }
  }
}

def sumTiles(t1: MutableArrayTile, t2: MutableArrayTile): MutableArrayTile = {
  Adder(t1, t2).asInstanceOf[MutableArrayTile]
}
Note that we require a custom Adder implementation because the built-in tile summation operator returns NODATA if either of the cells being added contains a NODATA value itself.
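To see concretely why this matters: for Int tiles, GeoTrellis represents NODATA as Int.MinValue, so a custom operator must treat NODATA as an identity element rather than propagating it. The following plain-Scala sketch mirrors the Adder's Int case; the isNoData here is a local stand-in for the GeoTrellis helper.

```scala
// NODATA-aware addition for Int cells, mirroring Adder.combine above.
// For Int tiles, GeoTrellis encodes NODATA as Int.MinValue.
val NODATA = Int.MinValue
def isNoData(z: Int): Boolean = z == NODATA

def addCells(z1: Int, z2: Int): Int =
  if (isNoData(z1)) z2      // treat NODATA as an identity element...
  else if (isNoData(z2)) z1
  else z1 + z2              // ...and only add two real values
```

So addCells(NODATA, 5) gives 5, whereas the built-in summation would yield NODATA and discard the accumulated density at that cell.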
Creating the desired result is then a matter of the following series of operations on pointRdd:
val tileRdd: RDD[(SpatialKey, Tile)] =
  pointRdd
    .flatMap(ptfToSpatialKey)
    .mapPartitions({ partition =>
      partition.map { case (spatialKey, pointFeature) =>
        (spatialKey, (spatialKey, pointFeature))
      }
    }, preservesPartitioning = true)
    .aggregateByKey(ArrayTile.empty(DoubleCellType, ld.tileCols, ld.tileRows))(stampPointFeature, sumTiles)
    .mapValues { tile: MutableArrayTile => tile.asInstanceOf[Tile] }
The mapPartitions operation simply applies a transformation to an RDD without triggering any kind of shuffling operation. Here, it is necessary to make the SpatialKey available to stampPointFeature so that it can properly determine the pixel location in the corresponding tile.
We would be finished here, except that RDDs inside GeoTrellis are required to carry along a Metadata object that describes the context of the RDD. This is created like so:
import geotrellis.proj4.LatLng

val metadata = TileLayerMetadata(DoubleCellType,
                                 ld,
                                 ld.extent,
                                 LatLng,
                                 KeyBounds(SpatialKey(0, 0),
                                           SpatialKey(ld.layoutCols - 1, ld.layoutRows - 1)))
To combine the RDD and the metadata, we write val resultRdd = ContextRDD(tileRdd, metadata).
This resulting RDD is essentially the object of interest, though it is possible to write resultRdd.stitch to produce a single merged tile. In the general case, however, the RDD may cover an area so large, and at a resolution so fine, that the result of stitching would be too large for working memory. In these sorts of applications, the usual workflow is to save the tiles off to one of the distributed back ends (Accumulo, S3, HDFS, etc.). Tiles thus stored may then be used in further processing steps or served to applications (e.g., web mapping applications). If absolutely necessary, the individual tiles may be saved off as GeoTIFFs and stitched via an application like GDAL.
A Note on Running Example Code
To run the above test code, it is necessary to have a compatible environment. Spark code may experience failures if run solely in the Scala interpreter, as accessed through SBT's console command. One way to ensure proper execution is to run in spark-shell, a Scala environment which provides a SparkContext made available through the variable sc. Another way is to compile the application into a JAR file using sbt assembly and to use spark-submit. The latter option is the preferred method for Spark applications in general, but for the purposes of trying out the provided code samples, spark-shell is the more sensible choice. The use of spark-submit is beyond the scope of this documentation, but many resources are available on the internet for learning this tool.
In either event, it will be necessary to install Spark in your local environment to run the code above. Once that is done, you will need to clone the GeoTrellis repository from GitHub. From the root directory of that project, execute the provided sbt script. Once SBT is loaded, the following commands can be executed:
project spark-etl
assembly
This packages the required class files into a JAR file. Now, again from the GeoTrellis source tree root directory, issue the command
spark-shell --jars spark-etl/target/scala-2.11/geotrellis-spark-etl-assembly-[version].jar
From the resulting interpreter prompt, perform the following imports:
import geotrellis.raster._
import geotrellis.vector._
import geotrellis.proj4._
import geotrellis.spark._
import geotrellis.spark.util._
import geotrellis.spark.tiling._
It should then be possible to input the example code from above (excluding the creation of a SparkContext) and get the desired result.
A Note on Implementation
The procedures that we've been considering above have been implemented in GeoTrellis and are located in raster/src/main/scala/geotrellis/raster/density/ and spark/src/main/scala/geotrellis/spark/density. This final implementation is more complete than the simple version presented here, as it handles type conversion for different tile cell types and is augmented with convenience functions that are provided through the MethodExtensions facility. Briefly, method extensions allow for implicit conversion between Traversable[PointFeature[Num]] (where Num is either Int or Double) and a wrapper class which provides a method kernelDensity: (Kernel, RasterExtent) => Tile. Thus, any traversable collection of point features can be treated as if it possesses a kernelDensity method. This pattern appears throughout GeoTrellis and provides some welcome syntactic sugar.
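The extension pattern itself can be sketched in a few lines of plain Scala; the names below are a toy stand-in, not the GeoTrellis classes:

```scala
// Toy sketch of the MethodExtensions pattern: an implicit wrapper class
// makes a method appear on any Traversable of weighted points.
// (Illustrative names, not the GeoTrellis API.)
case class WeightedPoint(x: Double, y: Double, weight: Double)

object Syntax {
  implicit class WeightedPointMethods(val self: Traversable[WeightedPoint]) {
    def totalWeight: Double = self.map(_.weight).sum
  }
}

import Syntax._
// Any List, Seq, or Set of WeightedPoint now appears to have totalWeight:
val total = List(WeightedPoint(0, 0, 1.5), WeightedPoint(1, 1, 2.5)).totalWeight
```

The compiler inserts the wrapper conversion at the call site, which is how kernelDensity comes to be available on plain collections of PointFeatures.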
Furthermore, the final implementation is more flexible with regard to the type of data used. Both the PointFeature parameter and the Tile CellType may be of integral or floating-point type. See the code for details.