Source code for evpv.region

# coding: utf-8

import json
import os
import rasterio
from rasterio.mask import mask
import csv
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import shape, LineString, Point, Polygon, box, MultiPoint
from shapely.ops import transform, nearest_points, snap
from geopy.distance import geodesic, distance
from pyproj import Geod, Proj, Transformer
import folium
import branca.colormap as cm

from evpv import helpers as hlp

[docs] class Region: """ A class to represent the region of interest, subdivided into traffic zones populated with aggregated geospatial data. This class enables geospatial data input validation, subdivision into traffic zones, and the aggregation of data across these zones. Specifically, it facilitates the processing of population data from raster files, workplace and POI data from CSVs, and the generation of traffic zones based on configurable properties. The class also supports the export of traffic zone data for further analysis and visualization through CSV files and Folium maps. Key Features: - Input Data Validation: Validates input data and recalculates/repopulates traffic zones on-the-fly when key properties are modified. - Traffic Zone Creation: Defines the traffic zones based of zone target size and shape, and provides the option to crop zones to the boundaries of the defined region. - Data Aggregation: Aggregates population, workplace, and POI data within each traffic zone. - Export and Visualizatio: Offers traffic zone export to CSV and visualization via Folium maps Note: The region of interest is converted to a bounding box for zoning purposes. Set the `crop_to_region` parameter to `True` to retain only zones within the region's boundaries. """
[docs] def __init__(self, region_geojson: str, population_raster: str, workplaces_csv: str, pois_csv: str, traffic_zone_properties: dict): """ Initializes the Region class with region data and properties for traffic zoning. Args: region_geojson (str): Path to the region geojson file. population_raster (str): Path to the population density raster file. workplaces_csv (str): Path to the CSV file containing workplace locations. pois_csv (str): Path to the CSV file containing points of interest (POI) locations. traffic_zone_properties (dict): A dictionary of traffic zone properties with keys: 'target_size_km' (float): Desired zone size in kilometers. 'shape' (str): Shape of zones (e.g., "rectangle", "triangle"). 'crop_to_region' (bool): Whether to crop zones to the region of interest. """ print("=========================================") print(f"INFO \t Creation of a Region object.") print("=========================================") self._initialized = False # Initialization flag self.region_geometry = region_geojson self.population_density = population_raster self.workplaces = workplaces_csv self.pois = pois_csv self.traffic_zone_properties = traffic_zone_properties print(f"INFO \t Successful initialization of input parameters.") print(f"\t > Region area: {self.calculate_area_km2()} km²") self._traffic_zones = None # To store resulting traffic zones as a DataFrame # Initialize traffic zones immediately self._define_traffic_zones() self._populate_traffic_zones() self._initialized = True # Initialization flag
# Traffic zones property (read-only) @property def traffic_zones(self): """pd.DataFrame: Data representing the traffic zones in the region.""" return self._traffic_zones # Properties and Setters with automatic re-population of traffic zones on change @property def region_geometry(self)-> Polygon: """Polygon: The shapely polygon representing the region of interest.""" return self._region_geometry @region_geometry.setter def region_geometry(self, path: str): if not os.path.isfile(path): raise FileNotFoundError(f"The geojson at {path} does not exist.") with open(path, 'r') as f: geojson_data = json.load(f) # Convert the GEOJSON data to a shapely object geometry = geojson_data['features'][0]['geometry'] shapely_shape = shape(geometry) self._region_geometry = shapely_shape if self._initialized: self._populate_traffic_zones() # Re-populate zones if data has changed and initialized @property def population_density(self) -> str: """str: The population density data from the raster file.""" return self._population_density @population_density.setter def population_density(self, path: str): if not os.path.isfile(path): raise FileNotFoundError(f"The population density raster at {path} does not exist.") # Read the raster data using rasterio with rasterio.open(path) as src: # Get the bounds of the raster raster_bounds = src.bounds # Get the bounds of the region of interest region_bounds = self.region_geometry.bounds # Assuming _region is a shapely Polygon # Check if the raster covers the region of interest if not (raster_bounds[0] <= region_bounds[2] and # raster left <= region right raster_bounds[2] >= region_bounds[0] and # raster right >= region left raster_bounds[1] <= region_bounds[3] and # raster bottom <= region top raster_bounds[3] >= region_bounds[1]): # raster top >= region bottom raise ValueError("The population density raster does not cover the entire region of interest.") # If the check passes, assign the path self._population_density = path if self._initialized: self._populate_traffic_zones() # Re-populate zones if data has changed and initialized @property def workplaces(self) -> list: """list: A list of tuples representing workplaces (longitude, latitude)""" return self._workplaces @workplaces.setter def workplaces(self, path: str) -> None: if not os.path.isfile(path): raise FileNotFoundError(f"ERROR \t The CSV for workplaces at {path} does not exist.") workplaces_center_points = self._load_weighted_locations(path) self._workplaces = workplaces_center_points if self._initialized: self._populate_traffic_zones() # Re-populate zones if data has changed and initialized @property def pois(self) -> list: """list: A list of tuples representing points of interest (longitude, latitude)""" return self._pois @pois.setter def pois(self, path: str): if not os.path.isfile(path): raise FileNotFoundError(f"ERROR \t The CSV for POIs at {path} does not exist.") pois_center_points = self._load_weighted_locations(path) self._pois = pois_center_points if self._initialized: self._populate_traffic_zones() # Re-populate zones if data has changed and initialized @property def traffic_zone_properties(self) -> dict: """dict: A dictionary of traffic zone properties.""" return self._traffic_zone_properties @traffic_zone_properties.setter def traffic_zone_properties(self, properties: dict): if not isinstance(properties, dict): raise ValueError("Traffic zone properties must be a dictionary.") required_keys = ['target_size_km', 'shape', 'crop_to_region'] for key in required_keys: if key not in properties: raise KeyError(f"Missing required traffic zone property: '{key}'") if properties['target_size_km'] <= 0: raise ValueError("Target size must be a positive value.") self._traffic_zone_properties = properties if self._initialized: self._define_traffic_zones() # Re-define traffic zones self._populate_traffic_zones() # Re-populate zones if data has changed and initialized # Helpers def _load_weighted_locations(self, path: str) -> list: """ Loads weighted locations from a CSV file and returns a list of tuples representing (longitude, latitude) points. Args: path (str): Path to the CSV file containing locations. Returns: list: A list of tuples with repeated locations based on their weight. """ if not os.path.isfile(path): raise FileNotFoundError(f"The CSV at {path} does not exist.") center_points = [] with open(path, mode='r') as csv_file: csv_reader = csv.DictReader(csv_file) for row in csv_reader: name = row['name'] latitude = float(row['latitude']) longitude = float(row['longitude']) weight = int(row['weight']) if weight == 0: print(f"Warning: Skipping {name} row in {path} due to zero weight: {weight}") continue if weight < 0: raise ValueError(f"{name} row in {path} contains negative weight") # Duplicate the locations according to the weight for _ in range(weight): center_points.append((longitude, latitude)) return center_points # Zoning methods def _define_traffic_zones(self): """Defines traffic zones based on the specified properties in traffic_zone_properties.""" print(f"INFO \t Defining traffic zones...") shape_type = self._traffic_zone_properties["shape"] target_size_km = self._traffic_zone_properties["target_size_km"] crop_to_region = self._traffic_zone_properties["crop_to_region"] # Create a bbox and zones zoning according to the shape type if shape_type == "rectangle": self._traffic_zones = self._create_rectangular_zones(target_size_km) else: raise NotImplementedError(f"Shape type '{shape_type}' is not implemented.") # Delete zones outside the boundaries of the shapefile if crop_to_region: print(f"\t Removing zones outside region of interest (crop to region)...") # Identify the rows to remove rows_to_remove = self._traffic_zones[self._traffic_zones['centroid_is_inside_region'] == False] # Drop these rows from the original DataFrame self._traffic_zones = self._traffic_zones.drop(rows_to_remove.index) print(f"\t > Remaining zones: {len(self._traffic_zones)}") def _create_rectangular_zones(self, target_size_km): """Creates a bounding box around the region of interest and splits it into rectangles based on the target size.""" print(f"INFO \t Rectangular zoning...") # Define UTM projection based on the region centroid lon, lat = self.region_geometry.centroid.x, self.region_geometry.centroid.y utm_proj = Proj(proj="utm", zone=int((lon + 180) // 6) + 1, ellps="WGS84", preserve_units=False) transformer_to_utm = Transformer.from_proj("epsg:4326", utm_proj, always_xy=True) transformer_to_latlon = Transformer.from_proj(utm_proj, "epsg:4326", always_xy=True) # Transform the region geometry to UTM projection region_geom_projected = transform(transformer_to_utm.transform, self.region_geometry) minx, miny, maxx, maxy = region_geom_projected.bounds # Compute the number of segments based on the target size in meters target_size_m = target_size_km * 1000 n_rectangles_x = round((maxx - minx) / target_size_m) n_rectangles_y = round((maxy - miny) / target_size_m) # Calculate the width and height of each rectangle in the bounding box width = (maxx - minx) / n_rectangles_x height = (maxy - miny) / n_rectangles_y rectangle_area_km2 = (width * height) / 1000000 # Convert m² to km² print(f"\t > Bounding box split into {n_rectangles_x} x {n_rectangles_y} zones") print(f"\t > Width: {width/1000:.4f} km - Height: {height/1000:.4f} km: - Area: {rectangle_area_km2:.4f} km²") grid_data = [] for i in range(n_rectangles_x): for j in range(n_rectangles_y): # Define each rectangle in the UTM projection lower_left_x = minx + i * width lower_left_y = miny + j * height upper_right_x = lower_left_x + width upper_right_y = lower_left_y + height bbox_geom = box(lower_left_x, lower_left_y, upper_right_x, upper_right_y) # Transform the centroid back to latitude and longitude center_point_proj = bbox_geom.centroid center_lon, center_lat = transformer_to_latlon.transform(center_point_proj.x, center_point_proj.y) # Check if the centroid is within the region geometry (using original lat/lon coordinates) center_point_latlon = transform(transformer_to_latlon.transform, center_point_proj) is_inside = self.region_geometry.contains(center_point_latlon) # Transform the bounding box to lat/lon and store it in "geometry" bbox_geom_latlon = transform(transformer_to_latlon.transform, bbox_geom) grid_data.append({ 'id': f"{i}_{j}", 'centroid': (center_lat, center_lon), 'geometry': bbox_geom_latlon, 'area_km2': rectangle_area_km2, 'centroid_is_inside_region': is_inside }) # Convert to DataFrame return pd.DataFrame(grid_data) # Aggregation Methods def _populate_traffic_zones(self): """Populates traffic zones with aggregated data on population, workplaces, and POIs.""" print(f"INFO \t Aggregation of geodata into traffic zones...") if self.traffic_zones is None: raise ValueError("Traffic zones must be defined before populating them.") traffic_zone_data = [] for _, zone in self.traffic_zones.iterrows(): zone_geom = zone["geometry"] # Population bbox_gdf = gpd.GeoDataFrame({'geometry': [zone_geom]}, crs="EPSG:4326") population_raster_path = self.population_density # Path to the population raster file with rasterio.open(population_raster_path) as src: # Read the population raster # Clip the raster using the bounding box out_image, out_transform = mask(src, [bbox_gdf.geometry.values[0]], crop=True) out_meta = src.meta n_people = np.sum(out_image[out_image > 0]) # assuming no data values are <= 0 # Workplaces # Convert the list of center points to shapely Point objects points = [Point(lon, lat) for lon, lat in self.workplaces] # Count how many points are within the bounding box points_within_bbox = [point for point in points if point.within(zone_geom)] n_workplaces = len(points_within_bbox) # POIs (same logic as for workplaces) points = [Point(lon, lat) for lon, lat in self.pois] points_within_bbox = [point for point in points if point.within(zone_geom)] n_pois = len(points_within_bbox) traffic_zone_data.append({ "id": zone["id"], "geometric_center": zone["centroid"], "geometry": zone["geometry"], 'area_km2': zone["area_km2"], "n_people": n_people, "n_workplaces": n_workplaces, "n_pois": n_pois }) self._traffic_zones = pd.DataFrame(traffic_zone_data) # Update traffic zones print(f"\t > Population: {self.traffic_zones["n_people"].sum()}") print(f"\t > Workplaces: {self.traffic_zones["n_workplaces"].sum()}") print(f"\t > POIs: {self.traffic_zones["n_pois"].sum()}") # Region metrics
[docs] def centroid_coords(self) -> tuple: """ Get the coordinates of the centroid of the target area. Returns: tuple: The (latitude, longitude) coordinates of the centroid. """ centroid = self.region_geometry.centroid return centroid.y, centroid.x
[docs] def average_zone_area_km2(self) -> float: """Calculates the average area of the traffic zones in square kilometers. Returns: float: The area in square kilometers. """ # Calculate the average area average_area_km2 = self._traffic_zones['area_km2'].mean() return average_area_km2
[docs] def calculate_area_km2(self) -> float: """Calculates the area of the region in square kilometers. Returns: float: The area in square kilometers. """ # Get the centroid to determine the UTM zone lon, lat = self._region_geometry.centroid.x, self._region_geometry.centroid.y # Define UTM projection based on centroid utm_proj = Proj(proj="utm", zone=int((lon + 180) // 6) + 1, ellps="WGS84", preserve_units=False) transformer = Transformer.from_proj("epsg:4326", utm_proj, always_xy=True) # Transform the polygon to UTM utm_polygon = transform(transformer.transform, self._region_geometry) # Calculate area in km2 (area in meters / 1,000,000) area_km2 = utm_polygon.area / 1_000_000 return area_km2
# Export and visualization
[docs] def to_map(self, filepath: str) -> None: """ Saves a folium map with simulation setup properties, including region geometry, zone geometry, population, workplaces, and POIs. """ df = self.traffic_zones # 1. Create the base map with administrative boundaries m1 = hlp.create_base_map(self) # 2. Add Simulation bounding box minx, miny, maxx, maxy = self.region_geometry.bounds folium.Rectangle(bounds=[[miny, minx], [maxy, maxx]], fill=True, fill_opacity=0, color='blue', weight=2).add_to(m1) # 3. Add TAZ boundaries hlp.add_taz_boundaries(m1, df) # 4. Add Feature Groups for 'n_workplaces', 'n_people', and 'n_pois' hlp.add_colormapped_feature_group(m1, df, self, 'n_workplaces', 'Number of workplaces', 'Workplaces') hlp.add_colormapped_feature_group(m1, df, self, 'n_people', 'Number of people', 'Population') hlp.add_colormapped_feature_group(m1, df, self, 'n_pois', 'Number of POIs', 'POIs') # 5. Add Layer Control and save the map folium.LayerControl().add_to(m1) m1.save(filepath)
[docs] def to_csv(self, filepath: str) -> None: """ Saves a csv file with all data """ self.traffic_zones.to_csv(filepath)