"""Chat steps: plain streaming chat, function-calling chat, and the small
workspace steps that the function-calling chat can invoke."""
import html
import json
import os
from textwrap import dedent
from typing import Any, Coroutine, List

import openai
from directory_tree import display_tree
from dotenv import load_dotenv
from pydantic import Field

from ...core.main import ChatMessage, FunctionCall, Models, Step, step_to_json_schema
from ...core.sdk import ContinueSDK
from ...core.steps import MessageStep
from ...libs.llm.openai import OpenAI
from ...libs.llm.openai_free_trial import OpenAIFreeTrial
from ...libs.util.devdata import dev_data_logger
from ...libs.util.strings import remove_quotes_and_escapes
from ...libs.util.telemetry import posthog_logger
from .main import EditHighlightedCodeStep

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY

# Name of the one-off MessageStep asking the user to supply their own key.
# Also used to detect whether that step has already been shown.
FREE_USAGE_STEP_NAME = "Please enter OpenAI API key"


def add_ellipsis(text: str, max_length: int = 200) -> str:
    """Return *text* cut down to at most *max_length* characters.

    When truncation happens and there is room for it, the last three
    characters of the result are "...". Text that already fits is returned
    unchanged.
    """
    if len(text) <= max_length:
        return text
    if max_length < 3:
        # No room for a full ellipsis. Without this guard the slice bound
        # below would be negative and the result would exceed max_length.
        return text[: max(max_length, 0)]
    return text[: max_length - 3] + "..."


def _should_prompt_for_api_key(sdk: ContinueSDK) -> bool:
    """Tell whether the free-usage reminder should be shown now.

    True only when the default model is an OpenAIFreeTrial with no API key
    set, at least 10 non-hidden steps are in the history timeline, and no
    step named FREE_USAGE_STEP_NAME is in the timeline yet.
    """
    default_model = sdk.models.default
    if not isinstance(default_model, OpenAIFreeTrial):
        return False

    api_key = default_model.api_key
    if api_key is not None and api_key.strip() != "":
        return False

    timeline = sdk.history.timeline
    visible_step_count = sum(1 for node in timeline if not node.step.hide)
    if visible_step_count < 10:
        return False

    return not any(node.step.name == FREE_USAGE_STEP_NAME for node in timeline)


# Streams a chat completion into its own description and then asks the
# summarize model for a short title.
# (Documented with comments rather than a class docstring so that nothing is
# added to the schema generated for this model.)
class SimpleChatStep(Step):
    name: str = "Generating Response..."
    manage_own_chat_context: bool = True
    description: str = ""
    # Optional explicit message list; when falsy, the SDK chat context is used.
    messages: List[ChatMessage] = None

    async def run(self, sdk: ContinueSDK):
        """Stream the chat response, title it, and record it in the chat context."""
        # Check if proxy server API key
        if _should_prompt_for_api_key(sdk):
            await sdk.run_step(
                MessageStep(
                    name=FREE_USAGE_STEP_NAME,
                    message=dedent(
                        """\
                        To make it easier to use Continue, you're getting limited free usage. When you have the chance, please enter your own OpenAI key in `~/.continue/config.py`. You can open the file by using the '/config' slash command in the text box below.

                        Here's an example of how to edit the file:
                        ```python
                        ...
                        config=ContinueConfig(
                            ...
                            models=Models(
                                default=OpenAIFreeTrial(api_key="", model="gpt-4"),
                                summarize=OpenAIFreeTrial(api_key="", model="gpt-3.5-turbo")
                            )
                        )
                        ```

                        You can also learn more about customizations [here](https://continue.dev/docs/customization).
                        """
                    ),
                )
            )

        messages = self.messages or await sdk.get_chat_context()

        generator = sdk.models.chat.stream_chat(
            messages, temperature=sdk.config.temperature
        )

        try:
            # NOTE(review): the request above goes to `sdk.models.chat`, but
            # both events below report `sdk.models.default` - confirm that
            # this is intentional.
            posthog_logger.capture_event(
                "model_use",
                {
                    "model": sdk.models.default.model,
                    "provider": sdk.models.default.__class__.__name__,
                },
            )
            dev_data_logger.capture(
                "model_use",
                {
                    "model": sdk.models.default.model,
                    "provider": sdk.models.default.__class__.__name__,
                },
            )

            async for chunk in generator:
                if sdk.current_step_was_deleted():
                    # So that the message doesn't disappear
                    self.hide = False
                    await sdk.update_ui()
                    break

                if "content" in chunk:
                    self.description += chunk["content"]

                    # HTML unencode
                    # NOTE(review): for chunks of 6 characters or fewer,
                    # end_size is <= 0, so the slices below start from the
                    # front of the description (index -end_size) rather than
                    # covering only its tail. Left as-is; confirm whether
                    # `len(chunk) + 6` was intended.
                    end_size = len(chunk["content"]) - 6
                    if "&" in self.description[-end_size:]:
                        self.description = self.description[
                            :-end_size
                        ] + html.unescape(self.description[-end_size:])

                    await sdk.update_ui()

            if sdk.config.disable_summaries:
                self.name = ""
            else:
                self.name = "Generating title..."
                await sdk.update_ui()
                self.name = add_ellipsis(
                    remove_quotes_and_escapes(
                        await sdk.models.summarize.complete(
                            f'"{self.description}"\n\nPlease write a short title summarizing the message quoted above. Use no more than 10 words:',
                            max_tokens=20,
                            log=False,
                        )
                    ),
                    200,
                )

            await sdk.update_ui()

            self.chat_context.append(
                ChatMessage(
                    role="assistant", content=self.description, summary=self.name
                )
            )
        finally:
            # Close the stream on every exit path, including when an
            # exception propagates out of the loop or the title request.
            # TODO: Never actually closing.
            await generator.aclose()


# Adds a file to the workspace and opens it in the IDE.
class AddFileStep(Step):
    name: str = "Add File"
    description = "Add a file to the workspace. Should always view the directory tree before this."

    filename: str
    file_contents: str

    async def describe(
        self, models: Models
    ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
        """Return a one-sentence summary naming the file that was added."""
        return f"Added a file named `{self.filename}` to the workspace."

    async def run(self, sdk: ContinueSDK):
        """Create the file through the SDK, then open it in the IDE."""
        await sdk.add_file(self.filename, self.file_contents)

        await sdk.ide.setFileOpen(
            os.path.join(sdk.ide.workspace_directory, self.filename)
        )


# Deletes a file from the workspace.
class DeleteFileStep(Step):
    name: str = "Delete File"
    description = "Delete a file from the workspace."

    filename: str

    async def describe(
        self, models: Models
    ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
        """Return a one-sentence summary naming the file that was deleted."""
        return f"Deleted a file named `{self.filename}` from the workspace."

    async def run(self, sdk: ContinueSDK):
        """Delete the file through the SDK."""
        await sdk.delete_file(self.filename)


# Adds a directory to the workspace.
class AddDirectoryStep(Step):
    name: str = "Add Directory"
    description = "Add a directory to the workspace."

    directory_name: str

    async def describe(
        self, models: Models
    ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
        """Return a one-sentence summary naming the directory that was added."""
        return f"Added a directory named `{self.directory_name}` to the workspace."

    async def run(self, sdk: ContinueSDK):
        """Create the directory; if it already exists, say so in the description."""
        try:
            await sdk.add_directory(self.directory_name)
        except FileExistsError:
            self.description = f"Directory {self.directory_name} already exists."


# Shows a command for the user to run; nothing is executed by this step.
class RunTerminalCommandStep(Step):
    name: str = "Run Terminal Command"
    description: str = "Run a terminal command."

    command: str

    async def run(self, sdk: ContinueSDK):
        """Put the command, formatted as a bash code block, in the description."""
        self.description = f"Copy this command and run in your terminal:\n\n```bash\n{self.command}\n```"


# Renders the workspace directory tree into the step description.
class ViewDirectoryTreeStep(Step):
    name: str = "View Directory Tree"
    description: str = "View the directory tree to learn which folder and files exist. You should always do this before adding new files."

    async def describe(
        self, models: Models
    ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
        """Return a fixed one-sentence summary of this step."""
        return "Viewed the directory tree."

    async def run(self, sdk: ContinueSDK):
        """Store the workspace tree (max_depth=2) as a fenced code block."""
        self.description = (
            f"```\n{display_tree(sdk.ide.workspace_directory, True, max_depth=2)}\n```"
        )


# Edits a named workspace file according to natural-language instructions.
class EditFileStep(Step):
    name: str = "Edit File"
    description: str = "Edit a file in the workspace that is not currently open."
    filename: str = Field(..., description="The name of the file to edit.")
    instructions: str = Field(..., description="The instructions to edit the file.")
    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        """Delegate the edit to the SDK."""
        await sdk.edit_file(self.filename, self.instructions)


# Chat loop that lets the model call the steps listed in `functions`.
class ChatWithFunctions(Step):
    user_input: str
    # Template instances; they are used to build the function schemas and the
    # name -> step class lookup in run().
    functions: List[Step] = [
        AddFileStep(filename="", file_contents=""),
        EditFileStep(filename="", instructions=""),
        EditHighlightedCodeStep(user_input=""),
        ViewDirectoryTreeStep(),
        AddDirectoryStep(directory_name=""),
        DeleteFileStep(filename=""),
        RunTerminalCommandStep(command=""),
    ]
    name: str = "Input"
    manage_own_chat_context: bool = True
    description: str = ""
    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        """Chat with the model, running any function it calls, until it answers in text.

        Each pass streams one response. A text response is appended to the
        chat context and ends the loop. A function call is recorded in the
        chat context, mapped to a step class, and run, after which the loop
        asks the model again. The loop also stops silently when this step is
        deleted mid-stream or when the model repeats the same call with the
        same parameters twice in a row.

        Raises:
            Exception: if the function-call arguments are not valid JSON, or
                if the model names a function that is not in `functions`.
        """
        await sdk.update_ui()

        # Function names are the step names with spaces removed.
        step_name_step_class_map = {
            step.name.replace(" ", ""): step.__class__ for step in self.functions
        }

        functions = [step_to_json_schema(function) for function in self.functions]

        self.chat_context.append(
            ChatMessage(role="user", content=self.user_input, summary=self.user_input)
        )

        last_function_called_name = None
        last_function_called_params = None

        while True:
            was_function_called = False
            func_args = ""
            func_name = ""
            msg_content = ""
            msg_step = None

            gpt350613 = OpenAI(model="gpt-3.5-turbo-0613")
            await sdk.start_model(gpt350613)

            async for msg_chunk in gpt350613.stream_chat(
                await sdk.get_chat_context(), functions=functions
            ):
                if sdk.current_step_was_deleted():
                    return

                if "content" in msg_chunk and msg_chunk["content"] is not None:
                    msg_content += msg_chunk["content"]
                    if msg_step is None:
                        # First text chunk: create the visible chat message.
                        msg_step = MessageStep(
                            name="Chat", message=msg_chunk["content"]
                        )
                        await sdk.run_step(msg_step)
                    else:
                        msg_step.description = msg_content
                    await sdk.update_ui()
                elif "function_call" in msg_chunk or func_name != "":
                    was_function_called = True
                    if "function_call" in msg_chunk:
                        # Name and arguments arrive in pieces; accumulate them.
                        if "arguments" in msg_chunk["function_call"]:
                            func_args += msg_chunk["function_call"]["arguments"]
                        if "name" in msg_chunk["function_call"]:
                            func_name += msg_chunk["function_call"]["name"]

            if not was_function_called:
                self.chat_context.append(
                    ChatMessage(
                        role="assistant", content=msg_content, summary=msg_content
                    )
                )
                break
            else:
                if func_name == "python" and "python" not in step_name_step_class_map:
                    # GPT must be fine-tuned to believe this exists, but it doesn't always
                    func_name = "EditHighlightedCodeStep"
                    func_args = json.dumps({"user_input": self.user_input})

                # Call the function, then continue to chat
                func_args = "{}" if func_args == "" else func_args
                try:
                    fn_call_params = json.loads(func_args)
                except json.JSONDecodeError as err:
                    # Chain the cause so the parse position is kept in the traceback.
                    raise Exception(
                        "The model returned invalid JSON. Please try again"
                    ) from err

                self.chat_context.append(
                    ChatMessage(
                        role="assistant",
                        content=None,
                        function_call=FunctionCall(name=func_name, arguments=func_args),
                        summary=f"Called function {func_name}",
                    )
                )

                if func_name not in step_name_step_class_map:
                    raise Exception(
                        f"The model tried to call a function ({func_name}) that does not exist. Please try again."
                    )

                # For the edit steps, use the user's own words rather than the
                # model's paraphrase.
                if func_name == "EditHighlightedCodeStep":
                    fn_call_params["user_input"] = self.user_input
                elif func_name == "EditFile":
                    fn_call_params["instructions"] = self.user_input

                step_to_run = step_name_step_class_map[func_name](**fn_call_params)
                if (
                    last_function_called_name is not None
                    and last_function_called_name == func_name
                    and last_function_called_params is not None
                    and last_function_called_params == fn_call_params
                ):
                    # If it's calling the same function more than once in a row, it's probably looping and confused
                    return
                last_function_called_name = func_name
                last_function_called_params = fn_call_params

                await sdk.run_step(step_to_run)
                await sdk.update_ui()