summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNate Sesti <sestinj@gmail.com>2023-06-11 22:22:46 -0700
committerNate Sesti <sestinj@gmail.com>2023-06-11 22:22:46 -0700
commitbb44ad69a91be1d678baa04acb07777b8cd325ed (patch)
tree3ccfd8091c3326a10a16bddea424a1a493437c35
parentba0479d90d6f7cf16366485fce7095b728760848 (diff)
downloadsncontinue-bb44ad69a91be1d678baa04acb07777b8cd325ed.tar.gz
sncontinue-bb44ad69a91be1d678baa04acb07777b8cd325ed.tar.bz2
sncontinue-bb44ad69a91be1d678baa04acb07777b8cd325ed.zip
finished airflow recipe
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py16
-rw-r--r--continuedev/src/continuedev/steps/find_and_replace.py26
2 files changed, 34 insertions, 8 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
index 6a912f0c..ce910252 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -12,6 +12,7 @@ from ...core.observation import DictObservation, InternalErrorObservation
from ...models.filesystem_edit import AddFile, FileEdit
from ...core.main import Step
from ...core.sdk import ContinueSDK
+from ...steps.find_and_replace import FindAndReplaceStep
AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
@@ -57,17 +58,16 @@ class DeployAirflowStep(Step):
filepath = os.path.join(
directory, f"dags/dag_{self.source_name}_pipeline.py")
- # TODO: Find and replace in file step.
- old_file_contents = await sdk.ide.readFile(filepath)
- file_contents = old_file_contents.replace("pipeline_name", f"{self.source_name}_pipeline").replace(
- "dataset_name", f"{self.source_name}_dataset")
- await sdk.apply_filesystem_edit(FileEdit(filepath=filepath, range=Range.from_entire_file(filepath, old_file_contents), replacement=file_contents))
+ # Replace the pipeline name and dataset name
+ await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'"))
+ await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'"))
+ await sdk.run_step(FindAndReplaceStep(filepath=filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline"))
# Prompt the user for the DAG schedule
- response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)", name="Set DAG Schedule"))
edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
- await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range))
- await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response}'",
+ await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=filepath, range=edit_dag_range), color="#33993333")
+ response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
+ await sdk.edit_file(filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
range=edit_dag_range)
# Tell the user to check the schedule and fill in owner, email, other default_args
diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py
new file mode 100644
index 00000000..78511b27
--- /dev/null
+++ b/continuedev/src/continuedev/steps/find_and_replace.py
@@ -0,0 +1,26 @@
+from ..models.filesystem_edit import FileEdit, Range
+from ..core.main import Models, Step
+from ..core.sdk import ContinueSDK
+
+
+class FindAndReplaceStep(Step):
+ name: str = "Find and replace"
+ filepath: str
+ pattern: str
+ replacement: str
+
+ async def describe(self, models: Models):
+ return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`"
+
+ async def run(self, sdk: ContinueSDK):
+ file_content = await sdk.ide.readFile(self.filepath)
+ while self.pattern in file_content:
+ start_index = file_content.index(self.pattern)
+ end_index = start_index + len(self.pattern)
+ await sdk.ide.applyFileSystemEdit(FileEdit(
+ filepath=self.filepath,
+ range=Range.from_indices(file_content, start_index, end_index),
+ replacement=self.replacement
+ ))
+ file_content = file_content[:start_index] + \
+ self.replacement + file_content[end_index:]