blob: b1376e8a71a09ebecc2aa29d25fab83a2a11eb4c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
from ..core.main import ContinueSDK, Models, Step
from .main import UserInputStep
from ..recipes.CreatePipelineRecipe.main import CreatePipelineRecipe
from ..recipes.DeployPipelineAirflowRecipe.main import DeployPipelineAirflowRecipe
step_name_to_step_class = {
"UserInputStep": UserInputStep,
"CreatePipelineRecipe": CreatePipelineRecipe,
"DeployPipelineAirflowRecipe": DeployPipelineAirflowRecipe
}
class StepsOnStartupStep(Step):
hide: bool = True
async def describe(self, models: Models):
return "Running steps on startup"
async def run(self, sdk: ContinueSDK):
steps_descriptions = (await sdk.get_config()).steps_on_startup
for step_name, step_params in steps_descriptions.items():
try:
step = step_name_to_step_class[step_name](**step_params)
except:
print(
f"Incorrect parameters for step {step_name}. Parameters provided were: {step_params}")
continue
await sdk.run_step(step)
|