Urban Pipeline¶
In this notebook, we'll show how to go from the step-by-step workflow of notebooks [1-6] to a single pipeline, by bundling up all your workflow steps into one neat object.
Data source used:
- PLUTO data from NYC Open Data. https://www.nyc.gov/content/planning/pages/resources/datasets/mappluto-pluto-change
Let’s get started! 🌟
import urban_mapper as um
from urban_mapper.pipeline import UrbanPipeline
mapper = um.UrbanMapper()
What’s the UrbanPipeline All About?¶
The UrbanPipeline class is like the conductor of an orchestra. For the ML enthusiasts, it mimics what Scikit-Learn does with its Pipeline: it brings together all the UrbanMapper steps (loading data, creating layers, imputing missing bits, filtering, enriching, and visualising) and makes them play in harmony. You define your steps, pop them into the pipeline, and it handles the rest. It’s brilliant for keeping your workflow tidy and repeatable, and it also makes it shareable and reusable!
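To make the "named steps run in order" idea concrete, here is a minimal plain-Python sketch. `ToyPipeline`, its `run` method, and the two toy steps are hypothetical names invented for this illustration; they are not part of UrbanMapper, and the real UrbanPipeline does considerably more.

```python
from typing import Any, Callable, List, Tuple

# A step is a (name, callable) pair, mirroring the (name, step) tuples used later.
Step = Tuple[str, Callable[[Any], Any]]


class ToyPipeline:
    """Hypothetical stand-in that runs named steps in order (not UrbanPipeline)."""

    def __init__(self, steps: List[Step]) -> None:
        names = [name for name, _ in steps]
        if len(names) != len(set(names)):
            raise ValueError("step names must be unique")
        self.steps = steps

    def get_step(self, name: str) -> Callable[[Any], Any]:
        """Look a step up by the name it was registered under."""
        for step_name, step in self.steps:
            if step_name == name:
                return step
        raise KeyError(name)

    def run(self, data: Any) -> Any:
        """Feed the output of each step into the next one."""
        for _, step in self.steps:
            data = step(data)
        return data


toy = ToyPipeline([
    ("drop_missing", lambda rows: [r for r in rows if r is not None]),
    ("double", lambda rows: [r * 2 for r in rows]),
])
result = toy.run([1, None, 3])
print(result)  # [2, 6]
```

Naming each step is what later makes it possible to pull a single step back out of a saved pipeline and inspect it.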
Setting Up a Simple Pipeline¶
Let’s build a pipeline that does the following:
- Loads PLUTO data from the Hugging Face dataset oscur/pluto.
- Creates a street intersections layer for Manhattan.
- Imputes missing coordinates.
- Filters data to the layer’s bounding box.
- Enriches the layer with average building floors.
- Sets up an interactive map to visualise it all.
We’ll define each step and slot them into our pipeline.
urban_layer = (
mapper.urban_layer.with_type("streets_intersections")
.from_place("Manhattan, New York City, USA", network_type="drive")
# with_mapping is the equivalent of map_nearest_layer: it configures how UrbanMapper should run map_nearest_layer under the hood.
.with_mapping(
longitude_column="longitude", latitude_column="latitude",
# geometry_column="<geometry_column_name>", # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
output_column="nearest_intersection",
threshold_distance=50, # Optional: sets a 50-meter threshold for nearest mapping.
)
.build()
)
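As a rough illustration of what "nearest mapping with a 50-meter threshold" means, the sketch below assigns a point to its closest intersection only if that intersection lies within the threshold. The helper names (`haversine_m`, `nearest_within`) and the sample coordinates are made up for this example, and the brute-force haversine search is an assumption of the sketch; UrbanMapper's own implementation is not shown in this notebook and may well use projected coordinates and a spatial index instead.

```python
import math
from typing import List, Optional, Tuple

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in metres between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearest_within(
    point: Tuple[float, float],
    nodes: List[Tuple[float, float]],
    threshold_m: float,
) -> Optional[int]:
    """Index of the closest node, or None when it is farther than threshold_m."""
    best_idx, best_dist = None, float("inf")
    for idx, (lon, lat) in enumerate(nodes):
        dist = haversine_m(point[0], point[1], lon, lat)
        if dist < best_dist:
            best_idx, best_dist = idx, dist
    return best_idx if best_dist <= threshold_m else None


# Two hypothetical intersections given as (longitude, latitude).
intersections = [(-73.9857, 40.7484), (-73.9772, 40.7527)]
near = nearest_within((-73.9856, 40.7485), intersections, threshold_m=50)
far = nearest_within((-73.9600, 40.7800), intersections, threshold_m=50)
print(near, far)  # 0 None
```

The first point is a few metres from intersection 0 and is matched; the second is kilometres away from both, so it stays unmatched.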
# Note: For the documentation interactive mode, we only query 100000 records from the dataset. Feel free to remove this limit for a more realistic analysis.
loader = (
mapper
.loader
.from_huggingface("oscur/pluto", number_of_rows=100000, streaming=True)
.with_columns("longitude", "latitude")
# .with_columns(geometry_column="<geometry_column_name>") # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
.build()
)
imputer = (
mapper
.imputer
.with_type("SimpleGeoImputer")
.on_columns("longitude", "latitude")
# .on_columns(geometry_column="<geometry_column_name>") # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
.build()
)
filter_step = mapper.filter.with_type("BoundingBoxFilter").build()
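Filtering data to a layer's bounding box can be pictured with the small sketch below. It is a plain-Python illustration under stated assumptions: `bounding_box`, `filter_to_bbox`, and the sample rows are hypothetical, and rows are represented as dictionaries rather than the data frames UrbanMapper works with.

```python
from typing import Dict, List, Tuple

BBox = Tuple[float, float, float, float]  # (min_lon, min_lat, max_lon, max_lat)


def bounding_box(points: List[Tuple[float, float]]) -> BBox:
    """Smallest axis-aligned box containing every (lon, lat) point."""
    lons = [lon for lon, _ in points]
    lats = [lat for _, lat in points]
    return min(lons), min(lats), max(lons), max(lats)


def filter_to_bbox(rows: List[Dict[str, float]], bbox: BBox) -> List[Dict[str, float]]:
    """Keep only the rows whose coordinates fall inside the box (edges included)."""
    min_lon, min_lat, max_lon, max_lat = bbox
    return [
        row for row in rows
        if min_lon <= row["longitude"] <= max_lon and min_lat <= row["latitude"] <= max_lat
    ]


# Hypothetical layer extent and two records, one inside and one outside.
layer_nodes = [(-74.02, 40.70), (-73.91, 40.88)]
records = [
    {"longitude": -73.98, "latitude": 40.75},
    {"longitude": -73.80, "latitude": 40.65},
]
kept = filter_to_bbox(records, bounding_box(layer_nodes))
print(len(kept))  # 1
```

Dropping out-of-area records before the nearest-intersection mapping avoids spending time on points that could never match.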
enricher = mapper.enricher.with_data(group_by="nearest_intersection", values_from="numfloors").aggregate_by(method="mean", output_column="avg_floors").build()
visualiser = mapper.visual.with_type("Interactive").with_style({"tiles": "CartoDB Positron", "colorbar_text_color": "gray"}).build()
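The enricher defined above groups records by nearest_intersection and averages numfloors into avg_floors. The sketch below shows that aggregation in plain Python. `mean_by_group` is a hypothetical helper written for this illustration, and skipping missing values is an assumption of the sketch rather than documented UrbanMapper behaviour.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, List


def mean_by_group(rows: List[Dict[str, Any]], group_by: str, values_from: str) -> Dict[Hashable, float]:
    """Average `values_from` per distinct `group_by` value, ignoring missing values."""
    totals: Dict[Hashable, float] = defaultdict(float)
    counts: Dict[Hashable, int] = defaultdict(int)
    for row in rows:
        value = row.get(values_from)
        if value is None:
            continue  # assumption: missing values do not contribute to the mean
        totals[row[group_by]] += value
        counts[row[group_by]] += 1
    return {key: totals[key] / counts[key] for key in totals}


# Three hypothetical buildings mapped to two intersections.
buildings = [
    {"nearest_intersection": 1, "numfloors": 2.0},
    {"nearest_intersection": 1, "numfloors": 4.0},
    {"nearest_intersection": 2, "numfloors": 10.0},
]
avg_floors = mean_by_group(buildings, "nearest_intersection", "numfloors")
print(avg_floors)  # {1: 3.0, 2: 10.0}
```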
# Assemble the pipeline
# Note that a pipeline's step is a tuple with a name and the step itself.
# Later on, when loading or sharing the pipeline, anyone can use `.get_step("step_name")` to get the step, preview it, re-run it, etc.
pipeline = UrbanPipeline(
[
("urban_layer", urban_layer),
("loader", loader),
("imputer", imputer),
("filter", filter_step),
("enricher", enricher),
("visualiser", visualiser)
]
)
# Note that we can do this in a more concise way, but we are showing the steps for clarity.
# The concise way would look like this, shown here for the urban layer only:
# pipeline = UrbanPipeline([
# ("urban_layer", (
# mapper.urban_layer
# .with_type("streets_intersections")
# .from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
# .with_mapping(
# longitude_column="longitude",
# latitude_column="latitude",
# # geometry_column="<geometry_column_name>", # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
# output_column="nearest_intersection",
# threshold_distance=50
# )
# .build()
# )),
# # Add the other steps here
# ])
# Let's preview our urban pipeline workflow
pipeline.preview()
Running the Pipeline¶
Time to put it to work! We’ll use compose_transform to run the entire pipeline in one go: loading, imputing, filtering, mapping, enriching, all sorted. Then, we’ll visualise the results with a snazzy interactive map.
Note that we could also do this in two steps, calling compose() first and then transform(); here we combine the two into a single call for simplicity.
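The relationship between the one-call and two-call forms can be sketched with a toy object, by analogy with scikit-learn's fit, transform, and fit_transform. `ToyStage` is hypothetical and takes its data as an argument, unlike the pipeline in this notebook; it only illustrates the "prepare first, apply second" split, not what UrbanPipeline actually does in each phase.

```python
from typing import List, Optional


class ToyStage:
    """Hypothetical two-phase object: compose() prepares state, transform() applies it."""

    def __init__(self) -> None:
        self._offset: Optional[float] = None

    def compose(self, values: List[float]) -> "ToyStage":
        # Preparation phase: learn whatever transform() will need later.
        self._offset = min(values)
        return self

    def transform(self, values: List[float]) -> List[float]:
        # Application phase: refuses to run before compose() has been called.
        if self._offset is None:
            raise RuntimeError("call compose() before transform()")
        return [v - self._offset for v in values]

    def compose_transform(self, values: List[float]) -> List[float]:
        # Convenience wrapper that chains the two phases.
        return self.compose(values).transform(values)


data = [5.0, 7.0, 9.0]
two_steps = ToyStage().compose(data).transform(data)
one_step = ToyStage().compose_transform(data)
print(two_steps == one_step)  # True
```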
# Execute the pipeline
mapped_data, enriched_layer = pipeline.compose_transform()
# Show the results
fig = pipeline.visualise(result_columns=["avg_floors"])
# result_columns is basically the columns that will be displayed in the map.
# If you want to display only one column, you can pass a string as well.
fig # Displays an interactive map in your notebook
Saving and Loading Your Pipeline¶
You can save your pipeline to a file and load it back later. That means that whenever you need the pipeline again, it won't have to redo its entire workflow, because the saved state is restored. Here, we’ll save it, load it, and ensure it’s ready for further use.
pipeline.save("./my_pipeline.dill")
loaded_pipeline = UrbanPipeline.load("./my_pipeline.dill")
loaded_pipeline.preview()
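The .dill extension suggests the pipeline is serialised with the dill library, which is an assumption on our part; dill extends Python's built-in pickle module to cover more object types. The round trip below uses pickle from the standard library on a hypothetical list of step descriptions, simply to show the save-then-restore pattern.

```python
import os
import pickle
import tempfile

# A hypothetical, simplified description of pipeline steps.
steps = [("loader", {"rows": 100000}), ("enricher", {"method": "mean"})]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_pipeline.pkl")
    # Save: write the object graph to disk as bytes.
    with open(path, "wb") as fh:
        pickle.dump(steps, fh)
    # Load: rebuild an equal, independent copy from those bytes.
    with open(path, "rb") as fh:
        restored = pickle.load(fh)

print(restored == steps)  # True
```

Because unpickling can execute arbitrary code, only load pipeline files that come from a source you trust.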
Accessing the Enriched Layer for Machine Learning Analysis¶
Now, let’s retrieve the enriched urban layer using .get_layer() for machine learning analysis.
enriched_layer = loaded_pipeline.get_step("urban_layer")
enriched_gdf = enriched_layer.get_layer()
enriched_gdf.head()
Loading Many Different Datasets in the Same Pipeline¶
You can load several datasets in one pipeline. Each dataset should contain the columns referenced in with_data, aggregate_by, etc.
The static visualiser looks for a data_id column in the enriched data and uses it to draw each dataset with a different marker.
urban_layer = (
mapper.urban_layer.with_type("streets_intersections")
.from_place("Manhattan, New York City, USA", network_type="drive")
# with_mapping is the equivalent of map_nearest_layer: it configures how UrbanMapper should run map_nearest_layer under the hood.
.with_mapping(
longitude_column="longitude",
latitude_column="latitude",
# geometry_column="<geometry_column_name>", # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
output_column="nearest_intersection",
threshold_distance=50, # Optional: sets a 50-meter threshold for nearest mapping.
)
.build()
)
loader1 = (
mapper
.loader
.from_huggingface("oscur/pluto", number_of_rows=1000, streaming=True)
.with_columns("longitude", "latitude")
# .with_columns(geometry_column="<geometry_column_name>") # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
.build()
)
loader2 = (
mapper
.loader
.from_huggingface("oscur/taxisvis1M", number_of_rows=1000, streaming=True)
.with_columns("pickup_longitude", "pickup_latitude")
.with_map({"pickup_longitude": "longitude", "pickup_latitude": "latitude"})
# .with_columns(geometry_column="<geometry_column_name>") # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
.build()
)
# Both the imputer and the filter are applied only to the "taxi_data" dataset (loader2)
imputer = (
mapper
.imputer
.with_data("taxi_data")
.with_type("SimpleGeoImputer")
.on_columns("longitude", "latitude")
# .on_columns(geometry_column="<geometry_column_name>") # Use this instead of the longitude and latitude columns, replacing <geometry_column_name> with the actual name of your geometry column.
.build()
)
filter_step = mapper.filter.with_data("taxi_data").with_type("BoundingBoxFilter").build()
# Each enricher is applied to the dataset named by its data_id
enricher1 = mapper.enricher.with_data(group_by="nearest_intersection", values_from="numfloors", data_id="pluto_data").aggregate_by(method="mean", output_column="avg_floors").build()
enricher2 = mapper.enricher.with_data(group_by="nearest_intersection", data_id="taxi_data").count_by(output_column="pickup_count").build()
visualiser = mapper.visual.with_type("Interactive").with_style({"tiles": "CartoDB Positron", "colorbar_text_color": "gray"}).build()
# Assemble the pipeline
# Note that a pipeline's step is a tuple with a name and the step itself.
# When more than one loader is defined, the pipeline creates a dictionary of all the loaded data, keyed by the loader step names
# Later on, when loading or sharing the pipeline, anyone can use `.get_step("step_name")` to get the step, preview it, re-run it, etc.
pipeline = UrbanPipeline(
[
("urban_layer", urban_layer),
("pluto_data", loader1),
("taxi_data", loader2),
("imputer", imputer),
("filter", filter_step),
("enricher1", enricher1),
("enricher2", enricher2),
("visualiser", visualiser)
]
)
# Let's preview our urban pipeline workflow
pipeline.preview()
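The idea of keeping loaded data in a dictionary keyed by loader step name, and pointing a step at one dataset, can be sketched as follows. `apply_step` and the toy data are hypothetical, and the rule that a step without a data_id applies to every dataset is an assumption of this sketch, not a statement about UrbanMapper.

```python
from typing import Any, Callable, Dict, List, Optional

Rows = List[Any]


def apply_step(
    datasets: Dict[str, Rows],
    func: Callable[[Rows], Rows],
    data_id: Optional[str] = None,
) -> Dict[str, Rows]:
    """Apply func to the dataset named data_id, or to all datasets when data_id is None."""
    return {
        name: func(rows) if data_id is None or name == data_id else rows
        for name, rows in datasets.items()
    }


def drop_missing(rows: Rows) -> Rows:
    """Toy imputer: simply discards missing entries."""
    return [r for r in rows if r is not None]


# Keys mirror the loader step names used in the pipeline above.
loaded = {"pluto_data": [1, None, 3], "taxi_data": [None, 5]}
only_taxi = apply_step(loaded, drop_missing, data_id="taxi_data")
print(only_taxi)  # {'pluto_data': [1, None, 3], 'taxi_data': [5]}
```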
Proceeding with Machine Learning Analysis¶
With the enriched layer in hand, let’s perform a simple machine learning task. We’ll use K-Means clustering to group street intersections based on the average number of building floors (avg_floors).
Step 1: Prepare the Data¶
Extract the relevant feature from the enriched layer and handle any missing values.
features = enriched_gdf[['avg_floors']]
features = features.dropna()
Step 2: Apply K-Means Clustering¶
Cluster the intersections into 5 groups based on avg_floors.
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=5, random_state=42)
kmeans.fit(features)
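# Assign labels only to the rows kept after dropna(), so the lengths always match
enriched_gdf.loc[features.index, 'cluster'] = kmeans.labels_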
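For intuition about what K-Means does with a single feature such as avg_floors, here is a plain-Python sketch of Lloyd's algorithm in one dimension. It is not scikit-learn's implementation: `kmeans_1d`, the fixed starting centroids, and the sample values are all chosen for this illustration, and three clusters are used only to keep the example small.

```python
from typing import List, Sequence, Tuple


def kmeans_1d(
    values: Sequence[float],
    centroids: Sequence[float],
    iterations: int = 10,
) -> Tuple[List[int], List[float]]:
    """Alternate between assigning points to the nearest centroid and re-averaging."""
    centres = list(centroids)
    labels = [0] * len(values)
    for _ in range(iterations):
        # Assignment step: each value joins its closest centroid.
        labels = [
            min(range(len(centres)), key=lambda k: abs(v - centres[k]))
            for v in values
        ]
        # Update step: each centroid moves to the mean of its members.
        for k in range(len(centres)):
            members = [v for v, lab in zip(values, labels) if lab == k]
            if members:  # leave an empty cluster's centroid where it is
                centres[k] = sum(members) / len(members)
    return labels, centres


floors = [2.0, 3.0, 2.5, 20.0, 22.0, 50.0]
labels, centres = kmeans_1d(floors, centroids=[0.0, 25.0, 60.0])
print(labels)   # [0, 0, 0, 1, 1, 2]
print(centres)  # [2.5, 21.0, 50.0]
```

Low-rise, mid-rise, and high-rise values end up in separate groups, which is the same kind of split the clustering above produces for intersections.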
Step 3: Visualise the Clusters¶
Visualise the clusters on a static map using Matplotlib.
import numpy as np
import matplotlib.pyplot as plt
# Extract the cluster centroids (average floors for each cluster) and flatten to 1D
centroids = kmeans.cluster_centers_.flatten()
# Get the indices that would sort the centroids from low to high
sorted_indices = np.argsort(centroids)
# Create a mapping from original cluster labels to new sorted labels
label_mapping = {original: new for new, original in enumerate(sorted_indices)}
# Apply the mapping to the GeoDataFrame to create sorted cluster labels
enriched_gdf['cluster_sorted'] = enriched_gdf['cluster'].map(label_mapping)
# Round the sorted centroids for display in the legend
rounded_centroids = [round(centroids[idx], 1) for idx in sorted_indices]
# Set up the figure and axis for the plot
fig, ax = plt.subplots(figsize=(10, 10))
# Plot the GeoDataFrame using the sorted cluster labels and 'viridis' colormap
scatter = enriched_gdf.plot(column='cluster_sorted', ax=ax, cmap='viridis')
# Label the axes and set the title
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')
plt.title('Street Intersections in Manhattan Clustered by Average Building Floors')
# Add a color bar to the plot
cbar = scatter.get_figure().colorbar(scatter.get_children()[0], ax=ax)
# Set the ticks and labels for the color bar based on the sorted centroids
cbar.set_ticks(range(len(sorted_indices)))
cbar.set_ticklabels([f'Avg. Floors: {centroid}' for centroid in rounded_centroids])
# Display the plot
plt.show()
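The relabelling step in the cell above, which renumbers clusters so that 0 is the lowest centroid, can be checked on a toy example. The sketch uses only built-in Python in place of np.argsort, and the centroid and label values are made up.

```python
# Hypothetical centroids in the arbitrary order K-Means returned them.
centroids = [21.0, 2.5, 50.0]
# Hypothetical original cluster labels for five intersections.
labels = [1, 1, 0, 2, 0]

# Indices that sort the centroids from low to high (what np.argsort returns).
order = sorted(range(len(centroids)), key=lambda k: centroids[k])
# Map each original label to its rank in that ordering.
label_mapping = {original: new for new, original in enumerate(order)}
relabelled = [label_mapping[lab] for lab in labels]

print(order)          # [1, 0, 2]
print(label_mapping)  # {1: 0, 0: 1, 2: 2}
print(relabelled)     # [0, 0, 1, 2, 1]
```

After the remap, a higher label always means a taller average, so the colour bar reads from low to high.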
Exporting to JupyterGIS (Optional)¶
For collaborative exploration, you can export your pipeline to JupyterGIS. Check out the JupyterGIS documentation for more details.
pipeline.to_jgis(
filepath="urban_analysis.JGIS",
urban_layer_name="Manhattan Intersections",
raise_on_existing=False,
)
Wrapping It Up¶
Smashing job! 🌟 You’ve built and run your first UrbanPipeline, saved it, loaded it back, retrieved the enriched layer, and performed a machine learning analysis. You can now reuse, share, or tweak this workflow as needed!