Spatial Analytics in the Cloud

Peter Batty has a couple of interesting blog posts on Netezza and their recent spatial analytics release. Essentially Netezza has developed a combined hardware/software parallel system for processing large spatial queries, with scaling improvements of one to two orders of magnitude over Oracle Spatial. This is apparently accomplished by pushing some of the basic filtering and projection out to the hardware disk reader, in addition to the more commonly used parallel MapReduce techniques. Ref Google’s famous white paper: http://labs.google.com/papers/mapreduce-osdi04.pdf

One comment struck me: Rich Zimmerman mentioned that their system eliminates indexing and tuning, relying instead on the efficiency of brute force parallel processing. There is no doubt that their approach is highly effective and successful, given the number of client buy-ins as well as Larry Ellison’s attention. I suppose, though, that an Oracle buyout is generally considered the gold standard of competitive pain when Oracle is on the field.

In Peter’s interview with Rich Zimmerman they discuss a simple scenario in which a large number of point records (in the billion range) are joined with a polygon set and processed with a spatial ‘point in polygon’ query. This type of analytics would be common in real estate insurance risk analysis and is typically quite time consuming. Evidently Netezza is able to perform these types of analytics in near real time, which is quite useful for evolving risk situations such as wildfire, hurricane, earthquake, and flooding. In these scenarios the dynamic components are changing polygons of risk, such as projected wind speed, evaluated against a relatively static point set.

Netezza performance improvement factors over Oracle Spatial were in the 100 to 500 range, with Netezza SPU arrays ranging anywhere from 50 to 1000 units. My guess would be that the performance curve is roughly linear. The interview suggested an amazing 500x improvement over Oracle Spatial with an unspecified number of SPUs. It would be interesting to see a performance versus SPU array size curve.

I of course have no details on the Netezza hardware enhancements, but I have been fascinated with the large scale clustering potential of cloud computing, the poor man’s supercomputer. In the Amazon AWS model, node arrays are full power virtual systems with consequent generality, as opposed to the more specialized SPUs of the Netezza systems. However, cloud communication has much larger latency than an engineered multi-SPU array. On the other hand, would the $0.10/hr instance cost compare favorably to a custom hardware array? I don’t know, but a cloud based solution would be more flexible and could scale up or down as needed. For certain, the cost would be well below even a single-CPU Oracle Spatial license.

Looking at the cited example problem, we are faced with a very large static point set and a smaller, dynamically changing polygon set. The problem is that assigning polygons of risk to each point requires an enormous number of ‘point in polygon’ calculations.

In thinking about the type of analytics discussed in Peter’s blog, the question arises: how could similar spatial analytics be addressed in the AWS space? The following speculative discussion looks at the idea of architecting an AWS solution to the class of spatial analysis problems mentioned in the Netezza interview.

The obvious place to look is Hadoop on AWS

Since Hadoop was originally written by the Apache Lucene developers as a way to improve text-based search, it does not directly address spatial analytics. Hadoop handles the overhead of scheduling, automatic parallelization, and job/status tracking. The MapReduce algorithm is provided by the developer as essentially two Java classes:

  Map – public static class MapClass extends MapReduceBase implements Mapper { … }
  Reduce – public static class ReduceClass extends MapReduceBase implements Reducer { … }
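
As a purely speculative illustration of what filled-in versions might look like for the point in polygon case (using the older mapred API; the in-memory polygons list, how it is loaded, and the id handling are all hypothetical):

 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.*;
 import com.vividsolutions.jts.geom.*;

 // Sketch: map each "pointid,lon,lat" input line to (polygon id, point id) pairs,
 // then reduce to a list of point ids per polygon.
 public static class MapClass extends MapReduceBase
     implements Mapper<LongWritable, Text, Text, Text> {
   private final GeometryFactory factory = new GeometryFactory();
   private List<Polygon> polygons;   // hypothetical: loaded once, e.g. in configure()
   public void map(LongWritable key, Text value,
                   OutputCollector<Text, Text> output, Reporter reporter)
       throws IOException {
     String[] fields = value.toString().split(",");
     Point p = factory.createPoint(new Coordinate(
         Double.parseDouble(fields[1]), Double.parseDouble(fields[2])));
     for (Polygon poly : polygons) {
       if (poly.contains(p)) {
         output.collect(new Text(String.valueOf(poly.getUserData())),  // polygon id
                        new Text(fields[0]));                          // point id
       }
     }
   }
 }

 public static class ReduceClass extends MapReduceBase
     implements Reducer<Text, Text, Text, Text> {
   public void reduce(Text polygonId, Iterator<Text> pointIds,
                      OutputCollector<Text, Text> output, Reporter reporter)
       throws IOException {
     StringBuilder ids = new StringBuilder();
     while (pointIds.hasNext()) {
       ids.append(pointIds.next().toString()).append(' ');
     }
     output.collect(polygonId, new Text(ids.toString().trim()));
   }
 }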

In theory, with some effort, the appropriate Java Map and Reduce classes could be developed specific to this problem domain, but is there another approach, possibly simpler?

My first thought, echoing Netezza’s brute force approach, was to leverage the computational efficiency of PostGIS over an array of EC2 instances. This means dividing the full point set into smaller subsets, feeding each subset computation to its own EC2 instance, and then aggregating the results. In my mind this involves at minimum:
 1. a feeder EC2 instance to send sub-tiles
 2. an array of EC2 computational instances
 3. a final aggregator EC2 instance to provide a result.

Points
One approach to this example problem is to treat the very large point set as an array, tiled in the manner of a tiled TIFF image with a regular rectangular grid pattern. Grid tiling only needs to be done once, or as part of the insert/update operation. The assumptions here are:
 a. the point set is very large
 b. the point set is relatively static
 c. distribution is roughly homogeneous

If c is not the case, grid tiling would still work, but with a quad tree tiling pattern that subdivides dense tiles into smaller spatial extents. Applying the familiar string addressing made popular by Google Maps and then Virtual Earth, with its 0-3 quadrant numbering, is a simple approach to tiling the point table.

Fig 2 – tile subdivision

Recursively appending a character from 0 to 3 at each level provides a cell identifier string that can be applied to each tile. For example, '002301' identifies a tile cell NW/NW/SW/SE/NW/NE. So the first step, analogous to spatial indexing, would be a pass through the point table calculating a tile signature for each point. This is a time consuming preprocess, basically iterating over the entire table and assigning each point to a tile. An initial density guess can be made to some tile depth. Then, if the point tiles are not homogeneous (very likely), tiles with much higher counts are subdivided recursively until a target density is reached.
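
As an illustrative sketch (the method name, argument order, and world-extent defaults are my own assumptions), the tile signature for a point can be computed in Java by recursively halving the extent, using the 0=NW, 1=NE, 2=SW, 3=SE convention above:

 // Speculative sketch: compute a quadkey-style tile signature for a point within
 // a given extent. Digit convention: 0=NW, 1=NE, 2=SW, 3=SE.
 public static String tileSignature(double lon, double lat,
                                    double minX, double minY,
                                    double maxX, double maxY, int depth) {
   StringBuilder sig = new StringBuilder(depth);
   for (int i = 0; i < depth; i++) {
     double midX = (minX + maxX) / 2.0;
     double midY = (minY + maxY) / 2.0;
     int digit = 0;
     if (lon >= midX) { digit += 1; minX = midX; } else { maxX = midX; }  // east half adds 1
     if (lat <  midY) { digit += 2; maxY = midY; } else { minY = midY; }  // south half adds 2
     sig.append((char) ('0' + digit));
   }
   return sig.toString();
 }

A depth-6 call such as tileSignature(lon, lat, -180, -90, 180, 90, 6) yields a six character signature like '002301' that can be stored with each point row.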

Creating an additional tile geometry table during the tile signature calculations is a convenience for processing polygons later on. Fortunately the assumption that the point table is relatively static means that this process occurs rarely.

The tile identifier is just a string signature that can be indexed to pull predetermined tile subsets. Once completed, a point tile set is available for parallel processing with a simple query:
 SELECT point.wkb_geom, point.id
  FROM point
  WHERE point.tile = tilesignature;

Note that tile size can be manipulated easily by changing the WHERE clause slightly to reduce the length of the tile signature. In effect this combines 4 tiles into a single parent tile ('00230%' matches '002300' + '002301' + '002302' + '002303'):
 SELECT point.wkb_geom, point.id
  FROM point
  WHERE
   point.tile LIKE (substring(tilesignature from 1 for (length(tilesignature)-1)) || '%');

Assuming the polygon geometry set is small enough, the process is simply feeding sub-tile point sets into replicated ‘point in polygon’ queries such as this PostGIS query:
 SELECT point.id, polygon.id
  FROM point, polygon
  WHERE
    point.wkb_geom && polygon.wkb_geom
   AND ST_Intersects(polygon.wkb_geom, point.wkb_geom);

This is where AWS cloud computing could become useful. Identical CPU systems can be spawned using a preconfigured EC2 image with Java and PostGIS installed. A single feeder instance contains the complete point table with tile signatures as an indexed PostGIS table. A Java feeder class then iterates through the set of all tiles resulting from this query:
  SELECT DISTINCT point.tile FROM point ORDER BY point.tile

Using a DISTINCT query eliminates empty tiles, as opposed to simply iterating over the entire tile universe. Again, a relatively static point set implies a static tile set, so this query only occurs in the initial setup. Alternatively, a select on the tile table where the wkb_geom is not null would produce the same result, probably more efficiently.

Each point set resulting from the query below is then sent to its own AWS EC2 computation instance.
 foreach tilesignature in (SELECT DISTINCT point.tile FROM point)
 {
  SELECT point.wkb_geom, point.id
   FROM point
   WHERE point.tile = tilesignature;
 }
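
For concreteness, a minimal Java sketch of that feeder loop against the PostGIS instance might look like this (the connection details are made up, and sendToComputeInstances is a placeholder for the “send” step discussed below):

 import java.sql.*;

 // Sketch: iterate the distinct tile signatures and pull each tile's point subset
 // from PostGIS. Connection details are made up; sendToComputeInstances is a
 // placeholder for the "send" step discussed below.
 public class Feeder {
   public static void main(String[] args) throws SQLException {
     Connection conn = DriverManager.getConnection(
         "jdbc:postgresql://localhost/risk", "user", "password");
     Statement tiles = conn.createStatement();
     ResultSet tileRs = tiles.executeQuery(
         "SELECT DISTINCT point.tile FROM point ORDER BY point.tile");
     PreparedStatement pts = conn.prepareStatement(
         "SELECT point.wkb_geom, point.id FROM point WHERE point.tile = ?");
     while (tileRs.next()) {
       String tileSignature = tileRs.getString(1);
       pts.setString(1, tileSignature);
       ResultSet pointRs = pts.executeQuery();
       sendToComputeInstances(tileSignature, pointRs);   // hypothetical, see S3/SQS below
       pointRs.close();
     }
     conn.close();
   }

   static void sendToComputeInstances(String tileSignature, ResultSet points) {
     // placeholder: serialize the tile's point set and hand it to the computation array
   }
 }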

Polygons

The polygon set also has assumptions:
 a. the polygon set is dynamically changing
 b. the polygon set is relatively small

Selecting a subset of polygons to match a sub-tile of points is pretty efficient using the tile table created earlier:
 SELECT polygon.wkb_geom, polygon.id
   FROM tile INNER JOIN polygon
    ON (tile.wkb_geom && polygon.wkb_geom)
   WHERE tile.id = tilesignature;

Now the feeder instance can send a subset of points along with a matching subset of polygons to a computation EC2 instance.

Connecting EC2 instances

However, at this point I run into a problem! I was simply glossing over the “send” part of this exercise. The problem in most parallel algorithms is the communication latency between processors. In an ideal world shared memory would make this extremely efficient, but EC2 arrays are not connected this way. The cloud is not so efficient.

AWS does include additional tools: Simple Storage Service (S3), Simple Queue Service (SQS), and SimpleDB. S3 is a type of shared storage. SQS is a type of asynchronous message passing, while SimpleDB provides a cloud based DB capability for structured data.

S3 is appealing because writing collections of polygon and point sets should be fairly efficient, one S3 object per tile unit. At the other end, computation instances would read from the S3 input bucket and write results back to a result output bucket. The aggregator instance can then read from the output result bucket.

However, implicit in this S3 arrangement is a great deal of scheduling messaging. SQS is an asynchronous messaging system provided for exactly this type of problem. Since messages are being sent anyway, why even use S3? SQS messages are limited to 8k of text, so they are not sufficient for large object communications. Besides, point sets may not even change from one cycle to the next. The best approach is to copy each tile point set to an S3 object, with separate S3 objects for the polygon tile sets, then add an SQS message to the queue. The computation instances read from the SQS message queue and load the identified S3 objects for processing. Note that point tile sets will only need to be written to S3 once, at the initial pass. Subsequent cycles will only update the polygon tile sets. Hadoop would handle all of this in a robust manner, taking into account failed systems and lost messages, so it may be worth a serious examination.
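
As a sketch of that coordination in Java (I am assuming an AWS SDK style S3/SQS client here; the bucket name, queue URL, and WKB serialization are all made up for illustration):

 import java.io.ByteArrayInputStream;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.sqs.AmazonSQS;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.SendMessageRequest;

 // Sketch: write one tile's polygon set to S3, then notify the computation array
 // with a small SQS message pointing at the S3 keys. Bucket name, queue URL, and
 // the WKB payload are illustrative assumptions.
 public class TilePublisher {
   private final AmazonS3 s3 = new AmazonS3Client();
   private final AmazonSQS sqs = new AmazonSQSClient();

   public void publishPolygonTile(String tileSignature, byte[] polygonWkb) {
     ObjectMetadata meta = new ObjectMetadata();
     meta.setContentLength(polygonWkb.length);
     s3.putObject("risk-input-bucket", "polygons/" + tileSignature,
                  new ByteArrayInputStream(polygonWkb), meta);

     // The message stays well under the SQS size limit: just the keys a
     // computation instance needs in order to fetch its inputs from S3.
     sqs.sendMessage(new SendMessageRequest(
         "https://sqs.us-east-1.amazonaws.com/123456789012/tile-work-queue",
         "points/" + tileSignature + "|polygons/" + tileSignature));
   }
 }

The SQS message itself stays tiny; the heavy polygon and point payloads travel through S3, which is exactly the division of labor described above.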

SimpleDB is not especially useful in this scenario, because the feeder instance’s PostGIS is much more efficient at organizing tile objects. As long as the point and polygon tables fit in a single instance, it is better to rely on that instance to chunk the tiles and write them to S3, then alert the computational instances via SQS.

Once an SQS message is read by the target computation instance, how exactly should we arrange the computation? Although tempting, using PostGIS again brings up some problems. The point and polygon object sets would need to be added to two tables, indexed, and then queried with “point in polygon.” This does not sound efficient at all! A better approach might be to read the S3 objects with their point and polygon geometry sets through a custom Java class based on the JTS Topology Suite.

Our preprocess has already optimized the two sets using a bounds intersect based on the tile structure, so plain iteration of all points over all polygons in a single tile should be fairly efficient. If the supplied chunk is too large for the brute force approach, a more sophisticated JTS extension class could index by polygon bbox first and then apply the exact intersects test. This would only help if the granularity of the message sets was large. Caching tile point sets on the computational instances could also save some S3 reads, reducing the computation setup to a smaller polygon tile set read.
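
A sketch of that JTS-based computation step (class and method names are my own): index the polygon bounding boxes in an STRtree, then run the exact containment test only on bbox candidates:

 import com.vividsolutions.jts.geom.*;
 import com.vividsolutions.jts.index.strtree.STRtree;
 import java.util.*;

 // Sketch: for one tile, return the ids of polygons containing each point.
 // The Map<String, ...> id keying is an assumption; the essential part is the
 // bbox index plus the exact containment test on candidates only.
 public static Map<String, List<String>> pointInPolygon(
     Map<String, Point> points, Map<String, Polygon> polygons) {
   STRtree index = new STRtree();
   for (Map.Entry<String, Polygon> e : polygons.entrySet()) {
     index.insert(e.getValue().getEnvelopeInternal(), e.getKey());  // index bbox -> polygon id
   }
   Map<String, List<String>> result = new HashMap<String, List<String>>();
   for (Map.Entry<String, Point> p : points.entrySet()) {
     List<String> hits = new ArrayList<String>();
     for (Object candidate : index.query(p.getValue().getEnvelopeInternal())) {
       String polygonId = (String) candidate;
       if (polygons.get(polygonId).contains(p.getValue())) {
         hits.add(polygonId);
       }
     }
     if (!hits.isEmpty()) {
       result.put(p.getKey(), hits);
     }
   }
   return result;
 }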

This means there is a bit of experimental tuning involved. A too fine-grained tile chews up time in messaging and S3 reads, while a too coarse-grained tile takes more time in the intersect computation.

Finally, each computation instance stores its result set to an S3 result object consisting of a collection of point.id values and any associated polygon.ids that intersect the point. Sending an SQS message to the aggregator alerts it to the availability of result updates. The aggregator takes the S3 result objects and pushes them into an association table of point.id, polygon.id pairs, or ‘pip’ table. The aggregator instance can be a duplicate of the original feeder instance, with its complete PostGIS DB already populated with the static point table and the required relation table (initially empty).
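
On the aggregator side, pushing a tile’s result object into the pip relation table is then a straightforward JDBC batch insert (a sketch, with the table and column names assumed above):

 import java.sql.*;
 import java.util.*;

 // Sketch: insert (point.id, polygon.id) pairs from one tile's result object
 // into the pip association table on the aggregator's PostGIS instance.
 public static void storeResults(Connection conn, Map<String, List<String>> results)
     throws SQLException {
   PreparedStatement insert = conn.prepareStatement(
       "INSERT INTO pip (pointid, polygonid) VALUES (?, ?)");
   for (Map.Entry<String, List<String>> e : results.entrySet()) {
     for (String polygonId : e.getValue()) {
       insert.setString(1, e.getKey());
       insert.setString(2, polygonId);
       insert.addBatch();
     }
   }
   insert.executeBatch();
   insert.close();
 }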

If this AWS system can be built and can process in reasonable time, an additional enhancement suggests itself. Assuming that risk polygons are being generated by other sources, such as the National Hurricane Center, it would be nice to update the polygon table on an ongoing basis. Adding a polling class to check for new polygons and update our PostGIS table would allow the polygons to be updated in near real time. Each time a pass through the point set is complete it could be repeated automatically, reflecting any polygon changes. Continuous cycling through the complete tile set incrementally updates the full set of points.

At the other end, our aggregator instance would be continuously updating the point.id, polygon.id relation table one sub-tile at a time as the SQS result messages arrive. The decoupling afforded by SQS is a powerful incentive to use this asynchronous message communication. The effect is like a slippy map interface with subtiles continuously updating in the background, automatically registering risk polygon changes. Since risk polygons are time dependent, it would also be interesting to keep timestamped histories of the polygons, providing for historical progressions by adding a time filter to the tile polygon select. The number of EC2 computation instances determines the speed of these update cycles, up to the latency limit of SQS and S3 read/writes.

Visualization of the results might be an interesting exercise in its own right. Continuous visualization could be attained by making use of the aggregator relation table to assign some value to each tile. For example in pseudo query code:
foreach tilesignature in tile table {
  SELECT AVG(polygon.attribute)
   FROM point, pip, polygon
   WHERE pip.pointid = point.id
    AND polygon.id = pip.polygonid
    AND point.tile = tilesignature;
 }

Treating each tile as a pixel lets the aggregator create polygon.value heat maps, assigning a color and/or alpha transparency to each png image pixel. Unfortunately this would generally be a coarse image, but it could be a useful kml GroundOverlay at wide zooms in a Google Maps control. These images can be readily changed by substituting different polygon.attribute values.
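
As a sketch of that rendering step (the tile-to-pixel mapping and the red/alpha ramp are my assumptions), the aggregator could paint one semi-transparent pixel per tile into a PNG suitable for a kml GroundOverlay:

 import java.awt.image.BufferedImage;
 import java.io.File;
 import javax.imageio.ImageIO;

 // Sketch: paint one semi-transparent pixel per tile from the per-tile average
 // attribute values produced by the aggregator query above.
 public static void writeHeatMap(double[][] tileValues, double maxValue, File out)
     throws Exception {
   int w = tileValues.length, h = tileValues[0].length;
   BufferedImage img = new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
   for (int x = 0; x < w; x++) {
     for (int y = 0; y < h; y++) {
       int intensity = (int) Math.min(255, 255 * tileValues[x][y] / maxValue);
       img.setRGB(x, y, (128 << 24) | (intensity << 16));  // 50% alpha, red ramp
     }
   }
   ImageIO.write(img, "png", out);
 }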

If Google Earth is the target visualization client, using Geoserver on the aggregator instance would allow its kml reflector to kick in at lower zoom levels, showing point level detail as <NetworkLink> overlays based on the polygon.attributes associated with each point. GE is a nice client since it will handle refreshing the point collection after each zoom or pan, as long as the view is within the assigned Level of Detail. The Geoserver kml reflector provides essentially all of this for free once the point featureType layer is added. Multiple risk polygon layers can also be added through Geoserver for client views with minimal effort.

COSTS

This is pure speculation on my part, since I have not had time or money to really play with message driven AWS clusters. However, as an architecture it has merit. Adjustments to the tile granularity essentially tune the performance, up to the limit of SQS latency. Using cheap standard CPU instances would work for the computational array. However, there will be additional compute load on the feeder and aggregator, especially if the aggregator does double duty as a web service. Fortunately AWS provides scaling classes of virtual hardware as well. Basing the feeder instance on a High-CPU Medium instance adds little to the system cost:
$0.20 – High-CPU Medium Instance
1.7 GB of memory, 5 EC2 Compute Units (2 virtual cores with 2.5 EC2 Compute Units each), 350 GB of instance storage, 32-bit platform
(note: a High-CPU Extra Large instance could provide enough memory for an in-memory point table – PostgreSQL memory tuning)

The aggregator end might benefit from a high-CPU instance:
$0.80 – High-CPU Extra Large Instance
7 GB of memory, 20 EC2 Compute Units (8 virtual cores with 2.5 EC2 Compute Units each), 1690 GB of instance storage, 64-bit platform

A minimal system might be one $0.20 feeder => five $0.10 computation instances => one $0.80 aggregator = $1.50/hr, plus whatever data transfer costs accrue. Keeping the system in a single zone to reduce SQS latency would be a good idea, and in-zone data transfer is free.

Note that 5 computation instances are unlikely to provide sufficient performance. However, a nice feature of AWS cloud space is the adjustability of the configuration. If 5 is insufficient, add more. If the point set is reduced, drop off some units. If the polygon set increases substantially, split the polygon tiling off to its own high-CPU instance. If your service suddenly gets slashdotted, perhaps a load balanced web service farm could be arranged. The commitment is just what you need, and it can be adjusted within a few minutes or hours, not days or weeks.

Summary

Again, this is a speculative discussion to keep my notes available for future reference. I believe that this type of parallelism would work for the class of spatial analytics problems discussed. It is particularly appealing for web visualization with continuous updating. The cost is not especially high, but unknown pitfalls may await. Estimating four weeks of development and $1.50/hr EC2 costs leads to $7000 – $8000 for proof of concept development, with an ongoing operational cost of about $1500/mo for a small array of 10 computational units. The class of problems involving very large point sets against polygons should be fairly common in insurance risk analysis, emergency management, and Telco/Utility customer base systems. Cloud arrays can never match the 500x performance improvement of Netezza, but cost should place this approach at the low end of the cost/performance spectrum. Maybe 5 min cycles rather than 5 sec are good enough. Look out Larry!
