summaryrefslogtreecommitdiff
path: root/continuedev
diff options
context:
space:
mode:
authorNate Sesti <sestinj@gmail.com>2023-06-11 22:50:34 -0700
committerNate Sesti <sestinj@gmail.com>2023-06-11 22:50:34 -0700
commit83869b261bd8b664fe17613f343c96d48803497b (patch)
tree7c0b07ab9f253a8aec1a1627a41cdb410a0209c5 /continuedev
parent18cfe5c697fe1ecf22edd99c72372756279594d7 (diff)
parent49f3ba8b252ef736eea23747ae3768b504e000c6 (diff)
downloadsncontinue-83869b261bd8b664fe17613f343c96d48803497b.tar.gz
sncontinue-83869b261bd8b664fe17613f343c96d48803497b.tar.bz2
sncontinue-83869b261bd8b664fe17613f343c96d48803497b.zip
Merge branch 'deploy-airflow' into dd-to-bq
Diffstat (limited to 'continuedev')
-rw-r--r--continuedev/src/continuedev/core/policy.py5
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md0
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py50
-rw-r--r--continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py76
-rw-r--r--continuedev/src/continuedev/steps/find_and_replace.py26
-rw-r--r--continuedev/src/continuedev/steps/input/nl_multiselect.py27
-rw-r--r--continuedev/src/continuedev/steps/steps_on_startup.py4
7 files changed, 186 insertions, 2 deletions
diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py
index 8612d834..3bb9f61a 100644
--- a/continuedev/src/continuedev/core/policy.py
+++ b/continuedev/src/continuedev/core/policy.py
@@ -3,6 +3,7 @@ from typing import List, Tuple, Type
from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma
from ..steps.steps_on_startup import StepsOnStartupStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
+from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe
from .main import Step, Validator, History, Policy
from .observation import Observation, TracebackObservation, UserInputObservation
from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep
@@ -29,10 +30,12 @@ class DemoPolicy(Policy):
# This could be defined with ObservationTypePolicy. Ergonomics not right though.
if "/pytest" in observation.user_input.lower():
return WritePytestsRecipe(instructions=observation.user_input)
- elif "/dlt" in observation.user_input.lower() or " dlt" in observation.user_input.lower():
+ elif "/dlt" in observation.user_input.lower():
return CreatePipelineRecipe()
elif "/ddtobq" in observation.user_input.lower():
return DDtoBQRecipeRecipe()
+ elif "/airflow" in observation.user_input.lower():
+ return DeployPipelineAirflowRecipe()
elif "/comment" in observation.user_input.lower():
return CommentCodeStep()
elif "/ask" in observation.user_input:
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
new file mode 100644
index 00000000..fbd6e11d
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
@@ -0,0 +1,50 @@
+from textwrap import dedent
+
+from ...steps.input.nl_multiselect import NLMultiselectStep
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+from ...steps.core.core import WaitForUserInputStep
+from ...steps.main import MessageStep
+from .steps import SetupPipelineStep, DeployAirflowStep
+
+
+# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
+# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3
+# 1. What verified pipeline do you want to deploy with Airflow?
+# 2. Set up selected verified pipeline
+# 3. Deploy selected verified pipeline with Airflow
+# 4. Set up Airflow locally?
+
+# Recipe step that asks which dlt-verified pipeline to deploy and then runs
+# the setup and Airflow-deployment steps for the chosen source.
+class DeployPipelineAirflowRecipe(Step):
+    hide: bool = True
+
+    async def run(self, sdk: ContinueSDK):
+        """Introduce the recipe, ask for a pipeline, then set it up and deploy it.
+
+        The value awaited from the first `sdk.run_step` call is passed as
+        `source_name` to both follow-up steps (presumably it is the option
+        string returned by `NLMultiselectStep` - confirm against
+        `ContinueSDK.run_step`).
+        """
+        intro_message = dedent("""\
+            This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will:
+            - Select a dlt-verified pipeline
+            - Setup the pipeline
+            - Deploy it to Airflow
+            - Optionally, setup Airflow locally""")
+        selection_prompt = dedent("""\
+            Which verified pipeline do you want to deploy with Airflow? The options are:
+            - Asana
+            - Chess.com
+            - GitHub
+            - Google Analytics
+            - Google Sheets
+            - HubSpot
+            - Matomo
+            - Pipedrive
+            - Shopify
+            - Strapi
+            - Zendesk""")
+        # Identifiers handed to `dlt init`, listed in the same order as the
+        # human-readable names in the prompt above.
+        pipeline_options = [
+            "asana_dlt", "chess", "github", "google_analytics", "google_sheets",
+            "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk"
+        ]
+
+        intro_step = MessageStep(
+            name="Deploying a pipeline to Airflow", message=intro_message)
+        select_step = NLMultiselectStep(
+            prompt=selection_prompt, options=pipeline_options)
+        source_name = await sdk.run_step(intro_step >> select_step)
+
+        setup_step = SetupPipelineStep(source_name=source_name)
+        deploy_step = DeployAirflowStep(source_name=source_name)
+        await sdk.run_step(setup_step >> deploy_step)
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
new file mode 100644
index 00000000..ce910252
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -0,0 +1,76 @@
+import os
+import subprocess
+from textwrap import dedent
+import time
+
+from ...steps.core.core import WaitForUserInputStep
+from ...models.main import Range
+from ...models.filesystem import RangeInFile
+from ...steps.main import MessageStep
+from ...core.sdk import Models
+from ...core.observation import DictObservation, InternalErrorObservation
+from ...models.filesystem_edit import AddFile, FileEdit
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+from ...steps.find_and_replace import FindAndReplaceStep
+
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
+
+# Step that creates a virtual environment, installs dlt, and initialises the
+# selected verified source with a DuckDB destination.
+class SetupPipelineStep(Step):
+    hide: bool = True
+    name: str = "Setup dlt Pipeline"
+
+    # Name of the dlt source, interpolated into the `dlt init` command.
+    source_name: str
+
+    async def describe(self, models: Models):
+        """Provide no description for this step (always returns None)."""
+        return None
+
+    async def run(self, sdk: ContinueSDK):
+        """Run the environment-setup shell commands through `sdk.run`."""
+        init_command = f"dlt --non-interactive init {self.source_name} duckdb"
+        setup_commands = [
+            "python3 -m venv env",
+            "source env/bin/activate",
+            "pip install dlt",
+            init_command,
+            "pip install -r requirements.txt",
+        ]
+        # Explanation passed alongside the commands; it lists `dlt init`
+        # without the `--non-interactive` flag that is actually used.
+        explanation = dedent(f"""\
+            Running the following commands:
+            - `python3 -m venv env`: Create a Python virtual environment
+            - `source env/bin/activate`: Activate the virtual environment
+            - `pip install dlt`: Install dlt
+            - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
+            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline""")
+        await sdk.run(
+            setup_commands,
+            description=explanation,
+            name="Setup Python environment",
+        )
+
+
+# Step that runs `dlt deploy ... airflow-composer` and then customises the
+# generated DAG file for the selected source.
+class DeployAirflowStep(Step):
+    hide: bool = True
+    # Name of the dlt source; used to derive the pipeline script and DAG names.
+    source_name: str
+
+    async def run(self, sdk: ContinueSDK):
+        """Deploy the pipeline, patch the DAG placeholders, and set a schedule."""
+        pipeline_name = f"{self.source_name}_pipeline"
+
+        # Run dlt command to deploy pipeline to Airflow
+        deploy_commands = [
+            f"dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer",
+        ]
+        await sdk.run(
+            deploy_commands,
+            description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow",
+            name="Deploy dlt pipeline to Airflow",
+        )
+
+        # Locate the DAG file inside the workspace
+        workspace_dir = await sdk.ide.getWorkspaceDirectory()
+        filepath = os.path.join(
+            workspace_dir, f"dags/dag_{self.source_name}_pipeline.py")
+
+        # Replace the pipeline name, dataset name and script placeholders, in
+        # this order, as (text to find, text to insert) pairs.
+        substitutions = [
+            ("'pipeline_name'", f"'{pipeline_name}'"),
+            ("'dataset_name'", f"'{self.source_name}_data'"),
+            ("pipeline_or_source_script", pipeline_name),
+        ]
+        for placeholder, value in substitutions:
+            await sdk.run_step(FindAndReplaceStep(
+                filepath=filepath, pattern=placeholder, replacement=value))
+
+        # Prompt the user for the DAG schedule.
+        # NOTE(review): the range presumably means lines 18-23 of the DAG
+        # template (start line, start char, end line, end char) - confirm
+        # against Range.from_shorthand.
+        schedule_range = Range.from_shorthand(18, 0, 23, 0)
+        highlighted = RangeInFile(filepath=filepath, range=schedule_range)
+        await sdk.ide.highlightCode(range_in_file=highlighted, color="#33993333")
+        schedule_prompt = WaitForUserInputStep(
+            prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)")
+        response = await sdk.run_step(schedule_prompt)
+        await sdk.edit_file(
+            filepath,
+            prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
+            range=schedule_range,
+        )
+
+        # Tell the user to check the schedule and fill in owner, email, other default_args
+        reminder = MessageStep(
+            message="Fill in the owner, email, and other default_args in the DAG file with your own personal information.",
+            name="Fill in default_args",
+        )
+        await sdk.run_step(reminder)
+
+ # Run the DAG locally ??
diff --git a/continuedev/src/continuedev/steps/find_and_replace.py b/continuedev/src/continuedev/steps/find_and_replace.py
new file mode 100644
index 00000000..78511b27
--- /dev/null
+++ b/continuedev/src/continuedev/steps/find_and_replace.py
@@ -0,0 +1,26 @@
+from ..models.filesystem_edit import FileEdit, Range
+from ..core.main import Models, Step
+from ..core.sdk import ContinueSDK
+
+
+# Step that replaces every occurrence of a literal string in a file by sending
+# one FileEdit per occurrence to the IDE.
+class FindAndReplaceStep(Step):
+    name: str = "Find and replace"
+    # Path of the file to edit.
+    filepath: str
+    # Literal text to search for (plain substring, not a regular expression).
+    pattern: str
+    # Text inserted in place of each occurrence of `pattern`.
+    replacement: str
+
+    async def describe(self, models: Models):
+        """Return a one-line summary of the replacement being performed."""
+        return f"Replace all instances of `{self.pattern}` with `{self.replacement}` in `{self.filepath}`"
+
+    async def run(self, sdk: ContinueSDK):
+        """Replace each occurrence of `pattern` in the file, left to right.
+
+        The file is read once and a local copy is kept in sync with the edits
+        so each occurrence's range is computed against the current contents.
+        The search resumes after the text just inserted, so the loop ends
+        even when `replacement` itself contains `pattern`.
+        """
+        # An empty pattern matches at every index and would never terminate.
+        if not self.pattern:
+            return
+
+        file_content = await sdk.ide.readFile(self.filepath)
+        pattern_length = len(self.pattern)
+        search_from = 0
+        while True:
+            start_index = file_content.find(self.pattern, search_from)
+            if start_index == -1:
+                break
+            end_index = start_index + pattern_length
+            await sdk.ide.applyFileSystemEdit(FileEdit(
+                filepath=self.filepath,
+                range=Range.from_indices(file_content, start_index, end_index),
+                replacement=self.replacement
+            ))
+            file_content = file_content[:start_index] + \
+                self.replacement + file_content[end_index:]
+            # Skip past the inserted text so it is never matched again.
+            search_from = start_index + len(self.replacement)
diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py
new file mode 100644
index 00000000..c3c832f5
--- /dev/null
+++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py
@@ -0,0 +1,27 @@
+from typing import List, Union
+from ..core.core import WaitForUserInputStep
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+
+
+# Step that asks the user a question and maps the free-text answer onto one of
+# a fixed list of option strings.
+class NLMultiselectStep(Step):
+    hide: bool = True
+
+    # Question shown to the user.
+    prompt: str
+    # Option strings; exactly one of these is returned by `run`.
+    options: List[str]
+
+    async def run(self, sdk: ContinueSDK):
+        """Return the option the user selected.
+
+        The answer is first searched for an option as a substring. If none is
+        found, the model is asked to name the option and its reply is
+        searched the same way. If that also fails, the first option is
+        returned as a fallback.
+        """
+        user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text
+
+        def extract_option(text: str) -> Union[str, None]:
+            # Compare case-insensitively on both sides, so a reply such as
+            # "GitHub" still matches the option "github".
+            lowered = text.lower()
+            for option in self.options:
+                if option.lower() in lowered:
+                    return option
+            return None
+
+        first_try = extract_option(user_response)
+        if first_try is not None:
+            return first_try
+
+        gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. This is the exact string from the options array that they selected:")
+        return extract_option(gpt_parsed) or self.options[0]
diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py
index ba793425..d0b47201 100644
--- a/continuedev/src/continuedev/steps/steps_on_startup.py
+++ b/continuedev/src/continuedev/steps/steps_on_startup.py
@@ -2,12 +2,14 @@ from ..core.main import ContinueSDK, Models, Step
from .main import UserInputStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
from ..recipes.DDtoBQRecipe.main import DDtoBQRecipeRecipe
+from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe
step_name_to_step_class = {
"UserInputStep": UserInputStep,
"CreatePipelineRecipe": CreatePipelineRecipe,
- "DDtoBQRecipeRecipe": DDtoBQRecipeRecipe
+ "DDtoBQRecipeRecipe": DDtoBQRecipeRecipe,
+ "DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe
}