"""Steps for setting up, running, and deploying a dlt pipeline to Airflow.

Used by the Continue recipe that walks a user through creating a dlt pipeline
and deploying it with `dlt deploy ... airflow-composer`.
"""
import os
from textwrap import dedent

from ....core.main import Step
from ....core.sdk import ContinueSDK, Models
from ....core.steps import MessageStep
from ....plugins.steps.find_and_replace import FindAndReplaceStep

AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"

class SetupPipelineStep(Step):
    hide: bool = True
    name: str = "Setup dlt Pipeline"

    source_name: str

    async def describe(self, models: Models):
        pass

    async def run(self, sdk: ContinueSDK):
        await sdk.run(
            [
                "python3 -m venv .env",
                "source .env/bin/activate",
                "pip install dlt",
                f"dlt --non-interactive init {self.source_name} duckdb",
                "pip install -r requirements.txt",
            ],
            description=dedent(
                f"""\
                Running the following commands:
                - `python3 -m venv .env`: Create a Python virtual environment
                - `source .env/bin/activate`: Activate the virtual environment
                - `pip install dlt`: Install dlt
                - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
                - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""
            ),
            name="Setup Python environment",
        )
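# Rough sketch (for orientation only) of the `<source_name>_pipeline.py` file that
# `dlt init <source_name> duckdb` scaffolds; the exact contents vary by source.
# The 'pipeline_name' and 'dataset_name' placeholders are the strings that
# DeployAirflowStep rewrites further down via FindAndReplaceStep:
#
#     import dlt
#
#     pipeline = dlt.pipeline(
#         pipeline_name='pipeline_name',
#         destination='duckdb',
#         dataset_name='dataset_name',
#     )
#     load_info = pipeline.run(...)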

class RunPipelineStep(Step):
    hide: bool = True
    name: str = "Run dlt Pipeline"

    source_name: str

    async def describe(self, models: Models):
        pass

    async def run(self, sdk: ContinueSDK):
        await sdk.run(
            [
                f"python3 {self.source_name}_pipeline.py",
            ],
            description=dedent(
                f"""\
                Running the command `python3 {self.source_name}_pipeline.py` to run the pipeline: """
            ),
            name="Run dlt pipeline",
        )
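# Optional sanity check (not executed by this recipe): once the pipeline has run,
# the loaded tables can be inspected in the local DuckDB database. dlt's duckdb
# destination typically writes a `<pipeline_name>.duckdb` file in the working
# directory, but the exact filename depends on the pipeline configuration:
#
#     import duckdb
#
#     con = duckdb.connect("chess_pipeline.duckdb")  # hypothetical filename
#     print(con.execute("SHOW TABLES").fetchall())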

class DeployAirflowStep(Step):
    hide: bool = True
    source_name: str

    async def run(self, sdk: ContinueSDK):
        # Run dlt command to deploy pipeline to Airflow
        await sdk.run(
            [
                "git init",
                f"dlt --non-interactive deploy {self.source_name}_pipeline.py airflow-composer",
            ],
            description="Running `dlt deploy` to deploy the dlt pipeline to Airflow",
            name="Deploy dlt pipeline to Airflow",
        )

        # Get filepaths, open the DAG file
        directory = await sdk.ide.getWorkspaceDirectory()
        pipeline_filepath = os.path.join(directory, f"{self.source_name}_pipeline.py")
        dag_filepath = os.path.join(
            directory, f"dags/dag_{self.source_name}_pipeline.py"
        )
        await sdk.ide.setFileOpen(dag_filepath)

        # Replace the pipeline name and dataset name
        await sdk.run_step(
            FindAndReplaceStep(
                filepath=pipeline_filepath,
                pattern="'pipeline_name'",
                replacement=f"'{self.source_name}_pipeline'",
            )
        )
        await sdk.run_step(
            FindAndReplaceStep(
                filepath=pipeline_filepath,
                pattern="'dataset_name'",
                replacement=f"'{self.source_name}_data'",
            )
        )
        await sdk.run_step(
            FindAndReplaceStep(
                filepath=pipeline_filepath,
                pattern="pipeline_or_source_script",
                replacement=f"{self.source_name}_pipeline",
            )
        )

        # Prompt the user for the DAG schedule
        # edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
        # await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333")
        # response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
        # await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
        #                     range=edit_dag_range)

        # Tell the user to check the schedule and fill in owner, email, other default_args
        await sdk.run_step(
            MessageStep(
                message="Fill in the owner, email, and other default_args in the DAG file with your own personal information. Then the DAG will be ready to run!",
                name="Fill in default_args",
            )
        )
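# Minimal usage sketch (assumptions: a parent recipe Step exists and "chess" is
# only an illustrative source name) of how these steps might be chained from a
# recipe's run() method:
#
#     class DeployPipelineAirflowRecipe(Step):  # hypothetical parent recipe
#         async def run(self, sdk: ContinueSDK):
#             source_name = "chess"  # normally gathered from the user
#             await sdk.run_step(SetupPipelineStep(source_name=source_name))
#             await sdk.run_step(RunPipelineStep(source_name=source_name))
#             await sdk.run_step(DeployAirflowStep(source_name=source_name))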