Shortcuts

Source code for rl4co.envs.routing.mtsp

from typing import Optional

import numpy as np
import torch

from tensordict.tensordict import TensorDict
from torchrl.data import (
    BoundedTensorSpec,
    CompositeSpec,
    UnboundedContinuousTensorSpec,
    UnboundedDiscreteTensorSpec,
)

from rl4co.envs.common.base import RL4COEnvBase
from rl4co.envs.common.utils import batch_to_scalar
from rl4co.utils.ops import gather_by_index, get_distance, get_tour_length


[docs]class MTSPEnv(RL4COEnvBase): """Multiple Traveling Salesman Problem environment At each step, an agent chooses to visit a city. A maximum of `num_agents` agents can be employed to visit the cities. The cost can be defined in two ways: - `minmax`: (default) the reward is the maximum of the path lengths of all the agents - `sum`: the cost is the sum of the path lengths of all the agents Reward is - cost, so the goal is to maximize the reward (minimize the cost). Args: num_loc: number of locations (cities) to visit min_loc: minimum value of the locations max_loc: maximum value of the locations min_num_agents: minimum number of agents max_num_agents: maximum number of agents cost_type: type of cost to use, either `minmax` or `sum` td_params: parameters for the TensorDict specs """ name = "mtsp" def __init__( self, num_loc: int = 20, min_loc: float = 0, max_loc: float = 1, min_num_agents: int = 5, max_num_agents: int = 5, cost_type: str = "minmax", td_params: TensorDict = None, **kwargs, ): super().__init__(**kwargs) self.num_loc = num_loc self.min_loc = min_loc self.max_loc = max_loc self.min_num_agents = min_num_agents self.max_num_agents = max_num_agents self.cost_type = cost_type self._make_spec(td_params) @staticmethod def _step(td: TensorDict) -> TensorDict: # Initial variables is_first_action = batch_to_scalar(td["i"]) == 0 current_node = td["action"] first_node = current_node if is_first_action else td["first_node"] # Get the locations of the current node and the previous node and the depot cur_loc = gather_by_index(td["locs"], current_node) prev_loc = gather_by_index( td["locs"], td["current_node"] ) # current_node is the previous node depot_loc = td["locs"][..., 0, :] # If current_node is the depot, then increment agent_idx cur_agent_idx = td["agent_idx"] + (current_node == 0).long() # Set not visited to 0 (i.e., we visited the node) available = td["action_mask"].scatter( -1, current_node[..., None].expand_as(td["action_mask"]), 0 ) # Available[..., 0] is the depot, which is always available unless: # - current_node is the depot # - agent_idx greater than num_agents -1 available[..., 0] = torch.logical_and( current_node != 0, td["agent_idx"] < td["num_agents"] - 1 ) # We are done there are no unvisited locations except the depot done = torch.count_nonzero(available[..., 1:], dim=-1) == 0 # If done is True, then we make the depot available again, so that it will be selected as the next node with prob 1 available[..., 0] = torch.logical_or(done, available[..., 0]) # If current agent is different from previous agent, then we have a new subtour and reset the length, otherwise we add the new distance current_length = torch.where( cur_agent_idx != td["agent_idx"], torch.zeros_like(td["current_length"]), td["current_length"] + get_distance(cur_loc, prev_loc), ) # If done, we add the distance from the current_node to the depot as well current_length = torch.where( done, current_length + get_distance(cur_loc, depot_loc), current_length ) # We update the max_subtour_length and reset the current_length max_subtour_length = torch.where( current_length > td["max_subtour_length"], current_length, td["max_subtour_length"], ) # The reward is the negative of the max_subtour_length (minmax objective) reward = -max_subtour_length td.update( { "max_subtour_length": max_subtour_length, "current_length": current_length, "agent_idx": cur_agent_idx, "first_node": first_node, "current_node": current_node, "i": td["i"] + 1, "action_mask": available, "reward": reward, "done": done, } ) return td def _reset(self, td: Optional[TensorDict] = None, batch_size=None) -> TensorDict: # Initialize data if batch_size is None: batch_size = self.batch_size if td is None else td["locs"].shape[:-2] device = td.device if td is not None else self.device if td is None or td.is_empty(): td = self.generate_data(batch_size=batch_size) # Keep track of the agent number to know when to stop agent_idx = torch.zeros((*batch_size,), dtype=torch.int64, device=device) # Make variable for max_subtour_length between subtours max_subtour_length = torch.zeros(batch_size, dtype=torch.float32, device=device) current_length = torch.zeros(batch_size, dtype=torch.float32, device=device) # Other variables current_node = torch.zeros((*batch_size,), dtype=torch.int64, device=device) available = torch.ones( (*batch_size, self.num_loc), dtype=torch.bool, device=device ) # 1 means not visited, i.e. action is allowed available[..., 0] = 0 # Depot is not available as first node i = torch.zeros((*batch_size,), dtype=torch.int64, device=device) return TensorDict( { "locs": td["locs"], # depot is first node "num_agents": td["num_agents"], "max_subtour_length": max_subtour_length, "current_length": current_length, "agent_idx": agent_idx, "first_node": current_node, "current_node": current_node, "i": i, "action_mask": available, }, batch_size=batch_size, ) def _make_spec(self, td_params: TensorDict): """Make the observation and action specs from the parameters.""" self.observation_spec = CompositeSpec( locs=BoundedTensorSpec( minimum=self.min_loc, maximum=self.max_loc, shape=(self.num_loc, 2), dtype=torch.float32, ), num_agents=UnboundedDiscreteTensorSpec( shape=(1), dtype=torch.int64, ), agent_idx=UnboundedDiscreteTensorSpec( shape=(1), dtype=torch.int64, ), current_length=UnboundedContinuousTensorSpec( shape=(1), dtype=torch.float32, ), max_subtour_length=UnboundedContinuousTensorSpec( shape=(1), dtype=torch.float32, ), first_node=UnboundedDiscreteTensorSpec( shape=(1), dtype=torch.int64, ), current_node=UnboundedDiscreteTensorSpec( shape=(1), dtype=torch.int64, ), i=UnboundedDiscreteTensorSpec( shape=(1), dtype=torch.int64, ), action_mask=UnboundedDiscreteTensorSpec( shape=(self.num_loc), dtype=torch.bool, ), shape=(), ) self.action_spec = BoundedTensorSpec( shape=(1,), dtype=torch.int64, minimum=0, maximum=self.num_loc, ) self.reward_spec = UnboundedContinuousTensorSpec() self.done_spec = UnboundedDiscreteTensorSpec(dtype=torch.bool)
[docs] def get_reward(self, td, actions=None) -> TensorDict: # With minmax, get the maximum distance among subtours, calculated in the model if self.cost_type == "minmax": return td["reward"].squeeze(-1) # With distance, same as TSP elif self.cost_type == "distance": locs = td["locs"] locs_ordered = locs.gather(1, actions.unsqueeze(-1).expand_as(locs)) return -get_tour_length(locs_ordered) else: raise ValueError(f"Cost type {self.cost_type} not supported")
[docs] def generate_data(self, batch_size) -> TensorDict: batch_size = [batch_size] if isinstance(batch_size, int) else batch_size # Initialize the locations (including the depot which is always the first node) locs = ( torch.FloatTensor(*batch_size, self.num_loc, 2) .uniform_(self.min_loc, self.max_loc) .to(self.device) ) # Initialize the num_agents: either fixed or random integer between min and max if self.min_num_agents == self.max_num_agents: num_agents = ( torch.ones(batch_size, dtype=torch.int64, device=self.device) * self.min_num_agents ) else: num_agents = torch.randint( self.min_num_agents, self.max_num_agents, size=batch_size, device=self.device, ) return TensorDict( { "locs": locs, "num_agents": num_agents, }, batch_size=batch_size, )
[docs] @staticmethod def render(td, actions=None, ax=None): import matplotlib.pyplot as plt from matplotlib import colormaps def discrete_cmap(num, base_cmap="nipy_spectral"): """Create an N-bin discrete colormap from the specified input map""" base = colormaps[base_cmap] color_list = base(np.linspace(0, 1, num)) cmap_name = base.name + str(num) return base.from_list(cmap_name, color_list, num) if actions is None: actions = td.get("action", None) # if batch_size greater than 0 , we need to select the first batch element if td.batch_size != torch.Size([]): td = td[0] actions = actions[0] num_agents = td["num_agents"] locs = td["locs"] cmap = discrete_cmap(num_agents, "rainbow") fig, ax = plt.subplots() # Add depot action = 0 to before first action and after last action actions = torch.cat( [ torch.zeros(1, dtype=torch.int64), actions, torch.zeros(1, dtype=torch.int64), ] ) # Make list of colors from matplotlib for i, loc in enumerate(locs): if i == 0: # depot marker = "s" color = "g" label = "Depot" markersize = 10 else: # normal location marker = "o" color = "tab:blue" label = "Cities" markersize = 8 if i > 1: label = "" ax.plot( loc[0], loc[1], color=color, marker=marker, markersize=markersize, label=label, ) # Plot the actions in order agent_idx = 0 for i in range(len(actions)): if actions[i] == 0: agent_idx += 1 color = cmap(num_agents - agent_idx) from_node = actions[i] to_node = ( actions[i + 1] if i < len(actions) - 1 else actions[0] ) # last goes back to depot from_loc = td["locs"][from_node] to_loc = td["locs"][to_node] ax.plot([from_loc[0], to_loc[0]], [from_loc[1], to_loc[1]], color=color) ax.annotate( "", xy=(to_loc[0], to_loc[1]), xytext=(from_loc[0], from_loc[1]), arrowprops=dict(arrowstyle="->", color=color), annotation_clip=False, ) # Legend handles, labels = ax.get_legend_handles_labels() ax.legend(handles, labels) ax.set_title("mTSP") ax.set_xlabel("x-coordinate") ax.set_ylabel("y-coordinate")

© Copyright Federico Berto, Chuanbo Hua, Junyoung Park. Revision 14d072ed.

Built with Sphinx using a theme provided by Read the Docs.