diff --git a/dev-requirements.txt b/dev-requirements.txt index 0b789d7..ffbcefd 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -6,6 +6,7 @@ numpy>=1.23.0 ortools>=9.1.9552 pandas>=1.3.5 joblib>0.17 +pydantic==1.10.7 sphinx sphinx_gallery sphinx_rtd_theme diff --git a/docs/release_notes.rst b/docs/release_notes.rst index fabda02..5fa18dc 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -12,6 +12,11 @@ Features: * Remove support for python 3.7 and add support for python up to 3.11 * Update the project dependencies +* Support of non interger transactions. This allows for more accurate agent estimations. +* Replacement of parameter grid with automatic dictionary handeling for multiple scenarios and multiple parameters. +* Added enforce_trafficker_requirements to allow final rounding towards intergers. If using data for capacity estimations it is adviced to do rounding in your final step. +* Added pydantic based data model for ErlangC to ensure correctness of input values. +* ErlangC renamed to erlang for future expansions. What's new in 0.5.0 ------------------- diff --git a/pyworkforce/__init__.py b/pyworkforce/__init__.py index b02bb47..9c0de15 100644 --- a/pyworkforce/__init__.py +++ b/pyworkforce/__init__.py @@ -1,11 +1,10 @@ -from .queuing import ErlangC, MultiErlangC +from .queuing import ErlangC from .scheduling import MinRequiredResources, MinAbsDifference from .rostering import MinHoursRoster from ._version import __version__ __all__ = [ "ErlangC", - "MultiErlangC", "MinRequiredResources", "MinAbsDifference", "MinHoursRoster", diff --git a/pyworkforce/queuing/__init__.py b/pyworkforce/queuing/__init__.py index 41a2154..4960046 100644 --- a/pyworkforce/queuing/__init__.py +++ b/pyworkforce/queuing/__init__.py @@ -1,3 +1,3 @@ -from pyworkforce.queuing.erlang import ErlangC, MultiErlangC +from pyworkforce.queuing.erlang import ErlangC -__all__ = ["ErlangC", "MultiErlangC"] +__all__ = ["ErlangC"] \ No newline at end of file diff --git a/pyworkforce/queuing/erlang.py b/pyworkforce/queuing/erlang.py index c6c2a79..23a0879 100644 --- a/pyworkforce/queuing/erlang.py +++ b/pyworkforce/queuing/erlang.py @@ -1,14 +1,25 @@ -from math import exp, ceil, floor +import warnings +import pandas as pd +import numpy as np +from math import exp, gamma, ceil from pyworkforce.utils import ParameterGrid -from joblib import Parallel, delayed +from collections.abc import Iterable +from pydantic import BaseModel, ValidationError, validator, Field, root_validator +from pydantic.typing import Dict, Optional +# Import constants +from pyworkforce.queuing.queueing_constants import cErlangC_generic_variables -class ErlangC: +def raise_value_errors(message): """ - Computes the number of positions required to attend a number of transactions in a - queue system based on erlangc.rst. Implementation inspired on: - https://lucidmanager.org/data-science/call-centre-workforce-planning-erlang-c-in-r/ + Raise validation errors + """ + raise ValidationError(message) +class ErlangCData(BaseModel): + f""" + Contains the generic parameters expected for ErlangC. The model contains {cErlangC_generic_variables} + Parameters ---------- transactions: float, @@ -21,35 +32,40 @@ class ErlangC: Interval length (minutes) where the transactions come in shrinkage: float, Percentage of time that an operator unit is not available. - """ - - def __init__(self, transactions: float, aht: float, asa: float, - interval: int, shrinkage=0.0, - **kwargs): - - if transactions <= 0: - raise ValueError("transactions can't be smaller or equals than 0") - if aht <= 0: - raise ValueError("aht can't be smaller or equals than 0") - - if asa <= 0: - raise ValueError("asa can't be smaller or equals than 0") - - if interval <= 0: - raise ValueError("interval can't be smaller or equals than 0") - - if shrinkage < 0 or shrinkage >= 1: - raise ValueError("shrinkage must be between in the interval [0,1)") + """ + transactions: float = Field(gt=0.0) + aht: float= Field(gt=0.0) + asa: float = Field(gt=0.0) + shrinkage: float = Field(0.0, ge=0.0, le=1.0) + interval: int = Field(gt=0.0) + service_level_target: float = Field(gt=0.0, le=1.0) + achieved_service_level: Optional[float] = Field(gt=0.0, le=1.0) + raw_positions: Optional[float] = Field(ge=0.0) + positions: Optional[float] = Field(ge=0.0) + maximum_occupancy: Optional[float] = Field(1.0, ge=0.0) + waiting_probability: Optional[float] = Field(ge=0.0) + achieved_occupancy: Optional[float] = Field(ge=0.0) + intensity: Optional[float] = Field(gt=0.0) # set as optional for creation purposes, if user specified than check if calculation matches + + @root_validator + def calculate_intensity(cls, values): + """ + Calculates the intensity for Erlang calculations + """ + + intensity = (values["transactions"] / values["interval"]) * values['aht'] + if values['intensity'] is not None: + # If exists check if user value makes sense + if values['intensity'] != intensity: + warnings.warn(f"specified intensity: {values['intensity']} does not match calculated intensity: {intensity}, please check if this is desired") + else: + # add value + values['intensity'] = intensity - self.n_transactions = transactions - self.aht = aht - self.interval = interval - self.asa = asa - self.intensity = (self.n_transactions / self.interval) * self.aht - self.shrinkage = shrinkage + return values - def waiting_probability(self, positions: int, scale_positions: bool = False): + def calculate_waiting_probability(self): """ Returns the probability of waiting in the queue @@ -57,24 +73,19 @@ def waiting_probability(self, positions: int, scale_positions: bool = False): ---------- positions: int, The number of positions to attend the transactions. - scale_positions: bool, default=False - Set it to True if the positions were calculated using shrinkage. """ + # Gamma distribution is extended to treat floats with factorials with n+1, this is the upper part of the erlang C equation + substitution_position_estimate = (self.intensity** self.raw_positions / gamma(self.raw_positions+1)) * (self.raw_positions / (self.raw_positions-self.intensity) ) - if scale_positions: - productive_positions = floor((1 - self.shrinkage) * positions) - else: - productive_positions = positions - - erlang_b_inverse = 1 - for position in range(1, productive_positions + 1): - erlang_b_inverse = 1 + (erlang_b_inverse * position / self.intensity) - - erlang_b = 1 / erlang_b_inverse - return productive_positions * erlang_b / (productive_positions - self.intensity * (1 - erlang_b)) + #Sum of erlang series + erlang_b = 1 + for position in np.arange(1, self.raw_positions, 1): + erlang_b += (self.intensity**position) / gamma(position+1) + + self.waiting_probability = substitution_position_estimate / (erlang_b + substitution_position_estimate) - def service_level(self, positions: int, scale_positions: bool = False): + def calculate_service_level(self): """ Returns the expected service level given a number of positions @@ -83,20 +94,14 @@ def service_level(self, positions: int, scale_positions: bool = False): positions: int, The number of positions attending. - scale_positions: bool, default = False - Set it to True if the positions were calculated using shrinkage. - """ - if scale_positions: - productive_positions = floor((1 - self.shrinkage) * positions) - else: - productive_positions = positions - probability_wait = self.waiting_probability(productive_positions, scale_positions=False) - exponential = exp(-(productive_positions - self.intensity) * (self.asa / self.aht)) - return max(0, 1 - (probability_wait * exponential)) + self.calculate_waiting_probability() + exponential = exp(-(self.raw_positions - self.intensity) * (self.asa / self.aht)) - def achieved_occupancy(self, positions: int, scale_positions: bool = False): + self.achieved_service_level = max(0, 1-(self.waiting_probability * exponential)) + + def calculate_achieved_occupancy(self): """ Returns the expected occupancy of positions @@ -105,18 +110,12 @@ def achieved_occupancy(self, positions: int, scale_positions: bool = False): positions: int, The number of raw positions - scale_positions: bool, default=False - Set it to True if the positions were calculated using shrinkage. """ - if scale_positions: - productive_positions = floor((1 - self.shrinkage) * positions) - else: - productive_positions = positions - return self.intensity / productive_positions + self.achieved_occupancy = self.intensity / self.raw_positions - def required_positions(self, service_level: float, max_occupancy: float = 1.0): + def calculate_required_positions(self, enforce_trafficking_requirements:bool = True): """ Computes the requirements using erlangc.rst @@ -142,213 +141,103 @@ def required_positions(self, service_level: float, max_occupancy: float = 1.0): The expected occupancy of positions waiting_probability: float, The probability of a transaction waiting in the queue - """ - - if service_level < 0 or service_level > 1: - raise ValueError("service_level must be between 0 and 1") - - if max_occupancy < 0 or max_occupancy > 1: - raise ValueError("max_occupancy must be between 0 and 1") - - positions = round(self.intensity + 1) - achieved_service_level = self.service_level(positions, scale_positions=False) - while achieved_service_level < service_level: - positions += 1 - achieved_service_level = self.service_level(positions, scale_positions=False) - - achieved_occupancy = self.achieved_occupancy(positions, scale_positions=False) - - raw_positions = ceil(positions) - - if achieved_occupancy > max_occupancy: - raw_positions = ceil(self.intensity / max_occupancy) - achieved_occupancy = self.achieved_occupancy(raw_positions) - achieved_service_level = self.service_level(raw_positions) - - waiting_probability = self.waiting_probability(positions=raw_positions) - positions = ceil(raw_positions / (1 - self.shrinkage)) - - return {"raw_positions": raw_positions, - "positions": positions, - "service_level": achieved_service_level, - "occupancy": achieved_occupancy, - "waiting_probability": waiting_probability} - - -class MultiErlangC: + """ + # set positions is intensity + 1 for initalisation, otherwise equations will return 1 for SL. + self.raw_positions = self.intensity + 1 + self.calculate_service_level() + # Incremental increase of position estimate to reach service level + while self.achieved_service_level < self.service_level_target: + self.raw_positions += .1 + self.calculate_service_level() + + #Compensate calculated positions for shrinkage factor used. + self.positions = self.raw_positions / (1 - self.shrinkage) + + #Update parameters if legacy settings are required + if enforce_trafficking_requirements: + self.raw_positions = ceil(self.raw_positions) + self.positions = ceil(self.positions) + self.calculate_service_level() + + # Set resulting parameters based on final position estimate + self.calculate_achieved_occupancy() + # Adjust estimate when max occopancy is reached + if self.achieved_occupancy > self.maximum_occupancy: + self.raw_positions = self.intensity / self.maximum_occupancy + self.raw_positions = ceil(self.raw_positions) if enforce_trafficking_requirements else self.raw_positions + + self.calculate_waiting_probability() + +class ErlangC(BaseModel): """ - This class uses the erlangc.rst class using joblib's Parallel, - allowing to run multiple scenarios at once. - It finds solutions iterating over all possible combinations provided by the users, - inspired how Sklearn's Grid Search works - - Parameters - ---------- - - param_grid: dict, - Dictionary with the erlangc.rst.__init__ parameters, each key of the dictionary must be the - expected parameter and the value must be a list with the different options to iterate - example: {"transactions": [100, 200], "aht": [3], "interval": [30], "asa": [20 / 60], "shrinkage": [0.3]} - n_jobs: int, default=2 - The maximum number of concurrently running jobs. - If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. - For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used. - None is a marker for ‘unset’ that will be interpreted as n_jobs=1 (sequential execution) - unless the call is performed under a parallel_backend() context manager that sets another value for n_jobs. - pre_dispatch: {"all", int, or expression}, default='2 * n_jobs' - The number of batches (of tasks) to be pre-dispatched. Default is ‘2*n_jobs’. - See joblib's documentation for more details: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html - - Attributes - ---------- - - waiting_probability_params: list[tuple], - Each tuple of the list represents the used parameters in param_grid for ErlangC and - arguments_grid for waiting_probability method,corresponding to the same order returned - by the MultiErlangC.waiting_probability method. - service_level_params: list[tuple], - Each tuple of the list represents the used parameters in param_grid for ErlangC and - arguments_grid for service_level method,corresponding to the same order returned - by the MultiErlangC.service_level method. - achieved_occupancy_params: list[tuple], - Each tuple of the list represents the used parameters in param_grid for ErlangC and - arguments_grid for achieved_occupancy method,corresponding to the same order returned - by the MultiErlangC.achieved_occupancy method. - required_positions_params: list[tuple], - Each tuple of the list represents the used parameters in param_grid for ErlangC and - arguments_grid for required_positions method,corresponding to the same order returned - by the MultiErlangC.required_positions method. """ - - def __init__(self, param_grid: dict, n_jobs: int = 2, pre_dispatch: str = '2 * n_jobs'): - - self.param_grid = param_grid - self.n_jobs = n_jobs - self.pre_dispatch = pre_dispatch - self.param_list = list(ParameterGrid(self.param_grid)) - self.waiting_probability_params = None - self.service_level_params = None - self.achieved_occupancy_params = None - self.required_positions_params = None - - def waiting_probability(self, arguments_grid): + ### Parameter setup ### + erlang_scenarios : Dict[str, Dict[str, ErlangCData]] # List of possible scenario's including subscenario's. + + ### Setup parameter grid and validate parameters ### + @validator('erlang_scenarios', pre=True) + def __create_parameter_grid__(cls, erlang_scenarios) -> dict(): + + for scenario, scenario_parameters in erlang_scenarios.items(): + # Format parameter to interables if not provided + for parameter, value in scenario_parameters.items(): + if not isinstance(value, Iterable): + scenario_parameters[parameter] = [value] + + sub_scenarios = list(ParameterGrid(scenario_parameters)) + sub_scenario_output = {} + for sub_scenario in range(len(sub_scenarios)): + #validate individual scenarios + sub_scenario_output.update( + {f"{scenario}.{sub_scenario}" :ErlangCData(**sub_scenarios[sub_scenario])} + ) + erlang_scenarios[scenario] = sub_scenario_output + return erlang_scenarios + + def results_to_dataframe(self) -> pd.DataFrame: + """ + Returns erlang scenario objects to a dataframe + """ + + # Transform all classes of Erlang C towards dictionaries + results = {main_scenario: {subscenario_name: dict(subscenario) for subscenario_name, subscenario in sub_scenarios.items()} for main_scenario, sub_scenarios in self.erlang_scenarios.items()} + + # Transform nested dictionaries towards a dataframe + scenario_frames = [] + for scenario_name, scenario in results.items(): + scenario_frame = pd.DataFrame.from_dict(scenario, orient='index').reset_index().rename(columns={'index':'subscenario'}) + scenario_frame.insert(0, 'scenario', scenario_name) + scenario_frames.append(scenario_frame) + + return pd.concat(scenario_frames) + + def calculate_required_positions(self, enforce_trafficking_requirements: bool = True): + """ + Calculate the required positions for handeling transactions according to Erlang C methodology + """ + + result = {main_scenario: {subscenario_name: subscenario.calculate_required_positions(enforce_trafficking_requirements) for subscenario_name, subscenario in sub_scenarios.items()} for main_scenario, sub_scenarios in self.erlang_scenarios.items()} + + + def calculate_waiting_probability(self): """ Returns the probability of waiting in the queue - Returns a list with the solution to all the possible combinations from the arguments_grid - and the erlangc.rst param_grid - - Parameters - ---------- - - arguments_grid: dict, - Dictionary with the erlangc.rst.waiting_probability parameters, - each key of the dictionary must be the expected parameter and - the value must be a list with the different options to iterate - example: {"positions": [10, 20, 30], "scale_positions": [True, False]} """ + + results = {main_scenario: {subscenario_name: subscenario.calculate_waiting_probability() for subscenario_name, subscenario in sub_scenarios.items()} for main_scenario, sub_scenarios in self.erlang_scenarios.items()} - arguments_list = list(ParameterGrid(arguments_grid)) - self.waiting_probability_params = [(erlang_params, wait_params) - for erlang_params in self.param_list - for wait_params in arguments_list] - combinations = len(self.param_list) * len(arguments_list) - results = Parallel(n_jobs=self.n_jobs, - pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).waiting_probability)(**arguments) - for params in self.param_list - for arguments in arguments_list) - self._check_solutions(results, combinations) - - return results - def service_level(self, arguments_grid): + def calculate_service_level(self): """ Returns the expected service level given a number of positions - Returns a list with the solution to all the possible combinations from the arguments_grid - and the erlangc.rst param_grid - - Parameters - ---------- - - arguments_grid: dict, - Dictionary with the erlangc.rst.service_level parameters, - each key of the dictionary must be the expected parameter and - the value must be a list with the different options to iterate - example: {"positions": [10, 20, 30], "scale_positions": [True, False]} - - """ - arguments_list = list(ParameterGrid(arguments_grid)) - self.service_level_params = [(erlang_params, sl_params) - for erlang_params in self.param_list - for sl_params in arguments_list] - combinations = len(self.param_list) * len(arguments_list) - results = Parallel(n_jobs=self.n_jobs, - pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).service_level)(**arguments) - for params in self.param_list - for arguments in arguments_list) - self._check_solutions(results, combinations) - - return results - - def achieved_occupancy(self, arguments_grid): """ - Returns the expected occupancy of positions - Returns a list with the solution to all the possible combinations from the arguments_grid - and the erlangc.rst param_grid - - Parameters - ---------- - arguments_grid: dict, - Dictionary with the erlangc.rst.achieved_occupancy parameters, - each key of the dictionary must be the expected parameter and - the value must be a list with the different options to iterate - example: {"positions": [10, 20, 30], "scale_positions": [True, False]} - """ - - arguments_list = list(ParameterGrid(arguments_grid)) - combinations = len(self.param_list) * len(arguments_list) - results = Parallel(n_jobs=self.n_jobs, - pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).achieved_occupancy)(**arguments) - for params in self.param_list - for arguments in arguments_list) - self._check_solutions(results, combinations) + results = {main_scenario: {subscenario_name: subscenario.calculate_service_level() for subscenario_name, subscenario in sub_scenarios.items()} for main_scenario, sub_scenarios in self.erlang_scenarios.items()} - return results - def required_positions(self, arguments_grid): + def calculate_achieved_occupancy(self): """ - Computes the requirements using MultiErlangC - Returns a list with the solution to all the possible combinations from the arguments_grid and the erlangc.rst param_grid - - Parameters - ---------- - - arguments_grid: dict, - Dictionary with the erlangc.rst.achieved_occupancy parameters, - each key of the dictionary must be the expected parameter and - the value must be a list with the different options to iterate - example: {"service_level": [0.85, 0.9], "max_occupancy": [0.8, 0.95]} + Returns the expected occupancy of positions """ - arguments_list = list(ParameterGrid(arguments_grid)) - combinations = len(self.param_list) * len(arguments_list) - results = Parallel(n_jobs=self.n_jobs, - pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).required_positions)(**arguments) - for params in self.param_list - for arguments in arguments_list) - self._check_solutions(results, combinations) - - return results - - def _check_solutions(self, solutions, combinations): - """ - Checks the integrity of the solution in terms of dimensions - """ - if len(solutions) < 1: # noqa - raise ValueError("Could not find any solution, make sure the param_grid is defined correctly") - - if len(solutions) != combinations: - raise ValueError('Inconsistent results. Expected {} ' - 'solutions, got {}' - .format(len(self.param_list), - len(solutions))) # noqa + results = {main_scenario: {subscenario_name: subscenario.calculate_achieved_occupancy() for subscenario_name, subscenario in sub_scenarios.items()} for main_scenario, sub_scenarios in self.erlang_scenarios.items()} \ No newline at end of file diff --git a/pyworkforce/queuing/queueing_constants.py b/pyworkforce/queuing/queueing_constants.py new file mode 100644 index 0000000..caf0718 --- /dev/null +++ b/pyworkforce/queuing/queueing_constants.py @@ -0,0 +1,13 @@ +cTransactions_name = 'transactions' +cShrinkage_name = 'shrinkage' +cAHT_name = 'aht' +cInterval_name = 'interval' +cASA_name = 'asa' + +cErlangC_generic_variables = { + cTransactions_name:cTransactions_name, + cShrinkage_name:cShrinkage_name, + cAHT_name:cAHT_name, + cInterval_name:cInterval_name, + cASA_name:cASA_name, +} diff --git a/pyworkforce/queuing/tests/test_erlang.py b/pyworkforce/queuing/tests/test_erlang.py index 2477182..2241106 100644 --- a/pyworkforce/queuing/tests/test_erlang.py +++ b/pyworkforce/queuing/tests/test_erlang.py @@ -1,92 +1,69 @@ import pytest from pyworkforce.queuing import ErlangC - -def test_expected_erlangc_results(): - erlang = ErlangC(transactions=100, asa=0.33, aht=3, interval=30, shrinkage=0.3) - results = erlang.required_positions(service_level=0.8, max_occupancy=0.85) - raw_positions = results['raw_positions'] - positions = results['positions'] - service_level = results['service_level'] - occupancy = results['occupancy'] - waiting_probability = results['waiting_probability'] - - assert raw_positions == 14 - assert positions == 20 - assert round(service_level, 3) == 0.888 - assert round(occupancy, 3) == 0.714 - assert round(waiting_probability, 3) == 0.174 - - -def test_scale_positions_erlangc(): - erlang = ErlangC(transactions=100, asa=0.33, aht=3, interval=30, shrinkage=0.3) - results = erlang.required_positions(service_level=0.8, max_occupancy=0.85) - positions = results['positions'] - service_level = erlang.service_level(positions=positions, scale_positions=True) - occupancy = erlang.achieved_occupancy(positions=positions, scale_positions=True) - waiting_probability = erlang.waiting_probability(positions=positions, scale_positions=True) - - assert positions == 20 - assert round(service_level, 3) == 0.888 - assert round(occupancy, 3) == 0.714 - assert round(waiting_probability, 3) == 0.174 - - -def test_over_occupancy_erlangc(): - erlang = ErlangC(transactions=100, asa=0.33, aht=3, interval=30, shrinkage=0.3) - results = erlang.required_positions(service_level=0.8, max_occupancy=0.7) - raw_positions = results['raw_positions'] - positions = results['positions'] - service_level = erlang.service_level(positions=positions, scale_positions=True) - occupancy = erlang.achieved_occupancy(positions=positions, scale_positions=True) - waiting_probability = erlang.waiting_probability(positions=positions, scale_positions=True) - - assert raw_positions == 15 - assert positions == 22 - assert round(service_level, 3) == 0.941 - assert round(occupancy, 3) == 0.667 - assert round(waiting_probability, 3) == 0.102 - - -def test_wrong_transactions_erlangc(): - with pytest.raises(Exception) as excinfo: - erlang = ErlangC(transactions=-20, asa=0.33, aht=3, interval=30, shrinkage=0.3) - assert str(excinfo.value) == "transactions can't be smaller or equals than 0" - - -def test_wrong_aht_erlangc(): - with pytest.raises(Exception) as excinfo: - erlang = ErlangC(transactions=100, asa=0.33, aht=-5, interval=30, shrinkage=0.3) - assert str(excinfo.value) == "aht can't be smaller or equals than 0" - - -def test_wrong_asa_erlangc(): - with pytest.raises(Exception) as excinfo: - erlang = ErlangC(transactions=100, asa=0, aht=5, interval=30, shrinkage=0.3) - assert str(excinfo.value) == "asa can't be smaller or equals than 0" - - -def test_wrong_interval_erlangc(): - with pytest.raises(Exception) as excinfo: - erlang = ErlangC(transactions=100, asa=10, aht=5, interval=-30, shrinkage=0.3) - assert str(excinfo.value) == "interval can't be smaller or equals than 0" - - -def test_wrong_shrinkage_erlangc(): - with pytest.raises(Exception) as excinfo: - erlang = ErlangC(transactions=100, asa=10, aht=5, interval=30, shrinkage=1) - assert str(excinfo.value) == "shrinkage must be between in the interval [0,1)" - - -def test_wrong_service_level_erlangc(): - erlang = ErlangC(transactions=100, asa=0.33, aht=3, interval=30, shrinkage=0.3) - with pytest.raises(Exception) as excinfo: - results = erlang.required_positions(service_level=1.8, max_occupancy=0.85) - assert str(excinfo.value) == "service_level must be between 0 and 1" - - -def test_wrong_max_occupancy_erlangc(): - erlang = ErlangC(transactions=100, asa=0.33, aht=3, interval=30, shrinkage=0.3) - with pytest.raises(Exception) as excinfo: - results = erlang.required_positions(service_level=0.8, max_occupancy=1.2) - assert str(excinfo.value) == "max_occupancy must be between 0 and 1" +class TestDefaultErlangCBehaviour: + """ + Test regular erlangC inputs for single and multi scenario dictionaries. + """ + + + single_scenario_erlangC_legacy = {"test_scenario 1": {"transactions": 100, "aht": 3.0, "asa": .33, "shrinkage": 0.3, "interval": 30, 'service_level_target':.8}} + single_scenario_erlangC = {"test_scenario 1": {"transactions": 100, "aht": 3.0, "asa": .33, "shrinkage": 0.3, "interval": 30, 'service_level_target':.8}} + multiple_scenario_erlangC = {"test_scenario 1": {"transactions": 100, "aht": 3.0, "asa": 20 / 60, "shrinkage": 0.3, "interval": 30, 'service_level_target':.8}, + "test_scenario 2": {"transactions": [100,200], "aht": 3.0, "asa": 20 / 60, "shrinkage": 0.3, "interval": 30, 'service_level_target':.8}} + + def test_erlangc_single_scenario_results_legacy(self): + + erlang = ErlangC(erlang_scenarios=self.single_scenario_erlangC_legacy) + erlang.calculate_required_positions() + results = erlang.results_to_dataframe() + results = results.round(3) + assert (results['raw_positions'] == 14).all() + assert (results['positions'] == 19).all() + assert (results['achieved_service_level'] == 0.888).all() + assert (results['achieved_occupancy'] == 0.714).all() + assert (results['waiting_probability'] == 0.174).all() + + def test_erlangc_single_scenario_results(self): + + erlang = ErlangC(erlang_scenarios=self.single_scenario_erlangC) + erlang.calculate_required_positions(enforce_trafficking_requirements=False) + results = erlang.results_to_dataframe() + results = results.round(3) + + assert (results['raw_positions'] == 13.1).all() + assert (results['positions'] == 18.714).all() + assert (results['achieved_service_level'] == 0.817).all() + assert (results['achieved_occupancy'] == 0.763).all() + assert (results['waiting_probability'] == 0.257).all() + + def test_erlangc_multi_scenario_results(self): + + erlang = ErlangC(erlang_scenarios=self.multiple_scenario_erlangC) + erlang.calculate_required_positions(enforce_trafficking_requirements=False) + results = erlang.results_to_dataframe() + results = results.round(3) + + columns = ['scenario', 'subscenario', 'transactions', 'aht', 'asa', 'shrinkage', + 'interval', 'service_level_target', 'achieved_service_level', + 'raw_positions', 'positions', 'maximum_occupancy', + 'waiting_probability', 'achieved_occupancy', 'intensity'] + + assert results.shape == (3,15) + assert (results.columns == columns).all() + assert (results['scenario'].isin(list(self.multiple_scenario_erlangC.keys()) )).all() + + scenario_1 = results[results.scenario == "test_scenario 1"] + assert (scenario_1['raw_positions'] == 13.1).all() + assert (scenario_1['positions'] == 18.714).all() + assert (scenario_1['achieved_service_level'] == 0.818).all() + assert (scenario_1['achieved_occupancy'] == 0.763).all() + assert (scenario_1['waiting_probability'] == 0.257).all() + + scenario_2 = results[results.scenario == "test_scenario 2"] + assert (scenario_2['raw_positions'] == [13.1,23.9]).all() + assert (scenario_2['positions'] == [18.714, 34.143]).all() + assert (scenario_2['achieved_service_level'] == [0.818, 0.801]).all() + assert (scenario_2['achieved_occupancy'] == [0.763, 0.837]).all() + assert (scenario_2['waiting_probability'] == [0.257, 0.307]).all() + \ No newline at end of file diff --git a/pyworkforce/queuing/tests/test_multi_erlang.py b/pyworkforce/queuing/tests/test_multi_erlang.py deleted file mode 100644 index 88eb6eb..0000000 --- a/pyworkforce/queuing/tests/test_multi_erlang.py +++ /dev/null @@ -1,95 +0,0 @@ -import pytest -from pyworkforce.queuing.erlang import MultiErlangC - - -def test_expected_multierlangc_results(): - param_grid = {"transactions": [100], "asa": [0.33], "aht": [3], "interval": [30], "shrinkage": [0.3]} - erlang = MultiErlangC(param_grid=param_grid) - arguments_grid = {"service_level": [0.8], "max_occupancy": [0.85]} - results = erlang.required_positions(arguments_grid=arguments_grid)[0] - raw_positions = results['raw_positions'] - positions = results['positions'] - service_level = results['service_level'] - occupancy = results['occupancy'] - waiting_probability = results['waiting_probability'] - - assert raw_positions == 14 - assert positions == 20 - assert round(service_level, 3) == 0.888 - assert round(occupancy, 3) == 0.714 - assert round(waiting_probability, 3) == 0.174 - - -def test_expected_multierlangc_grid(): - param_grid = {"transactions": [100], "asa": [0.33], "aht": [3], "interval": [30], "shrinkage": [0.3]} - erlang = MultiErlangC(param_grid=param_grid) - arguments_grid = {"service_level": [0.8, 0.9], "max_occupancy": [0.85]} - results_list = erlang.required_positions(arguments_grid=arguments_grid) - - assert len(results_list) == 2 - - results = results_list[0] - - raw_positions = results['raw_positions'] - positions = results['positions'] - service_level = results['service_level'] - occupancy = results['occupancy'] - waiting_probability = results['waiting_probability'] - - assert raw_positions == 14 - assert positions == 20 - assert round(service_level, 3) == 0.888 - assert round(occupancy, 3) == 0.714 - assert round(waiting_probability, 3) == 0.174 - - results = results_list[1] - - raw_positions = results['raw_positions'] - positions = results['positions'] - service_level = results['service_level'] - occupancy = results['occupancy'] - waiting_probability = results['waiting_probability'] - - assert raw_positions == 15 - assert positions == 22 - assert round(service_level, 3) == 0.941 - assert round(occupancy, 3) == 0.667 - assert round(waiting_probability, 3) == 0.102 - - -def test_multiscale_positions_erlangc(): - param_grid = {"transactions": [100], "asa": [0.33], "aht": [3], "interval": [30], "shrinkage": [0.3]} - erlang = MultiErlangC(param_grid=param_grid) - arguments_grid = {"service_level": [0.8], "max_occupancy": [0.85]} - results = erlang.required_positions(arguments_grid=arguments_grid)[0] - positions = results['positions'] - - arguments_grid = {"positions": [positions], "scale_positions": [True]} - service_level = erlang.service_level(arguments_grid)[0] - occupancy = erlang.achieved_occupancy(arguments_grid)[0] - waiting_probability = erlang.waiting_probability(arguments_grid)[0] - - assert positions == 20 - assert round(service_level, 3) == 0.888 - assert round(occupancy, 3) == 0.714 - assert round(waiting_probability, 3) == 0.174 - - -def test_expected_multierlangc_wrong_arguments(): - param_grid = {"transactions": [100], "asa": [0.33], "aht": [3], "interval": [30], "shrinkage": [0.3]} - erlang = MultiErlangC(param_grid=param_grid) - arguments_grid = {"max_occupancy": [0.85]} - - with pytest.raises(Exception) as excinfo: - results = erlang.required_positions(arguments_grid=arguments_grid)[0] - assert str(excinfo.value) in ["required_positions() missing 1 required positional argument: 'service_level'", - "ErlangC.required_positions() missing 1 required positional argument: 'service_level'"] - - -def test_multierlangc_wrong_grid(): - param_grid = {"transactions": 100, "asa": [0.33], "aht": [3], "interval": [30], "shrinkage": [0.3]} - - with pytest.raises(Exception) as excinfo: - results = MultiErlangC(param_grid=param_grid) - assert str(excinfo.value) == "Parameter grid value is not iterable (key='transactions', value=100)" - diff --git a/setup.py b/setup.py index c2a0627..d6d0251 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,7 @@ 'ortools>=9.2.9972', 'pandas>=1.3.5', 'joblib>0.17' + 'pydantic==1.10.7' ], python_requires=">=3.8", include_package_data=True,