Analyze OSM data in Spark

May 7, 2019
Dr. Georg Heiler

This article assumes that you already have OpenStreetMap (OSM) data in a format similar to the one described here.

analyzing the OSM community

Load the dataframes:

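import org.apache.spark.sql.functions._ // year, col, count, desc, explode, ...
import spark.implicits._ // $"col" and 'col syntax (auto-imported in spark-shell)
val node = spark.sql("SELECT * FROM my_db.osm_node").drop("dt")
val way = spark.sql("SELECT * FROM my_db.osm_way").drop("dt")
val relation = spark.sql("SELECT * FROM my_db.osm_relation").drop("dt")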

As only a single partition is ingested with this sample data, the partitioning column dt is constant anyhow and can safely be dropped. In a more production-ready setup, remember to select a fitting partition value for all three tables.
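A minimal sketch of selecting one partition explicitly instead of dropping the column blindly. It assumes the partition column is a string named dt (as dropped above); the object PartitionSketch, the helper loadPartition and the example date are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object PartitionSketch {
  // Hypothetical helper: keep only one dt partition, then drop the now-constant column.
  // Filtering on the partition column lets Spark prune the other partitions of a partitioned table.
  def loadPartition(spark: SparkSession, table: String, dt: String): DataFrame =
    spark.table(table).where(col("dt") === dt).drop("dt")
}
```

Usage, with a made-up partition value: val node = PartitionSketch.loadPartition(spark, "my_db.osm_node", "2019-05-01"). Passing the same value for the way and relation tables keeps all three DataFrames on the same snapshot.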

How did the number of nodes evolve over the years? (inspired by):

node.withColumn("year", year(col("timestamp")))
  .groupBy("year")
  .agg(count("*").alias("nodes_per_year"))
  .orderBy(desc("year"))
  .show

Note
If you need the fields uid, user_sid and changeset to track the OSM community directly, read https://osm-internal.download.geofabrik.de/index.html and get the raw data files from a source where these fields are not dropped.

generate geometry from the graph

A simple case is extracting the point (x, y) of each hospital:

node.filter(col("tags").getItem("amenity") === "hospital")
  .select('id, 'x_long_wgs84, 'y_lat_wgs84)
  .show(5, false)

results in:

+----------+------------------+------------------+
|        id|      x_long_wgs84|       y_lat_wgs84|
+----------+------------------+------------------+
| 482297303|         16.509893|         47.507452|
| 702520462|        12.9887397|        46.6995002|
|....
| 334634341|16.257896000000002|        48.0913958|
|5626135755|        15.4172255|        47.0597672|
+----------+------------------+------------------+

Now for something a bit more complex: finding the geometries of highways. Usually posexplode (way.selectExpr("*", "posexplode(nodes) as (indexedNodeIndex,indexedNode)").drop("nodes")) is required to keep the original order of the elements inside the array. In this data, however, every element of the nodes array is already a struct that carries its own index next to the nodeId, so a simple explode is good enough.
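For comparison, the posexplode route could look like the following minimal sketch. It assumes a hypothetical input in which nodes is a plain array of node ids without an embedded index; the object PosexplodeSketch and the generated column names index and nodeId are illustrative, not part of the article's schema.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, posexplode, sort_array, struct}

object PosexplodeSketch {
  // Hypothetical helper: `ways` has columns id and nodes (plain array of node ids),
  // `nodes` has columns id, x_long_wgs84 and y_lat_wgs84.
  def wayGeometries(ways: DataFrame, nodes: DataFrame): DataFrame = {
    // posexplode emits one row per array element together with its 0-based position,
    // which takes over the role of the index that the article's data already carries.
    val exploded = ways.select(
      col("id").as("wayId"),
      posexplode(col("nodes")).as(Seq("index", "nodeId")))

    val coords = nodes.select(col("id").as("nodeId"), col("x_long_wgs84"), col("y_lat_wgs84"))

    exploded.join(coords, Seq("nodeId"))
      .groupBy("wayId")
      // sort_array compares structs field by field, so the position (first field) decides the order.
      .agg(sort_array(collect_list(struct(col("index"), col("y_lat_wgs84"), col("x_long_wgs84")))).as("geometry"))
  }
}
```

As this data set already provides the index, the explode-based version is used instead: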

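// Each element of nodes is a struct carrying index and nodeId, so a plain explode keeps the order information.
val highways = way.filter('tags.getItem("highway").isNotNull)
  .select($"id".as("wayId"), explode($"nodes").as("indexedNode"))
// Only the id and the coordinates are needed from the node table.
val intermediateNodes = node.select($"id".as("nodeId"), $"x_long_wgs84", $"y_lat_wgs84")

// Attach coordinates to every node of a way, then collect them per way;
// sort_array orders the structs by their first field (index), restoring the node order.
val wayGeometries = highways.join(intermediateNodes, $"indexedNode.nodeId" === $"nodeId")
  .groupBy("wayId")
  .agg(sort_array(collect_list(struct($"indexedNode.index", $"y_lat_wgs84", $"x_long_wgs84"))).as("geometry"))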

The result looks like:

root
 |-- wayId: long (nullable = true)
 |-- geometry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: integer (nullable = true)
 |    |    |-- y_lat_wgs84: double (nullable = true)
 |    |    |-- x_long_wgs84: double (nullable = true)

+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|wayId  |geometry                                                                                                                                                                                                                                                                                                                                                                  |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|374036 |[[0,48.1923654,16.273967900000002], [1,48.192309800000004,16.2739489], [2,48.192234000000006,16.2739279], [3,48.192175500000005,16.2739176]]                                                                                                                                                                                                                              |
|2975945|[[0,47.559291900000005,13.8593094], [1,47.559302100000004,13.859371600000001], [2,47.55932120000001,13.859477], [3,47.559189200000006,13.859636], [4,47.558987,13.859803300000001], [5,47.5588545,13.859868800000001], [6,47.5585602,13.8601109], [7,47.5584739,13.860249600000001], [8,47.558379800000004,13.860469100000001], [9,47.558193900000006,13.860748500000001]]|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

From here, a bit of JTS magic or a couple of string operations easily yields either a geometry object for further processing or a WKT linestring.
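The string-operations route can be sketched in plain Scala as below. The object WktSketch and the case class Vertex are hypothetical; Vertex simply mirrors one (index, y_lat_wgs84, x_long_wgs84) element of the geometry array. Note that WKT writes coordinates as "x y", so longitude comes before latitude.

```scala
object WktSketch {
  // Hypothetical mirror of one element of the `geometry` array: position, latitude, longitude.
  final case class Vertex(index: Int, lat: Double, lon: Double)

  // Builds a WKT LINESTRING; each coordinate is written as "lon lat" (x before y).
  def toWktLineString(vertices: Seq[Vertex]): String =
    vertices
      .sortBy(_.index) // defensive: the Spark output is already sorted by index
      .map(v => s"${v.lon} ${v.lat}")
      .mkString("LINESTRING (", ", ", ")")
}
```

For example, WktSketch.toWktLineString(Seq(WktSketch.Vertex(0, 48.1, 16.1), WktSketch.Vertex(1, 48.2, 16.2))) returns "LINESTRING (16.1 48.1, 16.2 48.2)". In Spark, the helper could be wrapped in a udf that maps each struct Row of the geometry column to a Vertex. For a JTS geometry instead, the same lon/lat pairs can be passed as Coordinate objects to GeometryFactory.createLineString.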

Converting relations is left as an exercise for the reader. It is a bit more complex, as ways must be reassembled recursively, but a good introduction can be found here.

Dr. Georg Heiler
senior data expert
Georg is a senior data expert at Magenta and an ML-ops engineer at ASCII. He solves challenges with data. His interests include geospatial graphs and time series. Georg is transitioning Magenta's data platform to the cloud and handles large-scale multi-modal ML-ops challenges at ASCII.