def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore
if exc_type is not None:
error_types = {Stage.deploy: DeployError, Stage.execute: ExecuteError}
raise error_types[self._mode](
f"Oops... failed during: {self._mode}"
) from exc_val
if len(self._project.workflows) == 0:
_ilog.info("Doing nothing no workflows...")
return
if self._mode == Stage.unittest:
# Mode is purely for testing purposes for the _Project internal class
_ilog.info("Running unit tests...")
return
if self._mode.value == Stage.deploy.value:
_ilog.info("Deploying changes... to %s", ctx.env)
if self.codegen_mechanism is None:
raise ValueError(
"codegen_mechanism cannot be None; please raise a github issue for this."
)
codegen = self.codegen_mechanism(
project=self._project,
id_=f"{ctx.env}_{self.name}",
env=ctx.env,
**(self.codegen_kwargs or {}),
)
codegen.synth()
if self._mode.value == Stage.execute.value:
wf_id = ctx.get_parameter(
BrickflowInternalVariables.workflow_id.value,
self.debug_execute_workflow,
)
t_id = ctx.get_parameter(
BrickflowInternalVariables.task_id.value, self.debug_execute_task
)
if wf_id is None or t_id is None:
_ilog.info(
"No workflow id or task key was able to found; doing nothing..."
)
return
workflow = self._project.get_workflow(wf_id)
task = workflow.get_task(t_id)
task.execute()