Pipeline
This notebook demonstrates a streamlined UrbanMapper workflow using the UrbanPipeline class, replicating the step-by-step example with PLUTO data in Downtown Brooklyn. We’ll define all steps upfront, execute them in one go, and visualise the results.
Essentially, this notebook covers the Basics/[7]urban_pipeline.ipynb example.
Data source used:
- PLUTO data from NYC Open Data. https://www.nyc.gov/content/planning/pages/resources/datasets/mappluto-pluto-change
from urban_mapper import UrbanMapper
from urban_mapper.pipeline import UrbanPipeline
# Initialise UrbanMapper
um = UrbanMapper()
Step 1: Define the Pipeline
Goal: Set up all components of the workflow in a single pipeline.
Input: Configurations for each UrbanMapper module.
Output: An UrbanPipeline object ready to process data.
We define each step—urban layer, loader, imputer, filter, enricher, and visualiser—with their specific roles:
- Urban Layer: Street intersections in Downtown Brooklyn.
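- Loader: PLUTO data from the Hugging Face dataset oscur/pluto.
- Imputer: Handles missing coordinates (SimpleGeoImputer drops rows with a missing latitude or longitude).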
- Filter: Trims data to the bounding box.
- Enricher: Adds average floors per intersection.
- Visualiser: Prepares an interactive map.
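The imputer and filter roles listed above can be pictured with plain Python. This is only a minimal sketch and not UrbanMapper's implementation: the record layout, the helper names `drop_missing_coords` and `within_bbox`, and the bounding-box values are all hypothetical.

```python
from typing import Optional, TypedDict


class Record(TypedDict):
    longitude: Optional[float]
    latitude: Optional[float]
    numfloors: float


BBox = tuple[float, float, float, float]  # (min_lon, min_lat, max_lon, max_lat)


def drop_missing_coords(records: list[Record]) -> list[Record]:
    """Keep only records that have both a longitude and a latitude."""
    return [
        r for r in records
        if r["longitude"] is not None and r["latitude"] is not None
    ]


def within_bbox(records: list[Record], bbox: BBox) -> list[Record]:
    """Keep records whose point lies inside the bounding box (edges included)."""
    min_lon, min_lat, max_lon, max_lat = bbox
    return [
        r for r in records
        if min_lon <= r["longitude"] <= max_lon
        and min_lat <= r["latitude"] <= max_lat
    ]


# Hypothetical records; the coordinates are illustrative, not taken from PLUTO.
records: list[Record] = [
    {"longitude": -73.985, "latitude": 40.692, "numfloors": 12.0},
    {"longitude": None, "latitude": 40.693, "numfloors": 3.0},     # missing longitude
    {"longitude": -73.700, "latitude": 40.750, "numfloors": 2.0},  # outside the box
]

# Hypothetical bounding box, roughly around Downtown Brooklyn.
bbox: BBox = (-74.00, 40.68, -73.97, 40.70)

# Drop incomplete rows first so the comparison never sees None.
cleaned = within_bbox(drop_missing_coords(records), bbox)
print(len(cleaned))  # 1
```

Running the imputer before the filter matters in this sketch: the bounding-box comparison would fail on a `None` coordinate.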
urban_layer = (
um.urban_layer.with_type("streets_intersections")
.from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
.with_mapping(
longitude_column="longitude",
latitude_column="latitude",
# geometry_column="<geometry_column_name>", # Replace <geometry_column_name> with the name of your geometry column, and use it instead of the latitude and longitude columns.
output_column="nearest_intersection",
threshold_distance=50,
) # Recall that `with_mapping` tells `map_nearest_layer` how to map the urban data to the urban layer.
.build()
)
# Note: For the interactive documentation, we only query 5000 records from the dataset. Remove the `number_of_rows` limit for a more realistic analysis.
loader = (
um
.loader
.from_huggingface("oscur/pluto", number_of_rows=5000, streaming=True)
.with_columns(longitude_column="longitude", latitude_column="latitude")
# .with_columns(geometry_column="<geometry_column_name>") # Replace <geometry_column_name> with the name of your geometry column, and use it instead of the latitude and longitude columns.
.build()
)
imputer = (
um.imputer.with_type("SimpleGeoImputer")
.on_columns(longitude_column="longitude", latitude_column="latitude")
# .on_columns(geometry_column="<geometry_column_name>") # Replace <geometry_column_name> with the name of your geometry column, and use it instead of the latitude and longitude columns.
.build()
)
filter_step = um.filter.with_type("BoundingBoxFilter").build()
enricher = (
um.enricher.with_data(group_by="nearest_intersection", values_from="numfloors")
.aggregate_by(method="mean", output_column="avg_floors")
.build()
)
visualiser = (
um.visual.with_type("Interactive")
.with_style({"tiles": "CartoDB Positron", "colorbar_text_color": "gray"})
.build()
)
# Assemble the pipeline
pipeline = UrbanPipeline(
[
("urban_layer", urban_layer),
("loader", loader),
("imputer", imputer),
("filter", filter_step),
("enricher", enricher),
("visualiser", visualiser),
]
)
# Let's preview the urban pipeline we just created
pipeline.preview()
Urban Pipeline Preview:
Step 1: urban_layer
Urban Layer: OSMNXIntersections
CRS: EPSG:4326
Mappings:
Mapping:
- lon=longitude, lat=latitude, output=nearest_intersection
Step 2: loader
Loader: DataFrameLoader
Latitude Column: latitude
Longitude Column: longitude
Geometry Column:
CRS: EPSG:4326
Additional params: {'map_columns': None, 'file_path': '', 'input_dataframe': None}
Step 3: imputer
Imputer: SimpleGeoImputer
Action: Drop rows with missing 'latitude' or 'longitude'
Step 4: filter
Filter: BoundingBoxFilter
Action: Filter data to the bounding box of the urban layer
Step 5: enricher
Enricher Workflow:
├── Step 1: Data Input
│ ├── Group By: nearest_intersection
│ └── Values From: numfloors
├── Step 2: Action
│ ├── Type: Aggregate
│ ├── Aggregator: SimpleAggregator
│ ├── Method: mean
│ └── Output Column: avg_floors
└── Step 3: Enricher
├── Type: SingleAggregatorEnricher
└── Status: Ready
Step 6: visualiser
Visualiser: InteractiveVisualiser using Folium
Style: tiles: CartoDB Positron, colorbar_text_color: gray
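The mapping and enricher steps in the preview above (nearest intersection within a threshold, then a mean of `numfloors` per intersection) can be illustrated with a small standard-library sketch. It is not how UrbanMapper computes the mapping: the haversine distance, the assumption that `threshold_distance` is in metres, the function names, and the sample points are all hypothetical.

```python
import math
from collections import defaultdict
from statistics import mean

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in metres


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in metres between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearest_intersection(point, intersections, threshold_m):
    """Return the id of the closest intersection, or None if it is too far away."""
    best_id, best_dist = None, float("inf")
    for node_id, (lon, lat) in intersections.items():
        dist = haversine_m(point[0], point[1], lon, lat)
        if dist < best_dist:
            best_id, best_dist = node_id, dist
    return best_id if best_dist <= threshold_m else None


# Hypothetical intersections (lon, lat) and lots (lon, lat, numfloors).
intersections = {"A": (-73.9850, 40.6920), "B": (-73.9900, 40.6950)}
lots = [
    (-73.9851, 40.6921, 10.0),  # a few metres from A
    (-73.9849, 40.6919, 20.0),  # a few metres from A
    (-73.9901, 40.6951, 4.0),   # a few metres from B
    (-73.9700, 40.7000, 50.0),  # over a kilometre away, rejected by the threshold
]

# Map each lot to its nearest intersection, then group the floor counts.
floors_by_node = defaultdict(list)
for lon, lat, floors in lots:
    node = nearest_intersection((lon, lat), intersections, threshold_m=50)
    if node is not None:
        floors_by_node[node].append(floors)

# Enrich: one mean value per intersection, like the avg_floors column.
avg_floors = {node: mean(values) for node, values in floors_by_node.items()}
print(avg_floors)  # {'A': 15.0, 'B': 4.0}
```

A linear scan over every intersection is fine for a handful of points; a spatial index would be the usual choice for real street networks.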
Step 2: Execute the Pipeline
Goal: Process the data through all defined steps in one operation.
Input: The UrbanPipeline object from Step 1.
Output: A mapped GeoDataFrame and an enriched UrbanLayer with processed data.
The compose_transform method runs the entire workflow (loading data, imputing, filtering, mapping, and enriching) in a single call and returns the mapped data together with the enriched layer.
mapped_data, enriched_layer = pipeline.compose_transform()
~> Loading: loader...
🗺️ Successfully composed pipeline with 5 steps!
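The idea of running named steps in one call can be sketched with a toy class. This is only an illustration of the general pattern: `MiniPipeline`, its `run` method, and the toy steps are hypothetical, and the real UrbanPipeline dispatches on component types rather than chaining plain functions.

```python
from typing import Any, Callable

Step = tuple[str, Callable[[Any], Any]]


class MiniPipeline:
    """Toy pipeline: an ordered list of (name, function) pairs."""

    def __init__(self, steps: list[Step]) -> None:
        names = [name for name, _ in steps]
        if len(names) != len(set(names)):
            raise ValueError("step names must be unique")
        self.steps = steps

    def run(self, data: Any) -> Any:
        """Feed the output of each step into the next one, in order."""
        for _name, func in self.steps:
            data = func(data)
        return data


def drop_missing(rows: list) -> list:
    """Toy imputer: discard None entries."""
    return [r for r in rows if r is not None]


def double(rows: list) -> list:
    """Toy transform: double every value."""
    return [r * 2 for r in rows]


toy = MiniPipeline([("imputer", drop_missing), ("double", double), ("total", sum)])
result = toy.run([1, None, 3])
print(result)  # 8
```

Naming each step, as the `(name, step)` tuples in the notebook do, makes a pipeline easy to preview and lets a constructor reject duplicate names early.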
Step 3: Visualise Results
Goal: Present the enriched data on an interactive map.
Input: The enriched layer from Step 2 and columns to display (avg_floors).
Output: An interactive Folium map showing average floors per intersection.
The pipeline’s visualise method leverages the pre-configured visualiser to generate the map directly from the enriched layer.
fig = pipeline.visualise(["avg_floors"])
fig # Display the interactive map
Step 4: Save and Load Pipeline
Goal: Preserve the pipeline for future use or sharing.
Input: A file path (./my_pipeline.dill) for saving.
Output: A saved pipeline file and a reloaded UrbanPipeline object.
Saving with save and loading with load allows you to reuse or distribute your workflow effortlessly.
# Save the pipeline
pipeline.save("./my_pipeline.dill")
# Load it back
loaded_pipeline = UrbanPipeline.load("./my_pipeline.dill")
# Preview the loaded pipeline
loaded_pipeline.preview()
# Visualise with the loaded pipeline
fig = loaded_pipeline.visualise(["avg_floors"])
Urban Pipeline Preview:
Step 1: urban_layer
Urban Layer: OSMNXIntersections
CRS: EPSG:4326
Mappings:
Mapping:
- lon=longitude, lat=latitude, output=nearest_intersection
Step 2: loader
Loader: DataFrameLoader
Latitude Column: latitude
Longitude Column: longitude
Geometry Column:
CRS: EPSG:4326
Additional params: {'map_columns': None, 'file_path': ''}
Step 3: imputer
Imputer: SimpleGeoImputer
Action: Drop rows with missing 'latitude' or 'longitude'
Step 4: filter
Filter: BoundingBoxFilter
Action: Filter data to the bounding box of the urban layer
Step 5: enricher
Enricher Workflow:
├── Step 1: Data Input
│ ├── Group By: nearest_intersection
│ └── Values From: numfloors
├── Step 2: Action
│ ├── Type: Aggregate
│ ├── Aggregator: SimpleAggregator
│ ├── Method: mean
│ └── Output Column: avg_floors
└── Step 3: Enricher
├── Type: SingleAggregatorEnricher
└── Status: Ready
Step 6: visualiser
Visualiser: InteractiveVisualiser using Folium
Style: tiles: CartoDB Positron, colorbar_text_color: gray
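The `.dill` extension suggests serialisation with the dill library, which extends Python's built-in pickle; that is an inference from the file name, not something this notebook states. The round trip behind a save and load can be sketched with the standard library alone. The `steps` list below is a hypothetical stand-in for a pipeline, not an UrbanPipeline object.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical stand-in for a pipeline: named steps with plain-dict configs.
steps = [
    ("imputer", {"type": "SimpleGeoImputer"}),
    ("filter", {"type": "BoundingBoxFilter"}),
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "my_steps.pkl"

    # Save: serialise the object graph to bytes on disk.
    with path.open("wb") as fh:
        pickle.dump(steps, fh)

    # Load: rebuild an equal, but distinct, object from those bytes.
    with path.open("rb") as fh:
        restored = pickle.load(fh)

print(restored == steps, restored is steps)  # True False
```

Unpickling can execute arbitrary code, so only load pipeline files that come from a source you trust.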
Conclusion
Well done! Using UrbanPipeline, you’ve efficiently processed and visualised PLUTO data with less code than the step-by-step approach. This method shines for its simplicity and reusability. Compare it with the Step-by-Step notebook for a detailed breakdown of each stage!