summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/README.md7
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/main.py12
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py70
3 files changed, 38 insertions, 51 deletions
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md
index 9ad49a5f..d735e0cd 100644
--- a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md
@@ -1,3 +1,8 @@
# AddTransformRecipe
-Uses the Chess.com API example to show how to add map and filter Python transforms to a dlt pipeline. \ No newline at end of file
+Uses the Chess.com API example to show how to add map and filter Python transforms to a dlt pipeline.
+
+Background
+- https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data
+- https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns
+- https://dlthub.com/docs/customizations/customizing-pipelines/pseudonymizing_columns \ No newline at end of file
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
index 0fd96930..2a0736dd 100644
--- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
@@ -14,11 +14,11 @@ class AddTransformRecipe(Step):
await sdk.run_step(
MessageStep(message=dedent("""\
This recipe will walk you through the process of adding a transform to a dlt pipeline that uses the chess.com API source. With the help of Continue, you will:
- - X
- - Y
- - Z""")) >>
-
+ - Set up a dlt pipeline for the chess.com API
+ - Add a filter or map transform to the pipeline
+ - Run the pipeline and view the transformed data in a Streamlit app
+ - """)) >>
SetUpChessPipelineStep() >>
- WaitForUserInputStep(prompt="How do you want to transform the chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >>
- AddTransformStep()
+ WaitForUserInputStep(prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >>
+ AddTransformStep(transform_description="Use the `python-chess` library to decode the moves in the game data") # Ask Nate how to not hardcode this here
)
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
index 8ab3eda1..46ddbed5 100644
--- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
@@ -8,24 +8,10 @@ from ...core.main import Step
from ...core.sdk import ContinueSDK
-"""
-https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data
-
-
-Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns
-- dlt init chess duckdb
-- python chess.py
-- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games
-- use add_map or add_filter
-- run python and streamlit app
-"""
-
class SetUpChessPipelineStep(Step):
hide: bool = True
name: str = "Setup Chess.com API dlt Pipeline"
- api_description: str # e.g. "I want to load data from the weatherapi.com API"
-
async def describe(self, models: Models):
return dedent(f"""\
This step will create a new dlt pipeline that loads data from the chess.com API.
@@ -49,6 +35,8 @@ class SetUpChessPipelineStep(Step):
class AddTransformStep(Step):
hide: bool = True
+ transform_description: str # e.g. "Use the `python-chess` library to decode the moves in the game data"
+
async def run(self, sdk: ContinueSDK):
source_name = 'chess'
filename = f'{source_name}.py'
@@ -60,43 +48,29 @@ class AddTransformStep(Step):
- Open up a Streamlit app for you to view the data
""")))
- # test that the API call works
- await sdk.run(f'python3 {filename}')
-
- # remove exit() from the main main function
- await sdk.edit_file(
- filename=filename,
- prompt='Remove exit() from the main function'
- )
-
- # load the data into the DuckDB instance
- await sdk.run(f'python3 {filename}')
+ prompt = dedent(f"""\
+ Task: Write a transform function using the description below and then use `add_map` or `add_filter` from the `dlt` library to attach it a resource.
- table_name = f"{source_name}.{source_name}_resource"
- examples = dedent(f"""\
-
- Task: Use either the `add_map` or `add_filter` function to transform the data.
+ Description: {self.transform_description}
- Below you will find some docs page that will help you understand this task.
+ Here are some docs pages that will help you better understand how to use `dlt`.
# Customize resources
## Filter, transform and pivot data
You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types:
-
- map - transform the data item (resource.add_map)
- filter - filter the data item (resource.add_filter)
- yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map)
- Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so:
-
- we remove users with user_id == "me"
- we anonymize user data
+ Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so:
+ - we remove users with user_id == 'me'
+ - we anonymize user data
Here's our resource:
```python
import dlt
- @dlt.resource(write_disposition="replace")
+ @dlt.resource(write_disposition='replace')
def users():
...
users = requests.get(...)
@@ -109,12 +83,12 @@ class AddTransformStep(Step):
from pipedrive import users
def anonymize_user(user_data):
- user_data["user_id"] = hash_str(user_data["user_id"])
- user_data["user_email"] = hash_str(user_data["user_email"])
+ user_data['user_id'] = hash_str(user_data['user_id'])
+ user_data['user_email'] = hash_str(user_data['user_email'])
return user_data
# add the filter and anonymize function to users resource and enumerate
- for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(anonymize_user):
+ for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user):
print(user)
```
@@ -168,7 +142,7 @@ class AddTransformStep(Step):
# Pseudonymizing columns
## Pseudonymizing (or anonymizing) columns by replacing the special characters
- Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called "name", which we replace with deterministic hashes (i.e. replacing the German umlaut).
+ Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut).
```python
import dlt
@@ -216,8 +190,16 @@ class AddTransformStep(Step):
pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_source)
```
- """)
+ """)
+
+ # edit the pipeline to add a tranform function and attach it to a resource
+ await sdk.edit_file(
+ filename=filename,
+ prompt=prompt
+ )
+
+ # run the pipeline and load the data
+ await sdk.run(f'python3 {filename}')
- query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py"
- await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code))
- await sdk.run('env/bin/python3 query.py')
+ # run a streamlit app to show the data
+ await sdk.run(f'dlt pipeline {source_name} show') \ No newline at end of file