diff options
Diffstat (limited to 'server/continuedev/libs/chroma/query.py')
-rw-r--r-- | server/continuedev/libs/chroma/query.py | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/server/continuedev/libs/chroma/query.py b/server/continuedev/libs/chroma/query.py new file mode 100644 index 00000000..d77cce49 --- /dev/null +++ b/server/continuedev/libs/chroma/query.py @@ -0,0 +1,218 @@ +import json +import os +import subprocess +from functools import cached_property +from typing import List, Tuple + +from llama_index import ( + Document, + GPTVectorStoreIndex, + StorageContext, + load_index_from_storage, +) +from llama_index.langchain_helpers.text_splitter import TokenTextSplitter + +from ..util.logging import logger +from .update import filter_ignored_files, load_gpt_index_documents + + +class ChromaIndexManager: + workspace_dir: str + + def __init__(self, workspace_dir: str): + self.workspace_dir = workspace_dir + + @cached_property + def current_commit(self) -> str: + """Get the current commit.""" + return ( + subprocess.check_output( + ["git", "rev-parse", "HEAD"], cwd=self.workspace_dir + ) + .decode("utf-8") + .strip() + ) + + @cached_property + def current_branch(self) -> str: + """Get the current branch.""" + return ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=self.workspace_dir + ) + .decode("utf-8") + .strip() + ) + + @cached_property + def index_dir(self) -> str: + return os.path.join( + self.workspace_dir, ".continue", "chroma", self.current_branch + ) + + @cached_property + def git_root_dir(self): + """Get the root directory of a Git repository.""" + try: + return ( + subprocess.check_output( + ["git", "rev-parse", "--show-toplevel"], cwd=self.workspace_dir + ) + .strip() + .decode() + ) + except subprocess.CalledProcessError: + return None + + def check_index_exists(self): + return os.path.exists(os.path.join(self.index_dir, "metadata.json")) + + def create_codebase_index(self): + """Create a new index for the current branch.""" + if not self.check_index_exists(): + os.makedirs(self.index_dir) + else: + return + + documents = load_gpt_index_documents(self.workspace_dir) + + chunks = {} + doc_chunks = [] + for doc in documents: + text_splitter = TokenTextSplitter() + try: + text_chunks = text_splitter.split_text(doc.text) + except: + logger.warning(f"ERROR (probably found special token): {doc.text}") + continue # lol + filename = doc.extra_info["filename"] + chunks[filename] = len(text_chunks) + for i, text in enumerate(text_chunks): + doc_chunks.append(Document(text, doc_id=f"{filename}::{i}")) + + with open(f"{self.index_dir}/metadata.json", "w") as f: + json.dump({"commit": self.current_commit, "chunks": chunks}, f, indent=4) + + index = GPTVectorStoreIndex([]) + + for chunk in doc_chunks: + index.insert(chunk) + + # d = 1536 # Dimension of text-ada-embedding-002 + # faiss_index = faiss.IndexFlatL2(d) + # index = GPTFaissIndex(documents, faiss_index=faiss_index) + # index.save_to_disk(f"{index_dir_for(branch)}/index.json", faiss_index_save_path=f"{index_dir_for(branch)}/index_faiss_core.index") + + index.storage_context.persist(persist_dir=self.index_dir) + + logger.debug("Codebase index created") + + def get_modified_deleted_files(self) -> Tuple[List[str], List[str]]: + """Get a list of all files that have been modified since the last commit.""" + metadata = f"{self.index_dir}/metadata.json" + with open(metadata, "r") as f: + previous_commit = json.load(f)["commit"] + + modified_deleted_files = ( + subprocess.check_output( + ["git", "diff", "--name-only", previous_commit, self.current_commit] + ) + .decode("utf-8") + .strip() + ) + modified_deleted_files = modified_deleted_files.split("\n") + modified_deleted_files = [f for f in modified_deleted_files if f] + + deleted_files = [ + f + for f in modified_deleted_files + if not os.path.exists(os.path.join(self.workspace_dir, f)) + ] + modified_files = [ + f + for f in modified_deleted_files + if os.path.exists(os.path.join(self.workspace_dir, f)) + ] + + return filter_ignored_files( + modified_files, self.index_dir + ), filter_ignored_files(deleted_files, self.index_dir) + + def update_codebase_index(self): + """Update the index with a list of files.""" + + if not self.check_index_exists(): + self.create_codebase_index() + else: + # index = GPTFaissIndex.load_from_disk(f"{index_dir_for(branch)}/index.json", faiss_index_save_path=f"{index_dir_for(branch)}/index_faiss_core.index") + index = GPTVectorStoreIndex.load_from_disk(f"{self.index_dir}/index.json") + modified_files, deleted_files = self.get_modified_deleted_files() + + with open(f"{self.index_dir}/metadata.json", "r") as f: + metadata = json.load(f) + + for file in deleted_files: + num_chunks = metadata["chunks"][file] + for i in range(num_chunks): + index.delete(f"{file}::{i}") + + del metadata["chunks"][file] + + logger.debug(f"Deleted {file}") + + for file in modified_files: + if file in metadata["chunks"]: + num_chunks = metadata["chunks"][file] + + for i in range(num_chunks): + index.delete(f"{file}::{i}") + + logger.debug(f"Deleted old version of {file}") + + with open(file, "r") as f: + text = f.read() + + text_splitter = TokenTextSplitter() + text_chunks = text_splitter.split_text(text) + + for i, text in enumerate(text_chunks): + index.insert(Document(text, doc_id=f"{file}::{i}")) + + metadata["chunks"][file] = len(text_chunks) + + logger.debug(f"Inserted new version of {file}") + + metadata["commit"] = self.current_commit + + with open(f"{self.index_dir}/metadata.json", "w") as f: + json.dump(metadata, f, indent=4) + + logger.debug("Codebase index updated") + + def query_codebase_index(self, query: str) -> str: + """Query the codebase index.""" + if not self.check_index_exists(): + logger.debug(f"No index found for the codebase at {self.index_dir}") + return "" + + storage_context = StorageContext.from_defaults(persist_dir=self.index_dir) + index = load_index_from_storage(storage_context) + # index = GPTVectorStoreIndex.load_from_disk(path) + engine = index.as_query_engine() + return engine.query(query) + + def query_additional_index(self, query: str) -> str: + """Query the additional index.""" + index = GPTVectorStoreIndex.load_from_disk( + os.path.join(self.index_dir, "additional_index.json") + ) + return index.query(query) + + def replace_additional_index(self, info: str): + """Replace the additional index with the given info.""" + with open(f"{self.index_dir}/additional_context.txt", "w") as f: + f.write(info) + documents = [Document(info)] + index = GPTVectorStoreIndex(documents) + index.save_to_disk(f"{self.index_dir}/additional_index.json") + logger.debug("Additional index replaced") |