summaryrefslogtreecommitdiff
path: root/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe
diff options
context:
space:
mode:
Diffstat (limited to 'server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe')
-rw-r--r--server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md0
-rw-r--r--server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py86
-rw-r--r--server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py125
3 files changed, 211 insertions, 0 deletions
diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/README.md
diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py
new file mode 100644
index 00000000..5b0bd320
--- /dev/null
+++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/main.py
@@ -0,0 +1,86 @@
+from textwrap import dedent
+
+from ....core.main import Step
+from ....core.sdk import ContinueSDK
+from ....core.steps import MessageStep
+from ....plugins.steps.input.nl_multiselect import NLMultiselectStep
+from .steps import DeployAirflowStep, RunPipelineStep, SetupPipelineStep
+
+# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
+# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3
+# 1. What verified pipeline do you want to deploy with Airflow?
+# 2. Set up selected verified pipeline
+# 3. Deploy selected verified pipeline with Airflow
+# 4. Set up Airflow locally?
+
+
+class DeployPipelineAirflowRecipe(Step):
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ source_name = await sdk.run_step(
+ MessageStep(
+ name="Deploying a pipeline to Airflow",
+ message=dedent(
+ """\
+ This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will:
+ - Select a dlt-verified pipeline
+ - Setup the pipeline
+ - Deploy it to Airflow
+ - Optionally, setup Airflow locally"""
+ ),
+ )
+ >> NLMultiselectStep(
+ prompt=dedent(
+ """\
+ Which verified pipeline do you want to deploy with Airflow? The options are:
+ - Asana
+ - Chess.com
+ - Facebook Ads
+ - GitHub
+ - Google Analytics
+ - Google Sheets
+ - HubSpot
+ - Jira
+ - Matomo
+ - Mux
+ - Notion
+ - Pipedrive
+ - Pokemon
+ - Salesforce
+ - Shopify
+ - Strapi
+ - Stripe
+ - SQL Database
+ - Workable
+ - Zendesk"""
+ ),
+ options=[
+ "asana_dlt",
+ "chess",
+ "github",
+ "google_analytics",
+ "google_sheets",
+ "hubspot",
+ "matomo",
+ "pipedrive",
+ "shopify_dlt",
+ "strapi",
+ "zendesk",
+ "facebook_ads",
+ "jira",
+ "mux",
+ "notion",
+ "pokemon",
+ "salesforce",
+ "stripe_analytics",
+ "sql_database",
+ "workable",
+ ],
+ )
+ )
+ await sdk.run_step(
+ SetupPipelineStep(source_name=source_name)
+ >> RunPipelineStep(source_name=source_name)
+ >> DeployAirflowStep(source_name=source_name)
+ )
diff --git a/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py
new file mode 100644
index 00000000..e4a932af
--- /dev/null
+++ b/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -0,0 +1,125 @@
+import os
+from textwrap import dedent
+
+from ....core.main import Step
+from ....core.sdk import ContinueSDK, Models
+from ....core.steps import MessageStep
+from ....plugins.steps.find_and_replace import FindAndReplaceStep
+
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
+
+class SetupPipelineStep(Step):
+ hide: bool = True
+ name: str = "Setup dlt Pipeline"
+
+ source_name: str
+
+ async def describe(self, models: Models):
+ pass
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.run(
+ [
+ "python3 -m venv .env",
+ "source .env/bin/activate",
+ "pip install dlt",
+ f"dlt --non-interactive init {self.source_name} duckdb",
+ "pip install -r requirements.txt",
+ ],
+ description=dedent(
+ f"""\
+ Running the following commands:
+ - `python3 -m venv .env`: Create a Python virtual environment
+ - `source .env/bin/activate`: Activate the virtual environment
+ - `pip install dlt`: Install dlt
+ - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
+ - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""
+ ),
+ name="Setup Python environment",
+ )
+
+
+class RunPipelineStep(Step):
+ hide: bool = True
+ name: str = "Run dlt Pipeline"
+
+ source_name: str
+
+ async def describe(self, models: Models):
+ pass
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.run(
+ [
+ f"python3 {self.source_name}_pipeline.py",
+ ],
+ description=dedent(
+ f"""\
+ Running the command `python3 {self.source_name}_pipeline.py to run the pipeline: """
+ ),
+ name="Run dlt pipeline",
+ )
+
+
+class DeployAirflowStep(Step):
+ hide: bool = True
+ source_name: str
+
+ async def run(self, sdk: ContinueSDK):
+ # Run dlt command to deploy pipeline to Airflow
+ await sdk.run(
+ [
+ "git init",
+ f"dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer",
+ ],
+ description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow",
+ name="Deploy dlt pipeline to Airflow",
+ )
+
+ # Get filepaths, open the DAG file
+ directory = await sdk.ide.getWorkspaceDirectory()
+ pipeline_filepath = os.path.join(directory, f"{self.source_name}_pipeline.py")
+ dag_filepath = os.path.join(
+ directory, f"dags/dag_{self.source_name}_pipeline.py"
+ )
+
+ await sdk.ide.setFileOpen(dag_filepath)
+
+ # Replace the pipeline name and dataset name
+ await sdk.run_step(
+ FindAndReplaceStep(
+ filepath=pipeline_filepath,
+ pattern="'pipeline_name'",
+ replacement=f"'{self.source_name}_pipeline'",
+ )
+ )
+ await sdk.run_step(
+ FindAndReplaceStep(
+ filepath=pipeline_filepath,
+ pattern="'dataset_name'",
+ replacement=f"'{self.source_name}_data'",
+ )
+ )
+ await sdk.run_step(
+ FindAndReplaceStep(
+ filepath=pipeline_filepath,
+ pattern="pipeline_or_source_script",
+ replacement=f"{self.source_name}_pipeline",
+ )
+ )
+
+ # Prompt the user for the DAG schedule
+ # edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
+ # await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333")
+ # response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
+ # await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
+ # range=edit_dag_range)
+
+ # Tell the user to check the schedule and fill in owner, email, other default_args
+ await sdk.run_step(
+ MessageStep(
+ message="Fill in the owner, email, and other default_args in the DAG file with your own personal information. Then the DAG will be ready to run!",
+ name="Fill in default_args",
+ )
+ )