# Halimhailey's picture
# Upload folder using huggingface_hub
# ffa1f50 verified
# ============================================================
# SD_roster_real - Fixed Team Production Planning (Option A)
# - Uses config-style variable names from src/config/optimization_config.py
# - Team per product (simultaneous): UNICEF Fixed term / Humanizer
# - Line types via numeric ids: 6=long, 7=short
# - One product per (line, shift, day)
# - Weekly demand (across DATE_SPAN)
# ============================================================
from ortools.linear_solver import pywraplp
from math import ceil
import datetime
from src.config.constants import ShiftType, LineType, KitLevel
# ---- config import ----
# Import constants and other modules directly
from src.config.constants import ShiftType, LineType, DefaultConfig
import src.preprocess.extract as extract
from src.preprocess.hierarchy_parser import sort_products_by_hierarchy
class Optimizer:
    """Workforce optimization class that handles all configuration and optimization logic.

    Construction reads the planning configuration from ``streamlit.session_state``
    and loads the input data (hierarchy, line matching, demand, team requirements,
    speeds). ``run_optimization`` then builds and solves a CBC mixed-integer model
    that assigns products to (line, shift, day) slots while minimizing labor cost.
    """

    def __init__(self):
        """Initialize optimizer with session state configuration and input data."""
        self.load_session_state_config()
        self.load_data()

    def load_session_state_config(self):
        """Load all configuration from session state onto the instance."""
        import streamlit as st
        import datetime as dt

        state = st.session_state

        # Date configuration
        self.start_date = state.start_date
        self.planning_days = state.planning_days
        self.start_datetime = dt.datetime.combine(self.start_date, dt.datetime.min.time())
        self.end_date = self.start_datetime + dt.timedelta(days=self.planning_days - 1)
        # Days are numbered 1..planning_days throughout the model.
        self.date_span = list(range(1, self.planning_days + 1))

        # Employee and shift configuration
        self.employee_type_list = list(state.selected_employee_types)
        self.active_shift_list = sorted(state.selected_shifts)
        print("\n[DEBUG] From session_state.selected_employee_types:")
        for emp in self.employee_type_list:
            print(f" - '{emp}' (len={len(emp)}, repr={repr(emp)})")

        # Working hours configuration
        self.max_hour_per_person_per_day = state.max_hour_per_person_per_day
        self.max_hours_shift = {
            ShiftType.REGULAR: state.max_hours_shift_1,
            ShiftType.EVENING: state.max_hours_shift_2,
            ShiftType.OVERTIME: state.max_hours_shift_3,
        }

        # Workforce limits
        self.max_employee_per_type_on_day = state.max_employee_per_type_on_day

        # Operations configuration
        self.line_counts = state.line_counts
        self.max_parallel_workers = {
            LineType.LONG_LINE: state.max_parallel_workers_long_line,
            LineType.MINI_LOAD: state.max_parallel_workers_mini_load,
        }

        # Cost configuration
        self.cost_list_per_emp_shift = state.cost_list_per_emp_shift
        # Payment mode configuration
        self.payment_mode_config = state.payment_mode_config
        # Fixed staffing requirements
        self.fixed_min_unicef_per_day = state.fixed_min_unicef_per_day
        print("✅ Session state configuration loaded successfully")

    def load_data(self):
        """Load all required data from files.

        Each data section is loaded independently. When a section fails, the
        error is reported and the corresponding attributes fall back to empty
        containers so the remaining sections can still load.
        """
        failed_sections = []

        # Load hierarchy data
        try:
            kit_levels, dependencies, priority_order = extract.get_production_order_data()
        except Exception as e:
            print(f"[WARN] Failed to load hierarchy data: {e}")
            failed_sections.append("hierarchy")
            kit_levels, dependencies, priority_order = {}, {}, []
        self.kit_levels = kit_levels
        self.kit_dependencies = dependencies
        self.production_priority_order = priority_order

        # Load kit line match data
        try:
            kit_line_match = extract.read_kit_line_match_data()
            raw_line_match = kit_line_match.set_index("kit_name")["line_type"].to_dict()
            # Create line name to ID mapping
            line_name_to_id = {
                "long line": LineType.LONG_LINE,
                "mini load": LineType.MINI_LOAD,
            }
            # Convert line names to IDs; unknown names are kept as-is
            self.kit_line_match_dict = {
                kit_name: line_name_to_id.get(line_name.lower(), line_name)
                for kit_name, line_name in raw_line_match.items()
            }
        except Exception as e:
            print(f"[WARN] Failed to load kit line match data: {e}")
            failed_sections.append("kit line match")
            self.kit_line_match_dict = {}

        # Load product and demand data
        try:
            from src.demand_filtering import DemandFilter
            filter_instance = DemandFilter()
            filter_instance.load_data(force_reload=True)
            self.product_list = filter_instance.get_filtered_product_list()
            self.demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        except Exception as e:
            print(f"[WARN] Failed to load product and demand data: {e}")
            failed_sections.append("products/demand")
            self.product_list = []
            self.demand_dictionary = {}

        # Load team requirements
        try:
            print("\n[DEBUG] Loading team requirements from Kits Calculation...")
            kits_df = extract.read_personnel_requirement_data()
            print(f"[DEBUG] Loaded kits_df with {len(kits_df)} rows")
            print(f"[DEBUG] Columns: {list(kits_df.columns)}")
            # Initialize team requirements dictionary
            team_req = {
                "UNICEF Fixed term": {},
                "Humanizer": {},
            }
            # Process each product in the product list
            for product in self.product_list:
                product_data = kits_df[kits_df['Kit'] == product]
                if not product_data.empty:
                    # Only the first matching row is used
                    humanizer_req = product_data["Humanizer"].iloc[0]
                    unicef_req = product_data["UNICEF staff"].iloc[0]
                    team_req["Humanizer"][product] = int(humanizer_req)
                    team_req["UNICEF Fixed term"][product] = int(unicef_req)
                else:
                    print(f"[WARN] Product {product} not found in Kits Calculation, setting requirements to 0")
                    team_req["Humanizer"][product] = 0
                    team_req["UNICEF Fixed term"][product] = 0
            print(f"\n[DEBUG] team_req_per_product keys after loading:")
            for key, per_product in team_req.items():
                product_count = len(per_product)
                print(f" - '{key}' (len={len(key)}, {product_count} products)")
            self.team_req_per_product = team_req
        except Exception as e:
            print(f"[ERROR] Failed to load team requirements: {e}")
            import traceback
            traceback.print_exc()
            failed_sections.append("team requirements")
            self.team_req_per_product = {}

        # Load product speed data
        try:
            self.per_product_speed = extract.read_package_speed_data()
        except Exception as e:
            print(f"[WARN] Failed to load product speed data: {e}")
            failed_sections.append("product speed")
            self.per_product_speed = {}

        if failed_sections:
            print(f"⚠️ Data loaded with empty fallbacks for: {', '.join(failed_sections)}")
        else:
            print("✅ All data loaded successfully")

    @staticmethod
    def _expand_lines(selected_lines, line_counts):
        """Return one ``(line_type, index)`` tuple per line instance, indices starting at 1.

        Line types missing from ``line_counts`` contribute no instances.
        """
        return [
            (line_type, idx)
            for line_type in selected_lines
            for idx in range(1, line_counts.get(line_type, 0) + 1)
        ]

    def build_lines(self):
        """Build line instances from session state configuration.

        Returns:
            list[tuple]: ``(line_type, index)`` tuples. When the session state
            cannot be read, the ``DefaultConfig`` line counts for the long line
            and the mini load are used instead.
        """
        try:
            import streamlit as st
            # Get selected line types from Data Selection tab
            selected_lines = st.session_state.selected_lines
            # Get line counts from Operations tab
            line_counts = st.session_state.line_counts
            print(f"Using lines from session state - selected: {selected_lines}, counts: {line_counts}")
            # Built in one expression so a failure cannot leave a partial list
            # behind for the fallback to extend.
            return self._expand_lines(selected_lines, line_counts)
        except Exception as e:
            print(f"Could not get line config from session state: {e}")

        # Fallback: Use default values
        print("Falling back to default line configuration")
        default_selected_lines = [LineType.LONG_LINE, LineType.MINI_LOAD]
        default_line_counts = {
            LineType.LONG_LINE: DefaultConfig.LINE_COUNT_LONG_LINE,
            LineType.MINI_LOAD: DefaultConfig.LINE_COUNT_MINI_LOAD,
        }
        return self._expand_lines(default_selected_lines, default_line_counts)

    def run_optimization(self):
        """Run the main optimization algorithm.

        Builds a CBC mixed-integer model over (product, line, shift, day),
        minimizes total labor cost subject to demand, line, staffing, shift
        ordering and hierarchy constraints, prints a report and returns it.

        Returns:
            dict | None: ``None`` when the solver does not report OPTIMAL;
            otherwise a dict with keys ``objective``, ``weekly_production``,
            ``run_schedule``, ``headcount_per_shift``, ``person_hours_by_day``,
            ``employee_count_by_shift`` and ``employee_count_by_day``.

        Raises:
            KeyError: when team requirements or headcount limits are missing
                for a selected employee type / product / day.
            RuntimeError: when the CBC solver cannot be created.
        """
        # NOTE: despite the banner, nothing is reloaded here; the data loaded
        # by load_data() at construction time is used.
        print("\n" + "=" * 60)
        print("🔄 LOADING FRESH DATA FOR OPTIMIZATION")
        print("=" * 60)
        print(f"📦 LOADED PRODUCTS: {len(self.product_list)} products")
        print(f"📈 LOADED DEMAND: {sum(self.demand_dictionary.values())} total units")
        print(f"👥 LOADED TEAM REQUIREMENTS: {len(self.team_req_per_product)} employee types")
        # Debug: Print team requirements keys
        print("\n[DEBUG] team_req_per_product employee types:")
        for emp_type in self.team_req_per_product:
            print(f" - '{emp_type}'")
        print("\n[DEBUG] self.employee_type_list:")
        for emp_type in self.employee_type_list:
            print(f" - '{emp_type}'")

        # ACTIVE[t][p] == 1 means product p may be scheduled on day t.
        # Every product is currently active on every day.
        ACTIVE = {t: {p: 1 for p in self.product_list} for t in self.date_span}

        # --- Sets ---
        date_span_list = list(self.date_span)
        employee_type_list = self.employee_type_list
        active_shift_list = self.active_shift_list
        print(f"\n[DEBUG] employee_type_list: {employee_type_list}")
        print(f"[DEBUG] active_shift_list: {active_shift_list}")

        # *** HIERARCHY SORTING: Sort products by production priority ***
        print("\n" + "=" * 60)
        print("🔗 APPLYING HIERARCHY-BASED PRODUCTION ORDERING")
        print("=" * 60)
        sorted_product_list = sort_products_by_hierarchy(
            list(self.product_list), self.kit_levels, self.kit_dependencies
        )
        line_tuples = self.build_lines()
        print("Lines", line_tuples)
        print("PER_PRODUCT_SPEED", self.per_product_speed)

        # --- Short aliases for parameters ---
        print("\n[DEBUG] Creating variable aliases...")
        Hmax_s = dict(self.max_hours_shift)  # per-shift hours
        Hmax_daily = self.max_hour_per_person_per_day
        max_workers_line = dict(self.max_parallel_workers)  # per line type
        max_employee_type_day = self.max_employee_per_type_on_day  # {emp_type:{t:headcount}}
        cost = self.cost_list_per_emp_shift  # {emp_type:{shift:cost}}
        # Create aliases for data dictionaries
        TEAM_REQ_PER_PRODUCT = self.team_req_per_product
        DEMAND_DICTIONARY = self.demand_dictionary
        KIT_LINE_MATCH_DICT = self.kit_line_match_dict
        KIT_LEVELS = self.kit_levels
        KIT_DEPENDENCIES = self.kit_dependencies
        PER_PRODUCT_SPEED = self.per_product_speed
        FIXED_MIN_UNICEF_PER_DAY = self.fixed_min_unicef_per_day
        PAYMENT_MODE_CONFIG = self.payment_mode_config
        # Locally fixed settings for the evening-shift capacity alert
        EVENING_SHIFT_MODE = "normal"
        EVENING_SHIFT_DEMAND_THRESHOLD = 0.9
        print(f"[DEBUG] TEAM_REQ_PER_PRODUCT has {len(TEAM_REQ_PER_PRODUCT)} employee types")
        print(f"[DEBUG] employee_type_list has {len(employee_type_list)} types")

        def headcount_limit(e, t):
            """Available headcount of employee type ``e`` on day ``t``.

            Single access path for the headcount table so that every bound,
            constraint and report row reads it the same way and a missing
            entry produces one descriptive KeyError.
            """
            try:
                return max_employee_type_day[e][t]
            except KeyError as err:
                raise KeyError(
                    f"No headcount limit configured for employee type '{e}' on day {t}"
                ) from err

        # --- Feasibility quick checks ---
        print("\n[DEBUG] Starting feasibility checks...")
        # 1) If team size is greater than max_workers_line, block the product-line type combination
        for i, p in enumerate(sorted_product_list):
            print(f"[DEBUG] Checking product {i+1}/{len(sorted_product_list)}: {p}")
            # Check if all employee types exist in TEAM_REQ_PER_PRODUCT
            for e in employee_type_list:
                if e not in TEAM_REQ_PER_PRODUCT:
                    print(f"[ERROR] Employee type '{e}' not found in TEAM_REQ_PER_PRODUCT!")
                    print(f"[ERROR] Available keys: {list(TEAM_REQ_PER_PRODUCT.keys())}")
                    raise KeyError(f"Employee type '{e}' not in team requirements data")
                if p not in TEAM_REQ_PER_PRODUCT[e]:
                    print(f"[ERROR] Product '{p}' not found in TEAM_REQ_PER_PRODUCT['{e}']!")
                    raise KeyError(f"Product '{p}' not in team requirements for employee type '{e}'")
            req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list)
            print(f"[DEBUG] req_total: {req_total}")
            # Same default as the compatibility constraint below (long line)
            lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE)
            if p not in KIT_LINE_MATCH_DICT:
                print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)")
            if req_total > max_workers_line.get(lt, 1e9):
                print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] "
                      f"= {max_workers_line.get(lt)}. Blocked.")

        # 2) Check if demand can be met without evening shift (only if in normal mode)
        if EVENING_SHIFT_MODE == "normal":
            total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in sorted_product_list)
            # Calculate maximum capacity with regular + overtime shifts only
            regular_overtime_shifts = [s for s in active_shift_list if s in ShiftType.REGULAR_AND_OVERTIME]
            # Hours available across all lines, regular/overtime shifts and days.
            # It does not depend on the product, so it is computed once.
            max_hours_per_product = sum(
                Hmax_s[s]
                for _ in line_tuples
                for s in regular_overtime_shifts
                for _ in date_span_list
            )
            max_capacity = 0
            for p in sorted_product_list:
                if p in PER_PRODUCT_SPEED:
                    product_speed = PER_PRODUCT_SPEED[p]  # units per hour
                    max_capacity += product_speed * max_hours_per_product
            capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf')
            print(f"[CAPACITY CHECK] Total demand: {total_demand}")
            print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}")
            print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}")
            if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD:
                print(f"\n🚨 [ALERT] DEMAND TOO HIGH!")
                print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand")
                print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%")
                print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift")
                print(f" This will add shift 3 to increase capacity\n")

        # --- Solver ---
        solver = pywraplp.Solver.CreateSolver('CBC')
        if not solver:
            raise RuntimeError("CBC solver not found.")
        INF = solver.infinity()

        # --- Variables ---
        # Assignment[p,ell,s,t] ∈ {0,1}: 1 if product p runs on (line,shift,day)
        # Hours[p,ell,s,t]: run hours, Units[p,ell,s,t]: production units
        Assignment, Hours, Units = {}, {}, {}
        for p in sorted_product_list:
            for ell in line_tuples:  # ell = (line_type_id, idx)
                for s in active_shift_list:
                    for t in date_span_list:
                        suffix = f"{p}_{ell[0]}_{ell[1]}_s{s}_d{t}"
                        Assignment[p, ell, s, t] = solver.BoolVar(f"Z_{suffix}")
                        Hours[p, ell, s, t] = solver.NumVar(0, Hmax_s[s], f"T_{suffix}")
                        Units[p, ell, s, t] = solver.NumVar(0, INF, f"U_{suffix}")

        # EMPLOYEE_COUNT[e,s,t]: number of employees of type e working shift s on day t.
        # No per-shift minimum; the daily UNICEF minimum is added separately below.
        # NOTE(review): in "partial" payment mode this variable does not enter the
        # objective, so the solver may report any value between the required
        # count and the headcount limit.
        EMPLOYEE_COUNT = {}
        # EMPLOYEE_HOURS[e,s,t]: total person-hours of type e in shift s on day t
        # (e.g. 5 employees × 8 hours = 40 person-hours). Built once and reused
        # by the cost terms and by every staffing constraint.
        EMPLOYEE_HOURS = {}
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    EMPLOYEE_COUNT[e, s, t] = solver.IntVar(
                        0, headcount_limit(e, t), f"EmpCount_{e}_s{s}_day{t}"
                    )
                    EMPLOYEE_HOURS[e, s, t] = solver.Sum(
                        TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t]
                        for p in sorted_product_list
                        for ell in line_tuples
                    )

        # --- Objective: Minimize total labor cost (wages) ---
        print(f"\n[DEBUG] Payment mode configuration: {PAYMENT_MODE_CONFIG}")
        cost_terms = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial")  # Default to partial if not specified
                    if payment_mode == "partial":
                        # Partial payment: pay for actual person-hours worked.
                        # Example: $20/hr × 40 person-hours = $800
                        cost_terms.append(cost[e][s] * EMPLOYEE_HOURS[e, s, t])
                    elif payment_mode == "bulk":
                        # Bulk payment: every employee working in the shift is
                        # paid for the FULL shift.
                        # work_in_shift = 1 iff any person-hours are worked.
                        work_in_shift = solver.BoolVar(f"work_{e}_s{s}_d{t}")
                        max_possible_hours = Hmax_s[s] * headcount_limit(e, t)
                        solver.Add(EMPLOYEE_HOURS[e, s, t] <= max_possible_hours * work_in_shift)
                        solver.Add(work_in_shift * 0.001 <= EMPLOYEE_HOURS[e, s, t])
                        # Integer head count covering the person-hours; being an
                        # integer, it is at least ceil(person-hours / shift length).
                        employees_in_shift = solver.IntVar(0, headcount_limit(e, t), f"emp_{e}_s{s}_d{t}")
                        solver.Add(employees_in_shift * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t])
                        # Cost: pay each employee for full shift
                        cost_terms.append(cost[e][s] * Hmax_s[s] * employees_in_shift)
                    # NOTE(review): any other payment mode contributes no cost term.
        total_cost = solver.Sum(cost_terms)
        solver.Minimize(total_cost)

        # --- Constraints ---
        # 1) Weekly demand - must meet exactly (no over/under production)
        for p in sorted_product_list:
            total_production = solver.Sum(
                Units[p, ell, s, t]
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )
            demand = DEMAND_DICTIONARY.get(p, 0)
            solver.Add(total_production == demand)

        # 2) One product per (line,shift,day) + time gating
        for ell in line_tuples:
            for s in active_shift_list:
                for t in date_span_list:
                    solver.Add(solver.Sum(Assignment[p, ell, s, t] for p in sorted_product_list) <= 1)
                    for p in sorted_product_list:
                        # Hours can only be positive when the product is assigned
                        solver.Add(Hours[p, ell, s, t] <= Hmax_s[s] * Assignment[p, ell, s, t])

        # 3) Product-line type compatibility + (optional) activity by day
        for p in sorted_product_list:
            req_lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE)  # Default to long line if not found
            req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list)
            for ell in line_tuples:
                allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9))
                for s in active_shift_list:
                    for t in date_span_list:
                        if ACTIVE[t][p] == 0 or not allowed:
                            solver.Add(Assignment[p, ell, s, t] == 0)
                            solver.Add(Hours[p, ell, s, t] == 0)
                            solver.Add(Units[p, ell, s, t] == 0)

        # 4) Line throughput: Units == product_speed * Hours
        #    (upper bound = capacity, lower bound = no phantom work)
        default_speed = 800 / 7.5  # units per hour
        for p in sorted_product_list:
            # Same speed regardless of line type
            if p in PER_PRODUCT_SPEED:
                speed = PER_PRODUCT_SPEED[p]
            else:
                speed = default_speed
                # Warn once per product instead of once per (line, shift, day)
                print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour")
            for ell in line_tuples:
                for s in active_shift_list:
                    for t in date_span_list:
                        solver.Add(Units[p, ell, s, t] == speed * Hours[p, ell, s, t])

        # 5) Working hours: person-hours cannot exceed shift hour capacity
        # 6) Per-shift staffing: employee_count × max_hours_per_shift >= person-hours
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    solver.Add(EMPLOYEE_HOURS[e, s, t] <= Hmax_s[s] * headcount_limit(e, t))
                    solver.Add(EMPLOYEE_COUNT[e, s, t] * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t])

        # 7) Shift ordering constraints (only apply if shifts are available)
        # Evening shift person-hours may not exceed regular shift person-hours
        if ShiftType.EVENING in active_shift_list and ShiftType.REGULAR in active_shift_list:
            for e in employee_type_list:
                for t in date_span_list:
                    solver.Add(
                        EMPLOYEE_HOURS[e, ShiftType.EVENING, t]
                        <= EMPLOYEE_HOURS[e, ShiftType.REGULAR, t]
                    )

        # Overtime should only be used when regular shift is at capacity
        if ShiftType.OVERTIME in active_shift_list and ShiftType.REGULAR in active_shift_list:
            print("\n[OVERTIME] Adding constraints to ensure overtime only when regular shift is insufficient...")
            for e in employee_type_list:
                for t in date_span_list:
                    regular_capacity = headcount_limit(e, t)
                    regular_usage = EMPLOYEE_HOURS[e, ShiftType.REGULAR, t]
                    overtime_usage = EMPLOYEE_HOURS[e, ShiftType.OVERTIME, t]
                    # Binary: 1 if using overtime, 0 otherwise
                    using_overtime = solver.IntVar(0, 1, f'using_overtime_{e}_{t}')
                    # NOTE(review): this threshold is 90% of the *headcount* while
                    # regular_usage is in person-hours; left unchanged because
                    # scaling it by the shift length would tighten the model.
                    min_regular_for_overtime = int(0.9 * regular_capacity)
                    # Constraint 1: Can only use overtime if regular usage is high
                    solver.Add(regular_usage >= min_regular_for_overtime * using_overtime)
                    # Constraint 2: If any overtime is used, set the binary variable.
                    # The big-M must bound overtime person-hours, i.e. shift
                    # length × headcount (headcount alone would cap overtime).
                    overtime_capacity_hours = Hmax_s[ShiftType.OVERTIME] * regular_capacity
                    solver.Add(overtime_usage <= overtime_capacity_hours * using_overtime)
            overtime_constraints_added = len(employee_type_list) * len(date_span_list) * 2  # 2 constraints per employee type per day
            print(f"[OVERTIME] Added {overtime_constraints_added} constraints ensuring overtime only when regular shifts are at 90%+ capacity")

        # 7.6) *** FIXED MINIMUM UNICEF EMPLOYEES CONSTRAINT ***
        # The minimum applies to the regular shift specifically (not overtime or evening)
        if 'UNICEF Fixed term' in employee_type_list and FIXED_MIN_UNICEF_PER_DAY > 0:
            if ShiftType.REGULAR in active_shift_list:
                print(f"\n[FIXED STAFFING] Adding constraint for minimum {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in REGULAR shift per day...")
                for t in date_span_list:
                    solver.Add(
                        EMPLOYEE_COUNT['UNICEF Fixed term', ShiftType.REGULAR, t] >= FIXED_MIN_UNICEF_PER_DAY
                    )
                print(f"[FIXED STAFFING] Added {len(date_span_list)} constraints ensuring >= {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in regular shift per day")
            else:
                print(f"\n[FIXED STAFFING] Warning: Regular shift not available, cannot enforce minimum UNICEF staffing")

        # 8) *** HIERARCHY DEPENDENCY CONSTRAINTS ***
        # Dependencies should be produced before or at the same time as the
        # product, compared through the day-weighted sum of run hours.
        print("\n[HIERARCHY] Adding dependency constraints...")
        dependency_constraints_added = 0
        products_in_plan = set(sorted_product_list)

        def day_weighted_hours(product):
            """Sum of ``day × run hours`` over all lines, shifts and days."""
            return solver.Sum(
                t * Hours[product, ell, s, t]
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )

        for p in sorted_product_list:
            dependencies = KIT_DEPENDENCIES.get(p, [])
            if not dependencies:
                continue
            p_level = KIT_LEVELS.get(p, 2)
            for dep in dependencies:
                if dep not in products_in_plan:  # Only if dependency is also in production list
                    continue
                solver.Add(day_weighted_hours(dep) <= day_weighted_hours(p))
                dependency_constraints_added += 1
                print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})")
        print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints")

        # --- Solve ---
        status = solver.Solve()
        if status != pywraplp.Solver.OPTIMAL:
            status_names = {
                pywraplp.Solver.FEASIBLE: "FEASIBLE",
                pywraplp.Solver.INFEASIBLE: "INFEASIBLE",
                pywraplp.Solver.UNBOUNDED: "UNBOUNDED",
                pywraplp.Solver.ABNORMAL: "ABNORMAL",
                pywraplp.Solver.NOT_SOLVED: "NOT_SOLVED",
            }
            print(f"No optimal solution. Status: {status} ({status_names.get(status, 'UNKNOWN')})")
            # Debug hint:
            # solver.EnableOutput()
            # solver.ExportModelAsLpFile("model.lp")
            return None

        # --- Report ---
        result = {}
        result['objective'] = solver.Objective().Value()

        # Weekly production
        prod_week = {
            p: sum(
                Units[p, ell, s, t].solution_value()
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )
            for p in sorted_product_list
        }
        result['weekly_production'] = prod_week

        # Which product ran on which line/shift/day
        schedule = []
        for t in date_span_list:
            for ell in line_tuples:
                for s in active_shift_list:
                    chosen = [p for p in sorted_product_list if Assignment[p, ell, s, t].solution_value() > 0.5]
                    if chosen:
                        p = chosen[0]
                        schedule.append({
                            'day': t,
                            'line_type_id': ell[0],
                            'line_idx': ell[1],
                            'shift': s,
                            'product': p,
                            'run_hours': Hours[p, ell, s, t].solution_value(),
                            'units': Units[p, ell, s, t].solution_value(),
                        })
        result['run_schedule'] = schedule

        # Solved person-hours per (type, shift, day), shared by the report sections
        used_person_hours = {
            (e, s, t): sum(
                TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value()
                for p in sorted_product_list
                for ell in line_tuples
            )
            for e in employee_type_list
            for s in active_shift_list
            for t in date_span_list
        }

        def solved_count(e, s, t):
            """Solved EMPLOYEE_COUNT as an int; rounded because the solver
            may return values such as 2.9999999 for integer variables."""
            return int(round(EMPLOYEE_COUNT[e, s, t].solution_value()))

        # Implied headcount by type/shift/day (ceil)
        headcount = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    need = ceil(used_person_hours[e, s, t] / (Hmax_s[s] + 1e-9))
                    headcount.append({'emp_type': e, 'shift': s, 'day': t,
                                      'needed': need, 'available': headcount_limit(e, t)})
        result['headcount_per_shift'] = headcount

        # Total person-hours by type/day against the daily cap
        ph_by_day = []
        for e in employee_type_list:
            for t in date_span_list:
                used = sum(used_person_hours[e, s, t] for s in active_shift_list)
                ph_by_day.append({'emp_type': e, 'day': t,
                                  'used_person_hours': used,
                                  'cap_person_hours': Hmax_daily * headcount_limit(e, t)})
        result['person_hours_by_day'] = ph_by_day

        # Actual employee count per type/shift/day (from EMPLOYEE_COUNT variable)
        employee_count_by_shift = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    count = solved_count(e, s, t)
                    if count <= 0:  # Only add entries where employees are working
                        continue
                    used_hours = used_person_hours[e, s, t]
                    employee_count_by_shift.append({
                        'emp_type': e,
                        'shift': s,
                        'day': t,
                        'employee_count': count,
                        'total_person_hours': used_hours,
                        'avg_hours_per_employee': used_hours / count,
                        'available': headcount_limit(e, t),
                    })
        result['employee_count_by_shift'] = employee_count_by_shift

        # Also calculate daily totals (summing across shifts)
        employee_count_by_day = []
        for e in employee_type_list:
            for t in date_span_list:
                total_count = sum(solved_count(e, s, t) for s in active_shift_list)
                if total_count <= 0:  # Only add days where employees are working
                    continue
                used_hours = sum(used_person_hours[e, s, t] for s in active_shift_list)
                employee_count_by_day.append({
                    'emp_type': e,
                    'day': t,
                    'employee_count': total_count,
                    'total_person_hours': used_hours,
                    'avg_hours_per_employee': used_hours / total_count,
                    'available': headcount_limit(e, t),
                })
        result['employee_count_by_day'] = employee_count_by_day

        # Pretty print
        print("Objective (min cost):", result['objective'])
        print("\n--- Weekly production by product ---")
        for p, u in prod_week.items():
            print(f"{p}: {u:.1f} / demand {DEMAND_DICTIONARY.get(p,0)}")
        print("\n--- Schedule (line, shift, day) ---")
        for row in schedule:
            shift_name = ShiftType.get_name(row['shift'])
            line_name = LineType.get_name(row['line_type_id'])
            print(f"date_span_list{row['day']} {line_name}-{row['line_idx']} {shift_name}: "
                  f"{row['product']} Hours={row['run_hours']:.2f}h Units={row['units']:.1f}")
        print("\n--- Implied headcount need (per type/shift/day) ---")
        for row in headcount:
            shift_name = ShiftType.get_name(row['shift'])
            print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: "
                  f"need={row['needed']} (avail {row['available']})")
        print("\n--- Total person-hours by type/day ---")
        for row in ph_by_day:
            print(f"{row['emp_type']}, date_span_list{row['day']}: used={row['used_person_hours']:.1f} "
                  f"(cap {row['cap_person_hours']})")
        print("\n--- Actual employee count by type/shift/day ---")
        for row in employee_count_by_shift:
            shift_name = ShiftType.get_name(row['shift'])
            print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: "
                  f"count={row['employee_count']} employees, "
                  f"total_hours={row['total_person_hours']:.1f}h, "
                  f"avg={row['avg_hours_per_employee']:.1f}h/employee")
        print("\n--- Daily employee totals by type/day (sum across shifts) ---")
        for row in employee_count_by_day:
            print(f"{row['emp_type']}, date_span_list{row['day']}: "
                  f"count={row['employee_count']} employees total, "
                  f"total_hours={row['total_person_hours']:.1f}h, "
                  f"avg={row['avg_hours_per_employee']:.1f}h/employee "
                  f"(available: {row['available']})")
        return result
if __name__ == "__main__":
    # Script entry point: constructing the optimizer loads the session-state
    # configuration and the input data, then the model is built and solved once.
    optimizer = Optimizer()
    optimizer.run_optimization()