From e38c6aa6f965f00b039c10789a42eae21e0e14b4 Mon Sep 17 00:00:00 2001 From: Ty Dunn Date: Wed, 7 Jun 2023 13:29:53 +0200 Subject: initial structure --- .../recipes/DeployPipelineAirflowRecipe/README.md | 0 .../recipes/DeployPipelineAirflowRecipe/main.py | 37 +++++ .../recipes/DeployPipelineAirflowRecipe/steps.py | 152 +++++++++++++++++++++ 3 files changed, 189 insertions(+) create mode 100644 continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md create mode 100644 continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py create mode 100644 continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md new file mode 100644 index 00000000..e69de29b diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py new file mode 100644 index 00000000..d7cd03db --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py @@ -0,0 +1,37 @@ +from textwrap import dedent + +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, ValidatePipelineStep + + +# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py +# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3 +# 1. What verified pipeline do you want to deploy with Airflow? +# 2. Set up selected verified pipeline +# 3. Deploy selected verified pipeline with Airflow +# 4. Set up Airflow locally? + +class DeployPipelineAirflowRecipe(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + text_observation = await sdk.run_step( + MessageStep(name="Building your first dlt pipeline", message=dedent("""\ + This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will: + - Create a Python virtual environment with dlt installed + - Run `dlt init` to generate a pipeline template + - Write the code to call the API + - Add any required API keys to the `secrets.toml` file + - Test that the API call works + - Load the data into a local DuckDB instance + - Write a query to view the data""")) >> + WaitForUserInputStep( + prompt="What API do you want to load data from? (e.g. 
weatherapi.com, chess.com)") + ) + await sdk.run_step( + SetupPipelineStep(api_description=text_observation.text) >> + ValidatePipelineStep() + ) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py new file mode 100644 index 00000000..c32ae923 --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -0,0 +1,152 @@ +import os +import subprocess +from textwrap import dedent +import time + +from ...models.main import Range +from ...models.filesystem import RangeInFile +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation, InternalErrorObservation +from ...models.filesystem_edit import AddFile, FileEdit +from ...core.main import Step +from ...core.sdk import ContinueSDK + +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + + +class SetupPipelineStep(Step): + hide: bool = True + name: str = "Setup dlt Pipeline" + + api_description: str # e.g. "I want to load data from the weatherapi.com API" + + async def describe(self, models: Models): + return dedent(f"""\ + This step will create a new dlt pipeline that loads data from an API, as per your request: + {self.api_description} + """) + + async def run(self, sdk: ContinueSDK): + sdk.context.set("api_description", self.api_description) + + source_name = (await sdk.models.gpt35()).complete( + f"Write a snake_case name for the data source described by {self.api_description}: ").strip() + filename = f'{source_name}.py' + + # running commands to get started when creating a new dlt pipeline + await sdk.run([ + 'python3 -m venv env', + 'source env/bin/activate', + 'pip install dlt', + f'dlt init {source_name} duckdb\n\rY', + 'pip install -r requirements.txt' + ], description=dedent(f"""\ + Running the following commands: + - `python3 -m venv env`: Create a Python virtual environment + - `source env/bin/activate`: Activate the virtual environment + - `pip install dlt`: Install dlt + - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance + - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment") + + # editing the resource function to call the requested API + await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022") + + # sdk.set_loading_message("Writing code to call the API...") + await sdk.edit_file( + filename=filename, + prompt=f'Edit the resource function to call the API described by this: {self.api_description}. 
Do not move or remove the exit() call in __main__.', + name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}" + ) + + time.sleep(1) + + # wait for user to put API key in secrets.toml + await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") + await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") + + sdk.context.set("source_name", source_name) + + +class ValidatePipelineStep(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + workspace_dir = await sdk.ide.getWorkspaceDirectory() + source_name = sdk.context.get("source_name") + filename = f'{source_name}.py' + + # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\ + # Next, we will validate that your dlt pipeline is working as expected: + # - Test that the API call works + # - Load the data into a local DuckDB instance + # - Write a query to view the data + # """))) + + # test that the API call works + output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API") + + # If it fails, return the error + if "Traceback" in output: + output = "Traceback" + output.split("Traceback")[-1] + file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename)) + suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\ + ```python + {file_content} + ``` + This above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred: + + ```ascii + {output} + ``` + + This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:""")) + + api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\ + The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this: + ```python + {file_content} + ``` + What is the URL for the API documentation that will help me learn how to make this call? 
Please format in markdown so I can click the link.""")) + + sdk.raise_exception( + title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\ + {suggestion} + + {api_documentation_url} + + After you've fixed the code, click the retry button at the top of the Validate Pipeline step above."""))) + + # remove exit() from the main main function + await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB.")) + + contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename)) + replacement = "\n".join( + list(filter(lambda line: line.strip() != "exit()", contents.split("\n")))) + await sdk.ide.applyFileSystemEdit(FileEdit( + filepath=os.path.join(workspace_dir, filename), + replacement=replacement, + range=Range.from_entire_file(contents) + )) + + # load the data into the DuckDB instance + await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB") + + table_name = f"{source_name}.{source_name}_resource" + tables_query_code = dedent(f'''\ + import duckdb + + # connect to DuckDB instance + conn = duckdb.connect(database="{source_name}.duckdb") + + # get table names + rows = conn.execute("SELECT * FROM {table_name};").fetchall() + + # print table names + for row in rows: + print(row) + ''') + + query_filename = os.path.join(workspace_dir, "query.py") + await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance") + await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected") -- cgit v1.2.3-70-g09d2 From d637bb3c6b9c8bb77f7a9f30573bc4f9e54a367d Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Wed, 7 Jun 2023 17:03:57 -0400 Subject: added /airflow command and config string mapping --- continuedev/src/continuedev/core/policy.py | 5 ++++- continuedev/src/continuedev/steps/steps_on_startup.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py index 8aea8de7..d87a3582 100644 --- a/continuedev/src/continuedev/core/policy.py +++ b/continuedev/src/continuedev/core/policy.py @@ -3,6 +3,7 @@ from typing import List, Tuple, Type from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma from ..steps.steps_on_startup import StepsOnStartupStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe from .main import Step, Validator, History, Policy from .observation import Observation, TracebackObservation, UserInputObservation from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep @@ -28,8 +29,10 @@ class DemoPolicy(Policy): # This could be defined with ObservationTypePolicy. Ergonomics not right though. 
if "/pytest" in observation.user_input.lower(): return WritePytestsRecipe(instructions=observation.user_input) - elif "/dlt" in observation.user_input.lower() or " dlt" in observation.user_input.lower(): + elif "/dlt" in observation.user_input.lower(): return CreatePipelineRecipe() + elif "/airflow" in observation.user_input.lower(): + return DeployPipelineAirflowRecipe() elif "/comment" in observation.user_input.lower(): return CommentCodeStep() elif "/ask" in observation.user_input: diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py index cd40ff56..b1376e8a 100644 --- a/continuedev/src/continuedev/steps/steps_on_startup.py +++ b/continuedev/src/continuedev/steps/steps_on_startup.py @@ -1,11 +1,13 @@ from ..core.main import ContinueSDK, Models, Step from .main import UserInputStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe step_name_to_step_class = { "UserInputStep": UserInputStep, - "CreatePipelineRecipe": CreatePipelineRecipe + "CreatePipelineRecipe": CreatePipelineRecipe, + "DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe } -- cgit v1.2.3-70-g09d2 From 39cb9c96f21b336db32097f6c48fb814a191c57d Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Sat, 10 Jun 2023 07:53:31 -0400 Subject: selection step of airflow recipe --- .../recipes/DeployPipelineAirflowRecipe/main.py | 40 ++++--- .../recipes/DeployPipelineAirflowRecipe/steps.py | 120 +-------------------- .../src/continuedev/steps/input/nl_multiselect.py | 27 +++++ 3 files changed, 57 insertions(+), 130 deletions(-) create mode 100644 continuedev/src/continuedev/steps/input/nl_multiselect.py diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py index d7cd03db..8e7d258d 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py @@ -1,10 +1,11 @@ from textwrap import dedent +from ...steps.input.nl_multiselect import NLMultiselectStep from ...core.main import Step from ...core.sdk import ContinueSDK from ...steps.core.core import WaitForUserInputStep from ...steps.main import MessageStep -from .steps import SetupPipelineStep, ValidatePipelineStep +from .steps import SetupPipelineStep # https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py @@ -19,19 +20,30 @@ class DeployPipelineAirflowRecipe(Step): async def run(self, sdk: ContinueSDK): text_observation = await sdk.run_step( - MessageStep(name="Building your first dlt pipeline", message=dedent("""\ - This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will: - - Create a Python virtual environment with dlt installed - - Run `dlt init` to generate a pipeline template - - Write the code to call the API - - Add any required API keys to the `secrets.toml` file - - Test that the API call works - - Load the data into a local DuckDB instance - - Write a query to view the data""")) >> - WaitForUserInputStep( - prompt="What API do you want to load data from? (e.g. weatherapi.com, chess.com)") + MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\ + This recipe will show you how to deploy a pipeline to Airflow. 
With the help of Continue, you will: + - Select a dlt-verified pipeline + - Setup the pipeline + - Deploy it to Airflow + - Optionally, setup Airflow locally""")) >> + NLMultiselectStep( + prompt=dedent("""\ + Which verified pipeline do you want to deploy with Airflow? The options are: + - Asana + - Chess.com + - GitHub + - Google Analytics + - Google Sheets + - HubSpot + - Matomo + - Pipedrive + - Shopify + - Strapi + - Zendesk"""), + options=[ + "asana_dlt", "chess", "github", "google_analytics", "google_sheets", "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk" + ]) ) await sdk.run_step( - SetupPipelineStep(api_description=text_observation.text) >> - ValidatePipelineStep() + SetupPipelineStep(source_name=text_observation.text) >> ) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index c32ae923..b69b3adc 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -19,134 +19,22 @@ class SetupPipelineStep(Step): hide: bool = True name: str = "Setup dlt Pipeline" - api_description: str # e.g. "I want to load data from the weatherapi.com API" + source_name: str async def describe(self, models: Models): - return dedent(f"""\ - This step will create a new dlt pipeline that loads data from an API, as per your request: - {self.api_description} - """) + pass async def run(self, sdk: ContinueSDK): - sdk.context.set("api_description", self.api_description) - - source_name = (await sdk.models.gpt35()).complete( - f"Write a snake_case name for the data source described by {self.api_description}: ").strip() - filename = f'{source_name}.py' - - # running commands to get started when creating a new dlt pipeline await sdk.run([ 'python3 -m venv env', 'source env/bin/activate', 'pip install dlt', - f'dlt init {source_name} duckdb\n\rY', + f'dlt --non-interactive init {self.source_name} duckdb', 'pip install -r requirements.txt' ], description=dedent(f"""\ Running the following commands: - `python3 -m venv env`: Create a Python virtual environment - `source env/bin/activate`: Activate the virtual environment - `pip install dlt`: Install dlt - - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance + - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment") - - # editing the resource function to call the requested API - await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022") - - # sdk.set_loading_message("Writing code to call the API...") - await sdk.edit_file( - filename=filename, - prompt=f'Edit the resource function to call the API described by this: {self.api_description}. 
Do not move or remove the exit() call in __main__.', - name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}" - ) - - time.sleep(1) - - # wait for user to put API key in secrets.toml - await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") - await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") - - sdk.context.set("source_name", source_name) - - -class ValidatePipelineStep(Step): - hide: bool = True - - async def run(self, sdk: ContinueSDK): - workspace_dir = await sdk.ide.getWorkspaceDirectory() - source_name = sdk.context.get("source_name") - filename = f'{source_name}.py' - - # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\ - # Next, we will validate that your dlt pipeline is working as expected: - # - Test that the API call works - # - Load the data into a local DuckDB instance - # - Write a query to view the data - # """))) - - # test that the API call works - output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API") - - # If it fails, return the error - if "Traceback" in output: - output = "Traceback" + output.split("Traceback")[-1] - file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename)) - suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\ - ```python - {file_content} - ``` - This above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred: - - ```ascii - {output} - ``` - - This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:""")) - - api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\ - The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this: - ```python - {file_content} - ``` - What is the URL for the API documentation that will help me learn how to make this call? 
Please format in markdown so I can click the link.""")) - - sdk.raise_exception( - title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\ - {suggestion} - - {api_documentation_url} - - After you've fixed the code, click the retry button at the top of the Validate Pipeline step above."""))) - - # remove exit() from the main main function - await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB.")) - - contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename)) - replacement = "\n".join( - list(filter(lambda line: line.strip() != "exit()", contents.split("\n")))) - await sdk.ide.applyFileSystemEdit(FileEdit( - filepath=os.path.join(workspace_dir, filename), - replacement=replacement, - range=Range.from_entire_file(contents) - )) - - # load the data into the DuckDB instance - await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB") - - table_name = f"{source_name}.{source_name}_resource" - tables_query_code = dedent(f'''\ - import duckdb - - # connect to DuckDB instance - conn = duckdb.connect(database="{source_name}.duckdb") - - # get table names - rows = conn.execute("SELECT * FROM {table_name};").fetchall() - - # print table names - for row in rows: - print(row) - ''') - - query_filename = os.path.join(workspace_dir, "query.py") - await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance") - await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected") diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py new file mode 100644 index 00000000..c3c832f5 --- /dev/null +++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py @@ -0,0 +1,27 @@ +from typing import List, Union +from ..core.core import WaitForUserInputStep +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +class NLMultiselectStep(Step): + hide: bool = True + + prompt: str + options: List[str] + + async def run(self, sdk: ContinueSDK): + user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text + + def extract_option(text: str) -> Union[str, None]: + for option in self.options: + if option in text: + return option + return None + + first_try = extract_option(user_response.lower()) + if first_try is not None: + return first_try + + gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. 
This is the exact string from the options array that they selected:") + return extract_option(gpt_parsed) or self.options[0] -- cgit v1.2.3-70-g09d2 From ba0479d90d6f7cf16366485fce7095b728760848 Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Sun, 11 Jun 2023 19:02:34 -0700 Subject: working on airflow --- .../recipes/DeployPipelineAirflowRecipe/main.py | 7 +++-- .../recipes/DeployPipelineAirflowRecipe/steps.py | 36 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py index 8e7d258d..fbd6e11d 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py @@ -5,7 +5,7 @@ from ...core.main import Step from ...core.sdk import ContinueSDK from ...steps.core.core import WaitForUserInputStep from ...steps.main import MessageStep -from .steps import SetupPipelineStep +from .steps import SetupPipelineStep, DeployAirflowStep # https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py @@ -19,7 +19,7 @@ class DeployPipelineAirflowRecipe(Step): hide: bool = True async def run(self, sdk: ContinueSDK): - text_observation = await sdk.run_step( + source_name = await sdk.run_step( MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\ This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will: - Select a dlt-verified pipeline @@ -45,5 +45,6 @@ class DeployPipelineAirflowRecipe(Step): ]) ) await sdk.run_step( - SetupPipelineStep(source_name=text_observation.text) >> + SetupPipelineStep(source_name=source_name) >> + DeployAirflowStep(source_name=source_name) ) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index b69b3adc..6a912f0c 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -3,6 +3,7 @@ import subprocess from textwrap import dedent import time +from ...steps.core.core import WaitForUserInputStep from ...models.main import Range from ...models.filesystem import RangeInFile from ...steps.main import MessageStep @@ -38,3 +39,38 @@ class SetupPipelineStep(Step): - `pip install dlt`: Install dlt - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment") + + +class DeployAirflowStep(Step): + hide: bool = True + source_name: str + + async def run(self, sdk: ContinueSDK): + + # Run dlt command to deploy pipeline to Airflow + await sdk.run([ + f'dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer', + ], description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", name="Deploy dlt pipeline to Airflow") + + # Modify the DAG file + directory = await sdk.ide.getWorkspaceDirectory() + filepath = os.path.join( + directory, f"dags/dag_{self.source_name}_pipeline.py") + + # TODO: Find and replace in file step. 
+ old_file_contents = await sdk.ide.readFile(filepath) + file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace( + "dataset_name", f"{self.source_name}_dataset") + await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents)) + + # Prompt the user for the DAG schedule + response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule")) + edit_dag_range = Range.from_shorthand(18, 0, 23, 0) + await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range)) + await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'", + range=edit_dag_range) + + # Tell the user to check the schedule and fill in owner, email, other default_args + await sdk.run_step(MessageStep(message="Fill in the owner, email, and other default_args in the DAG file with your own personal information.", name="Fill in default_args")) + + # Run the DAG locally ?? -- cgit v1.2.3-70-g09d2 From bb44ad69a91be1d678baa04acb07777b8cd325ed Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Sun, 11 Jun 2023 22:22:46 -0700 Subject: finished airflow recipe --- .../recipes/DeployPipelineAirflowRecipe/steps.py | 16 ++++++------- .../src/continuedev/steps/find_and_replace.py | 26 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 continuedev/src/continuedev/steps/find_and_replace.py diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index 6a912f0c..ce910252 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -12,6 +12,7 @@ from ...core.observation import DictObservation, InternalErrorObservation from ...models.filesystem_edit import AddFile, FileEdit from ...core.main import Step from ...core.sdk import ContinueSDK +from ...steps.find_and_replace import FindAndReplaceStep AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" @@ -57,17 +58,16 @@ class DeployAirflowStep(Step): filepath = os.path.join( directory, f"dags/dag_{self.source_name}_pipeline.py") - # TODO: Find and replace in file step. - old_file_contents = await sdk.ide.readFile(filepath) - file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace( - "dataset_name", f"{self.source_name}_dataset") - await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents)) + # Replace the pipeline name and dataset name + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) # Prompt the user for the DAG schedule - response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. 
every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule")) edit_dag_range = Range.from_shorthand(18, 0, 23, 0) - await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range)) - await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'", + await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") + response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) + await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", range=edit_dag_range) # Tell the user to check the schedule and fill in owner, email, other default_args diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py new file mode 100644 index 00000000..78511b27 --- /dev/null +++ b/continuedev/src/continuedev/steps/find_and_replace.py @@ -0,0 +1,26 @@ +from ..models.filesystem_edit import FileEdit, Range +from ..core.main import Models, Step +from ..core.sdk import ContinueSDK + + +class FindAndReplaceStep(Step): + name: str = "Find and replace" + filepath: str + pattern: str + replacement: str + + async def describe(self, models: Models): + return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`" + + async def run(self, sdk: ContinueSDK): + file_content = await sdk.ide.readFile(self.filepath) + while self.pattern in file_content: + start_index = file_content.index(self.pattern) + end_index = start_index + len(self.pattern) + await sdk.ide.applyFileSystemEdit(FileEdit( + filepath=self.filepath, + range=Range.from_indices(file_content, start_index, end_index), + replacement=self.replacement + )) + file_content = file_content[:start_index] + \ + self.replacement + file_content[end_index:] -- cgit v1.2.3-70-g09d2
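
For reference, the sketch below is an illustrative, standalone version of the find-and-replace loop that the final commit's FindAndReplaceStep performs through the Continue SDK. It assumes plain local file access instead of the SDK's IDE edit protocol, and the function name and example paths are hypothetical, not part of the repository.

    # Illustrative only: mirrors FindAndReplaceStep's loop, but edits a file on
    # disk directly instead of issuing FileEdit objects through sdk.ide.
    from pathlib import Path


    def find_and_replace_in_file(filepath: str, pattern: str, replacement: str) -> int:
        """Replace every occurrence of `pattern` in the file; return the count."""
        path = Path(filepath)
        content = path.read_text()
        count = 0
        # Locate one occurrence at a time and splice in the replacement, the same
        # way the step repeatedly re-scans the updated file content.
        while pattern in content:
            start = content.index(pattern)
            end = start + len(pattern)
            content = content[:start] + replacement + content[end:]
            count += 1
        path.write_text(content)
        return count


    if __name__ == "__main__":
        # Hypothetical usage, echoing how DeployAirflowStep rewrites the generated
        # DAG template: swap the placeholder names for pipeline-specific ones.
        n = find_and_replace_in_file(
            "dags/dag_chess_pipeline.py", "'pipeline_name'", "'chess_pipeline'")
        print(f"Replaced {n} occurrence(s)")

Like the step itself, this loop assumes the replacement string does not re-introduce the pattern; if it did, the `while` loop would never terminate.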