diff options
Diffstat (limited to 'server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe')
3 files changed, 211 insertions, 0 deletions
diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py new file mode 100644 index 00000000..5b0bd320 --- /dev/null +++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py @@ -0,0 +1,86 @@ +from textwrap import dedent + +from ....core.main import Step +from ....core.sdk import ContinueSDK +from ....core.steps import MessageStep +from ....plugins.steps.input.nl_multiselect import NLMultiselectStep +from .steps import DeployAirflowStep, RunPipelineStep, SetupPipelineStep + +# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py +# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3 +# 1. What verified pipeline do you want to deploy with Airflow? +# 2. Set up selected verified pipeline +# 3. Deploy selected verified pipeline with Airflow +# 4. Set up Airflow locally? + + +class DeployPipelineAirflowRecipe(Step): +    hide: bool = True + +    async def run(self, sdk: ContinueSDK): +        source_name = await sdk.run_step( +            MessageStep( +                name="Deploying a pipeline to Airflow", +                message=dedent( +                    """\ +                This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will: +                - Select a dlt-verified pipeline +                - Setup the pipeline +                - Deploy it to Airflow +                - Optionally, setup Airflow locally""" +                ), +            ) +            >> NLMultiselectStep( +                prompt=dedent( +                    """\ +                    Which verified pipeline do you want to deploy with Airflow? The options are: +                    - Asana +                    - Chess.com +                    - Facebook Ads +                    - GitHub +                    - Google Analytics +                    - Google Sheets +                    - HubSpot +                    - Jira +                    - Matomo +                    - Mux +                    - Notion +                    - Pipedrive +                    - Pokemon +                    - Salesforce +                    - Shopify +                    - Strapi +                    - Stripe +                    - SQL Database +                    - Workable +                    - Zendesk""" +                ), +                options=[ +                    "asana_dlt", +                    "chess", +                    "github", +                    "google_analytics", +                    "google_sheets", +                    "hubspot", +                    "matomo", +                    "pipedrive", +                    "shopify_dlt", +                    "strapi", +                    "zendesk", +                    "facebook_ads", +                    "jira", +                    "mux", +                    "notion", +                    "pokemon", +                    "salesforce", +                    "stripe_analytics", +                    "sql_database", +                    "workable", +                ], +            ) +        ) +        await sdk.run_step( +            SetupPipelineStep(source_name=source_name) +            >> RunPipelineStep(source_name=source_name) +            >> DeployAirflowStep(source_name=source_name) +        ) diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py new file mode 100644 index 00000000..e4a932af --- /dev/null +++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py @@ -0,0 +1,125 @@ +import os +from textwrap import dedent + +from ....core.main import Step +from ....core.sdk import ContinueSDK, Models +from ....core.steps import MessageStep +from ....plugins.steps.find_and_replace import FindAndReplaceStep + +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + + +class SetupPipelineStep(Step): +    hide: bool = True +    name: str = "Setup dlt Pipeline" + +    source_name: str + +    async def describe(self, models: Models): +        pass + +    async def run(self, sdk: ContinueSDK): +        await sdk.run( +            [ +                "python3 -m venv .env", +                "source .env/bin/activate", +                "pip install dlt", +                f"dlt --non-interactive init {self.source_name} duckdb", +                "pip install -r requirements.txt", +            ], +            description=dedent( +                f"""\ +            Running the following commands: +            - `python3 -m venv .env`: Create a Python virtual environment +            - `source .env/bin/activate`: Activate the virtual environment +            - `pip install dlt`: Install dlt +            - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance +            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline""" +            ), +            name="Setup Python environment", +        ) + + +class RunPipelineStep(Step): +    hide: bool = True +    name: str = "Run dlt Pipeline" + +    source_name: str + +    async def describe(self, models: Models): +        pass + +    async def run(self, sdk: ContinueSDK): +        await sdk.run( +            [ +                f"python3 {self.source_name}_pipeline.py", +            ], +            description=dedent( +                f"""\ +            Running the command `python3 {self.source_name}_pipeline.py to run the pipeline: """ +            ), +            name="Run dlt pipeline", +        ) + + +class DeployAirflowStep(Step): +    hide: bool = True +    source_name: str + +    async def run(self, sdk: ContinueSDK): +        # Run dlt command to deploy pipeline to Airflow +        await sdk.run( +            [ +                "git init", +                f"dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer", +            ], +            description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", +            name="Deploy dlt pipeline to Airflow", +        ) + +        # Get filepaths, open the DAG file +        directory = await sdk.ide.getWorkspaceDirectory() +        pipeline_filepath = os.path.join(directory, f"{self.source_name}_pipeline.py") +        dag_filepath = os.path.join( +            directory, f"dags/dag_{self.source_name}_pipeline.py" +        ) + +        await sdk.ide.setFileOpen(dag_filepath) + +        # Replace the pipeline name and dataset name +        await sdk.run_step( +            FindAndReplaceStep( +                filepath=pipeline_filepath, +                pattern="'pipeline_name'", +                replacement=f"'{self.source_name}_pipeline'", +            ) +        ) +        await sdk.run_step( +            FindAndReplaceStep( +                filepath=pipeline_filepath, +                pattern="'dataset_name'", +                replacement=f"'{self.source_name}_data'", +            ) +        ) +        await sdk.run_step( +            FindAndReplaceStep( +                filepath=pipeline_filepath, +                pattern="pipeline_or_source_script", +                replacement=f"{self.source_name}_pipeline", +            ) +        ) + +        # Prompt the user for the DAG schedule +        # edit_dag_range = Range.from_shorthand(18, 0, 23, 0) +        # await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333") +        # response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) +        # await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", +        #                     range=edit_dag_range) + +        # Tell the user to check the schedule and fill in owner, email, other default_args +        await sdk.run_step( +            MessageStep( +                message="Fill in the owner, email, and other default_args in the DAG file with your own personal information. Then the DAG will be ready to run!", +                name="Fill in default_args", +            ) +        )  | 
