In the world of logistics and transportation, planning efficient routes for vehicles is crucial. However, equally important is ensuring that drivers adhere to regulations, such as taking mandatory breaks after certain hours of driving. In this blog post, we'll explore how to enforce break constraints after a vehicle departs from the depot using Google OR-Tools in a Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB).
We'll walk through a Python example where we define an ActiveTime dimension that tracks the time since each vehicle departed from the depot. By leveraging this dimension, we'll set up mandatory breaks for drivers after specific hours of active driving.
Problem Overview
We are tackling a Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB). In this problem, we need to plan routes for vehicles that have:
- Capacity constraints: Each vehicle has a maximum load capacity.
- Time windows: Each customer must be served within a specific time frame.
- Driver breaks: Drivers must take breaks after certain hours of driving since they departed from the depot.
Our goal is to minimize the total distance traveled while adhering to these constraints.
Setting Up the Environment
First, ensure you have Google OR-Tools installed:
pip install ortools
Now, let's dive into the code.
Code Walkthrough
Importing Necessary Libraries
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import random
import pandas as pd
Creating the Data Model
We define a function create_data_model() to generate our problem's data, including locations, demands, and time windows.
def create_data_model():
    """Stores the data for the problem."""
    data = {}
    ...
    return data
Key parameters:
- Number of orders: data["num_orders"]
- Number of vehicles: data["num_vehicles"]
- Vehicle capacities: data["vehicle_capacities"]
- Time windows: data["time_windows"]
- Service time per demand unit: data["time_per_demand_unit"]
Note: We use random generation for locations, demands, and time windows to simulate real-world variability. Due to this randomness, sometimes the problem may not have a feasible solution. If that happens, you might need to run the code multiple times.
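When a run turns out to be infeasible, it helps to be able to regenerate exactly the same data. The sketch below is only an illustration of that idea: make_time_windows and its parameters are hypothetical names, not part of the original script, and it mirrors only the time-window generation described above.

```python
import random


def make_time_windows(num_locations, horizon, tw_duration, seed=None):
    """Return one (start, end) window per location, each tw_duration long.

    Hypothetical helper: passing an integer seed makes the draw repeatable.
    """
    rng = random.Random(seed)  # local generator, leaves the global state alone
    windows = []
    for _ in range(num_locations):
        start = rng.randint(0, horizon - tw_duration)
        windows.append((start, start + tw_duration))
    return windows


HORIZON = 24 * 3600      # 24 hours in seconds
TW_DURATION = 5 * 3600   # 5 hours in seconds

first = make_time_windows(21, HORIZON, TW_DURATION, seed=0)
second = make_time_windows(21, HORIZON, TW_DURATION, seed=0)
print(first == second)  # same seed, same windows
```

Logging the seed next to each run is then enough to replay an instance that failed.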
Initializing the Routing Model
We create the routing index manager and the routing model.
manager = pywrapcp.RoutingIndexManager(
    data["num_orders"] + 1, data["num_vehicles"], data["depot"]
)
routing = pywrapcp.RoutingModel(manager)
parameters = pywrapcp.DefaultRoutingSearchParameters()
parameters.first_solution_strategy = (
    routing_enums_pb2.FirstSolutionStrategy.PARALLEL_CHEAPEST_INSERTION
)
Defining Distance and Demand Callbacks
We define the Manhattan distance and demand callbacks to calculate distances between nodes and demands at each node.
def manhattan_distance(x1, y1, x2, y2):
    return abs(x1 - x2) + abs(y1 - y2)

def transit_callback(i, j):
    ...

def demand_callback(i):
    ...
We register these callbacks with the routing model.
transit_index = routing.RegisterTransitCallback(transit_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_index)
demand_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
    demand_index, 0, data["vehicle_capacities"], True, "Capacity"
)
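To make the capacity constraint concrete, here is a plain-Python sketch of the bookkeeping it implies. It does not use OR-Tools and does not reproduce how the solver stores cumulative values internally; route_loads and respects_capacity are hypothetical helpers, and the demands and route are made up.

```python
def route_loads(route, demands):
    """Cumulative load after each stop of a route (list of node ids)."""
    loads = []
    total = 0
    for node in route:
        total += demands[node]
        loads.append(total)
    return loads


def respects_capacity(route, demands, capacity):
    """True if the running load never exceeds the vehicle capacity."""
    return all(load <= capacity for load in route_loads(route, demands))


demands = [0, 10, 8, 9, 7, 10]  # made-up demands; node 0 is the depot
route = [0, 1, 2, 3, 4]         # made-up route

print(route_loads(route, demands))                  # [0, 10, 18, 27, 34]
print(respects_capacity(route, demands, 40))        # True
print(respects_capacity(route + [5], demands, 40))  # False, 44 > 40
```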
Setting Service Times and Time Windows
We calculate service times based on demands and add the time dimension with time windows.
service_times = [
    data["time_per_demand_unit"] * data["demands"][manager.IndexToNode(i)]
    for i in range(routing.Size())
]

def time_callback(i, j):
    ...

time_index = routing.RegisterTransitCallback(time_callback)
routing.AddDimension(time_index, data["horizon"], data["horizon"], False, "Time")
time_dimension = routing.GetDimensionOrDie("Time")
We then add time window constraints for each customer and the depot.
for order in range(1, manager.GetNumberOfNodes()):
    start, end = data["time_windows"][order]
    index = manager.NodeToIndex(order)
    time_dimension.CumulVar(index).SetRange(start, end)

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    time_dimension.CumulVar(index).SetRange(
        data["time_windows"][data["depot"]][0], data["time_windows"][data["depot"]][1]
    )
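The second argument of AddDimension is the maximum slack, which is what lets a vehicle wait when it arrives before a window opens. The following plain-Python sketch shows the rule a time window imposes on a single arrival; service_start is a hypothetical helper and the window is made up.

```python
def service_start(arrival, window):
    """Return when service can begin, or None if the window is missed.

    Arriving early means waiting until the window opens; arriving after the
    window closes is infeasible.
    """
    start, end = window
    if arrival > end:
        return None
    return max(arrival, start)


window = (9 * 3600, 14 * 3600)           # made-up window: 09:00 to 14:00
print(service_start(8 * 3600, window))   # 32400: waits one hour
print(service_start(10 * 3600, window))  # 36000: served on arrival
print(service_start(15 * 3600, window))  # None: too late
```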
Key Part 1: Defining the ActiveTime Dimension
To enforce breaks after a certain amount of active driving time since departing from the depot, we introduce an ActiveTime dimension.
Defining the ActiveTime Callback
def active_time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )
This callback calculates the transit time between nodes, including the service time at the current node.
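As a worked example of the value this callback returns, the sketch below repeats the same arithmetic without OR-Tools. transit_seconds is a hypothetical stand-in for the callback, and the coordinates and demand are made up; the speed of 100 and the 100 seconds per demand unit match the values used in the full code.

```python
def manhattan_distance(x1, y1, x2, y2):
    return abs(x1 - x2) + abs(y1 - y2)


def transit_seconds(loc_i, loc_j, speed, service_time_i):
    """Travel time from i to j plus the service time spent at i."""
    travel = int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / speed)
    return travel + service_time_i


# Made-up numbers: demand 5 at node i, 100 s of service per demand unit.
service_time_i = 100 * 5

# Distance 70000 at speed 100 gives 700 s of travel, plus 500 s of service.
print(transit_seconds((0, 0), (30000, 40000), 100, service_time_i))  # 1200
```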
Adding the ActiveTime Dimension
We register the active_time_callback and add the ActiveTime dimension. Crucially, we set fix_start_cumul_to_zero to True, ensuring that the active time for each vehicle starts from zero at the depot.
active_time_index = routing.RegisterTransitCallback(active_time_callback)
routing.AddDimensionWithVehicleTransits(
    [active_time_index] * data["num_vehicles"], 0, data["horizon"], True, "ActiveTime"
)
active_time_dimension = routing.GetDimensionOrDie("ActiveTime")
Explanation:
- AddDimensionWithVehicleTransits allows us to add a dimension that can have a different transit callback for each vehicle. In our case, we use the same callback for all vehicles.
- The fourth argument, True, is for fix_start_cumul_to_zero, which fixes the cumulative value at the start of each vehicle's route to zero. This is essential for accurately tracking the active time since departing from the depot.
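The effect of fixing the start value to zero can be seen with a small accumulation, sketched here in plain Python under the simplifying assumption that the vehicle never waits between stops. The cumuls helper, the leg transits, and the departure time are all made up for illustration.

```python
from itertools import accumulate


def cumuls(transits, start):
    """Cumulative value at each stop, given the transit of each leg."""
    return list(accumulate(transits, initial=start))


leg_transits = [1200, 900, 1500]  # made-up per-leg transits in seconds
departure = 7 * 3600              # made-up departure at 07:00

# A clock-like dimension starts at the departure time.
print(cumuls(leg_transits, start=departure))  # [25200, 26400, 27300, 28800]

# A dimension whose start is fixed to zero measures time since departure.
print(cumuls(leg_transits, start=0))          # [0, 1200, 2100, 3600]
```

The two lists differ only by the departure offset, which is why a break window expressed on the second clock is relative to leaving the depot rather than to the time of day.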
Minimizing Time Variables
We also ask the solver's finalizer to minimize the Time and ActiveTime cumulative variables, so that once a route is found each value is set as early as the constraints allow.
for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(i))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(i))

for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.End(j)))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(routing.End(j)))
Key Part 2: Adding Vehicle Breaks Based on ActiveTime
Now that we have our ActiveTime dimension, we can set up breaks that occur after certain hours of active driving.
Defining Break Data
We define the break intervals that we want to enforce. For example, drivers must take a 30-minute break between 6-8 hours and another between 14-16 hours of active driving.
break_data = [
    # [start_min, start_max, duration] (in seconds)
    [6 * 3600, 8 * 3600, 1 * 1800],  # Morning break
    [14 * 3600, 16 * 3600, 1 * 1800],  # Afternoon break
]
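Writing the windows in seconds by hand is easy to get wrong, so a small converter can help. This is a sketch only: break_window and latest_end are hypothetical helpers, not part of the original script.

```python
def break_window(start_min_h, start_max_h, duration_min):
    """Build a [start_min, start_max, duration] entry in seconds."""
    return [start_min_h * 3600, start_max_h * 3600, duration_min * 60]


def latest_end(entry):
    """Latest moment the break can finish: latest start plus duration."""
    return entry[1] + entry[2]


first_break = break_window(6, 8, 30)
print(first_break)              # [21600, 28800, 1800]
print(latest_end(first_break))  # 30600, i.e. 8.5 hours
```

Note that the window bounds the start of the break, so a break that starts at the 8-hour mark still runs until 8.5 hours.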
Creating and Enforcing Break Intervals
We create break intervals for each vehicle and enforce that they must be taken.
solver = routing.solver()
breaks = {}
for vehicle in range(data["num_vehicles"]):
    breaks[vehicle] = []
    for i, data_break in enumerate(break_data):
        break_interval = solver.FixedDurationIntervalVar(
            data_break[0],
            data_break[1],
            data_break[2],
            False,  # is_optional set to False
            f"Vehicle {vehicle}'s Break {i}",
        )
        breaks[vehicle].append(break_interval)
    # Apply break constraints
    solver.Add(breaks[vehicle][0].PerformedExpr() == 1)
    solver.Add(breaks[vehicle][1].PerformedExpr() == 1)
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) < breaks[vehicle][0].StartExpr())
Linking Breaks to ActiveTime Dimension
This is the crucial step: still inside the per-vehicle loop, we attach the breaks to the ActiveTime dimension rather than to the Time dimension.
    # Set breaks based on ActiveTime dimension
    active_time_dimension.SetBreakIntervalsOfVehicle(
        breaks[vehicle], vehicle, service_times
    )
Explanation:
- By calling SetBreakIntervalsOfVehicle on our active_time_dimension, we ensure that breaks are scheduled based on the cumulative active time since the vehicle departed from the depot.
- The service_times argument gives the time spent servicing the customer at each index, so the solver can account for that time when placing a break.
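To reason about a reported break by hand, it can help to spell out the conditions as a small check. The sketch below is plain Python and rests on an assumption worth stating loudly: it treats a break as valid when its start lies inside its window and it does not overlap any service interval. That is our reading of how the service times are meant to be used, not a restatement of the solver's internal propagation. break_is_valid is a hypothetical helper and the service intervals are made up.

```python
def break_is_valid(break_start, entry, service_intervals):
    """Check one break against its window and the service intervals.

    entry is [start_min, start_max, duration]; service_intervals is a list
    of (begin, end) pairs on the same clock as the break.
    Assumption: a break must not overlap time spent servicing a customer.
    """
    start_min, start_max, duration = entry
    if not start_min <= break_start <= start_max:
        return False
    break_end = break_start + duration
    # The break is fine if it ends before a service begins
    # or starts after that service has ended.
    return all(
        break_end <= begin or end <= break_start
        for begin, end in service_intervals
    )


entry = [6 * 3600, 8 * 3600, 1800]
services = [(20000, 22000), (26000, 27000)]  # made-up service intervals

print(break_is_valid(23000, entry, services))  # True
print(break_is_valid(25000, entry, services))  # False: overlaps 26000-27000
print(break_is_valid(30000, entry, services))  # False: starts after 8 h
```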
Optional: Forcing Longer Working Hours for Testing
Note: In our example, to test the breaks effectively, we add constraints to ensure vehicles work longer hours. This is for experimentation purposes and can be removed in actual applications.
# Please delete this when you apply to your problem
# This is added for this experiment to ensure results with long hours of working so we can test our breaks
for vehicle in range(data["num_vehicles"]):
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) <= 1 * 3600)
    index = routing.End(vehicle)
    solver.Add(time_dimension.CumulVar(index) >= 23 * 3600)
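A quick piece of arithmetic shows why these two bounds are enough for the experiment. The sketch below is plain Python with a hypothetical helper, min_route_span; it only compares the shortest possible route span with the latest moment a break can end, measured from departure.

```python
def min_route_span(latest_start, earliest_end):
    """Shortest possible route duration given the two forcing constraints."""
    return earliest_end - latest_start


break_data = [
    [6 * 3600, 8 * 3600, 1 * 1800],
    [14 * 3600, 16 * 3600, 1 * 1800],
]

span = min_route_span(1 * 3600, 23 * 3600)
latest_break_end = max(start_max + duration for _, start_max, duration in break_data)

print(span / 3600)               # 22.0
print(latest_break_end / 3600)   # 16.5
print(latest_break_end <= span)  # True
```

Every route lasts at least 22 hours, while the second break ends no later than 16.5 hours after departure, so both break windows fall within the working span.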
Allowing Orders to Be Skipped with Penalties
To provide flexibility in routing, we add penalties for skipping orders. This helps the solver make decisions that balance total distance with the need to serve all customers.
kPenalty = 10000000  # Penalty for skipping an order
for order in range(1, manager.GetNumberOfNodes()):
    routing.AddDisjunction([manager.NodeToIndex(order)], kPenalty)
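To see how large this penalty is relative to the distances involved, we can bound the extra distance that serving one more order can ever add. The sketch below is plain arithmetic using the coordinate limits from the full code (kXMax and kYMax, both 100000); the constant names are our own.

```python
K_X_MAX = 100000
K_Y_MAX = 100000
K_PENALTY = 10000000

# Largest Manhattan distance between any two points in the square.
max_leg = K_X_MAX + K_Y_MAX

# Inserting a node between two stops adds at most two legs.
max_detour = 2 * max_leg

print(max_detour)              # 400000
print(K_PENALTY > max_detour)  # True
```

Because the penalty is far larger than any possible detour, skipping an order is never cheaper on distance alone. With these numbers, we would expect an order to be dropped only when the constraints leave no way to serve it, or when the search does not find one.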
Solving the Problem
We set up the solver and solve the problem.
solution = routing.SolveWithParameters(parameters)
Extracting and Printing the Solution
Finally, we extract the solution and print out the routes, including the break schedules.
Printing the Break Plans
print("==== BREAK PLAN ====")
for vehicle_id in range(data["num_vehicles"]):
    vehicle_breaks = breaks[vehicle_id]
    for break_interval in vehicle_breaks:
        if solution.PerformedValue(break_interval):
            print(
                f"{break_interval.Name()} Start({break_interval.StartExpr()}) "
                f"Duration({break_interval.DurationExpr()}) "
                f"End({break_interval.EndExpr()})"
            )
Printing the Route Plans
print("\n\n==== ROUTE PLAN ====")
total_distance = 0
total_load = 0
total_time = 0
capacity_dimension = routing.GetDimensionOrDie("Capacity")
time_dimension = routing.GetDimensionOrDie("Time")
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    plan_output = f"Route for vehicle {vehicle_id}:\n"
    route_distance = 0
    route_load = 0
    while not routing.IsEnd(index):
        ...
    print(plan_output)
    total_distance += route_distance
    total_load += route_load
    total_time += solution.Max(time_var)
print(f"Total Distance of all routes: {total_distance} units")
print(f"Total Load of all routes: {total_load}")
print(f"Total Time of all routes: {round(total_time/3600, 3)} hours")
Full Code
For completeness, here's the full code with comments:
# Capacitated Vehicle Routing Problem with Time Windows and Breaks (CVRPTWB)
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import random
import pandas as pd
# Data Model
def create_data_model():
    """Stores the data for the problem."""
    data = {}
    # Problem parameters corresponding to the mathematical model
    data["num_orders"] = 20  # |C| = 20 customer nodes
    data["num_vehicles"] = 3  # |V| = 3 vehicles
    data["depot"] = 0  # Depot node
    data["vehicle_capacities"] = [40] * data["num_vehicles"]  # Q = 40 for each vehicle
    data["speed"] = 100  # Vehicle speed (used in time calculations)
    # Time parameters
    data["time_per_demand_unit"] = 100  # Service time per demand unit (s_i = 100 * d_i)
    data["horizon"] = 24 * 3600  # H = 24 hours in seconds
    num_locations = data["num_orders"] + 1  # Total nodes |N| = |C| + 1 (depot)
    # Random seed for reproducibility
    use_deterministic_seed = False
    if use_deterministic_seed:
        random.seed(0)
    else:
        random.seed(None)
    # Randomly generate locations (x_i, y_i for each node i)
    kXMax = 100000
    kYMax = 100000
    data["locations"] = [
        (random.randint(0, kXMax), random.randint(0, kYMax))
        for _ in range(num_locations)
    ]
    # Random demands (d_i for each customer i)
    data["demands"] = [0] + [random.randint(1, 10) for _ in range(data["num_orders"])]
    # Time windows ([a_i, b_i] for each node i)
    data["time_windows"] = []
    kTWDuration = 5 * 3600  # 5 hours in seconds
    for _ in range(num_locations):
        start = random.randint(0, data["horizon"] - kTWDuration)
        data["time_windows"].append((start, start + kTWDuration))
    return data
# Instantiate the data problem
data = create_data_model()
# Initialize routing and search parameters
manager = pywrapcp.RoutingIndexManager(
    data["num_orders"] + 1, data["num_vehicles"], data["depot"]
)
routing = pywrapcp.RoutingModel(manager)
parameters = pywrapcp.DefaultRoutingSearchParameters()
# Set first solution strategy (not in mathematical model, but affects solution quality)
parameters.first_solution_strategy = (
    routing_enums_pb2.FirstSolutionStrategy.PARALLEL_CHEAPEST_INSERTION
)
# Define the Manhattan distance function (c_ij in the model)
def manhattan_distance(x1, y1, x2, y2):
    return abs(x1 - x2) + abs(y1 - y2)

# Callback for distance calculation (implements c_ij)
def transit_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1])

transit_index = routing.RegisterTransitCallback(transit_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_index)

# Adding capacity dimension constraints (Constraint 3 in the model)
def demand_callback(i):
    return data["demands"][manager.IndexToNode(i)]

demand_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
    demand_index, 0, data["vehicle_capacities"], True, "Capacity"
)
# Set service times (s_i in the model)
service_times = [
    data["time_per_demand_unit"] * data["demands"][manager.IndexToNode(i)]
    for i in range(routing.Size())
]

def time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )

time_index = routing.RegisterTransitCallback(time_callback)
routing.AddDimension(time_index, data["horizon"], data["horizon"], False, "Time")
time_dimension = routing.GetDimensionOrDie("Time")
# === Key Part 1: Defining ActiveTime Dimension === #
def active_time_callback(i, j):
    loc_i = data["locations"][manager.IndexToNode(i)]
    loc_j = data["locations"][manager.IndexToNode(j)]
    return (
        int(manhattan_distance(loc_i[0], loc_i[1], loc_j[0], loc_j[1]) / data["speed"])
        + service_times[manager.IndexToNode(i)]
    )

active_time_index = routing.RegisterTransitCallback(active_time_callback)
routing.AddDimensionWithVehicleTransits(
    [active_time_index] * data["num_vehicles"], 0, data["horizon"], True, "ActiveTime"
)
active_time_dimension = routing.GetDimensionOrDie("ActiveTime")
# ================================================ #
# Adding time windows (Constraint 4 in the model)
for order in range(1, manager.GetNumberOfNodes()):
    start, end = data["time_windows"][order]
    index = manager.NodeToIndex(order)
    time_dimension.CumulVar(index).SetRange(start, end)

for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    active_time_dimension.CumulVar(index).SetValue(0)

# Add time window constraints for each vehicle start node.
depot_idx = data["depot"]
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    time_dimension.CumulVar(index).SetRange(
        data["time_windows"][depot_idx][0], data["time_windows"][depot_idx][1]
    )
# Minimizing time variables (part of the objective function)
for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(i))
for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.Start(j)))
    routing.AddVariableMinimizedByFinalizer(time_dimension.CumulVar(routing.End(j)))

# Minimizing ActiveTime variables
for i in range(routing.Size()):
    routing.AddVariableMinimizedByFinalizer(active_time_dimension.CumulVar(i))
for j in range(data["num_vehicles"]):
    routing.AddVariableMinimizedByFinalizer(
        active_time_dimension.CumulVar(routing.Start(j))
    )
    routing.AddVariableMinimizedByFinalizer(
        active_time_dimension.CumulVar(routing.End(j))
    )
# === Key Part 2: Adding Vehicle Breaks === #
solver = routing.solver()
break_data = [
    # [start_min, start_max, duration] (in seconds)
    [6 * 3600, 8 * 3600, 1 * 1800],  # Morning break
    [14 * 3600, 16 * 3600, 1 * 1800],  # Afternoon break
]

# Optional: Forcing long working hours for testing
# Please delete this when you apply to your problem
# This is added for this experiment to ensure results with long hours of working so we can test our breaks
for vehicle in range(data["num_vehicles"]):
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) <= 1 * 3600)
    index = routing.End(vehicle)
    solver.Add(time_dimension.CumulVar(index) >= 23 * 3600)
breaks = {}
for vehicle in range(data["num_vehicles"]):
    # Declare a list of break intervals for each vehicle
    breaks[vehicle] = []
    for i, data_break in enumerate(break_data):
        # Declare each possible break interval for the current vehicle
        break_interval = solver.FixedDurationIntervalVar(
            data_break[0],
            data_break[1],
            data_break[2],
            False,  # is_optional set to False
            f"Vehicle {vehicle}'s Break {i}",
        )
        breaks[vehicle].append(break_interval)
    # Apply break constraints
    solver.Add(breaks[vehicle][0].PerformedExpr() == 1)
    solver.Add(breaks[vehicle][1].PerformedExpr() == 1)
    index = routing.Start(vehicle)
    solver.Add(time_dimension.CumulVar(index) < breaks[vehicle][0].StartExpr())
    # Set breaks based on ActiveTime dimension
    active_time_dimension.SetBreakIntervalsOfVehicle(
        breaks[vehicle], vehicle, service_times
    )
# ========================================== #
# Adding penalty costs to allow skipping orders
kPenalty = 10000000 # Penalty for skipping an order
for order in range(1, manager.GetNumberOfNodes()):
    routing.AddDisjunction([manager.NodeToIndex(order)], kPenalty)
# Solve the problem
solution = routing.SolveWithParameters(parameters)
print("==== BREAK PLAN ====")
for vehicle_id in range(data["num_vehicles"]):
    # Get the break intervals for the current vehicle
    vehicle_breaks = breaks[vehicle_id]
    # Check each break interval of the vehicle
    for break_interval in vehicle_breaks:
        # Find whether the break interval was performed or not from the solution
        if solution.PerformedValue(break_interval):
            print(
                f"{break_interval.Name()} Start({break_interval.StartExpr()}) "
                f"Duration({break_interval.DurationExpr()}) "
                f"End({break_interval.EndExpr()})"
            )
print("\n\n==== ROUTE PLAN ====")
total_distance = 0
total_load = 0
total_time = 0
capacity_dimension = routing.GetDimensionOrDie("Capacity")
time_dimension = routing.GetDimensionOrDie("Time")
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    plan_output = f"Route for vehicle {vehicle_id}:\n"
    route_distance = 0
    route_load = 0
    while not routing.IsEnd(index):
        node_index = manager.IndexToNode(index)
        load_var = capacity_dimension.CumulVar(index)
        time_var = time_dimension.CumulVar(index)
        route_load = solution.Value(load_var)
        time_min = solution.Min(time_var)
        time_max = solution.Max(time_var)
        plan_output += f" {node_index} Load({route_load}) Time({round(time_min/3600, 3)},{round(time_max/3600, 3)}) -> "
        previous_index = index
        index = solution.Value(routing.NextVar(index))
        route_distance += routing.GetArcCostForVehicle(
            previous_index, index, vehicle_id
        )
    node_index = manager.IndexToNode(index)
    time_var = time_dimension.CumulVar(index)
    time_min = solution.Min(time_var)
    time_max = solution.Max(time_var)
    load_var = capacity_dimension.CumulVar(index)
    route_load = solution.Value(load_var)
    plan_output += (
        f" {node_index} Time({round(time_min/3600, 3)},{round(time_max/3600, 3)})\n"
    )
    plan_output += f"Distance of the route: {route_distance} units\n"
    plan_output += f"Load of the route: {route_load}\n"
    print(plan_output)
    total_distance += route_distance
    total_load += route_load
    total_time += solution.Max(time_var)
print(f"Total Distance of all routes: {total_distance} units")
print(f"Total Load of all routes: {total_load}")
print(f"Total Time of all routes: {round(total_time/3600, 3)} hours")
Validate Result
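I wrote some additional code to confirm that the breaks are scheduled as expected. It builds a timeline of events for each vehicle and flags waiting periods that are long enough to contain a break: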
import pandas as pd
def seconds_to_hhmm(seconds):
    # Convert seconds to HH:MM format
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    return f"{hours}:{minutes:02d}"
events = []
for vehicle_id in range(data["num_vehicles"]):
    index = routing.Start(vehicle_id)
    break_remark = ""
    is_start = True
    while not routing.IsEnd(index):
        next_index = solution.Value(routing.NextVar(index))
        node_index = manager.IndexToNode(index)
        next_node_index = manager.IndexToNode(next_index)
        time_var = time_dimension.CumulVar(index)
        arrival_time = solution.Min(time_var)  # in seconds
        if is_start:
            depot_arrival_time = arrival_time
            is_start = False
        time_var = time_dimension.CumulVar(next_index)
        actual_next_arrival_time = solution.Min(time_var)  # in seconds
        service_time = service_times[node_index]  # in seconds
        start_wait = arrival_time + service_time
        expected_next_arrival_time = arrival_time + time_callback(index, next_index)
        transit_duration = expected_next_arrival_time - service_time - arrival_time
        waiting_duration = actual_next_arrival_time - expected_next_arrival_time
        start_transit = start_wait + waiting_duration
        if routing.IsStart(index):
            events.append(
                (
                    vehicle_id,
                    node_index,
                    "start_route",
                    arrival_time,
                    seconds_to_hhmm(arrival_time),
                    0,
                    seconds_to_hhmm(arrival_time - depot_arrival_time),
                    "",
                )
            )
        else:
            events.append(
                (
                    vehicle_id,
                    node_index,
                    "arrival_time",
                    arrival_time,
                    seconds_to_hhmm(arrival_time),
                    0,
                    seconds_to_hhmm(arrival_time - depot_arrival_time),
                    "",
                )
            )
        events.append(
            (
                vehicle_id,
                node_index,
                "start_transit",
                start_transit,
                seconds_to_hhmm(start_transit),
                seconds_to_hhmm(transit_duration),
                seconds_to_hhmm(start_transit - depot_arrival_time),
                "",
            )
        )
        index = next_index
        if waiting_duration > 0:
            end_wait = start_wait + waiting_duration
            for break_id, break_info in enumerate(break_data):
                overlap_start = max(start_wait - depot_arrival_time, break_info[0])
                overlap_end = min(
                    end_wait - depot_arrival_time, break_info[1] + break_info[2]
                )
                overlap_duration = overlap_end - overlap_start
                if overlap_duration > break_info[2]:
                    this_remark = f"break {break_id} "
                    break_remark = break_remark + this_remark
            events.append(
                (
                    vehicle_id,
                    next_node_index,
                    "start_wait (idle + break)",
                    start_wait,
                    seconds_to_hhmm(start_wait),
                    seconds_to_hhmm(waiting_duration),
                    seconds_to_hhmm(start_wait - depot_arrival_time),
                    break_remark,
                )
            )
            break_remark = ""
        if routing.IsEnd(next_index):
            events.append(
                (
                    vehicle_id,
                    next_node_index,
                    "end_route",
                    actual_next_arrival_time,
                    seconds_to_hhmm(actual_next_arrival_time),
                    0,
                    seconds_to_hhmm(actual_next_arrival_time - depot_arrival_time),
                    "",
                )
            )
df_result = pd.DataFrame(
    events,
    columns=[
        "vehicle_id",
        "node",
        "event",
        "timestamp",
        "time",
        "duration",
        "since_start",
        "remark",
    ],
).sort_values(["vehicle_id", "timestamp"])
df_result = df_result[
    [
        "vehicle_id",
        "node",
        "event",
        "timestamp",
        "time",
        "since_start",
        "duration",
        "remark",
    ]
].copy()
Here is an example output:
Conclusion
By introducing an ActiveTime dimension and linking driver breaks to this dimension, we can effectively enforce breaks after specific hours of driving since departing from the depot in Google OR-Tools. This approach ensures compliance with regulations and enhances driver safety.
Remember that due to random data generation, you might need to run the code multiple times to obtain a feasible solution. In real-world applications, you'd use actual data, which would provide consistent results.