diff options
3 files changed, 128 insertions, 0 deletions
| diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py new file mode 100644 index 00000000..974336cf --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -0,0 +1,23 @@ +from textwrap import dedent + +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, ValidatePipelineStep + + +class AddTransformRecipe(Step): +    hide: bool = True + +    async def run(self, sdk: ContinueSDK): +        await sdk.run_step( +            MessageStep(message=dedent("""\ +                This recipe will walk you through the process of adding a transform to a dlt pipeline for your chosen data source. With the help of Continue, you will: +                - X +                - Y +                - Z""")) >> +            WaitForUserInputStep(prompt="What API do you want to load data from?") >> +            SetupPipelineStep(api_description="WeatherAPI.com API") >> +            ValidatePipelineStep() +        ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py new file mode 100644 index 00000000..c6059627 --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -0,0 +1,105 @@ +from textwrap import dedent + +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation +from ...models.filesystem_edit import AddFile +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +""" +https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data + +Using chess pipeline we show how to add map and filter Python transforms. +Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns +- dlt init chess duckdb +- python chess.py +- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games +- use add_map or add_filter +- run python and streamlit app +""" + +class SetupPipelineStep(Step): +    hide: bool = True +    name: str = "Setup dlt Pipeline" + +    api_description: str  # e.g. "I want to load data from the weatherapi.com API" + +    async def describe(self, models: Models): +        return dedent(f"""\ +        This step will create a new dlt pipeline that loads data from an API, as per your request: +        {self.api_description} +        """) + +    async def run(self, sdk: ContinueSDK): +        source_name = (await sdk.models.gpt35()).complete( +            f"Write a snake_case name for the data source described by {self.api_description}: ").strip() +        filename = f'{source_name}.py' + +        # running commands to get started when creating a new dlt pipeline +        await sdk.run([ +            'python3 -m venv env', +            'source env/bin/activate', +            'pip install dlt', +            f'dlt init {source_name} duckdb', +            'Y', +            'pip install -r requirements.txt' +        ]) + +        # editing the resource function to call the requested API +        await sdk.edit_file( +            filename=filename, +            prompt=f'Edit the resource function to call the API described by this: {self.api_description}' +        ) + +        # wait for user to put API key in secrets.toml +        await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") +        await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") +        return DictObservation(values={"source_name": source_name}) + + +class ValidatePipelineStep(Step): +    hide: bool = True + +    async def run(self, sdk: ContinueSDK): +        source_name = sdk.history.last_observation().values["source_name"] +        filename = f'{source_name}.py' + +        await sdk.run_step(MessageStep(message=dedent("""\ +                This step will validate that your dlt pipeline is working as expected: +                - Test that the API call works +                - Load the data into a local DuckDB instance +                - Write a query to view the data +                """))) + +        # test that the API call works +        await sdk.run(f'python3 {filename}') + +        # remove exit() from the main main function +        await sdk.edit_file( +            filename=filename, +            prompt='Remove exit() from the main function' +        ) + +        # load the data into the DuckDB instance +        await sdk.run(f'python3 {filename}') + +        table_name = f"{source_name}.{source_name}_resource" +        tables_query_code = dedent(f'''\ +            import duckdb + +            # connect to DuckDB instance +            conn = duckdb.connect(database="{source_name}.duckdb") + +            # get table names +            rows = conn.execute("SELECT * FROM {table_name};").fetchall() + +            # print table names +            for row in rows: +                print(row) +        ''') + +        query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" +        await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) +        await sdk.run('env/bin/python3 query.py') | 
