summaryrefslogtreecommitdiff
path: root/server/continuedev/libs/chroma/query.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/continuedev/libs/chroma/query.py')
-rw-r--r--server/continuedev/libs/chroma/query.py218
1 files changed, 218 insertions, 0 deletions
diff --git a/server/continuedev/libs/chroma/query.py b/server/continuedev/libs/chroma/query.py
new file mode 100644
index 00000000..d77cce49
--- /dev/null
+++ b/server/continuedev/libs/chroma/query.py
@@ -0,0 +1,218 @@
+import json
+import os
+import subprocess
+from functools import cached_property
+from typing import List, Tuple
+
+from llama_index import (
+ Document,
+ GPTVectorStoreIndex,
+ StorageContext,
+ load_index_from_storage,
+)
+from llama_index.langchain_helpers.text_splitter import TokenTextSplitter
+
+from ..util.logging import logger
+from .update import filter_ignored_files, load_gpt_index_documents
+
+
+class ChromaIndexManager:
+ workspace_dir: str
+
+ def __init__(self, workspace_dir: str):
+ self.workspace_dir = workspace_dir
+
+ @cached_property
+ def current_commit(self) -> str:
+ """Get the current commit."""
+ return (
+ subprocess.check_output(
+ ["git", "rev-parse", "HEAD"], cwd=self.workspace_dir
+ )
+ .decode("utf-8")
+ .strip()
+ )
+
+ @cached_property
+ def current_branch(self) -> str:
+ """Get the current branch."""
+ return (
+ subprocess.check_output(
+ ["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=self.workspace_dir
+ )
+ .decode("utf-8")
+ .strip()
+ )
+
+ @cached_property
+ def index_dir(self) -> str:
+ return os.path.join(
+ self.workspace_dir, ".continue", "chroma", self.current_branch
+ )
+
+ @cached_property
+ def git_root_dir(self):
+ """Get the root directory of a Git repository."""
+ try:
+ return (
+ subprocess.check_output(
+ ["git", "rev-parse", "--show-toplevel"], cwd=self.workspace_dir
+ )
+ .strip()
+ .decode()
+ )
+ except subprocess.CalledProcessError:
+ return None
+
+ def check_index_exists(self):
+ return os.path.exists(os.path.join(self.index_dir, "metadata.json"))
+
+ def create_codebase_index(self):
+ """Create a new index for the current branch."""
+ if not self.check_index_exists():
+ os.makedirs(self.index_dir)
+ else:
+ return
+
+ documents = load_gpt_index_documents(self.workspace_dir)
+
+ chunks = {}
+ doc_chunks = []
+ for doc in documents:
+ text_splitter = TokenTextSplitter()
+ try:
+ text_chunks = text_splitter.split_text(doc.text)
+ except:
+ logger.warning(f"ERROR (probably found special token): {doc.text}")
+ continue # lol
+ filename = doc.extra_info["filename"]
+ chunks[filename] = len(text_chunks)
+ for i, text in enumerate(text_chunks):
+ doc_chunks.append(Document(text, doc_id=f"{filename}::{i}"))
+
+ with open(f"{self.index_dir}/metadata.json", "w") as f:
+ json.dump({"commit": self.current_commit, "chunks": chunks}, f, indent=4)
+
+ index = GPTVectorStoreIndex([])
+
+ for chunk in doc_chunks:
+ index.insert(chunk)
+
+ # d = 1536 # Dimension of text-ada-embedding-002
+ # faiss_index = faiss.IndexFlatL2(d)
+ # index = GPTFaissIndex(documents, faiss_index=faiss_index)
+ # index.save_to_disk(f"{index_dir_for(branch)}/index.json", faiss_index_save_path=f"{index_dir_for(branch)}/index_faiss_core.index")
+
+ index.storage_context.persist(persist_dir=self.index_dir)
+
+ logger.debug("Codebase index created")
+
+ def get_modified_deleted_files(self) -> Tuple[List[str], List[str]]:
+ """Get a list of all files that have been modified since the last commit."""
+ metadata = f"{self.index_dir}/metadata.json"
+ with open(metadata, "r") as f:
+ previous_commit = json.load(f)["commit"]
+
+ modified_deleted_files = (
+ subprocess.check_output(
+ ["git", "diff", "--name-only", previous_commit, self.current_commit]
+ )
+ .decode("utf-8")
+ .strip()
+ )
+ modified_deleted_files = modified_deleted_files.split("\n")
+ modified_deleted_files = [f for f in modified_deleted_files if f]
+
+ deleted_files = [
+ f
+ for f in modified_deleted_files
+ if not os.path.exists(os.path.join(self.workspace_dir, f))
+ ]
+ modified_files = [
+ f
+ for f in modified_deleted_files
+ if os.path.exists(os.path.join(self.workspace_dir, f))
+ ]
+
+ return filter_ignored_files(
+ modified_files, self.index_dir
+ ), filter_ignored_files(deleted_files, self.index_dir)
+
+ def update_codebase_index(self):
+ """Update the index with a list of files."""
+
+ if not self.check_index_exists():
+ self.create_codebase_index()
+ else:
+ # index = GPTFaissIndex.load_from_disk(f"{index_dir_for(branch)}/index.json", faiss_index_save_path=f"{index_dir_for(branch)}/index_faiss_core.index")
+ index = GPTVectorStoreIndex.load_from_disk(f"{self.index_dir}/index.json")
+ modified_files, deleted_files = self.get_modified_deleted_files()
+
+ with open(f"{self.index_dir}/metadata.json", "r") as f:
+ metadata = json.load(f)
+
+ for file in deleted_files:
+ num_chunks = metadata["chunks"][file]
+ for i in range(num_chunks):
+ index.delete(f"{file}::{i}")
+
+ del metadata["chunks"][file]
+
+ logger.debug(f"Deleted {file}")
+
+ for file in modified_files:
+ if file in metadata["chunks"]:
+ num_chunks = metadata["chunks"][file]
+
+ for i in range(num_chunks):
+ index.delete(f"{file}::{i}")
+
+ logger.debug(f"Deleted old version of {file}")
+
+ with open(file, "r") as f:
+ text = f.read()
+
+ text_splitter = TokenTextSplitter()
+ text_chunks = text_splitter.split_text(text)
+
+ for i, text in enumerate(text_chunks):
+ index.insert(Document(text, doc_id=f"{file}::{i}"))
+
+ metadata["chunks"][file] = len(text_chunks)
+
+ logger.debug(f"Inserted new version of {file}")
+
+ metadata["commit"] = self.current_commit
+
+ with open(f"{self.index_dir}/metadata.json", "w") as f:
+ json.dump(metadata, f, indent=4)
+
+ logger.debug("Codebase index updated")
+
+ def query_codebase_index(self, query: str) -> str:
+ """Query the codebase index."""
+ if not self.check_index_exists():
+ logger.debug(f"No index found for the codebase at {self.index_dir}")
+ return ""
+
+ storage_context = StorageContext.from_defaults(persist_dir=self.index_dir)
+ index = load_index_from_storage(storage_context)
+ # index = GPTVectorStoreIndex.load_from_disk(path)
+ engine = index.as_query_engine()
+ return engine.query(query)
+
+ def query_additional_index(self, query: str) -> str:
+ """Query the additional index."""
+ index = GPTVectorStoreIndex.load_from_disk(
+ os.path.join(self.index_dir, "additional_index.json")
+ )
+ return index.query(query)
+
+ def replace_additional_index(self, info: str):
+ """Replace the additional index with the given info."""
+ with open(f"{self.index_dir}/additional_context.txt", "w") as f:
+ f.write(info)
+ documents = [Document(info)]
+ index = GPTVectorStoreIndex(documents)
+ index.save_to_disk(f"{self.index_dir}/additional_index.json")
+ logger.debug("Additional index replaced")