diff options
author | Nate Sesti <33237525+sestinj@users.noreply.github.com> | 2023-06-12 02:39:26 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-12 02:39:26 -0400 |
commit | bde21973226c2ff35fa5beeac20edc2e4b2270c1 (patch) | |
tree | 260d020936b29d351d4afbc8406bbe4c9741bb4b | |
parent | c09e5e20986e6337a087dd18b30f08c4580cb389 (diff) | |
parent | 5c75b013162aae8504f9d255f885caf125181c7b (diff) | |
download | sncontinue-bde21973226c2ff35fa5beeac20edc2e4b2270c1.tar.gz sncontinue-bde21973226c2ff35fa5beeac20edc2e4b2270c1.tar.bz2 sncontinue-bde21973226c2ff35fa5beeac20edc2e4b2270c1.zip |
Merge pull request #72 from continuedev/deploy-airflow
deploy pipeline with airflow recipe
7 files changed, 185 insertions, 0 deletions
diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py index 7661f0c4..a4c8d60f 100644 --- a/continuedev/src/continuedev/core/policy.py +++ b/continuedev/src/continuedev/core/policy.py @@ -3,6 +3,7 @@ from typing import List, Tuple, Type from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma from ..steps.steps_on_startup import StepsOnStartupStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe from ..recipes.AddTransformRecipe.main import AddTransformRecipe from .main import Step, Validator, History, Policy from .observation import Observation, TracebackObservation, UserInputObservation @@ -32,6 +33,8 @@ class DemoPolicy(Policy): return WritePytestsRecipe(instructions=observation.user_input) elif "/dlt" in observation.user_input.lower(): return CreatePipelineRecipe() + elif "/airflow" in observation.user_input.lower(): + return DeployPipelineAirflowRecipe() elif "/transform" in observation.user_input.lower(): return AddTransformRecipe() elif "/comment" in observation.user_input.lower(): diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py new file mode 100644 index 00000000..fbd6e11d --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py @@ -0,0 +1,50 @@ +from textwrap import dedent + +from ...steps.input.nl_multiselect import NLMultiselectStep +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, DeployAirflowStep + + +# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py +# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3 +# 1. What verified pipeline do you want to deploy with Airflow? +# 2. Set up selected verified pipeline +# 3. Deploy selected verified pipeline with Airflow +# 4. Set up Airflow locally? + +class DeployPipelineAirflowRecipe(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + source_name = await sdk.run_step( + MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\ + This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will: + - Select a dlt-verified pipeline + - Setup the pipeline + - Deploy it to Airflow + - Optionally, setup Airflow locally""")) >> + NLMultiselectStep( + prompt=dedent("""\ + Which verified pipeline do you want to deploy with Airflow? The options are: + - Asana + - Chess.com + - GitHub + - Google Analytics + - Google Sheets + - HubSpot + - Matomo + - Pipedrive + - Shopify + - Strapi + - Zendesk"""), + options=[ + "asana_dlt", "chess", "github", "google_analytics", "google_sheets", "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk" + ]) + ) + await sdk.run_step( + SetupPipelineStep(source_name=source_name) >> + DeployAirflowStep(source_name=source_name) + ) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py new file mode 100644 index 00000000..ce910252 --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -0,0 +1,76 @@ +import os +import subprocess +from textwrap import dedent +import time + +from ...steps.core.core import WaitForUserInputStep +from ...models.main import Range +from ...models.filesystem import RangeInFile +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation, InternalErrorObservation +from ...models.filesystem_edit import AddFile, FileEdit +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.find_and_replace import FindAndReplaceStep + +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + + +class SetupPipelineStep(Step): + hide: bool = True + name: str = "Setup dlt Pipeline" + + source_name: str + + async def describe(self, models: Models): + pass + + async def run(self, sdk: ContinueSDK): + await sdk.run([ + 'python3 -m venv env', + 'source env/bin/activate', + 'pip install dlt', + f'dlt --non-interactive init {self.source_name} duckdb', + 'pip install -r requirements.txt' + ], description=dedent(f"""\ + Running the following commands: + - `python3 -m venv env`: Create a Python virtual environment + - `source env/bin/activate`: Activate the virtual environment + - `pip install dlt`: Install dlt + - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance + - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment") + + +class DeployAirflowStep(Step): + hide: bool = True + source_name: str + + async def run(self, sdk: ContinueSDK): + + # Run dlt command to deploy pipeline to Airflow + await sdk.run([ + f'dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer', + ], description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", name="Deploy dlt pipeline to Airflow") + + # Modify the DAG file + directory = await sdk.ide.getWorkspaceDirectory() + filepath = os.path.join( + directory, f"dags/dag_{self.source_name}_pipeline.py") + + # Replace the pipeline name and dataset name + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) + + # Prompt the user for the DAG schedule + edit_dag_range = Range.from_shorthand(18, 0, 23, 0) + await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") + response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) + await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", + range=edit_dag_range) + + # Tell the user to check the schedule and fill in owner, email, other default_args + await sdk.run_step(MessageStep(message="Fill in the owner, email, and other default_args in the DAG file with your own personal information.", name="Fill in default_args")) + + # Run the DAG locally ?? diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py new file mode 100644 index 00000000..78511b27 --- /dev/null +++ b/continuedev/src/continuedev/steps/find_and_replace.py @@ -0,0 +1,26 @@ +from ..models.filesystem_edit import FileEdit, Range +from ..core.main import Models, Step +from ..core.sdk import ContinueSDK + + +class FindAndReplaceStep(Step): + name: str = "Find and replace" + filepath: str + pattern: str + replacement: str + + async def describe(self, models: Models): + return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`" + + async def run(self, sdk: ContinueSDK): + file_content = await sdk.ide.readFile(self.filepath) + while self.pattern in file_content: + start_index = file_content.index(self.pattern) + end_index = start_index + len(self.pattern) + await sdk.ide.applyFileSystemEdit(FileEdit( + filepath=self.filepath, + range=Range.from_indices(file_content, start_index, end_index), + replacement=self.replacement + )) + file_content = file_content[:start_index] + \ + self.replacement + file_content[end_index:] diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py new file mode 100644 index 00000000..c3c832f5 --- /dev/null +++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py @@ -0,0 +1,27 @@ +from typing import List, Union +from ..core.core import WaitForUserInputStep +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +class NLMultiselectStep(Step): + hide: bool = True + + prompt: str + options: List[str] + + async def run(self, sdk: ContinueSDK): + user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text + + def extract_option(text: str) -> Union[str, None]: + for option in self.options: + if option in text: + return option + return None + + first_try = extract_option(user_response.lower()) + if first_try is not None: + return first_try + + gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. This is the exact string from the options array that they selected:") + return extract_option(gpt_parsed) or self.options[0] diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py index 63dedd82..fbdbbcff 100644 --- a/continuedev/src/continuedev/steps/steps_on_startup.py +++ b/continuedev/src/continuedev/steps/steps_on_startup.py @@ -1,11 +1,14 @@ from ..core.main import ContinueSDK, Models, Step from .main import UserInputStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe + from ..recipes.AddTransformRecipe.main import AddTransformRecipe step_name_to_step_class = { "UserInputStep": UserInputStep, "CreatePipelineRecipe": CreatePipelineRecipe, + "DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe, "AddTransformRecipe": AddTransformRecipe } |