summaryrefslogtreecommitdiff
path: root/continuedev/src/continuedev/core/sdk.py
blob: ed67079903e4758e69ad41da69cd0d0e312dc3ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import asyncio
from functools import cached_property
from typing import Coroutine, Union
import os

from ..steps.core.core import DefaultModelEditCodeStep
from ..models.main import Range
from .abstract_sdk import AbstractContinueSDK
from .config import ContinueConfig, load_config, load_global_config, update_global_config
from ..models.filesystem_edit import FileEdit, FileSystemEdit, AddFile, DeleteFile, AddDirectory, DeleteDirectory
from ..models.filesystem import RangeInFile
from ..libs.llm.hf_inference_api import HuggingFaceInferenceAPI
from ..libs.llm.openai import OpenAI
from .observation import Observation
from ..server.ide_protocol import AbstractIdeProtocolServer
from .main import Context, ContinueCustomException, History, Step, ChatMessage, ChatMessageRole
from ..steps.core.core import *
from ..libs.llm.proxy_server import ProxyServer


class Autopilot:
    """Empty placeholder used only as a type annotation in this module.

    NOTE(review): presumably the real Autopilot implementation lives in
    another module and is not imported here to avoid a circular import --
    confirm. ContinueSDK reads attributes such as ``ide``, ``context`` and
    ``history`` from the object it is given, none of which this stub defines.
    """


class Models:
    """Lazy accessors for the language models a step can use.

    Every model is exposed as a ``cached_property``: it is built on first
    access (which asks the SDK for the relevant API key) and the resulting
    object is reused for the lifetime of this ``Models`` instance.
    """

    # Maps a configured model name to the attribute of this class that lazily
    # loads it. Storing attribute names (rather than the loaded objects) keeps
    # the lookup lazy: only the requested model is ever constructed.
    _MODEL_ATTRIBUTES = {
        "starcoder": "starcoder",
        "gpt-3.5-turbo": "gpt35",
        "gpt-3.5-turbo-0613": "gpt350613",
        "gpt-3.5-turbo-16k": "gpt3516k",
        "gpt-4": "gpt4",
    }

    def __init__(self, sdk: "ContinueSDK"):
        self.sdk = sdk

    def __load_openai_model(self, model: str) -> "Union[OpenAI, ProxyServer]":
        """Build the client for the OpenAI model called ``model``.

        Asks the SDK for the ``OPENAI_API_KEY`` secret. An empty key selects
        the ``ProxyServer`` client (keyed by the IDE's ``unique_id``);
        otherwise an ``OpenAI`` client using that key is returned.
        """
        async def load_openai_model():
            api_key = await self.sdk.get_user_secret(
                'OPENAI_API_KEY', 'Enter your OpenAI API key or press enter to try for free')
            if api_key == "":
                return ProxyServer(self.sdk.ide.unique_id, model)
            return OpenAI(api_key=api_key, default_model=model)

        # NOTE(review): run_until_complete() raises RuntimeError when the loop
        # is already running, so this relies on either being called outside
        # the running loop or on loop re-entrancy being patched in elsewhere
        # -- confirm against the callers.
        return asyncio.get_event_loop().run_until_complete(load_openai_model())

    @cached_property
    def starcoder(self):
        """Hugging Face Inference API client, built with ``HUGGING_FACE_TOKEN``."""
        async def load_starcoder():
            api_key = await self.sdk.get_user_secret(
                'HUGGING_FACE_TOKEN', 'Please add your Hugging Face token to the .env file')
            return HuggingFaceInferenceAPI(api_key=api_key)

        # Same event-loop caveat as in __load_openai_model.
        return asyncio.get_event_loop().run_until_complete(load_starcoder())

    @cached_property
    def gpt35(self):
        """Client for ``gpt-3.5-turbo``."""
        return self.__load_openai_model("gpt-3.5-turbo")

    @cached_property
    def gpt350613(self):
        """Client for ``gpt-3.5-turbo-0613``."""
        return self.__load_openai_model("gpt-3.5-turbo-0613")

    @cached_property
    def gpt3516k(self):
        """Client for ``gpt-3.5-turbo-16k``."""
        return self.__load_openai_model("gpt-3.5-turbo-16k")

    @cached_property
    def gpt4(self):
        """Client for ``gpt-4``."""
        return self.__load_openai_model("gpt-4")

    def __model_from_name(self, model_name: str):
        """Return the (lazily loaded) model registered under ``model_name``.

        Raises:
            Exception: if ``model_name`` is not one of the known model names.
        """
        attribute = self._MODEL_ATTRIBUTES.get(model_name)
        if attribute is None:
            raise Exception(f"Unknown model {model_name}")
        # getattr triggers the cached_property, loading the model on demand.
        return getattr(self, attribute)

    @property
    def default(self):
        """The model named by ``sdk.config.default_model``, or ``gpt35`` if unset."""
        default_model = self.sdk.config.default_model
        if default_model is None:
            return self.gpt35
        return self.__model_from_name(default_model)


class ContinueSDK(AbstractContinueSDK):
    """The SDK provided as parameters to a step"""
    ide: AbstractIdeProtocolServer
    models: Models
    context: Context
    __autopilot: Autopilot

    def __init__(self, autopilot: Autopilot):
        """Bind the SDK to ``autopilot``, sharing its IDE connection and context."""
        self.ide = autopilot.ide
        self.__autopilot = autopilot
        self.models = Models(self)
        self.context = autopilot.context

    @property
    def history(self) -> History:
        """The history object owned by the autopilot."""
        return self.__autopilot.history

    async def _ensure_absolute_path(self, path: str, must_exist: bool = True) -> str:
        """Resolve ``path`` to an absolute path.

        Resolution order:
          1. ``path`` is already absolute: returned unchanged.
          2. ``path`` exists relative to the IDE workspace directory: that
             workspace path is returned.
          3. An open file has the same basename as ``path``: the open file's
             path is returned.

        Args:
            path: Absolute or workspace-relative path.
            must_exist: When True (the default) an unresolvable path raises.
                When False the workspace-relative path is returned even though
                nothing exists there yet; this is what callers that are about
                to create the path need.

        Raises:
            Exception: if nothing matched and ``must_exist`` is True.
        """
        if os.path.isabs(path):
            return path

        workspace_path = os.path.join(self.ide.workspace_directory, path)
        if os.path.exists(workspace_path):
            return workspace_path

        # Check if it matches any of the open files, then use that absolute path
        wanted_name = os.path.basename(path)
        for open_file in await self.ide.getOpenFiles():
            if os.path.basename(open_file) == wanted_name:
                return open_file

        if must_exist:
            raise Exception(f"Path {path} does not exist")
        return workspace_path

    async def run_step(self, step: Step) -> Observation:
        """Run ``step`` through the autopilot and return what it returns."""
        return await self.__autopilot._run_singular_step(step)

    async def apply_filesystem_edit(self, edit: FileSystemEdit, name: Union[str, None] = None, description: Union[str, None] = None):
        """Run ``edit`` as a ``FileSystemEditStep``.

        ``name`` is only forwarded when truthy so the step keeps its own
        default name otherwise.
        """
        name_kwargs = {'name': name} if name else {}
        return await self.run_step(FileSystemEditStep(edit=edit, description=description, **name_kwargs))

    async def wait_for_user_input(self) -> str:
        """Wait for, and return, the next user input from the autopilot."""
        return await self.__autopilot.wait_for_user_input()

    async def wait_for_user_confirmation(self, prompt: str):
        """Run a ``WaitForUserConfirmationStep`` showing ``prompt``."""
        return await self.run_step(WaitForUserConfirmationStep(prompt=prompt))

    async def run(self, commands: Union[List[str], str], cwd: Union[str, None] = None, name: Union[str, None] = None, description: Union[str, None] = None, handle_error: bool = True) -> str:
        """Run one or more shell commands as a ``ShellCommandsStep``.

        Args:
            commands: A single command string or a list of command strings.
            cwd: Working directory forwarded to the step.
            name: Optional step name; only forwarded when truthy.
            description: Description forwarded to the step.
            handle_error: Forwarded to the step unchanged.

        Returns:
            The ``text`` attribute of the observation produced by the step.
        """
        # A lone command is wrapped so the step always receives a list.
        commands = commands if isinstance(commands, list) else [commands]
        name_kwargs = {'name': name} if name else {}
        observation = await self.run_step(ShellCommandsStep(
            cmds=commands, cwd=cwd, description=description, handle_error=handle_error, **name_kwargs))
        return observation.text

    async def edit_file(self, filename: str, prompt: str, name: Union[str, None] = None, description: str = "", range: Union[Range, None] = None):
        """Open ``filename`` in the IDE and run a model edit over it.

        Args:
            filename: Path resolved with ``_ensure_absolute_path``.
            prompt: Passed to the edit step as its ``user_input``.
            name: Optional step name; only forwarded when truthy.
            description: Description forwarded to the step.
            range: Range to edit; when None the whole file is the target.
        """
        filepath = await self._ensure_absolute_path(filename)

        await self.ide.setFileOpen(filepath)
        contents = await self.ide.readFile(filepath)

        if range is not None:
            target = RangeInFile(filepath=filepath, range=range)
        else:
            target = RangeInFile.from_entire_file(filepath, contents)

        await self.run_step(DefaultModelEditCodeStep(
            range_in_files=[target],
            user_input=prompt,
            description=description,
            **({'name': name} if name else {})
        ))

    async def append_to_file(self, filename: str, content: str):
        """Append ``content`` to ``filename`` via the IDE.

        Unlike the other file operations this is applied through
        ``ide.applyFileSystemEdit`` directly rather than being run as a step.
        """
        filepath = await self._ensure_absolute_path(filename)
        previous_content = await self.ide.readFile(filepath)
        file_edit = FileEdit.from_append(filepath, previous_content, content)
        await self.ide.applyFileSystemEdit(file_edit)

    async def add_file(self, filename: str, content: Union[str, None]):
        """Create ``filename`` with ``content``, creating parent directories first."""
        # The file normally does not exist yet, so it must not be required to.
        filepath = await self._ensure_absolute_path(filename, must_exist=False)
        dir_name = os.path.dirname(filepath)
        os.makedirs(dir_name, exist_ok=True)
        return await self.run_step(FileSystemEditStep(edit=AddFile(filepath=filepath, content=content)))

    async def delete_file(self, filename: str):
        """Delete ``filename`` by running a ``DeleteFile`` edit step."""
        filename = await self._ensure_absolute_path(filename)
        return await self.run_step(FileSystemEditStep(edit=DeleteFile(filepath=filename)))

    async def add_directory(self, path: str):
        """Create the directory ``path`` by running an ``AddDirectory`` edit step."""
        # The directory normally does not exist yet, so it must not be required to.
        path = await self._ensure_absolute_path(path, must_exist=False)
        return await self.run_step(FileSystemEditStep(edit=AddDirectory(path=path)))

    async def delete_directory(self, path: str):
        """Delete the directory ``path`` by running a ``DeleteDirectory`` edit step."""
        path = await self._ensure_absolute_path(path)
        return await self.run_step(FileSystemEditStep(edit=DeleteDirectory(path=path)))

    async def get_user_secret(self, env_var: str, prompt: str) -> str:
        """Ask the IDE for the secret stored under ``env_var``.

        ``prompt`` is accepted for interface compatibility but is currently
        not forwarded to the IDE.
        """
        return await self.ide.getUserSecret(env_var)

    @property
    def config(self) -> ContinueConfig:
        """The active configuration, re-read on every access.

        Looks for ``.continue/config.yaml`` and then ``.continue/config.json``
        in the workspace directory; if neither exists the global config is
        loaded instead.
        """
        workspace_dir = self.ide.workspace_directory
        for config_name in ('config.yaml', 'config.json'):
            config_path = os.path.join(workspace_dir, '.continue', config_name)
            if os.path.exists(config_path):
                return load_config(config_path)
        return load_global_config()

    def update_default_model(self, model: str):
        """Set ``default_model`` on the current config and save it globally.

        NOTE(review): ``self.config`` may come from a workspace config file,
        in which case that whole config is what gets written as the global
        config -- confirm this is intended.
        """
        config = self.config
        config.default_model = model
        update_global_config(config)

    def set_loading_message(self, message: str):
        """Not implemented yet; always raises ``NotImplementedError``."""
        # self.__autopilot.set_loading_message(message)
        raise NotImplementedError()

    def raise_exception(self, message: str, title: str, with_step: Union[Step, None] = None):
        """Raise a ``ContinueCustomException`` built from the given arguments."""
        raise ContinueCustomException(message, title, with_step)

    async def get_chat_context(self) -> List[ChatMessage]:
        """Return the chat history with code-context system messages inserted.

        One system message is added per highlighted range. When nothing is
        highlighted, the full contents of the first open file (if any) are
        used instead.
        """
        history_context = self.history.to_chat_history()
        highlighted_code = [
            hr.range for hr in self.__autopilot._highlighted_ranges]

        preface = "The following code is highlighted"

        if not highlighted_code:
            preface = "The following file is open"
            # Get the full contents of all open files
            files = await self.ide.getOpenFiles()
            if len(files) > 0:
                content = await self.ide.readFile(files[0])
                highlighted_code = [
                    RangeInFileWithContents.from_entire_file(files[0], content)]

        for rif in highlighted_code:
            msg = ChatMessage(content=f"{preface} ({rif.filepath}):\n```\n{rif.contents}\n```",
                              role="system", summary=f"{preface}: {rif.filepath}")

            # Don't insert after latest user message or function call
            i = -1
            if len(history_context) > 0 and history_context[i].role in ("user", "function"):
                i -= 1
            history_context.insert(i, msg)

        return history_context

    async def update_ui(self):
        """Ask the autopilot to update its subscribers."""
        await self.__autopilot.update_subscribers()

    async def clear_history(self):
        """Ask the autopilot to clear its history."""
        await self.__autopilot.clear_history()

    def current_step_was_deleted(self):
        """Return the ``deleted`` flag of the history entry at the current index."""
        return self.history.timeline[self.history.current_index].deleted