diff options
| author | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:50:34 -0700 | 
|---|---|---|
| committer | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:50:34 -0700 | 
| commit | 8691b101761602f272700e71f340068d4c1ef5c4 (patch) | |
| tree | 9716cdb81f755ebe85687e269992480541e31f2c /continuedev/src | |
| parent | 904a5adccad2ff49370082c32dc90457898fd6d4 (diff) | |
| parent | bb44ad69a91be1d678baa04acb07777b8cd325ed (diff) | |
| download | sncontinue-8691b101761602f272700e71f340068d4c1ef5c4.tar.gz sncontinue-8691b101761602f272700e71f340068d4c1ef5c4.tar.bz2 sncontinue-8691b101761602f272700e71f340068d4c1ef5c4.zip | |
Merge branch 'deploy-airflow' into dd-to-bq
Diffstat (limited to 'continuedev/src')
7 files changed, 186 insertions, 2 deletions
| diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py index 8612d834..3bb9f61a 100644 --- a/continuedev/src/continuedev/core/policy.py +++ b/continuedev/src/continuedev/core/policy.py @@ -3,6 +3,7 @@ from typing import List, Tuple, Type  from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma  from ..steps.steps_on_startup import StepsOnStartupStep  from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe  from .main import Step, Validator, History, Policy  from .observation import Observation, TracebackObservation, UserInputObservation  from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep @@ -29,10 +30,12 @@ class DemoPolicy(Policy):              # This could be defined with ObservationTypePolicy. Ergonomics not right though.              if "/pytest" in observation.user_input.lower():                  return WritePytestsRecipe(instructions=observation.user_input) -            elif "/dlt" in observation.user_input.lower() or " dlt" in observation.user_input.lower(): +            elif "/dlt" in observation.user_input.lower():                  return CreatePipelineRecipe()              elif "/ddtobq" in observation.user_input.lower():                  return DDtoBQRecipeRecipe() +            elif "/airflow" in observation.user_input.lower(): +                return DeployPipelineAirflowRecipe()              elif "/comment" in observation.user_input.lower():                  return CommentCodeStep()              elif "/ask" in observation.user_input: diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py new file mode 100644 index 00000000..fbd6e11d --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py @@ -0,0 +1,50 @@ +from textwrap import dedent + +from ...steps.input.nl_multiselect import NLMultiselectStep +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, DeployAirflowStep + + +# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py +# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3 +# 1. What verified pipeline do you want to deploy with Airflow? +# 2. Set up selected verified pipeline +# 3. Deploy selected verified pipeline with Airflow +# 4. Set up Airflow locally? + +class DeployPipelineAirflowRecipe(Step): +    hide: bool = True + +    async def run(self, sdk: ContinueSDK): +        source_name = await sdk.run_step( +            MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\ +                This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will: +                - Select a dlt-verified pipeline +                - Setup the pipeline +                - Deploy it to Airflow +                - Optionally, setup Airflow locally""")) >> +            NLMultiselectStep( +                prompt=dedent("""\ +                    Which verified pipeline do you want to deploy with Airflow? The options are: +                    - Asana +                    - Chess.com +                    - GitHub +                    - Google Analytics +                    - Google Sheets +                    - HubSpot +                    - Matomo +                    - Pipedrive +                    - Shopify +                    - Strapi +                    - Zendesk"""), +                options=[ +                    "asana_dlt", "chess", "github", "google_analytics", "google_sheets", "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk" +                ]) +        ) +        await sdk.run_step( +            SetupPipelineStep(source_name=source_name) >> +            DeployAirflowStep(source_name=source_name) +        ) diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py new file mode 100644 index 00000000..ce910252 --- /dev/null +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -0,0 +1,76 @@ +import os +import subprocess +from textwrap import dedent +import time + +from ...steps.core.core import WaitForUserInputStep +from ...models.main import Range +from ...models.filesystem import RangeInFile +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation, InternalErrorObservation +from ...models.filesystem_edit import AddFile, FileEdit +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.find_and_replace import FindAndReplaceStep + +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + + +class SetupPipelineStep(Step): +    hide: bool = True +    name: str = "Setup dlt Pipeline" + +    source_name: str + +    async def describe(self, models: Models): +        pass + +    async def run(self, sdk: ContinueSDK): +        await sdk.run([ +            'python3 -m venv env', +            'source env/bin/activate', +            'pip install dlt', +            f'dlt --non-interactive init {self.source_name} duckdb', +            'pip install -r requirements.txt' +        ], description=dedent(f"""\ +            Running the following commands: +            - `python3 -m venv env`: Create a Python virtual environment +            - `source env/bin/activate`: Activate the virtual environment +            - `pip install dlt`: Install dlt +            - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance +            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment") + + +class DeployAirflowStep(Step): +    hide: bool = True +    source_name: str + +    async def run(self, sdk: ContinueSDK): + +        # Run dlt command to deploy pipeline to Airflow +        await sdk.run([ +            f'dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer', +        ], description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", name="Deploy dlt pipeline to Airflow") + +        # Modify the DAG file +        directory = await sdk.ide.getWorkspaceDirectory() +        filepath = os.path.join( +            directory, f"dags/dag_{self.source_name}_pipeline.py") + +        # Replace the pipeline name and dataset name +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) + +        # Prompt the user for the DAG schedule +        edit_dag_range = Range.from_shorthand(18, 0, 23, 0) +        await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") +        response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) +        await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", +                            range=edit_dag_range) + +        # Tell the user to check the schedule and fill in owner, email, other default_args +        await sdk.run_step(MessageStep(message="Fill in the owner, email, and other default_args in the DAG file with your own personal information.", name="Fill in default_args")) + +        # Run the DAG locally ?? diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py new file mode 100644 index 00000000..78511b27 --- /dev/null +++ b/continuedev/src/continuedev/steps/find_and_replace.py @@ -0,0 +1,26 @@ +from ..models.filesystem_edit import FileEdit, Range +from ..core.main import Models, Step +from ..core.sdk import ContinueSDK + + +class FindAndReplaceStep(Step): +    name: str = "Find and replace" +    filepath: str +    pattern: str +    replacement: str + +    async def describe(self, models: Models): +        return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`" + +    async def run(self, sdk: ContinueSDK): +        file_content = await sdk.ide.readFile(self.filepath) +        while self.pattern in file_content: +            start_index = file_content.index(self.pattern) +            end_index = start_index + len(self.pattern) +            await sdk.ide.applyFileSystemEdit(FileEdit( +                filepath=self.filepath, +                range=Range.from_indices(file_content, start_index, end_index), +                replacement=self.replacement +            )) +            file_content = file_content[:start_index] + \ +                self.replacement + file_content[end_index:] diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py new file mode 100644 index 00000000..c3c832f5 --- /dev/null +++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py @@ -0,0 +1,27 @@ +from typing import List, Union +from ..core.core import WaitForUserInputStep +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +class NLMultiselectStep(Step): +    hide: bool = True + +    prompt: str +    options: List[str] + +    async def run(self, sdk: ContinueSDK): +        user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text + +        def extract_option(text: str) -> Union[str, None]: +            for option in self.options: +                if option in text: +                    return option +            return None + +        first_try = extract_option(user_response.lower()) +        if first_try is not None: +            return first_try + +        gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. This is the exact string from the options array that they selected:") +        return extract_option(gpt_parsed) or self.options[0] diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py index ba793425..d0b47201 100644 --- a/continuedev/src/continuedev/steps/steps_on_startup.py +++ b/continuedev/src/continuedev/steps/steps_on_startup.py @@ -2,12 +2,14 @@ from ..core.main import ContinueSDK, Models, Step  from .main import UserInputStep  from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe  from ..recipes.DDtoBQRecipe.main import DDtoBQRecipeRecipe +from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe  step_name_to_step_class = {      "UserInputStep": UserInputStep,      "CreatePipelineRecipe": CreatePipelineRecipe, -    "DDtoBQRecipeRecipe": DDtoBQRecipeRecipe +    "DDtoBQRecipeRecipe": DDtoBQRecipeRecipe, +    "DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe  } | 
