summaryrefslogtreecommitdiff
path: root/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py
diff options
context:
space:
mode:
Diffstat (limited to 'continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py')
-rw-r--r--continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py64
1 files changed, 51 insertions, 13 deletions
diff --git a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py
index 9bee4c95..c32ae923 100644
--- a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py
@@ -1,16 +1,19 @@
import os
import subprocess
from textwrap import dedent
+import time
from ...models.main import Range
from ...models.filesystem import RangeInFile
from ...steps.main import MessageStep
from ...core.sdk import Models
from ...core.observation import DictObservation, InternalErrorObservation
-from ...models.filesystem_edit import AddFile
+from ...models.filesystem_edit import AddFile, FileEdit
from ...core.main import Step
from ...core.sdk import ContinueSDK
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
class SetupPipelineStep(Step):
hide: bool = True
@@ -25,6 +28,8 @@ class SetupPipelineStep(Step):
""")
async def run(self, sdk: ContinueSDK):
+ sdk.context.set("api_description", self.api_description)
+
source_name = (await sdk.models.gpt35()).complete(
f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
filename = f'{source_name}.py'
@@ -45,14 +50,17 @@ class SetupPipelineStep(Step):
- `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
# editing the resource function to call the requested API
- await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 30, 0)), "#00ff0022")
+ await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022")
+ # sdk.set_loading_message("Writing code to call the API...")
await sdk.edit_file(
filename=filename,
- prompt=f'Edit the resource function to call the API described by this: {self.api_description}',
- name="Edit the resource function to call the API"
+ prompt=f'Edit the resource function to call the API described by this: {self.api_description}. Do not move or remove the exit() call in __main__.',
+ name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}"
)
+ time.sleep(1)
+
# wait for user to put API key in secrets.toml
await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
@@ -76,20 +84,50 @@ class ValidatePipelineStep(Step):
# """)))
# test that the API call works
- output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running python3 {filename} to test loading data from the API")
+ output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API")
# If it fails, return the error
if "Traceback" in output:
+ output = "Traceback" + output.split("Traceback")[-1]
+ file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
+ suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\
+ ```python
+ {file_content}
+ ```
+ This above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred:
+
+ ```ascii
+ {output}
+ ```
+
+ This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:"""))
+
+ api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\
+ The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this:
+ ```python
+ {file_content}
+ ```
+ What is the URL for the API documentation that will help me learn how to make this call? Please format in markdown so I can click the link."""))
+
sdk.raise_exception(
- title="Error while running pipeline.\nFix the resource function in {filename} and rerun this step", description=output)
+ title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\
+ {suggestion}
+
+ {api_documentation_url}
+
+ After you've fixed the code, click the retry button at the top of the Validate Pipeline step above.""")))
# remove exit() from the main main function
- await sdk.edit_file(
- filename=filename,
- prompt='Remove exit() from the main function',
- name="Remove early exit() from main function",
- description="Remove the `exit()` call from the main function in the pipeline file so that the data is loaded into DuckDB"
- )
+ await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB."))
+
+ contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
+ replacement = "\n".join(
+ list(filter(lambda line: line.strip() != "exit()", contents.split("\n"))))
+ await sdk.ide.applyFileSystemEdit(FileEdit(
+ filepath=os.path.join(workspace_dir, filename),
+ replacement=replacement,
+ range=Range.from_entire_file(contents)
+ ))
# load the data into the DuckDB instance
await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB")
@@ -109,6 +147,6 @@ class ValidatePipelineStep(Step):
print(row)
''')
- query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py"
+ query_filename = os.path.join(workspace_dir, "query.py")
await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance")
await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected")