|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from ortools.linear_solver import pywraplp |
|
|
from math import ceil |
|
|
import datetime |
|
|
from src.config.constants import ShiftType, LineType, KitLevel |
|
|
|
|
|
|
|
|
|
|
|
from src.config.constants import ShiftType, LineType, DefaultConfig |
|
|
import src.preprocess.extract as extract |
|
|
from src.preprocess.hierarchy_parser import sort_products_by_hierarchy |
|
|
|
|
|
class Optimizer: |
|
|
"""Workforce optimization class that handles all configuration and optimization logic""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize optimizer with session state configuration""" |
|
|
self.load_session_state_config() |
|
|
self.load_data() |
|
|
|
|
|
def load_session_state_config(self): |
|
|
"""Load all configuration from session state""" |
|
|
import streamlit as st |
|
|
import datetime as dt |
|
|
|
|
|
|
|
|
self.start_date = st.session_state.start_date |
|
|
self.planning_days = st.session_state.planning_days |
|
|
self.start_datetime = dt.datetime.combine(self.start_date, dt.datetime.min.time()) |
|
|
self.end_date = self.start_datetime + dt.timedelta(days=self.planning_days - 1) |
|
|
self.date_span = list(range(1, self.planning_days + 1)) |
|
|
|
|
|
|
|
|
self.employee_type_list = list(st.session_state.selected_employee_types) |
|
|
self.active_shift_list = sorted(list(st.session_state.selected_shifts)) |
|
|
|
|
|
print("\n[DEBUG] From session_state.selected_employee_types:") |
|
|
for emp in self.employee_type_list: |
|
|
print(f" - '{emp}' (len={len(emp)}, repr={repr(emp)})") |
|
|
|
|
|
|
|
|
self.max_hour_per_person_per_day = st.session_state.max_hour_per_person_per_day |
|
|
self.max_hours_shift = { |
|
|
ShiftType.REGULAR: st.session_state.max_hours_shift_1, |
|
|
ShiftType.EVENING: st.session_state.max_hours_shift_2, |
|
|
ShiftType.OVERTIME: st.session_state.max_hours_shift_3 |
|
|
} |
|
|
|
|
|
|
|
|
self.max_employee_per_type_on_day = st.session_state.max_employee_per_type_on_day |
|
|
|
|
|
|
|
|
self.line_counts = st.session_state.line_counts |
|
|
self.max_parallel_workers = { |
|
|
LineType.LONG_LINE: st.session_state.max_parallel_workers_long_line, |
|
|
LineType.MINI_LOAD: st.session_state.max_parallel_workers_mini_load |
|
|
} |
|
|
|
|
|
|
|
|
self.cost_list_per_emp_shift = st.session_state.cost_list_per_emp_shift |
|
|
|
|
|
|
|
|
self.payment_mode_config = st.session_state.payment_mode_config |
|
|
|
|
|
|
|
|
self.fixed_min_unicef_per_day = st.session_state.fixed_min_unicef_per_day |
|
|
|
|
|
print("✅ Session state configuration loaded successfully") |
|
|
|
|
|
def load_data(self): |
|
|
"""Load all required data from files""" |
|
|
|
|
|
try: |
|
|
kit_levels, dependencies, priority_order = extract.get_production_order_data() |
|
|
self.kit_levels = kit_levels |
|
|
self.kit_dependencies = dependencies |
|
|
self.production_priority_order = priority_order |
|
|
except: |
|
|
self.kit_levels = {} |
|
|
self.kit_dependencies = {} |
|
|
self.production_priority_order = [] |
|
|
|
|
|
|
|
|
try: |
|
|
kit_line_match = extract.read_kit_line_match_data() |
|
|
kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict() |
|
|
|
|
|
|
|
|
line_name_to_id = { |
|
|
"long line": LineType.LONG_LINE, |
|
|
"mini load": LineType.MINI_LOAD |
|
|
} |
|
|
|
|
|
|
|
|
self.kit_line_match_dict = {} |
|
|
for kit_name, line_name in kit_line_match_dict.items(): |
|
|
self.kit_line_match_dict[kit_name] = line_name_to_id.get(line_name.lower(), line_name) |
|
|
except: |
|
|
self.kit_line_match_dict = {} |
|
|
|
|
|
|
|
|
try: |
|
|
from src.demand_filtering import DemandFilter |
|
|
filter_instance = DemandFilter() |
|
|
filter_instance.load_data(force_reload=True) |
|
|
self.product_list = filter_instance.get_filtered_product_list() |
|
|
self.demand_dictionary = filter_instance.get_filtered_demand_dictionary() |
|
|
except: |
|
|
self.product_list = [] |
|
|
self.demand_dictionary = {} |
|
|
|
|
|
|
|
|
try: |
|
|
print("\n[DEBUG] Loading team requirements from Kits Calculation...") |
|
|
kits_df = extract.read_personnel_requirement_data() |
|
|
print(f"[DEBUG] Loaded kits_df with {len(kits_df)} rows") |
|
|
print(f"[DEBUG] Columns: {list(kits_df.columns)}") |
|
|
|
|
|
|
|
|
self.team_req_per_product = { |
|
|
"UNICEF Fixed term": {}, |
|
|
"Humanizer": {} |
|
|
} |
|
|
|
|
|
|
|
|
for product in self.product_list: |
|
|
product_data = kits_df[kits_df['Kit'] == product] |
|
|
if not product_data.empty: |
|
|
|
|
|
humanizer_req = product_data["Humanizer"].iloc[0] |
|
|
unicef_req = product_data["UNICEF staff"].iloc[0] |
|
|
|
|
|
|
|
|
self.team_req_per_product["Humanizer"][product] = int(humanizer_req) |
|
|
self.team_req_per_product["UNICEF Fixed term"][product] = int(unicef_req) |
|
|
else: |
|
|
print(f"[WARN] Product {product} not found in Kits Calculation, setting requirements to 0") |
|
|
self.team_req_per_product["Humanizer"][product] = 0 |
|
|
self.team_req_per_product["UNICEF Fixed term"][product] = 0 |
|
|
|
|
|
print(f"\n[DEBUG] team_req_per_product keys after loading:") |
|
|
for key in self.team_req_per_product.keys(): |
|
|
product_count = len(self.team_req_per_product[key]) |
|
|
print(f" - '{key}' (len={len(key)}, {product_count} products)") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to load team requirements: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
self.team_req_per_product = {} |
|
|
|
|
|
|
|
|
try: |
|
|
self.per_product_speed = extract.read_package_speed_data() |
|
|
except: |
|
|
self.per_product_speed = {} |
|
|
|
|
|
print("✅ All data loaded successfully") |
|
|
|
|
|
def build_lines(self): |
|
|
"""Build line instances from session state configuration""" |
|
|
line_tuples = [] |
|
|
|
|
|
try: |
|
|
import streamlit as st |
|
|
|
|
|
selected_lines = st.session_state.selected_lines |
|
|
|
|
|
line_counts = st.session_state.line_counts |
|
|
|
|
|
print(f"Using lines from session state - selected: {selected_lines}, counts: {line_counts}") |
|
|
for line_type in selected_lines: |
|
|
count = line_counts.get(line_type, 0) |
|
|
for i in range(1, count + 1): |
|
|
line_tuples.append((line_type, i)) |
|
|
|
|
|
return line_tuples |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Could not get line config from session state: {e}") |
|
|
|
|
|
print("Falling back to default line configuration") |
|
|
default_selected_lines = [LineType.LONG_LINE, LineType.MINI_LOAD] |
|
|
default_line_counts = { |
|
|
LineType.LONG_LINE: DefaultConfig.LINE_COUNT_LONG_LINE, |
|
|
LineType.MINI_LOAD: DefaultConfig.LINE_COUNT_MINI_LOAD |
|
|
} |
|
|
|
|
|
for line_type in default_selected_lines: |
|
|
count = default_line_counts.get(line_type, 0) |
|
|
for i in range(1, count + 1): |
|
|
line_tuples.append((line_type, i)) |
|
|
|
|
|
return line_tuples |
|
|
|
|
|
def run_optimization(self): |
|
|
"""Run the main optimization algorithm""" |
|
|
|
|
|
print("\n" + "="*60) |
|
|
print("🔄 LOADING FRESH DATA FOR OPTIMIZATION") |
|
|
print("="*60) |
|
|
|
|
|
print(f"📦 LOADED PRODUCTS: {len(self.product_list)} products") |
|
|
print(f"📈 LOADED DEMAND: {sum(self.demand_dictionary.values())} total units") |
|
|
print(f"👥 LOADED TEAM REQUIREMENTS: {len(self.team_req_per_product)} employee types") |
|
|
|
|
|
|
|
|
print("\n[DEBUG] team_req_per_product employee types:") |
|
|
for emp_type in self.team_req_per_product.keys(): |
|
|
print(f" - '{emp_type}'") |
|
|
|
|
|
print("\n[DEBUG] self.employee_type_list:") |
|
|
for emp_type in self.employee_type_list: |
|
|
print(f" - '{emp_type}'") |
|
|
|
|
|
|
|
|
ACTIVE = {t: {p: 1 for p in self.product_list} for t in self.date_span} |
|
|
|
|
|
|
|
|
date_span_list = list(self.date_span) |
|
|
employee_type_list = self.employee_type_list |
|
|
active_shift_list = self.active_shift_list |
|
|
print(f"\n[DEBUG] employee_type_list: {employee_type_list}") |
|
|
print(f"[DEBUG] active_shift_list: {active_shift_list}") |
|
|
|
|
|
|
|
|
print("\n" + "="*60) |
|
|
print("🔗 APPLYING HIERARCHY-BASED PRODUCTION ORDERING") |
|
|
print("="*60) |
|
|
sorted_product_list = sort_products_by_hierarchy(list(self.product_list), self.kit_levels, self.kit_dependencies) |
|
|
|
|
|
line_tuples = self.build_lines() |
|
|
print("Lines", line_tuples) |
|
|
|
|
|
print("PER_PRODUCT_SPEED", self.per_product_speed) |
|
|
|
|
|
|
|
|
print("\n[DEBUG] Creating variable aliases...") |
|
|
Hmax_s = dict(self.max_hours_shift) |
|
|
Hmax_daily = self.max_hour_per_person_per_day |
|
|
max_workers_line = dict(self.max_parallel_workers) |
|
|
max_employee_type_day = self.max_employee_per_type_on_day |
|
|
cost = self.cost_list_per_emp_shift |
|
|
|
|
|
|
|
|
TEAM_REQ_PER_PRODUCT = self.team_req_per_product |
|
|
DEMAND_DICTIONARY = self.demand_dictionary |
|
|
KIT_LINE_MATCH_DICT = self.kit_line_match_dict |
|
|
KIT_LEVELS = self.kit_levels |
|
|
KIT_DEPENDENCIES = self.kit_dependencies |
|
|
PER_PRODUCT_SPEED = self.per_product_speed |
|
|
FIXED_MIN_UNICEF_PER_DAY = self.fixed_min_unicef_per_day |
|
|
PAYMENT_MODE_CONFIG = self.payment_mode_config |
|
|
|
|
|
|
|
|
EVENING_SHIFT_MODE = "normal" |
|
|
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9 |
|
|
|
|
|
print(f"[DEBUG] TEAM_REQ_PER_PRODUCT has {len(TEAM_REQ_PER_PRODUCT)} employee types") |
|
|
print(f"[DEBUG] employee_type_list has {len(employee_type_list)} types") |
|
|
|
|
|
|
|
|
print("\n[DEBUG] Starting feasibility checks...") |
|
|
|
|
|
|
|
|
for i, p in enumerate(sorted_product_list): |
|
|
print(f"[DEBUG] Checking product {i+1}/{len(sorted_product_list)}: {p}") |
|
|
|
|
|
|
|
|
for e in employee_type_list: |
|
|
if e not in TEAM_REQ_PER_PRODUCT: |
|
|
print(f"[ERROR] Employee type '{e}' not found in TEAM_REQ_PER_PRODUCT!") |
|
|
print(f"[ERROR] Available keys: {list(TEAM_REQ_PER_PRODUCT.keys())}") |
|
|
raise KeyError(f"Employee type '{e}' not in team requirements data") |
|
|
if p not in TEAM_REQ_PER_PRODUCT[e]: |
|
|
print(f"[ERROR] Product '{p}' not found in TEAM_REQ_PER_PRODUCT['{e}']!") |
|
|
raise KeyError(f"Product '{p}' not in team requirements for employee type '{e}'") |
|
|
|
|
|
req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list) |
|
|
print(f"[DEBUG] req_total: {req_total}") |
|
|
lt = KIT_LINE_MATCH_DICT.get(p, 6) |
|
|
if p not in KIT_LINE_MATCH_DICT: |
|
|
print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)") |
|
|
if req_total > max_workers_line.get(lt, 1e9): |
|
|
print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] " |
|
|
f"= {max_workers_line.get(lt)}. Blocked.") |
|
|
|
|
|
|
|
|
if EVENING_SHIFT_MODE == "normal": |
|
|
total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in sorted_product_list) |
|
|
|
|
|
|
|
|
regular_overtime_shifts = [s for s in active_shift_list if s in ShiftType.REGULAR_AND_OVERTIME] |
|
|
max_capacity = 0 |
|
|
|
|
|
for p in sorted_product_list: |
|
|
if p in PER_PRODUCT_SPEED: |
|
|
product_speed = PER_PRODUCT_SPEED[p] |
|
|
|
|
|
max_hours_per_product = 0 |
|
|
for ell in line_tuples: |
|
|
for s in regular_overtime_shifts: |
|
|
for t in date_span_list: |
|
|
max_hours_per_product += Hmax_s[s] |
|
|
|
|
|
max_capacity += product_speed * max_hours_per_product |
|
|
|
|
|
capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf') |
|
|
|
|
|
print(f"[CAPACITY CHECK] Total demand: {total_demand}") |
|
|
print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}") |
|
|
print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}") |
|
|
|
|
|
if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD: |
|
|
print(f"\n🚨 [ALERT] DEMAND TOO HIGH!") |
|
|
print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand") |
|
|
print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%") |
|
|
print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift") |
|
|
print(f" This will add shift 3 to increase capacity\n") |
|
|
|
|
|
|
|
|
|
|
|
solver = pywraplp.Solver.CreateSolver('CBC') |
|
|
if not solver: |
|
|
raise RuntimeError("CBC solver not found.") |
|
|
INF = solver.infinity() |
|
|
|
|
|
|
|
|
|
|
|
Assignment, Hours, Units = {}, {}, {} |
|
|
for p in sorted_product_list: |
|
|
for ell in line_tuples: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
Assignment[p, ell, s, t] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
|
|
|
Hours[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
|
|
|
Units[p, ell, s, t] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
EMPLOYEE_COUNT = {} |
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
|
|
|
max_count = max_employee_type_day.get(e, {}).get(t, 100) |
|
|
EMPLOYEE_COUNT[e, s, t] = solver.IntVar( |
|
|
0, |
|
|
max_count, |
|
|
f"EmpCount_{e}_s{s}_day{t}" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
EMPLOYEE_HOURS = {} |
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
|
|
|
EMPLOYEE_HOURS[e, s, t] = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] |
|
|
for p in sorted_product_list |
|
|
for ell in line_tuples |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"\n[DEBUG] Payment mode configuration: {PAYMENT_MODE_CONFIG}") |
|
|
|
|
|
|
|
|
cost_terms = [] |
|
|
|
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial") |
|
|
|
|
|
if payment_mode == "partial": |
|
|
|
|
|
|
|
|
|
|
|
cost_terms.append(cost[e][s] * EMPLOYEE_HOURS[e, s, t]) |
|
|
|
|
|
elif payment_mode == "bulk": |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
work_in_shift = solver.BoolVar(f"work_{e}_s{s}_d{t}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
max_possible_hours = Hmax_s[s] * max_employee_type_day[e][t] |
|
|
solver.Add(EMPLOYEE_HOURS[e, s, t] <= max_possible_hours * work_in_shift) |
|
|
solver.Add(work_in_shift * 0.001 <= EMPLOYEE_HOURS[e, s, t]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
employees_in_shift = solver.IntVar(0, max_employee_type_day[e][t], f"emp_{e}_s{s}_d{t}") |
|
|
|
|
|
|
|
|
|
|
|
solver.Add(employees_in_shift * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t]) |
|
|
|
|
|
|
|
|
cost_terms.append(cost[e][s] * Hmax_s[s] * employees_in_shift) |
|
|
|
|
|
|
|
|
|
|
|
total_cost = solver.Sum(cost_terms) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
solver.Minimize(total_cost) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for p in sorted_product_list: |
|
|
total_production = solver.Sum(Units[p, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list) |
|
|
demand = DEMAND_DICTIONARY.get(p, 0) |
|
|
|
|
|
|
|
|
solver.Add(total_production >= demand) |
|
|
|
|
|
|
|
|
solver.Add(total_production <= demand) |
|
|
|
|
|
|
|
|
for ell in line_tuples: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
solver.Add(solver.Sum(Assignment[p, ell, s, t] for p in sorted_product_list) <= 1) |
|
|
for p in sorted_product_list: |
|
|
solver.Add(Hours[p, ell, s, t] <= Hmax_s[s] * Assignment[p, ell, s, t]) |
|
|
|
|
|
|
|
|
for p in sorted_product_list: |
|
|
req_lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE) |
|
|
req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list) |
|
|
for ell in line_tuples: |
|
|
allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9)) |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
if ACTIVE[t][p] == 0 or not allowed: |
|
|
solver.Add(Assignment[p, ell, s, t] == 0) |
|
|
solver.Add(Hours[p, ell, s, t] == 0) |
|
|
solver.Add(Units[p, ell, s, t] == 0) |
|
|
|
|
|
|
|
|
for p in sorted_product_list: |
|
|
for ell in line_tuples: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
if p in PER_PRODUCT_SPEED: |
|
|
|
|
|
speed = PER_PRODUCT_SPEED[p] |
|
|
|
|
|
solver.Add( |
|
|
Units[p, ell, s, t] <= speed * Hours[p, ell, s, t] |
|
|
) |
|
|
|
|
|
solver.Add( |
|
|
Units[p, ell, s, t] >= speed * Hours[p, ell, s, t] |
|
|
) |
|
|
else: |
|
|
|
|
|
default_speed = 800 / 7.5 |
|
|
print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour") |
|
|
|
|
|
solver.Add( |
|
|
Units[p, ell, s, t] <= default_speed * Hours[p, ell, s, t] |
|
|
) |
|
|
|
|
|
solver.Add( |
|
|
Units[p, ell, s, t] >= default_speed * Hours[p, ell, s, t] |
|
|
) |
|
|
|
|
|
|
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
solver.Add( |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] for p in sorted_product_list for ell in line_tuples) |
|
|
<= Hmax_s[s] * max_employee_type_day[e][t] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
total_person_hours_in_shift = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t] |
|
|
for p in sorted_product_list |
|
|
for ell in line_tuples |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
solver.Add(EMPLOYEE_COUNT[e, s, t] * Hmax_s[s] >= total_person_hours_in_shift) |
|
|
|
|
|
|
|
|
|
|
|
if ShiftType.EVENING in active_shift_list and ShiftType.REGULAR in active_shift_list: |
|
|
for e in employee_type_list: |
|
|
for t in date_span_list: |
|
|
solver.Add( |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.EVENING, t] for p in sorted_product_list for ell in line_tuples) |
|
|
<= |
|
|
solver.Sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.REGULAR, t] for p in sorted_product_list for ell in line_tuples) |
|
|
) |
|
|
|
|
|
|
|
|
if ShiftType.OVERTIME in active_shift_list and ShiftType.REGULAR in active_shift_list: |
|
|
print("\n[OVERTIME] Adding constraints to ensure overtime only when regular shift is insufficient...") |
|
|
|
|
|
for e in employee_type_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
regular_capacity = max_employee_type_day[e][t] |
|
|
|
|
|
|
|
|
regular_usage = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.REGULAR, t] |
|
|
for p in sorted_product_list for ell in line_tuples |
|
|
) |
|
|
|
|
|
|
|
|
overtime_usage = solver.Sum( |
|
|
TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, ShiftType.OVERTIME, t] |
|
|
for p in sorted_product_list for ell in line_tuples |
|
|
) |
|
|
|
|
|
|
|
|
using_overtime = solver.IntVar(0, 1, f'using_overtime_{e}_{t}') |
|
|
|
|
|
|
|
|
|
|
|
min_regular_for_overtime = int(0.9 * regular_capacity) |
|
|
|
|
|
|
|
|
solver.Add(regular_usage >= min_regular_for_overtime * using_overtime) |
|
|
|
|
|
|
|
|
solver.Add(overtime_usage <= regular_capacity * using_overtime) |
|
|
|
|
|
overtime_constraints_added = len(employee_type_list) * len(date_span_list) * 2 |
|
|
print(f"[OVERTIME] Added {overtime_constraints_added} constraints ensuring overtime only when regular shifts are at 90%+ capacity") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'UNICEF Fixed term' in employee_type_list and FIXED_MIN_UNICEF_PER_DAY > 0: |
|
|
if ShiftType.REGULAR in active_shift_list: |
|
|
print(f"\n[FIXED STAFFING] Adding constraint for minimum {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in REGULAR shift per day...") |
|
|
for t in date_span_list: |
|
|
|
|
|
solver.Add( |
|
|
EMPLOYEE_COUNT['UNICEF Fixed term', ShiftType.REGULAR, t] >= FIXED_MIN_UNICEF_PER_DAY |
|
|
) |
|
|
print(f"[FIXED STAFFING] Added {len(date_span_list)} constraints ensuring >= {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in regular shift per day") |
|
|
else: |
|
|
print(f"\n[FIXED STAFFING] Warning: Regular shift not available, cannot enforce minimum UNICEF staffing") |
|
|
|
|
|
|
|
|
|
|
|
print("\n[HIERARCHY] Adding dependency constraints...") |
|
|
dependency_constraints_added = 0 |
|
|
|
|
|
for p in sorted_product_list: |
|
|
dependencies = KIT_DEPENDENCIES.get(p, []) |
|
|
if dependencies: |
|
|
|
|
|
p_level = KIT_LEVELS.get(p, 2) |
|
|
|
|
|
for dep in dependencies: |
|
|
if dep in sorted_product_list: |
|
|
|
|
|
p_completion = solver.Sum( |
|
|
t * Hours[p, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list |
|
|
) |
|
|
dep_completion = solver.Sum( |
|
|
t * Hours[dep, ell, s, t] for ell in line_tuples for s in active_shift_list for t in date_span_list |
|
|
) |
|
|
|
|
|
|
|
|
solver.Add(dep_completion <= p_completion) |
|
|
dependency_constraints_added += 1 |
|
|
|
|
|
print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})") |
|
|
|
|
|
print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints") |
|
|
|
|
|
|
|
|
status = solver.Solve() |
|
|
if status != pywraplp.Solver.OPTIMAL: |
|
|
status_names = {pywraplp.Solver.INFEASIBLE: "INFEASIBLE", pywraplp.Solver.UNBOUNDED: "UNBOUNDED"} |
|
|
print(f"No optimal solution. Status: {status} ({status_names.get(status, 'UNKNOWN')})") |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
result = {} |
|
|
result['objective'] = solver.Objective().Value() |
|
|
|
|
|
|
|
|
prod_week = {p: sum(Units[p, ell, s, t].solution_value() for ell in line_tuples for s in active_shift_list for t in date_span_list) for p in sorted_product_list} |
|
|
result['weekly_production'] = prod_week |
|
|
|
|
|
|
|
|
schedule = [] |
|
|
for t in date_span_list: |
|
|
for ell in line_tuples: |
|
|
for s in active_shift_list: |
|
|
chosen = [p for p in sorted_product_list if Assignment[p, ell, s, t].solution_value() > 0.5] |
|
|
if chosen: |
|
|
p = chosen[0] |
|
|
schedule.append({ |
|
|
'day': t, |
|
|
'line_type_id': ell[0], |
|
|
'line_idx': ell[1], |
|
|
'shift': s, |
|
|
'product': p, |
|
|
'run_hours': Hours[p, ell, s, t].solution_value(), |
|
|
'units': Units[p, ell, s, t].solution_value(), |
|
|
}) |
|
|
result['run_schedule'] = schedule |
|
|
|
|
|
|
|
|
headcount = [] |
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
used_ph = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() for p in sorted_product_list for ell in line_tuples) |
|
|
need = ceil(used_ph / (Hmax_s[s] + 1e-9)) |
|
|
headcount.append({'emp_type': e, 'shift': s, 'day': t, |
|
|
'needed': need, 'available': max_employee_type_day[e][t]}) |
|
|
result['headcount_per_shift'] = headcount |
|
|
|
|
|
|
|
|
ph_by_day = [] |
|
|
for e in employee_type_list: |
|
|
for t in date_span_list: |
|
|
used = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() for s in active_shift_list for p in sorted_product_list for ell in line_tuples) |
|
|
ph_by_day.append({'emp_type': e, 'day': t, |
|
|
'used_person_hours': used, |
|
|
'cap_person_hours': Hmax_daily * max_employee_type_day[e][t]}) |
|
|
result['person_hours_by_day'] = ph_by_day |
|
|
|
|
|
|
|
|
employee_count_by_shift = [] |
|
|
for e in employee_type_list: |
|
|
for s in active_shift_list: |
|
|
for t in date_span_list: |
|
|
count = int(EMPLOYEE_COUNT[e, s, t].solution_value()) |
|
|
used_hours = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() |
|
|
for p in sorted_product_list for ell in line_tuples) |
|
|
avg_hours_per_employee = used_hours / count if count > 0 else 0 |
|
|
if count > 0: |
|
|
employee_count_by_shift.append({ |
|
|
'emp_type': e, |
|
|
'shift': s, |
|
|
'day': t, |
|
|
'employee_count': count, |
|
|
'total_person_hours': used_hours, |
|
|
'avg_hours_per_employee': avg_hours_per_employee, |
|
|
'available': max_employee_type_day[e][t] |
|
|
}) |
|
|
result['employee_count_by_shift'] = employee_count_by_shift |
|
|
|
|
|
|
|
|
employee_count_by_day = [] |
|
|
for e in employee_type_list: |
|
|
for t in date_span_list: |
|
|
|
|
|
total_count = sum(int(EMPLOYEE_COUNT[e, s, t].solution_value()) for s in active_shift_list) |
|
|
used_hours = sum(TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value() |
|
|
for s in active_shift_list for p in sorted_product_list for ell in line_tuples) |
|
|
avg_hours_per_employee = used_hours / total_count if total_count > 0 else 0 |
|
|
if total_count > 0: |
|
|
employee_count_by_day.append({ |
|
|
'emp_type': e, |
|
|
'day': t, |
|
|
'employee_count': total_count, |
|
|
'total_person_hours': used_hours, |
|
|
'avg_hours_per_employee': avg_hours_per_employee, |
|
|
'available': max_employee_type_day[e][t] |
|
|
}) |
|
|
result['employee_count_by_day'] = employee_count_by_day |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("Objective (min cost):", result['objective']) |
|
|
print("\n--- Weekly production by product ---") |
|
|
for p, u in prod_week.items(): |
|
|
print(f"{p}: {u:.1f} / demand {DEMAND_DICTIONARY.get(p,0)}") |
|
|
|
|
|
print("\n--- Schedule (line, shift, day) ---") |
|
|
for row in schedule: |
|
|
shift_name = ShiftType.get_name(row['shift']) |
|
|
line_name = LineType.get_name(row['line_type_id']) |
|
|
print(f"date_span_list{row['day']} {line_name}-{row['line_idx']} {shift_name}: " |
|
|
f"{row['product']} Hours={row['run_hours']:.2f}h Units={row['units']:.1f}") |
|
|
|
|
|
print("\n--- Implied headcount need (per type/shift/day) ---") |
|
|
for row in headcount: |
|
|
shift_name = ShiftType.get_name(row['shift']) |
|
|
print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: " |
|
|
f"need={row['needed']} (avail {row['available']})") |
|
|
|
|
|
print("\n--- Total person-hours by type/day ---") |
|
|
for row in ph_by_day: |
|
|
print(f"{row['emp_type']}, date_span_list{row['day']}: used={row['used_person_hours']:.1f} " |
|
|
f"(cap {row['cap_person_hours']})") |
|
|
|
|
|
print("\n--- Actual employee count by type/shift/day ---") |
|
|
for row in employee_count_by_shift: |
|
|
shift_name = ShiftType.get_name(row['shift']) |
|
|
print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: " |
|
|
f"count={row['employee_count']} employees, " |
|
|
f"total_hours={row['total_person_hours']:.1f}h, " |
|
|
f"avg={row['avg_hours_per_employee']:.1f}h/employee") |
|
|
|
|
|
print("\n--- Daily employee totals by type/day (sum across shifts) ---") |
|
|
for row in employee_count_by_day: |
|
|
print(f"{row['emp_type']}, date_span_list{row['day']}: " |
|
|
f"count={row['employee_count']} employees total, " |
|
|
f"total_hours={row['total_person_hours']:.1f}h, " |
|
|
f"avg={row['avg_hours_per_employee']:.1f}h/employee " |
|
|
f"(available: {row['available']})") |
|
|
|
|
|
|
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
optimizer = Optimizer() |
|
|
optimizer.run_optimization() |