diff options
| author | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:22:46 -0700 | 
|---|---|---|
| committer | Nate Sesti <sestinj@gmail.com> | 2023-06-11 22:22:46 -0700 | 
| commit | 49f3ba8b252ef736eea23747ae3768b504e000c6 (patch) | |
| tree | 177ad1ca4e5e193d809dba3eaf3791599bad3cfe /continuedev/src | |
| parent | 78a0299d9ed21b7cbee5e64f35eb37c79fe77371 (diff) | |
| download | sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.tar.gz sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.tar.bz2 sncontinue-49f3ba8b252ef736eea23747ae3768b504e000c6.zip | |
finished airflow recipe
Diffstat (limited to 'continuedev/src')
| -rw-r--r-- | continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py | 16 | ||||
| -rw-r--r-- | continuedev/src/continuedev/steps/find_and_replace.py | 26 | 
2 files changed, 34 insertions, 8 deletions
| diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py index 6a912f0c..ce910252 100644 --- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py @@ -12,6 +12,7 @@ from ...core.observation import DictObservation, InternalErrorObservation  from ...models.filesystem_edit import AddFile, FileEdit  from ...core.main import Step  from ...core.sdk import ContinueSDK +from ...steps.find_and_replace import FindAndReplaceStep  AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" @@ -57,17 +58,16 @@ class DeployAirflowStep(Step):          filepath = os.path.join(              directory, f"dags/dag_{self.source_name}_pipeline.py") -        # TODO: Find and replace in file step. -        old_file_contents = await sdk.ide.readFile(filepath) -        file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace( -            "dataset_name", f"{self.source_name}_dataset") -        await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents)) +        # Replace the pipeline name and dataset name +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'")) +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'")) +        await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline"))          # Prompt the user for the DAG schedule -        response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule"))          edit_dag_range = Range.from_shorthand(18, 0, 23, 0) -        await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range)) -        await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'", +        await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333") +        response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")) +        await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",                              range=edit_dag_range)          # Tell the user to check the schedule and fill in owner, email, other default_args diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py new file mode 100644 index 00000000..78511b27 --- /dev/null +++ b/continuedev/src/continuedev/steps/find_and_replace.py @@ -0,0 +1,26 @@ +from ..models.filesystem_edit import FileEdit, Range +from ..core.main import Models, Step +from ..core.sdk import ContinueSDK + + +class FindAndReplaceStep(Step): +    name: str = "Find and replace" +    filepath: str +    pattern: str +    replacement: str + +    async def describe(self, models: Models): +        return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`" + +    async def run(self, sdk: ContinueSDK): +        file_content = await sdk.ide.readFile(self.filepath) +        while self.pattern in file_content: +            start_index = file_content.index(self.pattern) +            end_index = start_index + len(self.pattern) +            await sdk.ide.applyFileSystemEdit(FileEdit( +                filepath=self.filepath, +                range=Range.from_indices(file_content, start_index, end_index), +                replacement=self.replacement +            )) +            file_content = file_content[:start_index] + \ +                self.replacement + file_content[end_index:] | 
