Source code for evpv.mobilitysimulator

# coding: utf-8

import json
import os
import rasterio
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import shape, LineString, Point, Polygon, box, MultiPoint
from shapely.ops import transform, nearest_points, snap
from geopy.distance import geodesic, distance
import openrouteservice
import time
import math
import csv
import folium
import branca.colormap as cm

from evpv.vehicle import Vehicle
from evpv.vehiclefleet import VehicleFleet
from evpv.region import Region
from evpv import helpers as hlp

[docs] class MobilitySimulator: """ A class to perform a mobility demand simulation for a given region. This class simulates the demand of commuting demand based on a given vehicle fleet and region, supporting various methods for allocating vehicles spatially and performing trip distribution using spatial interaction modelling. Key Features: - Vehicle allocation: Allocates vehicles across zones of a region based on user-defined methods, supporting the addition of randomness. - Trip distribution: Supports various spatial interaction models for trip distribution, such as the gravity and radiation models, allowing for customization of attraction and cost features to generate realistic commuting demand. - Self-calibrated gravity model: Integrates a self-calibrated gravity model that can be used without calibration, following the work of Lenormand et. al. 2016 - Routing and time/distance calculation: Enables flexible distance computation through both euclidean and road-based distances, with optional integration of OpenRouteService (ORS) API for advanced routing. - Distance offset: Allows for adding a distance offset added to the distance calculated for commuting. Note: This class assumes the region is pre-defined and vehicle allocation parameters are specified in a dictionary. The use of the `ors_key` parameter enables routing with real road network data, while omitting it defaults to basic distance calculations. """
[docs] def __init__(self, vehicle_fleet: VehicleFleet, region: Region, vehicle_allocation_params: dict, trip_distribution_params: dict): """ Initializes the MobilitySimulator class. Args: vehicle_fleet (VehicleFleet): An instance of the VehicleFleet class. region (Region): An instance of the Region class. vehicle_allocation_params (dict): Method for vehicle spatial allocation generation. Keys: 'method' (str): The method used for trip generation (e.g., 'population'). 'randomness' (float): A value between 0 and 1 to introduce randomness. trip_distribution_params (dict): Parameters specific to trip distribution. Keys: 'road_to_euclidian_ratio' (float): Ratio of road distance to Euclidean distance. 'ors_key' (str or None): Optional ORS (OpenRouteService) API key for routing. 'model_type' (str): Type of spatial interaction model ('gravity' or 'radiation'). 'attraction_feature' (str): Feature to attract trips (e.g., 'workplaces'). 'cost_feature' (str): Feature representing the cost of travel (e.g., 'distance_road'). 'distance_offset_km' (float): Distance added to the distance calculated through trip distribution. """ print("=========================================") print(f"INFO \t Creation of a MobilitySimulator object.") print("=========================================") self.vehicle_fleet = vehicle_fleet self.region = region self.vehicle_allocation_params = vehicle_allocation_params self.trip_distribution_params = trip_distribution_params print(f"INFO \t Successful initialization of input parameters.") self._flows = None # To store flows between traffic zones self._aggregated_zone_metrics = None # To store aggregated metrics for each zone (like vehicle-km or average distance travelled)
# Results properties (read-only) @property def flows(self) -> pd.DataFrame: """pd.DataFrame: The flows of the trip distribution model.""" return self._flows @property def aggregated_zone_metrics(self) -> pd.DataFrame: """pd.DataFrame: The aggregated_zone_metrics of the trip distribution model.""" return self._aggregated_zone_metrics # Properties and Setters @property def vehicle_fleet(self) -> VehicleFleet: """VehicleFleet: The vehicle fleet used in the simulation.""" return self._vehicle_fleet @vehicle_fleet.setter def vehicle_fleet(self, value: VehicleFleet): if not isinstance(value, VehicleFleet): raise ValueError("vehicle_fleet must be an instance of the VehicleFleet class.") self._vehicle_fleet = value @property def region(self) -> Region: """Region: The region used in the simulation.""" return self._region @region.setter def region(self, value: Region): if not isinstance(value, Region): raise ValueError("region must be an instance of the Region class.") self._region = value @property def vehicle_allocation_params(self) -> str: """str: The method used for trip generation (e.g., 'population').""" return self._vehicle_allocation_params @vehicle_allocation_params.setter def vehicle_allocation_params(self, value: str): # Check if `value` is a dictionary if not isinstance(value, dict): raise ValueError("Trip generation method must be a dictionary.") # Validate required keys if 'method' not in value: raise ValueError("Missing required trip generation parameter: 'method'.") if 'randomness' not in value: raise ValueError("Missing required trip generation parameter: 'randomness'.") # Validate types if not isinstance(value['method'], str) or not value['method']: raise ValueError("Trip generation method must be a non-empty string.") if not isinstance(value['randomness'], (float, int)) or not (0 <= value['randomness'] <= 1): raise ValueError("Randomness must be a float between 0 and 1.") # If all checks pass, set the parameter self._vehicle_allocation_params = value @property def trip_distribution_params(self) -> dict: """dict: Parameters used for trip distribution.""" return self._trip_distribution_params @trip_distribution_params.setter def trip_distribution_params(self, value: dict): # Define required keys and expected types for each key required_keys = { 'road_to_euclidian_ratio': (float, int), # ratio should be numeric 'ors_key': (str, type(None)), # optional API key, string or None 'model_type': str, # model type as a string 'attraction_feature': str, # feature name as a string 'cost_feature': str, # feature name as a string 'distance_offset_km': (float) } # Check if `value` is a dictionary if not isinstance(value, dict): raise ValueError("Trip distribution parameters must be a dictionary.") # Validate each required key is present with the correct type for key, expected_type in required_keys.items(): if key not in value: raise ValueError(f"Missing required trip distribution parameter: '{key}'.") if not isinstance(value[key], expected_type): raise TypeError(f"Parameter '{key}' must be of type {expected_type}.") # Additional checks for specific parameter values if value['road_to_euclidian_ratio'] <= 0: raise ValueError("road_to_euclidian_ratio must be a positive number.") # Check the connection with ORS if value['ors_key'] is not None: if not self.is_valid_ors_connection(value['ors_key']): raise ValueError("Impossible to connect to ORS. Check the online status of the service (https://status.openrouteservice.org/) and the validity of your ORS key.") # If all checks pass, set the parameter self._trip_distribution_params = value # Helpers
[docs] def is_valid_ors_connection(self, ors_key: str) -> bool: """Check if the ORS key is valid by creating a client. Args: ors_key (str): The ORS API key to validate. Returns: bool: True if the key is valid, False otherwise. """ try: # Attempt to create an ORS client with the provided key client = openrouteservice.Client(key=ors_key) # Perform a simple request to check validity routes = client.directions(((8.34234,48.23424),(8.34423,48.26424))) return True # If no exceptions, the key is valid except Exception as e: print(f"Error while trying to establish a connection with ORS: {e}") return False
# Vehicle allocation methods
[docs] def vehicle_allocation(self) -> pd.DataFrame: """Allocates vehicle (number of outflows) to traffic zones. This method returns a new DataFrame containing the `id` of each zone and the allocated number of outflows (`n_outflows`). Returns: pd.DataFrame: A DataFrame with zone metrics, including `id` and `n_outflows`. """ print(f"INFO \t Allocating vehicles to traffic zones...") method = self.vehicle_allocation_params['method'] randomness = self.vehicle_allocation_params['randomness'] zones = self.region.traffic_zones.copy() # Make a copy to avoid direct modification n_outflows = self.vehicle_fleet.total_vehicles method_vehicle_count = int(n_outflows * (1 - randomness)) random_vehicle_count = n_outflows - method_vehicle_count print(f"INFO \t Method-based allocation of {method_vehicle_count} vehicles...") # Initial vehicle allocation based on specified method if method == 'population': total_population = zones['n_people'].sum() zones['n_outflows'] = zones['n_people'].apply( lambda x: round(method_vehicle_count * (x / total_population)) if total_population > 0 else 0 ) else: print(f"ERROR \t The vehicle allocation method {method} is unknown.") # Random allocation for remaining vehicles if randomness != 0: print(f"INFO \t Random allocation of {random_vehicle_count} vehicles...") for _ in range(random_vehicle_count): selected_zone = zones.sample().index[0] zones.at[selected_zone, 'n_outflows'] += 1 # Correct the allocation to match total outflow count difference = n_outflows - zones['n_outflows'].sum() if difference != 0: print(f"ALERT \t Adjusting difference of {difference} vehicles due to rounding...") zones = self._adjust_vehicle_allocation(difference, zones) # Update the traffic zones with the new data self._aggregated_zone_metrics = zones[['id', 'n_outflows']] # Print print(f"\t > Total vehicles: {zones['n_outflows'].sum()}") print(f"\t > Vehicles per zone: {zones['n_outflows'].mean()} ± {zones['n_outflows'].std()}") # ASCII Histogram of vehicle distribution # Should be a close to a normal distribution when randomness is 100% bins = np.histogram_bin_edges(zones['n_outflows'], bins='auto') hist, bin_edges = np.histogram(zones['n_outflows'], bins=bins) print("\t > Distribution of zones per vehicle count:") for count, edge in zip(hist, bin_edges[:-1]): bar = "#" * (count * 2) # Scale the bar for better visibility print(f"\t {int(edge):>4} - {int(edge + (bin_edges[1] - bin_edges[0])):<4}: {bar}")
def _adjust_vehicle_allocation(self, difference: int, zones: pd.DataFrame) -> pd.DataFrame: """Adjusts vehicle allocation to match the total count. Args: difference (int): The difference between allocated and target outflow counts. zones (pd.DataFrame): DataFrame with current outflows per zone. Returns: pd.DataFrame: Adjusted zones DataFrame with corrected outflows. """ if difference > 0: # Add outflows based on current distribution for i in range(difference): total_current_outflows = zones['n_outflows'].sum() probabilities = zones['n_outflows'] / total_current_outflows selected_zone = zones.sample(weights=probabilities).index[0] zones.at[selected_zone, 'n_outflows'] += 1 elif difference < 0: # Remove outflows based on current distribution for i in range(-difference): total_current_outflows = zones['n_outflows'].sum() probabilities = zones['n_outflows'] / total_current_outflows # Prevent negative number of vehicles while True: selected_zone = zones.sample(weights=probabilities).index[0] if zones.at[selected_zone, 'n_outflows'] > 0: zones.at[selected_zone, 'n_outflows'] -= 1 break return zones # Trip distribution methods
[docs] def trip_distribution(self, batch_size: int = 49) -> None: """ Distributes trips between traffic zones based on a spatial interaction model. """ print(f"INFO \t Trip distribution...") # Get relevant parameters model = self.trip_distribution_params['model_type'] ors_key = self.trip_distribution_params['ors_key'] road_to_euclidian_ratio = self.trip_distribution_params['road_to_euclidian_ratio'] attraction_feature = self.trip_distribution_params['attraction_feature'] cost_feature = self.trip_distribution_params['cost_feature'] vkt_offset = self.trip_distribution_params['distance_offset_km'] if self._aggregated_zone_metrics is None: # Check if the Region contains the number of vehicles raise RuntimeError(f"ERROR \t Please perfom vehicle allocation before trip distribution.") # Create a dataframe with region attributes and vehicle outflows for convenience df = self.region.traffic_zones.copy() df['n_outflows'] = self._aggregated_zone_metrics['n_outflows'] # Add the new column 'n_outflows' to the copied dataframe # Calculate the distances and travel times between traffic zones using ORS or road_to_euclidian_ratio print(f"INFO \t Calculating distance/time between traffic zones...") flows_df = self._get_travel_data(df, ors_key, road_to_euclidian_ratio, batch_size) # Apply the spatial interaction model print(f"INFO \t Applying the '{model}' spatial interaction model...") for origin in flows_df['Origin'].unique(): # Filter rows for the current origin origin_rows = flows_df[flows_df['Origin'] == origin] origin_id = origin_rows.iloc[0]['Origin'] # SIM input: outgoing trips n_outflows = df.loc[df['id'] == origin_id, 'n_outflows'].values[0] # Get attraction and cost features att_origin, dest_att_list, cost_list = self._get_attraction_cost_data(df, origin_id, origin_rows, attraction_feature, cost_feature) # Calculate the flows based on the model flows = self._apply_spatial_interaction_model(model, n_outflows, att_origin, dest_att_list, cost_list) # Update the flows column where row_id equals 1 flows_df.loc[flows_df['Origin'] == origin_id, 'Flow'] = flows[:len(flows_df.loc[flows_df['Origin'] == origin_id])] # Append flow data self._flows = flows_df # Add aggregated data to the region object n_outflows = [] n_inflows = [] fkt_outflows = [] fkt_inflows = [] vkt_outflows = [] vkt_inflows = [] # Iterate over the TAZ and append data for index, row in df.iterrows(): # Append values related to the origin (outflows) out_df = flows_df[flows_df['Origin'] == row['id']].copy() out_df['Distance_Flow_Product'] = (out_df['Distance road (km)'] + vkt_offset) * out_df['Flow'] outflow_sum = out_df['Flow'].sum() distance_flow_product_sum_out = out_df['Distance_Flow_Product'].sum() n_outflows.append(outflow_sum) fkt_outflows.append(distance_flow_product_sum_out) if outflow_sum != 0: vkt_outflows.append(distance_flow_product_sum_out / outflow_sum) else: vkt_outflows.append(0) # Append values related to the destination (inflows) in_df = flows_df[flows_df['Destination'] == row['id']].copy() in_df['Distance_Flow_Product'] = (in_df['Distance road (km)'] + vkt_offset) * in_df['Flow'] inflow_sum = in_df['Flow'].sum() distance_flow_product_sum_in = in_df['Distance_Flow_Product'].sum() n_inflows.append(inflow_sum) fkt_inflows.append(distance_flow_product_sum_in) if inflow_sum != 0: vkt_inflows.append(distance_flow_product_sum_in / inflow_sum) else: vkt_inflows.append(0) # Add a new column with values from the list self._aggregated_zone_metrics['n_inflows'] = n_inflows self._aggregated_zone_metrics['fkt_outflows'] = fkt_outflows self._aggregated_zone_metrics['fkt_inflows'] = fkt_inflows self._aggregated_zone_metrics['vkt_outflows'] = vkt_outflows self._aggregated_zone_metrics['vkt_inflows'] = vkt_inflows # Print print(f"\t > Total vehicle-km (by road): {self.aggregated_zone_metrics['fkt_outflows'].sum()} km") print(f"\t > Average distance travelled per vehicle (by road): {self.aggregated_zone_metrics['fkt_outflows'].sum() / self.aggregated_zone_metrics['n_outflows'].sum()} km")
def _get_travel_data(self, df: pd.DataFrame, ors_key: str, road_to_euclidian_ratio: float, batch_size: int) -> pd.DataFrame: """ Get road distances and travel times between traffic zones using ORS or road_to_euclidian_ratio. Parameters: df (pd.DataFrame): DataFrame containing traffic zones with their geometric centers and IDs. ors_key (str): OpenRouteService API key for distance calculations. road_to_euclidian_ratio (float): Ratio to convert Euclidean distance to road distance if necessary. batch_size (int): Number of origins/destinations per ORS request. Returns: pd.DataFrame: DataFrame containing the flow data with 'Origin', 'Destination', 'Flow', 'Distance road (km)', and 'Distance euclidian (km)', with rows where 'Origin' equals 'Destination' removed. """ # Extract coordinates of the centroid coordinates = [[coord[1], coord[0]] for coord in df['geometric_center']] num_coordinates = len(coordinates) # Initialize the full matrices durations = np.zeros((num_coordinates, num_coordinates)) distances = np.zeros((num_coordinates, num_coordinates)) # If the user provides an ORS key if ors_key is not None: print(f"INFO \t Routing using ORS matrix request") # Check if the number of coordinates exceeds the batch size if num_coordinates > batch_size: print(f"ALERT \t {num_coordinates} origins/destinations: this number is greater than the batch size set at {batch_size}. Multiple ORS requests are needed.") # Initialize ORS client client = openrouteservice.Client(key=ors_key) # Replace with your ORS API key # Split the coordinates into manageable batches coordinate_batches = [coordinates[i:i + batch_size] for i in range(0, len(coordinates), batch_size)] # Make multiple requests to the ORS API for each batch for i, batch in enumerate(coordinate_batches): print(f"INFO \t Sending ORS request for {len(batch)} origins/destinations.") batch_matrix = client.distance_matrix( locations=batch, profile='driving-car', metrics=['duration', 'distance'], resolve_locations=True ) # Extract travel times and distances from the response batch_durations = batch_matrix['durations'] batch_distances = batch_matrix['distances'] # Determine the range of indices for this batch batch_start = i * batch_size batch_end = batch_start + len(batch) # Populate the corresponding section of the full matrices durations[batch_start:batch_end, batch_start:batch_end] = batch_durations distances[batch_start:batch_end, batch_start:batch_end] = batch_distances time.sleep(2) # Introduce a delay between requests to avoid rate limiting else: print(f"INFO \t Distance by road calculated using empirical road-to-euclidian ratio equal to {road_to_euclidian_ratio}") # Prepare lists to hold the data origin_ids = [] destination_ids = [] flows = [] travel_times = [] travel_distances = [] travel_distances_euclidian = [] # Populate the data based on the matrix response or on the road/euclidian distance ratio ors_errors = 0 for i, origin_id in enumerate(df['id']): for j, destination_id in enumerate(df['id']): origin_ids.append(origin_id) destination_ids.append(destination_id) flows.append(0.0) # Placeholder for the flow for all origin-destination pairs # Calculate euclidean travel distance and append data point1 = df.loc[df['id'] == origin_id, 'geometric_center'].iloc[0] point2 = df.loc[df['id'] == destination_id, 'geometric_center'].iloc[0] euclidian_distance = geodesic(point1, point2).kilometers travel_distances_euclidian.append(euclidian_distance) # For all zero distances (the case for all combinations if ORS calculation has not been performed) if (math.isnan(distances[i][j]) or distances[i][j] == .0) and (origin_id != destination_id): distance = euclidian_distance * road_to_euclidian_ratio travel_distances.append(distance) travel_times.append(distance / 30 * 60) # Convert to time # If ORS calculation was done, calculate the number of unresolved locations if ors_key is not None: ors_errors += 1 else: travel_distances.append(distances[i][j] / 1000) # Convert meters to kilometers travel_times.append(durations[i][j] / 60) # Convert seconds to minutes if ors_errors: print(f"ALERT \t ORS was unable to calculate distance or resolve {ors_errors} routes. Road to euclidian distance ratio used instead with a travel speed of 30 km/h. This could affect the model reliability!") # Create the resulting DataFrame flow_data = { 'Origin': origin_ids, 'Destination': destination_ids, 'Flow': flows, 'Distance road (km)': travel_distances, 'Distance euclidian (km)': travel_distances_euclidian } flows_df = pd.DataFrame(flow_data) # Remove rows where 'Origin' is equal to 'Destination' flows_df = flows_df[flows_df['Origin'] != flows_df['Destination']] return flows_df def _get_attraction_cost_data(self, df:pd.DataFrame, origin_id: str, origin_rows:pd.DataFrame, attraction_feature: str, cost_feature: str) -> tuple: """ Retrieve the attraction and cost data for the given origin. Args: df (pd.DataFrame): The DataFrame containing traffic zone data. origin_id (str): The ID of the origin zone. origin_rows (DataFrame): DataFrame rows for the current origin. attraction_feature (str): The feature used for attraction (e.g., 'population', 'workplaces'). cost_feature (str): The feature used for cost (e.g., 'distance_road', 'time_road'). Returns: tuple: Attraction value for origin, list of destination attractions, list of costs. """ if attraction_feature == 'population': att_origin = df.loc[df['id'] == origin_id, 'n_people'].values[0] dest_att_list = origin_rows['Destination'].apply(lambda x: df.loc[df['id'] == x, 'n_people'].values[0]).tolist() elif attraction_feature == 'workplaces': att_origin = df.loc[df['id'] == origin_id, 'n_workplaces'].values[0] dest_att_list = origin_rows['Destination'].apply(lambda x: df.loc[df['id'] == x, 'n_workplaces'].values[0]).tolist() else: raise ValueError(f"ERROR \t Attraction feature is unknown.") if cost_feature == 'distance_road': cost_list = origin_rows['Distance road (km)'].tolist() elif cost_feature == 'distance_euclidian': cost_list = origin_rows['Distance euclidian (km)'].tolist() else: raise ValueError(f"ERROR \t Cost feature is unknown.") return att_origin, dest_att_list, cost_list def _apply_spatial_interaction_model(self, model: str, n_outflows: int, att_origin: float, dest_att_list: list, cost_list: list) -> list: """ Apply the spatial interaction model to calculate flows. Args: model (str): The spatial interaction model to use. n_outflows (int): The number of outgoing trips. att_origin (float): The attraction value for the origin. dest_att_list (list): The list of attractions for each destination. cost_list (list): The list of costs associated with each destination. Returns: list: Calculated flows based on the model. """ # Use a dictionary on the instance to store persistent flags if not hasattr(self, '_apply_spatial_interaction_model_flags'): self._apply_spatial_interaction_model_flags = {'beta_info': False, 'unit_surface_alert': False} flags = self._apply_spatial_interaction_model_flags # Shorten access to the flags if model == 'gravity_power_1': return self.prod_constrained_gravity_power( origin_n_trips=n_outflows, dest_attractivity_list=dest_att_list, cost_list=cost_list, gamma=1) elif model == 'gravity_exp_1': return self.prod_constrained_gravity_exp( origin_n_trips=n_outflows, dest_attractivity_list=dest_att_list, cost_list=cost_list, beta=1) elif model == 'gravity_exp_01': return self.prod_constrained_gravity_exp( origin_n_trips=n_outflows, dest_attractivity_list=dest_att_list, cost_list=cost_list, beta=0.1) elif model == 'gravity_exp_scaled': unit_surface_km2 = self.region.average_zone_area_km2() beta = 0.3 * unit_surface_km2**(-0.18) # c.f. Lenormand et al 2016 # Display the beta info once if not flags['beta_info']: print(f"\t > Auto-scaled free parameter based on an average zone area of {unit_surface_km2} km²: Beta = {beta}") flags['beta_info'] = True # Display the unit surface area alert once if unit_surface_km2 < 5 and not flags['unit_surface_alert']: print("ALERT \t The average unit surface area is less than 5 km2, which may cause the scaling law of the gravity model to be invalid.") flags['unit_surface_alert'] = True return self.prod_constrained_gravity_exp( origin_n_trips=n_outflows, dest_attractivity_list=dest_att_list, cost_list=cost_list, beta=beta) elif model == 'radiation': return self.prod_constrained_radiation( origin_n_trips=n_outflows, origin_attractivity=att_origin, dest_attractivity_list=dest_att_list, cost_list=cost_list) else: raise ValueError(f"ERROR \t Spatial interaction model '{model}' is unknown.") # Spatial interaction models
[docs] def prod_constrained_gravity_exp(self, origin_n_trips: float, dest_attractivity_list: list[float], cost_list: list[float], beta: float) -> np.ndarray: r"""Estimates flows from origin to destinations using a production-constrained gravity model with exponential cost. This is a special case of a spatial interaction model, where the cost function$f(d_{ij})$ follows an exponential law. .. math:: f(d_{ij})=e^{-d_{ij} \beta} See prod_constrained_gravity_power() for more details on spatial interaction models for trip distribution. Args: origin_n_trips (float): The total number of trips originating from the origin. dest_attractivity_list (list[float]): A list of attractivity values for each destination. cost_list (list[float]): A list of cost values corresponding to each destination. beta (float, optional): The exponent applied to the cost in the calculation. Returns: np.ndarray: An array of estimated flows to each destination. """ flows = np.zeros(len(dest_attractivity_list)) norm_constant = .0 # Calculating raw flows and normalization constant for j in range(len(flows)): if cost_list[j] == 0: print(f"ALERT \t Cost function is NULL for some fluxes, setting the corresponding flux to zero", end='\r') flows[j] = 0 attractivity_over_cost = 0 else: attractivity_over_cost = dest_attractivity_list[j] / math.exp(beta * cost_list[j]) flows[j] = origin_n_trips * attractivity_over_cost norm_constant += attractivity_over_cost # Normalization flows = flows / norm_constant return flows
[docs] def prod_constrained_radius(self, origin_n_trips: float, dest_attractivity_list: list[float], cost_list: list[float], radius: float = 10) -> np.ndarray: """Estimates flows from origin to destinations using a production-constrained model based on distance radius. This is a simplified example of a spatial interaction model. In this case, the product of attractiveness and cost is set to 1 if the destination falls within a specified radius, and 0 otherwise. This function is purely hypothetical and has no practical application justification. It can serve as a test function since it directs all flows to destinations within the given radius. See prod_constrained_gravity_power() for more details on spatial interaction models for trip distribution. Args: origin_n_trips (float): The total number of trips originating from the origin. dest_attractivity_list (list[float]): A list of attractivity values for each destination. cost_list (list[float]): A list of cost values corresponding to each destination. radius (float, optional): The maximum distance for attractivity. Returns: np.ndarray: An array of estimated flows to each destination. """ flows = np.zeros(len(dest_attractivity_list)) norm_constant = .0 # Calculating raw flows and normalization constant for j in range(len(flows)): if cost_list[j] == 0: print(f"ALERT \t Cost function is NULL for some fluxes, setting the corresponding flux to zero", end='\r') flows[j] = 0 attractivity_over_cost = 0 else: if cost_list[j] <= radius: attractivity_over_cost = 1 else: attractivity_over_cost = 0 flows[j] = origin_n_trips * attractivity_over_cost norm_constant += attractivity_over_cost # Normalization flows = flows / norm_constant return flows
[docs] def prod_constrained_radiation(self, origin_n_trips: float, origin_attractivity: float, dest_attractivity_list: list[float], cost_list: list[float]) -> np.ndarray: r"""Estimates flows from origin to destinations using a production-constrained radiation model. This is a special case of an intervening opportunity spatial interaction model. In this case, the algorithm is more complex than gravity laws, as the cost function $f(d_{ij})$ includes the intervening opportunities located at a distance smaller than $d_{ij}$ .. math:: f(d_{ij}) = ((A_i+s_{ij}).(A_i+A_j+s_{ij}))^{-1} .. math:: s_{ij} = \sum_{k \neq i,j} A_k, ~ if ~ d_{ik}<d_{ij} with $s_{ij}$ being the intervening opportunities. Refer to prod_constrained_gravity_power() for other symbols. To our knowledge, this model has been initially described in the following publication: Simini, F., González, M., Maritan, A. et al. A universal model for mobility and migration patterns. Nature 484, 96–100 (2012). https://doi.org/10.1038/nature10856. While this model has the advantage of being parameter-free, it seems to not perform well at smaller scales. Args: origin_n_trips (float): The total number of trips originating from the origin. origin_attractivity (float): The attractivity of the origin. dest_attractivity_list (list[float]): A list of attractivity values for each destination. cost_list (list[float]): A list of cost values corresponding to each destination. Returns: np.ndarray: An array of estimated flows to each destination. """ # Step 1: Initialize variables flows = np.zeros(len(dest_attractivity_list)) norm_constant = .0 intervening_opportunity = .0 # Step 2: Create list of tuples to store initial order of cost_list and order them by distance indexed_cost_list = list(enumerate(cost_list)) # Sort the list of tuples based on the values sorted_indexed_cost_list = sorted(indexed_cost_list, key=lambda x: x[1]) # Prepare a list to store flows with their original indices calculated_indexed_flows = [] # Step 3: Calculate raw flows and normalization constant i = 0 for original_index, _ in sorted_indexed_cost_list: j = original_index # Use the original index from the sorted list num = dest_attractivity_list[j] # Origin attractivity cancels out when normalizing. den = (origin_attractivity + intervening_opportunity) * (origin_attractivity + dest_attractivity_list[j] + intervening_opportunity) if den == 0: print(f"ALERT \t Denominator of radiation model is NULL. Flow is set to zero.", end='\r') attractivity_over_cost = 0 calculated_flow = 0 else: attractivity_over_cost = num / den calculated_flow = origin_n_trips * attractivity_over_cost flows[j] = calculated_flow # Add intervening opportunity only if the next destination is farther away than the current one if i >= 1 and (sorted_indexed_cost_list[i - 1] < sorted_indexed_cost_list[i]): intervening_opportunity += dest_attractivity_list[j] norm_constant += attractivity_over_cost # Append the calculated flow with its original index to the list calculated_indexed_flows.append((original_index, flows[j])) i += 1 # Step 4: Sort back to the original order using the original indices original_order_flows = sorted(calculated_indexed_flows, key=lambda x: x[0]) # Extract the values from the tuples to get the final list final_flows = [value for index, value in original_order_flows] # Step 5: Normalize the flows final_flows = np.array(final_flows) / norm_constant return final_flows
# Overloading sum def __add__(self, other): print("***********************************************") print(f"INFO \t Summing two MobilitySimulator objects") print(f"\t Warning: This method implementation is not fully mature and may not handle all edge cases. Note that summing only concerns the results, input properties might make no sense anymore.") if not isinstance(other, MobilitySimulator): return NotImplemented if self.trip_distribution_params['distance_offset_km'] != .0 or other.trip_distribution_params['distance_offset_km'] != .0: print("ERROR \t Cannot handle the case where 'distance_offset_km' is not zero.") return NotImplemented # Sum flows if self._flows is not None and other._flows is not None: # Concatenate the flows combined_flows = pd.concat([self._flows, other._flows], ignore_index=True) # Group by Origin and Destination and sum the Flow column combined_flows = combined_flows.groupby(['Origin', 'Destination'], as_index=False).agg({ 'Flow': 'sum', # Sum the Flow column 'Distance road (km)': 'first', # Keep first or customize as needed 'Distance euclidian (km)': 'first' # Keep first or customize as needed }) else: ValueError("Run the two simulations before adding two objects.") # Create a new MobilitySimulator instance for the result new_simulator = MobilitySimulator( vehicle_fleet=self.vehicle_fleet, region=self.region, vehicle_allocation_params=self.vehicle_allocation_params, trip_distribution_params=self.trip_distribution_params ) # Assign the combined flows new_simulator._flows = combined_flows # Sum total vehicles from the vehicle_fleet of both instances total_vehicles = self.vehicle_fleet.total_vehicles + other.vehicle_fleet.total_vehicles new_simulator.vehicle_fleet.total_vehicles = total_vehicles # Update the total vehicles in the new instance # Initialize the aggregated metrics and update new_simulator._aggregated_zone_metrics = self._aggregated_zone_metrics new_simulator.update_aggregated_metrics() # Print print(f"\t > Total vehicle-km (by road): {new_simulator.aggregated_zone_metrics['fkt_outflows'].sum()} km") print(f"\t > Average distance travelled per vehicle (by road): {new_simulator.aggregated_zone_metrics['fkt_outflows'].sum() / new_simulator.aggregated_zone_metrics['n_outflows'].sum()} km") print("***********************************************") return new_simulator
[docs] def update_aggregated_metrics(self): """ Updates the aggregated zone metrics based on flows data. """ if self._flows is None: print("Flows or aggregated metrics are not initialized.") return # Loop through each zone ID in the aggregated metrics for index, row in self._aggregated_zone_metrics.iterrows(): zone_id = row['id'] # Calculate inflows (flows into the zone) inflows = self._flows[self._flows['Destination'] == zone_id]['Flow'].sum() # Calculate outflows (flows out of the zone) outflows = self._flows[self._flows['Origin'] == zone_id]['Flow'].sum() # Update inflows and outflows counts self._aggregated_zone_metrics.at[index, 'n_inflows'] = inflows self._aggregated_zone_metrics.at[index, 'n_outflows'] = round(outflows) # Calculate total kilometers (fkt) total_km_outflows = (self._flows[self._flows['Origin'] == zone_id]['Distance road (km)'] * self._flows[self._flows['Origin'] == zone_id]['Flow']).sum() total_km_inflows = (self._flows[self._flows['Destination'] == zone_id]['Distance road (km)'] * self._flows[self._flows['Destination'] == zone_id]['Flow']).sum() # Update total kilometers self._aggregated_zone_metrics.at[index, 'fkt_outflows'] = total_km_outflows self._aggregated_zone_metrics.at[index, 'fkt_inflows'] = total_km_inflows # Calculate average distance (vkt) if outflows > 0: self._aggregated_zone_metrics.at[index, 'vkt_outflows'] = total_km_outflows / outflows else: self._aggregated_zone_metrics.at[index, 'vkt_outflows'] = 0 if inflows > 0: self._aggregated_zone_metrics.at[index, 'vkt_inflows'] = total_km_inflows / inflows else: self._aggregated_zone_metrics.at[index, 'vkt_inflows'] = 0
# Export and visualization
[docs] def to_csv(self, filepath: str) -> None: """ Saves two CSV files with the main output data for flows and aggregated zone metrics. Args: filepath (str): Base path for the output files. Automatically appends suffixes "_flows" and "_aggregated_zone_metrics" before ".csv". """ # Remove any existing file extension filepath_without_ext, _ = os.path.splitext(filepath) # Save each dataframe with the respective suffix self._flows.to_csv(f"{filepath_without_ext}_flows.csv", index=False) # Rename columns in the aggregated zone metrics for clarity in the exported CSV renamed_metrics = self._aggregated_zone_metrics.rename(columns={ "n_outflows": "Nbr of Outflows", "n_inflows": "Nbr of Inflows", "fkt_outflows": "Fleet Kilometers Out (km)", "fkt_inflows": "Fleet Kilometers In (km)", "vkt_outflows": "Vehicle Kilometers Out (km)", "vkt_inflows": "Vehicle Kilometers In (km)" }) # Save aggregated zone metrics file renamed_metrics.to_csv(f"{filepath_without_ext}_aggregated_zone_metrics.csv", index=False)
[docs] def vehicle_allocation_to_map(self, filepath: str): df = self.aggregated_zone_metrics # Create the base map with admin boundaries m = hlp.create_base_map(self.region) # Add TAZ boundaries hlp.add_taz_boundaries(m, self.region.traffic_zones) # Add number of outflows hlp.add_colormapped_feature_group(m, df, self.region, 'n_outflows', 'Number of trips', 'Trips') # Add Layer Control and Save folium.LayerControl().add_to(m) m.save(filepath)
[docs] def trip_distribution_to_map(self, filepath: str, trip_id: int): # Create the base map with admin boundaries m = hlp.create_base_map(self.region) # Add TAZ boundaries hlp.add_taz_boundaries(m, self.region.traffic_zones) # Add trip distribution flows group = self.flows[self.flows['Origin'] == trip_id] if not group.empty: hlp.add_colormapped_feature_group(m, group, self.region, 'Flow', f'Number of trips from {trip_id}', 'Flows', zone_id_col='Destination') folium.LayerControl().add_to(m) m.save(filepath)