summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTy Dunn <ty@tydunn.com>2023-06-05 13:27:09 +0200
committerTy Dunn <ty@tydunn.com>2023-06-05 13:27:09 +0200
commita6ca91d32148600e60bfd8b9c13ac45bc13454f8 (patch)
treee5fbfca447ed5456bfcee0d8677f38ff907280b1
parentffdfad51ffff037565e7b07a2e38c1cbcb6066da (diff)
downloadsncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.tar.gz
sncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.tar.bz2
sncontinue-a6ca91d32148600e60bfd8b9c13ac45bc13454f8.zip
add transform to dlt pipeline reicpe
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/README.md0
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/main.py23
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py105
3 files changed, 128 insertions, 0 deletions
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
new file mode 100644
index 00000000..974336cf
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
@@ -0,0 +1,23 @@
+from textwrap import dedent
+
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+from ...steps.core.core import WaitForUserInputStep
+from ...steps.main import MessageStep
+from .steps import SetupPipelineStep, ValidatePipelineStep
+
+
+class AddTransformRecipe(Step):
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ await sdk.run_step(
+ MessageStep(message=dedent("""\
+ This recipe will walk you through the process of adding a transform to a dlt pipeline for your chosen data source. With the help of Continue, you will:
+ - X
+ - Y
+ - Z""")) >>
+ WaitForUserInputStep(prompt="What API do you want to load data from?") >>
+ SetupPipelineStep(api_description="WeatherAPI.com API") >>
+ ValidatePipelineStep()
+ )
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
new file mode 100644
index 00000000..c6059627
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
@@ -0,0 +1,105 @@
+from textwrap import dedent
+
+from ...steps.main import MessageStep
+from ...core.sdk import Models
+from ...core.observation import DictObservation
+from ...models.filesystem_edit import AddFile
+from ...core.main import Step
+from ...core.sdk import ContinueSDK
+
+
+"""
+https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data
+
+Using chess pipeline we show how to add map and filter Python transforms.
+Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns
+- dlt init chess duckdb
+- python chess.py
+- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games
+- use add_map or add_filter
+- run python and streamlit app
+"""
+
+class SetupPipelineStep(Step):
+ hide: bool = True
+ name: str = "Setup dlt Pipeline"
+
+ api_description: str # e.g. "I want to load data from the weatherapi.com API"
+
+ async def describe(self, models: Models):
+ return dedent(f"""\
+ This step will create a new dlt pipeline that loads data from an API, as per your request:
+ {self.api_description}
+ """)
+
+ async def run(self, sdk: ContinueSDK):
+ source_name = (await sdk.models.gpt35()).complete(
+ f"Write a snake_case name for the data source described by {self.api_description}: ").strip()
+ filename = f'{source_name}.py'
+
+ # running commands to get started when creating a new dlt pipeline
+ await sdk.run([
+ 'python3 -m venv env',
+ 'source env/bin/activate',
+ 'pip install dlt',
+ f'dlt init {source_name} duckdb',
+ 'Y',
+ 'pip install -r requirements.txt'
+ ])
+
+ # editing the resource function to call the requested API
+ await sdk.edit_file(
+ filename=filename,
+ prompt=f'Edit the resource function to call the API described by this: {self.api_description}'
+ )
+
+ # wait for user to put API key in secrets.toml
+ await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml")
+ await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`")
+ return DictObservation(values={"source_name": source_name})
+
+
+class ValidatePipelineStep(Step):
+ hide: bool = True
+
+ async def run(self, sdk: ContinueSDK):
+ source_name = sdk.history.last_observation().values["source_name"]
+ filename = f'{source_name}.py'
+
+ await sdk.run_step(MessageStep(message=dedent("""\
+ This step will validate that your dlt pipeline is working as expected:
+ - Test that the API call works
+ - Load the data into a local DuckDB instance
+ - Write a query to view the data
+ """)))
+
+ # test that the API call works
+ await sdk.run(f'python3 {filename}')
+
+ # remove exit() from the main main function
+ await sdk.edit_file(
+ filename=filename,
+ prompt='Remove exit() from the main function'
+ )
+
+ # load the data into the DuckDB instance
+ await sdk.run(f'python3 {filename}')
+
+ table_name = f"{source_name}.{source_name}_resource"
+ tables_query_code = dedent(f'''\
+ import duckdb
+
+ # connect to DuckDB instance
+ conn = duckdb.connect(database="{source_name}.duckdb")
+
+ # get table names
+ rows = conn.execute("SELECT * FROM {table_name};").fetchall()
+
+ # print table names
+ for row in rows:
+ print(row)
+ ''')
+
+ query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py"
+ await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code))
+ await sdk.run('env/bin/python3 query.py')