summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py16
1 files changed, 9 insertions, 7 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
index 4a128786..500a89ba 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -71,19 +71,21 @@ class DeployAirflowStep(Step):
# Modify the DAG file
directory = await sdk.ide.getWorkspaceDirectory()
- filepath = os.path.join(
- directory, f"dags/dag_{self.source_name}_pipeline.py")
+ pipeline_filepath = os.path.join(
+ directory, f"{self.source_name}_pipeline.py")
+ dag_filepath = os.path.join(
+ directory, f"dags/dag_{self.source_name}.py")
# Replace the pipeline name and dataset name
- await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'"))
- await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'"))
- await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline"))
+ await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'"))
+ await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'"))
+ await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline"))
# Prompt the user for the DAG schedule
edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
- await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333")
+ await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333")
response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
- await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
+ await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
range=edit_dag_range)
# Tell the user to check the schedule and fill in owner, email, other default_args