# ============================================================
# SD_roster_real - Fixed Team Production Planning (Option A)
# - Uses config-style variable names from src/config/optimization_config.py
# - Team per product (simultaneous): UNICEF Fixed term / Humanizer
# - Line types via numeric ids: 6=long, 7=short
# - One product per (line, shift, day)
# - Weekly demand (across DATE_SPAN)
# ============================================================
from ortools.linear_solver import pywraplp
from math import ceil
import datetime
from src.config.constants import ShiftType, LineType, KitLevel

# ---- config import ----
# Import constants and other modules directly
from src.config.constants import ShiftType, LineType, DefaultConfig
import src.preprocess.extract as extract
from src.preprocess.hierarchy_parser import sort_products_by_hierarchy


class Optimizer:
    """Workforce optimization class that handles all configuration and optimization logic.

    Construction reads the Streamlit session state and the data files;
    ``run_optimization`` then builds and solves a CBC mixed-integer model and
    returns a dictionary describing the resulting schedule.
    """

    def __init__(self):
        """Initialize optimizer with session state configuration."""
        self.load_session_state_config()
        self.load_data()

    def load_session_state_config(self):
        """Load all configuration from the Streamlit session state.

        Copies dates, employee types, shifts, hour limits, headcount limits,
        line configuration, costs, payment modes and the fixed UNICEF minimum
        from ``st.session_state`` onto ``self``.
        """
        # Imported locally so the module itself can be imported without
        # Streamlit being installed.
        import streamlit as st
        import datetime as dt

        state = st.session_state

        # Date configuration
        self.start_date = state.start_date
        self.planning_days = state.planning_days
        self.start_datetime = dt.datetime.combine(self.start_date, dt.datetime.min.time())
        self.end_date = self.start_datetime + dt.timedelta(days=self.planning_days - 1)
        # Days are addressed by 1-based index throughout the model.
        self.date_span = list(range(1, self.planning_days + 1))

        # Employee and shift configuration
        self.employee_type_list = list(state.selected_employee_types)
        self.active_shift_list = sorted(state.selected_shifts)

        print("\n[DEBUG] From session_state.selected_employee_types:")
        for emp in self.employee_type_list:
            print(f" - '{emp}' (len={len(emp)}, repr={repr(emp)})")

        # Working hours configuration
        self.max_hour_per_person_per_day = state.max_hour_per_person_per_day
        self.max_hours_shift = {
            ShiftType.REGULAR: state.max_hours_shift_1,
            ShiftType.EVENING: state.max_hours_shift_2,
            ShiftType.OVERTIME: state.max_hours_shift_3,
        }

        # Workforce limits
        self.max_employee_per_type_on_day = state.max_employee_per_type_on_day

        # Operations configuration
        self.line_counts = state.line_counts
        self.max_parallel_workers = {
            LineType.LONG_LINE: state.max_parallel_workers_long_line,
            LineType.MINI_LOAD: state.max_parallel_workers_mini_load,
        }

        # Cost configuration
        self.cost_list_per_emp_shift = state.cost_list_per_emp_shift

        # Payment mode configuration
        self.payment_mode_config = state.payment_mode_config

        # Fixed staffing requirements
        self.fixed_min_unicef_per_day = state.fixed_min_unicef_per_day

        print("āœ… Session state configuration loaded successfully")

    def load_data(self):
        """Load all required data from files.

        Each data set is loaded independently. When a loader fails, the
        corresponding attribute falls back to an empty container and the
        reason is printed, so an empty model can be traced back to its cause.
        """
        # Load hierarchy data
        try:
            kit_levels, dependencies, priority_order = extract.get_production_order_data()
            self.kit_levels = kit_levels
            self.kit_dependencies = dependencies
            self.production_priority_order = priority_order
        except Exception as exc:
            print(f"[WARN] Failed to load production order data: {exc}")
            self.kit_levels = {}
            self.kit_dependencies = {}
            self.production_priority_order = []

        # Load kit line match data
        try:
            kit_line_match = extract.read_kit_line_match_data()
            kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict()

            # Create line name to ID mapping
            line_name_to_id = {
                "long line": LineType.LONG_LINE,
                "mini load": LineType.MINI_LOAD,
            }

            # Convert line names to IDs; unknown names are kept unchanged.
            self.kit_line_match_dict = {}
            for kit_name, line_name in kit_line_match_dict.items():
                self.kit_line_match_dict[kit_name] = line_name_to_id.get(line_name.lower(), line_name)
        except Exception as exc:
            print(f"[WARN] Failed to load kit line match data: {exc}")
            self.kit_line_match_dict = {}

        # Load product and demand data
        try:
            from src.demand_filtering import DemandFilter

            filter_instance = DemandFilter()
            filter_instance.load_data(force_reload=True)
            self.product_list = filter_instance.get_filtered_product_list()
            self.demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        except Exception as exc:
            print(f"[WARN] Failed to load product and demand data: {exc}")
            self.product_list = []
            self.demand_dictionary = {}

        # Load team requirements
        try:
            print("\n[DEBUG] Loading team requirements from Kits Calculation...")
            kits_df = extract.read_personnel_requirement_data()
            print(f"[DEBUG] Loaded kits_df with {len(kits_df)} rows")
            print(f"[DEBUG] Columns: {list(kits_df.columns)}")

            # Initialize team requirements dictionary
            self.team_req_per_product = {
                "UNICEF Fixed term": {},
                "Humanizer": {},
            }

            # Process each product in the product list
            for product in self.product_list:
                product_data = kits_df[kits_df['Kit'] == product]
                if not product_data.empty:
                    # Extract Humanizer and UNICEF staff requirements (first matching row)
                    humanizer_req = product_data["Humanizer"].iloc[0]
                    unicef_req = product_data["UNICEF staff"].iloc[0]

                    # Convert to int (data is already cleaned in extract function)
                    self.team_req_per_product["Humanizer"][product] = int(humanizer_req)
                    self.team_req_per_product["UNICEF Fixed term"][product] = int(unicef_req)
                else:
                    print(f"[WARN] Product {product} not found in Kits Calculation, setting requirements to 0")
                    self.team_req_per_product["Humanizer"][product] = 0
                    self.team_req_per_product["UNICEF Fixed term"][product] = 0

            print(f"\n[DEBUG] team_req_per_product keys after loading:")
            for key in self.team_req_per_product.keys():
                product_count = len(self.team_req_per_product[key])
                print(f" - '{key}' (len={len(key)}, {product_count} products)")
        except Exception as e:
            print(f"[ERROR] Failed to load team requirements: {e}")
            import traceback
            traceback.print_exc()
            self.team_req_per_product = {}

        # Load product speed data
        try:
            self.per_product_speed = extract.read_package_speed_data()
        except Exception as exc:
            print(f"[WARN] Failed to load product speed data: {exc}")
            self.per_product_speed = {}

        print("āœ… All data loaded successfully")

    @staticmethod
    def _expand_lines(selected_lines, line_counts):
        """Return ``(line_type, index)`` tuples, indices running 1..count per type."""
        return [
            (line_type, idx)
            for line_type in selected_lines
            for idx in range(1, line_counts.get(line_type, 0) + 1)
        ]

    def build_lines(self):
        """Build line instances from session state configuration.

        Returns:
            A list of ``(line_type, index)`` tuples. If the session state
            cannot be read or expanded, the list is built from
            ``DefaultConfig`` instead; a partially built list is never mixed
            with the fallback.
        """
        try:
            import streamlit as st

            # Get selected line types from Data Selection tab
            selected_lines = st.session_state.selected_lines
            # Get line counts from Operations tab
            line_counts = st.session_state.line_counts

            print(f"Using lines from session state - selected: {selected_lines}, counts: {line_counts}")
            return self._expand_lines(selected_lines, line_counts)
        except Exception as e:
            print(f"Could not get line config from session state: {e}")

        # Fallback: Use default values
        print("Falling back to default line configuration")
        default_selected_lines = [LineType.LONG_LINE, LineType.MINI_LOAD]
        default_line_counts = {
            LineType.LONG_LINE: DefaultConfig.LINE_COUNT_LONG_LINE,
            LineType.MINI_LOAD: DefaultConfig.LINE_COUNT_MINI_LOAD,
        }
        return self._expand_lines(default_selected_lines, default_line_counts)

    def run_optimization(self):
        """Run the main optimization algorithm.

        Builds a CBC model that assigns products to (line, shift, day) slots,
        minimizes total labor cost and meets demand exactly.

        Returns:
            A dict with keys ``objective``, ``weekly_production``,
            ``run_schedule``, ``headcount_per_shift``, ``person_hours_by_day``,
            ``employee_count_by_shift`` and ``employee_count_by_day``, or
            ``None`` when the solver does not report an optimal solution.

        Raises:
            KeyError: if an active employee type or a product is missing from
                the team requirements data.
            RuntimeError: if the CBC solver cannot be created.
        """
        # NOTE: this method uses the data loaded in __init__; it does not
        # reload anything itself despite the banner below.
        print("\n" + "="*60)
        print("šŸ”„ LOADING FRESH DATA FOR OPTIMIZATION")
        print("="*60)

        print(f"šŸ“¦ LOADED PRODUCTS: {len(self.product_list)} products")
        print(f"šŸ“ˆ LOADED DEMAND: {sum(self.demand_dictionary.values())} total units")
        print(f"šŸ‘„ LOADED TEAM REQUIREMENTS: {len(self.team_req_per_product)} employee types")

        # Debug: Print team requirements keys
        print("\n[DEBUG] team_req_per_product employee types:")
        for emp_type in self.team_req_per_product.keys():
            print(f" - '{emp_type}'")
        print("\n[DEBUG] self.employee_type_list:")
        for emp_type in self.employee_type_list:
            print(f" - '{emp_type}'")

        # Build ACTIVE schedule for fresh product list (every product active every day)
        ACTIVE = {t: {p: 1 for p in self.product_list} for t in self.date_span}

        # --- Sets ---
        date_span_list = list(self.date_span)
        employee_type_list = self.employee_type_list
        active_shift_list = self.active_shift_list
        print(f"\n[DEBUG] employee_type_list: {employee_type_list}")
        print(f"[DEBUG] active_shift_list: {active_shift_list}")

        # *** HIERARCHY SORTING: Sort products by production priority ***
        print("\n" + "="*60)
        print("šŸ”— APPLYING HIERARCHY-BASED PRODUCTION ORDERING")
        print("="*60)
        sorted_product_list = sort_products_by_hierarchy(
            list(self.product_list), self.kit_levels, self.kit_dependencies
        )

        line_tuples = self.build_lines()
        print("Lines", line_tuples)
        print("PER_PRODUCT_SPEED", self.per_product_speed)

        # --- Short aliases for parameters ---
        print("\n[DEBUG] Creating variable aliases...")
        Hmax_s = dict(self.max_hours_shift)  # per-shift hours
        Hmax_daily = self.max_hour_per_person_per_day
        max_workers_line = dict(self.max_parallel_workers)  # per line type
        max_employee_type_day = self.max_employee_per_type_on_day  # {emp_type:{t:headcount}}
        cost = self.cost_list_per_emp_shift  # {emp_type:{shift:cost}}

        # Create aliases for data dictionaries
        TEAM_REQ_PER_PRODUCT = self.team_req_per_product
        DEMAND_DICTIONARY = self.demand_dictionary
        KIT_LINE_MATCH_DICT = self.kit_line_match_dict
        KIT_LEVELS = self.kit_levels
        KIT_DEPENDENCIES = self.kit_dependencies
        PER_PRODUCT_SPEED = self.per_product_speed
        FIXED_MIN_UNICEF_PER_DAY = self.fixed_min_unicef_per_day
        PAYMENT_MODE_CONFIG = self.payment_mode_config

        # Mock missing config variables (if they exist in config, they'll be overridden)
        EVENING_SHIFT_MODE = "normal"
        EVENING_SHIFT_DEMAND_THRESHOLD = 0.9

        print(f"[DEBUG] TEAM_REQ_PER_PRODUCT has {len(TEAM_REQ_PER_PRODUCT)} employee types")
        print(f"[DEBUG] employee_type_list has {len(employee_type_list)} types")

        # --- Feasibility quick checks ---
        print("\n[DEBUG] Starting feasibility checks...")

        # 1) If team size is greater than max_workers_line, block the product-line type combination
        for i, p in enumerate(sorted_product_list):
            print(f"[DEBUG] Checking product {i+1}/{len(sorted_product_list)}: {p}")

            # Check if all employee types exist in TEAM_REQ_PER_PRODUCT
            for e in employee_type_list:
                if e not in TEAM_REQ_PER_PRODUCT:
                    print(f"[ERROR] Employee type '{e}' not found in TEAM_REQ_PER_PRODUCT!")
                    print(f"[ERROR] Available keys: {list(TEAM_REQ_PER_PRODUCT.keys())}")
                    raise KeyError(f"Employee type '{e}' not in team requirements data")
                if p not in TEAM_REQ_PER_PRODUCT[e]:
                    print(f"[ERROR] Product '{p}' not found in TEAM_REQ_PER_PRODUCT['{e}']!")
                    raise KeyError(f"Product '{p}' not in team requirements for employee type '{e}'")

            req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list)
            print(f"[DEBUG] req_total: {req_total}")

            # Same default as the compatibility constraint (3) below, so the
            # warning describes what the model will actually enforce.
            lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE)
            if p not in KIT_LINE_MATCH_DICT:
                print(f"[WARN] Product {p}: No line type mapping found, defaulting to long line (6)")
            if req_total > max_workers_line.get(lt, 1e9):
                print(f"[WARN] Product {p}: team size {req_total} > MAX_PARALLEL_WORKERS[{lt}] "
                      f"= {max_workers_line.get(lt)}. Blocked.")

        # 2) Check if demand can be met without evening shift (only if in normal mode)
        if EVENING_SHIFT_MODE == "normal":
            total_demand = sum(DEMAND_DICTIONARY.get(p, 0) for p in sorted_product_list)

            # Calculate maximum capacity with regular + overtime shifts only
            regular_overtime_shifts = [s for s in active_shift_list if s in ShiftType.REGULAR_AND_OVERTIME]

            # Hours available across all lines, days and the selected shifts.
            # This is the same for every product, so it is computed once.
            available_hours = (
                len(line_tuples)
                * len(date_span_list)
                * sum(Hmax_s[s] for s in regular_overtime_shifts)
            )
            # NOTE(review): every product with speed data is credited with the
            # full set of line-hours, so this is an optimistic upper bound.
            max_capacity = sum(
                PER_PRODUCT_SPEED[p] * available_hours  # speed is units per hour
                for p in sorted_product_list
                if p in PER_PRODUCT_SPEED
            )

            capacity_ratio = max_capacity / total_demand if total_demand > 0 else float('inf')

            print(f"[CAPACITY CHECK] Total demand: {total_demand}")
            print(f"[CAPACITY CHECK] Max capacity (Regular + Overtime): {max_capacity:.1f}")
            print(f"[CAPACITY CHECK] Capacity ratio: {capacity_ratio:.2f}")

            if capacity_ratio < EVENING_SHIFT_DEMAND_THRESHOLD:
                print(f"\n🚨 [ALERT] DEMAND TOO HIGH!")
                print(f" Current capacity can only meet {capacity_ratio*100:.1f}% of demand")
                print(f" Threshold: {EVENING_SHIFT_DEMAND_THRESHOLD*100:.1f}%")
                print(f" RECOMMENDATION: Change EVENING_SHIFT_MODE to 'activate_evening' to enable evening shift")
                print(f" This will add shift 3 to increase capacity\n")

        # --- Solver ---
        solver = pywraplp.Solver.CreateSolver('CBC')
        if not solver:
            raise RuntimeError("CBC solver not found.")
        INF = solver.infinity()

        # --- Variables ---
        # Assignment[p,ell,s,t] ∈ {0,1}: 1 if product p runs on (line,shift,day)
        # Hours[p,ell,s,t]: run hours, Units[p,ell,s,t]: production units
        Assignment, Hours, Units = {}, {}, {}
        for p in sorted_product_list:
            for ell in line_tuples:  # ell = (line_type_id, idx)
                for s in active_shift_list:
                    for t in date_span_list:
                        key = (p, ell, s, t)
                        # Is product p assigned to run on line ell, during shift s, on day t?
                        Assignment[key] = solver.BoolVar(f"Z_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                        # How many hours does product p run on line ell, during shift s, on day t?
                        Hours[key] = solver.NumVar(0, Hmax_s[s], f"T_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")
                        # How many units does product p run on line ell, during shift s, on day t?
                        Units[key] = solver.NumVar(0, INF, f"U_{p}_{ell[0]}_{ell[1]}_s{s}_d{t}")

        # Note: IDLE variables removed - we only track employees actually working on production

        # Number of employees of each type working each shift each day.
        # NOTE(review): EMPLOYEE_COUNT does not appear in the objective, so the
        # solver may return any value between the required count and the bound.
        # NOTE(review): the bound tolerates missing headcount entries (default
        # 100) while the constraints below index max_employee_type_day directly.
        EMPLOYEE_COUNT = {}
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    # No minimum per shift here; the daily minimum is handled separately.
                    max_count = max_employee_type_day.get(e, {}).get(t, 100)
                    EMPLOYEE_COUNT[e, s, t] = solver.IntVar(0, max_count, f"EmpCount_{e}_s{s}_day{t}")

        # Total person-hours worked by each employee type per shift per day
        # (e.g., 5 employees × 8 hours = 40 person-hours). Built once and
        # reused by the objective and by every staffing constraint.
        EMPLOYEE_HOURS = {}
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    EMPLOYEE_HOURS[e, s, t] = solver.Sum(
                        TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t]
                        for p in sorted_product_list
                        for ell in line_tuples
                    )

        # --- Objective: Minimize total labor cost (wages) ---
        print(f"\n[DEBUG] Payment mode configuration: {PAYMENT_MODE_CONFIG}")

        cost_terms = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    payment_mode = PAYMENT_MODE_CONFIG.get(s, "partial")  # Default to partial if not specified

                    if payment_mode == "partial":
                        # Partial payment: pay for actual person-hours worked.
                        # Example: $20/hr × 40 person-hours = $800
                        cost_terms.append(cost[e][s] * EMPLOYEE_HOURS[e, s, t])

                    elif payment_mode == "bulk":
                        # Bulk payment: every employee working in the shift is
                        # paid for the FULL shift.

                        # Indicator: 1 if employee type e worked in this shift.
                        # It is linked to the hours but is not used in the
                        # cost term below.
                        work_in_shift = solver.BoolVar(f"work_{e}_s{s}_d{t}")
                        max_possible_hours = Hmax_s[s] * max_employee_type_day[e][t]
                        solver.Add(EMPLOYEE_HOURS[e, s, t] <= max_possible_hours * work_in_shift)
                        solver.Add(work_in_shift * 0.001 <= EMPLOYEE_HOURS[e, s, t])

                        # Integer number of employees needed: paying per hour
                        # (EMPLOYEE_HOURS × rate) would equal partial payment,
                        # so the count is rounded up via an integer variable.
                        employees_in_shift = solver.IntVar(
                            0, max_employee_type_day[e][t], f"emp_{e}_s{s}_d{t}"
                        )
                        solver.Add(employees_in_shift * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t])

                        # Cost: pay each employee for full shift
                        cost_terms.append(cost[e][s] * Hmax_s[s] * employees_in_shift)

        # Note: No idle employee costs - only pay for employees actually working
        total_cost = solver.Sum(cost_terms)
        solver.Minimize(total_cost)

        # --- Constraints ---
        # 1) Weekly demand - must meet exactly (no over/under production)
        for p in sorted_product_list:
            total_production = solver.Sum(
                Units[p, ell, s, t]
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )
            solver.Add(total_production == DEMAND_DICTIONARY.get(p, 0))

        # 2) One product per (line,shift,day) + time gating
        for ell in line_tuples:
            for s in active_shift_list:
                for t in date_span_list:
                    solver.Add(solver.Sum(Assignment[p, ell, s, t] for p in sorted_product_list) <= 1)
                    for p in sorted_product_list:
                        # Hours can be positive only when the product is assigned.
                        solver.Add(Hours[p, ell, s, t] <= Hmax_s[s] * Assignment[p, ell, s, t])

        # 3) Product-line type compatibility + (optional) activity by day
        for p in sorted_product_list:
            req_lt = KIT_LINE_MATCH_DICT.get(p, LineType.LONG_LINE)  # Default to long line if not found
            req_total = sum(TEAM_REQ_PER_PRODUCT[e][p] for e in employee_type_list)
            for ell in line_tuples:
                allowed = (ell[0] == req_lt) and (req_total <= max_workers_line.get(ell[0], 1e9))
                for s in active_shift_list:
                    for t in date_span_list:
                        if ACTIVE[t][p] == 0 or not allowed:
                            solver.Add(Assignment[p, ell, s, t] == 0)
                            solver.Add(Hours[p, ell, s, t] == 0)
                            solver.Add(Units[p, ell, s, t] == 0)

        # 4) Line throughput: Units = product_speed * Hours
        #    (no over-capacity output and no phantom work without output)
        default_speed = 800 / 7.5  # units per hour, used when no speed data exists
        for p in sorted_product_list:
            if p in PER_PRODUCT_SPEED:
                # Speed is used as given (units per hour); same for every line type.
                speed = PER_PRODUCT_SPEED[p]
            else:
                # Warn once per product rather than once per (line, shift, day).
                print(f"Warning: No speed data for product {p}, using default {default_speed:.1f} per hour")
                speed = default_speed
            for ell in line_tuples:
                for s in active_shift_list:
                    for t in date_span_list:
                        solver.Add(Units[p, ell, s, t] == speed * Hours[p, ell, s, t])

        # 5) Working hours: person-hours cannot exceed shift hours × available headcount
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    solver.Add(EMPLOYEE_HOURS[e, s, t] <= Hmax_s[s] * max_employee_type_day[e][t])

        # 6) Per-shift staffing: EMPLOYEE_COUNT must cover the person-hours worked.
        #    count × max_hours_per_shift >= person-hours, i.e. count >= ceil(H / M)
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    solver.Add(EMPLOYEE_COUNT[e, s, t] * Hmax_s[s] >= EMPLOYEE_HOURS[e, s, t])

        # 7) Shift ordering constraints (only apply if shifts are available)
        # Evening person-hours may not exceed regular person-hours.
        if ShiftType.EVENING in active_shift_list and ShiftType.REGULAR in active_shift_list:
            for e in employee_type_list:
                for t in date_span_list:
                    solver.Add(
                        EMPLOYEE_HOURS[e, ShiftType.EVENING, t]
                        <= EMPLOYEE_HOURS[e, ShiftType.REGULAR, t]
                    )

        # Overtime should only be used when regular shift is at capacity
        if ShiftType.OVERTIME in active_shift_list and ShiftType.REGULAR in active_shift_list:
            print("\n[OVERTIME] Adding constraints to ensure overtime only when regular shift is insufficient...")

            for e in employee_type_list:
                for t in date_span_list:
                    # NOTE(review): regular_capacity is a headcount, whereas the
                    # usages below are person-hours. Both constraints compare
                    # the two directly; confirm whether the capacity should be
                    # multiplied by the shift length.
                    regular_capacity = max_employee_type_day[e][t]
                    regular_usage = EMPLOYEE_HOURS[e, ShiftType.REGULAR, t]
                    overtime_usage = EMPLOYEE_HOURS[e, ShiftType.OVERTIME, t]

                    # Binary variable: 1 if using overtime, 0 otherwise
                    using_overtime = solver.IntVar(0, 1, f'using_overtime_{e}_{t}')

                    # Regular usage must be at least 90% of capacity to allow overtime
                    min_regular_for_overtime = int(0.9 * regular_capacity)

                    # Constraint 1: Can only use overtime if regular usage is high
                    solver.Add(regular_usage >= min_regular_for_overtime * using_overtime)
                    # Constraint 2: If any overtime is used, set the binary variable
                    solver.Add(overtime_usage <= regular_capacity * using_overtime)

            # 2 constraints per employee type per day
            overtime_constraints_added = len(employee_type_list) * len(date_span_list) * 2
            print(f"[OVERTIME] Added {overtime_constraints_added} constraints ensuring overtime only when regular shifts are at 90%+ capacity")

        # 7.5) Bulk payment linking constraints are handled inline in the cost calculation

        # 7.6) *** FIXED MINIMUM UNICEF EMPLOYEES CONSTRAINT ***
        # The minimum applies to the regular shift specifically (not overtime or evening)
        if 'UNICEF Fixed term' in employee_type_list and FIXED_MIN_UNICEF_PER_DAY > 0:
            if ShiftType.REGULAR in active_shift_list:
                print(f"\n[FIXED STAFFING] Adding constraint for minimum {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in REGULAR shift per day...")
                for t in date_span_list:
                    solver.Add(
                        EMPLOYEE_COUNT['UNICEF Fixed term', ShiftType.REGULAR, t] >= FIXED_MIN_UNICEF_PER_DAY
                    )
                print(f"[FIXED STAFFING] Added {len(date_span_list)} constraints ensuring >= {FIXED_MIN_UNICEF_PER_DAY} UNICEF employees in regular shift per day")
            else:
                print(f"\n[FIXED STAFFING] Warning: Regular shift not available, cannot enforce minimum UNICEF staffing")

        # 8) *** HIERARCHY DEPENDENCY CONSTRAINTS ***
        # Dependencies should be produced before or at the same time as the product.
        print("\n[HIERARCHY] Adding dependency constraints...")

        def day_weighted_hours(product):
            """Sum of day index × run hours; used as a proxy for completion time."""
            return solver.Sum(
                t * Hours[product, ell, s, t]
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )

        products_in_plan = set(sorted_product_list)
        dependency_constraints_added = 0
        for p in sorted_product_list:
            dependencies = KIT_DEPENDENCIES.get(p, [])
            if not dependencies:
                continue
            p_level = KIT_LEVELS.get(p, 2)
            p_completion = day_weighted_hours(p)
            for dep in dependencies:
                if dep not in products_in_plan:
                    continue  # Only constrain dependencies that are also produced
                solver.Add(day_weighted_hours(dep) <= p_completion)
                dependency_constraints_added += 1
                print(f" Added constraint: {dep} (dependency) <= {p} (level {p_level})")

        print(f"[HIERARCHY] Added {dependency_constraints_added} dependency constraints")

        # --- Solve ---
        status = solver.Solve()
        if status != pywraplp.Solver.OPTIMAL:
            status_names = {
                pywraplp.Solver.INFEASIBLE: "INFEASIBLE",
                pywraplp.Solver.UNBOUNDED: "UNBOUNDED",
            }
            print(f"No optimal solution. Status: {status} ({status_names.get(status, 'UNKNOWN')})")
            # Debug hint:
            #   solver.EnableOutput()
            #   solver.ExportModelAsLpFile("model.lp")
            return None

        # --- Report ---
        def solved_person_hours(e, s, t):
            """Person-hours of employee type e in shift s on day t in the solution."""
            return sum(
                TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value()
                for p in sorted_product_list
                for ell in line_tuples
            )

        def solved_person_hours_day(e, t):
            """Person-hours of employee type e on day t, summed over all shifts."""
            return sum(
                TEAM_REQ_PER_PRODUCT[e][p] * Hours[p, ell, s, t].solution_value()
                for s in active_shift_list
                for p in sorted_product_list
                for ell in line_tuples
            )

        def solved_count(e, s, t):
            """Integer employee count; rounded because solver values are floats."""
            return int(round(EMPLOYEE_COUNT[e, s, t].solution_value()))

        result = {}
        result['objective'] = solver.Objective().Value()

        # Weekly production
        prod_week = {
            p: sum(
                Units[p, ell, s, t].solution_value()
                for ell in line_tuples
                for s in active_shift_list
                for t in date_span_list
            )
            for p in sorted_product_list
        }
        result['weekly_production'] = prod_week

        # Which product ran on which line/shift/day
        schedule = []
        for t in date_span_list:
            for ell in line_tuples:
                for s in active_shift_list:
                    chosen = [
                        p for p in sorted_product_list
                        if Assignment[p, ell, s, t].solution_value() > 0.5
                    ]
                    if chosen:
                        p = chosen[0]
                        schedule.append({
                            'day': t,
                            'line_type_id': ell[0],
                            'line_idx': ell[1],
                            'shift': s,
                            'product': p,
                            'run_hours': Hours[p, ell, s, t].solution_value(),
                            'units': Units[p, ell, s, t].solution_value(),
                        })
        result['run_schedule'] = schedule

        # Implied headcount by type/shift/day (ceil)
        headcount = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    used_ph = solved_person_hours(e, s, t)
                    # The small epsilon guards against a zero-length shift.
                    need = ceil(used_ph / (Hmax_s[s] + 1e-9))
                    headcount.append({
                        'emp_type': e,
                        'shift': s,
                        'day': t,
                        'needed': need,
                        'available': max_employee_type_day[e][t],
                    })
        result['headcount_per_shift'] = headcount

        # Total person-hours by type/day, with cap = daily max hours × headcount
        ph_by_day = []
        for e in employee_type_list:
            for t in date_span_list:
                ph_by_day.append({
                    'emp_type': e,
                    'day': t,
                    'used_person_hours': solved_person_hours_day(e, t),
                    'cap_person_hours': Hmax_daily * max_employee_type_day[e][t],
                })
        result['person_hours_by_day'] = ph_by_day

        # Actual employee count per type/shift/day (from EMPLOYEE_COUNT variable)
        employee_count_by_shift = []
        for e in employee_type_list:
            for s in active_shift_list:
                for t in date_span_list:
                    count = solved_count(e, s, t)
                    if count <= 0:
                        continue  # Only add entries where employees are working
                    used_hours = solved_person_hours(e, s, t)
                    employee_count_by_shift.append({
                        'emp_type': e,
                        'shift': s,
                        'day': t,
                        'employee_count': count,
                        'total_person_hours': used_hours,
                        'avg_hours_per_employee': used_hours / count,
                        'available': max_employee_type_day[e][t],
                    })
        result['employee_count_by_shift'] = employee_count_by_shift

        # Also calculate daily totals (summing across shifts)
        employee_count_by_day = []
        for e in employee_type_list:
            for t in date_span_list:
                total_count = sum(solved_count(e, s, t) for s in active_shift_list)
                if total_count <= 0:
                    continue  # Only add days where employees are working
                used_hours = solved_person_hours_day(e, t)
                employee_count_by_day.append({
                    'emp_type': e,
                    'day': t,
                    'employee_count': total_count,
                    'total_person_hours': used_hours,
                    'avg_hours_per_employee': used_hours / total_count,
                    'available': max_employee_type_day[e][t],
                })
        result['employee_count_by_day'] = employee_count_by_day

        # Note: Idle employee tracking removed - only counting employees actually working
        self._print_report(result, DEMAND_DICTIONARY)
        return result

    @staticmethod
    def _print_report(result, demand_dictionary):
        """Pretty print the result dictionary produced by ``run_optimization``."""
        print("Objective (min cost):", result['objective'])

        print("\n--- Weekly production by product ---")
        for p, u in result['weekly_production'].items():
            print(f"{p}: {u:.1f} / demand {demand_dictionary.get(p,0)}")

        print("\n--- Schedule (line, shift, day) ---")
        for row in result['run_schedule']:
            shift_name = ShiftType.get_name(row['shift'])
            line_name = LineType.get_name(row['line_type_id'])
            print(f"date_span_list{row['day']} {line_name}-{row['line_idx']} {shift_name}: "
                  f"{row['product']} Hours={row['run_hours']:.2f}h Units={row['units']:.1f}")

        print("\n--- Implied headcount need (per type/shift/day) ---")
        for row in result['headcount_per_shift']:
            shift_name = ShiftType.get_name(row['shift'])
            print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: "
                  f"need={row['needed']} (avail {row['available']})")

        print("\n--- Total person-hours by type/day ---")
        for row in result['person_hours_by_day']:
            print(f"{row['emp_type']}, date_span_list{row['day']}: used={row['used_person_hours']:.1f} "
                  f"(cap {row['cap_person_hours']})")

        print("\n--- Actual employee count by type/shift/day ---")
        for row in result['employee_count_by_shift']:
            shift_name = ShiftType.get_name(row['shift'])
            print(f"{row['emp_type']}, {shift_name}, date_span_list{row['day']}: "
                  f"count={row['employee_count']} employees, "
                  f"total_hours={row['total_person_hours']:.1f}h, "
                  f"avg={row['avg_hours_per_employee']:.1f}h/employee")

        print("\n--- Daily employee totals by type/day (sum across shifts) ---")
        for row in result['employee_count_by_day']:
            print(f"{row['emp_type']}, date_span_list{row['day']}: "
                  f"count={row['employee_count']} employees total, "
                  f"total_hours={row['total_person_hours']:.1f}h, "
                  f"avg={row['avg_hours_per_employee']:.1f}h/employee "
                  f"(available: {row['available']})")

        # Note: Idle employee reporting removed - only tracking employees actually working


if __name__ == "__main__":
    optimizer = Optimizer()
    optimizer.run_optimization()