Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| from typing import List | |
| from tenacity import retry, stop_after_attempt, wait_random_exponential | |
| from ..message import SYSTEM_NAME, Message | |
| from .base import IntelligenceBackend | |
| try: | |
| from langchain.llms import OpenAI | |
| except ImportError: | |
| is_langchain_openai_available = False | |
| # logging.warning("openai package is not installed") | |
| else: | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if api_key is None: | |
| # logging.warning("OpenAI API key is not set. Please set the environment variable OPENAI_API_KEY") | |
| is_langchain_openai_available = False | |
| else: | |
| is_langchain_openai_available = True | |
| # Default config follows the OpenAI playground | |
| DEFAULT_TEMPERATURE = 0.7 | |
| DEFAULT_MAX_TOKENS = 2048 | |
| DEFAULT_MODEL = "gpt-4" | |
| END_OF_MESSAGE = "<EOS>" # End of message token specified by us not OpenAI | |
| STOP = ("<|endoftext|>", END_OF_MESSAGE) # End of sentence token | |
| BASE_PROMPT = f"The messages always end with the token {END_OF_MESSAGE}." | |
| class LangChainOpenAIChat(IntelligenceBackend): | |
| """Interface to the ChatGPT style model with system, user, assistant roles separation.""" | |
| stateful = False | |
| type_name = "openai-chat" | |
| def __init__( | |
| self, | |
| temperature: float = DEFAULT_TEMPERATURE, | |
| max_tokens: int = DEFAULT_MAX_TOKENS, | |
| model: str = DEFAULT_MODEL, | |
| merge_other_agents_as_one_user: bool = True, | |
| **kwargs, | |
| ): | |
| """ | |
| Instantiate the OpenAIChat backend. | |
| args: | |
| temperature: the temperature of the sampling | |
| max_tokens: the maximum number of tokens to sample | |
| model: the model to use | |
| merge_other_agents_as_one_user: whether to merge messages from other agents as one user message | |
| """ | |
| assert ( | |
| is_langchain_openai_available | |
| ), "langchain package is not installed or the API key is not set" | |
| super().__init__( | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| model=model, | |
| merge_other_agents_as_one_user=merge_other_agents_as_one_user, | |
| **kwargs, | |
| ) | |
| self.temperature = temperature | |
| self.max_tokens = max_tokens | |
| self.model = model | |
| self.merge_other_agent_as_user = merge_other_agents_as_one_user | |
| self.llm = OpenAI( | |
| model_name=model, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| openai_api_key=api_key, | |
| ) | |
| def _get_response(self, messages): | |
| response = self.llm(prompt=messages, stop=STOP) | |
| return response | |
| def query( | |
| self, | |
| agent_name: str, | |
| role_desc: str, | |
| history_messages: List[Message], | |
| global_prompt: str = None, | |
| request_msg: Message = None, | |
| *args, | |
| **kwargs, | |
| ) -> str: | |
| """ | |
| Format the input and call the ChatGPT/GPT-4 API. | |
| args: | |
| agent_name: the name of the agent | |
| role_desc: the description of the role of the agent | |
| env_desc: the description of the environment | |
| history_messages: the history of the conversation, or the observation for the agent | |
| request_msg: the request from the system to guide the agent's next response | |
| """ | |
| # Merge the role description and the global prompt as the system prompt for the agent | |
| if global_prompt: # Prepend the global prompt if it exists | |
| system_prompt = f"{global_prompt.strip()}\n{BASE_PROMPT}\n\nYour name: {agent_name}\n\nYour role:{role_desc}" | |
| else: | |
| system_prompt = ( | |
| f"You are {agent_name}.\n\nYour role:{role_desc}\n\n{BASE_PROMPT}" | |
| ) | |
| all_messages = [(SYSTEM_NAME, system_prompt)] | |
| for msg in history_messages: | |
| if msg.agent_name == SYSTEM_NAME: | |
| all_messages.append((SYSTEM_NAME, msg.content)) | |
| else: # non-system messages are suffixed with the end of message token | |
| all_messages.append((msg.agent_name, f"{msg.content}{END_OF_MESSAGE}")) | |
| if request_msg: | |
| all_messages.append((SYSTEM_NAME, request_msg.content)) | |
| else: # The default request message that reminds the agent its role and instruct it to speak | |
| all_messages.append( | |
| (SYSTEM_NAME, f"Now you speak, {agent_name}.{END_OF_MESSAGE}") | |
| ) | |
| messages = [] | |
| for i, msg in enumerate(all_messages): | |
| if i == 0: | |
| assert ( | |
| msg[0] == SYSTEM_NAME | |
| ) # The first message should be from the system | |
| messages.append({"role": "system", "content": msg[1]}) | |
| else: | |
| if msg[0] == agent_name: | |
| messages.append({"role": "assistant", "content": msg[1]}) | |
| else: | |
| if messages[-1]["role"] == "user": # last message is from user | |
| if self.merge_other_agent_as_user: | |
| messages[-1][ | |
| "content" | |
| ] = f"{messages[-1]['content']}\n\n[{msg[0]}]: {msg[1]}" | |
| else: | |
| messages.append( | |
| {"role": "user", "content": f"[{msg[0]}]: {msg[1]}"} | |
| ) | |
| elif ( | |
| messages[-1]["role"] == "assistant" | |
| ): # consecutive assistant messages | |
| # Merge the assistant messages | |
| messages[-1]["content"] = f"{messages[-1]['content']}\n{msg[1]}" | |
| elif messages[-1]["role"] == "system": | |
| messages.append( | |
| {"role": "user", "content": f"[{msg[0]}]: {msg[1]}"} | |
| ) | |
| else: | |
| raise ValueError(f"Invalid role: {messages[-1]['role']}") | |
| response = self._get_response(messages, *args, **kwargs) | |
| # Remove the agent name if the response starts with it | |
| response = re.sub(rf"^\s*\[.*]:", "", response).strip() # noqa: F541 | |
| response = re.sub( | |
| rf"^\s*{re.escape(agent_name)}\s*:", "", response | |
| ).strip() # noqa: F541 | |
| # Remove the tailing end of message token | |
| response = re.sub(rf"{END_OF_MESSAGE}$", "", response).strip() | |
| return response | |