diff options
author | Ty Dunn <ty@tydunn.com> | 2023-06-05 13:27:09 +0200 |
---|---|---|
committer | Ty Dunn <ty@tydunn.com> | 2023-06-05 13:27:09 +0200 |
commit | a6ca91d32148600e60bfd8b9c13ac45bc13454f8 (patch) | |
tree | e5fbfca447ed5456bfcee0d8677f38ff907280b1 /continuedev | |
parent | ffdfad51ffff037565e7b07a2e38c1cbcb6066da (diff) | |
download | sncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.tar.gz sncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.tar.bz2 sncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.zip |
add transform to dlt pipeline recipe
Diffstat (limited to 'continuedev')
3 files changed, 128 insertions, 0 deletions
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py new file mode 100644 index 00000000..974336cf --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -0,0 +1,23 @@ +from textwrap import dedent + +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, ValidatePipelineStep + + +class AddTransformRecipe(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + await sdk.run_step( + MessageStep(message=dedent("""\ + This recipe will walk you through the process of adding a transform to a dlt pipeline for your chosen data source. 
With the help of Continue, you will: + - X + - Y + - Z""")) >> + WaitForUserInputStep(prompt="What API do you want to load data from?") >> + SetupPipelineStep(api_description="WeatherAPI.com API") >> + ValidatePipelineStep() + ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py new file mode 100644 index 00000000..c6059627 --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -0,0 +1,105 @@ +from textwrap import dedent + +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation +from ...models.filesystem_edit import AddFile +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +""" +https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data + +Using chess pipeline we show how to add map and filter Python transforms. +Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns +- dlt init chess duckdb +- python chess.py +- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games +- use add_map or add_filter +- run python and streamlit app +""" + +class SetupPipelineStep(Step): + hide: bool = True + name: str = "Setup dlt Pipeline" + + api_description: str # e.g. 
"I want to load data from the weatherapi.com API" + + async def describe(self, models: Models): + return dedent(f"""\ + This step will create a new dlt pipeline that loads data from an API, as per your request: + {self.api_description} + """) + + async def run(self, sdk: ContinueSDK): + source_name = (await sdk.models.gpt35()).complete( + f"Write a snake_case name for the data source described by {self.api_description}: ").strip() + filename = f'{source_name}.py' + + # running commands to get started when creating a new dlt pipeline + await sdk.run([ + 'python3 -m venv env', + 'source env/bin/activate', + 'pip install dlt', + f'dlt init {source_name} duckdb', + 'Y', + 'pip install -r requirements.txt' + ]) + + # editing the resource function to call the requested API + await sdk.edit_file( + filename=filename, + prompt=f'Edit the resource function to call the API described by this: {self.api_description}' + ) + + # wait for user to put API key in secrets.toml + await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") + await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") + return DictObservation(values={"source_name": source_name}) + + +class ValidatePipelineStep(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + source_name = sdk.history.last_observation().values["source_name"] + filename = f'{source_name}.py' + + await sdk.run_step(MessageStep(message=dedent("""\ + This step will validate that your dlt pipeline is working as expected: + - Test that the API call works + - Load the data into a local DuckDB instance + - Write a query to view the data + """))) + + # test that the API call works + await sdk.run(f'python3 {filename}') + + # remove exit() from the main main function + await sdk.edit_file( + filename=filename, + prompt='Remove exit() from the main function' + ) + + # load the data into the DuckDB instance + await 
sdk.run(f'python3 {filename}') + + table_name = f"{source_name}.{source_name}_resource" + tables_query_code = dedent(f'''\ + import duckdb + + # connect to DuckDB instance + conn = duckdb.connect(database="{source_name}.duckdb") + + # get table names + rows = conn.execute("SELECT * FROM {table_name};").fetchall() + + # print table names + for row in rows: + print(row) + ''') + + query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" + await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) + await sdk.run('env/bin/python3 query.py') |