1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import os
from textwrap import dedent
from ....plugins.steps.core.core import MessageStep
from ....core.sdk import Models
from ....core.main import Step
from ....core.sdk import ContinueSDK
from ....plugins.steps.find_and_replace import FindAndReplaceStep
# Badge string for labeling AI-generated content — not referenced in this
# chunk; presumably consumed by other steps in the package (TODO confirm).
AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
class SetupPipelineStep(Step):
    """Set up a Python environment and scaffold a dlt pipeline.

    Creates a virtualenv, installs dlt, initializes a pipeline for
    ``source_name`` that loads into a local DuckDB instance, and installs
    the pipeline's Python requirements.
    """

    hide: bool = True  # keep this step out of the visible step history
    name: str = "Setup dlt Pipeline"
    source_name: str  # name of the dlt verified source to initialize

    async def describe(self, models: Models):
        # The run() call below supplies its own rich description.
        pass

    async def run(self, sdk: ContinueSDK):
        await sdk.run([
            'python3 -m venv .env',
            'source .env/bin/activate',
            'pip install dlt',
            f'dlt --non-interactive init {self.source_name} duckdb',
            'pip install -r requirements.txt'
        ], description=dedent(f"""\
            Running the following commands:
            - `python3 -m venv .env`: Create a Python virtual environment
            - `source .env/bin/activate`: Activate the virtual environment
            - `pip install dlt`: Install dlt
            - `dlt --non-interactive init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
class RunPipelineStep(Step):
    """Execute the generated dlt pipeline script for ``source_name``."""

    hide: bool = True  # keep this step out of the visible step history
    name: str = "Run dlt Pipeline"
    source_name: str  # name of the dlt source whose pipeline script is run

    async def describe(self, models: Models):
        # The run() call below supplies its own description.
        pass

    async def run(self, sdk: ContinueSDK):
        await sdk.run([
            f'python3 {self.source_name}_pipeline.py',
        ], description=dedent(f"""\
            Running the command `python3 {self.source_name}_pipeline.py` to run the pipeline: """), name="Run dlt pipeline")
class DeployAirflowStep(Step):
    """Deploy the generated dlt pipeline to Airflow (Cloud Composer).

    Runs ``dlt deploy ... airflow-composer``, opens the generated DAG file
    in the IDE, substitutes the placeholder pipeline/dataset names in the
    pipeline script, and finally prompts the user to fill in the DAG's
    ``default_args``.
    """

    hide: bool = True  # keep this step out of the visible step history
    source_name: str  # name of the dlt source whose pipeline is deployed

    async def run(self, sdk: ContinueSDK):
        # Run dlt command to deploy pipeline to Airflow.
        # `git init` first: dlt's deploy command expects a git repository.
        await sdk.run(
            ['git init',
             f'dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer'],
            description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow", name="Deploy dlt pipeline to Airflow")

        # Get filepaths, open the DAG file so the user can inspect it.
        directory = await sdk.ide.getWorkspaceDirectory()
        pipeline_filepath = os.path.join(
            directory, f"{self.source_name}_pipeline.py")
        dag_filepath = os.path.join(
            directory, f"dags/dag_{self.source_name}_pipeline.py")
        await sdk.ide.setFileOpen(dag_filepath)

        # Replace the scaffolded placeholder pipeline name, dataset name,
        # and script reference in the pipeline file with values derived
        # from source_name.
        await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'pipeline_name'", replacement=f"'{self.source_name}_pipeline'"))
        await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="'dataset_name'", replacement=f"'{self.source_name}_data'"))
        await sdk.run_step(FindAndReplaceStep(filepath=pipeline_filepath, pattern="pipeline_or_source_script", replacement=f"{self.source_name}_pipeline"))

        # Disabled feature: prompt the user for the DAG schedule and have
        # the model edit the DAG accordingly. Kept for reference.
        # edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
        # await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333")
        # response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
        # await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
        #                     range=edit_dag_range)

        # Tell the user to check the schedule and fill in owner, email,
        # and the other default_args before running the DAG.
        await sdk.run_step(MessageStep(message="Fill in the owner, email, and other default_args in the DAG file with your own personal information. Then the DAG will be ready to run!", name="Fill in default_args"))
|