diff options
author | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:22:46 -0700 |
---|---|---|
committer | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:22:46 -0700 |
commit | 49f3ba8b252ef736eea23747ae3768b504e000c6 (patch) | |
tree | 177ad1ca4e5e193d809dba3eaf3791599bad3cfe /continuedev | |
parent | 78a0299d9ed21b7cbee5e64f35eb37c79fe77371 (diff) | |
download | sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.tar.gz sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.tar.bz2 sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.zip |
finished airflow recipe
Diffstat (limited to 'continuedev')
-rw-r--r-- | continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py | 16 | ||||
-rw-r--r-- | continuedev/src/continuedev/steps/find_and_replace.py | 26 |
2 files changed, 34 insertions, 8 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index 6a912f0c..ce910252 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -12,6 +12,7 @@ from ...core.observation import DictObservation, InternalErrorObservation from ...models.filesystem_edit import AddFile, FileEdit from ...core.main import Step from ...core.sdk import ContinueSDK +from ...steps.find_and_replace import FindAndReplaceStep AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" @@ -57,17 +58,16 @@ class DeployAirflowStep(Step): filepath = os.path.join( directory, f"dags/dag_{self.source_name}_pipeline.py") - # TODO: Find and replace in file step. - old_file_contents = await sdk.ide.readFile(filepath) - file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace( - "dataset_name", f"{self.source_name}_dataset") - await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents)) + # Replace the pipeline name and dataset name + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) + await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline")) # Prompt the user for the DAG schedule - response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule")) edit_dag_range = Range.from_shorthand(18, 0, 23, 0) - await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range)) - await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'", + await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") + response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) + await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'", range=edit_dag_range) # Tell the user to check the schedule and fill in owner, email, other default_args diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py new file mode 100644 index 00000000..78511b27 --- /dev/null +++ b/continuedev/src/continuedev/steps/find_and_replace.py @@ -0,0 +1,26 @@ +from ..models.filesystem_edit import FileEdit, Range +from ..core.main import Models, Step +from ..core.sdk import ContinueSDK + + +class FindAndReplaceStep(Step): + name: str = "Find and replace" + filepath: str + pattern: str + replacement: str + + async def describe(self, models: Models): + return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`" + + async def run(self, sdk: ContinueSDK): + file_content = await sdk.ide.readFile(self.filepath) + while self.pattern in file_content: + start_index = file_content.index(self.pattern) + end_index = start_index + len(self.pattern) + await sdk.ide.applyFileSystemEdit(FileEdit( + filepath=self.filepath, + range=Range.from_indices(file_content, start_index, end_index), + replacement=self.replacement + )) + file_content = file_content[:start_index] + \ + self.replacement + file_content[end_index:] |