blob: eae8b55804206105ce7c6ee177aa35e9ef4e6ae6 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
from ..core.main import ContinueSDK, Models, Step
from .main import UserInputStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
from ..recipes.DDtoBQRecipe.main import DDtoBQRecipe
from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe
from ..recipes.DDtoBQRecipe.main import DDtoBQRecipe
from ..recipes.AddTransformRecipe.main import AddTransformRecipe
# Registry of the step names that may appear in the `steps_on_startup` config,
# mapped to the class instantiated for each name. Every key must be unique:
# in a dict literal a repeated key silently overrides the earlier entry.
step_name_to_step_class = {
    "UserInputStep": UserInputStep,
    "CreatePipelineRecipe": CreatePipelineRecipe,
    "DDtoBQRecipe": DDtoBQRecipe,
    "DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe,
    "AddTransformRecipe": AddTransformRecipe,
}
class StepsOnStartupStep(Step):
    """Instantiate and run each step listed in the config's ``steps_on_startup``.

    The config value is iterated as a mapping of step name to a dict of
    keyword arguments. Each name is resolved through
    ``step_name_to_step_class``; entries that cannot be resolved or
    constructed are reported with ``print`` and skipped, so one bad entry
    does not prevent the remaining steps from running.
    """

    # NOTE(review): presumably hides this step from the user-facing history;
    # confirm against the `Step` base class.
    hide: bool = True

    async def describe(self, models: Models):
        """Return the fixed human-readable description of this step."""
        return "Running steps on startup"

    async def run(self, sdk: ContinueSDK):
        """Run every configured startup step, in config order.

        Args:
            sdk: SDK handle used to read the config and to run each step.
        """
        # `or {}` tolerates a config where the value is unset/None
        # (NOTE(review): whether None can occur is not visible here).
        steps_descriptions = (await sdk.get_config()).steps_on_startup or {}

        for step_name, step_params in steps_descriptions.items():
            # Resolve the name separately from construction so that an unknown
            # name is not misreported as a parameter problem.
            step_class = step_name_to_step_class.get(step_name)
            if step_class is None:
                print(
                    f"Unknown step {step_name} in steps_on_startup. "
                    f"Known steps are: {sorted(step_name_to_step_class)}")
                continue

            try:
                step = step_class(**step_params)
            except Exception:
                # Deliberately best-effort: report and move on. `Exception`
                # (not a bare except) lets KeyboardInterrupt, SystemExit and
                # asyncio cancellation propagate.
                print(
                    f"Incorrect parameters for step {step_name}. Parameters provided were: {step_params}")
                continue

            await sdk.run_step(step)
|