summaryrefslogtreecommitdiff
path: root/server/continuedev/plugins/steps/chat.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/continuedev/plugins/steps/chat.py')
-rw-r--r--server/continuedev/plugins/steps/chat.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/server/continuedev/plugins/steps/chat.py b/server/continuedev/plugins/steps/chat.py
new file mode 100644
index 00000000..1b0f76f9
--- /dev/null
+++ b/server/continuedev/plugins/steps/chat.py
@@ -0,0 +1,379 @@
+import html
+import json
+import os
+from textwrap import dedent
+from typing import Any, Coroutine, List
+
+import openai
+from directory_tree import display_tree
+from dotenv import load_dotenv
+from pydantic import Field
+
+from ...core.main import ChatMessage, FunctionCall, Models, Step, step_to_json_schema
+from ...core.sdk import ContinueSDK
+from ...core.steps import MessageStep
+from ...libs.llm.openai import OpenAI
+from ...libs.llm.openai_free_trial import OpenAIFreeTrial
+from ...libs.util.devdata import dev_data_logger
+from ...libs.util.strings import remove_quotes_and_escapes
+from ...libs.util.telemetry import posthog_logger
+from .main import EditHighlightedCodeStep
+
+load_dotenv()
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+openai.api_key = OPENAI_API_KEY
+
+FREE_USAGE_STEP_NAME = "Please enter OpenAI API key"
+
+
+def add_ellipsis(text: str, max_length: int = 200) -> str:
+ if len(text) > max_length:
+ return text[: max_length - 3] + "..."
+ return text
+
+
+class SimpleChatStep(Step):
+ name: str = "Generating Response..."
+ manage_own_chat_context: bool = True
+ description: str = ""
+ messages: List[ChatMessage] = None
+
+ async def run(self, sdk: ContinueSDK):
+ # Check if proxy server API key
+ if (
+ isinstance(sdk.models.default, OpenAIFreeTrial)
+ and (
+ sdk.models.default.api_key is None
+ or sdk.models.default.api_key.strip() == ""
+ )
+ and len(list(filter(lambda x: not x.step.hide, sdk.history.timeline))) >= 10
+ and len(
+ list(
+ filter(
+ lambda x: x.step.name == FREE_USAGE_STEP_NAME,
+ sdk.history.timeline,
+ )
+ )
+ )
+ == 0
+ ):
+ await sdk.run_step(
+ MessageStep(
+ name=FREE_USAGE_STEP_NAME,
+ message=dedent(
+ """\
+ To make it easier to use Continue, you're getting limited free usage. When you have the chance, please enter your own OpenAI key in `~/.continue/config.py`. You can open the file by using the '/config' slash command in the text box below.
+
+ Here's an example of how to edit the file:
+ ```python
+ ...
+ config=ContinueConfig(
+ ...
+ models=Models(
+ default=OpenAIFreeTrial(api_key="<API_KEY>", model="gpt-4"),
+ summarize=OpenAIFreeTrial(api_key="<API_KEY>", model="gpt-3.5-turbo")
+ )
+ )
+ ```
+
+ You can also learn more about customizations [here](https://continue.dev/docs/customization).
+ """
+ ),
+ )
+ )
+
+ messages = self.messages or await sdk.get_chat_context()
+
+ generator = sdk.models.chat.stream_chat(
+ messages, temperature=sdk.config.temperature
+ )
+
+ posthog_logger.capture_event(
+ "model_use",
+ {
+ "model": sdk.models.default.model,
+ "provider": sdk.models.default.__class__.__name__,
+ },
+ )
+ dev_data_logger.capture(
+ "model_use",
+ {
+ "model": sdk.models.default.model,
+ "provider": sdk.models.default.__class__.__name__,
+ },
+ )
+
+ async for chunk in generator:
+ if sdk.current_step_was_deleted():
+ # So that the message doesn't disappear
+ self.hide = False
+ await sdk.update_ui()
+ break
+
+ if "content" in chunk:
+ self.description += chunk["content"]
+
+ # HTML unencode
+ end_size = len(chunk["content"]) - 6
+ if "&" in self.description[-end_size:]:
+ self.description = self.description[:-end_size] + html.unescape(
+ self.description[-end_size:]
+ )
+
+ await sdk.update_ui()
+
+ if sdk.config.disable_summaries:
+ self.name = ""
+ else:
+ self.name = "Generating title..."
+ await sdk.update_ui()
+ self.name = add_ellipsis(
+ remove_quotes_and_escapes(
+ await sdk.models.summarize.complete(
+ f'"{self.description}"\n\nPlease write a short title summarizing the message quoted above. Use no more than 10 words:',
+ max_tokens=20,
+ log=False,
+ )
+ ),
+ 200,
+ )
+
+ await sdk.update_ui()
+
+ self.chat_context.append(
+ ChatMessage(role="assistant", content=self.description, summary=self.name)
+ )
+
+ # TODO: Never actually closing.
+ await generator.aclose()
+
+
+class AddFileStep(Step):
+ name: str = "Add File"
+ description = "Add a file to the workspace. Should always view the directory tree before this."
+ filename: str
+ file_contents: str
+
+ async def describe(
+ self, models: Models
+ ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
+ return f"Added a file named `{self.filename}` to the workspace."
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.add_file(self.filename, self.file_contents)
+
+ await sdk.ide.setFileOpen(
+ os.path.join(sdk.ide.workspace_directory, self.filename)
+ )
+
+
+class DeleteFileStep(Step):
+ name: str = "Delete File"
+ description = "Delete a file from the workspace."
+ filename: str
+
+ async def describe(
+ self, models: Models
+ ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
+ return f"Deleted a file named `{self.filename}` from the workspace."
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.delete_file(self.filename)
+
+
+class AddDirectoryStep(Step):
+ name: str = "Add Directory"
+ description = "Add a directory to the workspace."
+ directory_name: str
+
+ async def describe(
+ self, models: Models
+ ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
+ return f"Added a directory named `{self.directory_name}` to the workspace."
+
+ async def run(self, sdk: ContinueSDK):
+ try:
+ await sdk.add_directory(self.directory_name)
+ except FileExistsError:
+ self.description = f"Directory {self.directory_name} already exists."
+
+
+class RunTerminalCommandStep(Step):
+ name: str = "Run Terminal Command"
+ description: str = "Run a terminal command."
+ command: str
+
+ async def run(self, sdk: ContinueSDK):
+ self.description = f"Copy this command and run in your terminal:\n\n```bash\n{self.command}\n```"
+
+
+class ViewDirectoryTreeStep(Step):
+ name: str = "View Directory Tree"
+ description: str = "View the directory tree to learn which folder and files exist. You should always do this before adding new files."
+
+ async def describe(
+ self, models: Models
+ ) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
+ return "Viewed the directory tree."
+
+ async def run(self, sdk: ContinueSDK):
+ self.description = (
+ f"```\n{display_tree(sdk.ide.workspace_directory, True, max_depth=2)}\n```"
+ )
+
+
+class EditFileStep(Step):
+ name: str = "Edit File"
+ description: str = "Edit a file in the workspace that is not currently open."
+ filename: str = Field(..., description="The name of the file to edit.")
+ instructions: str = Field(..., description="The instructions to edit the file.")
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.edit_file(self.filename, self.instructions)
+
+
+class ChatWithFunctions(Step):
+ user_input: str
+ functions: List[Step] = [
+ AddFileStep(filename="", file_contents=""),
+ EditFileStep(filename="", instructions=""),
+ EditHighlightedCodeStep(user_input=""),
+ ViewDirectoryTreeStep(),
+ AddDirectoryStep(directory_name=""),
+ DeleteFileStep(filename=""),
+ RunTerminalCommandStep(command=""),
+ ]
+ name: str = "Input"
+ manage_own_chat_context: bool = True
+ description: str = ""
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.update_ui()
+
+ step_name_step_class_map = {
+ step.name.replace(" ", ""): step.__class__ for step in self.functions
+ }
+
+ functions = [step_to_json_schema(function) for function in self.functions]
+
+ self.chat_context.append(
+ ChatMessage(role="user", content=self.user_input, summary=self.user_input)
+ )
+
+ last_function_called_name = None
+ last_function_called_params = None
+ while True:
+ was_function_called = False
+ func_args = ""
+ func_name = ""
+ msg_content = ""
+ msg_step = None
+
+ gpt350613 = OpenAI(model="gpt-3.5-turbo-0613")
+ await sdk.start_model(gpt350613)
+
+ async for msg_chunk in gpt350613.stream_chat(
+ await sdk.get_chat_context(), functions=functions
+ ):
+ if sdk.current_step_was_deleted():
+ return
+
+ if "content" in msg_chunk and msg_chunk["content"] is not None:
+ msg_content += msg_chunk["content"]
+ # if last_function_called_index_in_history is not None:
+ # while sdk.history.timeline[last_function_called_index].step.hide:
+ # last_function_called_index += 1
+ # sdk.history.timeline[last_function_called_index_in_history].step.description = msg_content
+ if msg_step is None:
+ msg_step = MessageStep(
+ name="Chat", message=msg_chunk["content"]
+ )
+ await sdk.run_step(msg_step)
+ else:
+ msg_step.description = msg_content
+ await sdk.update_ui()
+ elif "function_call" in msg_chunk or func_name != "":
+ was_function_called = True
+ if "function_call" in msg_chunk:
+ if "arguments" in msg_chunk["function_call"]:
+ func_args += msg_chunk["function_call"]["arguments"]
+ if "name" in msg_chunk["function_call"]:
+ func_name += msg_chunk["function_call"]["name"]
+
+ if not was_function_called:
+ self.chat_context.append(
+ ChatMessage(
+ role="assistant", content=msg_content, summary=msg_content
+ )
+ )
+ break
+ else:
+ if func_name == "python" and "python" not in step_name_step_class_map:
+ # GPT must be fine-tuned to believe this exists, but it doesn't always
+ func_name = "EditHighlightedCodeStep"
+ func_args = json.dumps({"user_input": self.user_input})
+ # self.chat_context.append(ChatMessage(
+ # role="assistant",
+ # content=None,
+ # function_call=FunctionCall(
+ # name=func_name,
+ # arguments=func_args
+ # ),
+ # summary=f"Called function {func_name}"
+ # ))
+ # self.chat_context.append(ChatMessage(
+ # role="user",
+ # content="The 'python' function does not exist. Don't call it. Try again to call another function.",
+ # summary="'python' function does not exist."
+ # ))
+ # msg_step.hide = True
+ # continue
+ # Call the function, then continue to chat
+ func_args = "{}" if func_args == "" else func_args
+ try:
+ fn_call_params = json.loads(func_args)
+ except json.JSONDecodeError:
+ raise Exception("The model returned invalid JSON. Please try again")
+ self.chat_context.append(
+ ChatMessage(
+ role="assistant",
+ content=None,
+ function_call=FunctionCall(name=func_name, arguments=func_args),
+ summary=f"Called function {func_name}",
+ )
+ )
+ sdk.history.current_index + 1
+ if func_name not in step_name_step_class_map:
+ raise Exception(
+ f"The model tried to call a function ({func_name}) that does not exist. Please try again."
+ )
+
+ # if func_name == "AddFileStep":
+ # step_to_run.hide = True
+ # self.description += f"\nAdded file `{func_args['filename']}`"
+ # elif func_name == "AddDirectoryStep":
+ # step_to_run.hide = True
+ # self.description += f"\nAdded directory `{func_args['directory_name']}`"
+ # else:
+ # self.description += f"\n`Running function {func_name}`\n\n"
+ if func_name == "EditHighlightedCodeStep":
+ fn_call_params["user_input"] = self.user_input
+ elif func_name == "EditFile":
+ fn_call_params["instructions"] = self.user_input
+
+ step_to_run = step_name_step_class_map[func_name](**fn_call_params)
+ if (
+ last_function_called_name is not None
+ and last_function_called_name == func_name
+ and last_function_called_params is not None
+ and last_function_called_params == fn_call_params
+ ):
+ # If it's calling the same function more than once in a row, it's probably looping and confused
+ return
+ last_function_called_name = func_name
+ last_function_called_params = fn_call_params
+
+ await sdk.run_step(step_to_run)
+ await sdk.update_ui()