From 5cd4ff9d0183233bc4bc079e7c181372eef636e7 Mon Sep 17 00:00:00 2001 From: Ty Dunn Date: Mon, 5 Jun 2023 13:27:09 +0200 Subject: add transform to dlt pipeline reicpe --- .../recipes/AddTransformRecipe/README.md | 0 .../continuedev/recipes/AddTransformRecipe/main.py | 23 +++++ .../recipes/AddTransformRecipe/steps.py | 105 +++++++++++++++++++++ 3 files changed, 128 insertions(+) create mode 100644 continuedev/src/continuedev/recipes/AddTransformRecipe/README.md create mode 100644 continuedev/src/continuedev/recipes/AddTransformRecipe/main.py create mode 100644 continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md new file mode 100644 index 00000000..e69de29b diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py new file mode 100644 index 00000000..974336cf --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -0,0 +1,23 @@ +from textwrap import dedent + +from ...core.main import Step +from ...core.sdk import ContinueSDK +from ...steps.core.core import WaitForUserInputStep +from ...steps.main import MessageStep +from .steps import SetupPipelineStep, ValidatePipelineStep + + +class AddTransformRecipe(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + await sdk.run_step( + MessageStep(message=dedent("""\ + This recipe will walk you through the process of adding a transform to a dlt pipeline for your chosen data source. With the help of Continue, you will: + - X + - Y + - Z""")) >> + WaitForUserInputStep(prompt="What API do you want to load data from?") >> + SetupPipelineStep(api_description="WeatherAPI.com API") >> + ValidatePipelineStep() + ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py new file mode 100644 index 00000000..c6059627 --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -0,0 +1,105 @@ +from textwrap import dedent + +from ...steps.main import MessageStep +from ...core.sdk import Models +from ...core.observation import DictObservation +from ...models.filesystem_edit import AddFile +from ...core.main import Step +from ...core.sdk import ContinueSDK + + +""" +https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data + +Using chess pipeline we show how to add map and filter Python transforms. +Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns +- dlt init chess duckdb +- python chess.py +- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games +- use add_map or add_filter +- run python and streamlit app +""" + +class SetupPipelineStep(Step): + hide: bool = True + name: str = "Setup dlt Pipeline" + + api_description: str # e.g. "I want to load data from the weatherapi.com API" + + async def describe(self, models: Models): + return dedent(f"""\ + This step will create a new dlt pipeline that loads data from an API, as per your request: + {self.api_description} + """) + + async def run(self, sdk: ContinueSDK): + source_name = (await sdk.models.gpt35()).complete( + f"Write a snake_case name for the data source described by {self.api_description}: ").strip() + filename = f'{source_name}.py' + + # running commands to get started when creating a new dlt pipeline + await sdk.run([ + 'python3 -m venv env', + 'source env/bin/activate', + 'pip install dlt', + f'dlt init {source_name} duckdb', + 'Y', + 'pip install -r requirements.txt' + ]) + + # editing the resource function to call the requested API + await sdk.edit_file( + filename=filename, + prompt=f'Edit the resource function to call the API described by this: {self.api_description}' + ) + + # wait for user to put API key in secrets.toml + await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") + await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") + return DictObservation(values={"source_name": source_name}) + + +class ValidatePipelineStep(Step): + hide: bool = True + + async def run(self, sdk: ContinueSDK): + source_name = sdk.history.last_observation().values["source_name"] + filename = f'{source_name}.py' + + await sdk.run_step(MessageStep(message=dedent("""\ + This step will validate that your dlt pipeline is working as expected: + - Test that the API call works + - Load the data into a local DuckDB instance + - Write a query to view the data + """))) + + # test that the API call works + await sdk.run(f'python3 {filename}') + + # remove exit() from the main main function + await sdk.edit_file( + filename=filename, + prompt='Remove exit() from the main function' + ) + + # load the data into the DuckDB instance + await sdk.run(f'python3 {filename}') + + table_name = f"{source_name}.{source_name}_resource" + tables_query_code = dedent(f'''\ + import duckdb + + # connect to DuckDB instance + conn = duckdb.connect(database="{source_name}.duckdb") + + # get table names + rows = conn.execute("SELECT * FROM {table_name};").fetchall() + + # print table names + for row in rows: + print(row) + ''') + + query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" + await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) + await sdk.run('env/bin/python3 query.py') -- cgit v1.2.3-70-g09d2 From 8d67b7ea38811957e23d3b566e5c0fce394588d5 Mon Sep 17 00:00:00 2001 From: Ty Dunn Date: Mon, 5 Jun 2023 17:59:35 +0200 Subject: more add transform progress --- .../recipes/AddTransformRecipe/README.md | 3 + .../continuedev/recipes/AddTransformRecipe/main.py | 11 +- .../recipes/AddTransformRecipe/steps.py | 194 +++++++++++++++++---- 3 files changed, 165 insertions(+), 43 deletions(-) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md index e69de29b..9ad49a5f 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md @@ -0,0 +1,3 @@ +# AddTransformRecipe + +Uses the Chess.com API example to show how to add map and filter Python transforms to a dlt pipeline. \ No newline at end of file diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py index 974336cf..0fd96930 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -4,7 +4,7 @@ from ...core.main import Step from ...core.sdk import ContinueSDK from ...steps.core.core import WaitForUserInputStep from ...steps.main import MessageStep -from .steps import SetupPipelineStep, ValidatePipelineStep +from .steps import SetUpChessPipelineStep, AddTransformStep class AddTransformRecipe(Step): @@ -13,11 +13,12 @@ class AddTransformRecipe(Step): async def run(self, sdk: ContinueSDK): await sdk.run_step( MessageStep(message=dedent("""\ - This recipe will walk you through the process of adding a transform to a dlt pipeline for your chosen data source. With the help of Continue, you will: + This recipe will walk you through the process of adding a transform to a dlt pipeline that uses the chess.com API source. With the help of Continue, you will: - X - Y - Z""")) >> - WaitForUserInputStep(prompt="What API do you want to load data from?") >> - SetupPipelineStep(api_description="WeatherAPI.com API") >> - ValidatePipelineStep() + + SetUpChessPipelineStep() >> + WaitForUserInputStep(prompt="How do you want to transform the chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >> + AddTransformStep() ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py index c6059627..8ab3eda1 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -11,7 +11,7 @@ from ...core.sdk import ContinueSDK """ https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data -Using chess pipeline we show how to add map and filter Python transforms. + Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns - dlt init chess duckdb - python chess.py @@ -20,57 +20,44 @@ Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_c - run python and streamlit app """ -class SetupPipelineStep(Step): +class SetUpChessPipelineStep(Step): hide: bool = True - name: str = "Setup dlt Pipeline" + name: str = "Setup Chess.com API dlt Pipeline" api_description: str # e.g. "I want to load data from the weatherapi.com API" async def describe(self, models: Models): return dedent(f"""\ - This step will create a new dlt pipeline that loads data from an API, as per your request: - {self.api_description} + This step will create a new dlt pipeline that loads data from the chess.com API. """) async def run(self, sdk: ContinueSDK): - source_name = (await sdk.models.gpt35()).complete( - f"Write a snake_case name for the data source described by {self.api_description}: ").strip() - filename = f'{source_name}.py' + + filename = 'chess.py' # running commands to get started when creating a new dlt pipeline await sdk.run([ 'python3 -m venv env', 'source env/bin/activate', 'pip install dlt', - f'dlt init {source_name} duckdb', + 'dlt init chess duckdb', 'Y', 'pip install -r requirements.txt' ]) - # editing the resource function to call the requested API - await sdk.edit_file( - filename=filename, - prompt=f'Edit the resource function to call the API described by this: {self.api_description}' - ) - # wait for user to put API key in secrets.toml - await sdk.ide.setFileOpen(await sdk.ide.getWorkspaceDirectory() + "/.dlt/secrets.toml") - await sdk.wait_for_user_confirmation("If this service requires an API key, please add it to the `secrets.toml` file and then press `Continue`") - return DictObservation(values={"source_name": source_name}) - - -class ValidatePipelineStep(Step): +class AddTransformStep(Step): hide: bool = True async def run(self, sdk: ContinueSDK): - source_name = sdk.history.last_observation().values["source_name"] + source_name = 'chess' filename = f'{source_name}.py' await sdk.run_step(MessageStep(message=dedent("""\ - This step will validate that your dlt pipeline is working as expected: - - Test that the API call works + This step will customize your resource function with a transform of your choice: + - Add a filter or map transformation depending on your request - Load the data into a local DuckDB instance - - Write a query to view the data + - Open up a Streamlit app for you to view the data """))) # test that the API call works @@ -86,19 +73,150 @@ class ValidatePipelineStep(Step): await sdk.run(f'python3 {filename}') table_name = f"{source_name}.{source_name}_resource" - tables_query_code = dedent(f'''\ - import duckdb - - # connect to DuckDB instance - conn = duckdb.connect(database="{source_name}.duckdb") - - # get table names - rows = conn.execute("SELECT * FROM {table_name};").fetchall() - - # print table names - for row in rows: - print(row) - ''') + examples = dedent(f"""\ + + Task: Use either the `add_map` or `add_filter` function to transform the data. + + Below you will find some docs page that will help you understand this task. + + # Customize resources + ## Filter, transform and pivot data + + You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types: + + - map - transform the data item (resource.add_map) + - filter - filter the data item (resource.add_filter) + - yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map) + + Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so: + + we remove users with user_id == "me" + we anonymize user data + Here's our resource: + ```python + import dlt + + @dlt.resource(write_disposition="replace") + def users(): + ... + users = requests.get(...) + ... + yield users + ``` + + Here's our script that defines transformations and loads the data. + ```python + from pipedrive import users + + def anonymize_user(user_data): + user_data["user_id"] = hash_str(user_data["user_id"]) + user_data["user_email"] = hash_str(user_data["user_email"]) + return user_data + + # add the filter and anonymize function to users resource and enumerate + for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(anonymize_user): + print(user) + ``` + + Here is a more complex example of a filter transformation: + + # Renaming columns + ## Renaming columns by replacing the special characters + + In the example below, we create a dummy source with special characters in the name. We then write a function that we intend to apply to the resource to modify its output (i.e. replacing the German umlaut): replace_umlauts_in_dict_keys. + ```python + import dlt + + # create a dummy source with umlauts (special characters) in key names (um) + @dlt.source + def dummy_source(prefix: str = None): + @dlt.resource + def dummy_data(): + for _ in range(100): + yield {f'Objekt_{_}':{'Größe':_, 'Äquivalenzprüfung':True}} + return dummy_data(), + + def replace_umlauts_in_dict_keys(d): + # Replaces umlauts in dictionary keys with standard characters. + umlaut_map = {'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue'} + result = {} + for k, v in d.items(): + new_key = ''.join(umlaut_map.get(c, c) for c in k) + if isinstance(v, dict): + result[new_key] = replace_umlauts_in_dict_keys(v) + else: + result[new_key] = v + return result + + # We can add the map function to the resource + + # 1. Create an instance of the source so you can edit it. + data_source = dummy_source() + + # 2. Modify this source instance's resource + data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) + + # 3. Inspect your result + for row in data_source: + print(row) + + # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}} + # ... + ``` + + Here is a more complex example of a map transformation: + + # Pseudonymizing columns + ## Pseudonymizing (or anonymizing) columns by replacing the special characters + Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called "name", which we replace with deterministic hashes (i.e. replacing the German umlaut). + + ```python + import dlt + import hashlib + + @dlt.source + def dummy_source(prefix: str = None): + @dlt.resource + def dummy_data(): + for _ in range(3): + yield {'id':_, 'name': f'Jane Washington {_}'} + return dummy_data(), + + def pseudonymize_name(doc): + Pseudonmyisation is a deterministic type of PII-obscuring + Its role is to allow identifying users by their hash, without revealing the underlying info. + + # add a constant salt to generate + salt = 'WI@N57%zZrmk#88c' + salted_string = doc['name'] + salt + sh = hashlib.sha256() + sh.update(salted_string.encode()) + hashed_string = sh.digest().hex() + doc['name'] = hashed_string + return doc + + # run it as is + for row in dummy_source().dummy_data().add_map(pseudonymize_name): + print(row) + + #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'} + #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'} + #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'} + + # Or create an instance of the data source, modify the resource and run the source. + + # 1. Create an instance of the source so you can edit it. + data_source = dummy_source() + # 2. Modify this source instance's resource + data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) + # 3. Inspect your result + for row in data_source: + print(row) + + pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') + load_info = pipeline.run(data_source) + ``` + """) query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) -- cgit v1.2.3-70-g09d2 From 0be241fe44edcc35c79835ac59971d60869d1c34 Mon Sep 17 00:00:00 2001 From: Ty Dunn Date: Tue, 6 Jun 2023 17:34:49 +0200 Subject: getting to first version --- .../recipes/AddTransformRecipe/README.md | 7 ++- .../continuedev/recipes/AddTransformRecipe/main.py | 12 ++-- .../recipes/AddTransformRecipe/steps.py | 70 ++++++++-------------- 3 files changed, 38 insertions(+), 51 deletions(-) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md index 9ad49a5f..d735e0cd 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/README.md @@ -1,3 +1,8 @@ # AddTransformRecipe -Uses the Chess.com API example to show how to add map and filter Python transforms to a dlt pipeline. \ No newline at end of file +Uses the Chess.com API example to show how to add map and filter Python transforms to a dlt pipeline. + +Background +- https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data +- https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns +- https://dlthub.com/docs/customizations/customizing-pipelines/pseudonymizing_columns \ No newline at end of file diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py index 0fd96930..2a0736dd 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -14,11 +14,11 @@ class AddTransformRecipe(Step): await sdk.run_step( MessageStep(message=dedent("""\ This recipe will walk you through the process of adding a transform to a dlt pipeline that uses the chess.com API source. With the help of Continue, you will: - - X - - Y - - Z""")) >> - + - Set up a dlt pipeline for the chess.com API + - Add a filter or map transform to the pipeline + - Run the pipeline and view the transformed data in a Streamlit app + - """)) >> SetUpChessPipelineStep() >> - WaitForUserInputStep(prompt="How do you want to transform the chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >> - AddTransformStep() + WaitForUserInputStep(prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >> + AddTransformStep(transform_description="Use the `python-chess` library to decode the moves in the game data") # Ask Nate how to not hardcode this here ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py index 8ab3eda1..46ddbed5 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -8,24 +8,10 @@ from ...core.main import Step from ...core.sdk import ContinueSDK -""" -https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data - - -Example: https://dlthub.com/docs/customizations/customizing-pipelines/renaming_columns -- dlt init chess duckdb -- python chess.py -- write a transform function: ideas for transform functions: using chess Python library decode the moves OR filter out certain games -- use add_map or add_filter -- run python and streamlit app -""" - class SetUpChessPipelineStep(Step): hide: bool = True name: str = "Setup Chess.com API dlt Pipeline" - api_description: str # e.g. "I want to load data from the weatherapi.com API" - async def describe(self, models: Models): return dedent(f"""\ This step will create a new dlt pipeline that loads data from the chess.com API. @@ -49,6 +35,8 @@ class SetUpChessPipelineStep(Step): class AddTransformStep(Step): hide: bool = True + transform_description: str # e.g. "Use the `python-chess` library to decode the moves in the game data" + async def run(self, sdk: ContinueSDK): source_name = 'chess' filename = f'{source_name}.py' @@ -60,43 +48,29 @@ class AddTransformStep(Step): - Open up a Streamlit app for you to view the data """))) - # test that the API call works - await sdk.run(f'python3 {filename}') - - # remove exit() from the main main function - await sdk.edit_file( - filename=filename, - prompt='Remove exit() from the main function' - ) - - # load the data into the DuckDB instance - await sdk.run(f'python3 {filename}') + prompt = dedent(f"""\ + Task: Write a transform function using the description below and then use `add_map` or `add_filter` from the `dlt` library to attach it a resource. - table_name = f"{source_name}.{source_name}_resource" - examples = dedent(f"""\ - - Task: Use either the `add_map` or `add_filter` function to transform the data. + Description: {self.transform_description} - Below you will find some docs page that will help you understand this task. + Here are some docs pages that will help you better understand how to use `dlt`. # Customize resources ## Filter, transform and pivot data You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types: - - map - transform the data item (resource.add_map) - filter - filter the data item (resource.add_filter) - yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map) - Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so: - - we remove users with user_id == "me" - we anonymize user data + Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so: + - we remove users with user_id == 'me' + - we anonymize user data Here's our resource: ```python import dlt - @dlt.resource(write_disposition="replace") + @dlt.resource(write_disposition='replace') def users(): ... users = requests.get(...) @@ -109,12 +83,12 @@ class AddTransformStep(Step): from pipedrive import users def anonymize_user(user_data): - user_data["user_id"] = hash_str(user_data["user_id"]) - user_data["user_email"] = hash_str(user_data["user_email"]) + user_data['user_id'] = hash_str(user_data['user_id']) + user_data['user_email'] = hash_str(user_data['user_email']) return user_data # add the filter and anonymize function to users resource and enumerate - for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(anonymize_user): + for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user): print(user) ``` @@ -168,7 +142,7 @@ class AddTransformStep(Step): # Pseudonymizing columns ## Pseudonymizing (or anonymizing) columns by replacing the special characters - Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called "name", which we replace with deterministic hashes (i.e. replacing the German umlaut). + Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut). ```python import dlt @@ -216,8 +190,16 @@ class AddTransformStep(Step): pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') load_info = pipeline.run(data_source) ``` - """) + """) + + # edit the pipeline to add a tranform function and attach it to a resource + await sdk.edit_file( + filename=filename, + prompt=prompt + ) + + # run the pipeline and load the data + await sdk.run(f'python3 {filename}') - query_filename = (await sdk.ide.getWorkspaceDirectory()) + "/query.py" - await sdk.apply_filesystem_edit(AddFile(filepath=query_filename, content=tables_query_code)) - await sdk.run('env/bin/python3 query.py') + # run a streamlit app to show the data + await sdk.run(f'dlt pipeline {source_name} show') \ No newline at end of file -- cgit v1.2.3-70-g09d2 From c84eae1885489ec7b07e0bb0eea1bac36f40c181 Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Wed, 7 Jun 2023 17:00:00 -0400 Subject: quick look over on transform recipe --- continuedev/src/continuedev/core/policy.py | 5 +- .../AddTransformRecipe/dlt_transform_docs.md | 135 ++++++++++++++++ .../continuedev/recipes/AddTransformRecipe/main.py | 13 +- .../recipes/AddTransformRecipe/steps.py | 173 +++------------------ .../src/continuedev/steps/steps_on_startup.py | 5 +- 5 files changed, 173 insertions(+), 158 deletions(-) create mode 100644 continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py index 8aea8de7..c3f1d188 100644 --- a/continuedev/src/continuedev/core/policy.py +++ b/continuedev/src/continuedev/core/policy.py @@ -3,6 +3,7 @@ from typing import List, Tuple, Type from ..steps.chroma import AnswerQuestionChroma, EditFileChroma, CreateCodebaseIndexChroma from ..steps.steps_on_startup import StepsOnStartupStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe +from ..recipes.AddTransformRecipe.main import AddTransformRecipe from .main import Step, Validator, History, Policy from .observation import Observation, TracebackObservation, UserInputObservation from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep @@ -28,8 +29,10 @@ class DemoPolicy(Policy): # This could be defined with ObservationTypePolicy. Ergonomics not right though. if "/pytest" in observation.user_input.lower(): return WritePytestsRecipe(instructions=observation.user_input) - elif "/dlt" in observation.user_input.lower() or " dlt" in observation.user_input.lower(): + elif "/dlt" in observation.user_input.lower(): return CreatePipelineRecipe() + elif "/transform" in observation.user_input.lower(): + return AddTransformRecipe() elif "/comment" in observation.user_input.lower(): return CommentCodeStep() elif "/ask" in observation.user_input: diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md b/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md new file mode 100644 index 00000000..658b285f --- /dev/null +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/dlt_transform_docs.md @@ -0,0 +1,135 @@ +# Customize resources +## Filter, transform and pivot data + +You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types: +- map - transform the data item (resource.add_map) +- filter - filter the data item (resource.add_filter) +- yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map) + +Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so: +- we remove users with user_id == 'me' +- we anonymize user data +Here's our resource: +```python +import dlt + +@dlt.resource(write_disposition='replace') +def users(): + ... + users = requests.get(...) + ... + yield users +``` + +Here's our script that defines transformations and loads the data. +```python +from pipedrive import users + +def anonymize_user(user_data): + user_data['user_id'] = hash_str(user_data['user_id']) + user_data['user_email'] = hash_str(user_data['user_email']) + return user_data + +# add the filter and anonymize function to users resource and enumerate +for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user): +print(user) +``` + +Here is a more complex example of a filter transformation: + + # Renaming columns + ## Renaming columns by replacing the special characters + + In the example below, we create a dummy source with special characters in the name. We then write a function that we intend to apply to the resource to modify its output (i.e. replacing the German umlaut): replace_umlauts_in_dict_keys. + ```python + import dlt + + # create a dummy source with umlauts (special characters) in key names (um) + @dlt.source + def dummy_source(prefix: str = None): + @dlt.resource + def dummy_data(): + for _ in range(100): + yield {f'Objekt_{_}':{'Größe':_, 'Äquivalenzprüfung':True}} + return dummy_data(), + + def replace_umlauts_in_dict_keys(d): + # Replaces umlauts in dictionary keys with standard characters. + umlaut_map = {'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue'} + result = {} + for k, v in d.items(): + new_key = ''.join(umlaut_map.get(c, c) for c in k) + if isinstance(v, dict): + result[new_key] = replace_umlauts_in_dict_keys(v) + else: + result[new_key] = v + return result + + # We can add the map function to the resource + + # 1. Create an instance of the source so you can edit it. + data_source = dummy_source() + + # 2. Modify this source instance's resource + data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) + + # 3. Inspect your result + for row in data_source: + print(row) + + # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}} + # ... + ``` + +Here is a more complex example of a map transformation: + +# Pseudonymizing columns +## Pseudonymizing (or anonymizing) columns by replacing the special characters +Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut). + +```python +import dlt +import hashlib + +@dlt.source +def dummy_source(prefix: str = None): + @dlt.resource + def dummy_data(): + for _ in range(3): + yield {'id':_, 'name': f'Jane Washington {_}'} + return dummy_data(), + +def pseudonymize_name(doc): + Pseudonmyisation is a deterministic type of PII-obscuring + Its role is to allow identifying users by their hash, without revealing the underlying info. + + # add a constant salt to generate + salt = 'WI@N57%zZrmk#88c' + salted_string = doc['name'] + salt + sh = hashlib.sha256() + sh.update(salted_string.encode()) + hashed_string = sh.digest().hex() + doc['name'] = hashed_string + return doc + + # run it as is + for row in dummy_source().dummy_data().add_map(pseudonymize_name): + print(row) + + #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'} + #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'} + #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'} + + # Or create an instance of the data source, modify the resource and run the source. + + # 1. Create an instance of the source so you can edit it. + data_source = dummy_source() + # 2. Modify this source instance's resource + data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) + # 3. Inspect your result + for row in data_source: + print(row) + + pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') + load_info = pipeline.run(data_source) +``` \ No newline at end of file diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py index 2a0736dd..5e05b587 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -11,14 +11,17 @@ class AddTransformRecipe(Step): hide: bool = True async def run(self, sdk: ContinueSDK): - await sdk.run_step( + text_observation = await sdk.run_step( MessageStep(message=dedent("""\ This recipe will walk you through the process of adding a transform to a dlt pipeline that uses the chess.com API source. With the help of Continue, you will: - Set up a dlt pipeline for the chess.com API - Add a filter or map transform to the pipeline - - Run the pipeline and view the transformed data in a Streamlit app - - """)) >> + - Run the pipeline and view the transformed data in a Streamlit app"""), name="Add transformation to a dlt pipeline") >> SetUpChessPipelineStep() >> - WaitForUserInputStep(prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") >> - AddTransformStep(transform_description="Use the `python-chess` library to decode the moves in the game data") # Ask Nate how to not hardcode this here + WaitForUserInputStep( + prompt="How do you want to transform the Chess.com API data before loading it? For example, you could use the `python-chess` library to decode the moves or filter out certain games") + ) + await sdk.run_step( + AddTransformStep( + transform_description=text_observation.text) ) diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py index 46ddbed5..f7f5a43b 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -1,3 +1,4 @@ +import os from textwrap import dedent from ...steps.main import MessageStep @@ -7,35 +8,39 @@ from ...models.filesystem_edit import AddFile from ...core.main import Step from ...core.sdk import ContinueSDK +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + class SetUpChessPipelineStep(Step): hide: bool = True name: str = "Setup Chess.com API dlt Pipeline" async def describe(self, models: Models): - return dedent(f"""\ - This step will create a new dlt pipeline that loads data from the chess.com API. - """) + return "This step will create a new dlt pipeline that loads data from the chess.com API." async def run(self, sdk: ContinueSDK): - filename = 'chess.py' - # running commands to get started when creating a new dlt pipeline await sdk.run([ 'python3 -m venv env', 'source env/bin/activate', 'pip install dlt', - 'dlt init chess duckdb', - 'Y', + 'dlt --non-interactive init chess duckdb', 'pip install -r requirements.txt' - ]) + ], name="Set up Python environment", description=dedent(f"""\ + Running the following commands: + - `python3 -m venv env`: Create a Python virtual environment + - `source env/bin/activate`: Activate the virtual environment + - `pip install dlt`: Install dlt + - `dlt init chess duckdb`: Create a new dlt pipeline called "chess" that loads data into a local DuckDB instance + - `pip install -r requirements.txt`: Install the Python dependencies for the pipeline""")) class AddTransformStep(Step): hide: bool = True - transform_description: str # e.g. "Use the `python-chess` library to decode the moves in the game data" + # e.g. "Use the `python-chess` library to decode the moves in the game data" + transform_description: str async def run(self, sdk: ContinueSDK): source_name = 'chess' @@ -45,8 +50,10 @@ class AddTransformStep(Step): This step will customize your resource function with a transform of your choice: - Add a filter or map transformation depending on your request - Load the data into a local DuckDB instance - - Open up a Streamlit app for you to view the data - """))) + - Open up a Streamlit app for you to view the data"""), name="Write transformation function")) + + with open(os.path.join(os.path.dirname(__file__), 'dlt_transform_docs.md')) as f: + dlt_transform_docs = f.read() prompt = dedent(f"""\ Task: Write a transform function using the description below and then use `add_map` or `add_filter` from the `dlt` library to attach it a resource. @@ -55,151 +62,17 @@ class AddTransformStep(Step): Here are some docs pages that will help you better understand how to use `dlt`. - # Customize resources - ## Filter, transform and pivot data - - You can attach any number of transformations that are evaluated on item per item basis to your resource. The available transformation types: - - map - transform the data item (resource.add_map) - - filter - filter the data item (resource.add_filter) - - yield map - a map that returns iterator (so single row may generate many rows - resource.add_yield_map) - - Example: We have a resource that loads a list of users from an api endpoint. We want to customize it so: - - we remove users with user_id == 'me' - - we anonymize user data - Here's our resource: - ```python - import dlt - - @dlt.resource(write_disposition='replace') - def users(): - ... - users = requests.get(...) - ... - yield users - ``` - - Here's our script that defines transformations and loads the data. - ```python - from pipedrive import users - - def anonymize_user(user_data): - user_data['user_id'] = hash_str(user_data['user_id']) - user_data['user_email'] = hash_str(user_data['user_email']) - return user_data - - # add the filter and anonymize function to users resource and enumerate - for user in users().add_filter(lambda user: user['user_id'] != 'me').add_map(anonymize_user): - print(user) - ``` - - Here is a more complex example of a filter transformation: - - # Renaming columns - ## Renaming columns by replacing the special characters - - In the example below, we create a dummy source with special characters in the name. We then write a function that we intend to apply to the resource to modify its output (i.e. replacing the German umlaut): replace_umlauts_in_dict_keys. - ```python - import dlt - - # create a dummy source with umlauts (special characters) in key names (um) - @dlt.source - def dummy_source(prefix: str = None): - @dlt.resource - def dummy_data(): - for _ in range(100): - yield {f'Objekt_{_}':{'Größe':_, 'Äquivalenzprüfung':True}} - return dummy_data(), - - def replace_umlauts_in_dict_keys(d): - # Replaces umlauts in dictionary keys with standard characters. - umlaut_map = {'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss', 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue'} - result = {} - for k, v in d.items(): - new_key = ''.join(umlaut_map.get(c, c) for c in k) - if isinstance(v, dict): - result[new_key] = replace_umlauts_in_dict_keys(v) - else: - result[new_key] = v - return result - - # We can add the map function to the resource - - # 1. Create an instance of the source so you can edit it. - data_source = dummy_source() - - # 2. Modify this source instance's resource - data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) - - # 3. Inspect your result - for row in data_source: - print(row) - - # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}} - # ... - ``` - - Here is a more complex example of a map transformation: - - # Pseudonymizing columns - ## Pseudonymizing (or anonymizing) columns by replacing the special characters - Pseudonymization is a deterministic way to hide personally identifiable info (PII), enabling us to consistently achieve the same mapping. If instead you wish to anonymize, you can delete the data, or replace it with a constant. In the example below, we create a dummy source with a PII column called 'name', which we replace with deterministic hashes (i.e. replacing the German umlaut). - - ```python - import dlt - import hashlib - - @dlt.source - def dummy_source(prefix: str = None): - @dlt.resource - def dummy_data(): - for _ in range(3): - yield {'id':_, 'name': f'Jane Washington {_}'} - return dummy_data(), - - def pseudonymize_name(doc): - Pseudonmyisation is a deterministic type of PII-obscuring - Its role is to allow identifying users by their hash, without revealing the underlying info. - - # add a constant salt to generate - salt = 'WI@N57%zZrmk#88c' - salted_string = doc['name'] + salt - sh = hashlib.sha256() - sh.update(salted_string.encode()) - hashed_string = sh.digest().hex() - doc['name'] = hashed_string - return doc - - # run it as is - for row in dummy_source().dummy_data().add_map(pseudonymize_name): - print(row) - - #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'} - #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'} - #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'} - - # Or create an instance of the data source, modify the resource and run the source. - - # 1. Create an instance of the source so you can edit it. - data_source = dummy_source() - # 2. Modify this source instance's resource - data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) - # 3. Inspect your result - for row in data_source: - print(row) - - pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') - load_info = pipeline.run(data_source) - ``` - """) + {dlt_transform_docs}""") # edit the pipeline to add a tranform function and attach it to a resource await sdk.edit_file( filename=filename, - prompt=prompt + prompt=prompt, + name=f"Writing transform function {AI_ASSISTED_STRING}" ) # run the pipeline and load the data - await sdk.run(f'python3 {filename}') + await sdk.run(f'python3 {filename}', name="Run the pipeline", description=f"Running `python3 {filename}` to load the data into a local DuckDB instance") # run a streamlit app to show the data - await sdk.run(f'dlt pipeline {source_name} show') \ No newline at end of file + await sdk.run(f'dlt pipeline {source_name} show', name="Show data in a Streamlit app", description=f"Running `dlt pipeline {source_name} show` to show the data in a Streamlit app, where you can view and play with the data.") diff --git a/continuedev/src/continuedev/steps/steps_on_startup.py b/continuedev/src/continuedev/steps/steps_on_startup.py index cd40ff56..63dedd82 100644 --- a/continuedev/src/continuedev/steps/steps_on_startup.py +++ b/continuedev/src/continuedev/steps/steps_on_startup.py @@ -1,11 +1,12 @@ from ..core.main import ContinueSDK, Models, Step from .main import UserInputStep from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe - +from ..recipes.AddTransformRecipe.main import AddTransformRecipe step_name_to_step_class = { "UserInputStep": UserInputStep, - "CreatePipelineRecipe": CreatePipelineRecipe + "CreatePipelineRecipe": CreatePipelineRecipe, + "AddTransformRecipe": AddTransformRecipe } -- cgit v1.2.3-70-g09d2 From 7db0178fb8de64731de403e3916e6fb303ee64d3 Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Fri, 9 Jun 2023 14:35:21 -0400 Subject: touching up transform recipe, chat context --- continuedev/src/continuedev/core/abstract_sdk.py | 12 ++++++-- continuedev/src/continuedev/core/autopilot.py | 1 + continuedev/src/continuedev/core/main.py | 24 ++++++++++++++- continuedev/src/continuedev/core/policy.py | 3 +- continuedev/src/continuedev/core/sdk.py | 10 +++++- continuedev/src/continuedev/libs/llm/__init__.py | 6 ++-- .../src/continuedev/libs/llm/hf_inference_api.py | 4 ++- continuedev/src/continuedev/libs/llm/openai.py | 15 +++++---- .../continuedev/recipes/AddTransformRecipe/main.py | 2 +- .../recipes/AddTransformRecipe/steps.py | 19 +++++++++--- .../recipes/CreatePipelineRecipe/main.py | 2 +- .../recipes/CreatePipelineRecipe/steps.py | 4 +-- continuedev/src/continuedev/server/ide.py | 2 +- continuedev/src/continuedev/steps/core/core.py | 36 ++++++++++++++++++++-- continuedev/src/continuedev/steps/main.py | 13 +------- docs/docs/walkthroughs/create-a-recipe.md | 4 +-- 16 files changed, 117 insertions(+), 40 deletions(-) diff --git a/continuedev/src/continuedev/core/abstract_sdk.py b/continuedev/src/continuedev/core/abstract_sdk.py index 1c800875..417971cd 100644 --- a/continuedev/src/continuedev/core/abstract_sdk.py +++ b/continuedev/src/continuedev/core/abstract_sdk.py @@ -1,10 +1,10 @@ -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty from typing import Coroutine, List, Union from .config import ContinueConfig from ..models.filesystem_edit import FileSystemEdit from .observation import Observation -from .main import History, Step +from .main import ChatMessage, History, Step, ChatMessageRole """ @@ -83,3 +83,11 @@ class AbstractContinueSDK(ABC): @abstractmethod def set_loading_message(self, message: str): pass + + @abstractmethod + def add_chat_context(self, content: str, role: ChatMessageRole = "assistent"): + pass + + @abstractproperty + def chat_context(self) -> List[ChatMessage]: + pass diff --git a/continuedev/src/continuedev/core/autopilot.py b/continuedev/src/continuedev/core/autopilot.py index b82e1fef..c979d53a 100644 --- a/continuedev/src/continuedev/core/autopilot.py +++ b/continuedev/src/continuedev/core/autopilot.py @@ -35,6 +35,7 @@ class Autopilot(ContinueBaseModel): class Config: arbitrary_types_allowed = True + keep_untouched = (cached_property,) def get_full_state(self) -> FullState: return FullState(history=self.history, active=self._active, user_input_queue=self._main_user_input_queue) diff --git a/continuedev/src/continuedev/core/main.py b/continuedev/src/continuedev/core/main.py index 37d80de3..19b36a6a 100644 --- a/continuedev/src/continuedev/core/main.py +++ b/continuedev/src/continuedev/core/main.py @@ -1,10 +1,18 @@ -from typing import Callable, Coroutine, Dict, Generator, List, Tuple, Union +from textwrap import dedent +from typing import Callable, Coroutine, Dict, Generator, List, Literal, Tuple, Union from ..models.main import ContinueBaseModel from pydantic import validator from ..libs.llm import LLM from .observation import Observation +ChatMessageRole = Literal["assistant", "user", "system"] + + +class ChatMessage(ContinueBaseModel): + role: ChatMessageRole + content: str + class HistoryNode(ContinueBaseModel): """A point in history, a list of which make up History""" @@ -12,12 +20,25 @@ class HistoryNode(ContinueBaseModel): observation: Union[Observation, None] depth: int + def to_chat_messages(self) -> List[ChatMessage]: + return self.step.chat_context + [ChatMessage(role="assistant", content=self.step.description)] + class History(ContinueBaseModel): """A history of steps taken and their results""" timeline: List[HistoryNode] current_index: int + def to_chat_history(self) -> List[ChatMessage]: + msgs = [] + for node in self.timeline: + if not node.step.hide: + msgs += [ + ChatMessage(role="assistant", content=msg) + for msg in node.to_chat_messages() + ] + return msgs + def add_node(self, node: HistoryNode): self.timeline.insert(self.current_index + 1, node) self.current_index += 1 @@ -113,6 +134,7 @@ class Step(ContinueBaseModel): description: Union[str, None] = None system_message: Union[str, None] = None + chat_context: List[ChatMessage] = [] class Config: copy_on_model_validation = False diff --git a/continuedev/src/continuedev/core/policy.py b/continuedev/src/continuedev/core/policy.py index c3f1d188..7661f0c4 100644 --- a/continuedev/src/continuedev/core/policy.py +++ b/continuedev/src/continuedev/core/policy.py @@ -6,10 +6,11 @@ from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe from ..recipes.AddTransformRecipe.main import AddTransformRecipe from .main import Step, Validator, History, Policy from .observation import Observation, TracebackObservation, UserInputObservation -from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, MessageStep, EmptyStep, SetupContinueWorkspaceStep +from ..steps.main import EditHighlightedCodeStep, SolveTracebackStep, RunCodeStep, FasterEditHighlightedCodeStep, StarCoderEditHighlightedCodeStep, EmptyStep, SetupContinueWorkspaceStep from ..recipes.WritePytestsRecipe.main import WritePytestsRecipe from ..recipes.ContinueRecipeRecipe.main import ContinueStepStep from ..steps.comment_code import CommentCodeStep +from ..steps.core.core import MessageStep class DemoPolicy(Policy): diff --git a/continuedev/src/continuedev/core/sdk.py b/continuedev/src/continuedev/core/sdk.py index ea90a13a..11127361 100644 --- a/continuedev/src/continuedev/core/sdk.py +++ b/continuedev/src/continuedev/core/sdk.py @@ -14,7 +14,7 @@ from ..libs.llm.hf_inference_api import HuggingFaceInferenceAPI from ..libs.llm.openai import OpenAI from .observation import Observation from ..server.ide_protocol import AbstractIdeProtocolServer -from .main import Context, ContinueCustomException, History, Step +from .main import Context, ContinueCustomException, History, Step, ChatMessage, ChatMessageRole from ..steps.core.core import * @@ -136,3 +136,11 @@ class ContinueSDK(AbstractContinueSDK): def raise_exception(self, message: str, title: str, with_step: Union[Step, None] = None): raise ContinueCustomException(message, title, with_step) + + def add_chat_context(self, content: str, role: ChatMessageRole = "assistent"): + self.history.timeline[self.history.current_index].step.chat_context.append( + ChatMessage(content=content, role=role)) + + @property + def chat_context(self) -> List[ChatMessage]: + return self.history.to_chat_history() diff --git a/continuedev/src/continuedev/libs/llm/__init__.py b/continuedev/src/continuedev/libs/llm/__init__.py index 6bae2222..24fd34be 100644 --- a/continuedev/src/continuedev/libs/llm/__init__.py +++ b/continuedev/src/continuedev/libs/llm/__init__.py @@ -1,4 +1,6 @@ -from typing import Union +from typing import List, Union + +from ...core.main import ChatMessage from ...models.main import AbstractModel from pydantic import BaseModel @@ -6,7 +8,7 @@ from pydantic import BaseModel class LLM(BaseModel): system_message: Union[str, None] = None - def complete(self, prompt: str, **kwargs): + def complete(self, prompt: str, with_history: List[ChatMessage] = [], **kwargs): """Return the completion of the text with the given temperature.""" raise diff --git a/continuedev/src/continuedev/libs/llm/hf_inference_api.py b/continuedev/src/continuedev/libs/llm/hf_inference_api.py index 734da160..1586c620 100644 --- a/continuedev/src/continuedev/libs/llm/hf_inference_api.py +++ b/continuedev/src/continuedev/libs/llm/hf_inference_api.py @@ -1,3 +1,5 @@ +from typing import List +from ...core.main import ChatMessage from ..llm import LLM import requests @@ -9,7 +11,7 @@ class HuggingFaceInferenceAPI(LLM): api_key: str model: str = "bigcode/starcoder" - def complete(self, prompt: str, **kwargs): + def complete(self, prompt: str, with_history: List[ChatMessage] = [], **kwargs): """Return the completion of the text with the given temperature.""" API_URL = f"https://api-inference.huggingface.co/models/{self.model}" headers = { diff --git a/continuedev/src/continuedev/libs/llm/openai.py b/continuedev/src/continuedev/libs/llm/openai.py index 10801465..da8c5caf 100644 --- a/continuedev/src/continuedev/libs/llm/openai.py +++ b/continuedev/src/continuedev/libs/llm/openai.py @@ -1,6 +1,7 @@ import asyncio import time from typing import Any, Dict, Generator, List, Union +from ...core.main import ChatMessage import openai import aiohttp from ..llm import LLM @@ -62,7 +63,7 @@ class OpenAI(LLM): for chunk in generator: yield chunk.choices[0].text - def complete(self, prompt: str, **kwargs) -> str: + def complete(self, prompt: str, with_history: List[ChatMessage] = [], **kwargs) -> str: t1 = time.time() self.completion_count += 1 @@ -70,15 +71,17 @@ class OpenAI(LLM): "frequency_penalty": 0, "presence_penalty": 0, "stream": False} | kwargs if args["model"] == "gpt-3.5-turbo": - messages = [{ - "role": "user", - "content": prompt - }] + messages = [] if self.system_message: - messages.insert(0, { + messages.append({ "role": "system", "content": self.system_message }) + message += [msg.dict() for msg in with_history] + messages.append({ + "role": "user", + "content": prompt + }) resp = openai.ChatCompletion.create( messages=messages, **args, diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py index 5e05b587..e9a998e3 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/main.py @@ -3,7 +3,7 @@ from textwrap import dedent from ...core.main import Step from ...core.sdk import ContinueSDK from ...steps.core.core import WaitForUserInputStep -from ...steps.main import MessageStep +from ...steps.core.core import MessageStep from .steps import SetUpChessPipelineStep, AddTransformStep diff --git a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py index f7f5a43b..7bb0fc23 100644 --- a/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/AddTransformRecipe/steps.py @@ -1,7 +1,9 @@ import os from textwrap import dedent -from ...steps.main import MessageStep +from ...models.main import Range +from ...models.filesystem import RangeInFile +from ...steps.core.core import MessageStep from ...core.sdk import Models from ...core.observation import DictObservation from ...models.filesystem_edit import AddFile @@ -26,7 +28,8 @@ class SetUpChessPipelineStep(Step): 'source env/bin/activate', 'pip install dlt', 'dlt --non-interactive init chess duckdb', - 'pip install -r requirements.txt' + 'pip install -r requirements.txt', + 'pip install pandas streamlit' # Needed for the pipeline show step later ], name="Set up Python environment", description=dedent(f"""\ Running the following commands: - `python3 -m venv env`: Create a Python virtual environment @@ -44,7 +47,8 @@ class AddTransformStep(Step): async def run(self, sdk: ContinueSDK): source_name = 'chess' - filename = f'{source_name}.py' + filename = f'{source_name}_pipeline.py' + abs_filepath = os.path.join(sdk.ide.workspace_directory, filename) await sdk.run_step(MessageStep(message=dedent("""\ This step will customize your resource function with a transform of your choice: @@ -52,6 +56,13 @@ class AddTransformStep(Step): - Load the data into a local DuckDB instance - Open up a Streamlit app for you to view the data"""), name="Write transformation function")) + # Open the file and highlight the function to be edited + await sdk.ide.setFileOpen(abs_filepath) + await sdk.ide.highlightCode(range_in_file=RangeInFile( + filepath=abs_filepath, + range=Range.from_shorthand(47, 0, 51, 0) + )) + with open(os.path.join(os.path.dirname(__file__), 'dlt_transform_docs.md')) as f: dlt_transform_docs = f.read() @@ -75,4 +86,4 @@ class AddTransformStep(Step): await sdk.run(f'python3 {filename}', name="Run the pipeline", description=f"Running `python3 {filename}` to load the data into a local DuckDB instance") # run a streamlit app to show the data - await sdk.run(f'dlt pipeline {source_name} show', name="Show data in a Streamlit app", description=f"Running `dlt pipeline {source_name} show` to show the data in a Streamlit app, where you can view and play with the data.") + await sdk.run(f'dlt pipeline {source_name}_pipeline show', name="Show data in a Streamlit app", description=f"Running `dlt pipeline {source_name} show` to show the data in a Streamlit app, where you can view and play with the data.") diff --git a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/main.py b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/main.py index 39e1ba42..818168ba 100644 --- a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/main.py +++ b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/main.py @@ -3,7 +3,7 @@ from textwrap import dedent from ...core.main import Step from ...core.sdk import ContinueSDK from ...steps.core.core import WaitForUserInputStep -from ...steps.main import MessageStep +from ...steps.core.core import MessageStep from .steps import SetupPipelineStep, ValidatePipelineStep, RunQueryStep diff --git a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py index 3b9a8c85..ea40a058 100644 --- a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py @@ -5,7 +5,7 @@ import time from ...models.main import Range from ...models.filesystem import RangeInFile -from ...steps.main import MessageStep +from ...steps.core.core import MessageStep from ...core.sdk import Models from ...core.observation import DictObservation, InternalErrorObservation from ...models.filesystem_edit import AddFile, FileEdit @@ -51,7 +51,7 @@ class SetupPipelineStep(Step): # editing the resource function to call the requested API resource_function_range = Range.from_shorthand(15, 0, 29, 0) - await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=resource_function_range), "#00ff0022") + await sdk.ide.highlightCode(RangeInFile(filepath=os.path.join(await sdk.ide.getWorkspaceDirectory(), filename), range=resource_function_range)) # sdk.set_loading_message("Writing code to call the API...") await sdk.edit_file( diff --git a/continuedev/src/continuedev/server/ide.py b/continuedev/src/continuedev/server/ide.py index 5826f15f..f4ea1071 100644 --- a/continuedev/src/continuedev/server/ide.py +++ b/continuedev/src/continuedev/server/ide.py @@ -138,7 +138,7 @@ class IdeProtocolServer(AbstractIdeProtocolServer): "sessionId": session_id }) - async def highlightCode(self, range_in_file: RangeInFile, color: str): + async def highlightCode(self, range_in_file: RangeInFile, color: str = "#00ff0022"): await self._send_json("highlightCode", { "rangeInFile": range_in_file.dict(), "color": color diff --git a/continuedev/src/continuedev/steps/core/core.py b/continuedev/src/continuedev/steps/core/core.py index dfd765eb..5117d479 100644 --- a/continuedev/src/continuedev/steps/core/core.py +++ b/continuedev/src/continuedev/steps/core/core.py @@ -1,4 +1,5 @@ # These steps are depended upon by ContinueSDK +import os import subprocess from textwrap import dedent from typing import Coroutine, List, Union @@ -23,6 +24,17 @@ class ReversibleStep(Step): raise NotImplementedError +class MessageStep(Step): + name: str = "Message" + message: str + + async def describe(self, models: Models) -> Coroutine[str, None, None]: + return self.message + + async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]: + return TextObservation(text=self.message) + + class FileSystemEditStep(ReversibleStep): edit: FileSystemEdit _diff: Union[EditDiff, None] = None @@ -38,6 +50,13 @@ class FileSystemEditStep(ReversibleStep): # Where and when should file saves happen? +def output_contains_error(output: str) -> bool: + return "Traceback" in output or "SyntaxError" in output + + +AI_ASSISTED_STRING = "(✨ AI-Assisted ✨)" + + class ShellCommandsStep(Step): cmds: List[str] cwd: Union[str, None] = None @@ -50,13 +69,26 @@ class ShellCommandsStep(Step): return f"Error when running shell commands:\n```\n{self._err_text}\n```" cmds_str = "\n".join(self.cmds) - return (await models.gpt35()).complete(f"{cmds_str}\n\nSummarize what was done in these shell commands, using markdown bullet points:") + return models.gpt35.complete(f"{cmds_str}\n\nSummarize what was done in these shell commands, using markdown bullet points:") async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]: cwd = await sdk.ide.getWorkspaceDirectory() if self.cwd is None else self.cwd for cmd in self.cmds: output = await sdk.ide.runCommand(cmd) + if output is not None and output_contains_error(output): + suggestion = sdk.models.gpt35.complete(dedent(f"""\ + While running the command `{cmd}`, the following error occurred: + + ```ascii + {output} + ``` + + This is a brief summary of the error followed by a suggestion on how it can be fixed:"""), with_context=sdk.chat_context) + + sdk.raise_exception( + title="Error while running query", message=output, with_step=MessageStep(name=f"Suggestion to solve error {AI_ASSISTED_STRING}", message=suggestion) + ) return TextObservation(text=output) @@ -116,7 +148,7 @@ class Gpt35EditCodeStep(Step): _prompt_and_completion: str = "" async def describe(self, models: Models) -> Coroutine[str, None, None]: - return (await models.gpt35()).complete(f"{self._prompt_and_completion}\n\nPlease give brief a description of the changes made above using markdown bullet points:") + return models.gpt35.complete(f"{self._prompt_and_completion}\n\nPlease give brief a description of the changes made above using markdown bullet points:") async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]: rif_with_contents = [] diff --git a/continuedev/src/continuedev/steps/main.py b/continuedev/src/continuedev/steps/main.py index 81a1e3a9..24335b4f 100644 --- a/continuedev/src/continuedev/steps/main.py +++ b/continuedev/src/continuedev/steps/main.py @@ -212,7 +212,7 @@ class StarCoderEditHighlightedCodeStep(Step): _prompt_and_completion: str = "" async def describe(self, models: Models) -> Coroutine[str, None, None]: - return (await models.gpt35()).complete(f"{self._prompt_and_completion}\n\nPlease give brief a description of the changes made above using markdown bullet points:") + return models.gpt35.complete(f"{self._prompt_and_completion}\n\nPlease give brief a description of the changes made above using markdown bullet points:") async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]: range_in_files = await sdk.ide.getHighlightedCode() @@ -317,17 +317,6 @@ class SolveTracebackStep(Step): return None -class MessageStep(Step): - name: str = "Message" - message: str - - async def describe(self, models: Models) -> Coroutine[str, None, None]: - return self.message - - async def run(self, sdk: ContinueSDK) -> Coroutine[Observation, None, None]: - return TextObservation(text=self.message) - - class EmptyStep(Step): hide: bool = True diff --git a/docs/docs/walkthroughs/create-a-recipe.md b/docs/docs/walkthroughs/create-a-recipe.md index 60bfe9a8..3b80df8a 100644 --- a/docs/docs/walkthroughs/create-a-recipe.md +++ b/docs/docs/walkthroughs/create-a-recipe.md @@ -17,8 +17,6 @@ continue/continuedev/src/continuedev/recipes ## 1. Create a step - - ### a. Start by creating a subclass of Step You should first consider what will be the parameters of your recipe. These are defined as attributes in the step, as with `input_file_path: str` below @@ -33,7 +31,7 @@ If you'd like to override the default description of your steps, which is just t - Return a static string - Store state in a class attribute (prepend with a double underscore, which signifies (through Pydantic) that this is not a parameter for the Step, just internal state) during the run method, and then grab this in the describe method. -- Use state in conjunction with the `models` parameter of the describe method to autogenerate a description with a language model. For example, if you'd used an attribute called `__code_written` to store a string representing some code that was written, you could implement describe as `return (await models.gpt35()).complete(f"{self.\_\_code_written}\n\nSummarize the changes made in the above code.")`. +- Use state in conjunction with the `models` parameter of the describe method to autogenerate a description with a language model. For example, if you'd used an attribute called `__code_written` to store a string representing some code that was written, you could implement describe as `return models.gpt35.complete(f"{self.\_\_code_written}\n\nSummarize the changes made in the above code.")`. ## 2. Compose steps together into a complete recipe -- cgit v1.2.3-70-g09d2 From 8bc43221b32fda1bffa6157ab335b48d0c605973 Mon Sep 17 00:00:00 2001 From: Nate Sesti Date: Fri, 9 Jun 2023 23:00:46 -0400 Subject: cleaning up transform pipeline --- continuedev/src/continuedev/core/main.py | 8 +++----- continuedev/src/continuedev/core/sdk.py | 4 ++-- continuedev/src/continuedev/libs/llm/openai.py | 2 +- continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py | 4 ++-- continuedev/src/continuedev/steps/core/core.py | 3 ++- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/continuedev/src/continuedev/core/main.py b/continuedev/src/continuedev/core/main.py index 19b36a6a..3053e5a1 100644 --- a/continuedev/src/continuedev/core/main.py +++ b/continuedev/src/continuedev/core/main.py @@ -3,7 +3,6 @@ from typing import Callable, Coroutine, Dict, Generator, List, Literal, Tuple, U from ..models.main import ContinueBaseModel from pydantic import validator -from ..libs.llm import LLM from .observation import Observation ChatMessageRole = Literal["assistant", "user", "system"] @@ -21,6 +20,8 @@ class HistoryNode(ContinueBaseModel): depth: int def to_chat_messages(self) -> List[ChatMessage]: + if self.step.description is None: + return self.step.chat_context return self.step.chat_context + [ChatMessage(role="assistant", content=self.step.description)] @@ -33,10 +34,7 @@ class History(ContinueBaseModel): msgs = [] for node in self.timeline: if not node.step.hide: - msgs += [ - ChatMessage(role="assistant", content=msg) - for msg in node.to_chat_messages() - ] + msgs += node.to_chat_messages() return msgs def add_node(self, node: HistoryNode): diff --git a/continuedev/src/continuedev/core/sdk.py b/continuedev/src/continuedev/core/sdk.py index 11127361..59bfc0f2 100644 --- a/continuedev/src/continuedev/core/sdk.py +++ b/continuedev/src/continuedev/core/sdk.py @@ -77,9 +77,9 @@ class ContinueSDK(AbstractContinueSDK): async def wait_for_user_confirmation(self, prompt: str): return await self.run_step(WaitForUserConfirmationStep(prompt=prompt)) - async def run(self, commands: Union[List[str], str], cwd: str = None, name: str = None, description: str = None) -> Coroutine[str, None, None]: + async def run(self, commands: Union[List[str], str], cwd: str = None, name: str = None, description: str = None, handle_error: bool = True) -> Coroutine[str, None, None]: commands = commands if isinstance(commands, List) else [commands] - return (await self.run_step(ShellCommandsStep(cmds=commands, cwd=cwd, description=description, **({'name': name} if name else {})))).text + return (await self.run_step(ShellCommandsStep(cmds=commands, cwd=cwd, description=description, handle_error=handle_error, **({'name': name} if name else {})))).text async def edit_file(self, filename: str, prompt: str, name: str = None, description: str = None, range: Range = None): filepath = await self._ensure_absolute_path(filename) diff --git a/continuedev/src/continuedev/libs/llm/openai.py b/continuedev/src/continuedev/libs/llm/openai.py index da8c5caf..6a537afd 100644 --- a/continuedev/src/continuedev/libs/llm/openai.py +++ b/continuedev/src/continuedev/libs/llm/openai.py @@ -77,7 +77,7 @@ class OpenAI(LLM): "role": "system", "content": self.system_message }) - message += [msg.dict() for msg in with_history] + messages += [msg.dict() for msg in with_history] messages.append({ "role": "user", "content": prompt diff --git a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py index ea40a058..e59cc51c 100644 --- a/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py +++ b/continuedev/src/continuedev/recipes/CreatePipelineRecipe/steps.py @@ -86,7 +86,7 @@ class ValidatePipelineStep(Step): # """))) # test that the API call works - output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API") + output = await sdk.run(f'python3 {filename}', name="Test the pipeline", description=f"Running `python3 {filename}` to test loading data from the API", handle_error=False) # If it fails, return the error if "Traceback" in output or "SyntaxError" in output: @@ -157,7 +157,7 @@ class RunQueryStep(Step): hide: bool = True async def run(self, sdk: ContinueSDK): - output = await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected") + output = await sdk.run('env/bin/python3 query.py', name="Run test query", description="Running `env/bin/python3 query.py` to test that the data was loaded into DuckDB as expected", handle_error=False) if "Traceback" in output or "SyntaxError" in output: suggestion = sdk.models.gpt35.complete(dedent(f"""\ diff --git a/continuedev/src/continuedev/steps/core/core.py b/continuedev/src/continuedev/steps/core/core.py index 5117d479..40e992e7 100644 --- a/continuedev/src/continuedev/steps/core/core.py +++ b/continuedev/src/continuedev/steps/core/core.py @@ -61,6 +61,7 @@ class ShellCommandsStep(Step): cmds: List[str] cwd: Union[str, None] = None name: str = "Run Shell Commands" + handle_error: bool = True _err_text: Union[str, None] = None @@ -76,7 +77,7 @@ class ShellCommandsStep(Step): for cmd in self.cmds: output = await sdk.ide.runCommand(cmd) - if output is not None and output_contains_error(output): + if self.handle_error and output is not None and output_contains_error(output): suggestion = sdk.models.gpt35.complete(dedent(f"""\ While running the command `{cmd}`, the following error occurred: -- cgit v1.2.3-70-g09d2