Analyze OSM data in spark

May 7, 2019·
Georg Heiler
Georg Heiler
· 2 min read

This article assumes that you already possess open-street-map data in a format similar to the one described here.

analyzing the OSM community

Load the dataframes:

val node = spark.sql("SELECT * FROM my_db.osm_node").drop("dt")
val way = spark.sql("SELECT * FROM my_db.osm_way").drop("dt")
val relation = spark.sql("SELECT * FROM my_db.osm_relation").drop("dt")

As only a single partition is ingested with this sample data, it is fine to drop the partitioning column as it is constant anyhow. In a more production ready setup remember to select a fitting partition value for all three tables.

How did the number of nodes evolve over the years? (inspired by):

node.withColumn("year", year(col("timestamp")))
  .groupBy("year")
  .agg(count("*").alias("nodes_per_year"))
  .orderBy(desc("year"))
  .show
If you need the fields: uid, user_sid, changeset to directly track the OSM community read https://osm-internal.download.geofabrik.de/index.html and get the raw data files from a source where these are not dropped.

generate geometry from the graph

A simple case - extract points (x,y) for each hospital:

node.filter(col("tags").getItem("amenity") === "hospital")
  .select('id, 'x_long_wgs84, 'y_lat_wgs84)
  .show(5, false)

results in:

+----------+------------------+------------------+
|        id|      x_long_wgs84|       y_lat_wgs84|
+----------+------------------+------------------+
| 482297303|         16.509893|         47.507452|
| 702520462|        12.9887397|        46.6995002|
|....
| 334634341|16.257896000000002|        48.0913958|
|5626135755|        15.4172255|        47.0597672|
+----------+------------------+------------------+

Now, a little bit more complex: find the geometries of highways. Usually posexplode (way.selectExpr("*", "posexplode(nodes) as (indexedNodeIndex,indexedNode)").drop("nodes")) is required to keep the original order of the elements inside the array. But in this case the index is already provided so a simple explode is good enough.

val highways = way.filter('tags.getItem("highway").isNotNull)
  .select($"id".as("wayId"), explode($"nodes").as("indexedNode"))
val intermediateNodes = node.select($"id".as("nodeId"), $"x_long_wgs84", $"y_lat_wgs84")

val wayGeometries = highways.join(intermediateNodes, $"indexedNode.nodeId" === $"nodeId")
  .groupBy("wayId")
  .agg(sort_array(collect_list(struct($"indexedNode.index", $"y_lat_wgs84", $"x_long_wgs84"))).as("geometry"))

The result looks like:

root
 |-- wayId: long (nullable = true)
 |-- geometry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: integer (nullable = true)
 |    |    |-- y_lat_wgs84: double (nullable = true)
 |    |    |-- x_long_wgs84: double (nullable = true)

+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|wayId  |geometry                                                                                                                                                                                                                                                                                                                                                                  |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|374036 |[[0,48.1923654,16.273967900000002], [1,48.192309800000004,16.2739489], [2,48.192234000000006,16.2739279], [3,48.192175500000005,16.2739176]]                                                                                                                                                                                                                              |
|2975945|[[0,47.559291900000005,13.8593094], [1,47.559302100000004,13.859371600000001], [2,47.55932120000001,13.859477], [3,47.559189200000006,13.859636], [4,47.558987,13.859803300000001], [5,47.5588545,13.859868800000001], [6,47.5585602,13.8601109], [7,47.5584739,13.860249600000001], [8,47.558379800000004,13.860469100000001], [9,47.558193900000006,13.860748500000001]]|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Now adding some JTS magic or a couple of string operations easily yield either a geometry object for further processing or a WKT linestring.

Converting relations is left as an exercise for the reader. It is a bit more complex, but a good introduction can be found here as ways must be reassembled recursively.

Georg Heiler
Authors
senior data expert
My research interests include large geo-spatial time and network data analytics.