An RL Approach for Inventory Management (Part 3)

Simple Inventory Example with Unlimited Capacity (Infinite State/Action Space)

Inventory Management
Markov Decision Process
Python
Author

Kobus Esterhuysen

Published

July 5, 2022

1 Introduction

It is important for an inventory manager to maintain inventory at optimal levels for each kind of product. Too much inventory incurs a holding cost while too little inventory gives rise to a stockout cost. This balancing act could be addressed by framing this problem as a Markov Decision Process. In part 1 we looked at the Markov Process (also known as a Markov Chain). Part 2 made use of a Markov Reward Process. Both of these approaches covered sequential uncertainty. In part 3, we will make use of a Markov Decision Process. We will finally be able to address sequential decisioning. To summarize the context, Markov Processes may be presented, in order of increasing complexity, in the following way:

  • Markov Process (also called a Markov Chain), which only has states
  • Markov Reward Process which adds rewards
  • Markov Decision Process which adds actions

So, in this project we will take the Markov Reward Process from the previous project, and add the concept of an action associated with each transition. This means our inventory problem has become a Markov Decision Process.

Formally, a Markov Decision Process consists of:

  • Sets
    • a countable set of states \(\mathcal S\) (called the state space)
    • a set \(\mathcal T \subseteq \mathcal S\) (called the terminal states), with \(\mathcal N = \mathcal S - \mathcal T\) denoting the non-terminal states
    • a countable set of actions \(\mathcal A\) (called the action space)
  • Sequences
    • a time-indexed sequence of environment-generated state random variables \(S_t \in \mathcal S\) for \(t=0,1,2,3,...\)
    • a time-indexed sequence of environment-generated reward random variables \(R_t \in \mathcal D\) (a countable subset of \(\mathbb R\)) for \(t=1,2,3,...\)
    • a time-indexed sequence of agent-controllable actions \(A_t \in \mathcal A\) for \(t=0,1,2,3,...\)
  • Markov Property \(\mathbb P[S_{t+1},R_{t+1}|S_t,A_t,S_{t-1},A_{t-1},...,S_0,A_0]=\mathbb P[S_{t+1},R_{t+1}|S_t,A_t]\) for all \(t \ge 0\)
  • Termination
    • if \(S_T \in \mathcal T\) (for some time step \(T\)), then this sequence terminates at time step \(T\)

2 Inventory Costs

There are two main cost elements related to inventory items:

  • Holding cost \(h\)
    • Lost interest on cash used to buy the item
    • Upkeep cost
  • Stockout cost \(p\)
    • Lost revenue per demanded item

Note that costs may be thought of as negative rewards.

2.1 Better Decisions with a Stochastic Ordering Policy

In Part 2, the decision of how much to order (the ordering policy) was simple: \[ \theta=\text{max}(C - (\alpha + \beta), 0) \] where:

  • \(\theta \in \mathbb Z_{\ge0}\) is the order quantity
  • \(C \in \mathbb Z_{\ge0}\) is the storage capacity
  • \(\alpha\) is the on-hand inventory
  • \(\beta\) is the on-order inventory
  • \((\alpha,\beta)\) is the state

This policy tends to carry maximum inventory at all times, which means the risk of running out of stock is very low but the holding cost \(h\) is high. This situation is still usually preferable to the other extreme, a high stockout cost \(p\), because stockout costs are generally much higher than holding costs. In other words, we want to err on the side of having too much inventory rather than too little. But by how much should we err? What if we could reach the optimal trade-off point that maximizes our profit? This is indeed possible with a Markov Decision Process. Instead of always ordering \(\theta = \text{max}(C - (\alpha + \beta), 0)\), we will now pick a \(\theta\) from the set \(\{0,1,2,...,\text{max}(C - (\alpha + \beta), 0)\}\). Not only will we be able to pick from a wider array of \(\theta\) values, but the picking will happen more intelligently. Instead of just being subject to sequential uncertainty, we will be in a position to apply sequential decisioning for better outcomes.

We now have a stochastic ordering policy \[ \begin{aligned} \pi(s,a) &= \pi(a|s) \\ &= \mathbb P[A_t=a|S_t=s] \end{aligned} \] for all \(t=0,1,2,...,\) for all \(s \in \mathcal N\), for all \(a \in \mathcal A\)
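
As a toy illustration (this is not the policy implemented later, which samples a reorder point from a Poisson distribution), a stochastic ordering policy could, for example, pick \(\theta\) uniformly from the allowed set. A minimal sketch, assuming \(C=10\) and state \((\alpha,\beta)=(3,2)\):

import random

C, alpha, beta = 10, 3, 2  #. assumed capacity and state, for illustration only
allowed = range(max(C - (alpha + beta), 0) + 1)  #. {0, 1, ..., max(C-(alpha+beta), 0)}
theta = random.choice(list(allowed))  #. one sample a ~ pi(a|s), here uniform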

3 Problem Statement

At each time step \(t\), we need to pick an action \(A_t\) based upon the currently observed state \(S_t\).

Given the observed state \(S_t\) and a performed action \(A_t\), we can find the probabilities of the next state \(S_{t+1}\) and the next reward \(R_{t+1}\).

The underlying task is to maximize the expected return from each state. This amounts to maximizing the value function.
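
For reference, using the standard definitions with a discount factor \(\gamma \in [0,1]\), the return from time \(t\) and the value function of a policy \(\pi\) are \[ G_t = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1} \qquad V^\pi(s) = \mathbb E_\pi[G_t|S_t=s] \] so maximizing the expected return from each state means finding a policy whose value function is as large as possible in every state.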

We need to express the transition probabilities of the Markov Decision Process, taking the rewards and actions into account: \[ \begin{aligned} \mathcal P_R(s,a,s',r) &= p(s',r|s,a) \\ &= \mathbb P[S_{t+1}=s',R_{t+1}=r|S_t=s,A_t=a] \end{aligned} \] for \(t=0,1,2,...\), for all \(s \in \mathcal N, a \in \mathcal A, r \in \mathcal D, s' \in \mathcal S\), such that \[ \sum_{s' \in \mathcal S} \sum_{r \in \mathcal D}\mathcal P_R(s,a,s',r) = 1 \] for all \(s \in \mathcal N, a \in \mathcal A\)

As before, we have the constraints:

  • daily demand for the item comes from a Poisson distribution whose probability mass function (pmf) is given by: \[ f(i) = \frac{e^{-\lambda}\lambda^i}{i!} \] where \(\lambda \in \mathbb R_{\ge 0}\) is the Poisson parameter
  • the associated cumulative distribution function (cdf) is given by: \[ F(i) = \sum_{j=0}^i f(j) \] (see the short check after this list)
  • maximum storage capacity for the item is \(C \in \mathbb Z_{\ge0}\)
  • opportunity to order daily at 18h00 when store closes
  • ordered items arrive at 06h00 on the second morning after the order is placed; i.e. the delivery lead time is 36 hours
  • the state of the inventory operation is \((α,β)\) where
    • α is the inventory in the store at 18h00 (on-hand inventory at 18h00)
    • β is the inventory in transit, ordered the previous day; will arrive the next morning at 06h00 (on-order inventory at 18h00)
  • stochastic ordering policy \[ \begin{aligned} \pi(s,a) &= \pi(a|s) \\ &= \mathbb P[A_t=a|S_t=s] \end{aligned} \] for all \(t=0,1,2,...,\) for all \(s \in \mathcal N\), for all \(a \in \mathcal A\)
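
As a quick sanity check of the pmf and cdf above, we can evaluate them with scipy (a small sketch; \(\lambda = 2.0\) matches the item_poisson_lambda used in the implementation below):

from scipy.stats import poisson

lam = 2.0  #. Poisson parameter lambda
dist = poisson(lam)
for i in range(5):
    #. f(i) and F(i) for small demand values i
    print(i, round(dist.pmf(i), 4), round(dist.cdf(i), 4))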

4 Daily Flow of Events

  • Observe the state \(S_t=(\alpha, \beta)\) at 18h00 (store-closing)

  • Immediately order a quantity \(\theta\) according to the ordering policy; in this part \(\theta\) is drawn from the stochastic policy \(\pi(a|s)\), whereas in Part 2 it was the deterministic rule \[ \theta=\text{max}(C - (\alpha + \beta), 0) \] where:

    • \(\theta \in \mathbb Z_{\ge0}\) is the order quantity
    • \(C \in \mathbb Z_{\ge0}\) is the storage capacity
    • \(\alpha\) is the on-hand inventory
    • \(\beta\) is the on-order inventory
    • \((\alpha,\beta)\) is the state
  • Capture any overnight holding cost

  • Receive items at 06h00 (if there was an order 36 hours ago)

  • Open the store at 08h00

  • Experience random demand according to the demand probability above

    • number of items sold for the day will be the minimum of:
      • demand on the day and
      • inventory at store opening on the day
  • Capture any stockout cost due to missed demand

  • Close the store at 18h00 and

    • observe the state \[ S_{t+1}=(\text{max}(\alpha+\beta-i,0), \theta) \qquad \text{(4.1)} \] where \(i\) is the day’s demand and \(\theta\) is the quantity ordered at the previous store-closing (time \(t\))
    • capture the reward \[ R_{t+1}=-h⋅\alpha-p \cdot \text{max}(i-(\alpha+\beta),0) \qquad \text{(4.2)} \] which is the negative sum of
      • the overnight holding cost (\(h\) per item held overnight) and
      • the day’s stockout cost (\(p\) per unit of missed demand)
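
To make the daily flow concrete, here is a minimal one-day sketch of equations (4.1) and (4.2) for the unlimited-capacity case (a helper of our own; the default parameter values match the experiment below: \(h=1\), \(p=10\), \(\lambda=2\)):

import numpy as np

def one_day(alpha, beta, theta, holding_cost=1.0, stockout_cost=10.0, poisson_lambda=2.0):
    i = np.random.poisson(poisson_lambda)  #. the day's demand
    #. equation (4.2): overnight holding cost plus stockout cost on missed demand
    reward = -holding_cost*alpha - stockout_cost*max(i - (alpha + beta), 0)
    #. equation (4.1): leftover inventory, plus the order theta placed at store-closing
    next_state = (max(alpha + beta - i, 0), theta)
    return next_state, reward

one_day(3, 0, 4)  #. e.g. from state (3, 0) with order theta = 4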

4.1 Unlimited Capacity

In this project we assume there is no constraint on the storage capacity for the inventory item: we can order any non-negative integer quantity, which means the action space is infinite. As a consequence the on-hand and on-order quantities are also unbounded, so the state space is infinite as well. We will not be able to use the tabular dynamic programming algorithms for this case. However, there is still a lot of value in modeling infinite MDPs because we can perform simulations.

5 Implementation

#hide
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
root_dir = "/content/gdrive/My Drive"
base_dir = root_dir + '/Rao_CME241/RL-book'
Mounted at /content/gdrive
!python --version
Python 3.7.15

We make use of the approach followed in http://web.stanford.edu/class/cme241/.

from typing import Tuple, Iterator
import numpy as np
from dataclasses import dataclass
from scipy.stats import poisson
import random
import itertools

5.1 Distributions

from __future__ import annotations
from abc import ABC, abstractmethod
from collections import Counter, defaultdict
from dataclasses import dataclass
import numpy as np
import random
from typing import (Callable, Dict, Generic, Iterator, Iterable,
                    Mapping, Optional, Sequence, Tuple, TypeVar)
A = TypeVar('A')
B = TypeVar('B')
class Distribution(ABC, Generic[A]):
    @abstractmethod
    def sample(self) -> A:
        pass

    def sample_n(self, n: int) -> Sequence[A]:
        return [self.sample() for _ in range(n)]

    @abstractmethod
    def expectation(
        self,
        f: Callable[[A], float]
    ) -> float:
        pass

    def map(
        self,
        f: Callable[[A], B]
    ) -> Distribution[B]:
        return SampledDistribution(lambda: f(self.sample()))

    def apply(
        self,
        f: Callable[[A], Distribution[B]]
    ) -> Distribution[B]:
        def sample():
            a = self.sample()
            b_dist = f(a)
            return b_dist.sample()
        return SampledDistribution(sample)
class SampledDistribution(Distribution[A]):
    sampler: Callable[[], A]
    expectation_samples: int

    def __init__(
        self,
        sampler: Callable[[], A],
        expectation_samples: int = 10000
    ):
        self.sampler = sampler
        self.expectation_samples = expectation_samples

    def sample(self) -> A:
        return self.sampler()

    def expectation(
        self,
        f: Callable[[A], float]
    ) -> float:
        return sum(f(self.sample()) for _ in
                   range(self.expectation_samples))/self.expectation_samples
class FiniteDistribution(Distribution[A], ABC):
    @abstractmethod
    def table(self) -> Mapping[A, float]:
        pass

    def probability(self, outcome: A) -> float:
        return self.table()[outcome]

    def map(self, f: Callable[[A], B]) -> FiniteDistribution[B]:
        result: Dict[B, float] = defaultdict(float)
        for x, p in self:
            result[f(x)] += p
        return Categorical(result)

    def sample(self) -> A:
        outcomes = list(self.table().keys())
        weights = list(self.table().values())
        return random.choices(outcomes, weights=weights)[0]

    def expectation(self, f: Callable[[A], float]) -> float:
        return sum(p * f(x) for x, p in self)

    def __iter__(self) -> Iterator[Tuple[A, float]]:
        return iter(self.table().items())

    def __eq__(self, other: object) -> bool:
        if isinstance(other, FiniteDistribution):
            return self.table() == other.table()
        else:
            return False

    def __repr__(self) -> str:
        return repr(self.table())
class Categorical(FiniteDistribution[A]):
    probabilities: Mapping[A, float]

    def __init__(self, distribution: Mapping[A, float]):
        total = sum(distribution.values())
        # Normalize probabilities to sum to 1
        self.probabilities = {outcome: probability/total
                              for outcome, probability in distribution.items()}

    def table(self) -> Mapping[A, float]:
        return self.probabilities

    def probability(self, outcome: A) -> float:
        return self.probabilities.get(outcome, 0.)
@dataclass(frozen=True)
class Constant(FiniteDistribution[A]):
    value: A

    def sample(self) -> A:
        return self.value

    def table(self) -> Mapping[A, float]:
        return {self.value: 1}

    def probability(self, outcome: A) -> float:
        return 1. if outcome == self.value else 0.
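
A quick usage sketch of these distribution classes (the values are arbitrary, chosen only for illustration):

#. a Categorical over two outcomes; the constructor normalizes the weights
d = Categorical({'up': 3, 'down': 1})
d.probability('up')  #. 0.75
d.sample_n(5)
#. a SampledDistribution wrapping a numpy sampler; expectation() is a Monte Carlo average
g = SampledDistribution(lambda: np.random.poisson(2.0))
g.expectation(lambda x: x)  #. close to 2.0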

5.2 Markov Processes

S = TypeVar('S')
X = TypeVar('X')
class State(ABC, Generic[S]):
    state: S

    def on_non_terminal(
        self,
        f: Callable[[NonTerminal[S]], X],
        default: X
    ) -> X:
        if isinstance(self, NonTerminal):
            return f(self)
        else:
            return default
@dataclass(frozen=True)
class NonTerminal(State[S]):
    state: S
        
    def __eq__(self, other):
        return self.state == other.state

    def __lt__(self, other):
        return self.state < other.state
class MarkovProcess(ABC, Generic[S]):
    #. transition from this state
    @abstractmethod
    def transition(
        self, 
        state: NonTerminal[S]
    ) -> Distribution[State[S]]: #s'|s or s->s'
        pass
    
    def simulate(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[State[S]]:
        state: State[S] = start_state_distribution.sample()
        yield state
        while isinstance(state, NonTerminal):
            state = self.transition(state).sample()
            yield state

    def traces(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[State[S]]]:
        while True:
            yield self.simulate(start_state_distribution)
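
As a quick illustration of how simulate() is used (a hypothetical two-state 'flip-flop' process of our own, unrelated to the inventory problem):

class FlipFlop(MarkovProcess[bool]):
    #. stay in the current state with probability 0.7, flip with probability 0.3
    def transition(self, state: NonTerminal[bool]) -> Distribution[State[bool]]:
        return Categorical({NonTerminal(state.state): 0.7,
                            NonTerminal(not state.state): 0.3})

flip_flop = FlipFlop()
#. first 5 states of one simulated trace, starting from True
[s.state for s in itertools.islice(flip_flop.simulate(Constant(NonTerminal(True))), 5)]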

5.3 Markov Reward Processes

@dataclass(frozen=True)
class TransitionStep(Generic[S]):
    state: NonTerminal[S]
    next_state: State[S]
    reward: float
    
    def add_return(self, γ: float, return_: float) -> ReturnStep[S]:
        return ReturnStep( # s -> s'r
            self.state,
            self.next_state,
            self.reward,
            return_=self.reward + γ*return_
        )
@dataclass(frozen=True)
class ReturnStep(TransitionStep[S]):
    return_: float

class MarkovRewardProcess(MarkovProcess[S]):
    #. transition from this state
    def transition(self, state: NonTerminal[S]) -> Distribution[State[S]]: #s'|s or s->s'
        distribution = self.transition_reward(state)
        def next_state(distribution=distribution):
            next_s, _ = distribution.sample() #.ignores reward
            return next_s
        return SampledDistribution(next_state)

    @abstractmethod
    def transition_reward(#. transition from this state
        self, 
        state: NonTerminal[S]
    ) -> Distribution[Tuple[State[S], float]]: #s',r|s or s->s',r
        pass
    
    def simulate_reward(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[TransitionStep[S]]:
        state: State[S] = start_state_distribution.sample()
        reward: float = 0.
        while isinstance(state, NonTerminal):
            next_distribution = self.transition_reward(state)
            next_state, reward = next_distribution.sample()
            yield TransitionStep(state, next_state, reward) # s -> s'r
            state = next_state

    def reward_traces(
        self,
        start_state_distribution: Distribution[NonTerminal[S]]
    ) -> Iterable[Iterable[TransitionStep[S]]]:
        while True:
            yield self.simulate_reward(start_state_distribution)

5.4 Markov Decision Processes

class MarkovDecisionProcess(ABC, Generic[S, A]):
    def apply_policy(self, policy: Policy[S, A]) -> MarkovRewardProcess[S]: #.implied MRP
        mdp = self
        class RewardProcess(MarkovRewardProcess[S]):
            def transition_reward(
                self,
                state: NonTerminal[S]
            ) -> Distribution[Tuple[State[S], float]]:
                actions: Distribution[A] = policy.act(state)
                #.return actions.apply(lambda a: mdp.step(state, a))
                return actions.apply(lambda a: mdp.transition_action(state, a))#.
        return RewardProcess()

    @abstractmethod
    def actions(self, state: NonTerminal[S]) -> Iterable[A]:
        pass

    #.def step(
    @abstractmethod
    def transition_action(#. transition from this state,action (called transition() elsewhere)
        self,
        state: NonTerminal[S],
        action: A
    ) -> Distribution[Tuple[State[S], float]]:
        pass

    #.def simulate_actions(
    def simulate_action(#.
            self,
            #.start_states: Distribution[NonTerminal[S]],
            start_state_distribution: Distribution[NonTerminal[S]],#.
            policy: Policy[S, A]
    ) -> Iterable[TransitionStep[S, A]]:
        #.state: State[S] = start_states.sample()
        state: State[S] = start_state_distribution.sample()#.
        while isinstance(state, NonTerminal):
            action_distribution = policy.act(state)
            action = action_distribution.sample()
            
            #.next_distribution = self.step(state, action)
            next_distribution = self.transition_action(state, action)#.
            next_state, reward = next_distribution.sample()
            yield TransitionStep(state, action, next_state, reward) # sa -> s'r
            state = next_state

    def action_traces(
        self,
        start_states: Distribution[NonTerminal[S]],
        policy: Policy[S, A]
    ) -> Iterable[Iterable[TransitionStep[S, A]]]:
        while True:
            #.yield self.simulate_actions(start_states, policy)
            yield self.simulate_action(start_states, policy)#.
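
Note that simulate_action and action_traces yield transition steps that include the action, whereas the TransitionStep dataclass defined in section 5.3 only has state, next_state and reward fields. In the CME241 code this is a separate TransitionStep class defined in the MDP module; a minimal sketch of what such a step could look like here (hypothetical name; it is not needed in this notebook because we only simulate the implied MRP via apply_policy):

@dataclass(frozen=True)
class ActionTransitionStep(Generic[S, A]):
    #. one (s, a) -> (s', r) step of an MDP trace
    state: NonTerminal[S]
    action: A
    next_state: State[S]
    reward: float

If simulate_action were to be run directly with this notebook's classes, its yield would need to construct such a step instead of the three-field TransitionStep above.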

5.5 Policies

A = TypeVar('A')
S = TypeVar('S')
class Policy(ABC, Generic[S, A]):
    @abstractmethod
    def act(self, state: NonTerminal[S]) -> Distribution[A]:
        pass
@dataclass(frozen=True)
class DeterministicPolicy(Policy[S, A]):
    action_for: Callable[[S], A]

    def act(self, state: NonTerminal[S]) -> Constant[A]:
        return Constant(self.action_for(state.state))

5.6 Inventory Problem

@dataclass(frozen=True)
class InventoryState:
    on_hand: int
    on_order: int

    def inventory_position(self) -> int:
        return self.on_hand + self.on_order
class SimpleInventoryDeterministicPolicy(
        DeterministicPolicy[InventoryState, int]
):
    def __init__(self, reorder_point: int):
        self.reorder_point: int = reorder_point

        def action_for(s: InventoryState) -> int:
            return max(self.reorder_point - s.inventory_position(), 0)

        super().__init__(action_for)
# 
#===study SimpleInventoryDeterministicPolicy
si_dp = SimpleInventoryDeterministicPolicy(reorder_point=8)
si_dp
SimpleInventoryDeterministicPolicy(action_for=<function SimpleInventoryDeterministicPolicy.__init__.<locals>.action_for at 0x7f306b899560>)
s = InventoryState(on_hand=5, on_order=0); s
InventoryState(on_hand=5, on_order=0)
si_dp.action_for(s)
3
# 
#---end study SimpleInventoryDeterministicPolicy
# 
#===study SimpleInventoryStochasticPolicy
class SimpleInventoryStochasticPolicy(Policy[InventoryState, int]):
    def __init__(self, reorder_point_poisson_mean: float):
        self.reorder_point_poisson_mean: float = reorder_point_poisson_mean

    def act(self, state: NonTerminal[InventoryState]) -> \
            SampledDistribution[int]:
        def action_func(state=state) -> int:
            reorder_point_sample: int = \
                np.random.poisson(self.reorder_point_poisson_mean)
            return max(
                reorder_point_sample - state.state.inventory_position(),
                #reorder_point_sample - state.inventory_position(), #.
                0
            )
        return SampledDistribution(action_func)
si_sp = SimpleInventoryStochasticPolicy(reorder_point_poisson_mean=8.0)
si_sp
<__main__.SimpleInventoryStochasticPolicy at 0x7f306b882290>
s = NonTerminal(InventoryState(on_hand=5, on_order=0)); s
NonTerminal(state=InventoryState(on_hand=5, on_order=0))
sd = si_sp.act(s); sd
<__main__.SampledDistribution at 0x7f3077b24b50>
sd.sampler
<function __main__.SimpleInventoryStochasticPolicy.act.<locals>.action_func(state=NonTerminal(state=InventoryState(on_hand=5, on_order=0))) -> 'int'>
sd.sample()
9
sd.sample_n(10)
[4, 0, 6, 1, 2, 0, 1, 0, 2, 5]
# 
#---end study SimpleInventoryStochasticPolicy
@dataclass(frozen=True)
class SimpleInventoryMDPNoCap(MarkovDecisionProcess[InventoryState, int]):
    poisson_lambda: float
    holding_cost: float
    stockout_cost: float

    #.def step( #. s',r|s,a
    def transition_action( #. s',r|s,a
        self,
        state: NonTerminal[InventoryState],
        order: int #. action
    ) -> SampledDistribution[Tuple[State[InventoryState], float]]:
        def sample_next_state_reward(
            state=state,
            order=order
        ) -> Tuple[State[InventoryState], float]:
            demand_sample: int = np.random.poisson(self.poisson_lambda)
            ip: int = state.state.inventory_position()
            #. s'
            next_state: InventoryState = InventoryState(
                max(ip - demand_sample, 0),
                order
            )
            #. r
            reward: float = - self.holding_cost * state.state.on_hand\
                - self.stockout_cost * max(demand_sample - ip, 0)
            #. s',r
            return NonTerminal(next_state), reward
        return SampledDistribution(sample_next_state_reward)

    def actions(self, state: NonTerminal[InventoryState]) -> Iterator[int]:
        return itertools.count(start=0, step=1)

    def fraction_of_days_oos(
        self,
        policy: Policy[InventoryState, int],
        time_steps: int,
        num_traces: int
    ) -> float:
        impl_mrp: MarkovRewardProcess[InventoryState] = self.apply_policy(policy)#.implied MRP
        count: int = 0
        high_fractile: int = int(poisson(self.poisson_lambda).ppf(0.98))
        start: InventoryState = random.choice(
            [InventoryState(i, 0) for i in range(high_fractile + 1)])
        for _ in range(num_traces):
            steps = itertools.islice(
                impl_mrp.simulate_reward(Constant(NonTerminal(start))),
                time_steps
            )
            for step in steps:
                #. a day counts as out-of-stock when the reward includes a
                #. stockout penalty on top of the overnight holding cost
                if step.reward < -self.holding_cost * step.state.state.on_hand:
                    count += 1
        return float(count) / (time_steps * num_traces)
# 
# ===study actions(): All actions for a given state
item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0

item_reorder_point = 8
item_reorder_point_poisson_mean = 8.0

item_time_steps = 1000
item_num_traces = 1000

si_mdp_nocap = SimpleInventoryMDPNoCap(poisson_lambda=item_poisson_lambda,
                                        holding_cost=item_holding_cost,
                                        stockout_cost=item_stockout_cost)
si_mdp_nocap
SimpleInventoryMDPNoCap(poisson_lambda=2.0, holding_cost=1.0, stockout_cost=10.0)
s = NonTerminal(InventoryState(on_hand=3, on_order=0))
gen = si_mdp_nocap.actions(state=s); type(gen)
itertools.count
# 
# first 10 actions for state s
[next(gen) for _ in range(10)]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 
# ---end study actions()
# 
# ===study transition_action()
sd = si_mdp_nocap.transition_action(state=(NonTerminal(InventoryState(on_hand=3, on_order=0))), order=4)
sd.sample() #s',r|s,a = s',r|(3,0),4
(NonTerminal(state=InventoryState(on_hand=3, on_order=4)), -3.0)
[sd.sample() for _ in range(10)]
[(NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=2, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -43.0),
 (NonTerminal(state=InventoryState(on_hand=2, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0),
 (NonTerminal(state=InventoryState(on_hand=0, on_order=4)), -23.0),
 (NonTerminal(state=InventoryState(on_hand=1, on_order=4)), -3.0)]
# 
# ---end study transition_action()
# 
# ===study fraction_of_days_oos
item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0
item_reorder_point_poisson_mean = 8.0
item_time_steps = 100
item_num_traces = 100
si_mdp_nocap: MarkovDecisionProcess[InventoryState, int] = SimpleInventoryMDPNoCap(poisson_lambda=item_poisson_lambda,
                                        holding_cost=item_holding_cost,
                                        stockout_cost=item_stockout_cost)
si_sp = SimpleInventoryStochasticPolicy(
    reorder_point_poisson_mean=item_reorder_point_poisson_mean)
impl_mrp = si_mdp_nocap.apply_policy(si_sp)#.implied MRP
impl_mrp
<__main__.MarkovDecisionProcess.apply_policy.<locals>.RewardProcess at 0x7f306b89c590>
# 
# poisson(si_mdp_nocap.poisson_lambda)
#. percent-point-function (ppf) is the inverse of cumulative distribution function (cdf):
# poisson(si_mdp_nocap.poisson_lambda).ppf(0.98)
# int(poisson(si_mdp_nocap.poisson_lambda).ppf(0.98))
high_fractile: int = int(poisson(si_mdp_nocap.poisson_lambda).ppf(0.98))
high_fractile
5
[InventoryState(i, 0) for i in range(high_fractile + 1)]
[InventoryState(on_hand=0, on_order=0),
 InventoryState(on_hand=1, on_order=0),
 InventoryState(on_hand=2, on_order=0),
 InventoryState(on_hand=3, on_order=0),
 InventoryState(on_hand=4, on_order=0),
 InventoryState(on_hand=5, on_order=0)]
start: InventoryState = random.choice(
  [InventoryState(i, 0) for i in range(high_fractile + 1)])
start
InventoryState(on_hand=5, on_order=0)
steps = itertools.islice(
    impl_mrp.simulate_reward(Constant(NonTerminal(start))),
    item_time_steps
)
steps_list = list(steps)
my_step = steps_list[7]; my_step
TransitionStep(state=NonTerminal(state=InventoryState(on_hand=2, on_order=4)), next_state=NonTerminal(state=InventoryState(on_hand=3, on_order=3)), reward=-2.0)
print(f"type(my_step): {type(my_step)}")
print(f"my_step.state: {my_step.state}")
print(f"my_step.next_state: {my_step.next_state}")
print(f"my_step.reward: {my_step.reward}")
type(my_step): <class '__main__.TransitionStep'>
my_step.state: NonTerminal(state=InventoryState(on_hand=2, on_order=4))
my_step.next_state: NonTerminal(state=InventoryState(on_hand=3, on_order=3))
my_step.reward: -2.0
count = 0
for _ in range(item_num_traces):
    steps = itertools.islice(
        impl_mrp.simulate_reward(Constant(NonTerminal(start))),
        item_time_steps
    )
    # steps_list = list(steps); steps_list
    # my_step = steps_list[7]; my_step
    for step in steps:
        if step.reward < -si_mdp_nocap.holding_cost*step.state.state.on_hand:
            count += 1
count
279
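#. dividing by the total number of simulated days gives the out-of-stock fraction
#. returned by fraction_of_days_oos (an added check: 279 / (100 * 100))
count/(item_time_steps*item_num_traces)
0.0279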
# 
# ---end study fraction_of_days_oos

5.6.1 Experiment

item_poisson_lambda = 2.0
item_holding_cost = 1.0
item_stockout_cost = 10.0

item_reorder_point = 8
item_reorder_point_poisson_mean = 8.0

item_time_steps = 100
item_num_traces = 100

si_mdp_nocap = SimpleInventoryMDPNoCap(poisson_lambda=item_poisson_lambda,
                                        holding_cost=item_holding_cost,
                                        stockout_cost=item_stockout_cost)

# Deterministic Policy
si_dp = SimpleInventoryDeterministicPolicy(
    reorder_point=item_reorder_point
)
oos_frac_dp = si_mdp_nocap.fraction_of_days_oos(policy=si_dp,
                                            time_steps=item_time_steps,
                                            num_traces=item_num_traces)
print(f"Deterministic Policy: {oos_frac_dp*100:.2f}% Out-Of-Stock days")

# Stochastic Policy
si_sp = SimpleInventoryStochasticPolicy(
    reorder_point_poisson_mean=item_reorder_point_poisson_mean
)
oos_frac_sp = si_mdp_nocap.fraction_of_days_oos(policy=si_sp,
                                                time_steps=item_time_steps,
                                                num_traces=item_num_traces)
print(f"Stochastic Policy: {oos_frac_sp*100:.2f}% Out-Of-Stock days")
Deterministic Policy: 2.13% Out-Of-Stock days
Stochastic Policy: 4.07% Out-Of-Stock days

Now do this again, but sweep through a range of reorder_point/reorder_point_poisson_mean values:

tuples = \
[
    (
      rp, 
      si_mdp_nocap.fraction_of_days_oos(
          policy=SimpleInventoryDeterministicPolicy(reorder_point=rp), 
          time_steps=item_time_steps, 
          num_traces=item_num_traces
      ),
      si_mdp_nocap.fraction_of_days_oos(
          policy=SimpleInventoryStochasticPolicy(reorder_point_poisson_mean=rp), 
          time_steps=item_time_steps, 
          num_traces=item_num_traces
      )     
    ) 
    for rp in range(0,15)
]
tuples
[(0, 0.8436, 0.8516),
 (1, 0.7268, 0.6436),
 (2, 0.5586, 0.4831),
 (3, 0.4045, 0.3449),
 (4, 0.2708, 0.2513),
 (5, 0.1658, 0.1562),
 (6, 0.096, 0.1031),
 (7, 0.0521, 0.0654),
 (8, 0.0164, 0.0306),
 (9, 0.0084, 0.0181),
 (10, 0.0064, 0.0122),
 (11, 0.0068, 0.0121),
 (12, 0.001, 0.0036),
 (13, 0.0061, 0.001),
 (14, 0.0085, 0.0091)]
rp, dp, sp = [list(tup) for tup in zip(*tuples)]; rp, dp, sp
([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
 [0.8436,
  0.7268,
  0.5586,
  0.4045,
  0.2708,
  0.1658,
  0.096,
  0.0521,
  0.0164,
  0.0084,
  0.0064,
  0.0068,
  0.001,
  0.0061,
  0.0085],
 [0.8516,
  0.6436,
  0.4831,
  0.3449,
  0.2513,
  0.1562,
  0.1031,
  0.0654,
  0.0306,
  0.0181,
  0.0122,
  0.0121,
  0.0036,
  0.001,
  0.0091])

6 Visualization

6.1 Out-Of-Stock Days versus Reorder Level

Next we visualize how the fraction of out-of-stock days varies with increasing reorder point levels for the cases of both a deterministic policy and a stochastic policy.

# 
# convert to percentages
dp = [100*i for i in dp]
sp = [100*i for i in sp]
import matplotlib.pyplot as plt
fig,axs = plt.subplots(figsize=(14,10))
axs.set_xlabel('reorder_point/reorder_point_poisson_mean', fontsize=20)
axs.set_ylabel('Out-Of-Stock Days (%)', fontsize=20)
axs.set_title(f'Out-Of-Stock Days vs Reorder Level', fontsize=24)
axs.plot(rp, dp, color='b', label='Deterministic Policy')
axs.plot(rp, sp, color='k', label='Stochastic Policy')
axs.legend(fontsize=20);