diff options
author Nate Sesti <> 2023-06-10 07:53:31 -0400
committer Nate Sesti <> 2023-06-10 07:53:31 -0400
commit 262dffd21c4dac88050926d72b78f7e91a5df75b (patch)
parent c3b24a89105d22a5fa01400b7c9d5494a2d3ffc5 (diff)
selection step of airflow recipe
3 files changed, 57 insertions, 130 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/
index d7cd03db..8e7d258d 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/
@@ -1,10 +1,11 @@
from textwrap import dedent
+from ...steps.input.nl_multiselect import NLMultiselectStep
from ...core.main import Step
from ...core.sdk import ContinueSDK
from ...steps.core.core import WaitForUserInputStep
from ...steps.main import MessageStep
-from .steps import SetupPipelineStep, ValidatePipelineStep
+from .steps import SetupPipelineStep
@@ -19,19 +20,30 @@ class DeployPipelineAirflowRecipe(Step):
async def run(self, sdk: ContinueSDK):
text_observation = await sdk.run_step(
- MessageStep(name="Building your first dlt pipeline", message=dedent("""\
- This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will:
- - Create a Python virtual environment with dlt installed
- - Run `dlt init` to generate a pipeline template
- - Write the code to call the API
- - Add any required API keys to the `secrets.toml` file
- - Test that the API call works
- - Load the data into a local DuckDB instance
- - Write a query to view the data""")) >>
- WaitForUserInputStep(
- prompt="What API do you want to load data from? (e.g.,")
+ MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\
+ This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will:
+ - Select a dlt-verified pipeline
+ - Setup the pipeline
+ - Deploy it to Airflow
+ - Optionally, setup Airflow locally""")) >>
+ NLMultiselectStep(
+ prompt=dedent("""\
+ Which verified pipeline do you want to deploy with Airflow? The options are:
+ - Asana
+ - Chess.com
+ - GitHub
+ - Google Analytics
+ - Google Sheets
+ - HubSpot
+ - Matomo
+ - Pipedrive
+ - Shopify
+ - Strapi
+ - Zendesk"""),
+ options=[
+ "asana_dlt", "chess", "github", "google_analytics", "google_sheets", "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk"
+ ])
await sdk.run_step(
- SetupPipelineStep(api_description=text_observation.text) >>
- ValidatePipelineStep()
+ SetupPipelineStep(source_name=text_observation.text) >>
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
index c32ae923..b69b3adc 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -19,134 +19,22 @@ class SetupPipelineStep(Step):
hide: bool = True
name: str = "Setup dlt Pipeline"
- api_description: str # e.g. "I want to load data from the API"
+ source_name: str
async def describe(self, models: Models):
- return dedent(f"""\
- This step will create a new dlt pipeline that loads data from an API, as per your request:
- {self.api_description}
- """)
+ pass
async def run(self, sdk: ContinueSDK):
- sdk.context.set("api_description", self.api_description)
- source_name = (await sdk.models.gpt35()).complete(
- f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
- filename = f'{source_name}.py'
- # running commands to get started when creating a new dlt pipeline
'python3 -m venv env',
'source env/bin/activate',
'pip install dlt',
- f'dlt init {source_name} duckdb\n\rY',
+ f'dlt --non-interactive init {self.source_name} duckdb',
'pip install -r requirements.txt'
], description=dedent(f"""\
Running the following commands:
- `python3 -m venv env`: Create a Python virtual environment
- `source env/bin/activate`: Activate the virtual environment
- `pip install dlt`: Install dlt
- - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance
+ - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
- `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
- # editing the resource function to call the requested API
- await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022")
- # sdk.set_loading_message("Writing code to call the API...")
- await sdk.edit_file(
- filename=filename,
- prompt=f'Edit the resource function to call the API described by this: {self.api_description}. Do not move or remove the exit() call in __main__.',
- name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}"
- )
- time.sleep(1)
- # wait for user to put API key in secrets.toml
- await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
- await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
- sdk.context.set("source_name", source_name)
-class ValidatePipelineStep(Step):
- hide: bool = True
- async def run(self, sdk: ContinueSDK):
- workspace_dir = await sdk.ide.getWorkspaceDirectory()
- source_name = sdk.context.get("source_name")
- filename = f'{source_name}.py'
- # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\
- # Next, we will validate that your dlt pipeline is working as expected:
- # - Test that the API call works
- # - Load the data into a local DuckDB instance
- # - Write a query to view the data
- # """)))
- # test that the API call works
- output = await'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API")
- # If it fails, return the error
- if "Traceback" in output:
- output = "Traceback" + output.split("Traceback")[-1]
- file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
- suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\
- ```python
- {file_content}
- ```
- This above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred:
- ```ascii
- {output}
- ```
- This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:"""))
- api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\
- The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this:
- ```python
- {file_content}
- ```
- What is the URL for the API documentation that will help me learn how to make this call? Please format in markdown so I can click the link."""))
- sdk.raise_exception(
- title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\
- {suggestion}
- {api_documentation_url}
- After you've fixed the code, click the retry button at the top of the Validate Pipeline step above.""")))
- # remove exit() from the main main function
- await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB."))
- contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
- replacement = "\n".join(
- list(filter(lambda line: line.strip() != "exit()", contents.split("\n"))))
- await sdk.ide.applyFileSystemEdit(FileEdit(
- filepath=os.path.join(workspace_dir, filename),
- replacement=replacement,
- range=Range.from_entire_file(contents)
- ))
- # load the data into the DuckDB instance
- await'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB")
- table_name = f"{source_name}.{source_name}_resource"
- tables_query_code = dedent(f'''\
- import duckdb
- # connect to DuckDB instance
- conn = duckdb.connect(database="{source_name}.duckdb")
- # get table names
- rows = conn.execute("SELECT * FROM {table_name};").fetchall()
- # print table names
- for row in rows:
- print(row)
- ''')
- query_filename = os.path.join(workspace_dir, "")
- await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add file", description="Adding a file called `` to the workspace that will run a test query on the DuckDB instance")
- await'env/bin/python3', name="Run test query", description="Running `env/bin/python3` to test that the data was loaded into DuckDB as expected")
diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py
new file mode 100644
index 00000000..c3c832f5
--- /dev/null
+++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py
@@ -0,0 +1,27 @@
+from typing import List, Union
+from ..core.core import WaitForUserInputStep
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+class NLMultiselectStep(Step):
+ hide: bool = True
+ prompt: str
+ options: List[str]
+ async def run(self, sdk: ContinueSDK):
+ user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text
+ def extract_option(text: str) -> Union[str, None]:
+ for option in self.options:
+ if option in text:
+ return option
+ return None
+ first_try = extract_option(user_response.lower())
+ if first_try is not None:
+ return first_try
+ gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. This is the exact string from the options array that they selected:")
+ return extract_option(gpt_parsed) or self.options[0]