author    Ty Dunn <ty@tydunn.com>  2023-06-07 13:29:53 +0200
committer Ty Dunn <ty@tydunn.com>  2023-06-07 13:29:53 +0200
commit    86369432eb6d35727f87fffa4a79646a85bb5498
tree      bff4d6f235fa2f49a4501c70c3ac7651535a9c6c
parent    6ebb5088a1363d4de8b9d2e6abaa02c49ee90f05
initial structure
-rw-r--r--  continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md    0
-rw-r--r--  continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py     37
-rw-r--r--  continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py   152
3 files changed, 189 insertions, 0 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
new file mode 100644
index 00000000..d7cd03db
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
@@ -0,0 +1,37 @@
+from textwrap import dedent
+
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+from ...steps.core.core import WaitForUserInputStep
+from ...steps.main import MessageStep
+from .steps import SetupPipelineStep, ValidatePipelineStep
+
+
+# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
+# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3
+# 1. What verified pipeline do you want to deploy with Airflow?
+# 2. Set up selected verified pipeline
+# 3. Deploy selected verified pipeline with Airflow
+# 4. Set up Airflow locally?
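+#
+# NOTE: the run() body below currently follows the dlt pipeline-creation flow
+# (SetupPipelineStep + ValidatePipelineStep); the Airflow-specific steps listed
+# above are still to be added.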
+
+class DeployPipelineAirflowRecipe(Step):
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ text_observation = await sdk.run_step(
+ MessageStep(name="Building your first dlt pipeline", message=dedent("""\
+ This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will:
+ - Create a Python virtual environment with dlt installed
+ - Run `dlt init` to generate a pipeline template
+ - Write the code to call the API
+ - Add any required API keys to the `secrets.toml` file
+ - Test that the API call works
+ - Load the data into a local DuckDB instance
+ - Write a query to view the data""")) >>
+ WaitForUserInputStep(
+ prompt="What API do you want to load data from? (e.g. weatherapi.com, chess.com)")
+ )
+ await sdk.run_step(
+ SetupPipelineStep(api_description=text_observation.text) >>
+ ValidatePipelineStep()
+ )
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
new file mode 100644
index 00000000..c32ae923
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -0,0 +1,152 @@
+import os
+import subprocess
+from textwrap import dedent
+import asyncio
+
+from ...models.main import Range
+from ...models.filesystem import RangeInFile
+from ...steps.main import MessageStep
+from ...core.sdk import Models
+from ...core.observation import DictObservation, InternalErrorObservation
+from ...models.filesystem_edit import AddFile, FileEdit
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+
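+# Appended to step names so that AI-generated edits and suggestions are clearly labeled for the user.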
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
+
+class SetupPipelineStep(Step):
+ hide: bool = True
+ name: str = "Setup dlt Pipeline"
+
+ api_description: str # e.g. "I want to load data from the weatherapi.com API"
+
+ async def describe(self, models: Models):
+ return dedent(f"""\
+ This step will create a new dlt pipeline that loads data from an API, as per your request:
+ {self.api_description}
+ """)
+
+ async def run(self, sdk: ContinueSDK):
+ sdk.context.set("api_description", self.api_description)
+
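+        # ask the model for a snake_case name for the data source; it is used as the dlt source name and the pipeline filename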
+ source_name = (await sdk.models.gpt35()).complete(
+ f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
+ filename = f'{source_name}.py'
+
+ # running commands to get started when creating a new dlt pipeline
+ await sdk.run([
+ 'python3 -m venv env',
+ 'source env/bin/activate',
+ 'pip install dlt',
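+            # the trailing "\n\rY" presumably auto-answers the confirmation prompt from `dlt init`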
+ f'dlt init {source_name} duckdb\n\rY',
+ 'pip install -r requirements.txt'
+ ], description=dedent(f"""\
+ Running the following commands:
+ - `python3 -m venv env`: Create a Python virtual environment
+ - `source env/bin/activate`: Activate the virtual environment
+ - `pip install dlt`: Install dlt
+ - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance
+ - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
+
+ # editing the resource function to call the requested API
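+        # highlight lines 15-29 of the generated file, which is presumably where the template's resource function lives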
+ await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022")
+
+ # sdk.set_loading_message("Writing code to call the API...")
+ await sdk.edit_file(
+ filename=filename,
+ prompt=f'Edit the resource function to call the API described by this: {self.api_description}. Do not move or remove the exit() call in __main__.',
+ name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}"
+ )
+
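+        # short pause, presumably to let the edit above finish applying before opening secrets.toml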
+        await asyncio.sleep(1)
+
+ # wait for user to put API key in secrets.toml
+ await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
+ await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
+
+ sdk.context.set("source_name", source_name)
+
+
+class ValidatePipelineStep(Step):
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ workspace_dir = await sdk.ide.getWorkspaceDirectory()
+ source_name = sdk.context.get("source_name")
+ filename = f'{source_name}.py'
+
+ # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\
+ # Next, we will validate that your dlt pipeline is working as expected:
+ # - Test that the API call works
+ # - Load the data into a local DuckDB instance
+ # - Write a query to view the data
+ # """)))
+
+ # test that the API call works
+ output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API")
+
+ # If it fails, return the error
+ if "Traceback" in output:
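+            # keep only the final traceback from the command output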
+ output = "Traceback" + output.split("Traceback")[-1]
+ file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
+ suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\
+ ```python
+ {file_content}
+ ```
+                The above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred:
+
+ ```ascii
+ {output}
+ ```
+
+ This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:"""))
+
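+            # also ask the model for a likely link to the API's documentation to include in the error message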
+ api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\
+ The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this:
+ ```python
+ {file_content}
+ ```
+ What is the URL for the API documentation that will help me learn how to make this call? Please format in markdown so I can click the link."""))
+
+ sdk.raise_exception(
+ title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\
+ {suggestion}
+
+ {api_documentation_url}
+
+ After you've fixed the code, click the retry button at the top of the Validate Pipeline step above.""")))
+
+        # remove the early exit() call from the main function
+ await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB."))
+
+ contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
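+        # drop every line that consists solely of exit()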
+ replacement = "\n".join(
+ list(filter(lambda line: line.strip() != "exit()", contents.split("\n"))))
+ await sdk.ide.applyFileSystemEdit(FileEdit(
+ filepath=os.path.join(workspace_dir, filename),
+ replacement=replacement,
+ range=Range.from_entire_file(contents)
+ ))
+
+ # load the data into the DuckDB instance
+        await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running `python3 {filename}` to load data into DuckDB")
+
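+        # assumes the dlt template loads the resource into <source_name>.<source_name>_resource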
+ table_name = f"{source_name}.{source_name}_resource"
+ tables_query_code = dedent(f'''\
+ import duckdb
+
+ # connect to DuckDB instance
+ conn = duckdb.connect(database="{source_name}.duckdb")
+
+            # fetch all rows from the loaded table
+ rows = conn.execute("SELECT * FROM {table_name};").fetchall()
+
+            # print each row
+ for row in rows:
+ print(row)
+ ''')
+
+ query_filename = os.path.join(workspace_dir, "query.py")
+ await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance")
+ await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected")