# -*- coding: utf-8 -*-
print("translate100 start ...")
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
from transformers import M2M100ForConditionalGeneration
from tokenization_small100 import SMALL100Tokenizer
import os
import torch
import json
import logging
import time
import sys  # used to detect a PyInstaller bundle via sys._MEIPASS
import dataclasses
# Manually set a version attribute; some dependencies expect dataclasses.__version__ to exist
setattr(dataclasses, '__version__', '0.8')
# Get the Transformers library logger
transformers_logger = logging.getLogger("transformers")
# Set the log level to ERROR (show errors only, suppress warnings)
transformers_logger.setLevel(logging.ERROR)
# Automatically size the CPU thread pools to the machine
cpu_count = os.cpu_count() or 1
os.environ["OMP_NUM_THREADS"] = str(cpu_count)
os.environ["MKL_NUM_THREADS"] = str(cpu_count)
os.environ["TOKENIZERS_PARALLELISM"] = "true"
# Enable CPU optimizations (reuse the fallback-safe cpu_count from above, since os.cpu_count() can return None)
torch.set_num_threads(cpu_count)
torch.set_num_interop_threads(1)
app = Flask(__name__)
werkzeugLog = logging.getLogger('werkzeug')
werkzeugLog.setLevel(logging.ERROR)  # show errors only; ignore warnings and info
app.config['JSON_AS_ASCII'] = False  # disable ASCII escaping so JSON responses keep raw UTF-8
CORS(app)  # allow cross-origin requests
# TRANSLATE100_QUICK=True enables quick mode, e.g. using quantization when the hardware supports it. Off by default.
quick = os.environ.get('TRANSLATE100_QUICK', 'False').lower() in ('true', '1', 'yes')
# TRANSLATE100_USE_GPU=False forces CPU even when a GPU is present.
# When the variable is unset, the default is to auto-select based on cuda_available below.
useGPU = os.environ.get('TRANSLATE100_USE_GPU', 'True').lower() in ('true', '1', 'yes')
# Port number
port = int(os.environ.get('TRANSLATE100_PORT', '80'))
print("TRANSLATE100_USE_GPU: "+str(useGPU))
print("TRANSLATE100_QUICK: "+str(quick))
print("TRANSLATE100_PORT: "+str(port))
# GPU usage control (runtime detection plus environment-variable override)
# First check that torch.cuda exists and is actually usable
cuda_available = hasattr(torch, 'cuda') and torch.cuda.is_available()
if useGPU and not cuda_available:
    print("TRANSLATE100_USE_GPU is true, but CUDA is not available; falling back to CPU (TRANSLATE100_USE_GPU treated as false)")
    useGPU = False
#
# Language dictionary mapping translate.js language identifiers to this service's codes.
# Each entry contains:
# - id: the language identifier used by translate.js
# - name: the language name (in Chinese, as served to clients)
# - serviceId: the short code used by this service
#
language_dict_translatejs = [
{"id": "afrikaans", "name": "南非荷兰语", "serviceId": "af"},
{"id": "amharic", "name": "阿姆哈拉语", "serviceId": "am"},
{"id": "arabic", "name": "阿拉伯语", "serviceId": "ar"},
{"id": "asturian", "name": "阿斯图里亚斯语", "serviceId": "ast"},
{"id": "azerbaijani", "name": "阿塞拜疆语", "serviceId": "az"},
{"id": "bashkir", "name": "巴什基尔语", "serviceId": "ba"},
{"id": "belarusian", "name": "白俄罗斯语", "serviceId": "be"},
{"id": "bulgarian", "name": "保加利亚语", "serviceId": "bg"},
{"id": "bengali", "name": "孟加拉语", "serviceId": "bn"},
{"id": "breton", "name": "布列塔尼语", "serviceId": "br"},
{"id": "bosnian", "name": "波斯尼亚语", "serviceId": "bs"},
{"id": "cebuano", "name": "宿务语", "serviceId": "ceb"},
{"id": "czech", "name": "捷克语", "serviceId": "cs"},
{"id": "welsh", "name": "威尔士语", "serviceId": "cy"},
{"id": "danish", "name": "丹麦语", "serviceId": "da"},
{"id": "deutsch", "name": "德语", "serviceId": "de"},
{"id": "greek", "name": "希腊语", "serviceId": "el"},
{"id": "english", "name": "英语", "serviceId": "en"},
{"id": "spanish", "name": "西班牙语", "serviceId": "es"},
{"id": "estonian", "name": "爱沙尼亚语", "serviceId": "et"},
{"id": "persian", "name": "波斯语", "serviceId": "fa"},
{"id": "nigerian_fulfulde", "name": "富拉语", "serviceId": "ff"},
{"id": "finnish", "name": "芬兰语", "serviceId": "fi"},
{"id": "french", "name": "法语", "serviceId": "fr"},
{"id": "irish", "name": "爱尔兰语", "serviceId": "ga"},
{"id": "scottish_gaelic", "name": "苏格兰盖尔语", "serviceId": "gd"},
{"id": "galician", "name": "加利西亚语", "serviceId": "gl"},
{"id": "gujarati", "name": "古吉拉特语", "serviceId": "gu"},
{"id": "hausa", "name": "豪萨语", "serviceId": "ha"},
{"id": "hebrew", "name": "希伯来语", "serviceId": "he"},
{"id": "hindi", "name": "印地语", "serviceId": "hi"},
{"id": "croatian", "name": "克罗地亚语", "serviceId": "hr"},
{"id": "haitian_creole", "name": "海地克里奥尔语", "serviceId": "ht"},
{"id": "hungarian", "name": "匈牙利语", "serviceId": "hu"},
{"id": "armenian", "name": "亚美尼亚语", "serviceId": "hy"},
{"id": "indonesian", "name": "印尼语", "serviceId": "id"},
{"id": "igbo", "name": "伊博语", "serviceId": "ig"},
{"id": "ilocano", "name": "伊洛卡语", "serviceId": "ilo"},
{"id": "icelandic", "name": "冰岛语", "serviceId": "is"},
{"id": "italian", "name": "意大利语", "serviceId": "it"},
{"id": "japanese", "name": "日语", "serviceId": "ja"},
{"id": "javanese", "name": "爪哇语", "serviceId": "jv"},
{"id": "georgian", "name": "格鲁吉亚语", "serviceId": "ka"},
{"id": "kazakh", "name": "哈萨克语", "serviceId": "kk"},
{"id": "khmer", "name": "中部高棉语", "serviceId": "km"},
{"id": "kannada", "name": "卡纳达语", "serviceId": "kn"},
{"id": "korean", "name": "韩语", "serviceId": "ko"},
{"id": "luxembourgish", "name": "卢森堡语", "serviceId": "lb"},
#{"id": "luganda", "name": "干达语", "serviceId": "lg"},
{"id": "lingala", "name": "林加拉语", "serviceId": "ln"},
{"id": "lao", "name": "老挝语", "serviceId": "lo"},
{"id": "lithuanian", "name": "立陶宛语", "serviceId": "lt"},
{"id": "latvian", "name": "拉脱维亚语", "serviceId": "lv"},
{"id": "macedonian", "name": "马其顿语", "serviceId": "mk"},
{"id": "malayalam", "name": "马拉雅拉姆语", "serviceId": "ml"},
{"id": "mongolian", "name": "蒙古语", "serviceId": "mn"},
{"id": "marathi", "name": "马拉地语", "serviceId": "mr"},
{"id": "malay", "name": "马来语", "serviceId": "ms"},
{"id": "burmese", "name": "缅甸语", "serviceId": "my"},
{"id": "nepali", "name": "尼泊尔语", "serviceId": "ne"},
{"id": "norwegian", "name": "挪威语", "serviceId": "no"},
{"id": "occitan", "name": "奥克语(1500 年后)", "serviceId": "oc"},
{"id": "punjabi", "name": "旁遮普语", "serviceId": "pa"},
{"id": "polish", "name": "波兰语", "serviceId": "pl"},
{"id": "pashto", "name": "普什图语", "serviceId": "ps"},
{"id": "portuguese", "name": "葡萄牙语", "serviceId": "pt"},
{"id": "russian", "name": "俄语", "serviceId": "ru"},
{"id": "sindhi", "name": "信德语", "serviceId": "sd"},
{"id": "singapore", "name": "僧伽罗语", "serviceId": "si"},
{"id": "slovak", "name": "斯洛伐克语", "serviceId": "sk"},
{"id": "slovene", "name": "斯洛文尼亚语", "serviceId": "sl"},
{"id": "somali", "name": "索马里语", "serviceId": "so"},
{"id": "albanian", "name": "阿尔巴尼亚语", "serviceId": "sq"},
{"id": "serbian", "name": "塞尔维亚语", "serviceId": "sr"},
{"id": "sundanese", "name": "巽他语", "serviceId": "su"},
{"id": "swedish", "name": "瑞典语", "serviceId": "sv"},
{"id": "congo_swahili", "name": "斯瓦希里语", "serviceId": "sw"},
{"id": "tamil", "name": "泰米尔语", "serviceId": "ta"},
{"id": "thai", "name": "泰语", "serviceId": "th"},
{"id": "tagalog", "name": "他加禄语", "serviceId": "tl"},
{"id": "tswana", "name": "茨瓦纳语", "serviceId": "tn"},
{"id": "turkish", "name": "土耳其语", "serviceId": "tr"},
{"id": "ukrainian", "name": "乌克兰语", "serviceId": "uk"},
{"id": "urdu", "name": "乌尔都语", "serviceId": "ur"},
{"id": "uzbek", "name": "乌兹别克语", "serviceId": "uz"},
{"id": "vietnamese", "name": "越南语", "serviceId": "vi"},
{"id": "wolof", "name": "沃洛夫语", "serviceId": "wo"},
{"id": "afrikaans_xhosa", "name": "科萨语", "serviceId": "xh"},
{"id": "yiddish", "name": "意第绪语", "serviceId": "yi"},
{"id": "yoruba", "name": "约鲁巴语", "serviceId": "yo"},
{"id": "chinese_simplified", "name": "简体中文", "serviceId": "zh"},
{"id": "south_african_zulu", "name": "祖鲁语", "serviceId": "zu"},
{"id": "catalan", "name": "加泰罗尼亚语", "serviceId": "ca"},
{"id": "frisian", "name": "弗里西语", "serviceId": "fy"},
{"id": "malagasy", "name": "马达加斯加语", "serviceId": "mg"},
{"id": "dutch", "name": "荷兰语", "serviceId": "nl"},
{"id": "northern_sotho", "name": "北索托语", "serviceId": "ns"},
{"id": "oriya", "name": "奥里亚语", "serviceId": "or"},
{"id": "romanian", "name": "罗马尼亚语", "serviceId": "ro"},
{"id": "swati", "name": "斯威士语", "serviceId": "ss"}
]
# Convert a translate.js language identifier to its m2m100 code.
# Returns an empty string if no mapping exists.
def translatejsToM2m(language):
    # Scan the language list for a matching id
    for lang_item in language_dict_translatejs:
        if lang_item["id"] == language:
            return lang_item["serviceId"]
    return ""
# Determine the model path:
# inside a PyInstaller bundle use the unpack directory, otherwise the working directory
if hasattr(sys, '_MEIPASS'):
    # Path when running as a packaged executable
    local_model_path = sys._MEIPASS
else:
    # Path in a development environment
    local_model_path = os.getcwd()
# Load the model and tokenizer
print("Loading model and tokenizer ..")
model = M2M100ForConditionalGeneration.from_pretrained(
    local_model_path,
    torch_dtype=torch.float16 if useGPU else torch.float32  # half precision on GPU, full precision on CPU
)
# CPU dynamic quantization
# Check whether the CPU supports the instruction set needed for quantization, for broader compatibility
def is_cpu_support_avx2():
    try:
        import cpuinfo
        info = cpuinfo.get_cpu_info()
        # 'flags' may be a list or a string depending on the platform
        flags = info.get('flags', [])
        if isinstance(flags, list):
            flags_str = ' '.join(flags).lower()
        else:
            flags_str = str(flags).lower()
        # Check for the AVX2 instruction set
        return 'avx2' in flags_str
    except ImportError:
        print("Warning: the cpuinfo library is not installed; enabling quantization by default")
        return True  # keep the original behavior when the library is missing
    except Exception as e:
        print("CPU detection failed: %s; enabling quantization by default" % str(e))
        return True  # default to enabled on any other error
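# Note: 'import cpuinfo' above comes from the optional py-cpuinfo package
# (pip install py-cpuinfo); when it is absent, quantization stays enabled by default.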
if useGPU:
    model = model.to('cuda')
    # GPU quantization
    if quick:
        print("Using GPU computing with quantization acceleration")
        # On GPU, half precision serves as the quantization strategy
        model = model.half()  # fp16, well suited to GPUs
    else:
        print("Using GPU computing")
    # If the PyTorch version supports it, use torch.compile
    try:
        # torch.compile requires PyTorch >= 2.0
        major_version = int(torch.__version__.split('.')[0])
        if major_version >= 2:
            print(f"PyTorch version {torch.__version__} supports torch.compile; compiling model ...")
            # Compile the model
            model = torch.compile(model, mode='max-autotune')
            print("Model compilation completed, acceleration enabled")
        else:
            print(f"PyTorch version {torch.__version__} does not support torch.compile; skipping ...")
    except Exception as e:
        print(f"Error checking PyTorch version or compiling model: {str(e)}")
elif quick and is_cpu_support_avx2():
    # Running on CPU: quantize if the hardware supports it
    print("Using CPU computing to perform int8 quantization acceleration")
    model = torch.quantization.quantize_dynamic(
        model, {torch.nn.Linear}, dtype=torch.qint8
    )
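    # Dynamic quantization rewrites the model's nn.Linear layers to use int8
    # weights and quantizes activations on the fly; this typically shrinks the
    # weights and speeds up CPU matrix multiplies at a small accuracy cost.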
    if torch.backends.mkldnn.enabled:
        try:
            # Explicitly probe channels_last support
            test_tensor = torch.randn(1, 3, 224, 224)
            test_tensor = test_tensor.to(memory_format=torch.channels_last)
            model = model.to(memory_format=torch.channels_last)
            print("channels_last memory format optimization enabled")
        except RuntimeError:
            print("CPU supports AVX2 but channels_last is unsupported; disabled")
else:
    print("Using CPU computation (quantization not enabled: the CPU architecture lacks the AVX2 instruction set)")
model.eval()  # switch to evaluation mode
tokenizer = SMALL100Tokenizer.from_pretrained(
    local_model_path,
    sp_model_path=os.path.join(local_model_path, "sentencepiece.bpe.model")
)
print("Model and tokenizer loading completed!")
# Language codes currently supported, in m2m100 form
supported_langs = set(tokenizer.lang_code_to_id.keys())
# Language identifiers currently supported, in translate.js form (all ids from the list above)
translate_support_langs = set(item["id"] for item in language_dict_translatejs)
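# For example, supported_langs contains m2m100 codes such as "en" and "zh",
# while translate_support_langs contains translate.js ids such as "english".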
def translate_single(text, target_lang):
    """Translate a single text; returns (list of translations, token count for this text)."""
    try:
        tokenizer.tgt_lang = target_lang
        # Encode the text; the length of input_ids gives the token count
        encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        # Move the encoded inputs to the GPU when one is in use
        if useGPU:
            encoded = {k: v.to('cuda') for k, v in encoded.items()}
        # Generate in inference mode; on GPU, also use automatic mixed precision
        if useGPU:
            with torch.inference_mode(), torch.amp.autocast('cuda'):
                if quick:
                    generated_tokens = model.generate(
                        **encoded,
                        max_length=512,
                        num_beams=1,  # greedy decoding for speed
                        do_sample=False,
                        early_stopping=False,
                        repetition_penalty=1.5,
                        use_cache=True
                    )
                else:
                    generated_tokens = model.generate(
                        **encoded,
                        max_length=512,
                        num_beams=3,  # more beams for better accuracy
                        do_sample=True,
                        early_stopping=True,
                        repetition_penalty=1.5,
                        temperature=0.7,  # sampling temperature
                        top_k=50,
                        top_p=0.9,
                        use_cache=True,
                        forced_bos_token_id=tokenizer.lang_code_to_id.get(target_lang, None)  # force the target-language start token
                    )
else:
with torch.inference_mode():
if quick:
generated_tokens = model.generate(
**encoded,
max_length=512,
num_beams=1,
do_sample=False,
early_stopping=False,
repetition_penalty=1.5,
use_cache=True
)
else:
generated_tokens = model.generate(
**encoded,
max_length=512,
num_beams=2,
do_sample=True,
early_stopping=True,
repetition_penalty=1.5,
use_cache=True
)
        translations = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
        # Count tokens, excluding every special token
        special_token_ids = set(tokenizer.all_special_ids)
        total_tokens = 0
        for ids in encoded["input_ids"]:
            # Count the actual content tokens (.item() avoids tensor-comparison pitfalls)
            content_tokens = [id.item() for id in ids if id.item() not in special_token_ids]
            total_tokens += len(content_tokens)
        return translations, total_tokens
    except Exception as e:
        # Return a one-element list to match the success shape (callers index [0]); token count is 0 on failure
        return ["Translation failed: %s" % str(e)], 0
def translate_batch(text_list, target_lang):
    """Translate a list of texts; returns (list of results, total token count)."""
    results = []
    total_tokens = 0  # running total of tokens
    for text in text_list:
        if not isinstance(text, str):
            results.append("Invalid input: %s (must be a string)" % text)
            continue
        # Get the translation and token count for this text
        translation, tokens = translate_single(text, target_lang)
        results.append(translation[0])
        total_tokens += tokens
    return results, total_tokens
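# Example request against the endpoint below (response values are illustrative):
#   curl -X POST http://localhost:80/translate.json \
#        -H 'Content-Type: application/json' \
#        -d '{"text": ["Hello"], "to": "chinese_simplified"}'
#   -> {"result": 1, "text": ["你好"], "time": 350, "to": "chinese_simplified", "tokens": 1}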
@app.route('/translate.json', methods=['POST'])
def translate():
    start_time = time.perf_counter()  # record the start time
    # Ignore the Content-Type header and parse the payload by force
    try:
        data = request.get_json(force=True)
    except Exception:
        data = request.form.to_dict()
    if not data:
        data = {}  # guard against a body that parsed to None (e.g. a literal JSON null)
        raw_data = request.data.decode('utf-8').strip()
        if raw_data:
            # Fall back to parsing a raw key=value&key=value body
            parts = raw_data.split('&')
            for part in parts:
                if '=' in part:
                    k, v = part.split('=', 1)
                    data[k] = v
    # Validate required parameters (failure response)
    if not data or "text" not in data or "to" not in data:
        elapsed_time = (time.perf_counter() - start_time) * 1000
        return jsonify({
            "result": 0,
            "info": "Missing parameters! Please pass 'text' (the content to translate, a single string or an array) and 'to' (the target language code)",
            "time": int(elapsed_time)
        }), 400
    text_input = data["text"]
    translatejs_to_lang = data["to"].lower()
    target_lang = translatejsToM2m(translatejs_to_lang)
    # Reject the request if no mapping exists for the requested language
    if target_lang == "":
        return jsonify({
            "result": 0,
            "info": "Language " + translatejs_to_lang + " is not supported"
        }), 400
    original_from = data.get("from")
    # Validate the target language (failure response)
    if target_lang not in supported_langs:
        return jsonify({
            "result": 0,
            "info": "Unsupported language! Supported codes: %s" % sorted(supported_langs)
        }), 400
    # Normalize the input type and translate
    try:
        # Parse text_input into a list
        if isinstance(text_input, str):
            try:
                text_list = json.loads(text_input)
                if not isinstance(text_list, list):
                    text_list = [text_input]
            except Exception:
                text_list = [text_input]
        elif isinstance(text_input, list):
            text_list = text_input
        else:
            return jsonify({
                "result": 0,
                "info": "The 'text' parameter must be a string or an array"
            }), 400
        # Run the translation (returns the result list and the total token count)
        translated_results, total_tokens = translate_batch(text_list, target_lang)
        # Success response: result=1 plus the text array, token count, and timing
        elapsed_time = (time.perf_counter() - start_time) * 1000
        response_data = {
            "result": 1,
            "text": translated_results,  # translated texts (formerly the 'data' field)
            "to": translatejs_to_lang,
            "tokens": total_tokens,  # total token count
            "time": int(elapsed_time)  # elapsed time in milliseconds
        }
        # Echo the 'from' parameter back when it was provided
        if original_from:
            response_data["from"] = original_from
        return jsonify(response_data)
    except Exception as e:
        # An error occurred during processing (failure response)
        elapsed_time = (time.perf_counter() - start_time) * 1000
        return jsonify({
            "result": 0,
            "info": "Processing failed: %s" % str(e),
            "time": int(elapsed_time)
        }), 500
# Language list endpoint
@app.route('/language.json', methods=['POST', 'GET'])
def get_supported_languages():
    # Return the full language list as JSON
    response = jsonify({
        "list": language_dict_translatejs,
        "result": 1,
        "info": "success"
    })
    # Explicitly setting the response encoding is unnecessary; JSON_AS_ASCII=False already keeps UTF-8
    #response.headers['Content-Type'] = 'application/json; charset=utf-8'
    return response
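# Example: GET /language.json returns
#   {"list": [ ...all entries of language_dict_translatejs... ], "result": 1, "info": "success"}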
# Home page
@app.route('/')
def index():
    html = """<span>Welcome to</span> <span class="ignore"> translate100 </span><span>, its original intention is to provide</span> <span><a href='https://github.com/xnx3/translate' class="ignore"> translate.js </a></span><span>with translation switching support between 100 languages.</span>
<br><span>my email</span>: <span class="ignore">[email protected]</span>
<br>
<script src='/translate.js'></script>
<script>
translate.request.api.host=window.location.origin+'/';
translate.request.api.ip = '';
translate.request.api.connectTest = '';
translate.request.api.init = '';
translate.whole.enableAll();
translate.setAutoDiscriminateLocalLanguage();
translate.progress.api.startUITip();
translate.nomenclature.append('english','chinese_simplified',`
with translation switching support between 100 languages.=支持100种语言之间的翻译切换。
its original intention is to provide=其初衷是提供
`);
translate.execute();
</script>"""
return html
# Serve the bundled translate.js file
@app.route('/translate.js')
def serve_translate_js():
return send_from_directory('resources', 'translate.js')
if __name__ == '__main__':
    # Print a hint before starting the Flask app
    print(f"The system is running and you can use it normally now\nAccess port number: {port}")
    app.run(host='0.0.0.0', port=port, debug=True, use_reloader=False)  # note: debug=True exposes the debugger; disable it in production