summaryrefslogtreecommitdiff
path: root/server/continuedev/core/steps.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/continuedev/core/steps.py')
-rw-r--r--server/continuedev/core/steps.py963
1 files changed, 963 insertions, 0 deletions
diff --git a/server/continuedev/core/steps.py b/server/continuedev/core/steps.py
new file mode 100644
index 00000000..5c20dd15
--- /dev/null
+++ b/server/continuedev/core/steps.py
@@ -0,0 +1,963 @@
+# These steps are depended upon by ContinueSDK
+import difflib
+import subprocess
+from textwrap import dedent
+from typing import Coroutine, List, Optional, Union
+
+from ..libs.llm.base import LLM
+from ..libs.llm.openai_free_trial import OpenAIFreeTrial
+from ..libs.util.count_tokens import DEFAULT_MAX_TOKENS
+from ..libs.util.devdata import dev_data_logger
+from ..libs.util.strings import (
+ dedent_and_get_common_whitespace,
+ remove_quotes_and_escapes,
+)
+from ..libs.util.telemetry import posthog_logger
+from ..libs.util.templating import render_prompt_template
+from ..models.filesystem import FileSystem, RangeInFile, RangeInFileWithContents
+from ..models.filesystem_edit import (
+ EditDiff,
+ FileEdit,
+ FileEditWithFullContents,
+ FileSystemEdit,
+)
+
+# from ....libs.llm.replicate import ReplicateLLM
+from ..models.main import Range
+from .main import ChatMessage, ContinueCustomException, Step
+from .observation import Observation, TextObservation, UserInputObservation
+
+
+class ContinueSDK:
+ pass
+
+
+class Models:
+ pass
+
+
+class ReversibleStep(Step):
+ async def reverse(self, sdk: ContinueSDK):
+ raise NotImplementedError
+
+
+class MessageStep(Step):
+ name: str = "Message"
+ message: str
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ return self.message
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ return TextObservation(text=self.message)
+
+
+class DisplayErrorStep(Step):
+ name: str = "Error in the Continue server"
+
+ title: str = "Error in the Continue server"
+ message: str = "There was an error in the Continue server."
+
+ @staticmethod
+ def from_exception(e: Exception) -> "DisplayErrorStep":
+ if isinstance(e, ContinueCustomException):
+ return DisplayErrorStep(title=e.title, message=e.message, name=e.title)
+
+ return DisplayErrorStep(message=str(e))
+
+ class Config:
+ arbitrary_types_allowed = True
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ return self.message
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ raise ContinueCustomException(message=self.message, title=self.title)
+
+
+class FileSystemEditStep(ReversibleStep):
+ edit: FileSystemEdit
+ _diff: Union[EditDiff, None] = None
+
+ hide: bool = True
+
+ async def run(self, sdk: "ContinueSDK") -> Coroutine[Observation, None, None]:
+ self._diff = await sdk.ide.applyFileSystemEdit(self.edit)
+ return None
+
+ async def reverse(self, sdk: "ContinueSDK"):
+ await sdk.ide.applyFileSystemEdit(self._diff.backward)
+ # Where and when should file saves happen?
+
+
+def output_contains_error(output: str) -> bool:
+ return "Traceback" in output or "SyntaxError" in output
+
+
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
+
+class ShellCommandsStep(Step):
+ cmds: List[str]
+ cwd: Union[str, None] = None
+ name: str = "Run Shell Commands"
+ handle_error: bool = True
+
+ _err_text: Union[str, None] = None
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ if self._err_text is not None:
+ return f"Error when running shell commands:\n```\n{self._err_text}\n```"
+
+ cmds_str = "\n".join(self.cmds)
+ return await models.summarize.complete(
+ f"{cmds_str}\n\nSummarize what was done in these shell commands, using markdown bullet points:"
+ )
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ process = subprocess.Popen(
+ "/bin/bash",
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ cwd=self.cwd or sdk.ide.workspace_directory,
+ )
+
+ stdin_input = "\n".join(self.cmds)
+ out, err = process.communicate(stdin_input.encode())
+
+ # If it fails, return the error
+ if err is not None and err != "":
+ self._err_text = err
+ return TextObservation(text=err)
+
+ return None
+
+
+class DefaultModelEditCodeStep(Step):
+ user_input: str
+ model: Optional[LLM] = None
+ range_in_files: List[RangeInFile]
+ name: str = "Editing Code"
+ hide = False
+ description: str = ""
+ _prompt: str = dedent(
+ """\
+ Take the file prefix and suffix into account, but only rewrite the code_to_edit as specified in the user_request. The code you write in modified_code_to_edit will replace the code between the code_to_edit tags. Do NOT preface your answer or write anything other than code. The </modified_code_to_edit> tag should be written to indicate the end of the modified code section. Do not ever use nested tags.
+
+ Example:
+
+ <file_prefix>
+ class Database:
+ def __init__(self):
+ self._data = {{}}
+
+ def get(self, key):
+ return self._data[key]
+
+ </file_prefix>
+ <code_to_edit>
+ def set(self, key, value):
+ self._data[key] = value
+ </code_to_edit>
+ <file_suffix>
+
+ def clear_all():
+ self._data = {{}}
+ </file_suffix>
+ <user_request>
+ Raise an error if the key already exists.
+ </user_request>
+ <modified_code_to_edit>
+ def set(self, key, value):
+ if key in self._data:
+ raise KeyError(f"Key {{key}} already exists")
+ self._data[key] = value
+ </modified_code_to_edit>
+
+ Main task:
+ """
+ )
+ _previous_contents: str = ""
+ _new_contents: str = ""
+ _prompt_and_completion: str = ""
+
+ summary_prompt: str = "Please briefly explain the changes made to the code above. Give no more than 2-3 sentences, and use markdown bullet points:"
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ name = await models.summarize.complete(
+ f"Write a very short title to describe this requested change (no quotes): '{self.user_input}'. This is the title:"
+ )
+ self.name = remove_quotes_and_escapes(name)
+
+ if self._previous_contents.strip() == self._new_contents.strip():
+ return "No edits were made"
+ else:
+ return None
+
+ async def get_prompt_parts(
+ self, rif: RangeInFileWithContents, sdk: ContinueSDK, full_file_contents: str
+ ):
+ # We don't know here all of the functions being passed in.
+ # We care because if this prompt itself goes over the limit, then the entire message will have to be cut from the completion.
+ # Overflow won't happen, but prune_chat_messages in count_tokens.py will cut out this whole thing, instead of us cutting out only as many lines as we need.
+ if self.model is not None:
+ await sdk.start_model(self.model)
+
+ model_to_use = self.model or sdk.models.edit
+ max_tokens = int(model_to_use.context_length / 2)
+
+ TOKENS_TO_BE_CONSIDERED_LARGE_RANGE = 1200
+ if (
+ model_to_use.count_tokens(rif.contents)
+ > TOKENS_TO_BE_CONSIDERED_LARGE_RANGE
+ ):
+ self.description += "\n\n**It looks like you've selected a large range to edit, which may take a while to complete. If you'd like to cancel, click the 'X' button above. If you highlight a more specific range, Continue will only edit within it.**"
+
+ # At this point, we also increase the max_tokens parameter so it doesn't stop in the middle of generation
+ # Increase max_tokens to be double the size of the range
+ # But don't exceed twice default max tokens
+ max_tokens = int(
+ min(model_to_use.count_tokens(rif.contents), DEFAULT_MAX_TOKENS) * 2.5
+ )
+
+ BUFFER_FOR_FUNCTIONS = 400
+ total_tokens = (
+ model_to_use.count_tokens(
+ full_file_contents + self._prompt + self.user_input
+ )
+ + BUFFER_FOR_FUNCTIONS
+ + max_tokens
+ )
+
+ # If using 3.5 and overflows, upgrade to 3.5.16k
+ if model_to_use.model == "gpt-3.5-turbo":
+ if total_tokens > model_to_use.context_length:
+ model_to_use = OpenAIFreeTrial(model="gpt-3.5-turbo-0613")
+ await sdk.start_model(model_to_use)
+
+ # Remove tokens from the end first, and then the start to clear space
+ # This part finds the start and end lines
+ full_file_contents_lst = full_file_contents.split("\n")
+ max_start_line = rif.range.start.line
+ min_end_line = rif.range.end.line
+ cur_start_line = 0
+ cur_end_line = len(full_file_contents_lst) - 1
+
+ if total_tokens > model_to_use.context_length:
+ while cur_end_line > min_end_line:
+ total_tokens -= model_to_use.count_tokens(
+ full_file_contents_lst[cur_end_line]
+ )
+ cur_end_line -= 1
+ if total_tokens < model_to_use.context_length:
+ break
+
+ if total_tokens > model_to_use.context_length:
+ while cur_start_line < max_start_line:
+ cur_start_line += 1
+ total_tokens -= model_to_use.count_tokens(
+ full_file_contents_lst[cur_start_line]
+ )
+ if total_tokens < model_to_use.context_length:
+ break
+
+ # Now use the found start/end lines to get the prefix and suffix strings
+ file_prefix = "\n".join(full_file_contents_lst[cur_start_line:max_start_line])
+ file_suffix = "\n".join(full_file_contents_lst[min_end_line : cur_end_line - 1])
+
+ # Move any surrounding blank line in rif.contents to the prefix/suffix
+ # TODO: Keep track of start line of the range, because it's needed below for offset stuff
+ if len(rif.contents) > 0:
+ lines = rif.contents.splitlines(keepends=True)
+ first_line = lines[0] if lines else None
+ while first_line and first_line.strip() == "":
+ file_prefix += first_line
+ rif.contents = rif.contents[len(first_line) :]
+ lines = rif.contents.splitlines(keepends=True)
+ first_line = lines[0] if lines else None
+
+ last_line = lines[-1] if lines else None
+ while last_line and last_line.strip() == "":
+ file_suffix = last_line + file_suffix
+ rif.contents = rif.contents[: len(rif.contents) - len(last_line)]
+ lines = rif.contents.splitlines(keepends=True)
+ last_line = lines[-1] if lines else None
+
+ while rif.contents.startswith("\n"):
+ file_prefix += "\n"
+ rif.contents = rif.contents[1:]
+ while rif.contents.endswith("\n"):
+ file_suffix = "\n" + file_suffix
+ rif.contents = rif.contents[:-1]
+
+ return file_prefix, rif.contents, file_suffix, model_to_use, max_tokens
+
+ def compile_prompt(
+ self, file_prefix: str, contents: str, file_suffix: str, sdk: ContinueSDK
+ ) -> str:
+ if contents.strip() == "":
+ # Separate prompt for insertion at the cursor, the other tends to cause it to repeat whole file
+ prompt = dedent(
+ f"""\
+<file_prefix>
+{file_prefix}
+</file_prefix>
+<insertion_code_here>
+<file_suffix>
+{file_suffix}
+</file_suffix>
+<user_request>
+{self.user_input}
+</user_request>
+
+Please output the code to be inserted at the cursor in order to fulfill the user_request. Do NOT preface your answer or write anything other than code. You should not write any tags, just the code. Make sure to correctly indent the code:"""
+ )
+ return prompt
+
+ prompt = self._prompt
+ if file_prefix.strip() != "":
+ prompt += dedent(
+ f"""
+<file_prefix>
+{file_prefix}
+</file_prefix>"""
+ )
+ prompt += dedent(
+ f"""
+<code_to_edit>
+{contents}
+</code_to_edit>"""
+ )
+ if file_suffix.strip() != "":
+ prompt += dedent(
+ f"""
+<file_suffix>
+{file_suffix}
+</file_suffix>"""
+ )
+ prompt += dedent(
+ f"""
+<user_request>
+{self.user_input}
+</user_request>
+<modified_code_to_edit>
+"""
+ )
+
+ return prompt
+
+ def is_end_line(self, line: str) -> bool:
+ return "</modified_code_to_edit>" in line or "</code_to_edit>" in line
+
+ def line_to_be_ignored(self, line: str, is_first_line: bool = False) -> bool:
+ return (
+ "```" in line
+ or "<modified_code_to_edit>" in line
+ or "<file_prefix>" in line
+ or "</file_prefix>" in line
+ or "<file_suffix>" in line
+ or "</file_suffix>" in line
+ or "<user_request>" in line
+ or "</user_request>" in line
+ or "<code_to_edit>" in line
+ )
+
+ async def stream_rif(self, rif: RangeInFileWithContents, sdk: ContinueSDK):
+ await sdk.ide.saveFile(rif.filepath)
+ full_file_contents = await sdk.ide.readFile(rif.filepath)
+
+ (
+ file_prefix,
+ contents,
+ file_suffix,
+ model_to_use,
+ max_tokens,
+ ) = await self.get_prompt_parts(rif, sdk, full_file_contents)
+ contents, common_whitespace = dedent_and_get_common_whitespace(contents)
+ prompt = self.compile_prompt(file_prefix, contents, file_suffix, sdk)
+ full_file_contents_lines = full_file_contents.split("\n")
+
+ lines_to_display = []
+
+ async def sendDiffUpdate(
+ lines: List[str], sdk: ContinueSDK, final: bool = False
+ ):
+ nonlocal full_file_contents_lines, rif, lines_to_display
+
+ completion = "\n".join(lines)
+
+ full_prefix_lines = full_file_contents_lines[: rif.range.start.line]
+ full_suffix_lines = full_file_contents_lines[rif.range.end.line :]
+
+ # Don't do this at the very end, just show the inserted code
+ if final:
+ lines_to_display = []
+ # Only recalculate at every new-line, because this is sort of expensive
+ elif completion.endswith("\n"):
+ contents_lines = rif.contents.split("\n")
+ rewritten_lines = 0
+ for line in lines:
+ for i in range(rewritten_lines, len(contents_lines)):
+ if (
+ difflib.SequenceMatcher(
+ None, line, contents_lines[i]
+ ).ratio()
+ > 0.7
+ and contents_lines[i].strip() != ""
+ ):
+ rewritten_lines = i + 1
+ break
+ lines_to_display = contents_lines[rewritten_lines:]
+
+ new_file_contents = (
+ "\n".join(full_prefix_lines)
+ + "\n"
+ + completion
+ + "\n"
+ + (
+ "\n".join(lines_to_display) + "\n"
+ if len(lines_to_display) > 0
+ else ""
+ )
+ + "\n".join(full_suffix_lines)
+ )
+
+ step_index = sdk.history.current_index
+
+ await sdk.ide.showDiff(rif.filepath, new_file_contents, step_index)
+
+ # Important state variables
+ # -------------------------
+ original_lines = [] if rif.contents == "" else rif.contents.split("\n")
+ # In the actual file, taking into account block offset
+ current_line_in_file = rif.range.start.line
+ current_block_lines = []
+ original_lines_below_previous_blocks = original_lines
+ # The start of the current block in file, taking into account block offset
+ current_block_start = -1
+ offset_from_blocks = 0
+
+ # Don't end the block until you've matched N simultaneous lines
+ # This helps avoid many tiny blocks
+ LINES_TO_MATCH_BEFORE_ENDING_BLOCK = 2
+ # If a line has been matched at the end of the block, this is its index within original_lines_below_previous_blocks
+ # Except we are keeping track of multiple potentialities, so it's a list
+ # We always check the lines following each of these leads, but if multiple make it out at the end, we use the first one
+ # This is a tuple of (index_of_last_matched_line, number_of_lines_matched)
+ indices_of_last_matched_lines = []
+
+ async def handle_generated_line(line: str):
+ nonlocal current_block_start, current_line_in_file, original_lines, original_lines_below_previous_blocks, current_block_lines, indices_of_last_matched_lines, LINES_TO_MATCH_BEFORE_ENDING_BLOCK, offset_from_blocks
+
+ # Highlight the line to show progress
+ line_to_highlight = current_line_in_file - len(current_block_lines)
+ if False:
+ await sdk.ide.highlightCode(
+ RangeInFile(
+ filepath=rif.filepath,
+ range=Range.from_shorthand(
+ line_to_highlight, 0, line_to_highlight, 0
+ ),
+ ),
+ "#FFFFFF22" if len(current_block_lines) == 0 else "#00FF0022",
+ )
+
+ if len(current_block_lines) == 0:
+ # Set this as the start of the next block
+ current_block_start = (
+ rif.range.start.line
+ + len(original_lines)
+ - len(original_lines_below_previous_blocks)
+ + offset_from_blocks
+ )
+ if (
+ len(original_lines_below_previous_blocks) > 0
+ and line == original_lines_below_previous_blocks[0]
+ ):
+ # Line is equal to the next line in file, move past this line
+ original_lines_below_previous_blocks = (
+ original_lines_below_previous_blocks[1:]
+ )
+ return
+
+ # In a block, and have already matched at least one line
+ # Check if the next line matches, for each of the candidates
+ matches_found = []
+ first_valid_match = None
+ for (
+ index_of_last_matched_line,
+ num_lines_matched,
+ ) in indices_of_last_matched_lines:
+ if (
+ index_of_last_matched_line + 1
+ < len(original_lines_below_previous_blocks)
+ and line
+ == original_lines_below_previous_blocks[
+ index_of_last_matched_line + 1
+ ]
+ ):
+ matches_found.append(
+ (index_of_last_matched_line + 1, num_lines_matched + 1)
+ )
+ if (
+ first_valid_match is None
+ and num_lines_matched + 1 >= LINES_TO_MATCH_BEFORE_ENDING_BLOCK
+ ):
+ first_valid_match = (
+ index_of_last_matched_line + 1,
+ num_lines_matched + 1,
+ )
+ indices_of_last_matched_lines = matches_found
+
+ if first_valid_match is not None:
+ # We've matched the required number of lines, insert suggestion!
+
+ # We added some lines to the block that were matched (including maybe some blank lines)
+ # So here we will strip all matching lines from the end of current_block_lines
+ lines_stripped = []
+ index_of_last_line_in_block = first_valid_match[0]
+ while (
+ len(current_block_lines) > 0
+ and current_block_lines[-1]
+ == original_lines_below_previous_blocks[
+ index_of_last_line_in_block - 1
+ ]
+ ):
+ lines_stripped.append(current_block_lines.pop())
+ index_of_last_line_in_block -= 1
+
+ # It's also possible that some lines match at the beginning of the block
+ # lines_stripped_at_beginning = []
+ # j = 0
+ # while len(current_block_lines) > 0 and current_block_lines[0] == original_lines_below_previous_blocks[first_valid_match[0] - first_valid_match[1] + j]:
+ # lines_stripped_at_beginning.append(
+ # current_block_lines.pop(0))
+ # j += 1
+ # # current_block_start += 1
+
+ # Insert the suggestion
+ replacement = "\n".join(current_block_lines)
+ start_line = current_block_start
+ end_line = current_block_start + index_of_last_line_in_block
+
+ if False:
+ await sdk.ide.showSuggestion(
+ FileEdit(
+ filepath=rif.filepath,
+ range=Range.from_shorthand(start_line, 0, end_line, 0),
+ replacement=replacement,
+ )
+ )
+
+ # Reset current block / update variables
+ current_line_in_file += 1
+ offset_from_blocks += len(current_block_lines)
+ original_lines_below_previous_blocks = (
+ original_lines_below_previous_blocks[
+ index_of_last_line_in_block + 1 :
+ ]
+ )
+ current_block_lines = []
+ current_block_start = -1
+ indices_of_last_matched_lines = []
+
+ return
+
+ # Always look for new matching candidates
+ new_matches = []
+ for i in range(len(original_lines_below_previous_blocks)):
+ og_line = original_lines_below_previous_blocks[i]
+ # TODO: It's a bit sus to be disqualifying empty lines.
+ # What you ideally do is find ALL matches, and then throw them out as you check the following lines
+ if og_line == line: # and og_line.strip() != "":
+ new_matches.append((i, 1))
+ indices_of_last_matched_lines += new_matches
+
+ # Make sure they are sorted by index
+ indices_of_last_matched_lines = sorted(
+ indices_of_last_matched_lines, key=lambda x: x[0]
+ )
+
+ current_block_lines.append(line)
+
+ messages = await sdk.get_chat_context()
+ # Delete the last user and assistant messages
+ i = len(messages) - 1
+ deleted = 0
+ while i >= 0 and deleted < 2:
+ if messages[i].role == "user" or messages[i].role == "assistant":
+ messages.pop(i)
+ deleted += 1
+ i -= 1
+ messages.append(
+ ChatMessage(role="user", content=prompt, summary=self.user_input)
+ )
+
+ lines_of_prefix_copied = 0
+ lines = []
+ unfinished_line = ""
+ completion_lines_covered = 0
+ repeating_file_suffix = False
+ line_below_highlighted_range = file_suffix.lstrip().split("\n")[0]
+
+ # Use custom templates defined by the model
+ if template := model_to_use.prompt_templates.get("edit"):
+ rendered = render_prompt_template(
+ template,
+ messages[:-1],
+ {
+ "code_to_edit": rif.contents,
+ "user_input": self.user_input,
+ "file_prefix": file_prefix,
+ "file_suffix": file_suffix,
+ },
+ )
+ if isinstance(rendered, str):
+ messages = [
+ ChatMessage(
+ role="user",
+ content=rendered,
+ summary=self.user_input,
+ )
+ ]
+ else:
+ messages = rendered
+
+ generator = model_to_use.stream_complete(
+ rendered,
+ temperature=sdk.config.temperature,
+ max_tokens=min(max_tokens, model_to_use.context_length // 2),
+ )
+
+ else:
+
+ async def gen():
+ async for chunk in model_to_use.stream_chat(
+ messages,
+ temperature=sdk.config.temperature,
+ max_tokens=min(max_tokens, model_to_use.context_length // 2),
+ ):
+ if "content" in chunk:
+ yield chunk["content"]
+
+ generator = gen()
+
+ posthog_logger.capture_event(
+ "model_use",
+ {"model": model_to_use.model, "provider": model_to_use.__class__.__name__},
+ )
+ dev_data_logger.capture(
+ "model_use",
+ {"model": model_to_use.model, "provider": model_to_use.__class__.__name__},
+ )
+
+ try:
+ async for chunk in generator:
+ # Stop early if it is repeating the file_suffix or the step was deleted
+ if repeating_file_suffix:
+ break
+ if sdk.current_step_was_deleted():
+ return
+
+ # Accumulate lines
+ chunk_lines = chunk.split("\n")
+ chunk_lines[0] = unfinished_line + chunk_lines[0]
+ if chunk.endswith("\n"):
+ unfinished_line = ""
+ chunk_lines.pop() # because this will be an empty string
+ else:
+ unfinished_line = chunk_lines.pop()
+
+ # Deal with newly accumulated lines
+ for i in range(len(chunk_lines)):
+ # Trailing whitespace doesn't matter
+ chunk_lines[i] = chunk_lines[i].rstrip()
+ chunk_lines[i] = common_whitespace + chunk_lines[i]
+
+ # Lines that should signify the end of generation
+ if self.is_end_line(chunk_lines[i]):
+ break
+ # Lines that should be ignored, like the <> tags
+ elif self.line_to_be_ignored(
+ chunk_lines[i], completion_lines_covered == 0
+ ):
+ continue # noice
+ # Check if we are currently just copying the prefix
+ elif (
+ (lines_of_prefix_copied > 0 or completion_lines_covered == 0)
+ and lines_of_prefix_copied < len(file_prefix.splitlines())
+ and chunk_lines[i]
+ == full_file_contents_lines[lines_of_prefix_copied]
+ ):
+ # This is a sketchy way of stopping it from repeating the file_prefix. Is a bug if output happens to have a matching line
+ lines_of_prefix_copied += 1
+ continue # also nice
+ # Because really short lines might be expected to be repeated, this is only a !heuristic!
+ # Stop when it starts copying the file_suffix
+ elif (
+ chunk_lines[i].strip() == line_below_highlighted_range.strip()
+ and len(chunk_lines[i].strip()) > 4
+ and not (
+ len(original_lines_below_previous_blocks) > 0
+ and chunk_lines[i].strip()
+ == original_lines_below_previous_blocks[0].strip()
+ )
+ ):
+ repeating_file_suffix = True
+ break
+
+ # If none of the above, insert the line!
+ if False:
+ await handle_generated_line(chunk_lines[i])
+
+ lines.append(chunk_lines[i])
+ completion_lines_covered += 1
+ current_line_in_file += 1
+
+ await sendDiffUpdate(
+ lines
+ + [
+ common_whitespace
+ if unfinished_line.startswith("<")
+ else (common_whitespace + unfinished_line)
+ ],
+ sdk,
+ )
+ finally:
+ await generator.aclose()
+ # Add the unfinished line
+ if (
+ unfinished_line != ""
+ and not self.line_to_be_ignored(
+ unfinished_line, completion_lines_covered == 0
+ )
+ and not self.is_end_line(unfinished_line)
+ ):
+ unfinished_line = common_whitespace + unfinished_line
+ lines.append(unfinished_line)
+ await handle_generated_line(unfinished_line)
+ completion_lines_covered += 1
+ current_line_in_file += 1
+
+ await sendDiffUpdate(lines, sdk, final=True)
+
+ if False:
+ # If the current block isn't empty, add that suggestion
+ if len(current_block_lines) > 0:
+ # We have a chance to back-track here for blank lines that are repeats of the end of the original
+ # Don't want to have the same ending in both the original and the generated, can just leave it there
+ num_to_remove = 0
+ for i in range(-1, -len(current_block_lines) - 1, -1):
+ if len(original_lines_below_previous_blocks) == 0:
+ break
+ if (
+ current_block_lines[i]
+ == original_lines_below_previous_blocks[-1]
+ ):
+ num_to_remove += 1
+ original_lines_below_previous_blocks.pop()
+ else:
+ break
+ current_block_lines = (
+ current_block_lines[:-num_to_remove]
+ if num_to_remove > 0
+ else current_block_lines
+ )
+
+ # It's also possible that some lines match at the beginning of the block
+ # while len(current_block_lines) > 0 and len(original_lines_below_previous_blocks) > 0 and current_block_lines[0] == original_lines_below_previous_blocks[0]:
+ # current_block_lines.pop(0)
+ # original_lines_below_previous_blocks.pop(0)
+ # current_block_start += 1
+
+ await sdk.ide.showSuggestion(
+ FileEdit(
+ filepath=rif.filepath,
+ range=Range.from_shorthand(
+ current_block_start,
+ 0,
+ current_block_start
+ + len(original_lines_below_previous_blocks),
+ 0,
+ ),
+ replacement="\n".join(current_block_lines),
+ )
+ )
+
+ # Record the completion
+ completion = "\n".join(lines)
+ self._previous_contents = "\n".join(original_lines)
+ self._new_contents = completion
+ self._prompt_and_completion += prompt + completion
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ await sdk.update_ui()
+
+ rif_with_contents = []
+ for range_in_file in map(
+ lambda x: RangeInFile(
+ filepath=x.filepath,
+ # Only consider the range line-by-line. Maybe later don't if it's only a single line.
+ range=x.range.to_full_lines(),
+ ),
+ self.range_in_files,
+ ):
+ file_contents = await sdk.ide.readRangeInFile(range_in_file)
+ rif_with_contents.append(
+ RangeInFileWithContents.from_range_in_file(range_in_file, file_contents)
+ )
+
+ rif_dict = {}
+ for rif in rif_with_contents:
+ rif_dict[rif.filepath] = rif.contents
+
+ for rif in rif_with_contents:
+ await sdk.ide.setSuggestionsLocked(rif.filepath, True)
+ await self.stream_rif(rif, sdk)
+ await sdk.ide.setSuggestionsLocked(rif.filepath, False)
+
+ changes = "\n".join(
+ difflib.ndiff(
+ self._previous_contents.splitlines(),
+ self._new_contents.splitlines(),
+ )
+ )
+
+ if sdk.config.disable_summaries:
+ self.name = ""
+ self.description = f"Edited {len(self.range_in_files)} files"
+ await sdk.update_ui()
+ else:
+ self.name = "Generating summary"
+ self.description = ""
+ async for chunk in sdk.models.summarize.stream_complete(
+ dedent(
+ f"""\
+ Diff summary: "{self.user_input}"
+
+ ```diff
+ {changes}
+ ```
+
+ {self.summary_prompt}"""
+ )
+ ):
+ self.description += chunk
+ await sdk.update_ui()
+
+ sdk.context.set("last_edit_user_input", self.user_input)
+ sdk.context.set("last_edit_diff", changes)
+ sdk.context.set("last_edit_range", self.range_in_files[-1].range)
+
+
+class EditFileStep(Step):
+ filepath: str
+ prompt: str
+ hide: bool = True
+ model: Optional[LLM] = None
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ return "Editing file: " + self.filepath
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ file_contents = await sdk.ide.readFile(self.filepath)
+ await sdk.run_step(
+ DefaultModelEditCodeStep(
+ range_in_files=[
+ RangeInFile.from_entire_file(self.filepath, file_contents)
+ ],
+ user_input=self.prompt,
+ model=self.model,
+ )
+ )
+
+
+class ManualEditStep(ReversibleStep):
+ edit_diff: EditDiff
+ hide: bool = True
+
+ hide: bool = True
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ return "Manual edit step"
+ # TODO - only handling FileEdit here, but need all other types of FileSystemEdits
+ # Also requires the merge_file_edit function
+ # return llm.complete(dedent(f"""This code was replaced:
+
+ # {self.edit_diff.backward.replacement}
+
+ # With this code:
+
+ # {self.edit_diff.forward.replacement}
+
+ # Maximally concise summary of changes in bullet points (can use markdown):
+ # """))
+
+ @classmethod
+ def from_sequence(cls, edits: List[FileEditWithFullContents]) -> "ManualEditStep":
+ diffs = []
+ for edit in edits:
+ _, diff = FileSystem.apply_edit_to_str(edit.fileContents, edit.fileEdit)
+ diffs.append(diff)
+ return cls(edit_diff=EditDiff.from_sequence(diffs))
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ return None
+
+ async def reverse(self, sdk: ContinueSDK):
+ await sdk.ide.applyFileSystemEdit(self.edit_diff.backward)
+
+
+class UserInputStep(Step):
+ user_input: str
+ name: str = "User Input"
+ hide: bool = False
+
+ manage_own_chat_context: bool = True
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ if self.description is not None:
+ return self.description
+ return self.user_input
+
+ async def run(
+ self, sdk: ContinueSDK
+ ) -> Coroutine[UserInputObservation, None, None]:
+ self.chat_context.append(
+ ChatMessage(role="user", content=self.user_input, summary=self.user_input)
+ )
+ self.description = self.user_input
+ return UserInputObservation(user_input=self.user_input)
+
+
+class WaitForUserInputStep(Step):
+ prompt: str
+ name: str = "Waiting for user input"
+
+ _description: Union[str, None] = None
+ _response: Union[str, None] = None
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ if self._response is None:
+ return self.prompt
+ else:
+ return f"{self.prompt}\n\n`{self._response}`"
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ self.description = self.prompt
+ resp = await sdk.wait_for_user_input()
+ self.description = f"{self.prompt}\n\n`{resp}`"
+ return TextObservation(text=resp)
+
+
+class WaitForUserConfirmationStep(Step):
+ prompt: str
+ name: str = "Waiting for user confirmation"
+
+ async def describe(self, models: Models) -> Coroutine[str, None, None]:
+ return self.prompt
+
+ async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]:
+ self.description = self.prompt
+ resp = await sdk.wait_for_user_input()
+ return TextObservation(text=resp)