| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
 | import os
from textwrap import dedent
from ....core.main import Step
from ....core.sdk import ContinueSDK, Models
from ....core.steps import MessageStep
from ....libs.util.paths import find_data_file
from ....plugins.steps.find_and_replace import FindAndReplaceStep
# Label embedded in the name of steps whose message text comes from a language
# model; in this file it tags the error-fix suggestion shown by LoadDataStep.
AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
class SetUpChessPipelineStep(Step):
    """Create a virtual environment and scaffold the chess.com dlt pipeline.

    The step is hidden and delegates all work to a single ``sdk.run`` call
    that receives the shell commands as a list.
    """

    hide: bool = True
    name: str = "Setup Chess.com API dlt Pipeline"

    async def describe(self, models: Models):
        """Return a fixed one-sentence summary; ``models`` is not used."""
        summary = (
            "This step will create a new dlt pipeline "
            "that loads data from the chess.com API."
        )
        return summary

    async def run(self, sdk: ContinueSDK):
        """Hand the environment-setup commands to ``sdk.run``.

        Args:
            sdk: SDK handle used to execute the shell commands.
        """
        # Commands passed to sdk.run, in this order.
        setup_commands = [
            "python3 -m venv .env",
            "source .env/bin/activate",
            "pip install dlt",
            "dlt --non-interactive init chess duckdb",
            "pip install -r requirements.txt",
        ]

        # Markdown explanation attached to the command run, one bullet per
        # command.
        explanation = "\n".join(
            (
                "Running the following commands:",
                "- `python3 -m venv .env`: Create a Python virtual environment",
                "- `source .env/bin/activate`: Activate the virtual environment",
                "- `pip install dlt`: Install dlt",
                '- `dlt init chess duckdb`: Create a new dlt pipeline called "chess" that loads data into a local DuckDB instance',
                "- `pip install -r requirements.txt`: Install the Python dependencies for the pipeline",
            )
        )

        await sdk.run(
            setup_commands,
            name="Set up Python environment",
            description=explanation,
        )
class SwitchDestinationStep(Step):
    """Point the chess pipeline at BigQuery and collect the user's credentials.

    The step rewrites the destination in ``chess_pipeline.py``, appends a
    placeholder credentials section to ``.dlt/secrets.toml``, and then waits
    for the user to confirm.
    """

    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        """Swap the destination, append the credentials stub, and wait.

        Args:
            sdk: SDK handle giving access to the IDE, file edits and user
                prompts.
        """
        # Replace the DuckDB destination argument with BigQuery in the
        # pipeline script located in the workspace root.
        pipeline_script = os.path.join(sdk.ide.workspace_directory, "chess_pipeline.py")
        swap_destination = FindAndReplaceStep(
            filepath=pipeline_script,
            pattern="destination='duckdb'",
            replacement="destination='bigquery'",
        )
        await sdk.run_step(swap_destination)

        # Placeholder BigQuery credentials section for the user to fill in.
        credentials_stub = "\n".join(
            (
                "[destination.bigquery.credentials]",
                'location = "US"  # change the location of the data',
                'project_id = "project_id" # please set me up!',
                'private_key = "private_key" # please set me up!',
                'client_email = "client_email" # please set me up!',
            )
        )

        # Ask the IDE to open secrets.toml, then append the stub to it.
        secrets_file = os.path.join(sdk.ide.workspace_directory, ".dlt/secrets.toml")
        await sdk.ide.setFileOpen(secrets_file)
        await sdk.append_to_file(secrets_file, credentials_stub)

        # Block until the user confirms the credentials have been entered.
        await sdk.wait_for_user_confirmation(
            "Please add your GCP credentials to `secrets.toml` file and then press `Continue`"
        )
class LoadDataStep(Step):
    """Run the chess pipeline and surface a model-written fix if it fails.

    The pipeline script is executed with the virtual environment's
    interpreter. When the captured output contains "Traceback" or
    "SyntaxError", the step asks the default model for a suggestion and raises
    through ``sdk.raise_exception`` with that suggestion attached.
    """

    name: str = "Load data to BigQuery"
    hide: bool = True

    async def run(self, sdk: ContinueSDK):
        """Execute the pipeline and report failures with a suggested fix.

        Args:
            sdk: SDK handle used to run the command, query the model and
                raise the error.
        """
        # Run the pipeline again to load data to BigQuery
        output = await sdk.run(
            ".env/bin/python3 chess_pipeline.py",
            name="Load data to BigQuery",
            description="Running `.env/bin/python3 chess_pipeline.py` to load data to Google BigQuery",
        )

        # Nothing to report unless the output looks like a Python failure.
        if "Traceback" not in output and "SyntaxError" not in output:
            return

        # Explicit encoding so reading the bundled markdown does not depend on
        # the platform's default locale.
        with open(
            find_data_file("dlt_duckdb_to_bigquery_docs.md"), "r", encoding="utf-8"
        ) as f:
            docs = f.read()

        # Keep only the last traceback. When the output has a SyntaxError but
        # no "Traceback" marker, leave it untouched instead of prepending a
        # marker that was never there.
        _, marker, tail = output.rpartition("Traceback")
        if marker:
            output = marker + tail

        # Dedent the template BEFORE substituting values: the error output and
        # the docs contain lines at column 0, which would otherwise make the
        # common margin empty and leave the prompt indented.
        prompt = dedent(
            """\
            When trying to load data into BigQuery, the following error occurred:
            ```ascii
            {output}
            ```
            Here is documentation describing common errors and their causes/solutions:
            {docs}
            This is a brief summary of the error followed by a suggestion on how it can be fixed:"""
        ).format(output=output, docs=docs)

        suggestion = await sdk.models.default.complete(prompt)

        sdk.raise_exception(
            title="Error while running query",
            message=output,
            with_step=MessageStep(
                name=f"Suggestion to solve error {AI_ASSISTED_STRING}",
                message=suggestion,
            ),
        )
 |