summaryrefslogtreecommitdiff
path: root/server/continuedev/plugins/policies
diff options
context:
space:
mode:
Diffstat (limited to 'server/continuedev/plugins/policies')
-rw-r--r--server/continuedev/plugins/policies/commit.py77
-rw-r--r--server/continuedev/plugins/policies/default.py85
-rw-r--r--server/continuedev/plugins/policies/headless.py18
3 files changed, 180 insertions, 0 deletions
diff --git a/server/continuedev/plugins/policies/commit.py b/server/continuedev/plugins/policies/commit.py
new file mode 100644
index 00000000..2fa43676
--- /dev/null
+++ b/server/continuedev/plugins/policies/commit.py
@@ -0,0 +1,77 @@
+# An agent that makes a full commit in the background
+# Plans
+# Write code
+# Reviews code
+# Cleans up
+
+# It's important that agents are configurable, because people need to be able to specify
+# which hooks they want to run. Specific linter, run tests, etc.
+# And all of this can be easily specified in the Policy.
+
+
+from textwrap import dedent
+from typing import Literal
+
+from ...core.config import ContinueConfig
+from ...core.main import History, Policy, Step
+from ...core.observation import TextObservation
+from ...core.sdk import ContinueSDK
+
+
+class PlanStep(Step):
+ user_input: str
+
+ _prompt = dedent(
+ """\
+ You were given the following instructions: "{user_input}".
+
+ Create a plan for how you will complete the task.
+
+ Here are relevant files:
+
+ {relevant_files}
+
+ Your plan will include:
+ 1. A high-level description of how you are going to accomplish the task
+ 2. A list of which files you will edit
+ 3. A description of what you will change in each file
+ """
+ )
+
+ async def run(self, sdk: ContinueSDK):
+ plan = await sdk.models.default.complete(
+ self._prompt.format(
+ {"user_input": self.user_input, "relevant_files": "TODO"}
+ )
+ )
+ return TextObservation(text=plan)
+
+
+class WriteCommitStep(Step):
+ async def run(self, sdk: ContinueSDK):
+ pass
+
+
+class ReviewCodeStep(Step):
+ async def run(self, sdk: ContinueSDK):
+ pass
+
+
+class CleanupStep(Step):
+ async def run(self, sdk: ContinueSDK):
+ pass
+
+
+class CommitPolicy(Policy):
+ user_input: str
+
+ current_step: Literal["plan", "write", "review", "cleanup"] = "plan"
+
+ def next(self, config: ContinueConfig, history: History) -> Step:
+ if history.get_current() is None:
+ return (
+ PlanStep(user_input=self.user_input)
+ >> WriteCommitStep()
+ >> ReviewCodeStep()
+ >> CleanupStep()
+ )
diff --git a/server/continuedev/plugins/policies/default.py b/server/continuedev/plugins/policies/default.py
new file mode 100644
index 00000000..574d2a1c
--- /dev/null
+++ b/server/continuedev/plugins/policies/default.py
@@ -0,0 +1,85 @@
+from typing import Type, Union
+
+from ...core.config import ContinueConfig
+from ...core.main import History, Policy, Step
+from ...core.observation import UserInputObservation
+from ..steps.chat import SimpleChatStep
+from ..steps.custom_command import CustomCommandStep
+from ..steps.main import EditHighlightedCodeStep
+from ..steps.steps_on_startup import StepsOnStartupStep
+
+
+def parse_slash_command(inp: str, config: ContinueConfig) -> Union[None, Step]:
+ """
+ Parses a slash command, returning the command name and the rest of the input.
+ """
+ if inp.startswith("/"):
+ command_name = inp.split(" ")[0].strip()
+ after_command = " ".join(inp.split(" ")[1:])
+
+ for slash_command in config.slash_commands:
+ if slash_command.name == command_name[1:]:
+ params = slash_command.params
+ params["user_input"] = after_command
+ try:
+ return slash_command.step(**params)
+ except TypeError as e:
+ raise Exception(
+ f"Incorrect params used for slash command '{command_name}': {e}"
+ )
+ return None
+
+
+def parse_custom_command(inp: str, config: ContinueConfig) -> Union[None, Step]:
+ command_name = inp.split(" ")[0].strip()
+ after_command = " ".join(inp.split(" ")[1:])
+ for custom_cmd in config.custom_commands:
+ if custom_cmd.name == command_name[1:]:
+ slash_command = parse_slash_command(custom_cmd.prompt, config)
+ if slash_command is not None:
+ return slash_command
+ return CustomCommandStep(
+ name=custom_cmd.name,
+ description=custom_cmd.description,
+ prompt=custom_cmd.prompt,
+ user_input=after_command,
+ slash_command=command_name,
+ )
+ return None
+
+
+class DefaultPolicy(Policy):
+ default_step: Type[Step] = SimpleChatStep
+ default_params: dict = {}
+
+ def next(self, config: ContinueConfig, history: History) -> Step:
+ # At the very start, run initial Steps specified in the config
+ if history.get_current() is None:
+ return StepsOnStartupStep()
+
+ observation = history.get_current().observation
+ if observation is not None and isinstance(observation, UserInputObservation):
+ # This could be defined with ObservationTypePolicy. Ergonomics not right though.
+ user_input = observation.user_input
+
+ slash_command = parse_slash_command(user_input, config)
+ if slash_command is not None:
+ if (
+ getattr(slash_command, "user_input", None) is None
+ and history.get_current().step.user_input is not None
+ ):
+ history.get_current().step.user_input = (
+ history.get_current().step.user_input.split()[0]
+ )
+ return slash_command
+
+ custom_command = parse_custom_command(user_input, config)
+ if custom_command is not None:
+ return custom_command
+
+ if user_input.startswith("/edit"):
+ return EditHighlightedCodeStep(user_input=user_input[5:])
+
+ return self.default_step(**self.default_params)
+
+ return None
diff --git a/server/continuedev/plugins/policies/headless.py b/server/continuedev/plugins/policies/headless.py
new file mode 100644
index 00000000..9fa0f3f2
--- /dev/null
+++ b/server/continuedev/plugins/policies/headless.py
@@ -0,0 +1,18 @@
+from ...core.config import ContinueConfig
+from ...core.main import History, Policy, Step
+from ...core.observation import TextObservation
+from ...core.steps import ShellCommandsStep
+from ...plugins.steps.on_traceback import DefaultOnTracebackStep
+
+
+class HeadlessPolicy(Policy):
+ command: str
+
+ def next(self, config: ContinueConfig, history: History) -> Step:
+ if history.get_current() is None:
+ return ShellCommandsStep(cmds=[self.command])
+ observation = history.get_current().observation
+ if isinstance(observation, TextObservation):
+ return DefaultOnTracebackStep(output=observation.text)
+
+ return None