summaryrefslogtreecommitdiff
path: root/server/continuedev/plugins/recipes/DeployPipelineAirflowRecipe/steps.py
blob: e4a932af395cf9b612e66ef04c587240b029ffbc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
from textwrap import dedent

from ....core.main import Step
from ....core.sdk import ContinueSDK, Models
from ....core.steps import MessageStep
from ....plugins.steps.find_and_replace import FindAndReplaceStep

AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"


class SetupPipelineStep(Step):
    """Create a virtual environment and scaffold a dlt pipeline for a source.

    The step issues a fixed sequence of shell commands through the SDK:
    create and activate a ``.env`` virtual environment, install dlt,
    initialise a pipeline for ``source_name`` with a DuckDB destination, and
    install the generated ``requirements.txt``.
    """

    # Declared field values; how `hide` and `name` are consumed is decided by
    # the Step base class (presumably UI visibility/title -- confirm there).
    hide: bool = True
    name: str = "Setup dlt Pipeline"

    # Name of the dlt source, interpolated into the `dlt init` command.
    source_name: str

    async def describe(self, models: Models):
        """Provide no description for this step (implicitly returns ``None``)."""

    async def run(self, sdk: ContinueSDK):
        """Run the environment and pipeline setup commands via ``sdk.run``."""
        source = self.source_name

        setup_commands = [
            "python3 -m venv .env",
            "source .env/bin/activate",
            "pip install dlt",
            f"dlt --non-interactive init {source} duckdb",
            "pip install -r requirements.txt",
        ]

        # Human-readable walkthrough of the commands above; dedent strips the
        # indentation that only exists to keep the literal aligned in source.
        explanation = dedent(
            f"""\
            Running the following commands:
            - `python3 -m venv .env`: Create a Python virtual environment
            - `source .env/bin/activate`: Activate the virtual environment
            - `pip install dlt`: Install dlt
            - `dlt init {source} duckdb`: Create a new dlt pipeline called {source} that loads data into a local DuckDB instance
            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""
        )

        await sdk.run(
            setup_commands,
            description=explanation,
            name="Setup Python environment",
        )


class RunPipelineStep(Step):
    """Execute the generated ``<source_name>_pipeline.py`` script with python3."""

    # Declared field values; how `hide` and `name` are consumed is decided by
    # the Step base class (presumably UI visibility/title -- confirm there).
    hide: bool = True
    name: str = "Run dlt Pipeline"

    # Name of the dlt source; the script run is `<source_name>_pipeline.py`.
    source_name: str

    async def describe(self, models: Models):
        """Provide no description for this step (implicitly returns ``None``)."""

    async def run(self, sdk: ContinueSDK):
        """Run the pipeline script through ``sdk.run``."""
        command = f"python3 {self.source_name}_pipeline.py"

        await sdk.run(
            [command],
            # Build the description from the executed command so the two stay
            # in sync; the inline-code span is now properly closed with a
            # backtick (it was left open before, breaking the markdown).
            description=f"Running the command `{command}` to run the pipeline: ",
            name="Run dlt pipeline",
        )


class DeployAirflowStep(Step):
    """Deploy the dlt pipeline to Airflow and fill in the generated DAG file.

    The step runs ``dlt deploy ... airflow-composer``, opens the resulting DAG
    file in the IDE, replaces the scaffold placeholders in that DAG with names
    derived from ``source_name``, and finally asks the user to complete the
    DAG's ``default_args``.
    """

    # Declared field value; how `hide` is consumed is decided by the Step base
    # class (presumably UI visibility -- confirm there).
    hide: bool = True

    # Name of the dlt source; the pipeline script is `<source_name>_pipeline.py`.
    source_name: str

    async def run(self, sdk: ContinueSDK):
        """Deploy the pipeline, patch the DAG placeholders, and prompt the user."""
        pipeline_module = f"{self.source_name}_pipeline"

        # Run dlt command to deploy pipeline to Airflow
        await sdk.run(
            [
                "git init",
                f"dlt --non-interactive deploy {pipeline_module}.py airflow-composer",
            ],
            description="Running `dlt deploy airflow` to deploy the dlt pipeline to Airflow",
            name="Deploy dlt pipeline to Airflow",
        )

        # Locate and open the DAG file. Path components are passed separately
        # so os.path.join picks the platform's separator.
        directory = await sdk.ide.getWorkspaceDirectory()
        dag_filepath = os.path.join(directory, "dags", f"dag_{pipeline_module}.py")

        await sdk.ide.setFileOpen(dag_filepath)

        # Replace the pipeline name, dataset name and source-module import.
        # These placeholders belong to the DAG scaffold produced by
        # `dlt deploy`, so the replacements target the DAG file (previously
        # they were applied to the pipeline script, which is not the file
        # opened above or referred to in the message below).
        replacements = (
            ("'pipeline_name'", f"'{pipeline_module}'"),
            ("'dataset_name'", f"'{self.source_name}_data'"),
            ("pipeline_or_source_script", pipeline_module),
        )
        for pattern, replacement in replacements:
            await sdk.run_step(
                FindAndReplaceStep(
                    filepath=dag_filepath,
                    pattern=pattern,
                    replacement=replacement,
                )
            )

        # TODO: Prompt the user for the DAG schedule (not enabled yet):
        # edit_dag_range = Range.from_shorthand(18, 0, 23, 0)
        # await sdk.ide.highlightCode(range_in_file=RangeInFile(filepath=dag_filepath, range=edit_dag_range), color="#33993333")
        # response = await sdk.run_step(WaitForUserInputStep(prompt="When would you like this Airflow DAG to run? (e.g. every day, every Monday, every 1st of the month, etc.)"))
        # await sdk.edit_file(dag_filepath, prompt=f"Edit the DAG so that it runs at the following schedule: '{response.text}'",
        #                     range=edit_dag_range)

        # Tell the user to check the schedule and fill in owner, email, other default_args
        await sdk.run_step(
            MessageStep(
                message="Fill in the owner, email, and other default_args in the DAG file with your own personal information. Then the DAG will be ready to run!",
                name="Fill in default_args",
            )
        )