1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
import asyncio
from functools import cached_property
from typing import Coroutine, Union
import os
from ..steps.core.core import DefaultModelEditCodeStep
from ..models.main import Range
from .abstract_sdk import AbstractContinueSDK
from .config import ContinueConfig, load_config, load_global_config, update_global_config
from ..models.filesystem_edit import FileEdit, FileSystemEdit, AddFile, DeleteFile, AddDirectory, DeleteDirectory
from ..models.filesystem import RangeInFile
from ..libs.llm.hf_inference_api import HuggingFaceInferenceAPI
from ..libs.llm.openai import OpenAI
from .observation import Observation
from ..server.ide_protocol import AbstractIdeProtocolServer
from .main import Context, ContinueCustomException, History, Step, ChatMessage, ChatMessageRole
from ..steps.core.core import *
from ..libs.llm.proxy_server import ProxyServer
class Autopilot:
pass
class Models:
def __init__(self, sdk: "ContinueSDK"):
self.sdk = sdk
def __load_openai_model(self, model: str) -> OpenAI:
async def load_openai_model():
api_key = await self.sdk.get_user_secret(
'OPENAI_API_KEY', 'Enter your OpenAI API key or press enter to try for free')
if api_key == "":
return ProxyServer(self.sdk.ide.unique_id, model)
return OpenAI(api_key=api_key, default_model=model)
return asyncio.get_event_loop().run_until_complete(load_openai_model())
@cached_property
def starcoder(self):
async def load_starcoder():
api_key = await self.sdk.get_user_secret(
'HUGGING_FACE_TOKEN', 'Please add your Hugging Face token to the .env file')
return HuggingFaceInferenceAPI(api_key=api_key)
return asyncio.get_event_loop().run_until_complete(load_starcoder())
@cached_property
def gpt35(self):
return self.__load_openai_model("gpt-3.5-turbo")
@cached_property
def gpt350613(self):
return self.__load_openai_model("gpt-3.5-turbo-0613")
@cached_property
def gpt3516k(self):
return self.__load_openai_model("gpt-3.5-turbo-16k")
@cached_property
def gpt4(self):
return self.__load_openai_model("gpt-4")
def __model_from_name(self, model_name: str):
if model_name == "starcoder":
return self.starcoder
elif model_name == "gpt-3.5-turbo":
return self.gpt35
elif model_name == "gpt-3.5-turbo-16k":
return self.gpt3516k
elif model_name == "gpt-4":
return self.gpt4
else:
raise Exception(f"Unknown model {model_name}")
@property
def default(self):
default_model = self.sdk.config.default_model
return self.__model_from_name(default_model) if default_model is not None else self.gpt35
class ContinueSDK(AbstractContinueSDK):
"""The SDK provided as parameters to a step"""
ide: AbstractIdeProtocolServer
models: Models
context: Context
__autopilot: Autopilot
def __init__(self, autopilot: Autopilot):
self.ide = autopilot.ide
self.__autopilot = autopilot
self.models = Models(self)
self.context = autopilot.context
@property
def history(self) -> History:
return self.__autopilot.history
async def _ensure_absolute_path(self, path: str) -> str:
if os.path.isabs(path):
return path
# Else if in workspace
workspace_path = os.path.join(self.ide.workspace_directory, path)
if os.path.exists(workspace_path):
return workspace_path
else:
# Check if it matches any of the open files, then use that absolute path
open_files = await self.ide.getOpenFiles()
for open_file in open_files:
if os.path.basename(open_file) == os.path.basename(path):
return open_file
raise Exception(f"Path {path} does not exist")
async def run_step(self, step: Step) -> Coroutine[Observation, None, None]:
return await self.__autopilot._run_singular_step(step)
async def apply_filesystem_edit(self, edit: FileSystemEdit, name: str = None, description: str = None):
return await self.run_step(FileSystemEditStep(edit=edit, description=description, **({'name': name} if name else {})))
async def wait_for_user_input(self) -> str:
return await self.__autopilot.wait_for_user_input()
async def wait_for_user_confirmation(self, prompt: str):
return await self.run_step(WaitForUserConfirmationStep(prompt=prompt))
async def run(self, commands: Union[List[str], str], cwd: str = None, name: str = None, description: str = None, handle_error: bool = True) -> Coroutine[str, None, None]:
commands = commands if isinstance(commands, List) else [commands]
return (await self.run_step(ShellCommandsStep(cmds=commands, cwd=cwd, description=description, handle_error=handle_error, **({'name': name} if name else {})))).text
async def edit_file(self, filename: str, prompt: str, name: str = None, description: str = "", range: Range = None):
filepath = await self._ensure_absolute_path(filename)
await self.ide.setFileOpen(filepath)
contents = await self.ide.readFile(filepath)
await self.run_step(DefaultModelEditCodeStep(
range_in_files=[RangeInFile(filepath=filepath, range=range) if range is not None else RangeInFile.from_entire_file(
filepath, contents)],
user_input=prompt,
description=description,
**({'name': name} if name else {})
))
async def append_to_file(self, filename: str, content: str):
filepath = await self._ensure_absolute_path(filename)
previous_content = await self.ide.readFile(filepath)
file_edit = FileEdit.from_append(filepath, previous_content, content)
await self.ide.applyFileSystemEdit(file_edit)
async def add_file(self, filename: str, content: Union[str, None]):
filepath = await self._ensure_absolute_path(filename)
dir_name = os.path.dirname(filepath)
os.makedirs(dir_name, exist_ok=True)
return await self.run_step(FileSystemEditStep(edit=AddFile(filepath=filepath, content=content)))
async def delete_file(self, filename: str):
filename = await self._ensure_absolute_path(filename)
return await self.run_step(FileSystemEditStep(edit=DeleteFile(filepath=filename)))
async def add_directory(self, path: str):
path = await self._ensure_absolute_path(path)
return await self.run_step(FileSystemEditStep(edit=AddDirectory(path=path)))
async def delete_directory(self, path: str):
path = await self._ensure_absolute_path(path)
return await self.run_step(FileSystemEditStep(edit=DeleteDirectory(path=path)))
async def get_user_secret(self, env_var: str, prompt: str) -> str:
return await self.ide.getUserSecret(env_var)
@property
def config(self) -> ContinueConfig:
dir = self.ide.workspace_directory
yaml_path = os.path.join(dir, '.continue', 'config.yaml')
json_path = os.path.join(dir, '.continue', 'config.json')
if os.path.exists(yaml_path):
return load_config(yaml_path)
elif os.path.exists(json_path):
return load_config(json_path)
else:
return load_global_config()
def update_default_model(self, model: str):
config = self.config
config.default_model = model
update_global_config(config)
def set_loading_message(self, message: str):
# self.__autopilot.set_loading_message(message)
raise NotImplementedError()
def raise_exception(self, message: str, title: str, with_step: Union[Step, None] = None):
raise ContinueCustomException(message, title, with_step)
async def get_chat_context(self) -> List[ChatMessage]:
history_context = self.history.to_chat_history()
highlighted_code = [
hr.range for hr in self.__autopilot._highlighted_ranges]
preface = "The following code is highlighted"
if len(highlighted_code) == 0:
preface = "The following file is open"
# Get the full contents of all open files
files = await self.ide.getOpenFiles()
if len(files) > 0:
content = await self.ide.readFile(files[0])
highlighted_code = [
RangeInFileWithContents.from_entire_file(files[0], content)]
for rif in highlighted_code:
msg = ChatMessage(content=f"{preface} ({rif.filepath}):\n```\n{rif.contents}\n```",
role="system", summary=f"{preface}: {rif.filepath}")
# Don't insert after latest user message or function call
i = -1
if len(history_context) > 0 and (history_context[i].role == "user" or history_context[i].role == "function"):
i -= 1
history_context.insert(i, msg)
return history_context
async def update_ui(self):
await self.__autopilot.update_subscribers()
async def clear_history(self):
await self.__autopilot.clear_history()
def current_step_was_deleted(self):
return self.history.timeline[self.history.current_index].deleted
|