summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py7
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py36
2 files changed, 40 insertions, 3 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
index 8e7d258d..fbd6e11d 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
@@ -5,7 +5,7 @@ from ...core.main import Step
from ...core.sdk import ContinueSDK
from ...steps.core.core import WaitForUserInputStep
from ...steps.main import MessageStep
-from .steps import SetupPipelineStep
+from .steps import SetupPipelineStep, DeployAirflowStep
# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
@@ -19,7 +19,7 @@ class DeployPipelineAirflowRecipe(Step):
hide: bool = True
async def run(self, sdk: ContinueSDK):
- text_observation = await sdk.run_step(
+ source_name = await sdk.run_step(
MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\
This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will:
- Select a dlt-verified pipeline
@@ -45,5 +45,6 @@ class DeployPipelineAirflowRecipe(Step):
])
)
await sdk.run_step(
- SetupPipelineStep(source_name=text_observation.text) >>
+ SetupPipelineStep(source_name=source_name) >>
+ DeployAirflowStep(source_name=source_name)
)
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
index b69b3adc..6a912f0c 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -3,6 +3,7 @@ import subprocess
from textwrap import dedent
import time
+from ...steps.core.core import WaitForUserInputStep
from ...models.main import Range
from ...models.filesystem import RangeInFile
from ...steps.main import MessageStep
@@ -38,3 +39,38 @@ class SetupPipelineStep(Step):
- `pip install dlt`: Install dlt
- `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
- `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
+
+
+class DeployAirflowStep(Step):
+ hide: bool = True
+ source_name: str
+
+ async def run(self, sdk: ContinueSDK):
+
+ # Run dlt command to deploy pipeline to Airflow
+ await sdk.run([
+ f'dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer',
+ ], description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", name="Deploy dlt pipeline to Airflow")
+
+ # Modify the DAG file
+ directory = await sdk.ide.getWorkspaceDirectory()
+ filepath = os.path.join(
+ directory, f"dags/dag_{self.source_name}_pipeline.py")
+
+ # TODO: Find and replace in file step.
+ old_file_contents = await sdk.ide.readFile(filepath)
+ file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace(
+ "dataset_name", f"{self.source_name}_dataset")
+ await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents))
+
+ # Prompt the user for the DAG schedule
+ response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule"))
+ edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
+ await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range))
+ await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'",
+ range=edit_dag_range)
+
+ # Tell the user to check the schedule and fill in owner, email, other default_args
+ await sdk.run_step(MessageStep(message="Fill in the owner, email, and other default_args in the DAG file with your own personal information.", name="Fill in default_args"))
+
+ # Run the DAG locally ??