# This is a separate server from server/main.py import asyncio from functools import cached_property import json import os from typing import Any, Dict, List, Type, TypeVar, Union import uuid from fastapi import WebSocket, Body, APIRouter from uvicorn.main import Server from ..libs.util.queue import AsyncSubscriptionQueue from ..models.filesystem import FileSystem, RangeInFile, EditDiff, RealFileSystem from ..models.main import Traceback from ..models.filesystem_edit import AddDirectory, AddFile, DeleteDirectory, DeleteFile, FileSystemEdit, FileEdit, FileEditWithFullContents, RenameDirectory, RenameFile, SequentialFileSystemEdit from pydantic import BaseModel from .gui import SessionManager, session_manager from .ide_protocol import AbstractIdeProtocolServer router = APIRouter(prefix="/ide", tags=["ide"]) # Graceful shutdown by closing websockets original_handler = Server.handle_exit class AppStatus: should_exit = False @staticmethod def handle_exit(*args, **kwargs): AppStatus.should_exit = True print("Shutting down") original_handler(*args, **kwargs) Server.handle_exit = AppStatus.handle_exit # TYPES # class FileEditsUpdate(BaseModel): fileEdits: List[FileEditWithFullContents] class OpenFilesResponse(BaseModel): openFiles: List[str] class HighlightedCodeResponse(BaseModel): highlightedCode: List[RangeInFile] class ShowSuggestionRequest(BaseModel): suggestion: FileEdit class ShowSuggestionResponse(BaseModel): suggestion: FileEdit accepted: bool class ReadFileResponse(BaseModel): contents: str class EditFileResponse(BaseModel): fileEdit: FileEditWithFullContents class WorkspaceDirectoryResponse(BaseModel): workspaceDirectory: str class GetUserSecretResponse(BaseModel): value: str class RunCommandResponse(BaseModel): output: str T = TypeVar("T", bound=BaseModel) class IdeProtocolServer(AbstractIdeProtocolServer): websocket: WebSocket session_manager: SessionManager sub_queue: AsyncSubscriptionQueue = AsyncSubscriptionQueue() def __init__(self, session_manager: SessionManager): self.session_manager = session_manager async def _send_json(self, message_type: str, data: Any): await self.websocket.send_json({ "messageType": message_type, "data": data }) async def _receive_json(self, message_type: str) -> Any: return await self.sub_queue.get(message_type) async def _send_and_receive_json(self, data: Any, resp_model: Type[T], message_type: str) -> T: await self._send_json(message_type, data) resp = await self._receive_json(message_type) return resp_model.parse_obj(resp) async def handle_json(self, message_type: str, data: Any): if message_type == "openGUI": await self.openGUI() elif message_type == "setFileOpen": await self.setFileOpen(data["filepath"], data["open"]) elif message_type == "fileEdits": fileEdits = list( map(lambda d: FileEditWithFullContents.parse_obj(d), data["fileEdits"])) self.onFileEdits(fileEdits) elif message_type in ["highlightedCode", "openFiles", "readFile", "editFile", "workspaceDirectory", "getUserSecret", "runCommand"]: self.sub_queue.post(message_type, data) else: raise ValueError("Unknown message type", message_type) # ------------------------------- # # Request actions in IDE, doesn't matter which Session def showSuggestion(): pass async def setFileOpen(self, filepath: str, open: bool = True): # Autopilot needs access to this. await self._send_json("setFileOpen", { "filepath": filepath, "open": open }) async def openGUI(self): session_id = self.session_manager.new_session(self) await self._send_json("openGUI", { "sessionId": session_id }) async def highlightCode(self, range_in_file: RangeInFile, color: str = "#00ff0022"): await self._send_json("highlightCode", { "rangeInFile": range_in_file.dict(), "color": color }) async def runCommand(self, command: str) -> str: return (await self._send_and_receive_json({"command": command}, RunCommandResponse, "runCommand")).output async def showSuggestionsAndWait(self, suggestions: List[FileEdit]) -> bool: ids = [str(uuid.uuid4()) for _ in suggestions] for i in range(len(suggestions)): self._send_json("showSuggestion", { "suggestion": suggestions[i], "suggestionId": ids[i] }) responses = await asyncio.gather(*[ self._receive_json(ShowSuggestionResponse) for i in range(len(suggestions)) ]) # WORKING ON THIS FLOW HERE. Fine now to just await for response, instead of doing something fancy with a "waiting" state on the autopilot. # Just need connect the suggestionId to the IDE (and the gui) return any([r.accepted for r in responses]) # ------------------------------- # # Here needs to pass message onto the Autopilot OR Autopilot just subscribes. # This is where you might have triggers: plugins can subscribe to certian events # like file changes, tracebacks, etc... def onAcceptRejectSuggestion(self, suggestionId: str, accepted: bool): pass def onTraceback(self, traceback: Traceback): # Same as below, maybe not every autopilot? for _, session in self.session_manager.sessions.items(): session.autopilot.handle_traceback(traceback) def onFileSystemUpdate(self, update: FileSystemEdit): # Access to Autopilot (so SessionManager) pass def onCloseGUI(self, session_id: str): # Accesss to SessionManager pass def onOpenGUIRequest(self): pass def onFileEdits(self, edits: List[FileEditWithFullContents]): # Send the file edits to ALL autopilots. # Maybe not ideal behavior for _, session in self.session_manager.sessions.items(): session.autopilot.handle_manual_edits(edits) # Request information. Session doesn't matter. async def getOpenFiles(self) -> List[str]: resp = await self._send_and_receive_json({}, OpenFilesResponse, "openFiles") return resp.openFiles async def getWorkspaceDirectory(self) -> str: resp = await self._send_and_receive_json({}, WorkspaceDirectoryResponse, "workspaceDirectory") return resp.workspaceDirectory @cached_property def workspace_directory(self) -> str: return asyncio.run(self.getWorkspaceDirectory()) async def getHighlightedCode(self) -> List[RangeInFile]: resp = await self._send_and_receive_json({}, HighlightedCodeResponse, "highlightedCode") return resp.highlightedCode async def readFile(self, filepath: str) -> str: """Read a file""" resp = await self._send_and_receive_json({ "filepath": filepath }, ReadFileResponse, "readFile") return resp.contents async def getUserSecret(self, key: str) -> str: """Get a user secret""" resp = await self._send_and_receive_json({ "key": key }, GetUserSecretResponse, "getUserSecret") return resp.value async def saveFile(self, filepath: str): """Save a file""" await self._send_json("saveFile", { "filepath": filepath }) async def readRangeInFile(self, range_in_file: RangeInFile) -> str: """Read a range in a file""" full_contents = await self.readFile(range_in_file.filepath) return FileSystem.read_range_in_str(full_contents, range_in_file.range) async def editFile(self, edit: FileEdit) -> FileEditWithFullContents: """Edit a file""" resp = await self._send_and_receive_json({ "edit": edit.dict() }, EditFileResponse, "editFile") return resp.fileEdit async def applyFileSystemEdit(self, edit: FileSystemEdit) -> EditDiff: """Apply a file edit""" backward = None fs = RealFileSystem() if isinstance(edit, FileEdit): file_edit = await self.editFile(edit) _, diff = FileSystem.apply_edit_to_str( file_edit.fileContents, file_edit.fileEdit) backward = diff.backward elif isinstance(edit, AddFile): fs.write(edit.filepath, edit.content) backward = DeleteFile(filepath=edit.filepath) elif isinstance(edit, DeleteFile): contents = await self.readFile(edit.filepath) backward = AddFile(filepath=edit.filepath, content=contents) fs.delete_file(edit.filepath) elif isinstance(edit, RenameFile): fs.rename_file(edit.filepath, edit.new_filepath) backward = RenameFile(filepath=edit.new_filepath, new_filepath=edit.filepath) elif isinstance(edit, AddDirectory): fs.add_directory(edit.path) backward = DeleteDirectory(path=edit.path) elif isinstance(edit, DeleteDirectory): # This isn't atomic! backward_edits = [] for root, dirs, files in os.walk(edit.path, topdown=False): for f in files: path = os.path.join(root, f) edit_diff = await self.applyFileSystemEdit(DeleteFile(filepath=path)) backward_edits.append(edit_diff) for d in dirs: path = os.path.join(root, d) edit_diff = await self.applyFileSystemEdit(DeleteDirectory(path=path)) backward_edits.append(edit_diff) edit_diff = await self.applyFileSystemEdit(DeleteDirectory(path=edit.path)) backward_edits.append(edit_diff) backward_edits.reverse() backward = SequentialFileSystemEdit(edits=backward_edits) elif isinstance(edit, RenameDirectory): fs.rename_directory(edit.path, edit.new_path) backward = RenameDirectory(path=edit.new_path, new_path=edit.path) elif isinstance(edit, FileSystemEdit): diffs = [] for edit in edit.next_edit(): edit_diff = await self.applyFileSystemEdit(edit) diffs.append(edit_diff) backward = EditDiff.from_sequence(diffs=diffs).backward else: raise TypeError("Unknown FileSystemEdit type: " + str(type(edit))) return EditDiff( forward=edit, backward=backward ) ideProtocolServer = IdeProtocolServer(session_manager) @router.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() print("Accepted websocket connection from, ", websocket.client) await websocket.send_json({"messageType": "connected", "data": {}}) ideProtocolServer.websocket = websocket while True: message = await websocket.receive_text() message = json.loads(message) if "messageType" not in message or "data" not in message: continue message_type = message["messageType"] data = message["data"] await ideProtocolServer.handle_json(message_type, data) await websocket.close()