Architecture Decision Records¶
This is a collection of subdocuments that describe why (or why not) we made a particular design decision in GeoTrellis.
0001 - Streaming Writes¶
Context¶
To write streaming data (e.g. `RDD[(K, V)]`) to an S3 backend it is necessary to map over RDD partitions and to send multiple async PUT requests for all elements of a certain partition. It is important to synchronize these requests in order to be sure that, after calling a writer function, all data was ingested (or at least attempted). An HTTP `503 Service Unavailable` error requires resending the PUT request (with exponential backoff), due to the transient network problems that typically cause this error. The Accumulo and Cassandra writers work in a similar fashion.
To handle this situation we use the `Task` abstraction from Scalaz, which uses its own `Future` implementation. The purpose of this research is to determine the possibility of removing the heavy Scalaz dependency. In the near future we will likely depend on the Cats library, which is lighter, more modular, and covers much of the same ground as Scalaz. Thus, depending on Scalaz is not ideal.
Decision¶
We started by moving from the Scalaz `Task` to an implementation based on the Scala standard library `Future` abstraction. Because a `List[Future[A]]` is convertible to a `Future[List[A]]`, it was thought that this simpler home-grown solution might be a workable alternative.
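For concreteness, here is a minimal sketch (not the GeoTrellis implementation) of that conversion: `Future.sequence` turns a `List[Future[A]]` into a `Future[List[A]]`, so in principle a partition's PUT requests could be awaited as a single `Future`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// One Future per element of a partition, e.g. one async PUT request each
val puts: List[Future[Unit]] =
  List(1, 2, 3).map { n =>
    Future { println(s"sending async PUT request for element $n") }
  }

// Future.sequence turns List[Future[A]] into Future[List[A]]
val all: Future[List[Unit]] = Future.sequence(puts)
Await.result(all, Duration.Inf) // block until every request has completed
```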
Every `Future` is basically some calculation that needs to be submitted to a thread pool. When you call `(fA: Future[A]).flatMap(a => fB: Future[B])`, both `Future[A]` and `Future[B]` need to be submitted to the thread pool, even though they are not running concurrently and could run on the same thread. If a `Future` was unsuccessful it is possible to define a recovery strategy (in the case of S3 it is necessary).
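A minimal sketch of such a recovery strategy, assuming a hypothetical `attempt` function that issues the PUT: retries are expressed with `recoverWith`, but the backoff delay can only be obtained by blocking a pool thread, which foreshadows the delay problem described below.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Retry a failed PUT with exponential backoff. The sleep blocks a pool
// thread: standard Futures offer no non-blocking delay primitive.
def retryWithBackoff[A](attempt: () => Future[A],
                        retries: Int = 5,
                        delay: FiniteDuration = 100.millis)
                       (implicit ec: ExecutionContext): Future[A] =
  attempt().recoverWith {
    case _ if retries > 0 =>
      Thread.sleep(delay.toMillis) // blocking!
      retryWithBackoff(attempt, retries - 1, delay * 2)
  }
```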
We faced two problems: difficulties in `Future` synchronization (awaiting a `Future`) and in `Future` delay functionality (as we want an exponential backoff in the S3 backend case).

We can await a `Future` until it is done (`Await.result` with `Duration.Inf`), but we cannot be sure that the `Future` was completed exactly at that point (for some reason, which needs further investigation, it completes a bit earlier or later).
Given a thread pool of `Future`s and some `List[Future[A]]`, awaiting these `Future`s does not guarantee the completion of every `Future` in the pool. Recovering a `Future` produces a new `Future`, so recovered `Future`s and recursive `Future`s are new `Future`s in the same thread pool, and it is not obvious how to await all of the necessary `Future`s. Another problem is delayed `Future`s: in fact such behaviour can only be achieved by creating blocking `Future`s. As a workaround, and to avoid blocking `Future`s, it is possible to use a `Timer`, but in fact that would be a sort of separate `Future` pool.
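A minimal sketch of that `Timer` workaround (the names here are illustrative, not GeoTrellis API): a `ScheduledExecutorService` completes a `Promise` after the delay, so no pool thread blocks, but the scheduler is effectively a second pool.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

// A single scheduler thread completes the Promise after the delay, so
// no pool thread is blocked; the scheduler is, in effect, a separate pool.
object Timer {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def delay[A](d: FiniteDuration)(value: => A): Future[A] = {
    val p = Promise[A]()
    scheduler.schedule(new Runnable {
      def run(): Unit = { p.complete(Try(value)); () }
    }, d.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }
}
```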
Let's look at the Scalaz `Task` more closely, and compare it to native Scala `Future`s. With `Task` we receive a bit more control over calculations. In fact, a `Task` is not a concurrently running computation; it is a description of a computation, a lazy sequence of instructions that may or may not include instructions to submit some of the calculations to thread pools. When you call `(tA: Task[A]).flatMap(a => tB: Task[B])`, the `Task[B]` will by default just continue running on the same thread that was already executing `Task[A]`. Calling `Task.fork` pushes the task into the thread pool. Scalaz `Task`s operate with their own `Future` implementation. Thus, having a stream of `Task`s provides more control over concurrent computations.
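The same retry-with-backoff pattern is much more natural with `Task`; a minimal sketch (assuming scalaz 7.2, with a hypothetical `putTask`):

```scala
import scala.concurrent.duration._
import scalaz.concurrent.Task

// Nothing runs until the Task is executed, and the delayed retry does
// not block a pool thread: Task.schedule waits on a scheduler.
def retryWithBackoff[A](task: Task[A],
                        retries: Int = 5,
                        delay: FiniteDuration = 100.millis): Task[A] =
  task.handleWith {
    case _ if retries > 0 =>
      Task.schedule((), delay).flatMap { _ =>
        retryWithBackoff(task, retries - 1, delay * 2)
      }
  }

// The description is submitted to a thread pool only on demand, e.g.:
// Task.fork(retryWithBackoff(putTask)).unsafePerformSync
```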
Some implementations were written, but each had synchronization problems. This attempt to get rid of the Scalaz dependency is not as trivial as we had anticipated.
This is not a critical decision and, if necessary, we can come back to it later.
Consequences¶
All implementations based on `Future`s are non-trivial, and it requires time to implement a correct write stream based on native `Future`s. Here are the two simplest and most transparent implementation variants, but both have synchronization problems.
Scalaz `Task`s seem to be better suited to our needs. `Task`s run on demand, and there is no requirement of instant submission of `Task`s into a thread pool. As described above, a `Task` is a lazy sequence of instructions, some of which may submit calculations into a thread pool. Currently it makes sense to continue depending on Scalaz.
0002 - HDFS Raster Layers¶
Context¶
A raster layer is a regular grid of raster tiles, represented as an `RDD[(K, V)]` where `K` contains the column, row, and/or time. The raster layer storage scheme must support two forms of queries with different requirements:

- Distributed bounding box queries
  - Minimum time between the start of the query and the time at which records are inspected for a match
  - Minimum number of records discarded during the query refinement stage
- Key/value look-ups
  - A clear mapping from any `K` to a single block file
  - Efficient seeks to any random value in the layer

HDFS does not provide any active index management, so we must carefully define a storage and indexing scheme that supports both of those cases.
Decision¶
The design builds on an established pattern of mapping a multi-dimensional tile key to a one-dimensional index using a space filling curve (SFC). This requires definition of bounding spatial extent and resolution but provides a total ordering for our records.
MapFiles¶
The layer will be sorted and written to multiple Hadoop `MapFile`s. A `MapFile` consists of two files:

- The `data` file is a `SequenceFile` of `LongWritable` and `BytesWritable` key/value pairs, where the key is the SFC index and the value bytes are an Avro-encoded `Vector[(K, V)]` in which all `K`s map to the given SFC index.
- The `index` file is a `SequenceFile` which maps a `LongWritable` seen in the `data` file to its offset, at some defined `indexInterval`.

When a `MapFile` is opened, the `index` is read fully, which allows fast random seeks into the `data` file.
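A minimal sketch of the random access this enables, using the Hadoop `MapFile.Reader` API (the path here is hypothetical): `get` uses the in-memory index to seek near the requested key in the `data` file, then scans forward to an exact match.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, MapFile}

val conf   = new Configuration()
val reader = new MapFile.Reader(new Path("hdfs:///layers/example/part-r-00000-0"), conf)

val key   = new LongWritable(42L)  // an SFC index
val value = new BytesWritable()    // will receive the record bytes
if (reader.get(key, value) != null) {
  val avroBytes = value.copyBytes() // Avro-encoded Vector[(K, V)] for this index
}
reader.close()
```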
Each map file will consequently correspond to an SFC range, from the first to the last key stored in the file. Because the whole layer is sorted before being written, we can assume that the ranges covered by the map files are exclusive.

It is important to know which SFC range each file corresponds to; to avoid creating an additional overall index file, we record the value of the first SFC index stored in the map file as part of the file name.

We experimented with using a bloom filter index, but it did not appear appropriate. Because each file is restricted to be no bigger than a single HDFS block (64M/128M), the time to compute and store the bloom filter does not offer any speed improvement on a per-file basis.
Single Value Queries¶
In a single value query we are given an instance of `K` and we must produce a corresponding `V` or an error. The first step is to locate the `MapFile` which potentially contains the `(K, V)` record. Because the layer records are indexed by their SFC index, we map `K` to `i: Long` and determine which file contains a potential match by examining the file listing and finding the file with the maximum starting index that is less than or equal to `i`. At this point the `MapFile` must be opened and queried for the key.
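A minimal sketch of that selection step (the file-name parsing helper is illustrative; the naming scheme `part-r-<partition number>-<first record index>` is described under Layer Writing below):

```scala
// Keep the file with the greatest starting index that is <= i.
def fileFor(files: Seq[String], i: Long): Option[String] = {
  def startIndex(name: String): Long = name.split("-").last.toLong
  files.filter(startIndex(_) <= i).sortBy(startIndex).lastOption
}
```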
The file listing is a comparatively expensive operation that is cached when we create a `Reader[K, V]` instance for a given layer from `HadoopValueReader`. Additionally, we maintain an LRU cache of `MapFile`s as we open them to satisfy client requests. Because the SFC preserves some spatial locality of the records, geographically close records are likely to be close in SFC index, and we expect key/value queries to be geographically grouped, for instance requests from a map viewer. This leads us to expect that the `MapFile` LRU cache can have a high hit rate.
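As an illustration, here is a minimal LRU cache of open readers (not the GeoTrellis implementation), built on `java.util.LinkedHashMap` in access order, closing readers as they are evicted:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import org.apache.hadoop.io.MapFile

class ReaderCache(maxOpen: Int = 16) {
  // accessOrder = true makes iteration order least-recently-used first
  private val cache =
    new JLinkedHashMap[String, MapFile.Reader](maxOpen, 0.75f, true) {
      override def removeEldestEntry(e: JMap.Entry[String, MapFile.Reader]): Boolean =
        if (size > maxOpen) { e.getValue.close(); true } else false
    }

  def getOrInsert(path: String)(open: String => MapFile.Reader): MapFile.Reader =
    synchronized {
      val cached = cache.get(path)
      if (cached != null) cached
      else { val r = open(path); cache.put(path, r); r }
    }
}
```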
Once we have located a record with a matching SFC index we must verify that it contains a matching `K`. This is important because several distinct values of `K` can potentially map to the same SFC index.
Bounding Box Queries¶
To implement bounding box queries we extend `FileInputFormat`; the critical task is to filter the potential file list to remove any files which cannot contain a match. This step happens on the Spark driver process, so it is good to perform it without opening the files themselves. Again we exploit the fact that file names contain the first index written, and assume that a file covers the SFC range from that value until the starting index of the file with the next closest starting index.

Next, the query bounding box is decomposed into a separate list of SFC ranges. A single contiguous bounding box will likely decompose into many hundreds or even thousands of SFC ranges. These ranges represent all of the points on the SFC index which intersect the query region. Finally we discard any `MapFile` whose SFC index range does not intersect the bounding box SFC ranges.
The job of inspecting each `MapFile` is distributed to executors, which perform an in-sync traversal of the query SFC ranges and the file records until the end of each candidate file is reached. The resulting list of records is checked against the original bounding box as a query refinement step.
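A minimal sketch of the driver-side filtering logic described above (range bounds are treated as inclusive here):

```scala
// Each file covers from its own start index (parsed from its name) up
// to just before the start index of the next file; the last file is
// open-ended.
def fileRanges(startsSorted: Seq[Long]): Seq[(Long, Long)] =
  startsSorted.zip(startsSorted.drop(1).map(_ - 1) :+ Long.MaxValue)

// Keep a file only if its SFC range intersects at least one of the
// ranges produced by decomposing the query bounding box.
def intersects(fileRange: (Long, Long), queryRanges: Seq[(Long, Long)]): Boolean =
  queryRanges.exists { case (qStart, qEnd) =>
    fileRange._1 <= qEnd && qStart <= fileRange._2
  }
```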
Layer Writing¶
When writing a layer we will receive an `RDD[(K, V)] with Metadata[M]` with unknown partitioning. It is possible that two records which map to the same SFC index are in fact located on different partitions. Before writing we must ensure that all records that map to a given SFC index value reside on the same partition and that we are able to write them in order. This could be expressed as `rdd.groupByKey(k => sfcIndex(k)).sortByKey`. However, we can avoid the double shuffle implied here by partitioning the `rdd` on the SFC index of each record and defining partition breaks by inspecting the dataset bounding box, which is a required part of `M`. This approach is similar to using a `RangePartitioner`, but without the requirement of record sampling. Critically, we instruct Spark to sort the records by their SFC index during the single shuffle caused by the repartitioning.
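A minimal sketch of that single-shuffle step (the `sfcIndex` function and partition breaks are assumed to come from the layer metadata):

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Key each record by its SFC index, partition on precomputed index
// breaks derived from the dataset bounding box, and let Spark sort
// within partitions during that same shuffle.
def prepareForWriting[K, V](rdd: RDD[(K, V)],
                            sfcIndex: K => Long,
                            breaks: Array[Long]): RDD[(Long, (K, V))] = {
  val partitioner = new Partitioner {
    def numPartitions: Int = breaks.length + 1
    def getPartition(key: Any): Int = {
      val p = java.util.Arrays.binarySearch(breaks, key.asInstanceOf[Long])
      if (p >= 0) p else -(p + 1) // insertion point for keys between breaks
    }
  }

  rdd
    .map { case (k, v) => (sfcIndex(k), (k, v)) }
    .repartitionAndSortWithinPartitions(partitioner)
}
```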
With records thus partitioned and sorted, we can start writing them to `MapFile`s. Each produced file will have a name of the form `part-r-<partition number>-<first record index>`. This is trivial to do because we have the encoded record when we need to open the file for writing. Additionally, we keep track of the number of bytes written to each file, so we can close it and roll over to a new file if the next record written is about to cross the HDFS block boundary. Keeping files to a single block is standard advice that optimizes their locality: it is then not possible to have a single file that is stored across two HDFS nodes.
Consequences¶
This storage strategy provides key features which are important for performance:
- Writing is handled using a single shuffle, which is the minimum required to get consistency
- Sorting the records allows us to view them as exclusive ranges and filter a large number of files without opening them
- Storing index information in the file name allows us to perform query planning without using a secondary index or opening any of the individual files
- Individual files are guaranteed to never exceed the block boundary
- There is a clear and efficient mapping from any `K` to a file potentially containing the matching record
Testing showed that the `HadoopValueReader` LRU caching strategy is effective and provides sufficient performance to support serving a rendered tile layer to a web client directly from HDFS. It is likely that this performance can be further improved by adding an actor-based caching layer to re-order the requests and read `MapFile`s in order.

Because each file represents an exclusive range and there is no layer-wide index to be updated, there is a possibility of doing an incremental layer update where we only change those `MapFile`s which intersect with the updated records.
0003 - Readers / Writers Multithreading¶
Context¶
Not all GeoTrellis readers and writers are implemented using MapReduce jobs (as the Accumulo `RDDReader` and Hadoop `RDDReader`s are); some use socket reads as well. The socket approach makes it possible to choose the parallelism level based on the system configuration (CPU, RAM, FS). For `RDDReader`s this is the number of threads per RDD partition; for `CollectionReader`s it is the number of threads for the whole collection.
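A minimal sketch of building such a pool, sized by the CPU cores available to the JVM (the default suggested by the benchmarks below):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A fixed pool with one thread per available CPU core
val threads = Runtime.getRuntime.availableProcessors()
val pool    = Executors.newFixedThreadPool(threads)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
```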
These numbers are empirical rather than backed by strong theory. The test cluster works in a local network to exclude possible network issues. Reads were tested with ~900 objects per read request, using Landsat tiles (from a test project).
Test cluster¶
- Apache Spark 1.6.2
- Apache Hadoop 2.7.2
- Apache Accumulo 1.7.1
- Cassandra 3.7
Decision¶
We benchmarked function call performance depending on the available RAM and CPU cores.
File Backend¶
For `FileCollectionReader`, the optimal (or at least reasonable in most cases) pool size is equal to the number of cores. There may also be FS restrictions, depending on the particular FS settings.
- collection.reader: number of CPU cores available to the virtual machine
- rdd.reader / writer: number of CPU cores available to the virtual machine
Hadoop Backend¶
In the case of Hadoop we can use up to 16 threads without a really significant memory usage increase, as `HadoopCollectionReader` keeps up to 16 `MapFile.Reader`s in cache by default (by design). However, using more than 16 threads would not improve performance significantly.
- collection.reader: number of CPU cores available to the virtual machine
S3 Backend¶
The S3 thread count is limited only by backpressure; it is an empirical number chosen to get maximum performance without producing lots of useless failed requests.
- collection.reader: number of CPU cores available to the virtual machine, <= 8
- rdd.reader / writer: number of CPU cores available to the virtual machine, <= 8
Accumulo Backend¶
The numbers in the tables below are averages over warm-up calls. Similar results hold for all supported backends, and the configuration property that really matters for performance is the number of available CPU cores.
4 CPU cores result (m3.xlarge):
| Threads | Reads time (ms) | Comment |
| --- | --- | --- |
| 4 | ~15,541 | |
| 8 | ~18,541 | ~500 MB more RAM used than the previous row |
| 32 | ~20,120 | ~500 MB more RAM used than the previous row |
8 CPU cores result (m3.2xlarge):
| Threads | Reads time (ms) | Comment |
| --- | --- | --- |
| 4 | ~12,532 | |
| 8 | ~9,541 | ~500 MB more RAM used than the previous row |
| 32 | ~10,610 | ~500 MB more RAM used than the previous row |
- collection.reader: number of CPU cores available to the virtual machine
Cassandra Backend¶
4 CPU cores result (m3.xlarge):
| Threads | Reads time (ms) | Comment |
| --- | --- | --- |
| 4 | ~7,622 | |
| 8 | ~9,511 | Higher load on the driver node; ~500 MB more RAM used than the previous row |
| 32 | ~13,261 | Higher load on the driver node; ~500 MB more RAM used than the previous row |
8 CPU cores result (m3.2xlarge):
| Threads | Reads time (ms) | Comment |
| --- | --- | --- |
| 4 | ~8,100 | |
| 8 | ~4,541 | Higher load on the driver node; ~500 MB more RAM used than the previous row |
| 32 | ~7,610 | Higher load on the driver node; ~500 MB more RAM used than the previous row |
- collection.reader: number of CPU cores available to the virtual machine
- rdd.reader / writer: number of CPU cores available to the virtual machine
Conclusion¶
For all backends the performance results are quite similar to the Accumulo and Cassandra backend numbers; to avoid duplicating data, those numbers were omitted. The thread pool size depends mostly on the CPU cores available, and less on RAM. To avoid losing performance, no more threads should be used than the number of CPU cores available to the Java virtual machine; otherwise this can lead to a significant performance loss.