| author | Nate Sesti <sestinj@gmail.com> | 2023-06-10 07:53:31 -0400 | 
|---|---|---|
| committer | Nate Sesti <sestinj@gmail.com> | 2023-06-10 07:53:31 -0400 | 
| commit | 262dffd21c4dac88050926d72b78f7e91a5df75b (patch) | |
| tree | c361c4d95fd999df9a3c1cc950a07796780463fc | |
| parent | c3b24a89105d22a5fa01400b7c9d5494a2d3ffc5 (diff) | |
selection step of airflow recipe
3 files changed, 57 insertions, 130 deletions
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
index d7cd03db..8e7d258d 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/main.py
@@ -1,10 +1,11 @@
 from textwrap import dedent
 
+from ...steps.input.nl_multiselect import NLMultiselectStep
 from ...core.main import Step
 from ...core.sdk import ContinueSDK
 from ...steps.core.core import WaitForUserInputStep
 from ...steps.main import MessageStep
-from .steps import SetupPipelineStep, ValidatePipelineStep
+from .steps import SetupPipelineStep
 
 
 # https://github.com/dlt-hub/dlt-deploy-template/blob/master/airflow-composer/dag_template.py
@@ -19,19 +20,30 @@ class DeployPipelineAirflowRecipe(Step):
 
     async def run(self, sdk: ContinueSDK):
         text_observation = await sdk.run_step(
-            MessageStep(name="Building your first dlt pipeline", message=dedent("""\
-                This recipe will walk you through the process of creating a dlt pipeline for your chosen data source. With the help of Continue, you will:
-                - Create a Python virtual environment with dlt installed
-                - Run `dlt init` to generate a pipeline template
-                - Write the code to call the API
-                - Add any required API keys to the `secrets.toml` file
-                - Test that the API call works
-                - Load the data into a local DuckDB instance
-                - Write a query to view the data""")) >>
-            WaitForUserInputStep(
-                prompt="What API do you want to load data from? (e.g. weatherapi.com, chess.com)")
+            MessageStep(name="Deploying a pipeline to Airflow", message=dedent("""\
+                This recipe will show you how to deploy a pipeline to Airflow. With the help of Continue, you will:
+                - Select a dlt-verified pipeline
+                - Setup the pipeline
+                - Deploy it to Airflow
+                - Optionally, setup Airflow locally""")) >>
+            NLMultiselectStep(
+                prompt=dedent("""\
+                    Which verified pipeline do you want to deploy with Airflow? The options are:
+                    - Asana
+                    - Chess.com
+                    - GitHub
+                    - Google Analytics
+                    - Google Sheets
+                    - HubSpot
+                    - Matomo
+                    - Pipedrive
+                    - Shopify
+                    - Strapi
+                    - Zendesk"""),
+                options=[
+                    "asana_dlt", "chess", "github", "google_analytics", "google_sheets", "hubspot", "matomo", "pipedrive", "shopify_dlt", "strapi", "zendesk"
+                ])
         )
         await sdk.run_step(
-            SetupPipelineStep(api_description=text_observation.text) >>
-            ValidatePipelineStep()
+            SetupPipelineStep(source_name=text_observation.text) >>
         )
diff --git a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
index c32ae923..b69b3adc 100644
--- a/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/DeployPipelineAirflowRecipe/steps.py
@@ -19,134 +19,22 @@ class SetupPipelineStep(Step):
     hide: bool = True
     name: str = "Setup dlt Pipeline"
 
-    api_description: str  # e.g. "I want to load data from the weatherapi.com API"
+    source_name: str
 
     async def describe(self, models: Models):
-        return dedent(f"""\
-        This step will create a new dlt pipeline that loads data from an API, as per your request:
-        {self.api_description}
-        """)
+        pass
 
     async def run(self, sdk: ContinueSDK):
-        sdk.context.set("api_description", self.api_description)
-
-        source_name = (await sdk.models.gpt35()).complete(
-            f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
-        filename = f'{source_name}.py'
-
-        # running commands to get started when creating a new dlt pipeline
         await sdk.run([
             'python3 -m venv env',
             'source env/bin/activate',
             'pip install dlt',
-            f'dlt init {source_name} duckdb\n\rY',
+            f'dlt --non-interactive init {self.source_name} duckdb',
             'pip install -r requirements.txt'
         ], description=dedent(f"""\
             Running the following commands:
             - `python3 -m venv env`: Create a Python virtual environment
             - `source env/bin/activate`: Activate the virtual environment
             - `pip install dlt`: Install dlt
-            - `dlt init {source_name} duckdb`: Create a new dlt pipeline called {source_name} that loads data into a local DuckDB instance
+            - `dlt init {self.source_name} duckdb`: Create a new dlt pipeline called {self.source_name} that loads data into a local DuckDB instance
             - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""), name="Setup Python environment")
-
-        # editing the resource function to call the requested API
-        await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=Range.from_shorthand(15, 0, 29, 0)), "#00ff0022")
-
-        # sdk.set_loading_message("Writing code to call the API...")
-        await sdk.edit_file(
-            filename=filename,
-            prompt=f'Edit the resource function to call the API described by this: {self.api_description}. Do not move or remove the exit() call in __main__.',
-            name=f"Edit the resource function to call the API {AI_ASSISTED_STRING}"
-        )
-
-        time.sleep(1)
-
-        # wait for user to put API key in secrets.toml
-        await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
-        await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
-
-        sdk.context.set("source_name", source_name)
-
-
-class ValidatePipelineStep(Step):
-    hide: bool = True
-
-    async def run(self, sdk: ContinueSDK):
-        workspace_dir = await sdk.ide.getWorkspaceDirectory()
-        source_name = sdk.context.get("source_name")
-        filename = f'{source_name}.py'
-
-        # await sdk.run_step(MessageStep(name="Validate the pipeline", message=dedent("""\
-        #         Next, we will validate that your dlt pipeline is working as expected:
-        #         - Test that the API call works
-        #         - Load the data into a local DuckDB instance
-        #         - Write a query to view the data
-        #         """)))
-
-        # test that the API call works
-        output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API")
-
-        # If it fails, return the error
-        if "Traceback" in output:
-            output = "Traceback" + output.split("Traceback")[-1]
-            file_content = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
-            suggestion = (await sdk.models.gpt35()).complete(dedent(f"""\
-                ```python
-                {file_content}
-                ```
-                This above code is a dlt pipeline that loads data from an API. The function with the @resource decorator is responsible for calling the API and returning the data. While attempting to run the pipeline, the following error occurred:
-
-                ```ascii
-                {output}
-                ```
-
-                This is a brief summary of the error followed by a suggestion on how it can be fixed by editing the resource function:"""))
-
-            api_documentation_url = (await sdk.models.gpt35()).complete(dedent(f"""\
-                The API I am trying to call is the '{sdk.context.get('api_description')}'. I tried calling it in the @resource function like this:
-                ```python
-                {file_content}
-                ```
-                What is the URL for the API documentation that will help me learn how to make this call? Please format in markdown so I can click the link."""))
-
-            sdk.raise_exception(
-                title=f"Error while running pipeline.\nFix the resource function in {filename} and rerun this step", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=dedent(f"""\
-                {suggestion}
-
-                {api_documentation_url}
-
-                After you've fixed the code, click the retry button at the top of the Validate Pipeline step above.""")))
-
-        # remove exit() from the main main function
-        await sdk.run_step(MessageStep(name="Remove early exit() from main function", message="Remove the early exit() from the main function now that we are done testing and want the pipeline to load the data into DuckDB."))
-
-        contents = await sdk.ide.readFile(os.path.join(workspace_dir, filename))
-        replacement = "\n".join(
-            list(filter(lambda line: line.strip() != "exit()", contents.split("\n"))))
-        await sdk.ide.applyFileSystemEdit(FileEdit(
-            filepath=os.path.join(workspace_dir, filename),
-            replacement=replacement,
-            range=Range.from_entire_file(contents)
-        ))
-
-        # load the data into the DuckDB instance
-        await sdk.run(f'python3 {filename}', name="Load data into DuckDB", description=f"Running python3 {filename} to load data into DuckDB")
-
-        table_name = f"{source_name}.{source_name}_resource"
-        tables_query_code = dedent(f'''\
-            import duckdb
-
-            # connect to DuckDB instance
-            conn = duckdb.connect(database="{source_name}.duckdb")
-
-            # get table names
-            rows = conn.execute("SELECT * FROM {table_name};").fetchall()
-
-            # print table names
-            for row in rows:
-                print(row)
-        ''')
-
-        query_filename = os.path.join(workspace_dir, "query.py")
-        await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code), name="Add query.py file", description="Adding a file called `query.py` to the workspace that will run a test query on the DuckDB instance")
-        await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected")
diff --git a/continuedev/src/continuedev/steps/input/nl_multiselect.py b/continuedev/src/continuedev/steps/input/nl_multiselect.py
new file mode 100644
index 00000000..c3c832f5
--- /dev/null
+++ b/continuedev/src/continuedev/steps/input/nl_multiselect.py
@@ -0,0 +1,27 @@
+from typing import List, Union
+from ..core.core import WaitForUserInputStep
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+
+
+class NLMultiselectStep(Step):
+    hide: bool = True
+
+    prompt: str
+    options: List[str]
+
+    async def run(self, sdk: ContinueSDK):
+        user_response = (await sdk.run_step(WaitForUserInputStep(prompt=self.prompt))).text
+
+        def extract_option(text: str) -> Union[str, None]:
+            for option in self.options:
+                if option in text:
+                    return option
+            return None
+
+        first_try = extract_option(user_response.lower())
+        if first_try is not None:
+            return first_try
+
+        gpt_parsed = await sdk.models.gpt35.complete(f"These are the available options are: [{', '.join(self.options)}]. The user requested {user_response}. This is the exact string from the options array that they selected:")
+        return extract_option(gpt_parsed) or self.options[0]
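
For context, here is a minimal standalone sketch of the two-stage matching that the new `NLMultiselectStep` performs: first a direct substring match against the lowercased reply, then a fallback guess (an LLM completion in the real step), defaulting to the first option. The `resolve_option` helper and `llm_guess` parameter are hypothetical names for illustration only; they are not part of the commit.

```python
from typing import List, Optional


def resolve_option(user_response: str, options: List[str],
                   llm_guess: Optional[str] = None) -> str:
    """Hypothetical helper mirroring NLMultiselectStep's option matching."""
    def extract_option(text: str) -> Optional[str]:
        # Return the first option whose exact name appears in the text.
        for option in options:
            if option in text:
                return option
        return None

    # First try: look for an option name directly in the lowercased reply.
    first_try = extract_option(user_response.lower())
    if first_try is not None:
        return first_try

    # Fallback: check a secondary guess (the real step asks gpt-3.5 here),
    # and default to the first option if nothing matches.
    return extract_option(llm_guess or "") or options[0]


if __name__ == "__main__":
    options = ["asana_dlt", "chess", "github", "google_sheets"]
    print(resolve_option("i'd like the chess.com pipeline", options))  # -> "chess"
    print(resolve_option("something unrecognized", options))           # -> "asana_dlt"
```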
