# -*- coding: utf-8 -*-
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
import os
import datetime
import warnings
import traceback
import tempfile
import shutil
import math
import re
from fpdf import FPDF
import plotly.io as pio
from io import BytesIO # 導入 BytesIO
# --- Configuration ---
# Font file and family name for the PDF report (presumably passed to
# create_pdf_with_charts by the caller — confirm at the call site).
PDF_FONT_PATH = 'NotoSansTC_VariableFont_wght.ttf'
PDF_FONT_NAME = 'NotoSansTC'
APP_TITLE = "財務報表分析"
# Worksheet names looked up in the uploaded workbooks.
INCOME_STATEMENT_SHEET = '損益表'
BALANCE_SHEET_SHEET = '資產負債表'
# Internal account key -> candidate row labels. get_financial_data tries the
# labels in list order and uses the first one present in the sheet's index.
ACCOUNT_NAMES = {
    'revenue': ["營業收入", "銷貨收入", "營收"],
    'cogs': ["營業成本", "銷貨成本"],
    'op_income': ["營業利益", "營業利潤"],
    'net_income': ["本期淨利(淨損)", "稅後淨利", "淨利(或虧損)", "淨利", "本期綜合損益總額"],
    'gross_profit': ["營業毛利", "毛利"],
    'income_before_tax': ["稅前淨利", "所得稅前淨利"],
    'tax': ["所得稅費用", "所得稅"],
    'total_assets': ["資產總計", "資產總額"],
    'current_assets': ["流動資產合計", "流動資產"],
    'inventory': ["存貨"],
    'total_liabilities': ["負債總計", "負債總額"],
    'current_liabilities': ["流動負債合計", "流動負債"],
    'total_equity': ["權益總計", "股東權益總額", "權益"],
}
# Chart titles, index-aligned with the list of plot functions in
# generate_analysis_plotly; that function also dispatches on the title prefix,
# so renaming or reordering entries changes which chart gets which arguments.
CHART_TITLES = [
    "營收與成本趨勢 (含變化%)",
    "利潤率趨勢",
    "費用佔銷貨成本比率趨勢",
    "資產負債結構 (最新)",
    "償債能力比率趨勢",
    "ROA 與 ROE 趨勢",
]
# --- Helper functions ---
def get_financial_data(df, key_or_names, period_col, log_list):
    """Look up one account's value for one period and return it as a number.

    Args:
        df: Statement DataFrame indexed by row label, one column per period.
            ``None`` is accepted and yields ``None``.
        key_or_names: Either a key of ``ACCOUNT_NAMES`` (str) or an explicit
            list of candidate row labels, tried in order.
        period_col: Column label of the period to read.
        log_list: List that receives diagnostic messages (mutated in place).

    Returns:
        The numeric value of the first candidate label found in ``df.index``,
        or ``None`` when nothing usable is found. A blank/NaN/"-" cell on the
        first matching label yields ``None`` without trying later labels.
        Text cells are cleaned first: thousands separators are removed,
        accounting parentheses become a minus sign, and a ``%`` is dropped.
    """
    if isinstance(key_or_names, str):
        item_names = ACCOUNT_NAMES.get(key_or_names, [])
    else:
        item_names = key_or_names
    if not item_names:
        if isinstance(key_or_names, str):
            log_list.append(f"[ERROR] 未知的會計科目鍵值: {key_or_names}")
        return None
    if df is None:
        return None

    for item_name in item_names:
        label = item_name.strip()
        if label not in df.index:
            continue
        try:
            value = df.loc[label, period_col]
            # Duplicated row (or column) labels make .loc return a Series or
            # DataFrame; pd.isna() on that cannot be used in a boolean test.
            # Use the first match instead of losing the value.
            if isinstance(value, (pd.Series, pd.DataFrame)):
                candidates = value.to_numpy().ravel()
                if candidates.size == 0:
                    return None
                log_list.append(
                    f"[WARN] 項目 '{label}' 在期間 '{period_col}' 有重複資料,使用第一筆。"
                )
                value = candidates[0]
            if pd.isna(value):
                return None
            if isinstance(value, str):
                value = value.replace(',', '').replace('(', '-').replace(')', '').strip()
                if value in ("", "-"):
                    return None
                value = value.replace('%', '')
            numeric_value = pd.to_numeric(value, errors='coerce')
            return None if pd.isna(numeric_value) else numeric_value
        except KeyError:
            # The period column is missing for this label; try the next alias.
            continue
        except Exception as e:
            log_list.append(f"[WARN] 獲取項目 '{label}' 數據時出錯: {e}")
            continue
    return None
def calculate_growth_rate(current, previous):
    """Return the percentage change from ``previous`` to ``current``.

    Returns ``None`` when either operand is ``None`` or NaN. When the base is
    zero (or smaller in magnitude than 1e-9) the result is ``+inf``, ``-inf``
    or ``0.0`` depending on the sign of ``current``. Otherwise the change is
    measured against ``abs(previous)``.
    """
    if current is None or previous is None:
        return None
    if math.isnan(current) or math.isnan(previous):
        return None

    base = abs(previous)
    # A (near-)zero base has no meaningful ratio; report the direction only.
    if base < 1e-9:
        if current > 0:
            return float('inf')
        if current < 0:
            return float('-inf')
        return 0.0

    delta = current - previous
    return (delta / base) * 100
def format_currency(value, default="N/A"):
    """Format ``value`` as a whole number with thousands separators.

    Args:
        value: Number to format; ``None``, NaN and infinities are treated as
            missing.
        default: Text returned for a missing value.

    Returns:
        The formatted string, ``default`` for a missing value, or
        ``str(value)`` when ``value`` is not numeric.
    """
    if value is None:
        return default
    try:
        # The NaN/inf checks live inside the try so that non-numeric input
        # reaches the str() fallback instead of raising TypeError.
        if math.isnan(value) or math.isinf(value):
            return default
        return f"{value:,.0f}"
    except (TypeError, ValueError):
        return str(value)
def format_percent(value, decimals=1, default="N/A", add_sign=False, use_parentheses=False):
    """Format ``value`` (already expressed in percent) as a percentage string.

    Args:
        value: Number to format; ``None`` and NaN are treated as missing.
        decimals: Number of digits after the decimal point.
        default: Text returned for a missing value.
        add_sign: Prefix strictly positive values with ``+``.
        use_parentheses: Wrap the result in parentheses.

    Returns:
        The formatted string; ``default`` for a missing value; a bare infinity
        symbol (no sign prefix, no parentheses) for infinite values; or
        ``str(value)`` when ``value`` is not numeric.
    """
    if value is None:
        return default
    try:
        # Checks are inside the try so non-numeric input falls back to str()
        # instead of raising TypeError from math.isnan.
        if math.isnan(value):
            return default
        if math.isinf(value):
            return "∞" if value > 0 else "-∞"
        sign = "+" if add_sign and value > 0 else ""
        formatted_value = f"{sign}{value:.{decimals}f}%"
        return f"({formatted_value})" if use_parentheses else formatted_value
    except (TypeError, ValueError):
        return str(value)
def format_ratio(value, decimals=2, default="N/A"):
    """Format a plain ratio with a fixed number of decimals.

    Args:
        value: Number to format; ``None`` and NaN are treated as missing.
        decimals: Number of digits after the decimal point.
        default: Text returned for a missing value.

    Returns:
        The formatted string; ``default`` for a missing value; an infinity
        symbol for infinite values; or ``str(value)`` when ``value`` is not
        numeric.
    """
    if value is None:
        return default
    try:
        # Checks are inside the try so non-numeric input falls back to str()
        # instead of raising TypeError from math.isnan.
        if math.isnan(value):
            return default
        if math.isinf(value):
            return "∞" if value > 0 else "-∞"
        return f"{value:.{decimals}f}"
    except (TypeError, ValueError):
        return str(value)
# --- PDF generation ---
def _summary_markdown_to_plain_text(summary_md):
    """Flatten the Markdown summary into fixed-width plain text for the PDF."""
    plain_lines = []
    for line in summary_md.split('\n'):
        stripped = line.strip()
        # Drop the Markdown heading and table rule lines.
        if stripped.startswith('## ') or stripped.startswith('|---'):
            continue
        if stripped.startswith('|'):
            cells = [cell.strip() for cell in stripped.strip('|').split('|')]
            plain_lines.append(f"{cells[0]:<15}" + " ".join(f"{c:>12}" for c in cells[1:]))
        else:
            plain_lines.append(line)
    return "\n".join(plain_lines).replace("**", "")


def create_pdf_with_charts(summary_md, plotly_figs, chart_titles, font_path, font_name):
    """Build the PDF report: title, text summary, then one image per chart.

    Progress and problems are reported through ``st.error``/``st.warning`` and
    appended to ``st.session_state['logs']``.

    Args:
        summary_md: Markdown summary text; tables are flattened to plain text.
        plotly_figs: Sequence of Plotly figures; ``None`` entries are skipped.
        chart_titles: Titles index-aligned with ``plotly_figs``; a generic
            title is used when the list is shorter.
        font_path: Path to the TTF font to register; must exist.
        font_name: Family name to register the font under.

    Returns:
        Whatever ``pdf.output(dest='S')`` returns (type depends on the
        installed fpdf package — confirm), or ``None`` when the font file is
        missing, the image exporter raises ImportError, or rendering fails.
    """
    def _log(message):
        st.session_state['logs'].append(message)

    pdf = FPDF()
    pdf.add_page()
    font_family_to_use = "Arial"
    pdf_temp_dir = None

    if not font_path or not os.path.exists(font_path):
        st.error(f"❌ PDF 錯誤:找不到或未設定有效的字型檔案路徑 '{font_path}'。")
        _log(f"[ERROR] PDF: 找不到或未設定字型檔案: {font_path}")
        return None

    try:
        pdf.add_font(font_name, '', font_path, uni=True)
        pdf.set_font(font_name, '', 12)
        # Only switch families once the font is known to be usable, so the
        # fallback below really is Arial.
        font_family_to_use = font_name
        _log(f"[INFO] PDF: 成功添加字型 '{font_name}'。")
    except Exception as e:
        st.error(f"❌ PDF 錯誤:添加字型 '{font_name}' 從路徑 '{font_path}' 失敗。錯誤:{e}")
        _log(f"[ERROR] PDF: 添加字型失敗: {e}")
        pdf.set_font(font_family_to_use, '', 12)
        _log("[WARN] PDF: 回退到 Arial 字型,中文將無法顯示。")

    try:
        try:
            # Title and text summary.
            pdf.set_font(font_family_to_use, '', 18)
            pdf.cell(0, 10, APP_TITLE, 0, 1, 'C')
            pdf.ln(5)
            pdf.set_font(font_family_to_use, '', 14)
            pdf.cell(0, 8, "財務摘要", 0, 1, 'L')
            pdf.ln(2)
            pdf.set_font(font_family_to_use, '', 10)
            pdf.multi_cell(0, 5, _summary_markdown_to_plain_text(summary_md))
            _log("[INFO] PDF: 已寫入摘要。")
            pdf.ln(5)

            # Chart section: each figure is exported to a PNG in a temp dir.
            _log("[INFO] PDF: 開始嵌入圖表...")
            pdf.set_font(font_family_to_use, '', 14)
            pdf.cell(0, 10, "圖表分析", 0, 1, 'L')
            pdf.ln(2)
            pdf_temp_dir = tempfile.mkdtemp(prefix="pdf_charts_")
            _log(f"[INFO] PDF: 創建圖表圖片暫存目錄: {pdf_temp_dir}")

            image_added_count = 0
            for i, fig in enumerate(plotly_figs):
                if fig is None:
                    continue
                chart_title = chart_titles[i] if i < len(chart_titles) else f"圖表 {i+1}"
                try:
                    page_width = pdf.w - 2 * pdf.l_margin
                    estimated_img_height = page_width * 0.6
                    # Start a new page when the image would not fit.
                    if pdf.get_y() + estimated_img_height + 15 > pdf.h - pdf.b_margin:
                        pdf.add_page()
                        pdf.set_font(font_family_to_use, '', 14)
                        pdf.cell(0, 10, "圖表分析 (續)", 0, 1, 'L')
                        pdf.ln(2)
                    pdf.set_font(font_family_to_use, '', 12)
                    pdf.cell(0, 8, f"- {chart_title}", 0, 1, 'L')
                    img_path = os.path.join(pdf_temp_dir, f"chart_{i+1}.png")
                    export_start = datetime.datetime.now()
                    pio.write_image(fig, img_path, format='png', scale=2, width=800)
                    export_end = datetime.datetime.now()
                    _log(f"[INFO] PDF: 圖表 {i+1} 匯出為圖片完成 (耗時: {export_end - export_start}).")
                    pdf.image(img_path, x=pdf.l_margin, w=page_width, h=0)
                    pdf.ln(5)
                    image_added_count += 1
                except ImportError:
                    st.error("❌ PDF 錯誤:需要安裝 `kaleido`。")
                    _log("[ERROR] PDF: 缺少 kaleido 庫。")
                    return None
                except Exception as img_err:
                    # One broken chart should not abort the whole report.
                    st.warning(f"⚠️ PDF 警告:嵌入圖表 '{chart_title}' 時失敗:{img_err}")
                    _log(f"[WARN] PDF: 嵌入圖表 {i+1} 失敗: {img_err}")
                    pdf.cell(0, 5, f"(無法嵌入圖表: {chart_title})", 0, 1)
                    pdf.ln(5)

            if image_added_count == 0:
                pdf.set_font(font_family_to_use, '', 11)
                pdf.cell(0, 5, "(未能成功嵌入任何圖表)", 0, 1)
        finally:
            # Runs on success, early return and failure alike. The explicit
            # None check matters: rendering can fail before the directory is
            # created, and os.path.exists(None) raises TypeError.
            if pdf_temp_dir is not None and os.path.exists(pdf_temp_dir):
                try:
                    shutil.rmtree(pdf_temp_dir)
                    _log("[INFO] PDF: 已清理圖表圖片暫存目錄。")
                except Exception as clean_err:
                    _log(f"[WARN] PDF: 清理圖表暫存目錄失敗: {clean_err}")

        pdf_bytes = pdf.output(dest='S')
        _log("[INFO] PDF: PDF 內容已生成 (bytes)。")
        return pdf_bytes
    except Exception as e:
        st.error(f"❌ PDF 錯誤:生成 PDF 內容時失敗:{e}")
        _log(f"[ERROR] PDF: 生成 PDF 內容時失敗: {e}")
        return None
# --- Core analysis pipeline ---
# Account keys read from the balance sheet; every other key in ACCOUNT_NAMES
# is read from the income statement.
_BALANCE_SHEET_KEYS = (
    'total_assets', 'current_assets', 'inventory',
    'total_liabilities', 'current_liabilities', 'total_equity',
)


def _read_statement_sheet(uploaded_file, sheet_name, statement_label):
    """Load one worksheet by name, with row and column labels stripped to str."""
    try:
        all_sheets = pd.read_excel(uploaded_file, sheet_name=None, index_col=0, header=0)
        if sheet_name not in all_sheets:
            raise ValueError(f"'{sheet_name}' sheet not found.")
        df = all_sheets[sheet_name]
        df.index = df.index.astype(str).str.strip()
        df.columns = df.columns.astype(str).str.strip()
        return df
    except Exception as e:
        raise ValueError(f"Error reading {statement_label}: {e}") from e


def _sum_over_columns(df, key_or_names, columns, logs):
    """Sum an item over ``columns``; ``None`` when no column holds a value."""
    total = 0
    found = False
    for col in columns:
        val = get_financial_data(df, key_or_names, col, logs)
        if val is not None and not math.isnan(val):
            total += val
            found = True
    return total if found else None


def _safe_ratio(numerator, denominator, percent=False):
    """Divide, returning ``None`` for a missing operand or a zero denominator."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    ratio = numerator / denominator
    return ratio * 100 if percent else ratio


def _summary_row(label, cells):
    """Render one Markdown table row of the summary."""
    return f"| {label:<15} |" + "|".join(f" {c} " for c in cells) + "|\n"


def generate_analysis_plotly(income_statement_file, balance_sheet_file, comparison_type, selected_expenses):
    """Analyse the uploaded statements and build the summary and charts.

    Either file may be ``None`` (but not both). Quarterly mode uses the last
    four quarter-like columns; annual mode compares the two most recent years
    found in the column names, summing income-statement items over that
    year's quarter columns when they exist (otherwise the FY / plain-year
    column) and taking balance-sheet items from the last such column.

    Args:
        income_statement_file: Uploaded Excel file with the income statement
            sheet, or ``None``.
        balance_sheet_file: Uploaded Excel file with the balance sheet, or
            ``None``.
        comparison_type: "季度比較 (同年度)" or "年度比較 (兩年度)".
        selected_expenses: Row labels of expense items whose ratio to COGS is
            reported; ``None`` is treated as an empty selection.

    Returns:
        ``(summary_markdown, logs, plotly_figs)`` where ``plotly_figs`` has one
        slot per entry of ``CHART_TITLES`` (``None`` for charts not drawn).
        Failures are caught and reported through the first two elements.
    """
    def stamp():
        return datetime.datetime.now().strftime('%H:%M:%S')

    start_time = datetime.datetime.now()
    logs = [f"[{start_time.strftime('%H:%M:%S')}] [INFO] 開始處理..."]
    summary_md = f"## 財務分析摘要 ({comparison_type})\n"
    plotly_figs = [None] * len(CHART_TITLES)
    selected_expenses = list(selected_expenses) if selected_expenses else []
    data_points = {}
    df_is = None
    df_bs = None
    try:
        if income_statement_file is None and balance_sheet_file is None:
            logs.append("[ERROR] 未上傳任何檔案。")
            return "請至少上傳一份報表檔案。", logs, plotly_figs

        logs.append(f"[{stamp()}] [INFO] 選擇的比較類型: {comparison_type}")
        if income_statement_file:
            logs.append(f"[{stamp()}] [INFO] 已接收損益表檔案: {income_statement_file.name}")
        if balance_sheet_file:
            logs.append(f"[{stamp()}] [INFO] 已接收資產負債表檔案: {balance_sheet_file.name}")
        if selected_expenses:
            logs.append(f"[{stamp()}] [INFO] 選定的費用項目: {', '.join(selected_expenses)}")

        # ---- Load the workbooks ------------------------------------------
        read_start = datetime.datetime.now()
        logs.append(f"[{read_start.strftime('%H:%M:%S')}] [INFO] Reading Excel file(s)...")
        if income_statement_file:
            df_is = _read_statement_sheet(income_statement_file, INCOME_STATEMENT_SHEET, "Income Statement")
            logs.append("[INFO] Income Statement loaded.")
        if balance_sheet_file:
            df_bs = _read_statement_sheet(balance_sheet_file, BALANCE_SHEET_SHEET, "Balance Sheet")
            logs.append("[INFO] Balance Sheet loaded.")
        read_end = datetime.datetime.now()
        logs.append(f"[{read_end.strftime('%H:%M:%S')}] [INFO] Excel reading finished (took: {read_end - read_start}).")

        # Period labels come from the income statement when it is present.
        source_df_for_periods = df_is if df_is is not None else df_bs
        if source_df_for_periods is None:
            raise ValueError("No valid DataFrame available.")
        all_periods = source_df_for_periods.columns.tolist()
        logs.append(f"[{stamp()}] [INFO] Available periods: {', '.join(all_periods)}")

        # ---- Select periods and extract raw figures ----------------------
        quarter_pattern = re.compile(r'Q\d|\d季度', re.IGNORECASE)
        if comparison_type == "季度比較 (同年度)":
            quarter_periods = [p for p in reversed(all_periods) if quarter_pattern.search(p)]
            if len(quarter_periods) < 4:
                raise ValueError(f"Error: Quarterly comparison requires at least 4 quarter columns, found {len(quarter_periods)}.")
            periods_to_analyze = sorted(quarter_periods[:4])
            num_periods = 4
            # Source column(s) behind each analysed period.
            period_columns = [[p] for p in periods_to_analyze]
            logs.append(f"[{stamp()}] [INFO] Selected periods for quarterly comparison: {', '.join(periods_to_analyze)}")
            extract_start = datetime.datetime.now()
            logs.append(f"[{extract_start.strftime('%H:%M:%S')}] [INFO] Extracting quarterly data...")
            for key in ACCOUNT_NAMES:
                df = df_bs if key in _BALANCE_SHEET_KEYS else df_is
                if df is None:
                    data_points[key] = [None] * num_periods
                else:
                    data_points[key] = [get_financial_data(df, key, p, logs) for p in periods_to_analyze]
        elif comparison_type == "年度比較 (兩年度)":
            years_found = set()
            year_pattern = re.compile(r'(\d{4})')
            for p in all_periods:
                match = year_pattern.search(p)
                if match:
                    years_found.add(int(match.group(1)))
            if len(years_found) < 2:
                raise ValueError(f"Error: Annual comparison requires data from at least two years, found {len(years_found)}.")
            current_year, previous_year = sorted(years_found, reverse=True)[:2]
            periods_to_analyze = [f"{previous_year} FY", f"{current_year} FY"]
            num_periods = 2
            period_columns = [[], []]
            logs.append(f"[{stamp()}] [INFO] Selected years for annual comparison: {previous_year} vs {current_year}")
            extract_start = datetime.datetime.now()
            logs.append(f"[{extract_start.strftime('%H:%M:%S')}] [INFO] Aggregating annual data...")
            for period_index, year in enumerate((previous_year, current_year)):
                year_text = str(year)
                year_quarter_cols = [p for p in all_periods if year_text in p and quarter_pattern.search(p)]
                year_fy_col = next((p for p in all_periods if year_text in p and 'FY' in p.upper()), None)
                year_col_simple = year_text if year_text in all_periods else None
                if year_quarter_cols:
                    year_cols_to_use = year_quarter_cols
                    logs.append(f"[INFO] Year {year}: Aggregating from quarterly data.")
                elif year_fy_col:
                    year_cols_to_use = [year_fy_col]
                    logs.append(f"[INFO] Year {year}: Using FY data.")
                elif year_col_simple:
                    year_cols_to_use = [year_col_simple]
                    logs.append(f"[WARN] Year {year}: Using simple year column '{year_col_simple}'.")
                else:
                    raise ValueError(f"Error: Cannot find any data columns for year {year}.")
                period_columns[period_index] = year_cols_to_use
                for key in ACCOUNT_NAMES:
                    is_bs = key in _BALANCE_SHEET_KEYS
                    df = df_bs if is_bs else df_is
                    data_points.setdefault(key, [None] * num_periods)
                    if df is None:
                        logs.append(f"[WARN] Skipping {key} for year {year} as relevant sheet is missing.")
                        continue
                    if is_bs:
                        # Balances are point-in-time: take the last column.
                        data_points[key][period_index] = get_financial_data(df, key, year_cols_to_use[-1], logs)
                    else:
                        # Flows accumulate over the year's columns.
                        data_points[key][period_index] = _sum_over_columns(df, key, year_cols_to_use, logs)
        else:
            raise ValueError("Invalid comparison type.")

        # Derive equity as assets - liabilities where it was not reported.
        for i in range(num_periods):
            if data_points.get('total_equity') and data_points['total_equity'][i] is None and data_points.get('total_assets') and data_points['total_assets'][i] is not None and data_points.get('total_liabilities') and data_points['total_liabilities'][i] is not None:
                data_points['total_equity'][i] = data_points['total_assets'][i] - data_points['total_liabilities'][i]
                logs.append(f"[INFO] Note: Equity for period {periods_to_analyze[i]} calculated.")

        # ---- KPIs --------------------------------------------------------
        logs.append(f"[{stamp()}] [INFO] Calculating KPIs...")
        kpi_names = ['gross_margin', 'op_margin', 'net_margin', 'roa', 'roe', 'current_ratio', 'quick_ratio', 'debt_to_equity', 'cogs_ratio']
        kpis = {kpi_name: [None] * num_periods for kpi_name in kpi_names}
        for expense_name in selected_expenses:
            kpis[f'expense_{expense_name}_cogs_ratio'] = [None] * num_periods

        def series(key):
            return data_points.get(key, [None] * num_periods)

        for i in range(num_periods):
            rev = series('revenue')[i]
            cogs = series('cogs')[i]
            op_income = series('op_income')[i]
            net_income = series('net_income')[i]
            assets = series('total_assets')[i]
            curr_assets = series('current_assets')[i]
            inventory = series('inventory')[i]
            liabilities = series('total_liabilities')[i]
            curr_liabilities = series('current_liabilities')[i]
            equity = series('total_equity')[i]

            # ROA/ROE use the average of this and the previous period's
            # balance when both exist, otherwise the period-end balance.
            avg_assets = assets
            avg_equity = equity
            if i > 0:
                prev_assets = series('total_assets')[i - 1]
                prev_equity = series('total_equity')[i - 1]
                if assets is not None and prev_assets is not None:
                    avg_assets = (assets + prev_assets) / 2
                if equity is not None and prev_equity is not None:
                    avg_equity = (equity + prev_equity) / 2

            gross_profit = (rev - cogs) if rev is not None and cogs is not None else None
            quick_assets = (curr_assets - inventory) if curr_assets is not None and inventory is not None else None
            kpis['gross_margin'][i] = _safe_ratio(gross_profit, rev, percent=True)
            kpis['op_margin'][i] = _safe_ratio(op_income, rev, percent=True)
            kpis['net_margin'][i] = _safe_ratio(net_income, rev, percent=True)
            kpis['roa'][i] = _safe_ratio(net_income, avg_assets, percent=True)
            kpis['roe'][i] = _safe_ratio(net_income, avg_equity, percent=True)
            kpis['current_ratio'][i] = _safe_ratio(curr_assets, curr_liabilities)
            kpis['quick_ratio'][i] = _safe_ratio(quick_assets, curr_liabilities)
            kpis['debt_to_equity'][i] = _safe_ratio(liabilities, equity)
            kpis['cogs_ratio'][i] = _safe_ratio(cogs, rev, percent=True)

            if df_is is not None and cogs is not None and cogs != 0:
                for expense_name in selected_expenses:
                    # Read the expense from the same source column(s) as the
                    # other income-statement items. The display label
                    # ("YYYY FY") need not exist as a column in annual mode.
                    expense_val = _sum_over_columns(df_is, [expense_name], period_columns[i], logs)
                    if expense_val is not None:
                        kpis[f'expense_{expense_name}_cogs_ratio'][i] = (expense_val / cogs * 100)
        extract_end = datetime.datetime.now()
        logs.append(f"[{extract_end.strftime('%H:%M:%S')}] [INFO] KPI calculation finished (took: {extract_end - extract_start}).")

        # ---- Markdown summary --------------------------------------------
        summary_start = datetime.datetime.now()
        period_cells = "|".join(f" {p} " for p in periods_to_analyze)
        rule_cells = "|".join(["-----------"] * num_periods)
        summary_md += f"**比較期間:** {', '.join(periods_to_analyze)}\n\n**主要項目趨勢:**\n"
        summary_md += "| 指標 |" + period_cells + "|" + "\n" + "|-------------------|" + rule_cells + "|" + "\n"
        for key, display_name in [('revenue', '營業收入'), ('cogs', '營業成本'), ('op_income', '營業利益'), ('net_income', '淨利')]:
            summary_md += _summary_row(display_name, [format_currency(v) for v in series(key)])
        for key, display_name in [('gross_margin', '毛利率 (%)'), ('op_margin', '營業利益率 (%)'), ('net_margin', '淨利率 (%)'), ('cogs_ratio', '營業成本率 (%)'), ('roa', 'ROA (%)'), ('roe', 'ROE (%)')]:
            values = kpis.get(key, [None] * num_periods)
            summary_md += _summary_row(display_name, [format_percent(v, add_sign=False) for v in values])
        for key, display_name in [('current_ratio', '流動比率'), ('quick_ratio', '速動比率'), ('debt_to_equity', '負債權益比')]:
            values = kpis.get(key, [None] * num_periods)
            summary_md += _summary_row(display_name, [format_ratio(v) for v in values])
        if selected_expenses:
            summary_md += "\n**選定費用佔銷貨成本比率 (%):**\n"
            summary_md += "| 費用項目 |" + period_cells + "|" + "\n" + "|-------------------|" + rule_cells + "|" + "\n"
            for expense_name in selected_expenses:
                values = kpis.get(f'expense_{expense_name}_cogs_ratio', [None] * num_periods)
                summary_md += _summary_row(expense_name, [format_percent(v, add_sign=False) for v in values])
        summary_end = datetime.datetime.now()
        logs.append(f"[{summary_end.strftime('%H:%M:%S')}] [INFO] 摘要生成完成 (耗時: {summary_end - summary_start}).")

        # ---- Charts ------------------------------------------------------
        plot_start = datetime.datetime.now()
        logs.append(f"[{plot_start.strftime('%H:%M:%S')}] [INFO] 開始生成視覺化圖表 (Plotly)...")
        # Index-aligned with CHART_TITLES.
        plotly_plot_funcs = [
            plot_revenue_cogs_trend_plotly, plot_profit_margins_trend_plotly,
            plot_expense_to_cogs_ratio_plotly, plot_asset_liability_structure_plotly,
            plot_solvency_ratios_trend_plotly, plot_roa_roe_trend_plotly,
        ]
        current_chart_titles = CHART_TITLES
        plotly_figs = [None] * len(current_chart_titles)
        has_balance_data = (
            df_bs is not None
            and bool(data_points.get('total_assets'))
            and any(v is not None for v in data_points['total_assets'])
        )
        has_expense_data = (
            df_is is not None
            and bool(selected_expenses)
            and bool(data_points.get('cogs'))
            and any(v is not None for v in data_points['cogs'])
        )
        for i, plot_func in enumerate(plotly_plot_funcs):
            chart_plot_start = datetime.datetime.now()
            chart_name_base = current_chart_titles[i]
            logs.append(f"[{chart_plot_start.strftime('%H:%M:%S')}] [INFO] - 開始繪製圖表 {i+1} ({chart_name_base})...")
            try:
                fig = None
                # Each chart's call signature and prerequisites are chosen by
                # its title prefix.
                if chart_name_base.startswith('資產負債結構'):
                    if has_balance_data:
                        fig = plot_func(periods_to_analyze[-1], data_points, kpis, logs)
                    else:
                        logs.append(f"[WARN] - 跳過圖表 {i+1} ({chart_name_base}),缺少資產負債表數據。")
                elif chart_name_base.startswith('費用佔銷貨成本'):
                    if has_expense_data:
                        fig = plot_func(periods_to_analyze, data_points, kpis, selected_expenses, logs)
                    else:
                        logs.append(f"[WARN] - 跳過圖表 {i+1} ({chart_name_base}),缺少損益表/銷貨成本數據或未選擇費用。")
                elif chart_name_base.startswith('償債能力') or chart_name_base.startswith('ROA'):
                    if has_balance_data:
                        fig = plot_func(periods_to_analyze, data_points, kpis, logs)
                    else:
                        logs.append(f"[WARN] - 跳過圖表 {i+1} ({chart_name_base}),缺少資產負債表數據。")
                else:
                    # Remaining charts depend on the income statement only.
                    if df_is is not None:
                        fig = plot_func(periods_to_analyze, data_points, kpis, logs)
                    else:
                        logs.append(f"[WARN] - 跳過圖表 {i+1} ({chart_name_base}),缺少損益表數據。")
                if fig:
                    plotly_figs[i] = fig
                    chart_plot_end = datetime.datetime.now()
                    logs.append(f"[{chart_plot_end.strftime('%H:%M:%S')}] [INFO] - 圖表 {i+1} ({chart_name_base}) Plotly 物件生成完成 (耗時: {chart_plot_end - chart_plot_start}).")
            except Exception:
                # A failing chart is logged and skipped; the others still run.
                logs.append(f"[{stamp()}] [ERROR] - 生成圖表 {i+1} ({chart_name_base}) 時發生錯誤: {traceback.format_exc()}")
        plot_end = datetime.datetime.now()
        logs.append(f"[{plot_end.strftime('%H:%M:%S')}] [INFO] 圖表生成完成 (總耗時: {plot_end - plot_start}).")
        end_time = datetime.datetime.now()
        logs.append(f"[{end_time.strftime('%H:%M:%S')}] [INFO] 全部分析處理完成 (總耗時: {end_time - start_time}).")
        return summary_md, logs, plotly_figs
    except Exception as e:
        end_time = datetime.datetime.now()
        logs.append(f"[ERROR] 處理過程中發生錯誤: {e}\n{traceback.format_exc()}")
        logs.append(f"[{end_time.strftime('%H:%M:%S')}] [INFO] 處理因錯誤終止 (總耗時: {end_time - start_time}).")
        return f"處理失敗:{e}", logs, [None] * len(CHART_TITLES)
# --- End of core analysis pipeline ---
# --- Plotly chart builders ---
def plot_revenue_cogs_trend_plotly(periods, data_points, kpis, logs):
    """Grouped bar chart of revenue and COGS per period.

    Each bar is labelled with its amount and, from the second period on, the
    period-over-period change in parentheses on a second line.

    Args:
        periods: Period labels for the x axis.
        data_points: Mapping with optional 'revenue' and 'cogs' series.
        kpis: Unused; kept so all chart builders share a signature.
        logs: List that receives diagnostic messages.

    Returns:
        A ``go.Figure``, or ``None`` when neither series has a usable value.
    """
    num_periods = len(periods)

    def _series(key):
        # Pad short or missing series so per-period indexing cannot fail.
        values = list(data_points.get(key) or [])
        return values + [None] * (num_periods - len(values))

    def _has_value(values):
        return any(v is not None and not math.isnan(v) for v in values)

    def _bar_texts(values):
        growth_text = [""] * num_periods
        for i in range(1, num_periods):
            growth = calculate_growth_rate(values[i], values[i - 1])
            growth_text[i] = format_percent(growth, default="", add_sign=True, use_parentheses=True)
        # "<br>" is Plotly's line break inside text labels; a raw newline
        # inside the string literal is not valid Python.
        return [
            f"{format_currency(values[i])}<br>{growth_text[i]}" if values[i] is not None else ""
            for i in range(num_periods)
        ]

    rev_data = _series('revenue')
    cogs_data = _series('cogs')
    has_rev = _has_value(rev_data)
    has_cogs = _has_value(cogs_data)
    if not has_rev and not has_cogs:
        logs.append("[DEBUG] plot_revenue_cogs_trend_plotly: 無有效數據。")
        return None

    fig = go.Figure()
    if has_rev:
        fig.add_trace(go.Bar(x=periods, y=rev_data, name='營業收入', marker_color='rgb(55, 83, 109)', text=_bar_texts(rev_data), textposition='outside', textfont_size=9))
    if has_cogs:
        fig.add_trace(go.Bar(x=periods, y=cogs_data, name='營業成本', marker_color='rgb(26, 118, 255)', text=_bar_texts(cogs_data), textposition='outside', textfont_size=9))
    fig.update_layout(title='營業收入與營業成本趨勢 (含變化%)', xaxis_title='期間', yaxis_title='金額', barmode='group', legend_title_text='指標', template='plotly_white', hovermode="x unified", uniformtext_minsize=8, uniformtext_mode='hide')
    return fig
def plot_profit_margins_trend_plotly(periods, data_points, kpis, logs):
    """Line chart of gross, operating and net margin per period.

    Only margins with at least one non-missing value are drawn.

    Returns:
        A ``go.Figure``, or ``None`` when no margin has a usable value.
    """
    def _has_value(series):
        return any(val is not None and not math.isnan(val) for val in series)

    labelled_series = [
        ('毛利率', kpis.get('gross_margin', [])),
        ('營業利益率', kpis.get('op_margin', [])),
        ('淨利率', kpis.get('net_margin', [])),
    ]
    drawable = [(label, series) for label, series in labelled_series if _has_value(series)]
    if not drawable:
        logs.append("[DEBUG] plot_profit_margins_trend_plotly: 無有效數據。")
        return None

    fig = go.Figure()
    for label, series in drawable:
        point_labels = [format_percent(v, add_sign=False) for v in series]
        fig.add_trace(go.Scatter(
            x=periods,
            y=series,
            mode='lines+markers+text',
            name=label,
            text=point_labels,
            textposition="top center",
            textfont=dict(size=10),
        ))
    fig.update_layout(
        title='利潤率趨勢比較',
        xaxis_title='期間',
        yaxis_title='利潤率 (%)',
        yaxis_ticksuffix='%',
        legend_title_text='指標',
        template='plotly_white',
        hovermode="x unified",
    )
    return fig
def plot_expense_to_cogs_ratio_plotly(periods, data_points, kpis, selected_expenses, logs):
    """Line chart of each selected expense as a percentage of COGS.

    Reads the series stored under ``expense_<name>_cogs_ratio`` in ``kpis`` and
    draws one line per expense that has at least one non-missing value.

    Returns:
        A ``go.Figure``, or ``None`` when nothing is selected or no selected
        expense has a usable value.
    """
    if not selected_expenses:
        logs.append("[INFO] plot_expense_to_cogs_ratio_plotly: 未選擇任何費用項目。")
        return None

    fig = go.Figure()
    traces_added = 0
    for expense_name in selected_expenses:
        ratio_data = kpis.get(f'expense_{expense_name}_cogs_ratio', [])
        if not any(v is not None and not math.isnan(v) for v in ratio_data):
            continue
        traces_added += 1
        fig.add_trace(go.Scatter(
            x=periods,
            y=ratio_data,
            mode='lines+markers+text',
            name=expense_name,
            text=[format_percent(v, add_sign=False) for v in ratio_data],
            textposition="top center",
            textfont=dict(size=10),
        ))

    if traces_added == 0:
        logs.append("[DEBUG] plot_expense_to_cogs_ratio_plotly: 選定的費用項目均無有效數據。")
        return None

    fig.update_layout(
        title='選定費用佔營業成本比率趨勢',
        xaxis_title='期間',
        yaxis_title='佔營業成本比率 (%)',
        yaxis_ticksuffix='%',
        legend_title_text='費用項目',
        template='plotly_white',
        hovermode="x unified",
    )
    return fig
def plot_asset_liability_structure_plotly(period, data_points, kpis, logs):
    """Sunburst of the balance-sheet structure for the latest period.

    Uses the last element of each balance-sheet series in ``data_points``;
    ``period`` is only used in the title and in log messages. Negative
    components are clamped to zero for drawing, while hover text shows the
    unclamped totals and equity.

    Args:
        period: Label of the period being drawn.
        data_points: Mapping with the 'total_assets', 'total_liabilities',
            'total_equity', 'current_assets' and 'current_liabilities' series.
        kpis: Unused; kept so all chart builders share a signature.
        logs: List that receives diagnostic messages.

    Returns:
        A ``go.Figure``, or ``None`` when data is incomplete or an error occurs.
    """
    try:
        def _latest(key):
            series = data_points.get(key)
            return series[-1] if series else None

        assets_curr = _latest('total_assets')
        liabilities_curr = _latest('total_liabilities')
        equity_curr = _latest('total_equity')
        curr_assets_curr = _latest('current_assets')
        curr_liab_curr = _latest('current_liabilities')

        required_data = [assets_curr, liabilities_curr, equity_curr, curr_assets_curr, curr_liab_curr]
        if not all(v is not None and not math.isnan(v) for v in required_data):
            logs.append(f"[WARN] Plotly 無法繪製資產負債結構圖 ({period}),數據不完整。")
            return None

        non_curr_assets = assets_curr - curr_assets_curr
        non_curr_liab = liabilities_curr - curr_liab_curr
        # inf - inf yields NaN even though every input passed the check above.
        if math.isnan(non_curr_assets) or math.isnan(non_curr_liab):
            logs.append(f"[WARN] Plotly 無法繪製資產負債結構圖 ({period}),計算結果無效。")
            return None

        # Sector sizes must be non-negative.
        curr_assets_curr = max(0, curr_assets_curr)
        non_curr_assets = max(0, non_curr_assets)
        curr_liab_curr = max(0, curr_liab_curr)
        non_curr_liab = max(0, non_curr_liab)
        equity_curr_abs = max(0, equity_curr)

        # With branchvalues="total" a parent must be at least the sum of its
        # children, so each parent is built from the clamped children rather
        # than from the raw total.
        total_assets_abs = curr_assets_curr + non_curr_assets
        total_le_abs = curr_liab_curr + non_curr_liab + equity_curr_abs

        labels = ["總計", "資產", "負債與權益", "流動資產", "非流動資產", "流動負債", "非流動負債", "權益"]
        parents = ["", "總計", "總計", "資產", "資產", "負債與權益", "負債與權益", "負債與權益"]
        values_for_plot = [0, total_assets_abs, total_le_abs, curr_assets_curr, non_curr_assets, curr_liab_curr, non_curr_liab, equity_curr_abs]
        values_for_hover = [0, assets_curr, liabilities_curr + equity_curr, curr_assets_curr, non_curr_assets, curr_liab_curr, non_curr_liab, equity_curr]

        # Drop empty sectors. The zero-valued root is always dropped, which
        # turns its two children into top-level sectors below.
        valid_indices = [i for i, v in enumerate(values_for_plot) if v is not None and not math.isnan(v) and v > 1e-9]
        if len(valid_indices) <= 3:
            logs.append(f"[WARN] Plotly 無法繪製資產負債結構圖 ({period}),有效數據過少。")
            return None

        filtered_labels = [labels[i] for i in valid_indices]
        filtered_values = [values_for_plot[i] for i in valid_indices]
        text_values = [format_currency(values_for_hover[i]) for i in valid_indices]
        kept_labels = set(filtered_labels)
        # A sector whose parent was dropped becomes a root.
        final_parents = [parents[i] if parents[i] in kept_labels else "" for i in valid_indices]

        fig = go.Figure(go.Sunburst(
            labels=filtered_labels,
            parents=final_parents,
            values=filtered_values,
            branchvalues="total",
            hovertext=text_values,
            hoverinfo="label+text+percent parent",
            marker=dict(colors=px.colors.qualitative.Pastel),
            textinfo='label+percent entry',
        ))
        fig.update_layout(title=f'資產負債結構 ({period})', margin=dict(t=50, l=25, r=25, b=25))
        return fig
    except Exception as e:
        logs.append(f"[ERROR] Plotly 繪製資產負債結構圖時出錯: {e}\n{traceback.format_exc()}")
        return None
def plot_solvency_ratios_trend_plotly(periods, data_points, kpis, logs):
    """Line chart of current ratio, quick ratio and debt-to-equity per period.

    A ratio is drawn only when it has at least one finite, non-missing value.
    A dashed reference line is drawn at 1.0, plus a dotted one at 2.0 when the
    current ratio is drawn.

    Returns:
        A ``go.Figure``, or ``None`` when no ratio has a usable value.
    """
    def _has_finite_value(series):
        return any(
            val is not None and not math.isnan(val) and not np.isinf(val)
            for val in series
        )

    labelled_series = [
        ('流動比率', kpis.get('current_ratio', [])),
        ('速動比率', kpis.get('quick_ratio', [])),
        ('負債權益比', kpis.get('debt_to_equity', [])),
    ]
    drawable = [(label, series) for label, series in labelled_series if _has_finite_value(series)]
    if not drawable:
        logs.append("[DEBUG] plot_solvency_ratios_trend_plotly: 無有效數據。")
        return None

    fig = go.Figure()
    for label, series in drawable:
        fig.add_trace(go.Scatter(
            x=periods,
            y=series,
            mode='lines+markers+text',
            name=label,
            text=[format_ratio(v) for v in series],
            textposition="top center",
            textfont=dict(size=10),
        ))

    fig.add_hline(y=1.0, line_dash="dash", line_color="grey", annotation_text="參考線 (1.0)", annotation_position="bottom right")
    drawn_labels = [label for label, _ in drawable]
    if '流動比率' in drawn_labels:
        fig.add_hline(y=2.0, line_dash="dot", line_color="lightgrey", annotation_text="參考線 (2.0)", annotation_position="top right")

    fig.update_layout(
        title='償債能力比率趨勢',
        xaxis_title='期間',
        yaxis_title='比率值',
        legend_title_text='指標',
        template='plotly_white',
        hovermode="x unified",
    )
    return fig
def plot_roa_roe_trend_plotly(periods, data_points, kpis, logs):
    """Build the ROA / ROE trend chart.

    Reads the 'roa' and 'roe' series from ``kpis``. ``data_points`` is
    accepted but not used by this function.

    A series counts as usable only if it holds at least one value that is not
    None, NaN or infinite; this matches the validity rule used by the
    solvency-ratio chart, so a series made only of infinities no longer
    produces an empty figure.

    Returns the Plotly figure, or None (after appending a debug line to
    ``logs``) when neither series has a usable value.
    """
    def _has_finite_value(series):
        # math.isfinite rejects both NaN and +/-inf in a single check.
        return any(v is not None and math.isfinite(v) for v in series)

    roa_data = kpis.get('roa', [])
    roe_data = kpis.get('roe', [])
    if not _has_finite_value(roa_data) and not _has_finite_value(roe_data):
        logs.append("[DEBUG] plot_roa_roe_trend_plotly: 無有效數據。")
        return None

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=periods,
        y=roa_data,
        mode='lines+markers+text',
        name='資產報酬率 (ROA)',
        line=dict(color='rgb(31, 119, 180)'),
        text=[format_percent(v, add_sign=False) for v in roa_data],
        textposition="top center",
        textfont=dict(size=10),
    ))
    # ROE labels go below the points so they do not collide with the ROA labels above.
    fig.add_trace(go.Scatter(
        x=periods,
        y=roe_data,
        mode='lines+markers+text',
        name='權益報酬率 (ROE)',
        line=dict(color='rgb(255, 127, 14)', dash='dash'),
        text=[format_percent(v, add_sign=False) for v in roe_data],
        textposition="bottom center",
        textfont=dict(size=10),
    ))
    fig.update_layout(
        title='資產報酬率(ROA)與權益報酬率(ROE)趨勢',
        xaxis_title='期間',
        yaxis_title='報酬率 (%)',
        yaxis_ticksuffix='%',
        legend_title_text='指標',
        template='plotly_white',
        hovermode="x unified",
    )
    return fig
# --- Plotly 圖表繪製函數結束 ---
# --- Streamlit 介面定義 ---
# Configure the page (wide layout, browser-tab title), then render the
# heading and the introductory usage notes.
st.set_page_config(page_title=APP_TITLE, layout="wide")
st.title(f"{APP_TITLE} 📊")
_INTRO_MARKDOWN = """
**財務報表自動分析與視覺化 (Streamlit + Plotly 版)**
可分別或同時上傳損益表和資產負債表的 Excel 檔案。選擇比較類型,系統將自動計算關鍵指標並生成分析圖表。
**注意:** 季度比較需 Excel 包含 'YYYY Q1', 'YYYY Q2'... 欄位 (按時間順序);年度比較需包含 'YYYY-1 FY', 'YYYY FY' 欄位。程式會自動選取最新的 4 或 2 個欄位。
"""
st.markdown(_INTRO_MARKDOWN)
# --- Initialise session state ---
# Seed every key the script reads later; keys that already hold a value from
# an earlier rerun are left untouched.
_session_defaults = {
    'analysis_done': False,
    'summary_md': "",
    'log_output': "",
    'plotly_figs': [None] * len(CHART_TITLES),
    'logs': [],
    'txt_data_bytes': None,
    'pdf_data_bytes': None,
    'expense_options': [],
}
for _state_key, _initial_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# 'selected_expenses' is not seeded here: it is written when an analysis is
# started and read back with a fallback default by the multiselect.
# --- Input area ---
def _detect_expense_options(account_labels):
    """Return the sorted, de-duplicated labels that may be offered as expense items.

    A label is rejected when it
      * matches a known key account name (every ACCOUNT_NAMES entry except the
        'cogs' group) or one of the extra non-expense names listed below,
      * ends with a "total" suffix, or
      * contains "營業外" or ends with "收益" / "損失".
    """
    excluded_names = set()
    for key, names in ACCOUNT_NAMES.items():
        if key != 'cogs':
            excluded_names.update(names)
    excluded_names.update(["營業成本", "營業費用合計", "利息費用", "營業毛利", "毛利", "營業利益", "稅前淨利", "所得稅費用", "本期淨利(淨損)", "淨利", "收益", "收入", "利益", "所得稅"])
    total_suffixes = ("合計", "總額", "總計")
    gain_loss_suffixes = ("收益", "損失")

    candidates = set()
    for label in account_labels:
        name = label.strip()
        if name in excluded_names:
            continue
        if name.endswith(total_suffixes):
            continue
        if "營業外" in name or name.endswith(gain_loss_suffixes):
            continue
        candidates.add(name)
    return sorted(candidates)


with st.sidebar:
    st.header("⚙️ 設定")
    uploaded_is_file = st.file_uploader("1a. 上傳損益表 Excel 檔案 (可選)", type=['xlsx', 'xls'], key="is_uploader")
    uploaded_bs_file = st.file_uploader("1b. 上傳資產負債表 Excel 檔案 (可選)", type=['xlsx', 'xls'], key="bs_uploader")
    comparison_type = st.radio("2. 選擇比較類型", ["季度比較 (同年度)", "年度比較 (兩年度)"], index=0, key='comparison_type_radio')

    st.subheader("費用分析選項")
    expense_options = []
    if uploaded_is_file is not None:
        try:
            # Parse a private in-memory copy so the upload's own read position
            # is not consumed by this preview.
            uploaded_is_file.seek(0)
            bytes_data = uploaded_is_file.read()
            # Rewind immediately, so the upload is readable from the start by
            # the later analysis even if parsing below raises.
            uploaded_is_file.seek(0)
            df_is_peek = pd.read_excel(BytesIO(bytes_data), sheet_name=INCOME_STATEMENT_SHEET, index_col=0, header=0)
            df_is_peek.index = df_is_peek.index.astype(str).str.strip()
            expense_options = _detect_expense_options(df_is_peek.index)
        except Exception as e:
            # Best effort: the expense picker is optional, so warn and offer no options.
            st.warning(f"讀取損益表以獲取費用選項時出錯: {e}")
            expense_options = []
    st.session_state['expense_options'] = expense_options

    available_expenses = st.session_state.get('expense_options', [])
    # Pre-select only the remembered items still present in the current
    # options; Streamlit rejects default values that are not in `options`
    # (this happens after the file is removed or replaced).
    available_lookup = set(available_expenses)
    remembered_expenses = [
        item for item in st.session_state.get('selected_expenses', [])
        if item in available_lookup
    ]
    selected_expenses = st.multiselect(
        "3. 選擇要計算佔銷貨成本比例的費用項目",
        options=available_expenses,
        default=remembered_expenses,
        key='expense_multiselect_key',
        help="僅在您上傳損益表後可用。"
    )
    analyze_button = st.button("🚀 開始分析", type="primary", use_container_width=True)
# --- Handle a click on the analyse button ---
if analyze_button:
    if uploaded_is_file is None and uploaded_bs_file is None:
        # Nothing to analyse: report it and keep the results section hidden.
        st.error("❌ 請至少上傳一份 Excel 檔案!")
        st.session_state['analysis_done'] = False
    else:
        # Clear everything left over from a previous run before starting.
        _fresh_run_state = {
            'analysis_done': False,
            'summary_md': "",
            'log_output': "",
            'plotly_figs': [None] * len(CHART_TITLES),
            'logs': [],
            'txt_data_bytes': None,
            'pdf_data_bytes': None,
        }
        for _state_key, _state_value in _fresh_run_state.items():
            st.session_state[_state_key] = _state_value
        # Remember the chosen expense items so they can be pre-selected on later reruns.
        st.session_state['selected_expenses'] = selected_expenses

        with st.spinner('正在分析中,請稍候...'):
            analysis_outcome = generate_analysis_plotly(
                uploaded_is_file,
                uploaded_bs_file,
                comparison_type,
                st.session_state['selected_expenses'],
            )
            summary_md_result, logs_result, plotly_figs_result = analysis_outcome
            st.session_state['summary_md'] = summary_md_result
            st.session_state['log_output'] = "\n".join(logs_result)
            st.session_state['plotly_figs'] = plotly_figs_result
            # A summary beginning with the failure marker means the run did not succeed.
            st.session_state['analysis_done'] = not summary_md_result.startswith("處理失敗")

            if st.session_state['analysis_done']:
                # Pre-build both downloadable reports so the output section only has to serve bytes.
                txt_content = f"{APP_TITLE}\n分析類型: {comparison_type}\n\n{st.session_state['summary_md']}"
                st.session_state['txt_data_bytes'] = txt_content.encode('utf-8')
                # The PDF is only attempted when the configured font file exists.
                st.session_state['pdf_data_bytes'] = (
                    create_pdf_with_charts(st.session_state['summary_md'], st.session_state['plotly_figs'], CHART_TITLES, PDF_FONT_PATH, PDF_FONT_NAME)
                    if PDF_FONT_PATH and os.path.exists(PDF_FONT_PATH)
                    else None
                )
# --- Output area ---
st.divider()
if st.session_state['analysis_done']:
    st.header("📊 分析結果")
    col1, col2 = st.columns([2, 1])
    with col1:
        st.subheader("財務摘要")
        st.markdown(st.session_state['summary_md'])
        st.divider()
        st.subheader("下載報告")
        date_stamp = datetime.datetime.now().strftime('%Y%m%d')
        if st.session_state['txt_data_bytes']:
            st.download_button(label="📄 下載 TXT 摘要報告", data=st.session_state['txt_data_bytes'], file_name=f"{APP_TITLE}_摘要_{comparison_type}_{date_stamp}.txt", mime="text/plain")
        # The stored PDF payload may be a bytearray; convert it so the
        # download button always receives plain bytes.
        pdf_data_to_download = st.session_state.get('pdf_data_bytes', None)
        if isinstance(pdf_data_to_download, bytearray):
            pdf_data_to_download = bytes(pdf_data_to_download)
        if isinstance(pdf_data_to_download, bytes):
            st.download_button(
                label="📕 下載 PDF 完整報告 (含圖表)",
                data=pdf_data_to_download,
                file_name=f"{APP_TITLE}_完整報告_{comparison_type}_{date_stamp}.pdf",
                mime="application/pdf"
            )
        elif PDF_FONT_PATH and os.path.exists(PDF_FONT_PATH):
            # The font is available but there is no PDF payload: generation failed.
            st.info("PDF 生成失敗,請檢查日誌。")
        else:
            # No usable font file, so no PDF was attempted.
            st.warning(f"⚠️ 無法生成 PDF:未找到或未設定有效的字型檔案路徑。\n請在程式碼中設定 `PDF_FONT_PATH`。\n當前設定: `{PDF_FONT_PATH}`")
    with col2:
        with st.expander("📝 查看處理日誌", expanded=True):
            st.text_area("日誌", st.session_state['log_output'], height=400)

    st.divider()
    st.header("📈 圖表分析")
    plotly_figs = st.session_state['plotly_figs']
    if any(fig is not None for fig in plotly_figs):
        # Three rows of two columns, filled left-to-right, top-to-bottom.
        row1_cols = st.columns(2)
        row2_cols = st.columns(2)
        row3_cols = st.columns(2)
        chart_containers = [*row1_cols, *row2_cols, *row3_cols]
        for i, fig in enumerate(plotly_figs):
            if i >= len(chart_containers):
                st.warning(f"圖表 '{CHART_TITLES[i]}' 沒有對應的顯示位置。")
                continue
            with chart_containers[i]:
                if fig is not None:
                    st.plotly_chart(fig, use_container_width=True)
                else:
                    st.info(f"圖表 '{CHART_TITLES[i]}' 無法生成。")
    else:
        st.warning("未能生成任何圖表,請檢查輸入數據或處理日誌。")
# No branch for the unsuccessful case: a missing upload is already reported
# by the button handler, and the results section simply stays hidden.
st.divider()
st.caption("由 EJ 設計與實作")