diff options
Diffstat (limited to 'continuedev')
-rw-r--r-- | continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index 4a128786..500a89ba 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -71,19 +71,21 @@ class DeployAirflowStep(Step): # Modify the DAG file directory = await sdk.ide.getWorkspaceDirectory() - filepath = os.path.join( - directory, f"dags/dag_{self.source_name}_pipeline.py") + pipeline_filepath = os.path.join( + directory, f"{self.source_name}_pipeline.py") + dag_filepath = os.path.join( + directory, f"dags/dag_{self.source_name}.py") # Replace the pipeline name and dataset name - await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) - await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) - await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) + await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) + await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) + await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) # Prompt the user for the DAG schedule edit_dag_range = Range.from_shorthand(18, 0, 23, 0) - await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") + await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333") response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) - await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", + await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", range=edit_dag_range) # Tell the user to check the schedule and fill in owner, email, other default_args |