Simple Inventory Example with Unlimited Capacity (Infinite State/Action Space)
Kobus Esterhuysen
July 5, 2022
It is important for an inventory manager to maintain inventory at optimal levels for each kind of product. Too much inventory incurs a holding cost, while too little inventory gives rise to a stockout cost. This balancing act can be addressed by framing the problem as a Markov Decision Process. In part 1 we looked at the Markov Process (also known as a Markov Chain). Part 2 made use of a Markov Reward Process. Both of these approaches covered sequential uncertainty. In part 3, we will make use of a Markov Decision Process, which finally allows us to address sequential decisioning. To summarize the context, Markov Processes may be presented, in order of increasing complexity, in the following way:

- Markov Process (MP), also known as a Markov Chain
- Markov Reward Process (MRP)
- Markov Decision Process (MDP)
So, in this project we will take the Markov Reward Process from the previous project, and add the concept of an action associated with each transition. This means our inventory problem has become a Markov Decision Process.
Formally, a Markov Decision Process consists of:

- a countable set of states \(\mathcal S\), with a subset of non-terminal states \(\mathcal N\)
- a countable set of actions \(\mathcal A\)
- a transition probability function \(\mathcal P_R(s,a,s',r)\) (defined below), giving the joint probability of the next state \(s'\) and reward \(r\), given the current state \(s\) and action \(a\)
- a discount factor \(\gamma \in [0,1]\)
There are two main cost elements related to inventory items:

- a holding cost \(h\) for each unit of inventory held overnight
- a stockout cost \(p\) for each unit of missed demand
Note that costs may be thought of as negative rewards.
In Part 2, the decision of how much to order (the ordering policy) was simple: \[ \theta=\text{max}(C - (\alpha + \beta), 0) \] where:

- \(\theta \in \mathbb Z_{\ge0}\) is the order quantity
- \(C \in \mathbb Z_{\ge0}\) is the storage capacity
- \(\alpha\) is the on-hand inventory
- \(\beta\) is the on-order inventory
- \((\alpha,\beta)\) is the state
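For concreteness, here is a quick worked example of this formula with purely hypothetical numbers:

```python
# Part-2 ordering policy with hypothetical numbers
C = 10        # storage capacity
alpha = 3     # on-hand inventory
beta = 2      # on-order inventory
theta = max(C - (alpha + beta), 0)
print(theta)  # 5 -- always order back up to capacity
```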
This policy tends to carry maximum inventory at all times, which means the risk of running out of stock is very low, at the price of a high holding cost \(h\). This situation is still preferable to the other extreme, a high stockout cost \(p\), because stockout costs are generally much higher than holding costs. In other words, we want to err on the side of having too much inventory rather than too little. But by how much should we err? What if we can reach the optimal trade-off point which will maximize our profit? This is indeed possible with a Markov Decision Process. Instead of always having a \(\theta\) of \(\text{max}(C - (\alpha + \beta), 0)\), we will now have a \(\theta\) from the set \(\{0,1,2,...,\text{max}(C - (\alpha + \beta), 0)\}\). Not only will we be able to pick from a wider array of \(\theta\) values, but the picking will also happen more intelligently. Instead of just being subject to sequential uncertainty, we will be in a position to apply sequential decisioning for better outcomes.
We now have a stochastic ordering policy \[ \begin{aligned} \pi(s,a) &= \pi(a|s) \\ &= \mathbb P[A_t=a|S_t=s] \end{aligned} \] for all \(t=0,1,2,...\), for all \(s \in \mathcal N\), and for all \(a \in \mathcal A\).
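For intuition, one very simple (hypothetical, not used in this post) stochastic ordering policy would pick \(\theta\) uniformly at random from the allowed set; a minimal sketch:

```python
import random

def uniform_random_order(C: int, alpha: int, beta: int) -> int:
    # sample theta uniformly from {0, 1, ..., max(C - (alpha + beta), 0)}
    return random.randint(0, max(C - (alpha + beta), 0))

print(uniform_random_order(C=10, alpha=3, beta=2))  # some value in 0..5
```

The policies implemented later in this post are smarter than this: the deterministic policy orders up to a fixed reorder point, and the stochastic policy samples the reorder point itself from a Poisson distribution.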
At each time step \(t\), we need to pick an action \(A_t\) based upon the currently observed state \(S_t\).
Given the observed state \(S_t\) and a performed action \(A_t\), we can find the probabilities of the next state \(S_{t+1}\) and the next reward \(R_{t+1}\).
The underlying task is to maximize the expected return from each state. This amounts to maximizing the value function.
We need to express the transition probabilities of the Markov Decision Process, taking the rewards and actions into account: \[ \begin{aligned} \mathcal P_R(s,a,s',r) &= p(s',r|s,a) \\ &= \mathbb P[S_{t+1}=s',R_{t+1}=r|S_t=s,A_t=a] \end{aligned} \] for \(t=0,1,2,...\), for all \(s \in \mathcal N, a \in \mathcal A, r \in \mathcal D, s' \in \mathcal S\), such that \[ \sum_{s' \in \mathcal S} \sum_{r \in \mathcal D}\mathcal P_R(s,a,s',r) = 1 \] for all \(s \in \mathcal N, a \in \mathcal A\).
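As a toy illustration (with hypothetical numbers, not taken from the inventory model), the slice \(\mathcal P_R(s,a,\cdot,\cdot)\) for a single state-action pair is just a table of \((s',r)\) pairs whose probabilities must sum to 1:

```python
# Hypothetical P_R(s, a, ., .) for one fixed (s, a):
# maps (next_state, reward) pairs to probabilities
p_r_slice = {
    (('alpha=1', 'beta=0'), -1.0): 0.7,   # no stockout
    (('alpha=0', 'beta=0'), -11.0): 0.3,  # stockout penalty incurred
}
assert abs(sum(p_r_slice.values()) - 1.0) < 1e-12
```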
As before, we operate under the constraints of the following 24-hour cycle:

- Observe the state \(S_t=(\alpha, \beta)\) at 18h00 (store-closing)
- Immediately order according to the ordering policy \(\theta=\text{max}(C - (\alpha + \beta), 0)\), with the symbols as defined above
- Capture any overnight holding cost
- Receive items at 06h00 (if there was an order 36 hours ago)
- Open the store at 08h00
- Experience random demand according to the (Poisson) demand distribution used in the previous parts
- Capture any stockout cost due to missed demand
- Close the store at 18h00 and repeat the cycle
In this project we assume there is no constraint on the capacity of the inventory item. We can order any integer quantity. This means the action space is infinite. The consequence of this is that the on-hand and on-order quantities are also unbounded so that the state is also infinite. We will not be able to use the tabular dynamic programming algorithms for this case. However, there is a lot of value in modeling infinite MDPs because we can perform simulations.
#hide
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
root_dir = "/content/gdrive/My Drive"
base_dir = root_dir + '/Rao_CME241/RL-book'
Mounted at /content/gdrive
We make use of the approach followed in http://web.stanford.edu/class/cme241/.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import (Callable, Dict, Generic, Iterable, Iterator,
                    Mapping, Sequence, Tuple, TypeVar)
import itertools
import random

import numpy as np
from scipy.stats import poisson

A = TypeVar('A')
B = TypeVar('B')
S = TypeVar('S')

class Distribution(ABC, Generic[A]):
    #. interface for distributions we can sample from
    @abstractmethod
    def sample(self) -> A:
        pass

    def sample_n(self, n: int) -> Sequence[A]:
        #. n independent samples
        return [self.sample() for _ in range(n)]

    @abstractmethod
    def expectation(
        self,
        f: Callable[[A], float]
    ) -> float:
        pass

    def map(
        self,
        f: Callable[[A], B]
    ) -> Distribution[B]:
        #. distribution of f(X) for X ~ self
        return SampledDistribution(lambda: f(self.sample()))

    def apply(
        self,
        f: Callable[[A], Distribution[B]]
    ) -> Distribution[B]:
        #. sample X ~ self, then sample from the distribution f(X)
        def sample():
            a = self.sample()
            b_dist = f(a)
            return b_dist.sample()
        return SampledDistribution(sample)
class SampledDistribution(Distribution[A]):
    sampler: Callable[[], A]
    expectation_samples: int

    def __init__(
        self,
        sampler: Callable[[], A],
        expectation_samples: int = 10000
    ):
        self.sampler = sampler
        self.expectation_samples = expectation_samples

    def sample(self) -> A:
        return self.sampler()

    def expectation(
        self,
        f: Callable[[A], float]
    ) -> float:
        #. Monte Carlo estimate of the expectation of f
        return sum(f(self.sample()) for _ in
                   range(self.expectation_samples))/self.expectation_samples
class FiniteDistribution(Distribution[A], ABC):
    @abstractmethod
    def table(self) -> Mapping[A, float]:
        pass

    def probability(self, outcome: A) -> float:
        return self.table()[outcome]

    def map(self, f: Callable[[A], B]) -> FiniteDistribution[B]:
        #. push-forward distribution: outcomes mapping to the same
        #. f(x) have their probabilities summed
        result: Dict[B, float] = defaultdict(float)
        for x, p in self:
            result[f(x)] += p
        return Categorical(result)

    def sample(self) -> A:
        outcomes = list(self.table().keys())
        weights = list(self.table().values())
        return random.choices(outcomes, weights=weights)[0]

    def expectation(self, f: Callable[[A], float]) -> float:
        #. exact expectation from the probability table
        return sum(p * f(x) for x, p in self)

    def __iter__(self) -> Iterator[Tuple[A, float]]:
        return iter(self.table().items())

    def __eq__(self, other: object) -> bool:
        if isinstance(other, FiniteDistribution):
            return self.table() == other.table()
        else:
            return False

    def __repr__(self) -> str:
        return repr(self.table())
class Categorical(FiniteDistribution[A]):
    probabilities: Mapping[A, float]

    def __init__(self, distribution: Mapping[A, float]):
        total = sum(distribution.values())
        # Normalize probabilities to sum to 1
        self.probabilities = {outcome: probability/total
                              for outcome, probability in distribution.items()}

    def table(self) -> Mapping[A, float]:
        return self.probabilities

    def probability(self, outcome: A) -> float:
        return self.probabilities.get(outcome, 0.)
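The `MarkovProcess` class below, and several cells further on, also use `State`/`NonTerminal` wrappers and a `Constant` distribution that belong to the CME241 codebase but are not shown in this post. A minimal sketch consistent with how they are used here (not the full original implementation):

```python
class State(ABC, Generic[S]):
    #. wrapper that distinguishes terminal from non-terminal states
    state: S

@dataclass(frozen=True)
class Terminal(State[S]):
    state: S

@dataclass(frozen=True)
class NonTerminal(State[S]):
    state: S

@dataclass(frozen=True)
class Constant(FiniteDistribution[A]):
    #. a distribution with all probability mass on a single value
    value: A

    def table(self) -> Mapping[A, float]:
        return {self.value: 1}
```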
class MarkovProcess(ABC, Generic[S]):
    #. transition from this state
    @abstractmethod
    def transition(
        self,
        state: NonTerminal[S]
    ) -> Distribution[State[S]]: #s'|s or s->s'
        pass

    def simulate(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[State[S]]:
        state: State[S] = start_state_distribution.sample()
        yield state
        while isinstance(state, NonTerminal):
            state = self.transition(state).sample()
            yield state

    def traces(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[State[S]]]:
        while True:
            yield self.simulate(start_state_distribution)
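`ReturnStep` below extends `TransitionStep`, whose definition is not shown in this post. Based on how it is constructed in `simulate_reward` below (positionally: state, next state, reward), a minimal sketch:

```python
@dataclass(frozen=True)
class TransitionStep(Generic[S]):
    #. one step of an MRP trace: s -> s' with reward r
    state: NonTerminal[S]
    next_state: State[S]
    reward: float
```

Note that `simulate_action` further down constructs a four-field `TransitionStep(state, action, next_state, reward)`; in the original codebase that is a separate class of the same name living in the MDP module, so in a single flat notebook it would need a distinct name.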
@dataclass(frozen=True)
class ReturnStep(TransitionStep[S]):
    return_: float
class MarkovRewardProcess(MarkovProcess[S]):
    #. transition from this state
    def transition(self, state: NonTerminal[S]) -> Distribution[State[S]]: #s'|s or s->s'
        distribution = self.transition_reward(state)

        def next_state(distribution=distribution):
            next_s, _ = distribution.sample() #.ignores reward
            return next_s

        return SampledDistribution(next_state)

    @abstractmethod
    def transition_reward(#. transition from this state
        self,
        state: NonTerminal[S]
    ) -> Distribution[Tuple[State[S], float]]: #s',r|s or s->s',r
        pass

    def simulate_reward(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[TransitionStep[S]]:
        state: State[S] = start_state_distribution.sample()
        reward: float = 0.
        while isinstance(state, NonTerminal):
            next_distribution = self.transition_reward(state)
            next_state, reward = next_distribution.sample()
            yield TransitionStep(state, next_state, reward) # s -> s'r
            state = next_state

    def reward_traces(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[TransitionStep[S]]]:
        while True:
            yield self.simulate_reward(start_state_distribution)
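`apply_policy` below takes a `Policy`, whose definition is also not shown; in the CME241 codebase a policy is simply an abstract mapping from a non-terminal state to a distribution over actions, along these lines:

```python
class Policy(ABC, Generic[S, A]):
    @abstractmethod
    def act(self, state: NonTerminal[S]) -> Distribution[A]:
        #. distribution over actions to take in the given state
        pass
```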
class MarkovDecisionProcess(ABC, Generic[S, A]):
    def apply_policy(self, policy: Policy[S, A]) -> MarkovRewardProcess[S]: #.implied MRP
        mdp = self

        class RewardProcess(MarkovRewardProcess[S]):
            def transition_reward(
                self,
                state: NonTerminal[S]
            ) -> Distribution[Tuple[State[S], float]]:
                actions: Distribution[A] = policy.act(state)
                #.return actions.apply(lambda a: mdp.step(state, a))
                return actions.apply(lambda a: mdp.transition_action(state, a))#.

        return RewardProcess()

    @abstractmethod
    def actions(self, state: NonTerminal[S]) -> Iterable[A]:
        pass

    #.def step(
    @abstractmethod
    def transition_action(#. transition from this state,action (called transition() elsewhere)
        self,
        state: NonTerminal[S],
        action: A
    ) -> Distribution[Tuple[State[S], float]]:
        pass

    #.def simulate_actions(
    def simulate_action(#.
        self,
        #.start_states: Distribution[NonTerminal[S]],
        start_state_distribution: Distribution[NonTerminal[S]],#.
        policy: Policy[S, A]
    ) -> Iterable[TransitionStep[S, A]]:
        #.state: State[S] = start_states.sample()
        state: State[S] = start_state_distribution.sample()#.
        while isinstance(state, NonTerminal):
            action_distribution = policy.act(state)
            action = action_distribution.sample()
            #.next_distribution = self.step(state, action)
            next_distribution = self.transition_action(state, action)#.
            next_state, reward = next_distribution.sample()
            yield TransitionStep(state, action, next_state, reward) # sa -> s'r
            state = next_state

    def action_traces(
        self,
        start_states: Distribution[NonTerminal[S]],
        policy: Policy[S, A]
    ) -> Iterable[Iterable[TransitionStep[S, A]]]:
        while True:
            #.yield self.simulate_actions(start_states, policy)
            yield self.simulate_action(start_states, policy)#.
#===study SimpleInventoryDeterministicPolicy
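`InventoryState` and `SimpleInventoryDeterministicPolicy` were introduced in the earlier parts of this series and are not repeated here. A minimal sketch consistent with the `repr` output below follows; note that subclassing `Policy` directly is a simplification on my part, since the original builds on a `DeterministicPolicy` helper that stores the `action_for` function (which is what the `repr` displays):

```python
@dataclass(frozen=True)
class InventoryState:
    on_hand: int
    on_order: int

    def inventory_position(self) -> int:
        #. on-hand plus on-order inventory
        return self.on_hand + self.on_order

class SimpleInventoryDeterministicPolicy(Policy[InventoryState, int]):
    def __init__(self, reorder_point: int):
        def action_for(s: InventoryState) -> int:
            #. order just enough to bring the inventory position
            #. back up to the reorder point
            return max(reorder_point - s.inventory_position(), 0)
        self.action_for = action_for

    def act(self, state: NonTerminal[InventoryState]) -> Constant[int]:
        return Constant(self.action_for(state.state))
```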
si_dp = SimpleInventoryDeterministicPolicy(reorder_point=8)
si_dp
SimpleInventoryDeterministicPolicy(action_for=<function SimpleInventoryDeterministicPolicy.__init__.<locals>.action_for at 0x7f306b899560>)
class SimpleInventoryStochasticPolicy(Policy[InventoryState, int]):
    def __init__(self, reorder_point_poisson_mean: float):
        self.reorder_point_poisson_mean: float = reorder_point_poisson_mean

    def act(self, state: NonTerminal[InventoryState]) -> \
            SampledDistribution[int]:
        def action_func(state=state) -> int:
            #. sample a reorder point, then order up to it
            reorder_point_sample: int = \
                np.random.poisson(self.reorder_point_poisson_mean)
            return max(
                reorder_point_sample - state.state.inventory_position(),
                #reorder_point_sample - state.inventory_position(), #.
                0
            )
        return SampledDistribution(action_func)
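The input cells that produced the three outputs below are not shown; they presumably instantiate the policy (with the same mean of 8.0 used later), build a state, and inspect the sampling closure, along these lines (a reconstruction):

```python
si_sp = SimpleInventoryStochasticPolicy(reorder_point_poisson_mean=8.0)
si_sp
s = NonTerminal(InventoryState(on_hand=5, on_order=0))
s
si_sp.act(s).sampler  #. the action_func closure wrapped by SampledDistribution
```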
<__main__.SimpleInventoryStochasticPolicy at 0x7f306b882290>
NonTerminal(state=InventoryState(on_hand=5, on_order=0))
<function __main__.SimpleInventoryStochasticPolicy.act.<locals>.action_func(state=NonTerminal(state=InventoryState(on_hand=5, on_order=0))) -> 'int'>
@dataclass(frozen=True)
class SimpleInventoryMDPNoCap(MarkovDecisionProcess[InventoryState, int]):
    poisson_lambda: float
    holding_cost: float
    stockout_cost: float

    #.def step( #. s',r|s,a
    def transition_action( #. s',r|s,a
        self,
        state: NonTerminal[InventoryState],
        order: int #. action
    ) -> SampledDistribution[Tuple[State[InventoryState], float]]:
        def sample_next_state_reward(
            state=state,
            order=order
        ) -> Tuple[State[InventoryState], float]:
            demand_sample: int = np.random.poisson(self.poisson_lambda)
            ip: int = state.state.inventory_position()
            #. s'
            next_state: InventoryState = InventoryState(
                max(ip - demand_sample, 0),
                order
            )
            #. r
            reward: float = - self.holding_cost * state.state.on_hand\
                - self.stockout_cost * max(demand_sample - ip, 0)
            #. s',r
            return NonTerminal(next_state), reward
        return SampledDistribution(sample_next_state_reward)

    def actions(self, state: NonTerminal[InventoryState]) -> Iterator[int]:
        #. unlimited capacity: any non-negative integer order quantity
        return itertools.count(start=0, step=1)

    def fraction_of_days_oos(
        self,
        policy: Policy[InventoryState, int],
        time_steps: int,
        num_traces: int
    ) -> float:
        impl_mrp: MarkovRewardProcess[InventoryState] = self.apply_policy(policy)#.implied MRP
        count: int = 0
        high_fractile: int = int(poisson(self.poisson_lambda).ppf(0.98))
        start: InventoryState = random.choice(
            [InventoryState(i, 0) for i in range(high_fractile + 1)])
        for _ in range(num_traces):
            steps = itertools.islice(
                impl_mrp.simulate_reward(Constant(NonTerminal(start))),
                time_steps
            )
            for step in steps:
                #. any reward below the pure holding cost implies a stockout
                if step.reward < -self.holding_cost * step.state.state.on_hand:
                    count += 1
        return float(count) / (time_steps * num_traces)
item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0
item_reorder_point = 8
item_reorder_point_poisson_mean = 8.0
item_time_steps = 1000
item_num_traces = 1000
si_mdp_nocap = SimpleInventoryMDPNoCap(poisson_lambda=item_poisson_lambda,
holding_cost=item_holding_cost,
stockout_cost=item_stockout_cost)
si_mdp_nocap
SimpleInventoryMDPNoCap(poisson_lambda=2.0, holding_cost=1.0, stockout_cost=10.0)
s = NonTerminal(InventoryState(on_hand=3, on_order=0))
gen = si_mdp_nocap.actions(state=s); type(gen)
itertools.count
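The sampled transitions below (a single draw, then ten draws) are consistent with placing an order of 4 from this state; the producing cells are not shown, but presumably read:

```python
si_mdp_nocap.transition_action(s, order=4).sample()
si_mdp_nocap.transition_action(s, order=4).sample_n(10)
```

Each draw pairs a next state (on-hand reduced by the Poisson demand sample, on-order equal to the new order of 4) with a reward; rewards below \(-3.0\) indicate days on which a stockout penalty was incurred.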
(NonTerminal(state=InventoryState(on_hand=3, on_order=4)), -3.0)
[(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=2, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -43.0),
(NonTerminal(state=InventoryState(on_hand=2, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
(NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -23.0),
(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0)]
item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0
item_reorder_point_poisson_mean = 8.0
item_time_steps = 100
item_num_traces = 100
si_mdp_nocap: MarkovDecisionProcess[InventoryState, int] = SimpleInventoryMDPNoCap(
    poisson_lambda=item_poisson_lambda,
    holding_cost=item_holding_cost,
    stockout_cost=item_stockout_cost)
si_sp = SimpleInventoryStochasticPolicy(
reorder_point_poisson_mean=item_reorder_point_poisson_mean)
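The output below shows the implied Markov Reward Process, which the subsequent cells reference as `impl_mrp`; the producing cell is not shown, but is presumably:

```python
impl_mrp = si_mdp_nocap.apply_policy(si_sp)
impl_mrp
```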
<__main__.MarkovDecisionProcess.apply_policy.<locals>.RewardProcess at 0x7f306b89c590>
#
# poisson(si_mdp_nocap.poisson_lambda)
#. percent-point-function (ppf) is the inverse of cumulative distribution function (cdf):
# poisson(si_mdp_nocap.poisson_lambda).ppf(0.98)
# int(poisson(si_mdp_nocap.poisson_lambda).ppf(0.98))
high_fractile: int = int(poisson(si_mdp_nocap.poisson_lambda).ppf(0.98))
high_fractile
5
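The list below enumerates the candidate starting states (zero on-order, on-hand from 0 up to the high fractile); the producing cell is not shown, but is presumably the same comprehension used inside `fraction_of_days_oos`:

```python
[InventoryState(i, 0) for i in range(high_fractile + 1)]
```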
[InventoryState(on_hand=0, on_order=0),
InventoryState(on_hand=1, on_order=0),
InventoryState(on_hand=2, on_order=0),
InventoryState(on_hand=3, on_order=0),
InventoryState(on_hand=4, on_order=0),
InventoryState(on_hand=5, on_order=0)]
start: InventoryState = random.choice(
[InventoryState(i, 0) for i in range(high_fractile + 1)])
start
InventoryState(on_hand=5, on_order=0)
for _ in range(item_num_traces):
    steps = itertools.islice(
        impl_mrp.simulate_reward(Constant(NonTerminal(start))),
        item_time_steps
    )
steps_list = list(steps); steps_list
my_step = steps_list[7]; my_step
# for step in steps:
#     if step.reward < -si_mdp_nocap.holding_cost*step.state.state.on_hand:
#         count += 1
TransitionStep(state=NonTerminal(state=InventoryState(on_hand=2, on_order=4)), next_state=NonTerminal(state=InventoryState(on_hand=3, on_order=3)), reward=-2.0)
print(f"type(my_step): {type(my_step)}")
print(f"my_step.state: {my_step.state}")
print(f"my_step.next_state: {my_step.next_state}")
print(f"my_step.reward: {my_step.reward}")
type(my_step): <class '__main__.TransitionStep'>
my_step.state: NonTerminal(state=InventoryState(on_hand=2, on_order=4))
my_step.next_state: NonTerminal(state=InventoryState(on_hand=3, on_order=3))
my_step.reward: -2.0
count = 0
for _ in range(item_num_traces):
    steps = itertools.islice(
        impl_mrp.simulate_reward(Constant(NonTerminal(start))),
        item_time_steps
    )
    # steps_list = list(steps); steps_list
    # my_step = steps_list[7]; my_step
    for step in steps:
        if step.reward < -si_mdp_nocap.holding_cost*step.state.state.on_hand:
            count += 1
count
279
item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0
item_reorder_point = 8
item_reorder_point_poisson_mean = 8.0
item_time_steps = 100
item_num_traces = 100
si_mdp_nocap = SimpleInventoryMDPNoCap(poisson_lambda=item_poisson_lambda,
holding_cost=item_holding_cost,
stockout_cost=item_stockout_cost)
# Deterministic Policy
si_dp = SimpleInventoryDeterministicPolicy(
reorder_point=item_reorder_point
)
oos_frac_dp = si_mdp_nocap.fraction_of_days_oos(policy=si_dp,
time_steps=item_time_steps,
num_traces=item_num_traces)
print(f"Deterministic Policy: {oos_frac_dp*100:.2f}% Out-Of-Stock days")
# Stochastic Policy
si_sp = SimpleInventoryStochasticPolicy(
reorder_point_poisson_mean=item_reorder_point_poisson_mean
)
oos_frac_sp = si_mdp_nocap.fraction_of_days_oos(policy=si_sp,
time_steps=item_time_steps,
num_traces=item_num_traces)
print(f"Stochastic Policy: {oos_frac_sp*100:.2f}% Out-Of-Stock days")
Deterministic Policy: 2.13% Out-Of-Stock days
Stochastic Policy: 4.07% Out-Of-Stock days
Now we do the same again, but sweep through a range of `reorder_point`/`reorder_point_poisson_mean` values:
tuples = \
    [
        (
            rp,
            si_mdp_nocap.fraction_of_days_oos(
                policy=SimpleInventoryDeterministicPolicy(reorder_point=rp),
                time_steps=item_time_steps,
                num_traces=item_num_traces
            ),
            si_mdp_nocap.fraction_of_days_oos(
                policy=SimpleInventoryStochasticPolicy(reorder_point_poisson_mean=rp),
                time_steps=item_time_steps,
                num_traces=item_num_traces
            )
        )
        for rp in range(0, 15)
    ]
tuples
tuples
[(0, 0.8436, 0.8516),
(1, 0.7268, 0.6436),
(2, 0.5586, 0.4831),
(3, 0.4045, 0.3449),
(4, 0.2708, 0.2513),
(5, 0.1658, 0.1562),
(6, 0.096, 0.1031),
(7, 0.0521, 0.0654),
(8, 0.0164, 0.0306),
(9, 0.0084, 0.0181),
(10, 0.0064, 0.0122),
(11, 0.0068, 0.0121),
(12, 0.001, 0.0036),
(13, 0.0061, 0.001),
(14, 0.0085, 0.0091)]
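To plot the sweep, the tuples are unpacked into separate lists; the cell that produced the output below is not shown, but presumably does something like this (`dp` and `sp` are the names used by the plotting code at the end):

```python
x = [t[0] for t in tuples]   #. reorder points
dp = [t[1] for t in tuples]  #. deterministic-policy OOS fractions
sp = [t[2] for t in tuples]  #. stochastic-policy OOS fractions
x, dp, sp
```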
([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
[0.8436,
0.7268,
0.5586,
0.4045,
0.2708,
0.1658,
0.096,
0.0521,
0.0164,
0.0084,
0.0064,
0.0068,
0.001,
0.0061,
0.0085],
[0.8516,
0.6436,
0.4831,
0.3449,
0.2513,
0.1562,
0.1031,
0.0654,
0.0306,
0.0181,
0.0122,
0.0121,
0.0036,
0.001,
0.0091])
Next we visualize how the fraction of out-of-stock days varies with increasing reorder point levels for the cases of both a deterministic policy and a stochastic policy.
import matplotlib.pyplot as plt
fig, axs = plt.subplots(figsize=(14, 10))
axs.set_xlabel('reorder_point/reorder_point_poisson_mean', fontsize=20)
axs.set_ylabel('Fraction of Out-Of-Stock Days', fontsize=20)
axs.set_title('Out-Of-Stock Days vs Reorder Level', fontsize=24)
axs.plot(x, dp, color='b', label='Deterministic Policy')
axs.plot(x, sp, color='k', label='Stochastic Policy')
axs.legend(fontsize=20);