Source code for evpv.chargingsimulator

# coding: utf-8

import json
import os
import rasterio
import pandas as pd
import geopandas as gpd
import random
import numpy as np
from shapely.geometry import shape, LineString, Point, Polygon, box, MultiPoint
from shapely.ops import transform, nearest_points, snap
from geopy.distance import geodesic, distance
import openrouteservice
import time
import math
import csv
import folium
import branca.colormap as cm

from evpv.vehicle import Vehicle
from evpv.vehiclefleet import VehicleFleet
from evpv.region import Region
from evpv.mobilitysimulator import MobilitySimulator
from evpv.pvsimulator import PVSimulator
from evpv import helpers as hlp

[docs] class ChargingSimulator: """ A class to simulate the spatio-temporal daily charging demand of an electric vehicle fleet within a specified region. This class models charging demand by integrating data from mobility simulations, region-specific characteristics, and vehicle fleet details, along with user-defined charging scenarios that include charging efficiency and behavior settings. Key Features: - Spatial Charging Demand: Computes the charging demand per traffic zone within the specified region, enabling geographically detailed assessments of charging needs. - Temporal Charging Demand: Generates a load profile for each electric vehicle across all traffic zones using stochastic allocation. The number of vehicles charging and charging times are based on a state-of-charge (SOC) decision model, with a "dumb charging" strategy by default (where vehicles charge at full charger power upon arrival). - Scenario-Based Modeling: Allows flexible scenario configuration, including charging locations (with varying charger power levels), and custom arrival times at home and work, to reflect realistic charging patterns. - Smart Charging Compatibility: Enables the application of smart charging strategies in a secondary processing step, with pre-implemented strategies such as peak shaving to manage grid impact. Note: This class assumes the use of a predefined region, vehicle fleet, and mobility demand simulator. Charging scenario parameters are provided as a dictionary, allowing flexible configuration of different charging behaviors. """
def __init__(self, region: Region, vehicle_fleet: VehicleFleet, mobility_demand: MobilitySimulator, scenario: dict, charging_efficiency: float):
    """
    Initializes the ChargingSimulator class.

    Every input is stored through its property setter, so the validation
    performed by the setters runs here and may raise.

    Args:
        region (Region): An instance representing the geographic area for the simulation.
        vehicle_fleet (VehicleFleet): The electric vehicle fleet to be simulated.
        mobility_demand (MobilitySimulator): An instance of MobilitySimulator providing
            mobility demand data (vehicle allocation and trip distribution must have
            been performed).
        scenario (dict): Configuration of the charging scenario. It must contain the
            keys 'home', 'work' and 'poi', each mapping to a dictionary with:
                'share' (float): Share of the fleet charging at this location.
                    The shares of the three locations must sum to 1.
                'power_options_kW' (list): List of [power_level, share] pairs.
                    The shares within one location must sum to 1.
                'arrival_time_h' (list): Required for 'home' and 'work' only.
                    Read by the temporal model as [mean, standard deviation] of the
                    arrival time, in hours.
        charging_efficiency (float): The efficiency factor for charging, ranging
            between 0 and 1.

    Raises:
        ValueError: If an input fails the validation of its property setter.
        TypeError: If an entry of ``scenario`` has an unexpected type.

    Prints:
        Progress messages for the creation of the object.
    """
    print("=========================================")
    print("INFO \t Creation of a ChargingSimulator object.")
    print("=========================================")

    # Assignment goes through the property setters (validation happens there).
    self.vehicle_fleet = vehicle_fleet
    self.region = region
    self.mobility_demand = mobility_demand
    self.scenario = scenario
    self.charging_efficiency = charging_efficiency

    print("INFO \t Successful initialization of input parameters.")

    # Modeling results, filled by compute_spatial_demand / compute_temporal_demand
    self._spatial_demand = None
    self._temporal_demand_vehicle_properties = None
    self._temporal_demand_profile = None
    self._temporal_demand_profile_aggregated = None
# Results properties (read-only)
@property
def spatial_demand(self) -> pd.DataFrame:
    """pd.DataFrame: The spatial charging demand."""
    return self._spatial_demand

@property
def temporal_demand_vehicle_properties(self) -> pd.DataFrame:
    """pd.DataFrame: The properties of the vehicles that are charging today."""
    return self._temporal_demand_vehicle_properties

@property
def temporal_demand_profile(self) -> pd.DataFrame:
    """pd.DataFrame: The temporal charging demand for each vehicle."""
    return self._temporal_demand_profile

@property
def temporal_demand_profile_aggregated(self) -> pd.DataFrame:
    """pd.DataFrame: The total temporal charging demand (aggregated over each vehicle)."""
    return self._temporal_demand_profile_aggregated

# Properties and Setters
@property
def region(self) -> Region:
    """Region: The region used in the simulation."""
    return self._region

@region.setter
def region(self, value: Region):
    if not isinstance(value, Region):
        raise ValueError("region must be an instance of the Region class.")
    self._region = value

@property
def vehicle_fleet(self) -> VehicleFleet:
    """VehicleFleet: The vehicle fleet used in the simulation."""
    return self._vehicle_fleet

@vehicle_fleet.setter
def vehicle_fleet(self, value: VehicleFleet):
    if not isinstance(value, VehicleFleet):
        raise ValueError("vehicle_fleet must be an instance of the VehicleFleet class.")
    self._vehicle_fleet = value

@property
def mobility_demand(self) -> MobilitySimulator:
    """MobilitySimulator: The mobility demand (i.e. flows between traffic zones) aggregated over all simulations provided."""
    return self._mobility_demand

@mobility_demand.setter
def mobility_demand(self, value: MobilitySimulator):
    if not isinstance(value, MobilitySimulator):
        raise ValueError("mobility_demand object must be an instance of the MobilitySimulator class.")

    # Check if vehicle allocation has been performed
    if 'n_outflows' not in value.aggregated_zone_metrics.columns:
        raise ValueError("MobilitySimulator object - Vehicle allocation has not been performed.")

    # Check if trip distribution has been performed
    if 'Origin' not in value.flows.columns:
        raise ValueError("MobilitySimulator object - Trip distribution has not been performed.")

    self._mobility_demand = value

@property
def scenario(self) -> dict:
    """dict: The charging scenario."""
    return self._scenario

@scenario.setter
def scenario(self, value: dict):
    """Validate and store the charging scenario.

    The scenario must hold the keys 'home', 'work' and 'poi'. Each maps to a
    dictionary with a numeric 'share' and a 'power_options_kW' list of
    [power_level, share] pairs; 'home' and 'work' also need an
    'arrival_time_h' list. Location shares, and the option shares within
    each location, must sum to 1 (within floating-point tolerance).

    Raises:
        ValueError: If the scenario is not a dict, a key or attribute is
            missing, or shares do not sum to 1.
        TypeError: If an attribute or a power option has the wrong type.
    """
    # Define required structure and expected types for each scenario
    required_keys = {
        'home': {
            'share': (float, int),      # share should be a number
            'power_options_kW': list,   # power_options should be a list
            'arrival_time_h': list,     # arrival_time should be a list
        },
        'work': {
            'share': (float, int),
            'power_options_kW': list,
            'arrival_time_h': list,
        },
        'poi': {
            'share': (float, int),
            'power_options_kW': list,
        },
    }

    def sums_to_one(total) -> bool:
        # Decimal shares such as 0.7 + 0.2 + 0.1 do not add up to exactly 1.0
        # in binary floating point, so compare with a small absolute tolerance.
        return math.isclose(total, 1.0, rel_tol=0.0, abs_tol=1e-9)

    # Validate input value
    if not isinstance(value, dict):
        raise ValueError("scenario must be a dictionary.")

    total_share = 0  # To track the total share for validation

    for key, expected_structure in required_keys.items():
        if key not in value:
            raise ValueError(f"Missing required key: '{key}' in scenario.")

        # Check that each key's value is a dictionary and follows the expected structure
        scenario_part = value[key]
        if not isinstance(scenario_part, dict):
            raise ValueError(f"The value for '{key}' must be a dictionary.")

        # Validate each attribute in the scenario part
        for attr, expected_type in expected_structure.items():
            if attr not in scenario_part:
                raise ValueError(f"Missing '{attr}' in '{key}' configuration.")
            if not isinstance(scenario_part[attr], expected_type):
                raise TypeError(f"'{attr}' in '{key}' must be of type {expected_type}.")

        # Sum up the shares (validated once all locations have been read)
        total_share += scenario_part['share']

        # Validate power options format and shares
        power_options = scenario_part['power_options_kW']
        if not all(
            isinstance(option, list)
            and len(option) == 2
            and isinstance(option[0], (float, int))
            and isinstance(option[1], (float, int))
            for option in power_options
        ):
            raise TypeError(f"Each 'power_options_kW' entry in '{key}' must be a list of [power_level, share].")

        # Check that the sum of the shares in power_options is 1
        if not sums_to_one(sum(option[1] for option in power_options)):
            raise ValueError(f"The sum of shares in 'power_options_kW' for '{key}' must be exactly 1.")

    # Final check that total share is equal to 1
    if not sums_to_one(total_share):
        raise ValueError("The sum of 'share' values across all keys must be exactly 1.")

    # If all checks pass, set the value
    self._scenario = value

@property
def charging_efficiency(self) -> float:
    """float: The charging efficiency (between 0 and 1)."""
    return self._charging_efficiency

@charging_efficiency.setter
def charging_efficiency(self, value: float):
    if value < 0.0 or value > 1.0:
        raise ValueError("The charging efficiency must be between 0 and 1")
    self._charging_efficiency = value

# Spatial charging demand
def compute_spatial_demand(self) -> None:
    """Compute the spatial charging demand and store it in ``_spatial_demand``.

    For every traffic zone of the mobility demand, the total daily energy
    (kWh), the number of vehicles and the energy per vehicle are computed
    for the three charging locations (home, work, POI). Home demand is based
    on the zone outflows, work demand on the zone inflows, and the POI demand
    of the whole region is split between zones in proportion to their number
    of POIs. Vehicle counts are rounded per zone, then adjusted one vehicle
    at a time (random zone and location) until they sum to the fleet size.

    Returns:
        None: The result is stored in the internal ``_spatial_demand`` DataFrame.
    """
    print("INFO \t Computing the spatial charging demand...")

    # Step 1: Retrieve the shares for the different charging locations.
    share_home = self.scenario['home']['share']
    share_work = self.scenario['work']['share']
    share_poi = self.scenario['poi']['share']

    # Step 2: Calculate the fleet-wide average EV consumption
    average_ev_consumption = self.vehicle_fleet.average_consumption()

    zone_metrics = self.mobility_demand.aggregated_zone_metrics
    traffic_zones = self.region.traffic_zones

    # Step 3: Total and per-vehicle demand at POIs (region-wide, as we do not know where they charge)
    demand_at_poi = (
        2 * zone_metrics['fkt_inflows'].sum() * average_ev_consumption
        / self.charging_efficiency * share_poi
    )
    tot_vehicles_at_poi = zone_metrics['n_inflows'].sum() * share_poi
    demand_per_vehicle_at_poi = (demand_at_poi / tot_vehicles_at_poi) if tot_vehicles_at_poi > 0 else 0
    total_pois = traffic_zones['n_pois'].sum()

    if total_pois <= 0 and demand_at_poi > 0:
        # Without any POI there is no zone to attach the POI demand to.
        print("ALERT \t The region has no POI: the charging demand at POIs cannot be allocated.")

    # Step 4: Zone-specific charging demand at home, work and POIs.
    data = []
    for _, row in zone_metrics.iterrows():
        zone_id = row['id']
        is_zone = traffic_zones['id'] == zone_id
        geometry = traffic_zones.loc[is_zone, 'geometry'].values[0]
        n_pois = traffic_zones.loc[is_zone, 'n_pois'].values[0]

        # Total energy demand (kWh); factor 2 accounts for the round trip
        Etot_home = 2 * row['fkt_outflows'] * share_home * average_ev_consumption / self.charging_efficiency
        Etot_work = 2 * row['fkt_inflows'] * share_work * average_ev_consumption / self.charging_efficiency
        # Guard against a region without POIs (0 / 0 would store NaN)
        Etot_poi = demand_at_poi * (n_pois / total_pois) if total_pois > 0 else 0.0

        # Number of vehicles charging at each location for this zone
        vehicles_home = round(row['n_outflows'] * share_home)
        vehicles_work = round(row['n_inflows'] * share_work)
        if Etot_poi > 0 and demand_per_vehicle_at_poi > 0:
            vehicles_poi = round(Etot_poi / demand_per_vehicle_at_poi)
        else:
            vehicles_poi = 0

        data.append({
            'id': zone_id,
            'geometry': geometry,
            'n_vehicles_home': vehicles_home,
            'n_vehicles_work': vehicles_work,
            'n_vehicles_poi': vehicles_poi,
            'Etot_home_kWh': Etot_home,
            'Etot_work_kWh': Etot_work,
            'Etot_poi_kWh': Etot_poi,
        })

    # Step 5: Reconcile the rounded counts with the fleet size.
    categories = ['n_vehicles_home', 'n_vehicles_work', 'n_vehicles_poi']
    total_vehicles = sum(item[category] for item in data for category in categories)

    # NOTE(review): total_vehicles is assumed to be a whole number; it is
    # rounded so that the adjustment loop below is guaranteed to terminate.
    desired_total_vehicles = int(round(self.vehicle_fleet.total_vehicles))

    difference = desired_total_vehicles - total_vehicles
    if difference != 0:
        print(f"ALERT \t Randomly allocating {difference} vehicles due to rounding errors...")

    # Vehicles are only added where at least one vehicle already charges, so
    # adding is impossible when every count is zero.
    can_add = any(item[category] > 0 for item in data for category in categories)
    if difference > 0 and not can_add:
        print("ALERT \t No zone has any charging vehicle: the missing vehicles cannot be allocated.")

    while total_vehicles != desired_total_vehicles:
        adding = desired_total_vehicles > total_vehicles
        if adding and not can_add:
            break

        # Select a random zone and a random category (home, work, poi)
        selected_zone = data[np.random.randint(len(data))]
        category = np.random.choice(categories)

        if selected_zone[category] > 0:
            # Only touch entries that already hold a vehicle: never create
            # demand-less vehicles, never go below zero.
            step = 1 if adding else -1
            selected_zone[category] += step
            total_vehicles += step

    # Step 6: Average charging demand per vehicle for each location (kWh per vehicle).
    for item in data:
        n_home = item['n_vehicles_home']
        n_work = item['n_vehicles_work']
        n_poi = item['n_vehicles_poi']
        item['E_per_vehicle_home_kWh'] = (item['Etot_home_kWh'] / n_home) if n_home > 0 else 0
        item['E_per_vehicle_work_kWh'] = (item['Etot_work_kWh'] / n_work) if n_work > 0 else 0
        item['E_per_vehicle_poi_kWh'] = demand_per_vehicle_at_poi if n_poi > 0 else 0

    # Step 7: Store the results
    self._spatial_demand = pd.DataFrame(data)
    demand = self._spatial_demand

    # Step 8: Print main aggregated outputs
    total_needs = (
        demand['Etot_home_kWh'].sum()
        + demand['Etot_work_kWh'].sum()
        + demand['Etot_poi_kWh'].sum()
    )
    print(f" \t > Average consumption: {average_ev_consumption:.3f} kWh/km")
    print(f" \t > Total charging needs: {total_needs} kWh")

    for label, key in (('Home', 'home'), ('Work', 'work'), ('POIs', 'poi')):
        n_vehicles = demand[f'n_vehicles_{key}'].sum()
        energy = demand[f'Etot_{key}_kWh'].sum()
        if n_vehicles > 0:
            weighted_avg = (demand[f'E_per_vehicle_{key}_kWh'] * demand[f'n_vehicles_{key}']).sum() / n_vehicles
        else:
            weighted_avg = 0
        print(f" \t > {label}: Total: {energy:.3f} kWh - "
              f"Per vehicle (weighted avg): {weighted_avg:.3f} kWh - "
              f"Vehicles: {n_vehicles}")
# Temporal charging demand
def compute_temporal_demand(self, time_step: float, travel_time_home_work: float = 0.5, soc_threshold_mean: float = 0.6, soc_threshold_std_dev: float = 0.2):
    """
    Computes the temporal charging demand: assigns properties to each vehicle,
    builds the per-vehicle charging profile and aggregates it by location.

    Parameters:
        time_step (float): The time interval (in hours) for the charging profile computation.
            Must be positive and at most 12, so that the day holds at least two steps.
        travel_time_home_work (float): Average travel time from home to work (in hours).
        soc_threshold_mean (float): Mean state of charge (SoC) threshold for vehicle charging.
        soc_threshold_std_dev (float): Standard deviation for the SoC threshold.

    Returns:
        None

    Raises:
        RuntimeError: If the spatial charging demand has not been computed yet.
        ValueError: If ``time_step`` is not in the interval (0, 12].
    """
    print("INFO \t Computing the temporal charging demand...")

    if self._spatial_demand is None:
        raise RuntimeError("ERROR \t Please compute the spatial charging demand before the temporal one.")

    # Fail before the (costly) vehicle assignment: a zero step divides by zero
    # when building the profile, and the aggregation needs two time steps to
    # recover the step length.
    if not 0 < time_step <= 12:
        raise ValueError("time_step must be greater than 0 and at most 12 hours.")

    self._assign_vehicle_properties(travel_time_home_work, soc_threshold_mean, soc_threshold_std_dev)
    self._compute_charging_profile(time_step)
    self._compute_aggregated_charging_profile()
def _assign_vehicle_properties(self, travel_time_home_work: float = 0.5, soc_threshold_mean: float = 0.6, soc_threshold_std_dev: float = 0.2):
    """
    Assigns energy demand, arrival/departure times, charging power and other
    properties to each vehicle, per charging location, then keeps only the
    vehicles that charge today.

    Parameters:
        travel_time_home_work (float): Average travel time from home to work (in hours).
        soc_threshold_mean (float): Mean state of charge (SoC) threshold for vehicle charging.
        soc_threshold_std_dev (float): Standard deviation for the SoC threshold.

    Returns:
        None: The result is stored in ``_temporal_demand_vehicle_properties``.

    Raises:
        ValueError: If the spatial demand holds no vehicle at any charging location.
    """
    # Step 1: Assign vehicle properties
    ########################################################
    print("INFO \t Assigning properties to each vehicle...")

    flows = self.mobility_demand.flows

    # Values shared by all charging locations
    work_arrival_mean, work_arrival_std = self.scenario['work']['arrival_time_h']
    home_arrival_mean, home_arrival_std = self.scenario['home']['arrival_time_h']
    origin_flows = flows.groupby("Origin")["Flow"].sum().reset_index()
    zone_probabilities = origin_flows["Flow"] / origin_flows["Flow"].sum()
    vehicle_types, vehicle_shares = zip(*self.vehicle_fleet.vehicle_types)

    def pick_charging_power(power_options, max_power):
        # Draw a charger power among the options the vehicle can accept,
        # weighted by the share configured for each option.
        valid = [(float(power), share) for power, share in power_options if power <= max_power]
        if not valid:
            # Every charger is more powerful than the vehicle accepts: the
            # vehicle then limits the session to its own maximum power.
            return float(max_power)
        powers, shares = zip(*valid)
        if sum(shares) <= 0:
            return random.choice(powers)
        return random.choices(powers, weights=shares, k=1)[0]

    all_vehicle_properties = []  # To store DataFrames for each location
    vehicle_id_counter = 0       # Counter for unique vehicle IDs

    # Loop through the charging locations (ignore unrelated scenario keys)
    for charging_location in [key for key in self.scenario if key in ('home', 'work', 'poi')]:
        vehicle_counts = int(self.spatial_demand['n_vehicles_' + charging_location].sum())

        # Skip if there are no vehicles for this charging location
        if vehicle_counts == 0:
            continue

        location_settings = self.scenario[charging_location]

        # Assign origin (home) zone to vehicles, weighted by the zone outflows
        assigned_origins = np.random.choice(origin_flows["Origin"], size=vehicle_counts, p=zone_probabilities)

        # Assign destination (work) zone and travel distance, one origin zone at a time
        # NOTE(review): distance is drawn independently of the destination, as before.
        assigned_destinations = np.empty(vehicle_counts, dtype=flows["Destination"].dtype)
        selected_distances = np.zeros(vehicle_counts)
        for zone in np.unique(assigned_origins):
            zone_indices = np.where(assigned_origins == zone)[0]  # Vehicles in this zone
            zone_demand = flows[flows['Origin'] == zone]

            destination_probabilities = zone_demand["Flow"] / zone_demand["Flow"].sum()
            assigned_destinations[zone_indices] = np.random.choice(
                zone_demand["Destination"], size=len(zone_indices), p=destination_probabilities
            )

            grouped_zone_demand = zone_demand.groupby('Distance road (km)')['Flow'].sum().reset_index()
            distance_probabilities = grouped_zone_demand['Flow'] / grouped_zone_demand['Flow'].sum()
            selected_distances[zone_indices] = np.random.choice(
                grouped_zone_demand['Distance road (km)'].values,
                size=len(zone_indices),
                p=distance_probabilities.values,
            )

        # Randomly select vehicle types for all vehicles at once
        selected_vehicles = np.random.choice(vehicle_types, size=vehicle_counts, p=vehicle_shares)

        # Daily charging demand (factor 2: round trip)
        daily_charging_demand = (
            2 * selected_distances
            * np.array([vehicle.consumption_kWh_per_km for vehicle in selected_vehicles])
            / self.charging_efficiency
        )

        # NOTE(review): the helper is opaque from here; it is assumed to return
        # the number of days between two charges of one vehicle.
        days_between_charges = np.vectorize(hlp.calculate_days_between_charges_single_vehicle)(
            daily_charging_demand,
            np.array([vehicle.battery_capacity_kWh for vehicle in selected_vehicles]),
            soc_threshold_mean * np.ones(vehicle_counts),
            soc_threshold_std_dev * np.ones(vehicle_counts),
        )
        charging_demand = daily_charging_demand * days_between_charges

        # Charging power: one of the scenario options, capped by the vehicle
        charging_power = np.array(
            [
                pick_charging_power(location_settings['power_options_kW'], vehicle.max_charging_power_kW)
                for vehicle in selected_vehicles
            ],
            dtype=float,
        )

        # Arrival times
        if charging_location != 'poi':
            arrival_mean, arrival_std = location_settings['arrival_time_h'][0], location_settings['arrival_time_h'][1]
            arrival_times = np.random.normal(loc=arrival_mean, scale=arrival_std, size=vehicle_counts) % 24
        else:
            # POI charging happens between the arrival at work and the arrival back home
            random_arrival = np.random.normal(work_arrival_mean, work_arrival_std / 2, size=vehicle_counts)
            random_departure = np.random.normal(home_arrival_mean, home_arrival_std / 2, size=vehicle_counts)
            arrival_times = np.random.uniform(random_arrival, random_departure) % 24

        # Departure times (wrapped around midnight with modulo)
        if charging_location == 'home':
            work_arrival_times = np.random.normal(work_arrival_mean, work_arrival_std, size=vehicle_counts)
            departure_times = (work_arrival_times - travel_time_home_work) % 24
        elif charging_location == 'work':
            home_arrival_times = np.random.normal(home_arrival_mean, home_arrival_std, size=vehicle_counts)
            departure_times = (home_arrival_times - travel_time_home_work) % 24
        else:
            # At POIs the vehicle leaves as soon as it is charged
            departure_times = (arrival_times + charging_demand / charging_power) % 24

        # Idling time, accounting for stays that cross midnight
        idling_duration = np.where(
            departure_times >= arrival_times,
            departure_times - arrival_times,
            (24 - arrival_times) + departure_times,
        )

        all_vehicle_properties.append(pd.DataFrame({
            "vehicle_id": np.arange(vehicle_id_counter, vehicle_id_counter + vehicle_counts),  # Unique vehicle IDs
            "name": [vehicle.name for vehicle in selected_vehicles],
            "charging_location": charging_location,
            "home_zone": assigned_origins,
            "work_zone": assigned_destinations,
            "days_between_charges": days_between_charges,
            "charging_demand": charging_demand,
            "arrival_time": arrival_times,
            "departure_time": departure_times,
            "idling_duration": idling_duration,
            "charging_power": charging_power,
            "strategy": ["dumb"] * vehicle_counts,
        }))

        # Update the vehicle counter to ensure unique IDs
        vehicle_id_counter += vehicle_counts

    # Step 2: Select and filter only vehicles charging today
    ########################################################
    print("INFO \t Selecting and filtering only vehicles charging today...")

    if not all_vehicle_properties:
        raise ValueError("The spatial charging demand contains no vehicle: the temporal demand cannot be computed.")

    vehicle_properties = pd.concat(all_vehicle_properties, ignore_index=True)

    # A vehicle charging every d days charges today with probability 1/d
    charging_probability = 1 / vehicle_properties['days_between_charges']
    charging_today = np.random.rand(len(charging_probability)) <= charging_probability

    vehicle_properties = vehicle_properties[charging_today].reset_index(drop=True)

    print(f"\t > Number of vehicles charging: {len(vehicle_properties)}")

    self._temporal_demand_vehicle_properties = vehicle_properties

def _compute_charging_profile(self, time_step: float):
    """
    Creates a 24-hour charging profile for each vehicle, detailing the charging
    power at each time step. Each vehicle charges at its charging power from
    its (rounded) arrival step until its demand is met, wrapping around
    midnight; a session longer than 24 hours fills the whole day.

    Parameters:
        time_step (float): The time step interval (in hours) for which the charging profile is computed.

    Returns:
        None: The result is stored in ``_temporal_demand_profile`` (one 'time'
        column followed by one column per vehicle id).
    """
    print("INFO \t Computing the charging profile for each vehicle...")

    vehicle_properties = self.temporal_demand_vehicle_properties

    # Define the time intervals
    num_time_steps = int(24 / time_step)
    time_steps = [(i * time_step) for i in range(num_time_steps)]

    # Precompute relevant values
    charging_powers = vehicle_properties["charging_power"].values
    arrival_times = vehicle_properties["arrival_time"].values
    idling_durations = vehicle_properties["idling_duration"].values
    charging_demands = vehicle_properties["charging_demand"].values

    # Charging durations (hours) and their discretisation (steps)
    charging_durations = charging_demands / charging_powers
    start_indices = np.round(arrival_times / time_step).astype(int)
    duration_steps = np.round(charging_durations / time_step).astype(int)

    # Preliminary checks
    if np.any(charging_durations <= time_step):
        num_alerts = np.sum(charging_durations <= time_step)
        print(f"ALERT \t {num_alerts} charging durations are smaller than the timestep. Charging demand may not be met.")

    # Small correction (0.01 h) to account for rounding issues
    count_long_charging = ((charging_durations - 0.01) > idling_durations).sum()
    if count_long_charging > 0:
        print(f"ALERT \t {count_long_charging} vehicle(s) require charging durations longer than their idling periods. Charging continues beyond the expected idling time.")

    count_over_a_day = int(np.sum(duration_steps > num_time_steps))
    if count_over_a_day > 0:
        print(f"ALERT \t {count_over_a_day} charging durations exceed 24 hours. Charging is capped to a full day.")

    # One column per vehicle, one row per time step
    profile_matrix = np.zeros((num_time_steps, len(vehicle_properties)))
    capped_steps = np.minimum(duration_steps, num_time_steps)
    for column, (start, n_steps) in enumerate(zip(start_indices, capped_steps)):
        if n_steps <= 0:
            continue
        # Modulo wraps both a start at "24:00" and sessions crossing midnight
        rows = (start + np.arange(n_steps)) % num_time_steps
        profile_matrix[rows, column] = charging_powers[column]

    profile_df = pd.DataFrame(profile_matrix, columns=vehicle_properties["vehicle_id"].tolist())
    profile_df.insert(0, 'time', time_steps)

    self._temporal_demand_profile = profile_df

def _compute_aggregated_charging_profile(self):
    """
    Creates an aggregated charging profile, summing the charging power of all
    vehicles per location (home, work, poi) for each time step. It also counts,
    per location and time step, the vehicles present (plugged in) and the
    vehicles actually charging. The time step is read from the 'time' column
    of the per-vehicle profile.

    Returns:
        None: The result is stored in ``_temporal_demand_profile_aggregated``.
    """
    print("INFO \t Computing the aggregated charging profile by location...")

    locations = ['home', 'work', 'poi']

    # Individual charging profile and vehicle properties
    vehicle_properties = self.temporal_demand_vehicle_properties
    charging_profile = self.temporal_demand_profile

    time_step = charging_profile['time'][1] - charging_profile['time'][0]

    # Convert time columns to float (if not already)
    vehicle_properties['arrival_time'] = vehicle_properties['arrival_time'].astype(float)
    vehicle_properties['departure_time'] = vehicle_properties['departure_time'].astype(float)

    # Long format: one row per (time, vehicle)
    melted_profile = charging_profile.melt(id_vars='time', var_name='vehicle_id', value_name='charging_power')
    melted_profile['vehicle_id'] = melted_profile['vehicle_id'].astype(int)

    # Associate each vehicle's charging with its location and presence window
    merged_df = melted_profile.merge(
        vehicle_properties[['vehicle_id', 'charging_location', 'arrival_time', 'departure_time']],
        on='vehicle_id',
    )

    arrival_time = merged_df['arrival_time']
    departure_time = merged_df['departure_time']
    time = merged_df['time']
    same_day_stay = (departure_time > arrival_time) & (arrival_time <= time) & (time < departure_time)
    overnight_stay = (departure_time <= arrival_time) & ((time < departure_time) | (time >= arrival_time))
    merged_df['is_plugged'] = same_day_stay | overnight_stay

    def count_vehicles(rows, suffix):
        # Number of distinct vehicles per time step and location, with one
        # '<location>_<suffix>' column per location and a 'total_<suffix>' column.
        counts = (rows
                  .groupby(['time', 'charging_location'])['vehicle_id']
                  .nunique()
                  .unstack(fill_value=0)
                  .reset_index())
        for col in locations:
            if col not in counts.columns:
                counts[col] = 0
        counts = counts.rename(columns={col: f'{col}_{suffix}' for col in locations})
        counts[f'total_{suffix}'] = counts[[f'{col}_{suffix}' for col in locations]].sum(axis=1)
        return counts

    # Aggregate charging power by time and location
    aggregated_profile = (merged_df
                          .groupby(['time', 'charging_location'])['charging_power']
                          .sum()
                          .unstack(fill_value=0)
                          .reset_index()
                          .rename_axis(None, axis=1))

    # Add missing location columns with zeroes if no data for location
    for col in locations:
        if col not in aggregated_profile.columns:
            aggregated_profile[col] = 0

    aggregated_profile['total'] = aggregated_profile[locations].sum(axis=1)

    # Vehicles charging (non-zero power) and vehicles plugged in (regardless of charging status)
    vehicle_charging_counts = count_vehicles(merged_df[merged_df['charging_power'] > 0], 'vehicle_charging')
    vehicle_plugged_counts = count_vehicles(merged_df[merged_df['is_plugged']], 'vehicle_present')

    # Merge the counts back into the aggregated profile
    aggregated_profile = aggregated_profile.merge(vehicle_charging_counts, on='time', how='left').fillna(0)
    aggregated_profile = aggregated_profile.merge(vehicle_plugged_counts, on='time', how='left').fillna(0)

    self._temporal_demand_profile_aggregated = aggregated_profile[[
        'time', 'home', 'work', 'poi', 'total',
        'home_vehicle_present', 'work_vehicle_present', 'poi_vehicle_present', 'total_vehicle_present',
        'home_vehicle_charging', 'work_vehicle_charging', 'poi_vehicle_charging', 'total_vehicle_charging',
    ]]

    # Convert power (kW) to energy (kWh) by multiplying each time step by time_step (in hours)
    energy_needs = aggregated_profile[['home', 'work', 'poi', 'total']] * time_step

    total_charging_needs = energy_needs['total'].sum()
    home_charging_needs = energy_needs['home'].sum()
    work_charging_needs = energy_needs['work'].sum()
    poi_charging_needs = energy_needs['poi'].sum()

    print(f"\t > Total Charging Needs: {total_charging_needs:.2f} kWh")
    print(f"\t > Home Charging Needs: {home_charging_needs:.2f} kWh")
    print(f"\t > Work Charging Needs: {work_charging_needs:.2f} kWh")
    print(f"\t > POI Charging Needs: {poi_charging_needs:.2f} kWh")

# Smart charging
def apply_smart_charging(self, location: list, charging_strategy: str, share: float, **kwargs):
    """
    Apply a smart charging strategy to a random subset of vehicles at specific locations.

    The charging profiles of the selected vehicles are replaced by the output
    of the strategy, their 'strategy' label is updated, and the aggregated
    profile is recomputed. Vehicles that are not selected keep their current
    profile and label, so successive calls (e.g. one per location) accumulate.

    Args:
        location (list): List of location types ('home', 'work', 'poi'). A single
            location name given as a string is also accepted.
        charging_strategy (str): The name of the charging strategy to apply.
        share (float): Proportion of vehicles participating (between 0 and 1).
        kwargs: Additional parameters for specific charging strategies.

    Raises:
        ValueError: If ``share`` is outside [0, 1].
        RuntimeError: If the temporal charging demand has not been computed yet.
    """
    print(f"INFO \t Applying '{charging_strategy}' charging strategy...")

    # Check that share is within the valid range
    if not (0 <= share <= 1):
        raise ValueError("Share must be between 0 and 1.")

    if self._temporal_demand_vehicle_properties is None or self._temporal_demand_profile is None:
        raise RuntimeError("ERROR \t Please compute the temporal charging demand before applying smart charging.")

    vehicle_properties = self._temporal_demand_vehicle_properties

    # A bare string would otherwise be iterated character by character;
    # duplicates are dropped so that a vehicle cannot be selected twice.
    locations = [location] if isinstance(location, str) else list(dict.fromkeys(location))

    selected_vehicle_ids = []
    for loc in locations:
        # Vehicles charging at the current location
        location_ids = list(vehicle_properties.loc[vehicle_properties['charging_location'] == loc, 'vehicle_id'])

        # Number of vehicles to modify for the current location
        num_smart_vehicles = int(share * len(location_ids))
        if num_smart_vehicles > 0:
            selected_vehicles = random.sample(location_ids, num_smart_vehicles)
            selected_vehicle_ids.extend(selected_vehicles)
            print(f"\t > Selected {len(selected_vehicles)} vehicles from '{loc}'")

    # Check if there are any vehicles to process
    if not selected_vehicle_ids:
        print(">\t No vehicles found for the given locations.")
        return

    # Get relevant columns in _temporal_demand_profile
    smart_vehicle_columns = [vid for vid in selected_vehicle_ids if vid in self._temporal_demand_profile.columns]
    smart_vehicles_df = self._temporal_demand_profile[['time'] + smart_vehicle_columns].copy()

    # Apply the strategy first: if it raises (e.g. unknown strategy), neither
    # the profiles nor the labels have been touched.
    modified_smart_vehicles_df = self._apply_charging_strategy(smart_vehicles_df, charging_strategy, **kwargs)

    # Update _temporal_demand_profile with modified charging profiles
    self._temporal_demand_profile.update(modified_smart_vehicles_df)

    # Label the selected vehicles only, leaving earlier labels untouched
    if 'strategy' not in vehicle_properties.columns:
        vehicle_properties['strategy'] = "dumb"
    is_selected = vehicle_properties['vehicle_id'].isin(selected_vehicle_ids)
    vehicle_properties.loc[is_selected, 'strategy'] = charging_strategy

    # Recompute aggregated profile
    self._compute_aggregated_charging_profile()
def _apply_charging_strategy(self, smart_vehicles_df: pd.DataFrame, strategy: str, **kwargs) -> pd.DataFrame:
    """ Applies a specific charging strategy to the smart vehicles.

    Supported strategies:
        - "multiply_by" (dummy strategy): multiplies every vehicle profile by the keyword
          argument 'factor'. The input DataFrame is modified in place and returned.
        - "peak_shaving": vehicles are processed one after the other. Each vehicle first fills
          the time steps of its charging window whose total load is below the current peak
          (limited by its charger power), then spreads any remaining demand uniformly over
          the window. A new DataFrame is returned.

    Args:
        smart_vehicles_df (pd.DataFrame): DataFrame of vehicles to apply the strategy on. The first
            column holds the time grid; every other column is one vehicle profile. For
            "peak_shaving" the grid needs at least two entries (the step is taken from the first two)
            and the wrap-around logic assumes a 24-hour period.
        strategy (str): The name of the charging strategy.
        **kwargs: Additional parameters specific to certain strategies.

    Returns:
        pd.DataFrame: Modified DataFrame with updated charging profiles ('time' column first).

    Raises:
        ValueError: If the strategy is unknown, or if "multiply_by" is requested without 'factor'.
    """
    # Multiply by (dummy strategy)
    ##############################
    if strategy == "multiply_by":
        factor = kwargs.get("factor")
        if factor is None:
            raise ValueError("The 'multiply_by' charging strategy requires a 'factor' keyword argument.")

        # Multiply each vehicle's charging profile by factor (skip the 'time' column)
        for column in smart_vehicles_df.columns[1:]:
            smart_vehicles_df[column] *= factor
        return smart_vehicles_df

    # Additional strategies can be implemented here
    if strategy != "peak_shaving":
        raise ValueError(f"Charging strategy '{strategy}' is unknown.")

    # Peak shaving through ideal coordination
    #########################################
    time_grid = smart_vehicles_df.iloc[:, 0].values
    n_steps = len(time_grid)
    time_step = time_grid[1] - time_grid[0]

    vehicle_columns = smart_vehicles_df.columns[1:]
    column_position = {vehicle_id: pos for pos, vehicle_id in enumerate(vehicle_columns)}

    peak_power = 0  # Total power peak reached so far
    power_demand = np.zeros(n_steps)  # Total power demand per time step
    profiles = np.zeros((n_steps, len(vehicle_columns)))  # One column per vehicle

    # Keep only the properties of the vehicles present in the DataFrame
    vehicle_properties = self.temporal_demand_vehicle_properties
    smart_vehicles_props = vehicle_properties[vehicle_properties['vehicle_id'].isin(vehicle_columns)]

    skipped_vehicles = 0

    # Process each vehicle
    for _, row in smart_vehicles_props.iterrows():
        vehicle_id = row['vehicle_id']
        arrival_time = row['arrival_time']
        departure_time = row['departure_time']
        charging_demand = row['charging_demand']
        max_power = row['charging_power']
        column = column_position[vehicle_id]

        # Calculate start and end indices in the time array for the charging window
        start_idx = int(np.searchsorted(time_grid, arrival_time))
        end_idx = int(np.searchsorted(time_grid, departure_time))

        # Handle wrap-around for overnight charging. The test is done on the times rather than on
        # the indices: when both times fall between the same two grid points the indices are
        # equal, and an index comparison would collapse a nearly full-day window to nothing.
        if departure_time < arrival_time:
            end_idx += n_steps

        # Time steps of the charging window, wrapped around the 24-hour period
        slots = [t % n_steps for t in range(start_idx, end_idx)]

        # Remaining demand to be met
        remaining_demand = charging_demand

        # First pass: fill the valleys below the current peak without raising it
        for idx in slots:
            current_total_power = power_demand[idx]

            if current_total_power < peak_power:
                charge_power = min(peak_power - current_total_power, max_power)
                charge_energy = charge_power * time_step

                # Ensure not to exceed remaining charging demand
                if charge_energy > remaining_demand:
                    charge_energy = remaining_demand
                    charge_power = charge_energy / time_step

                power_demand[idx] += charge_power
                remaining_demand -= charge_energy
                profiles[idx, column] = charge_power

            # Stop charging if demand is fully met
            if remaining_demand <= 0:
                break

        # Second pass: if there is still demand, distribute it uniformly across the charging window
        if remaining_demand > 0:
            if slots:
                charging_duration = (24 - arrival_time + departure_time) if departure_time < arrival_time else (departure_time - arrival_time)
                charge_power = remaining_demand / charging_duration
                power_demand[slots] += charge_power
                profiles[slots, column] += charge_power
            else:
                # The window contains no time step (e.g. arrival == departure): nothing can be
                # scheduled, and dividing by the window duration could divide by zero.
                skipped_vehicles += 1

        # Update peak power if needed
        peak_power = power_demand.max()

    if skipped_vehicles:
        print(f"WARNING \t {skipped_vehicles} vehicle(s) have an empty charging window; their remaining demand was not scheduled")

    # Assemble the result with the time grid as first column
    smart_charging_profile = pd.DataFrame(profiles, columns=vehicle_columns)
    smart_charging_profile.insert(0, 'time', time_grid)

    return smart_charging_profile

# Export and visualization
def to_csv(self, filepath: str):
    """ Writes the simulation outputs to four CSV files derived from a common base path.

    The extension of filepath (if any) is stripped, then one file is written per output:
    "<base>_spatial_demand.csv", "<base>_temporal_demand_vehicle_properties.csv",
    "<base>_temporal_demand_profile.csv" and "<base>_temporal_demand_profile_aggregated.csv".
    The aggregated profile is exported with descriptive column headers.

    Args:
        filepath (str): Base path for the output files.
    """
    base = os.path.splitext(filepath)[0]

    def output_path(suffix):
        # Every output shares the same base path and only differs by its suffix
        return f"{base}_{suffix}.csv"

    # Raw outputs are written as they are
    self._spatial_demand.to_csv(output_path("spatial_demand"), index=False)
    self._temporal_demand_vehicle_properties.to_csv(output_path("temporal_demand_vehicle_properties"), index=False)
    self._temporal_demand_profile.to_csv(output_path("temporal_demand_profile"), index=False)

    # Aggregated profile: internal column name -> exported header (also fixes the column order)
    headers = {
        'time': 'Time (h)',
        'home': 'Load - Home (kW)',
        'work': 'Load - Work (kW)',
        'poi': 'Load - POI (kW)',
        'total': 'Load - Total (kW)',
        'home_vehicle_present': 'Vehicles Present - Home',
        'work_vehicle_present': 'Vehicles Present - Work',
        'poi_vehicle_present': 'Vehicles Present - POI',
        'total_vehicle_present': 'Vehicles Present - Total',
        'home_vehicle_charging': 'Vehicles Charging - Home',
        'work_vehicle_charging': 'Vehicles Charging - Work',
        'poi_vehicle_charging': 'Vehicles Charging - POI',
        'total_vehicle_charging': 'Vehicles Charging - Total',
    }
    selected = self._temporal_demand_profile_aggregated[list(headers)]
    exported = selected.rename(columns=headers)
    exported.to_csv(output_path("temporal_demand_profile_aggregated"), index=False)
def chargingdemand_total_to_map(self, filepath: str):
    """ Builds a map of the total charging demand at home, work and POIs and saves it.

    One colour-mapped layer is added per charging location, using the 'Etot_home_kWh',
    'Etot_work_kWh' and 'Etot_poi_kWh' columns of the spatial demand.

    Args:
        filepath (str): Destination passed to the map's save() method.
    """
    demand = self.spatial_demand

    # Base map and traffic zone boundaries come from the shared helpers
    demand_map = hlp.create_base_map(self.region)
    hlp.add_taz_boundaries(demand_map, self.region.traffic_zones)

    # (spatial demand column, layer name) for each charging location
    layers = (
        ('Etot_home_kWh', 'Charging demand at Home'),
        ('Etot_work_kWh', 'Charging demand at Work'),
        ('Etot_poi_kWh', 'Charging demand at POIs'),
    )
    for column, layer_name in layers:
        hlp.add_colormapped_feature_group(demand_map, demand, self.region, column, layer_name, 'Charging demand (kWh)')

    # Layer switcher, then write the map out
    folium.LayerControl().add_to(demand_map)
    demand_map.save(filepath)
def chargingdemand_pervehicle_to_map(self, filepath: str):
    """ Builds a map of the charging demand per vehicle at home, work and POIs and saves it.

    One colour-mapped layer is added per charging location, using the 'E_per_vehicle_home_kWh',
    'E_per_vehicle_work_kWh' and 'E_per_vehicle_poi_kWh' columns of the spatial demand.

    Args:
        filepath (str): Destination passed to the map's save() method.
    """
    demand = self.spatial_demand

    # Base map and traffic zone boundaries come from the shared helpers
    demand_map = hlp.create_base_map(self.region)
    hlp.add_taz_boundaries(demand_map, self.region.traffic_zones)

    # (spatial demand column, layer name) for each charging location
    layers = (
        ('E_per_vehicle_home_kWh', 'Charging need per vehicle at Home'),
        ('E_per_vehicle_work_kWh', 'Charging need per vehicle at Work'),
        ('E_per_vehicle_poi_kWh', 'Charging need per vehicle at POIs'),
    )
    for column, layer_name in layers:
        hlp.add_colormapped_feature_group(demand_map, demand, self.region, column, layer_name, 'Charging demand (kWh/vehicle)')

    # Layer switcher, then write the map out
    folium.LayerControl().add_to(demand_map)
    demand_map.save(filepath)
def chargingdemand_nvehicles_to_map(self, filepath: str):
    """ Builds a map of the number of vehicles charging at home, work and POIs and saves it.

    One colour-mapped layer is added per charging location, using the 'n_vehicles_home',
    'n_vehicles_work' and 'n_vehicles_poi' columns of the spatial demand.

    Args:
        filepath (str): Destination passed to the map's save() method.
    """
    demand = self.spatial_demand

    # Base map and traffic zone boundaries come from the shared helpers
    demand_map = hlp.create_base_map(self.region)
    hlp.add_taz_boundaries(demand_map, self.region.traffic_zones)

    # (spatial demand column, layer name) for each charging location
    layers = (
        ('n_vehicles_home', 'Number of vehicles charging at Home'),
        ('n_vehicles_work', 'Number of vehicles charging at Work'),
        ('n_vehicles_poi', 'Number of vehicles charging at POIs'),
    )
    for column, layer_name in layers:
        hlp.add_colormapped_feature_group(demand_map, demand, self.region, column, layer_name, 'Number of vehicles')

    # Layer switcher, then write the map out
    folium.LayerControl().add_to(demand_map)
    demand_map.save(filepath)