summaryrefslogtreecommitdiff
path: root/continuedev/src
diff options
context:
space:
mode:
authorNate Sesti <sestinj@gmail.com>2023-06-07 17:00:00 -0400
committerNate Sesti <sestinj@gmail.com>2023-06-07 17:00:00 -0400
commitc84eae1885489ec7b07e0bb0eea1bac36f40c181 (patch)
treecacf2a8141429ea336308e62c5c2149567e1a5cb /continuedev/src
parent82247453a12c7e697351a4464013c690c848d552 (diff)
downloadsncontinue-c84eae1885489ec7b07e0bb0eea1bac36f40c181.tar.gz
sncontinue-c84eae1885489ec7b07e0bb0eea1bac36f40c181.tar.bz2
sncontinue-c84eae1885489ec7b07e0bb0eea1bac36f40c181.zip
quick look over on transform recipe
Diffstat (limited to 'continuedev/src')
-rw-r--r--continuedev/src/continuedev/core/policy.py5
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md135
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/main.py13
-rw-r--r--continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py173
-rw-r--r--continuedev/src/continuedev/steps/steps_on_startup.py5
5 files changed, 173 insertions, 158 deletions
diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py
index 8aea8de7..c3f1d188 100644
--- a/continuedev/src/continuedev/core/policy.py
+++ b/continuedev/src/continuedev/core/policy.py
@@ -3,6 +3,7 @@ from typing import List, Tuple, Type
from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma
from ..steps.steps_on_startup import StepsOnStartupStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
+from ..recipes.AddTransformRecipe.main import AddTransformRecipe
from .main import Step, Validator, History, Policy
from .observation import Observation, TracebackObservation, UserInputObservation
from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep
@@ -28,8 +29,10 @@ class DemoPolicy(Policy):
# This could be defined with ObservationTypePolicy. Ergonomics not right though.
if "/pytest" in observation.user_input.lower():
return WritePytestsRecipe(instructions=observation.user_input)
- elif "/dlt" in observation.user_input.lower() or " dlt" in observation.user_input.lower():
+ elif "/dlt" in observation.user_input.lower():
return CreatePipelineRecipe()
+ elif "/transform" in observation.user_input.lower():
+ return AddTransformRecipe()
elif "/comment" in observation.user_input.lower():
return CommentCodeStep()
elif "/ask" in observation.user_input:
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md
new file mode 100644
index 00000000..658b285f
--- /dev/null
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md
@@ -0,0 +1,135 @@
+# Customize resources
+## Filter, transform and pivot data
+
+You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types:
+- map - transform the data item (resource.add_map)
+- filter - filter the data item (resource.add_filter)
+- yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map)
+
+Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so:
+- we remove users with user_id == 'me'
+- we anonymize user data
+Here's our resource:
+```python
+import dlt
+
+@dlt.resource(write_disposition='replace')
+def users():
+ ...
+ users = requests.get(...)
+ ...
+ yield users
+```
+
+Here's our script that defines transformations and loads the data.
+```python
+from pipedrive import users
+
+def anonymize_user(user_data):
+ user_data['user_id'] = hash_str(user_data['user_id'])
+ user_data['user_email'] = hash_str(user_data['user_email'])
+ return user_data
+
+# add the filter and anonymize function to users resource and enumerate
+for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user):
+print(user)
+```
+
+Here is a more complex example of a filter transformation:
+
+ # Renaming columns
+ ## Renaming columns by replacing the special characters
+
+ In the example below, we create a dummy source with special characters in the name. We then write a function that we intend to apply to the resource to modify its output (i.e. replacing the German umlaut): replace_umlauts_in_dict_keys.
+ ```python
+ import dlt
+
+ # create a dummy source with umlauts (special characters) in key names (um)
+ @dlt.source
+ def dummy_source(prefix: str = None):
+ @dlt.resource
+ def dummy_data():
+ for _ in range(100):
+ yield {f'Objekt_{_}':{'Größe':_, 'Äquivalenzprüfung':True}}
+ return dummy_data(),
+
+ def replace_umlauts_in_dict_keys(d):
+ # Replaces umlauts in dictionary keys with standard characters.
+ umlaut_map = {'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue'}
+ result = {}
+ for k, v in d.items():
+ new_key = ''.join(umlaut_map.get(c, c) for c in k)
+ if isinstance(v, dict):
+ result[new_key] = replace_umlauts_in_dict_keys(v)
+ else:
+ result[new_key] = v
+ return result
+
+ # We can add the map function to the resource
+
+ # 1. Create an instance of the source so you can edit it.
+ data_source = dummy_source()
+
+ # 2. Modify this source instance's resource
+ data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
+
+ # 3. Inspect your result
+ for row in data_source:
+ print(row)
+
+ # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}}
+ # ...
+ ```
+
+Here is a more complex example of a map transformation:
+
+# Pseudonymizing columns
+## Pseudonymizing (or anonymizing) columns by replacing the special characters
+Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut).
+
+```python
+import dlt
+import hashlib
+
+@dlt.source
+def dummy_source(prefix: str = None):
+ @dlt.resource
+ def dummy_data():
+ for _ in range(3):
+ yield {'id':_, 'name': f'Jane Washington {_}'}
+ return dummy_data(),
+
+def pseudonymize_name(doc):
+ Pseudonmyisation is a deterministic type of PII-obscuring
+ Its role is to allow identifying users by their hash, without revealing the underlying info.
+
+ # add a constant salt to generate
+ salt = 'WI@N57%zZrmk#88c'
+ salted_string = doc['name'] + salt
+ sh = hashlib.sha256()
+ sh.update(salted_string.encode())
+ hashed_string = sh.digest().hex()
+ doc['name'] = hashed_string
+ return doc
+
+ # run it as is
+ for row in dummy_source().dummy_data().add_map(pseudonymize_name):
+ print(row)
+
+ #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
+ #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
+ #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}
+
+ # Or create an instance of the data source, modify the resource and run the source.
+
+ # 1. Create an instance of the source so you can edit it.
+ data_source = dummy_source()
+ # 2. Modify this source instance's resource
+ data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
+ # 3. Inspect your result
+ for row in data_source:
+ print(row)
+
+ pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
+ load_info = pipeline.run(data_source)
+``` \ No newline at end of file
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
index 2a0736dd..5e05b587 100644
--- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py
@@ -11,14 +11,17 @@ class AddTransformRecipe(Step):
hide: bool = True
async def run(self, sdk: ContinueSDK):
- await sdk.run_step(
+ text_observation = await sdk.run_step(
MessageStep(message=dedent("""\
This recipe will walk you through the process of adding a transform to a dlt pipeline that uses the chess.com API source. With the help of Continue, you will:
- Set up a dlt pipeline for the chess.com API
- Add a filter or map transform to the pipeline
- - Run the pipeline and view the transformed data in a Streamlit app
- - """)) >>
+ - Run the pipeline and view the transformed data in a Streamlit app"""), name="Add transformation to a dlt pipeline") >>
SetUpChessPipelineStep() >>
- WaitForUserInputStep(prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >>
- AddTransformStep(transform_description="Use the `python-chess` library to decode the moves in the game data") # Ask Nate how to not hardcode this here
+ WaitForUserInputStep(
+ prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games")
+ )
+ await sdk.run_step(
+ AddTransformStep(
+ transform_description=text_observation.text)
)
diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
index 46ddbed5..f7f5a43b 100644
--- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
+++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py
@@ -1,3 +1,4 @@
+import os
from textwrap import dedent
from ...steps.main import MessageStep
@@ -7,35 +8,39 @@ from ...models.filesystem_edit import AddFile
from ...core.main import Step
from ...core.sdk import ContinueSDK
+AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)"
+
class SetUpChessPipelineStep(Step):
hide: bool = True
name: str = "Setup Chess.com API dlt Pipeline"
async def describe(self, models: Models):
- return dedent(f"""\
- This step will create a new dlt pipeline that loads data from the chess.com API.
- """)
+ return "This step will create a new dlt pipeline that loads data from the chess.com API."
async def run(self, sdk: ContinueSDK):
- filename = 'chess.py'
-
# running commands to get started when creating a new dlt pipeline
await sdk.run([
'python3 -m venv env',
'source env/bin/activate',
'pip install dlt',
- 'dlt init chess duckdb',
- 'Y',
+ 'dlt --non-interactive init chess duckdb',
'pip install -r requirements.txt'
- ])
+ ], name="Set up Python environment", description=dedent(f"""\
+ Running the following commands:
+ - `python3 -m venv env`: Create a Python virtual environment
+ - `source env/bin/activate`: Activate the virtual environment
+ - `pip install dlt`: Install dlt
+ - `dlt init chess duckdb`: Create a new dlt pipeline called "chess" that loads data into a local DuckDB instance
+ - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline"""))
class AddTransformStep(Step):
hide: bool = True
- transform_description: str # e.g. "Use the `python-chess` library to decode the moves in the game data"
+ # e.g. "Use the `python-chess` library to decode the moves in the game data"
+ transform_description: str
async def run(self, sdk: ContinueSDK):
source_name = 'chess'
@@ -45,8 +50,10 @@ class AddTransformStep(Step):
This step will customize your resource function with a transform of your choice:
- Add a filter or map transformation depending on your request
- Load the data into a local DuckDB instance
- - Open up a Streamlit app for you to view the data
- """)))
+ - Open up a Streamlit app for you to view the data"""), name="Write transformation function"))
+
+ with open(os.path.join(os.path.dirname(__file__), 'dlt_transform_docs.md')) as f:
+ dlt_transform_docs = f.read()
prompt = dedent(f"""\
Task: Write a transform function using the description below and then use `add_map` or `add_filter` from the `dlt` library to attach it a resource.
@@ -55,151 +62,17 @@ class AddTransformStep(Step):
Here are some docs pages that will help you better understand how to use `dlt`.
- # Customize resources
- ## Filter, transform and pivot data
-
- You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types:
- - map - transform the data item (resource.add_map)
- - filter - filter the data item (resource.add_filter)
- - yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map)
-
- Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so:
- - we remove users with user_id == 'me'
- - we anonymize user data
- Here's our resource:
- ```python
- import dlt
-
- @dlt.resource(write_disposition='replace')
- def users():
- ...
- users = requests.get(...)
- ...
- yield users
- ```
-
- Here's our script that defines transformations and loads the data.
- ```python
- from pipedrive import users
-
- def anonymize_user(user_data):
- user_data['user_id'] = hash_str(user_data['user_id'])
- user_data['user_email'] = hash_str(user_data['user_email'])
- return user_data
-
- # add the filter and anonymize function to users resource and enumerate
- for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user):
- print(user)
- ```
-
- Here is a more complex example of a filter transformation:
-
- # Renaming columns
- ## Renaming columns by replacing the special characters
-
- In the example below, we create a dummy source with special characters in the name. We then write a function that we intend to apply to the resource to modify its output (i.e. replacing the German umlaut): replace_umlauts_in_dict_keys.
- ```python
- import dlt
-
- # create a dummy source with umlauts (special characters) in key names (um)
- @dlt.source
- def dummy_source(prefix: str = None):
- @dlt.resource
- def dummy_data():
- for _ in range(100):
- yield {f'Objekt_{_}':{'Größe':_, 'Äquivalenzprüfung':True}}
- return dummy_data(),
-
- def replace_umlauts_in_dict_keys(d):
- # Replaces umlauts in dictionary keys with standard characters.
- umlaut_map = {'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue'}
- result = {}
- for k, v in d.items():
- new_key = ''.join(umlaut_map.get(c, c) for c in k)
- if isinstance(v, dict):
- result[new_key] = replace_umlauts_in_dict_keys(v)
- else:
- result[new_key] = v
- return result
-
- # We can add the map function to the resource
-
- # 1. Create an instance of the source so you can edit it.
- data_source = dummy_source()
-
- # 2. Modify this source instance's resource
- data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
-
- # 3. Inspect your result
- for row in data_source:
- print(row)
-
- # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}}
- # ...
- ```
-
- Here is a more complex example of a map transformation:
-
- # Pseudonymizing columns
- ## Pseudonymizing (or anonymizing) columns by replacing the special characters
- Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut).
-
- ```python
- import dlt
- import hashlib
-
- @dlt.source
- def dummy_source(prefix: str = None):
- @dlt.resource
- def dummy_data():
- for _ in range(3):
- yield {'id':_, 'name': f'Jane Washington {_}'}
- return dummy_data(),
-
- def pseudonymize_name(doc):
- Pseudonmyisation is a deterministic type of PII-obscuring
- Its role is to allow identifying users by their hash, without revealing the underlying info.
-
- # add a constant salt to generate
- salt = 'WI@N57%zZrmk#88c'
- salted_string = doc['name'] + salt
- sh = hashlib.sha256()
- sh.update(salted_string.encode())
- hashed_string = sh.digest().hex()
- doc['name'] = hashed_string
- return doc
-
- # run it as is
- for row in dummy_source().dummy_data().add_map(pseudonymize_name):
- print(row)
-
- #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
- #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
- #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}
-
- # Or create an instance of the data source, modify the resource and run the source.
-
- # 1. Create an instance of the source so you can edit it.
- data_source = dummy_source()
- # 2. Modify this source instance's resource
- data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
- # 3. Inspect your result
- for row in data_source:
- print(row)
-
- pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
- load_info = pipeline.run(data_source)
- ```
- """)
+ {dlt_transform_docs}""")
# edit the pipeline to add a tranform function and attach it to a resource
await sdk.edit_file(
filename=filename,
- prompt=prompt
+ prompt=prompt,
+ name=f"Writing transform function {AI_ASSISTED_STRING}"
)
# run the pipeline and load the data
- await sdk.run(f'python3 {filename}')
+ await sdk.run(f'python3 {filename}', name="Run the pipeline", description=f"Running `python3 {filename}` to load the data into a local DuckDB instance")
# run a streamlit app to show the data
- await sdk.run(f'dlt pipeline {source_name} show') \ No newline at end of file
+ await sdk.run(f'dlt pipeline {source_name} show', name="Show data in a Streamlit app", description=f"Running `dlt pipeline {source_name} show` to show the data in a Streamlit app, where you can view and play with the data.")
diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py
index cd40ff56..63dedd82 100644
--- a/continuedev/src/continuedev/steps/steps_on_startup.py
+++ b/continuedev/src/continuedev/steps/steps_on_startup.py
@@ -1,11 +1,12 @@
from ..core.main import ContinueSDK, Models, Step
from .main import UserInputStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
-
+from ..recipes.AddTransformRecipe.main import AddTransformRecipe
step_name_to_step_class = {
"UserInputStep": UserInputStep,
- "CreatePipelineRecipe": CreatePipelineRecipe
+ "CreatePipelineRecipe": CreatePipelineRecipe,
+ "AddTransformRecipe": AddTransformRecipe
}