Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from gradio_client import Client | |
| from huggingface_hub import InferenceClient | |
| import random | |
| from datetime import datetime | |
| #from models import models | |
| ss_client = Client("https://omnibus-html-image-current-tab.hf.space/") | |
| models=[ | |
| "google/gemma-7b", | |
| "google/gemma-7b-it", | |
| "google/gemma-2b", | |
| "google/gemma-2b-it", | |
| "openchat/openchat-3.5-0106", | |
| "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", | |
| "mistralai/Mixtral-8x7B-Instruct-v0.1", | |
| "JunRyeol/jr_model", | |
| "bigcode/starcoder2-15b", | |
| ] | |
| def test_models(): | |
| log_box=[] | |
| for model in models: | |
| start_time = datetime.now() | |
| try: | |
| generate_kwargs = dict( | |
| temperature=0.9, | |
| max_new_tokens=128, | |
| top_p=0.9, | |
| repetition_penalty=1.0, | |
| do_sample=True, | |
| seed=111111111, | |
| ) | |
| print(f'trying: {model}\n') | |
| client= InferenceClient(model) | |
| outp="" | |
| stream=client.text_generation("What is a cat", **generate_kwargs, stream=True, details=True, return_full_text=True) | |
| for response in stream: | |
| outp += response.token.text | |
| print (outp) | |
| time_delta = datetime.now() - start_time | |
| count=time_delta.total_seconds() | |
| #if time_delta.total_seconds() >= 180: | |
| log = {"Model":model,"Status":"Success","Output":outp, "Time":count} | |
| print(f'{log}\n') | |
| log_box.append(log) | |
| except Exception as e: | |
| time_delta = datetime.now() - start_time | |
| count=time_delta.total_seconds() | |
| log = {"Model":model,"Status":"Error","Output":e,"Time":count} | |
| print(f'{log}\n') | |
| log_box.append(log) | |
| yield log_box | |
| def format_prompt_default(message, history,cust_p): | |
| prompt = "" | |
| if history: | |
| #<start_of_turn>userHow does the brain work?<end_of_turn><start_of_turn>model | |
| for user_prompt, bot_response in history: | |
| prompt += f"{user_prompt}\n" | |
| print(prompt) | |
| prompt += f"{bot_response}\n" | |
| print(prompt) | |
| #prompt += f"{message}\n" | |
| prompt+=cust_p.replace("USER_INPUT",message) | |
| return prompt | |
| def format_prompt_gemma(message, history,cust_p): | |
| prompt = "" | |
| if history: | |
| for user_prompt, bot_response in history: | |
| prompt += f"<start_of_turn>user{user_prompt}<end_of_turn>" | |
| prompt += f"<start_of_turn>model{bot_response}<end_of_turn>" | |
| if VERBOSE==True: | |
| print(prompt) | |
| #prompt += f"<start_of_turn>user\n{message}<end_of_turn>\n<start_of_turn>model\n" | |
| prompt+=cust_p.replace("USER_INPUT",message) | |
| return prompt | |
| def format_prompt_openc(message, history,cust_p): | |
| #prompt = "GPT4 Correct User: " | |
| prompt="" | |
| if history: | |
| #<start_of_turn>userHow does the brain work?<end_of_turn><start_of_turn>model | |
| for user_prompt, bot_response in history: | |
| prompt += f"{user_prompt}" | |
| prompt += f"<|end_of_turn|>" | |
| prompt += f"GPT4 Correct Assistant: " | |
| prompt += f"{bot_response}" | |
| prompt += f"<|end_of_turn|>" | |
| print(prompt) | |
| #GPT4 Correct User: Hello<|end_of_turn|>GPT4 Correct Assistant: | |
| prompt+=cust_p.replace("USER_INPUT",message) | |
| return prompt | |
| def format_prompt_mixtral(message, history,cust_p): | |
| prompt = "<s>" | |
| if history: | |
| for user_prompt, bot_response in history: | |
| prompt += f"[INST] {user_prompt} [/INST]" | |
| prompt += f" {bot_response}</s> " | |
| #prompt += f"[INST] {message} [/INST]" | |
| prompt+=cust_p.replace("USER_INPUT",message) | |
| return prompt | |
| def format_prompt_choose(message, history, cust_p, model_name): | |
| if "gemma" in models[model_name].lower(): | |
| return format_prompt_gemma(message,history,cust_p) | |
| if "mixtral" in models[model_name].lower(): | |
| return format_prompt_mixtral(message,history,cust_p) | |
| if "openchat" in models[model_name].lower(): | |
| return format_prompt_openc(message,history,cust_p) | |
| else: | |
| return format_prompt_default(message,history,cust_p) | |
| def load_models(inp): | |
| print(type(inp)) | |
| print(inp) | |
| print(models[inp]) | |
| model_state= InferenceClient(models[inp]) | |
| out_box=gr.update(label=models[inp]) | |
| if "gemma" in models[inp].lower(): | |
| prompt_out="<start_of_turn>userUSER_INPUT<end_of_turn><start_of_turn>model" | |
| return out_box,prompt_out, model_state | |
| if "mixtral" in models[inp].lower(): | |
| prompt_out="[INST] USER_INPUT [/INST]" | |
| return out_box,prompt_out, model_state | |
| if "openchat" in models[inp].lower(): | |
| prompt_out="GPT4 Correct User: USER_INPUT<|end_of_turn|>GPT4 Correct Assistant: " | |
| return out_box,prompt_out, model_state | |
| else: | |
| prompt_out="USER_INPUT\n" | |
| return out_box,prompt_out, model_state | |
| VERBOSE=False | |
| def load_models_OG(inp): | |
| if VERBOSE==True: | |
| print(type(inp)) | |
| print(inp) | |
| print(models[inp]) | |
| #client_z.clear() | |
| #client_z.append(InferenceClient(models[inp])) | |
| return gr.update(label=models[inp]) | |
| def format_prompt(message, history, cust_p): | |
| prompt = "" | |
| if history: | |
| for user_prompt, bot_response in history: | |
| prompt += f"<start_of_turn>user{user_prompt}<end_of_turn>" | |
| prompt += f"<start_of_turn>model{bot_response}<end_of_turn>" | |
| if VERBOSE==True: | |
| print(prompt) | |
| #prompt += f"<start_of_turn>user\n{message}<end_of_turn>\n<start_of_turn>model\n" | |
| prompt+=cust_p.replace("USER_INPUT",message) | |
| return prompt | |
| def chat_inf(system_prompt,prompt,history,memory,model_state,model_name,seed,temp,tokens,top_p,rep_p,chat_mem,cust_p): | |
| #token max=8192 | |
| model_n=models[model_name] | |
| print(model_state) | |
| hist_len=0 | |
| client=model_state | |
| if not history: | |
| history = [] | |
| hist_len=0 | |
| if not memory: | |
| memory = [] | |
| mem_len=0 | |
| if memory: | |
| for ea in memory[0-chat_mem:]: | |
| hist_len+=len(str(ea)) | |
| in_len=len(system_prompt+prompt)+hist_len | |
| if (in_len+tokens) > 8000: | |
| history.append((prompt,"Wait, that's too many tokens, please reduce the 'Chat Memory' value, or reduce the 'Max new tokens' value")) | |
| yield history,memory | |
| else: | |
| generate_kwargs = dict( | |
| temperature=temp, | |
| max_new_tokens=tokens, | |
| top_p=top_p, | |
| repetition_penalty=rep_p, | |
| do_sample=True, | |
| seed=seed, | |
| ) | |
| if system_prompt: | |
| formatted_prompt = format_prompt_choose(f"{system_prompt}, {prompt}", memory[0-chat_mem:],cust_p,model_name) | |
| else: | |
| formatted_prompt = format_prompt_choose(prompt, memory[0-chat_mem:],cust_p,model_name) | |
| stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True) | |
| output = "" | |
| for response in stream: | |
| output += response.token.text | |
| yield [(prompt,output)],memory | |
| history.append((prompt,output)) | |
| memory.append((prompt,output)) | |
| yield history,memory | |
| if VERBOSE==True: | |
| print("\n######### HIST "+str(in_len)) | |
| print("\n######### TOKENS "+str(tokens)) | |
| def get_screenshot(chat: list,height=5000,width=600,chatblock=[],theme="light",wait=3000,header=True): | |
| print(chatblock) | |
| tog = 0 | |
| if chatblock: | |
| tog = 3 | |
| result = ss_client.predict(str(chat),height,width,chatblock,header,theme,wait,api_name="/run_script") | |
| out = f'https://omnibus-html-image-current-tab.hf.space/file={result[tog]}' | |
| print(out) | |
| return out | |
| def clear_fn(): | |
| return None,None,None,None | |
| rand_val=random.randint(1,1111111111111111) | |
| def check_rand(inp,val): | |
| if inp==True: | |
| return gr.Slider(label="Seed", minimum=1, maximum=1111111111111111, value=random.randint(1,1111111111111111)) | |
| else: | |
| return gr.Slider(label="Seed", minimum=1, maximum=1111111111111111, value=int(val)) | |
| with gr.Blocks() as app: | |
| model_state=gr.State() | |
| memory=gr.State() | |
| gr.HTML("""<center><h1 style='font-size:xx-large;'>Huggingface Hub InferenceClient</h1><br><h3>Chatbot's</h3></center>""") | |
| chat_b = gr.Chatbot(height=500) | |
| with gr.Group(): | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| inp = gr.Textbox(label="Prompt") | |
| sys_inp = gr.Textbox(label="System Prompt (optional)") | |
| with gr.Accordion("Prompt Format",open=False): | |
| custom_prompt=gr.Textbox(label="Modify Prompt Format", info="For testing purposes. 'USER_INPUT' is where 'SYSTEM_PROMPT, PROMPT' will be placed", lines=3,value="<start_of_turn>userUSER_INPUT<end_of_turn><start_of_turn>model") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| btn = gr.Button("Chat") | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| stop_btn=gr.Button("Stop") | |
| clear_btn=gr.Button("Clear") | |
| test_btn=gr.Button("Test") | |
| client_choice=gr.Dropdown(label="Models",type='index',choices=[c for c in models],value=models[0],interactive=True) | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| rand = gr.Checkbox(label="Random Seed", value=True) | |
| seed=gr.Slider(label="Seed", minimum=1, maximum=1111111111111111,step=1, value=rand_val) | |
| tokens = gr.Slider(label="Max new tokens",value=1600,minimum=0,maximum=8000,step=64,interactive=True, visible=True,info="The maximum number of tokens") | |
| temp=gr.Slider(label="Temperature",step=0.01, minimum=0.01, maximum=1.0, value=0.49) | |
| top_p=gr.Slider(label="Top-P",step=0.01, minimum=0.01, maximum=1.0, value=0.49) | |
| rep_p=gr.Slider(label="Repetition Penalty",step=0.01, minimum=0.1, maximum=2.0, value=0.99) | |
| chat_mem=gr.Number(label="Chat Memory", info="Number of previous chats to retain",value=4) | |
| with gr.Accordion(label="Screenshot",open=False): | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| im_btn=gr.Button("Screenshot") | |
| img=gr.Image(type='filepath') | |
| with gr.Column(scale=1): | |
| with gr.Row(): | |
| im_height=gr.Number(label="Height",value=5000) | |
| im_width=gr.Number(label="Width",value=500) | |
| wait_time=gr.Number(label="Wait Time",value=3000) | |
| theme=gr.Radio(label="Theme", choices=["light","dark"],value="light") | |
| chatblock=gr.Dropdown(label="Chatblocks",info="Choose specific blocks of chat",choices=[c for c in range(1,40)],multiselect=True) | |
| test_json=gr.JSON(label="Test Output") | |
| test_btn.click(test_models,None,test_json) | |
| client_choice.change(load_models,client_choice,[chat_b,custom_prompt,model_state]) | |
| app.load(load_models,client_choice,[chat_b,custom_prompt,model_state]) | |
| im_go=im_btn.click(get_screenshot,[chat_b,im_height,im_width,chatblock,theme,wait_time],img) | |
| chat_sub=inp.submit(check_rand,[rand,seed],seed).then(chat_inf,[sys_inp,inp,chat_b,memory,model_state,client_choice,seed,temp,tokens,top_p,rep_p,chat_mem,custom_prompt],[chat_b,memory]) | |
| go=btn.click(check_rand,[rand,seed],seed).then(chat_inf,[sys_inp,inp,chat_b,memory,model_state,client_choice,seed,temp,tokens,top_p,rep_p,chat_mem,custom_prompt],[chat_b,memory]) | |
| stop_btn.click(None,None,None,cancels=[go,im_go,chat_sub]) | |
| clear_btn.click(clear_fn,None,[inp,sys_inp,chat_b,memory]) | |
| app.queue(default_concurrency_limit=10).launch() |