summaryrefslogtreecommitdiff
path: root/server/continuedev/plugins/recipes/DDtoBQRecipe/steps.py
blob: dfe25d9e02021771cfb88aadcc9faa7e97349a84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
from textwrap import dedent

from ....core.main import Step
from ....core.sdk import ContinueSDK, Models
from ....core.steps import MessageStep
from ....libs.util.paths import find_data_file
from ....plugins.steps.find_and_replace import FindAndReplaceStep

AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"


class SetUpChessPipelineStep(Step):
    hide: bool = True
    name: str = "Setup Chess.com API dlt Pipeline"

    async def describe(self, models: Models):
        return "This step will create a new dlt pipeline that loads data from the chess.com API."

    async def run(self, sdk: ContinueSDK):
        # running commands to get started when creating a new dlt pipeline
        await sdk.run(
            [
                "python3 -m venv .env",
                "source .env/bin/activate",
                "pip install dlt",
                "dlt --non-interactive init chess duckdb",
                "pip install -r requirements.txt",
            ],
            name="Set up Python environment",
            description=dedent(
                """\
            Running the following commands:
            - `python3 -m venv .env`: Create a Python virtual environment
            - `source .env/bin/activate`: Activate the virtual environment
            - `pip install dlt`: Install dlt
            - `dlt init chess duckdb`: Create a new dlt pipeline called "chess" that loads data into a local DuckDB instance
            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""
            ),
        )


class SwitchDestinationStep(Step):
    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        # Switch destination from DuckDB to Google BigQuery
        filepath = os.path.join(sdk.ide.workspace_directory, "chess_pipeline.py")
        await sdk.run_step(
            FindAndReplaceStep(
                filepath=filepath,
                pattern="destination='duckdb'",
                replacement="destination='bigquery'",
            )
        )

        # Add BigQuery credentials to your secrets.toml file
        template = dedent(
            """\
            [destination.bigquery.credentials]
            location = "US"  # change the location of the data
            project_id = "project_id" # please set me up!
            private_key = "private_key" # please set me up!
            client_email = "client_email" # please set me up!"""
        )

        # wait for user to put API key in secrets.toml
        secrets_path = os.path.join(sdk.ide.workspace_directory, ".dlt/secrets.toml")
        await sdk.ide.setFileOpen(secrets_path)
        await sdk.append_to_file(secrets_path, template)

        # append template to bottom of secrets.toml
        await sdk.wait_for_user_confirmation(
            "Please add your GCP credentials to `secrets.toml` file and then press `Continue`"
        )


class LoadDataStep(Step):
    name: str = "Load data to BigQuery"
    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        # Run the pipeline again to load data to BigQuery
        output = await sdk.run(
            ".env/bin/python3 chess_pipeline.py",
            name="Load data to BigQuery",
            description="Running `.env/bin/python3 chess_pipeline.py` to load data to Google BigQuery",
        )

        if "Traceback" in output or "SyntaxError" in output:
            with open(find_data_file("dlt_duckdb_to_bigquery_docs.md"), "r") as f:
                docs = f.read()

            output = "Traceback" + output.split("Traceback")[-1]
            suggestion = await sdk.models.default.complete(
                dedent(
                    f"""\
                When trying to load data into BigQuery, the following error occurred:

                ```ascii
                {output}
                ```

                Here is documentation describing common errors and their causes/solutions:

                {docs}

                This is a brief summary of the error followed by a suggestion on how it can be fixed:"""
                )
            )

            sdk.raise_exception(
                title="Error while running query",
                message=output,
                with_step=MessageStep(
                    name=f"Suggestion to solve error {AI_ASSISTED_STRING}",
                    message=suggestion,
                ),
            )