Shortcuts

Source code for rl4co.envs.ffsp

from typing import Optional

import torch

from tensordict.tensordict import TensorDict
from torchrl.data import (
    BoundedTensorSpec,
    CompositeSpec,
    UnboundedContinuousTensorSpec,
    UnboundedDiscreteTensorSpec,
)

from rl4co.envs.common.base import RL4COEnvBase


class FFSPEnv(RL4COEnvBase):
    """Flexible Flow Shop Problem (FFSP) environment.

    The goal is to schedule a set of jobs on a set of machines such that the
    makespan is minimized.

    Args:
        num_stage: number of stages
        num_machine: number of machines in each stage
        num_job: number of jobs
        min_time: minimum processing time of a job
        max_time: maximum processing time of a job
        batch_size: batch size of the problem (``[50]`` when omitted)

    Note:
        - [IMPORTANT] This version of ffsp requires the number of machines in
          each stage to be the same
        - The index tensors built in ``_reset`` (``batch_idx``) and the
          ``permute(0, 2, 1)`` in ``_step`` only work with a single batch
          dimension.
    """

    name = "ffsp"

    def __init__(
        self,
        num_stage: int,
        num_machine: int,
        num_job: int,
        min_time: float = 0.1,
        max_time: float = 1.0,
        batch_size: Optional[list] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.num_stage = num_stage
        self.num_machine = num_machine
        self.num_machine_total = num_stage * num_machine
        self.num_job = num_job
        self.min_time = min_time
        self.max_time = max_time
        # A fresh list per instance: a literal list default would be shared
        # between every environment created without an explicit batch size.
        self.batch_size = [50] if batch_size is None else batch_size

    def _step(self, td: TensorDict) -> TensorDict:
        """Assign the chosen job to the current machine and advance the clock.

        The state tensors read from ``td`` are updated in place.

        Args:
            td: current state; must contain ``job_idx`` (the action) together
                with the entries produced by ``_reset``.

        Returns:
            A TensorDict whose ``"next"`` entry holds the updated state, the
            job mask for the next decision (0 for selectable entries, ``-inf``
            otherwise; the last column is the "wait" option) and the reward.
        """
        # job_idx is the action from the model
        job_idx = td["job_idx"]
        time_idx = td["time_idx"]
        batch_idx = td["batch_idx"]
        # One machine index per batch element, as built by ``_reset`` and as
        # returned below, so consecutive steps see the same layout.
        machine_idx = td["machine_idx"]
        sub_time_idx = td["sub_time_idx"]

        # Record the start time of the selected job on the current machine.
        schedule = td["schedule"]
        schedule[batch_idx, machine_idx, job_idx] = time_idx

        job_length = td["job_duration"][batch_idx, job_idx, machine_idx]

        # Both the machine and the job are busy for the job's duration, and
        # the job moves on to the next stage.
        machine_wait_step = td["machine_wait_step"]
        machine_wait_step[batch_idx, machine_idx] = job_length
        job_location = td["job_location"]
        job_location[batch_idx, job_idx] += 1
        job_wait_step = td["job_wait_step"]
        job_wait_step[batch_idx, job_idx] = job_length

        # An instance is finished once every real job has passed all stages.
        finish = (job_location[:, : self.num_job] == self.num_stage).all(dim=-1)
        done = finish.all()

        if done:
            # Makespan: latest end time over all machines and real jobs.
            end_schedule = schedule + td["job_duration"].permute(0, 2, 1)
            end_time_max, _ = end_schedule[:, :, : self.num_job].max(dim=-1)
            end_time_max, _ = end_time_max.max(dim=-1)
            # Cast so the dtype matches the float32 reward created in _reset.
            # NOTE(review): the reward is the raw (non-negated) makespan;
            # confirm the sign convention expected by the training code.
            reward = end_time_max.to(torch.float32)
        else:
            # Move every unfinished instance forward, one machine slot at a
            # time, until its current machine is idle and at least one job is
            # available for it.
            ready = torch.flatten(finish)
            idx = torch.flatten(batch_idx)
            idx = idx[~ready]

            while not ready.all():
                new_sub_time_idx = sub_time_idx[idx] + 1
                # Wrapping past the last machine means one time unit elapsed.
                step_time_required = new_sub_time_idx == self.num_machine_total
                time_idx[idx] += step_time_required.long()
                new_sub_time_idx[step_time_required] = 0
                sub_time_idx[idx] = new_sub_time_idx
                new_machine_idx = td["machine_table"][0][new_sub_time_idx]
                machine_idx[idx] = new_machine_idx

                # Count down remaining waits where a time unit elapsed,
                # clamping at zero.
                machine_wait_steps = machine_wait_step[idx, :]
                machine_wait_steps[step_time_required, :] -= 1
                machine_wait_steps[machine_wait_steps < 0] = 0
                machine_wait_step[idx, :] = machine_wait_steps

                job_wait_steps = job_wait_step[idx, :]
                job_wait_steps[step_time_required, :] -= 1
                job_wait_steps[job_wait_steps < 0] = 0
                job_wait_step[idx, :] = job_wait_steps

                machine_ready = machine_wait_step[idx, new_machine_idx] == 0

                new_stage_idx = td["stage_table"][0][new_sub_time_idx]
                job_ready_1 = job_location[idx, : self.num_job] == new_stage_idx[:, None]
                job_ready_2 = job_wait_step[idx, : self.num_job] == 0
                job_ready = (job_ready_1 & job_ready_2).any(dim=-1)

                ready = machine_ready & job_ready
                idx = idx[~ready]

            reward = td["reward"]

        # Built on both paths so the returned TensorDict is always complete,
        # including on the terminal step.
        stage_idx = td["stage_table"][0][sub_time_idx]
        stage_machine_idx = td["stage_machine_table"][0][sub_time_idx]

        job_loc = job_location[:, : self.num_job]
        job_wait_time = job_wait_step[:, : self.num_job]

        job_in_stage = job_loc == stage_idx[:, None]
        job_not_waiting = job_wait_time == 0
        job_available = job_in_stage & job_not_waiting

        # Waiting is allowed while jobs may still arrive at this stage, or
        # when the instance is already finished.
        job_in_previous_stages = (job_loc < stage_idx[:, None]).any(dim=-1)
        job_waiting_in_stage = (job_in_stage & (job_wait_time > 0)).any(dim=-1)
        wait_allowed = job_in_previous_stages | job_waiting_in_stage | finish

        job_enable = torch.cat((job_available, wait_allowed[:, None]), dim=-1)
        # Sized from the actual state rather than self.batch_size so a reset
        # with a different batch size still yields a matching mask.
        job_mask = torch.full(
            size=job_enable.shape,
            dtype=torch.float32,
            device=self.device,
            fill_value=float("-inf"),
        )
        job_mask[job_enable] = 0

        return TensorDict(
            {
                "next": {
                    "stage_table": td["stage_table"],
                    "machine_table": td["machine_table"],
                    "stage_machine_table": td["stage_machine_table"],
                    "time_idx": time_idx,
                    "sub_time_idx": sub_time_idx,
                    "batch_idx": batch_idx,
                    "machine_idx": machine_idx,
                    "schedule": schedule,
                    "machine_wait_step": machine_wait_step,
                    "job_location": job_location,
                    "job_wait_step": job_wait_step,
                    "job_duration": td["job_duration"],
                    "reward": reward,
                    "finish": finish,
                    # Update variables
                    "job_mask": job_mask,
                    "stage_idx": stage_idx,
                    "stage_machine_idx": stage_machine_idx,
                }
            },
            td.shape,
        )

    def _reset(
        self, td: Optional[TensorDict] = None, batch_size: Optional[list] = None
    ) -> TensorDict:
        """Build the initial scheduling state.

        Args:
            td: optional TensorDict holding ``run_time``; when it is ``None``
                or empty a new instance is drawn with ``generate_data``.
            batch_size: batch size to use; when omitted it is taken from
                ``td`` if given, otherwise from ``self.batch_size``.

        Returns:
            - stage_table [batch_size, num_stage * num_machine]
            - machine_table [batch_size, num_machine * num_stage]
            - stage_machine_table [batch_size, num_stage * num_machine]
            - time_idx [batch_size]
            - sub_time_idx [batch_size]
            - batch_idx [batch_size]
            - machine_idx [batch_size]
            - schedule [batch_size, num_machine_total, num_job+1]
            - machine_wait_step [batch_size, num_machine_total]
            - job_location [batch_size, num_job+1]
            - job_wait_step [batch_size, num_job+1]
            - job_duration [batch_size, num_job+1, num_machine * num_stage]
            - reward [batch_size]
            - finish [batch_size]
        """
        if batch_size is None:
            # The instance data is stored under "run_time", so read the batch
            # size from the TensorDict itself rather than from a tensor key.
            batch_size = self.batch_size if td is None else td.batch_size
        if isinstance(batch_size, int):
            batch_size = [batch_size]

        if td is None or td.is_empty():
            td = self.generate_data(batch_size=batch_size)

        # Init stage and machine mapping table
        stage_table = (
            torch.arange(self.num_stage, dtype=torch.long, device=self.device)
            .repeat_interleave(self.num_machine)
            .repeat(*batch_size, 1)
        )
        machine_table = torch.arange(
            self.num_machine * self.num_stage, dtype=torch.long, device=self.device
        ).repeat(*batch_size, 1)
        stage_machine_table = torch.arange(
            self.num_machine, dtype=torch.long, device=self.device
        ).repeat(*batch_size, self.num_stage)

        # Init index record tensor
        time_idx = torch.zeros(size=(*batch_size,), dtype=torch.long, device=self.device)
        sub_time_idx = torch.zeros(
            size=(*batch_size,), dtype=torch.long, device=self.device
        )
        batch_idx = torch.arange(*batch_size, device=self.device)
        # One entry per batch element (shape [batch_size]).
        machine_idx = machine_table[batch_idx, sub_time_idx]

        # Scheduling status information
        schedule = torch.full(
            size=(*batch_size, self.num_machine_total, self.num_job + 1),
            dtype=torch.long,
            device=self.device,
            fill_value=-999999,
        )
        machine_wait_step = torch.zeros(
            size=(*batch_size, self.num_machine_total),
            dtype=torch.long,
            device=self.device,
        )
        job_location = torch.zeros(
            size=(*batch_size, self.num_job + 1),
            dtype=torch.long,
            device=self.device,
        )
        job_wait_step = torch.zeros(
            size=(*batch_size, self.num_job + 1),
            dtype=torch.long,
            device=self.device,
        )
        job_duration = torch.empty(
            size=(*batch_size, self.num_job + 1, self.num_machine * self.num_stage),
            dtype=torch.long,
            device=self.device,
        )
        # NOTE(review): run_time is sampled as float but stored in an integer
        # tensor, so fractional parts are dropped on assignment (with the
        # default 0.1-1.0 range the durations become 0) - confirm intended.
        job_duration[..., : self.num_job, :] = td["run_time"].view(
            *batch_size, self.num_job, -1
        )
        # The extra slot at index num_job has zero duration.
        job_duration[..., self.num_job, :] = 0

        # Finish status information
        reward = torch.full(
            size=(*batch_size,),
            dtype=torch.float32,
            device=self.device,
            fill_value=float("-inf"),
        )
        finish = torch.full(
            size=(*batch_size,),
            dtype=torch.bool,
            device=self.device,
            fill_value=False,
        )

        return TensorDict(
            {
                # Mapping table information
                "stage_table": stage_table,
                "machine_table": machine_table,
                "stage_machine_table": stage_machine_table,
                # Index information
                "time_idx": time_idx,
                "sub_time_idx": sub_time_idx,
                "batch_idx": batch_idx,
                "machine_idx": machine_idx,
                # Scheduling status information
                "schedule": schedule,
                "machine_wait_step": machine_wait_step,
                "job_location": job_location,
                "job_wait_step": job_wait_step,
                "job_duration": job_duration,
                # Finish status information
                "reward": reward,
                "finish": finish,
            },
            batch_size=batch_size,
        )

    def _make_spec(self, td_params: TensorDict):
        """Declare the observation, action, reward and done specs.

        Args:
            td_params: accepted for interface compatibility; not used here.
        """
        self.observation_spec = CompositeSpec(
            time_idx=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            sub_time_idx=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            batch_idx=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            machine_idx=UnboundedDiscreteTensorSpec(
                shape=(1,),
                dtype=torch.int64,
            ),
            schedule=UnboundedDiscreteTensorSpec(
                shape=(self.num_machine_total, self.num_job + 1),
                dtype=torch.int64,
            ),
            machine_wait_step=UnboundedDiscreteTensorSpec(
                shape=(self.num_machine_total,),
                dtype=torch.int64,
            ),
            job_location=UnboundedDiscreteTensorSpec(
                shape=(self.num_job + 1,),
                dtype=torch.int64,
            ),
            job_wait_step=UnboundedDiscreteTensorSpec(
                shape=(self.num_job + 1,),
                dtype=torch.int64,
            ),
            job_duration=UnboundedDiscreteTensorSpec(
                shape=(self.num_job + 1, self.num_machine * self.num_stage),
                dtype=torch.int64,
            ),
            shape=(),
        )
        self.input_spec = self.observation_spec.clone()
        # Actions index the num_job + 1 entries of the job mask built in
        # _step: the real jobs 0..num_job-1 plus the "wait" slot at num_job.
        self.action_spec = BoundedTensorSpec(
            shape=(1,),
            dtype=torch.int64,
            minimum=0,
            maximum=self.num_job,
        )
        self.reward_spec = UnboundedContinuousTensorSpec(shape=(1,))
        self.done_spec = UnboundedDiscreteTensorSpec(shape=(1,), dtype=torch.bool)

    def get_reward(self, td, actions) -> torch.Tensor:
        """Return the reward stored in ``td``; ``actions`` is not used."""
        return td["reward"]

    def generate_data(self, batch_size) -> TensorDict:
        """Sample a random FFSP instance.

        Args:
            batch_size: an int or a list of batch dimensions.

        Returns:
            A TensorDict with ``run_time`` of shape
            ``[*batch_size, num_job, num_machine, num_stage]``, drawn uniformly
            between ``min_time`` and ``max_time``.
        """
        # Batch size input check
        batch_size = [batch_size] if isinstance(batch_size, int) else batch_size

        # Init observation: running time of each job on each machine
        run_time = (
            torch.empty(
                *batch_size,
                self.num_job,
                self.num_machine,
                self.num_stage,
                dtype=torch.float32,
            )
            .uniform_(self.min_time, self.max_time)
            .to(self.device)
        )

        return TensorDict(
            {
                "run_time": run_time,
            },
            batch_size=batch_size,
        )

    def render(self, td: TensorDict):
        """Rendering is not available for this environment yet."""
        raise NotImplementedError("TODO: render is not implemented yet")
if __name__ == "__main__":
    # Manual smoke test: build a small environment (2 stages, 3 machines per
    # stage, 4 jobs, batch of 5), reset it, then take a single step in which
    # every batch element selects job 1, printing the state before and after.
    demo_config = dict(
        num_stage=2,
        num_machine=3,
        num_job=4,
        min_time=2,
        max_time=10,
        batch_size=[5],
        seed=None,
        device="cpu",
    )
    demo_env = FFSPEnv(**demo_config)

    state = demo_env.reset()
    print(state)

    state["job_idx"] = torch.tensor([1] * 5)
    state = demo_env._step(state)
    print(state)

© Copyright Federico Berto, Chuanbo Hua, Junyoung Park. Revision f4bc96ca.

Built with Sphinx using a theme provided by Read the Docs.