diff options
Diffstat (limited to 'continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py')
-rw-r--r-- | continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py | 62 |
1 files changed, 45 insertions, 17 deletions
diff --git a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py index ef5e3b43..3c8277c0 100644 --- a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py @@ -1,8 +1,12 @@ +import os +import subprocess from textwrap import dedent +from ...models.main import Range +from ...models.filesystem import RangeInFile from ...steps.main import MessageStep from ...core.sdk import Models -from ...core.observation import DictObservation +from ...core.observation import DictObservation, InternalErrorObservation from ...models.filesystem_edit import AddFile from ...core.main import Step from ...core.sdk import ContinueSDK @@ -33,45 +37,69 @@ class SetupPipelineStep(Step): f'dlt init {source_name} duckdb', 'Y', 'pip install -r requirements.txt' - ]) - + ], description=dedent(f"""\ + Running the following commands: + - `python3 -m venv env`: Create a Python virtual environment + - `source env/bin/activate`: Activate the virtual environment + - `pip install dlt`: Install dlt + - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance + - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline""")) + + await sdk.wait_for_user_confirmation("Wait for the commands to finish running, then press `Continue`") # editing the resource function to call the requested API + await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 30, 0)), "#00ff0022") + await sdk.edit_file( filename=filename, - prompt=f'Edit the resource function to call the API described by this: {self.api_description}' + prompt=f'Edit the resource function to call the API described by this: {self.api_description}', + name="Edit the resource function to call the API" ) # wait for user to put API key in secrets.toml await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") - return DictObservation(values={"source_name": source_name}) + + sdk.context.set("source_name", source_name) class ValidatePipelineStep(Step): hide: bool = True async def run(self, sdk: ContinueSDK): - source_name = sdk.history.last_observation().values["source_name"] + workspace_dir = await sdk.ide.getWorkspaceDirectory() + source_name = sdk.context.get("source_name") filename = f'{source_name}.py' - await sdk.run_step(MessageStep(message=dedent("""\ - This step will validate that your dlt pipeline is working as expected: - - Test that the API call works - - Load the data into a local DuckDB instance - - Write a query to view the data - """))) + # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\ + # Next, we will validate that your dlt pipeline is working as expected: + # - Test that the API call works + # - Load the data into a local DuckDB instance + # - Write a query to view the data + # """))) # test that the API call works - await sdk.run(f'python3 {filename}') + + p = subprocess.run( + ['python3', f'{filename}'], capture_output=True, text=True, cwd=workspace_dir) + err = p.stderr + + # If it fails, return the error + if err is not None and err != "": + sdk.raise_exception( + f"Error while running pipeline. Fix the resource function in {filename} and rerun this step: \n\n" + err) + + await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running python3 {filename} to test loading data from the API") # remove exit() from the main main function await sdk.edit_file( filename=filename, - prompt='Remove exit() from the main function' + prompt='Remove exit() from the main function', + name="Remove early exit() from main function", + description="Remove the `exit()` call from the main function in the pipeline file so that the data is loaded into DuckDB" ) # load the data into the DuckDB instance - await sdk.run(f'python3 {filename}') + await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB") table_name = f"{source_name}.{source_name}_resource" tables_query_code = dedent(f'''\ @@ -89,5 +117,5 @@ class ValidatePipelineStep(Step): ''') query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" - await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) - await sdk.run('env/bin/python3 query.py') + await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance") + await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected") |