Shortcuts

Source code for rl4co.envs.routing.pdp

from typing import Optional

import torch

from tensordict.tensordict import TensorDict
from torchrl.data import (
    BoundedTensorSpec,
    CompositeSpec,
    UnboundedContinuousTensorSpec,
    UnboundedDiscreteTensorSpec,
)

from rl4co.envs.common.base import RL4COEnvBase
from rl4co.utils.ops import gather_by_index, get_tour_length


class PDPEnv(RL4COEnvBase):
    """Pickup and Delivery Problem (PDP) environment.

    The environment is made of ``num_loc + 1`` locations (cities):
        - 1 depot (node ``0``)
        - ``num_loc / 2`` pickup locations (nodes ``1 .. num_loc / 2``)
        - ``num_loc / 2`` delivery locations
          (nodes ``num_loc / 2 + 1 .. num_loc``)

    Pickup node ``p`` is paired with delivery node ``p + num_loc / 2``.
    The goal is to visit all the pickup and delivery locations along the
    shortest possible path starting from the depot, under the constraint
    that a pickup location must be visited before its corresponding
    delivery location.

    Args:
        num_loc: number of pickup + delivery locations (depot excluded).
            Must be even: the masks built in ``_reset`` have
            ``num_loc // 2 + 1`` pickup/depot entries and ``num_loc // 2``
            delivery entries, which only add up to ``num_loc + 1`` nodes
            when ``num_loc`` is even.
        min_loc: lower bound of the sampled coordinates.
        max_loc: upper bound of the sampled coordinates.
        td_params: parameters of the environment (passed to ``_make_spec``,
            which currently does not read them).
        **kwargs: forwarded unchanged to ``RL4COEnvBase.__init__``
            (presumably where ``seed`` and ``device`` are handled -- confirm
            against the base class).
    """

    name = "pdp"

    def __init__(
        self,
        num_loc: int = 20,
        min_loc: float = 0,
        max_loc: float = 1,
        td_params: Optional[TensorDict] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.num_loc = num_loc
        self.min_loc = min_loc
        self.max_loc = max_loc
        self._make_spec(td_params)

    @staticmethod
    def _step(td: TensorDict) -> TensorDict:
        """Apply ``td["action"]`` and update the state stored in ``td`` in place.

        Args:
            td: state holding at least ``action``, ``locs``, ``available``,
                ``to_deliver`` and ``i``.

        Returns:
            The same ``td`` updated with ``current_node``, ``available``,
            ``to_deliver``, ``i``, ``action_mask``, ``reward`` and ``done``.
        """
        current_node = td["action"].unsqueeze(-1)

        num_loc = td["locs"].shape[-2] - 1  # except depot

        # Delivery node paired with the selected node. For the depot and for
        # delivery nodes the modulo maps onto an index whose flag is already
        # set, so the scatter below is a no-op for them.
        new_to_deliver = (current_node + num_loc // 2) % (num_loc + 1)

        # Mark the selected node as visited. A [..., 1] index writes exactly
        # one entry per batch element, so there is no need to expand it to
        # the full mask width.
        available = td["available"].scatter(-1, current_node, 0)

        # Unlock the paired delivery node.
        to_deliver = td["to_deliver"].scatter(-1, new_to_deliver, 1)

        # An action is feasible if the node is unvisited and unlocked.
        action_mask = available & to_deliver

        # We are done when there are no unvisited locations left.
        done = torch.count_nonzero(available, dim=-1) == 0

        # The reward is calculated outside via get_reward for efficiency, so
        # it is a zero placeholder here (same shape and dtype as ``done``).
        reward = torch.zeros_like(done)

        td.update(
            {
                "current_node": current_node,
                "available": available,
                "to_deliver": to_deliver,
                "i": td["i"] + 1,
                "action_mask": action_mask,
                "reward": reward,
                "done": done,
            }
        )
        return td

    def _reset(self, td: Optional[TensorDict] = None, batch_size=None) -> TensorDict:
        """Build the initial state, generating a fresh instance if needed.

        Args:
            td: optional instance data with ``depot`` and ``locs`` entries.
                When ``None`` or empty, ``generate_data`` is called.
            batch_size: batch shape (int or sequence of ints). Defaults to
                ``td.batch_size`` when ``td`` is given, else
                ``self.batch_size``.

        Returns:
            A ``TensorDict`` with ``locs`` (depot prepended),
            ``current_node``, ``to_deliver``, ``available``, ``i`` and
            ``action_mask``.
        """
        if batch_size is None:
            batch_size = self.batch_size if td is None else td.batch_size
        # Accept a plain int, consistently with ``generate_data``.
        if isinstance(batch_size, int):
            batch_size = [batch_size]

        if td is None or td.is_empty():
            td = self.generate_data(batch_size=batch_size)

        self.to(td.device)

        # Prepend the depot as node 0. The ellipsis keeps this valid for any
        # number of leading batch dimensions.
        locs = torch.cat((td["depot"][..., None, :], td["locs"]), -2)

        # Size the masks from the data itself so that a user-supplied
        # instance whose size differs from ``self.num_loc`` stays consistent.
        num_loc = locs.shape[-2] - 1

        # Depot and pickups start unlocked (1), deliveries locked (0):
        # [batch_size, graph_size+1] laid out as [1, 1...1, 0...0].
        to_deliver = torch.cat(
            [
                torch.ones(
                    *batch_size,
                    num_loc // 2 + 1,
                    dtype=torch.bool,
                    device=self.device,
                ),
                torch.zeros(
                    *batch_size,
                    num_loc // 2,
                    dtype=torch.bool,
                    device=self.device,
                ),
            ],
            dim=-1,
        )

        # Every node (depot included) is still unvisited.
        available = torch.ones(
            (*batch_size, num_loc + 1), dtype=torch.bool, device=self.device
        )

        # The first step must be the depot: only index 0 is selectable.
        action_mask = torch.zeros(
            (*batch_size, num_loc + 1), dtype=torch.bool, device=self.device
        )
        action_mask[..., 0] = True

        # Other variables
        current_node = torch.zeros(
            (*batch_size, 1), dtype=torch.int64, device=self.device
        )
        i = torch.zeros((*batch_size, 1), dtype=torch.int64, device=self.device)

        return TensorDict(
            {
                "locs": locs,
                "current_node": current_node,
                "to_deliver": to_deliver,
                "available": available,
                "i": i,
                "action_mask": action_mask,
            },
            batch_size=batch_size,
        )

    def _make_spec(self, td_params: Optional[TensorDict]):
        """Make the observation, action, reward and done specs.

        Args:
            td_params: accepted for interface compatibility; not read here.
        """
        num_nodes = self.num_loc + 1
        self.observation_spec = CompositeSpec(
            locs=BoundedTensorSpec(
                minimum=self.min_loc,
                maximum=self.max_loc,
                shape=(num_nodes, 2),
                dtype=torch.float32,
            ),
            current_node=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            # ``_reset`` builds ``to_deliver`` as a boolean mask with one
            # entry per node, so the spec describes exactly that.
            to_deliver=UnboundedDiscreteTensorSpec(
                shape=(num_nodes,),
                dtype=torch.bool,
            ),
            i=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            action_mask=UnboundedDiscreteTensorSpec(
                shape=(num_nodes,),
                dtype=torch.bool,
            ),
            shape=(),
        )
        # NOTE(review): valid node indices are 0..num_loc; confirm whether
        # ``maximum`` is treated as inclusive by the spec.
        self.action_spec = BoundedTensorSpec(
            shape=(1,),
            dtype=torch.int64,
            minimum=0,
            maximum=self.num_loc + 1,
        )
        self.reward_spec = UnboundedContinuousTensorSpec(shape=(1,))
        self.done_spec = UnboundedDiscreteTensorSpec(shape=(1,), dtype=torch.bool)

    @staticmethod
    def get_reward(td, actions) -> torch.Tensor:
        """Return the reward of complete tours: minus the tour length.

        Args:
            td: state whose ``locs`` entry holds the node coordinates
                (depot first).
            actions: ``[batch, num_nodes]`` integer tensor with the visiting
                order of every node.

        Returns:
            ``-get_tour_length(...)`` of the locations ordered by ``actions``.

        Raises:
            AssertionError: if a tour is not a permutation of all nodes, or
                if a delivery node is visited before its pickup node.
        """
        num_nodes = actions.size(1)

        # Each row, once sorted, must be exactly 0..num_nodes-1.
        expected = (
            torch.arange(num_nodes, dtype=actions.dtype, device=actions.device)
            .view(1, -1)
            .expand_as(actions)
        )
        assert (expected == actions.sort(1)[0]).all(), "Not visiting all nodes"

        # visited_time[b, node] is the step at which ``node`` was visited.
        visited_time = torch.argsort(actions, 1)

        # The index of each pickup must be smaller than that of its delivery.
        half = num_nodes // 2
        assert (
            visited_time[:, 1 : half + 1] < visited_time[:, half + 1 :]
        ).all(), "Deliverying without pick-up"

        # Gather locations in the order of actions; reward = -(total distance).
        # Presumably [batch, graph_size+1, 2] -- confirm against gather_by_index.
        locs_ordered = gather_by_index(td["locs"], actions)
        return -get_tour_length(locs_ordered)

    def generate_data(self, batch_size) -> TensorDict:
        """Sample random instances with coordinates in ``[min_loc, max_loc]``.

        Args:
            batch_size: batch shape, either an int or a sequence of ints.

        Returns:
            A ``TensorDict`` with ``locs`` (``[*batch_size, num_loc, 2]``)
            and ``depot`` (``[*batch_size, 2]``).
        """
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size

        # Sample depot and locations together (the depot is the first node).
        # Sampling happens on CPU and is moved afterwards, exactly as the
        # legacy ``torch.FloatTensor`` constructor did.
        locs_with_depot = (
            torch.empty((*batch_size, self.num_loc + 1, 2), dtype=torch.float32)
            .uniform_(self.min_loc, self.max_loc)
            .to(self.device)
        )

        return TensorDict(
            {
                "locs": locs_with_depot[..., 1:, :],
                "depot": locs_with_depot[..., 0, :],
            },
            batch_size=batch_size,
        )

    @staticmethod
    def render(td: TensorDict, actions=None, ax=None):
        """Plot an instance together with a tour.

        Args:
            td: state to plot. If batched, only the first element is drawn.
            actions: visiting order. If ``td`` is batched, the first row is
                used. Defaults to ``td["action"]``.
            ax: matplotlib axes to draw on. A new figure is created only
                when this is ``None``.
        """
        import matplotlib.pyplot as plt

        markersize = 8

        td = td.detach().cpu()

        # If the state is batched, select the first batch element.
        if td.batch_size != torch.Size([]):
            td = td[0]
            if actions is not None:
                actions = actions[0]

        # Nodes are classified from the ``to_deliver`` flags of the given
        # state (depot excluded): set -> pickup, unset -> delivery.
        init_deliveries = td["to_deliver"][1:]
        delivery_locs = td["locs"][1:][~init_deliveries.bool()]
        pickup_locs = td["locs"][1:][init_deliveries.bool()]
        depot_loc = td["locs"][0]
        actions = actions if actions is not None else td["action"]

        # Respect a caller-provided axes instead of always replacing it.
        if ax is None:
            _, ax = plt.subplots()

        # Plot the actions in order; the last node links back to the first.
        num_steps = len(actions)
        for step in range(num_steps):
            from_node = actions[step]
            to_node = actions[(step + 1) % num_steps]
            from_loc = td["locs"][from_node]
            to_loc = td["locs"][to_node]
            ax.plot([from_loc[0], to_loc[0]], [from_loc[1], to_loc[1]], "k-")
            ax.annotate(
                "",
                xy=(to_loc[0], to_loc[1]),
                xytext=(from_loc[0], from_loc[1]),
                arrowprops=dict(arrowstyle="->", color="black"),
                annotation_clip=False,
            )

        # Plot the depot location
        ax.plot(
            depot_loc[0],
            depot_loc[1],
            "g",
            marker="s",
            markersize=markersize,
            label="Depot",
        )

        # Plot the pickup locations (label only the first for the legend)
        for idx, pickup_loc in enumerate(pickup_locs):
            ax.plot(
                pickup_loc[0],
                pickup_loc[1],
                "r",
                marker="^",
                markersize=markersize,
                label="Pickup" if idx == 0 else None,
            )

        # Plot the delivery locations (label only the first for the legend)
        for idx, delivery_loc in enumerate(delivery_locs):
            ax.plot(
                delivery_loc[0],
                delivery_loc[1],
                "b",
                marker="v",
                markersize=markersize,
                label="Delivery" if idx == 0 else None,
            )

        # Setup limits
        ax.set_xlim(-0.05, 1.05)
        ax.set_ylim(-0.05, 1.05)

© Copyright Federico Berto, Chuanbo Hua, Junyoung Park. Revision 14d072ed.

Built with Sphinx using a theme provided by Read the Docs.