blob: 32eca1d1aea2fa777670f12d25f17448c3ab9753 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
from ..core.main import ContinueSDK, Models, Step
from .main import UserInputStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
from ..recipes.DDtoBQRecipe.main import DDtoBQRecipeRecipe
from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe
from ..recipes.AddTransformRecipe.main import AddTransformRecipe
step_name_to_step_class = {
"UserInputStep": UserInputStep,
"CreatePipelineRecipe": CreatePipelineRecipe,
"DDtoBQRecipeRecipe": DDtoBQRecipeRecipe,
"DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe,
"AddTransformRecipe": AddTransformRecipe
}
class StepsOnStartupStep(Step):
hide: bool = True
async def describe(self, models: Models):
return "Running steps on startup"
async def run(self, sdk: ContinueSDK):
steps_descriptions = (await sdk.get_config()).steps_on_startup
for step_name, step_params in steps_descriptions.items():
try:
step = step_name_to_step_class[step_name](**step_params)
except:
print(
f"Incorrect parameters for step {step_name}. Parameters provided were: {step_params}")
continue
await sdk.run_step(step)
|