3 files changed, 189 insertions, 0 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/README.md
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
new file mode 100644
index 00000000..d7cd03db
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
@@ -0,0 +1,37 @@
+from textwrap import dedent
+
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+from ...steps.core.core import WaitForUserInputStep
+from ...steps.main import MessageStep
+from .steps import SetupPipelineStep, ValidatePipelineStep
+
+
+# https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
+# https://www.notion.so/dlthub/Deploy-a-pipeline-with-Airflow-245fd1058652479494307ead0b5565f3
+# 1. What verified pipeline do you want to deploy with Airflow?
+# 2. Set up selected verified pipeline
+# 3. Deploy selected verified pipeline with Airflow
+# 4. Set up Airflow locally?
+
+class DeployPipelineAirflowRecipe(Step):
+    hide: bool = True
+
+    async def run(self, sdk: ContinueSDK):
+        text_observation = await sdk.run_step(
+            MessageStep(name="Building your first dlt pipeline", message=dedent("""\
+                This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will:
+                - Create a Python virtual environment with dlt installed
+                - Run `dlt init` to generate a pipeline template
+                - Write the code to call the API
+                - Add any required API keys to the `secrets.toml` file
+                - Test that the API call works
+                - Load the data into a local DuckDB instance
+                - Write a query to view the data""")) >>
+            WaitForUserInputStep(
+                prompt="What API do you want to load data from? (e.g. weatherapi.com, chess.com)")
+        )
+        await sdk.run_step(
+            SetupPipelineStep(api_description=text_observation.text) >>
+            ValidatePipelineStep()
+        )
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
new file mode 100644
index 00000000..c32ae923
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -0,0 +1,152 @@
+import os
+import subprocess
+from textwrap import dedent
+import time
+
+from ...models.main import Range
+from ...models.filesystem import RangeInFile
+from ...steps.main import MessageStep
+from ...core.sdk import Models
+from ...core.observation import DictObservation, InternalErrorObservation
+from ...models.filesystem_edit import AddFile, FileEdit
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
+
+class SetupPipelineStep(Step):
+    hide: bool = True
+    name: str = "Setup dlt Pipeline"
+
+    api_description: str  # e.g. "I want to load data from the weatherapi.com API"
+
+    async def describe(self, models: Models):
+        return dedent(f"""\
+        This step will create a new dlt pipeline that loads data from an API, as per your request:
+        {self.api_description}
+        """)
+
+    async def run(self, sdk: ContinueSDK):
+        sdk.context.set("api_description", self.api_description)
+
+        source_name = (await sdk.models.gpt35()).complete(
+            f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
+        filename = f'{source_name}.py'
+
+        # run the commands needed to bootstrap a new dlt pipeline
+        await sdk.run([
+            'python3 -m venv env',
+            'source env/bin/activate',
+            'pip install dlt',
+            f'dlt init {source_name} duckdb\n\rY',
+            'pip install -r requirements.txt'
+        ], description=dedent(f"""\
+            Running the following commands:
+            - `python3 -m venv env`: Create a Python virtual environment
+            - `source env/bin/activate`: Activate the virtual environment
+            - `pip install dlt`: Install dlt
+            - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance
+            - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
+
+        # highlight the resource function that will be edited to call the requested API
+        await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022")
+
+        # sdk.set_loading_message("Writing code to call the API...")
+        await sdk.edit_file(
+            filename=filename,
+            prompt=f'Edit the resource function to call the API described by this: {self.api_description}. Do not move or remove the exit() call in __main__.',
+            name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}"
+        )
+
+        time.sleep(1)
+
+        # wait for the user to put the API key in secrets.toml
+        await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
+        await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
+
+        sdk.context.set("source_name", source_name)
+
+
+class ValidatePipelineStep(Step):
+    hide: bool = True
+
+    async def run(self, sdk: ContinueSDK):
+        workspace_dir = await sdk.ide.getWorkspaceDirectory()
+        source_name = sdk.context.get("source_name")
+        filename = f'{source_name}.py'
+
+        # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\
+        #         Next, we will validate that your dlt pipeline is working as expected:
+        #         - Test that the API call works
+        #         - Load the data into a local DuckDB instance
+        #         - Write a query to view the data
+        #         """)))
+
+        # test that the API call works
+        output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API")
+
+        # if it fails, surface the error with a suggested fix
+        if "Traceback" in output:
+            output = "Traceback" + output.split("Traceback")[-1]
+            file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
+            suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\
+                ```python
+                {file_content}
+                ```
+                The above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred:
+
+                ```ascii
+                {output}
+                ```
+
+                This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:"""))
+
+            api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\
+                The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this:
+                ```python
+                {file_content}
+                ```
+                What is the URL for the API documentation that will help me learn how to make this call? Please format in markdown so I can click the link."""))
+
+            sdk.raise_exception(
+                title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\
+                {suggestion}
+
+                {api_documentation_url}
+
+                After you've fixed the code, click the retry button at the top of the Validate Pipeline step above.""")))
+
+        # remove the early exit() from the main function
+        await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB."))
+
+        contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
+        replacement = "\n".join(
+            list(filter(lambda line: line.strip() != "exit()", contents.split("\n"))))
+        await sdk.ide.applyFileSystemEdit(FileEdit(
+            filepath=os.path.join(workspace_dir, filename),
+            replacement=replacement,
+            range=Range.from_entire_file(contents)
+        ))
+
+        # load the data into the DuckDB instance
+        await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running `python3 {filename}` to load data into DuckDB")
+
+        table_name = f"{source_name}.{source_name}_resource"
+        tables_query_code = dedent(f'''\
+            import duckdb
+
+            # connect to the local DuckDB instance
+            conn = duckdb.connect(database="{source_name}.duckdb")
+
+            # fetch the rows loaded into the resource table
+            rows = conn.execute("SELECT * FROM {table_name};").fetchall()
+
+            # print each row
+            for row in rows:
+                print(row)
+        ''')
+
+        query_filename = os.path.join(workspace_dir, "query.py")
+        await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance")
+        await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected")
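For reference, here is roughly what the `tables_query_code` template above renders to once `{source_name}` and `{table_name}` are substituted. The `chess` source name and `chess.chess_resource` table are hypothetical placeholders; the real values depend on the snake_case name GPT-3.5 generates and on the resource the pipeline defines. A minimal sketch:

```python
import duckdb

# connect to the DuckDB file the pipeline wrote (hypothetical name: chess.duckdb)
conn = duckdb.connect(database="chess.duckdb")

# fetch every row that was loaded into the resource table
rows = conn.execute("SELECT * FROM chess.chess_resource;").fetchall()

# print each loaded row
for row in rows:
    print(row)
```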

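Both steps assume the generated pipeline file contains a function decorated with `@dlt.resource` that calls the chosen API, plus an early `exit()` in `__main__` that `ValidatePipelineStep` later strips out before the real load. The exact template `dlt init` produces may differ; the following is only a hedged sketch of that shape, with `weatherapi_resource`, the endpoint URL, and the pipeline names invented for illustration:

```python
import dlt
import requests


@dlt.resource(write_disposition="append")
def weatherapi_resource(api_key: str = dlt.secrets.value):
    # hypothetical endpoint and params; the real call depends on the API the user chose
    response = requests.get(
        "https://api.weatherapi.com/v1/current.json",
        params={"key": api_key, "q": "London"},
    )
    response.raise_for_status()
    yield response.json()


if __name__ == "__main__":
    # smoke-test the API call by printing the records the resource yields
    for record in weatherapi_resource():
        print(record)

    # early exit while testing; ValidatePipelineStep removes this line before loading into DuckDB
    exit()

    # load the data into a local DuckDB instance
    pipeline = dlt.pipeline(
        pipeline_name="weatherapi", destination="duckdb", dataset_name="weatherapi"
    )
    print(pipeline.run(weatherapi_resource()))
```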